1use hashbrown::HashMap;
2use rayon::ThreadPool;
3use std::{
4 collections::VecDeque,
5 sync::{Arc, Mutex},
6 time::Instant,
7};
8use tracing::{error, warn};
9
/// Cloneable handle to a pool of slow jobs.
///
/// Every clone shares the same mutex-protected [`InternalSlowJobPool`].
#[derive(Clone)]
pub struct SlowJobPool {
    /// Shared pool state; every public method locks it for the duration of the
    /// call.
    internal: Arc<Mutex<InternalSlowJobPool>>,
}
51
/// Ticket returned when a job is handed to a [`SlowJobPool`].
///
/// Passing it to [`SlowJobPool::cancel`] removes the job from the queue if it
/// is still waiting there.
#[derive(Debug)]
pub struct SlowJob {
    /// Configuration name the job was queued under.
    name: String,
    /// Id the pool assigned to the job when it was queued.
    id: u64,
}
57
/// Boxed closure that is handed to the thread pool when a job is dispatched.
type JobType = Box<dyn FnOnce() + Send + Sync + 'static>;
59
/// Mutex-protected state shared by every [`SlowJobPool`] handle.
struct InternalSlowJobPool {
    /// Id handed to the next job passed to `spawn`.
    next_id: u64,
    /// Jobs waiting to be dispatched, FIFO per configuration name.
    queue: HashMap<String, VecDeque<Queue>>,
    /// Limit and running counter per configuration name.
    configs: HashMap<String, Config>,
    /// Configuration names; `spawn_queued` moves a name to the back whenever
    /// one of its jobs is dispatched and `calc_queued_order` walks the list
    /// from the front.
    last_spawned_configs: Vec<String>,
    /// Jobs handed to the thread pool that have not called `finish` yet.
    global_spawned_and_running: u64,
    /// Global job limit; `new` stores it as at least 1.
    global_limit: u64,
    /// Maximum number of metric entries kept per name until `take_metrics`
    /// empties the map.
    jobs_metrics_cnt: usize,
    /// Recorded timings per configuration name.
    jobs_metrics: HashMap<String, Vec<JobMetrics>>,
    /// Thread pool the dispatched jobs run on.
    threadpool: Arc<ThreadPool>,
    /// Handle to the `Arc` owning this value; set by `new` and cloned into
    /// every queued job so the job can report back once it has run.
    internal: Option<Arc<Mutex<Self>>>,
}
72
/// Scheduling state of one configuration name.
#[derive(Debug)]
struct Config {
    /// Maximum number of jobs of this name dispatched at the same time.
    local_limit: u64,
    /// Jobs of this name handed to the thread pool and not yet finished.
    local_spawned_and_running: u64,
}
78
/// A job waiting in the pool's queue.
struct Queue {
    /// Id matching the [`SlowJob`] returned to the caller.
    id: u64,
    /// Configuration name the job was queued under.
    name: String,
    /// Wrapped closure built by `Queue::new`; it runs the user closure and
    /// then reports back to the pool.
    task: JobType,
}
84
/// Timestamps recorded for one executed job.
///
/// `Debug` and `Clone` are derived so the metrics can be logged and copied by
/// consumers of the public API.
#[derive(Debug, Clone)]
pub struct JobMetrics {
    /// Moment the job was put into the queue.
    pub queue_created: Instant,
    /// Moment the job's closure started running.
    pub execution_start: Instant,
    /// Moment the job's closure returned.
    pub execution_end: Instant,
}
90
91impl Queue {
92 fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self
93 where
94 F: FnOnce() + Send + Sync + 'static,
95 {
96 let internal = Arc::clone(internal);
97 let name_cloned = name.to_owned();
98 let queue_created = Instant::now();
99 Self {
100 id,
101 name: name.to_owned(),
102 task: Box::new(move || {
103 common_base::prof_span_alloc!(_guard, &name_cloned);
104 let execution_start = Instant::now();
105 f();
106 let execution_end = Instant::now();
107 let metrics = JobMetrics {
108 queue_created,
109 execution_start,
110 execution_end,
111 };
112 {
114 let mut lock = internal.lock().expect("slowjob lock poisoned");
115 lock.finish(&name_cloned, metrics);
116 lock.spawn_queued();
117 }
118 }),
119 }
120 }
121}
122
123impl InternalSlowJobPool {
124 pub fn new(
125 global_limit: u64,
126 jobs_metrics_cnt: usize,
127 _threadpool: Arc<ThreadPool>,
128 ) -> Arc<Mutex<Self>> {
129 let threadpool = Arc::new(
132 rayon::ThreadPoolBuilder::new()
133 .num_threads(global_limit as usize)
134 .thread_name(move |i| format!("slowjob-{}", i))
135 .spawn_handler(|thread| {
136 let mut b = std::thread::Builder::new();
137 if let Some(name) = thread.name() {
138 b = b.name(name.to_owned());
139 }
140 if let Some(stack_size) = thread.stack_size() {
141 b = b.stack_size(stack_size);
142 }
143 b.spawn(|| {
144 use thread_priority::*;
145 let priority =
146 ThreadPriority::Crossplatform(TryFrom::try_from(15).unwrap());
147 if let Err(err) = cfg_select! {
148 target_os = "linux" => std::thread::current().set_priority_and_policy(
149 ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
150 priority,
151 ),
152 _ => std::thread::current().set_priority(priority),
153 } {
154 tracing::warn!(
155 "Unable to set priority/schedule policy for slow job pool thread: \
156 {err}"
157 );
158 }
159 thread.run()
160 })?;
161 Ok(())
162 })
163 .build()
164 .unwrap(),
165 );
166 let link = Arc::new(Mutex::new(Self {
167 next_id: 0,
168 queue: HashMap::new(),
169 configs: HashMap::new(),
170 last_spawned_configs: Vec::new(),
171 global_spawned_and_running: 0,
172 global_limit: global_limit.max(1),
173 jobs_metrics_cnt,
174 jobs_metrics: HashMap::new(),
175 threadpool,
176 internal: None,
177 }));
178
179 let link_clone = Arc::clone(&link);
180 link.lock()
181 .expect("poisoned on InternalSlowJobPool::new")
182 .internal = Some(link_clone);
183 link
184 }
185
186 fn calc_queued_order(
188 &self,
189 mut queued: HashMap<&String, u64>,
190 mut limit: usize,
191 ) -> Vec<String> {
192 let mut roundrobin = self.last_spawned_configs.clone();
193 let mut result = vec![];
194 let spawned = self
195 .configs
196 .iter()
197 .map(|(n, c)| (n, c.local_spawned_and_running))
198 .collect::<HashMap<_, u64>>();
199 let mut queried_capped = self
200 .configs
201 .iter()
202 .map(|(n, c)| {
203 (
204 n,
205 queued
206 .get(&n)
207 .cloned()
208 .unwrap_or(0)
209 .min(c.local_limit - c.local_spawned_and_running),
210 )
211 })
212 .collect::<HashMap<_, _>>();
213 for n in roundrobin.clone().into_iter() {
215 if let Some(c) = queued.get_mut(&n)
216 && *c > 0
217 && spawned.get(&n).cloned().unwrap_or(0) == 0
218 {
219 result.push(n.clone());
220 *c -= 1;
221 limit -= 1;
222 queried_capped.get_mut(&n).map(|v| *v -= 1);
223 roundrobin
224 .iter()
225 .position(|e| e == &n)
226 .map(|i| roundrobin.remove(i));
227 roundrobin.push(n);
228 if limit == 0 {
229 return result;
230 }
231 }
232 }
233 let total_limit = queried_capped.values().sum::<u64>() as f32;
235 if total_limit < f32::EPSILON {
236 return result;
237 }
238 let mut spawn_rates = queried_capped
239 .iter()
240 .map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
241 .collect::<Vec<_>>();
242 while limit > 0 {
243 spawn_rates.sort_by(|(_, a), (_, b)| {
244 if b < a {
245 core::cmp::Ordering::Less
246 } else if (b - a).abs() < f32::EPSILON {
247 core::cmp::Ordering::Equal
248 } else {
249 core::cmp::Ordering::Greater
250 }
251 });
252 match spawn_rates.first_mut() {
253 Some((n, r)) => {
254 if *r > f32::EPSILON {
255 result.push(n.clone());
256 limit -= 1;
257 *r -= 1.0;
258 } else {
259 break;
260 }
261 },
262 None => break,
263 }
264 }
265 result
266 }
267
268 fn can_spawn(&self, name: &str) -> bool {
269 let queued = self
270 .queue
271 .iter()
272 .map(|(n, m)| (n, m.len() as u64))
273 .collect::<HashMap<_, u64>>();
274 let mut to_be_queued = queued.clone();
275 let name = name.to_owned();
276 *to_be_queued.entry(&name).or_default() += 1;
277 let limit = (self.global_limit - self.global_spawned_and_running) as usize;
278 let to_be_queued_order = self.calc_queued_order(to_be_queued, limit);
280 let queued_order = self.calc_queued_order(queued, limit);
281 let to_be_queued_cnt = to_be_queued_order
283 .into_iter()
284 .filter(|n| n == &name)
285 .count();
286 let queued_cnt = queued_order.into_iter().filter(|n| n == &name).count();
287 to_be_queued_cnt > queued_cnt
288 }
289
290 pub fn spawn<F>(&mut self, name: &str, f: F) -> SlowJob
291 where
292 F: FnOnce() + Send + Sync + 'static,
293 {
294 let id = self.next_id;
295 self.next_id += 1;
296 let queue = Queue::new(name, id, self.internal.as_ref().expect("internal empty"), f);
297 self.queue
298 .entry(name.to_string())
299 .or_default()
300 .push_back(queue);
301 debug_assert!(
302 self.configs.contains_key(name),
303 "Can't spawn unconfigured task!"
304 );
305 self.spawn_queued();
307 SlowJob {
308 name: name.to_string(),
309 id,
310 }
311 }
312
313 fn finish(&mut self, name: &str, metrics: JobMetrics) {
314 let metric = self.jobs_metrics.entry(name.to_string()).or_default();
315
316 if metric.len() < self.jobs_metrics_cnt {
317 metric.push(metrics);
318 }
319 self.global_spawned_and_running -= 1;
320 if let Some(c) = self.configs.get_mut(name) {
321 c.local_spawned_and_running -= 1;
322 } else {
323 warn!(?name, "sync_maintain on a no longer existing config");
324 }
325 }
326
327 fn spawn_queued(&mut self) {
328 let queued = self
329 .queue
330 .iter()
331 .map(|(n, m)| (n, m.len() as u64))
332 .collect::<HashMap<_, u64>>();
333 let limit = self.global_limit as usize;
334 let queued_order = self.calc_queued_order(queued, limit);
335 for name in queued_order.into_iter() {
336 match self.queue.get_mut(&name) {
337 Some(deque) => match deque.pop_front() {
338 Some(queue) => {
339 self.global_spawned_and_running += 1;
341 self.configs
342 .get_mut(&queue.name)
343 .expect("cannot fire a unconfigured job")
344 .local_spawned_and_running += 1;
345 self.last_spawned_configs
346 .iter()
347 .position(|e| e == &queue.name)
348 .map(|i| self.last_spawned_configs.remove(i));
349 self.last_spawned_configs.push(queue.name.to_owned());
350 self.threadpool.spawn(queue.task);
351 },
352 None => error!(
353 "internal calculation is wrong, we extected a schedulable job to be \
354 present in the queue"
355 ),
356 },
357 None => error!(
358 "internal calculation is wrong, we marked a queue as schedulable which \
359 doesn't exist"
360 ),
361 }
362 }
363 }
364
365 pub fn take_metrics(&mut self) -> HashMap<String, Vec<JobMetrics>> {
366 core::mem::take(&mut self.jobs_metrics)
367 }
368}
369
370impl SlowJobPool {
371 pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc<ThreadPool>) -> Self {
372 Self {
373 internal: InternalSlowJobPool::new(global_limit, jobs_metrics_cnt, threadpool),
374 }
375 }
376
377 pub fn configure<F>(&self, name: &str, f: F)
380 where
381 F: Fn(u64) -> u64,
382 {
383 let mut lock = self.internal.lock().expect("lock poisoned while configure");
384 let cnf = Config {
385 local_limit: f(lock.global_limit).max(1),
386 local_spawned_and_running: 0,
387 };
388 lock.configs.insert(name.to_owned(), cnf);
389 lock.last_spawned_configs.push(name.to_owned());
390 }
391
392 #[expect(clippy::result_unit_err)]
394 pub fn try_run<F>(&self, name: &str, f: F) -> Result<SlowJob, ()>
395 where
396 F: FnOnce() + Send + Sync + 'static,
397 {
398 let mut lock = self.internal.lock().expect("lock poisoned while try_run");
399 lock.spawn_queued();
401 if lock.can_spawn(name) {
402 Ok(lock.spawn(name, f))
403 } else {
404 Err(())
405 }
406 }
407
408 pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob
409 where
410 F: FnOnce() + Send + Sync + 'static,
411 {
412 self.internal
413 .lock()
414 .expect("lock poisoned while spawn")
415 .spawn(name, f)
416 }
417
418 pub fn cancel(&self, job: SlowJob) -> Result<(), SlowJob> {
419 let mut lock = self.internal.lock().expect("lock poisoned while cancel");
420 if let Some(m) = lock.queue.get_mut(&job.name) {
421 let p = match m.iter().position(|p| p.id == job.id) {
422 Some(p) => p,
423 None => return Err(job),
424 };
425 if m.remove(p).is_some() {
426 return Ok(());
427 }
428 }
429 Err(job)
430 }
431
432 pub fn take_metrics(&self) -> HashMap<String, Vec<JobMetrics>> {
433 self.internal
434 .lock()
435 .expect("lock poisoned while take_metrics")
436 .take_metrics()
437 }
438}
439
440#[cfg(test)]
441mod tests {
442 use super::*;
443 use std::{
444 sync::{
445 Barrier,
446 atomic::{AtomicBool, AtomicU64, Ordering},
447 },
448 time::Duration,
449 };
450
451 fn mock_pool(
452 pool_threads: usize,
453 global_threads: u64,
454 metrics: usize,
455 foo: u64,
456 bar: u64,
457 baz: u64,
458 ) -> SlowJobPool {
459 let threadpool = rayon::ThreadPoolBuilder::new()
460 .num_threads(pool_threads)
461 .build()
462 .unwrap();
463 let pool = SlowJobPool::new(global_threads, metrics, Arc::new(threadpool));
464 if foo != 0 {
465 pool.configure("FOO", |x| x / foo);
466 }
467 if bar != 0 {
468 pool.configure("BAR", |x| x / bar);
469 }
470 if baz != 0 {
471 pool.configure("BAZ", |x| x / baz);
472 }
473 pool
474 }
475
/// One waiting job of the only configured name is picked.
#[test]
fn simple_queue() {
    let pool = mock_pool(4, 4, 0, 1, 0, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 1u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 4);
    assert_eq!(order, ["FOO"]);
}
492
/// Two waiting jobs of the same name are both picked.
#[test]
fn multiple_queue() {
    let pool = mock_pool(4, 4, 0, 1, 0, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 2u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 4);
    assert_eq!(order, ["FOO", "FOO"]);
}
510
/// With far more waiting jobs than budget, exactly `limit` jobs are picked.
#[test]
fn limit_queue() {
    let pool = mock_pool(5, 5, 0, 1, 0, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 80u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 4);
    assert_eq!(order, ["FOO"; 4]);
}
530
/// One waiting job for each of two names: both are picked once.
#[test]
fn simple_queue_2() {
    let pool = mock_pool(4, 4, 0, 1, 1, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 1u64), ("BAR".to_owned(), 1u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 4);
    let count_of = |wanted: &str| order.iter().filter(|name| name.as_str() == wanted).count();
    assert_eq!(order.len(), 2);
    assert_eq!(count_of("FOO"), 1);
    assert_eq!(count_of("BAR"), 1);
}
548
/// Two waiting jobs for each of two names fill a budget of four evenly.
#[test]
fn multiple_queue_3() {
    let pool = mock_pool(4, 4, 0, 1, 1, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 2u64), ("BAR".to_owned(), 2u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 4);
    let count_of = |wanted: &str| order.iter().filter(|name| name.as_str() == wanted).count();
    assert_eq!(order.len(), 4);
    assert_eq!(count_of("FOO"), 2);
    assert_eq!(count_of("BAR"), 2);
}
566
/// FOO (local limit 2) and BAR (local limit 4) with three waiting jobs each
/// share a budget of four evenly.
#[test]
fn multiple_queue_4() {
    let pool = mock_pool(4, 4, 0, 2, 1, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 3u64), ("BAR".to_owned(), 3u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 4);
    let count_of = |wanted: &str| order.iter().filter(|name| name.as_str() == wanted).count();
    assert_eq!(order.len(), 4);
    assert_eq!(count_of("FOO"), 2);
    assert_eq!(count_of("BAR"), 2);
}
584
/// With a budget of five, FOO is capped at its local limit of 2 and BAR
/// receives the remaining three picks.
#[test]
fn multiple_queue_5() {
    let pool = mock_pool(4, 4, 0, 2, 1, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 5u64), ("BAR".to_owned(), 5u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 5);
    let count_of = |wanted: &str| order.iter().filter(|name| name.as_str() == wanted).count();
    assert_eq!(order.len(), 5);
    assert_eq!(count_of("FOO"), 2);
    assert_eq!(count_of("BAR"), 3);
}
602
/// A budget larger than the number of waiting jobs only yields the ten jobs
/// that actually wait.
#[test]
fn multiple_queue_6() {
    let pool = mock_pool(40, 40, 0, 2, 1, 0);
    let internal = pool.internal.lock().unwrap();
    let waiting = vec![("FOO".to_owned(), 5u64), ("BAR".to_owned(), 5u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    let order = internal.calc_queued_order(queued, 11);
    let count_of = |wanted: &str| order.iter().filter(|name| name.as_str() == wanted).count();
    assert_eq!(order.len(), 10);
    assert_eq!(count_of("FOO"), 5);
    assert_eq!(count_of("BAR"), 5);
}
620
/// The name dispatched most recently is picked last when the budget is one.
#[test]
fn roundrobin() {
    let pool = mock_pool(4, 4, 0, 2, 2, 0);
    let waiting = vec![("FOO".to_owned(), 5u64), ("BAR".to_owned(), 5u64)];
    let mut queued = HashMap::new();
    for (name, count) in &waiting {
        queued.insert(name, *count);
    }
    // Spins until no dispatched job is running any more.
    let wait_until_idle = || {
        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
            std::thread::yield_now();
        }
    };

    // FOO ran last, so BAR comes next.
    pool.internal
        .lock()
        .unwrap()
        .spawn("FOO", || println!("foo"));
    wait_until_idle();
    let order = pool
        .internal
        .lock()
        .unwrap()
        .calc_queued_order(queued.clone(), 1);
    assert_eq!(order, ["BAR"]);

    // Asking again without dispatching anything gives the same answer.
    let order = pool
        .internal
        .lock()
        .unwrap()
        .calc_queued_order(queued.clone(), 1);
    assert_eq!(order, ["BAR"]);

    // After BAR ran, FOO is next.
    pool.internal
        .lock()
        .unwrap()
        .spawn("BAR", || println!("bar"));
    wait_until_idle();
    let order = pool.internal.lock().unwrap().calc_queued_order(queued, 1);
    assert_eq!(order, ["FOO"]);
}
668
/// Spawning under a name without configuration panics.
#[test]
#[should_panic]
fn unconfigured() {
    let pool = mock_pool(4, 4, 0, 2, 1, 0);
    let mut state = pool.internal.lock().unwrap();
    let _ticket = state.spawn("UNCONFIGURED", || println!());
}
676
/// Spawning under configured names works without panicking.
#[test]
fn correct_spawn_doesnt_panic() {
    let pool = mock_pool(4, 4, 0, 2, 1, 0);
    let mut state = pool.internal.lock().unwrap();
    let _foo = state.spawn("FOO", || println!("foo"));
    let _bar = state.spawn("BAR", || println!("bar"));
}
684
/// An idle pool accepts a job for every configured name.
#[test]
fn can_spawn() {
    let pool = mock_pool(4, 4, 0, 2, 1, 0);
    let state = pool.internal.lock().unwrap();
    for name in ["FOO", "BAR"] {
        assert!(state.can_spawn(name));
    }
}
692
/// `try_run` succeeds on an idle pool for both configured names.
#[test]
fn try_run_works() {
    let pool = mock_pool(4, 4, 0, 2, 1, 0);
    let foo = pool.try_run("FOO", || println!("foo"));
    foo.unwrap();
    let bar = pool.try_run("BAR", || println!("bar"));
    bar.unwrap();
}
699
/// `try_run` rejects jobs once the local limits (FOO: 2, BAR: 4) are used up
/// by jobs that never finish.
#[test]
fn try_run_exhausted() {
    let pool = mock_pool(8, 8, 0, 4, 2, 0);
    let func = || loop {
        std::thread::sleep(Duration::from_secs(1))
    };
    // (name, whether the job is expected to be accepted), in call order.
    let expectations = [
        ("FOO", true),
        ("BAR", true),
        ("FOO", true),
        ("BAR", true),
        ("FOO", false),
        ("BAR", true),
        ("FOO", false),
        ("BAR", true),
        ("FOO", false),
        ("BAR", false),
        ("FOO", false),
    ];
    for (name, accepted) in expectations {
        let outcome = pool.try_run(name, func);
        if accepted {
            outcome.unwrap();
        } else {
            outcome.unwrap_err();
        }
    }
}
718
/// A job accepted by `try_run` is executed: it meets the test thread at a
/// barrier.
#[test]
fn actually_runs_1() {
    let pool = mock_pool(4, 4, 0, 0, 0, 1);
    let meeting_point = Arc::new(Barrier::new(2));
    let job_side = Arc::clone(&meeting_point);
    let ticket = pool.try_run("BAZ", move || {
        job_side.wait();
    });
    ticket.unwrap();
    meeting_point.wait();
}
730
/// A job handed to `spawn` is executed: it meets the test thread at a
/// barrier.
#[test]
fn actually_runs_2() {
    let pool = mock_pool(4, 4, 0, 0, 0, 1);
    let meeting_point = Arc::new(Barrier::new(2));
    let job_side = Arc::clone(&meeting_point);
    pool.spawn("BAZ", move || {
        job_side.wait();
    });
    meeting_point.wait();
}
741
/// With a local limit of one, a second FOO job does not start while the
/// first one is still blocked.
#[test]
fn actually_waits() {
    let pool = mock_pool(4, 4, 0, 4, 0, 1);
    let second_started = Arc::new(AtomicBool::new(false));
    let first_gate = Arc::new(Barrier::new(2));
    let second_gate = Arc::new(Barrier::new(2));

    {
        let gate = Arc::clone(&first_gate);
        pool.try_run("FOO", move || {
            gate.wait();
        })
        .unwrap();
    }
    {
        let started = Arc::clone(&second_started);
        let gate = Arc::clone(&second_gate);
        pool.spawn("FOO", move || {
            started.store(true, Ordering::SeqCst);
            gate.wait();
        });
    }

    // Give the second job time to start (which it must not do).
    std::thread::sleep(Duration::from_secs(1));
    assert!(!second_started.load(Ordering::SeqCst));
    // Release the first job, then meet the second one.
    first_gate.wait();
    second_gate.wait();
}
767
/// Metrics are kept per name and capped at the configured count of two.
#[test]
fn verify_metrics() {
    let pool = mock_pool(4, 4, 2, 1, 0, 4);
    // Four jobs plus the test thread meet at the barrier.
    let meeting_point = Arc::new(Barrier::new(5));
    for name in ["FOO", "BAZ", "FOO", "FOO"] {
        let job_side = Arc::clone(&meeting_point);
        pool.spawn(name, move || {
            job_side.wait();
        });
    }
    meeting_point.wait();
    // Leave the jobs time to report their metrics.
    std::thread::sleep(Duration::from_secs(2));

    let metrics = pool.take_metrics();
    let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics");
    assert_eq!(foo.len(), 2);
    assert!(metrics.get("BAR").is_none());
    let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics");
    assert_eq!(baz.len(), 1);
}
790
/// Returns a job that sleeps for `ms` milliseconds and then increments
/// `counter`, printing progress along the way.
fn work_barrier(counter: &Arc<AtomicU64>, ms: u64) -> impl std::ops::FnOnce() -> () + use<> {
    let finished = Arc::clone(counter);
    println!("Create work_barrier");
    move || {
        println!(".{}..", ms);
        let pause = Duration::from_millis(ms);
        std::thread::sleep(pause);
        println!(".{}..Done", ms);
        finished.fetch_add(1, Ordering::SeqCst);
    }
}
801
/// A slow job spawned from inside a parallel iterator must not make that
/// iterator wait for the job to finish.
#[test]
fn verify_that_spawn_doesnt_block_par_iter() {
    let threadpool = Arc::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(20)
            .build()
            .unwrap(),
    );
    let pool = SlowJobPool::new(2, 100, Arc::clone(&threadpool));
    pool.configure("BAZ", |_| 2);
    let counter = Arc::new(AtomicU64::new(0));
    let start = Instant::now();

    threadpool.install(|| {
        use rayon::prelude::*;
        (0..100).into_par_iter().for_each(|i| {
            std::thread::sleep(Duration::from_millis(10));
            match i {
                50 => {
                    pool.spawn("BAZ", work_barrier(&counter, 2000));
                },
                99 => println!("The first ITER end, at {}ms", start.elapsed().as_millis()),
                _ => {},
            }
        });
        let elapsed = start.elapsed().as_millis();
        println!("The first ITER finished, at {}ms", elapsed);
        assert!(
            elapsed < 1900,
            "It seems like the par_iter waited on the 2s sleep task to finish"
        );
    });

    // Wait for the slow job so the test does not end before it has run.
    while counter.load(Ordering::SeqCst) == 0 {
        println!("waiting for BAZ task to finish");
        std::thread::sleep(Duration::from_secs(1));
    }
}
842}