Skip to main content

veloren_common/
slowjob.rs

1use hashbrown::HashMap;
2use rayon::ThreadPool;
3use std::{
4    collections::VecDeque,
5    sync::{Arc, Mutex},
6    time::Instant,
7};
8use tracing::{error, warn};
9
10/// Provides a Wrapper around rayon threadpool to execute slow-jobs.
11/// slow means, the job doesn't need to not complete within the same tick.
12/// DO NOT USE I/O blocking jobs, but only CPU heavy jobs.
13/// Jobs run here, will reduce the ammount of threads rayon can use during the
14/// main tick.
15///
16/// ## Configuration
17/// This Pool allows you to configure certain names of jobs and assign them a
18/// maximum number of threads # Example
19/// Your system has 16 cores, you assign 12 cores for slow-jobs.
20/// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on
21/// max 50% (6 = cores)
22///
23/// ## Spawn Order
24/// - At least 1 job of a configuration is allowed to run if global limit isn't
25///   hit.
26/// - remaining capacities are spread in relation to their limit. e.g. a
27///   configuration with double the limit will be sheduled to spawn double the
28///   tasks, starting by a round robin.
29///
30/// ## States
31/// - queued
32/// - spawned
33/// - started
34/// - finished
35/// ```
36/// # use veloren_common::slowjob::SlowJobPool;
37/// # use std::sync::Arc;
38///
39/// let threadpool = rayon::ThreadPoolBuilder::new()
40///     .num_threads(16)
41///     .build()
42///     .unwrap();
43/// let pool = SlowJobPool::new(3, 10, Arc::new(threadpool));
44/// pool.configure("CHUNK_GENERATOR", |n| n / 2);
45/// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job"));
46/// ```
47#[derive(Clone)]
48pub struct SlowJobPool {
49    internal: Arc<Mutex<InternalSlowJobPool>>,
50}
51
52#[derive(Debug)]
53pub struct SlowJob {
54    name: String,
55    id: u64,
56}
57
58type JobType = Box<dyn FnOnce() + Send + Sync + 'static>;
59
60struct InternalSlowJobPool {
61    next_id: u64,
62    queue: HashMap<String, VecDeque<Queue>>,
63    configs: HashMap<String, Config>,
64    last_spawned_configs: Vec<String>,
65    global_spawned_and_running: u64,
66    global_limit: u64,
67    jobs_metrics_cnt: usize,
68    jobs_metrics: HashMap<String, Vec<JobMetrics>>,
69    threadpool: Arc<ThreadPool>,
70    internal: Option<Arc<Mutex<Self>>>,
71}
72
73#[derive(Debug)]
74struct Config {
75    local_limit: u64,
76    local_spawned_and_running: u64,
77}
78
79struct Queue {
80    id: u64,
81    name: String,
82    task: JobType,
83}
84
85pub struct JobMetrics {
86    pub queue_created: Instant,
87    pub execution_start: Instant,
88    pub execution_end: Instant,
89}
90
91impl Queue {
92    fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self
93    where
94        F: FnOnce() + Send + Sync + 'static,
95    {
96        let internal = Arc::clone(internal);
97        let name_cloned = name.to_owned();
98        let queue_created = Instant::now();
99        Self {
100            id,
101            name: name.to_owned(),
102            task: Box::new(move || {
103                common_base::prof_span_alloc!(_guard, &name_cloned);
104                let execution_start = Instant::now();
105                f();
106                let execution_end = Instant::now();
107                let metrics = JobMetrics {
108                    queue_created,
109                    execution_start,
110                    execution_end,
111                };
112                // directly maintain the next task afterwards
113                {
114                    let mut lock = internal.lock().expect("slowjob lock poisoned");
115                    lock.finish(&name_cloned, metrics);
116                    lock.spawn_queued();
117                }
118            }),
119        }
120    }
121}
122
123impl InternalSlowJobPool {
124    pub fn new(
125        global_limit: u64,
126        jobs_metrics_cnt: usize,
127        _threadpool: Arc<ThreadPool>,
128    ) -> Arc<Mutex<Self>> {
129        // rayon is having a bug where a ECS task could work-steal a slowjob if we use
130        // the same threadpool, which would cause lagspikes we dont want!
131        let threadpool = Arc::new(
132            rayon::ThreadPoolBuilder::new()
133                .num_threads(global_limit as usize)
134                .thread_name(move |i| format!("slowjob-{}", i))
135                .spawn_handler(|thread| {
136                    let mut b = std::thread::Builder::new();
137                    if let Some(name) = thread.name() {
138                        b = b.name(name.to_owned());
139                    }
140                    if let Some(stack_size) = thread.stack_size() {
141                        b = b.stack_size(stack_size);
142                    }
143                    b.spawn(|| {
144                        use thread_priority::*;
145                        let priority =
146                            ThreadPriority::Crossplatform(TryFrom::try_from(15).unwrap());
147                        if let Err(err) = cfg_select! {
148                            target_os = "linux" => std::thread::current().set_priority_and_policy(
149                                ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
150                                priority,
151                            ),
152                            _ => std::thread::current().set_priority(priority),
153                        } {
154                            tracing::warn!(
155                                "Unable to set priority/schedule policy for slow job pool thread: \
156                                 {err}"
157                            );
158                        }
159                        thread.run()
160                    })?;
161                    Ok(())
162                })
163                .build()
164                .unwrap(),
165        );
166        let link = Arc::new(Mutex::new(Self {
167            next_id: 0,
168            queue: HashMap::new(),
169            configs: HashMap::new(),
170            last_spawned_configs: Vec::new(),
171            global_spawned_and_running: 0,
172            global_limit: global_limit.max(1),
173            jobs_metrics_cnt,
174            jobs_metrics: HashMap::new(),
175            threadpool,
176            internal: None,
177        }));
178
179        let link_clone = Arc::clone(&link);
180        link.lock()
181            .expect("poisoned on InternalSlowJobPool::new")
182            .internal = Some(link_clone);
183        link
184    }
185
186    /// returns order of configuration which are queued next
187    fn calc_queued_order(
188        &self,
189        mut queued: HashMap<&String, u64>,
190        mut limit: usize,
191    ) -> Vec<String> {
192        let mut roundrobin = self.last_spawned_configs.clone();
193        let mut result = vec![];
194        let spawned = self
195            .configs
196            .iter()
197            .map(|(n, c)| (n, c.local_spawned_and_running))
198            .collect::<HashMap<_, u64>>();
199        let mut queried_capped = self
200            .configs
201            .iter()
202            .map(|(n, c)| {
203                (
204                    n,
205                    queued
206                        .get(&n)
207                        .cloned()
208                        .unwrap_or(0)
209                        .min(c.local_limit - c.local_spawned_and_running),
210                )
211            })
212            .collect::<HashMap<_, _>>();
213        // grab all configs that are queued and not running. in roundrobin order
214        for n in roundrobin.clone().into_iter() {
215            if let Some(c) = queued.get_mut(&n)
216                && *c > 0
217                && spawned.get(&n).cloned().unwrap_or(0) == 0
218            {
219                result.push(n.clone());
220                *c -= 1;
221                limit -= 1;
222                queried_capped.get_mut(&n).map(|v| *v -= 1);
223                roundrobin
224                    .iter()
225                    .position(|e| e == &n)
226                    .map(|i| roundrobin.remove(i));
227                roundrobin.push(n);
228                if limit == 0 {
229                    return result;
230                }
231            }
232        }
233        //schedule rest based on their possible limites, don't use round robin here
234        let total_limit = queried_capped.values().sum::<u64>() as f32;
235        if total_limit < f32::EPSILON {
236            return result;
237        }
238        let mut spawn_rates = queried_capped
239            .iter()
240            .map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
241            .collect::<Vec<_>>();
242        while limit > 0 {
243            spawn_rates.sort_by(|(_, a), (_, b)| {
244                if b < a {
245                    core::cmp::Ordering::Less
246                } else if (b - a).abs() < f32::EPSILON {
247                    core::cmp::Ordering::Equal
248                } else {
249                    core::cmp::Ordering::Greater
250                }
251            });
252            match spawn_rates.first_mut() {
253                Some((n, r)) => {
254                    if *r > f32::EPSILON {
255                        result.push(n.clone());
256                        limit -= 1;
257                        *r -= 1.0;
258                    } else {
259                        break;
260                    }
261                },
262                None => break,
263            }
264        }
265        result
266    }
267
268    fn can_spawn(&self, name: &str) -> bool {
269        let queued = self
270            .queue
271            .iter()
272            .map(|(n, m)| (n, m.len() as u64))
273            .collect::<HashMap<_, u64>>();
274        let mut to_be_queued = queued.clone();
275        let name = name.to_owned();
276        *to_be_queued.entry(&name).or_default() += 1;
277        let limit = (self.global_limit - self.global_spawned_and_running) as usize;
278        // calculate to_be_queued first
279        let to_be_queued_order = self.calc_queued_order(to_be_queued, limit);
280        let queued_order = self.calc_queued_order(queued, limit);
281        // if its queued one time more then its okay to spawn
282        let to_be_queued_cnt = to_be_queued_order
283            .into_iter()
284            .filter(|n| n == &name)
285            .count();
286        let queued_cnt = queued_order.into_iter().filter(|n| n == &name).count();
287        to_be_queued_cnt > queued_cnt
288    }
289
290    pub fn spawn<F>(&mut self, name: &str, f: F) -> SlowJob
291    where
292        F: FnOnce() + Send + Sync + 'static,
293    {
294        let id = self.next_id;
295        self.next_id += 1;
296        let queue = Queue::new(name, id, self.internal.as_ref().expect("internal empty"), f);
297        self.queue
298            .entry(name.to_string())
299            .or_default()
300            .push_back(queue);
301        debug_assert!(
302            self.configs.contains_key(name),
303            "Can't spawn unconfigured task!"
304        );
305        //spawn already queued
306        self.spawn_queued();
307        SlowJob {
308            name: name.to_string(),
309            id,
310        }
311    }
312
313    fn finish(&mut self, name: &str, metrics: JobMetrics) {
314        let metric = self.jobs_metrics.entry(name.to_string()).or_default();
315
316        if metric.len() < self.jobs_metrics_cnt {
317            metric.push(metrics);
318        }
319        self.global_spawned_and_running -= 1;
320        if let Some(c) = self.configs.get_mut(name) {
321            c.local_spawned_and_running -= 1;
322        } else {
323            warn!(?name, "sync_maintain on a no longer existing config");
324        }
325    }
326
327    fn spawn_queued(&mut self) {
328        let queued = self
329            .queue
330            .iter()
331            .map(|(n, m)| (n, m.len() as u64))
332            .collect::<HashMap<_, u64>>();
333        let limit = self.global_limit as usize;
334        let queued_order = self.calc_queued_order(queued, limit);
335        for name in queued_order.into_iter() {
336            match self.queue.get_mut(&name) {
337                Some(deque) => match deque.pop_front() {
338                    Some(queue) => {
339                        //fire
340                        self.global_spawned_and_running += 1;
341                        self.configs
342                            .get_mut(&queue.name)
343                            .expect("cannot fire a unconfigured job")
344                            .local_spawned_and_running += 1;
345                        self.last_spawned_configs
346                            .iter()
347                            .position(|e| e == &queue.name)
348                            .map(|i| self.last_spawned_configs.remove(i));
349                        self.last_spawned_configs.push(queue.name.to_owned());
350                        self.threadpool.spawn(queue.task);
351                    },
352                    None => error!(
353                        "internal calculation is wrong, we extected a schedulable job to be \
354                         present in the queue"
355                    ),
356                },
357                None => error!(
358                    "internal calculation is wrong, we marked a queue as schedulable which \
359                     doesn't exist"
360                ),
361            }
362        }
363    }
364
365    pub fn take_metrics(&mut self) -> HashMap<String, Vec<JobMetrics>> {
366        core::mem::take(&mut self.jobs_metrics)
367    }
368}
369
370impl SlowJobPool {
371    pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc<ThreadPool>) -> Self {
372        Self {
373            internal: InternalSlowJobPool::new(global_limit, jobs_metrics_cnt, threadpool),
374        }
375    }
376
377    /// configure a NAME to spawn up to f(n) threads, depending on how many
378    /// threads we globally have available
379    pub fn configure<F>(&self, name: &str, f: F)
380    where
381        F: Fn(u64) -> u64,
382    {
383        let mut lock = self.internal.lock().expect("lock poisoned while configure");
384        let cnf = Config {
385            local_limit: f(lock.global_limit).max(1),
386            local_spawned_and_running: 0,
387        };
388        lock.configs.insert(name.to_owned(), cnf);
389        lock.last_spawned_configs.push(name.to_owned());
390    }
391
392    /// spawn a new slow job on a certain NAME IF it can run immediately
393    #[expect(clippy::result_unit_err)]
394    pub fn try_run<F>(&self, name: &str, f: F) -> Result<SlowJob, ()>
395    where
396        F: FnOnce() + Send + Sync + 'static,
397    {
398        let mut lock = self.internal.lock().expect("lock poisoned while try_run");
399        //spawn already queued
400        lock.spawn_queued();
401        if lock.can_spawn(name) {
402            Ok(lock.spawn(name, f))
403        } else {
404            Err(())
405        }
406    }
407
408    pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob
409    where
410        F: FnOnce() + Send + Sync + 'static,
411    {
412        self.internal
413            .lock()
414            .expect("lock poisoned while spawn")
415            .spawn(name, f)
416    }
417
418    pub fn cancel(&self, job: SlowJob) -> Result<(), SlowJob> {
419        let mut lock = self.internal.lock().expect("lock poisoned while cancel");
420        if let Some(m) = lock.queue.get_mut(&job.name) {
421            let p = match m.iter().position(|p| p.id == job.id) {
422                Some(p) => p,
423                None => return Err(job),
424            };
425            if m.remove(p).is_some() {
426                return Ok(());
427            }
428        }
429        Err(job)
430    }
431
432    pub fn take_metrics(&self) -> HashMap<String, Vec<JobMetrics>> {
433        self.internal
434            .lock()
435            .expect("lock poisoned while take_metrics")
436            .take_metrics()
437    }
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        sync::{
            Barrier,
            atomic::{AtomicBool, AtomicU64, Ordering},
        },
        time::Duration,
    };

    /// Builds a pool with `global_threads` slots and configures `FOO`, `BAR`
    /// and `BAZ` to `global_threads / divisor`; a divisor of 0 leaves that
    /// name unconfigured.
    fn mock_pool(
        pool_threads: usize,
        global_threads: u64,
        metrics: usize,
        foo: u64,
        bar: u64,
        baz: u64,
    ) -> SlowJobPool {
        let threadpool = rayon::ThreadPoolBuilder::new()
            .num_threads(pool_threads)
            .build()
            .unwrap();
        let pool = SlowJobPool::new(global_threads, metrics, Arc::new(threadpool));
        for (name, divisor) in [("FOO", foo), ("BAR", bar), ("BAZ", baz)] {
            if divisor != 0 {
                pool.configure(name, |x| x / divisor);
            }
        }
        pool
    }

    /// Asks the scheduler for the dispatch order of the given queue sizes.
    fn queued_order(pool: &SlowJobPool, queue: &[(&str, u64)], limit: usize) -> Vec<String> {
        let owned_names: Vec<(String, u64)> = queue
            .iter()
            .map(|&(name, count)| (name.to_owned(), count))
            .collect();
        let queued: HashMap<&String, u64> = owned_names
            .iter()
            .map(|(name, count)| (name, *count))
            .collect();
        pool.internal.lock().unwrap().calc_queued_order(queued, limit)
    }

    /// Number of entries in `order` equal to `name`.
    fn occurrences(order: &[String], name: &str) -> usize {
        order.iter().filter(|entry| entry.as_str() == name).count()
    }

    /// Checks how many FOO and BAR jobs get scheduled (their relative order
    /// is not asserted).
    fn assert_split(
        pool: &SlowJobPool,
        queue: &[(&str, u64)],
        limit: usize,
        foo: usize,
        bar: usize,
    ) {
        let order = queued_order(pool, queue, limit);
        assert_eq!(order.len(), foo + bar);
        assert_eq!(occurrences(&order, "FOO"), foo);
        assert_eq!(occurrences(&order, "BAR"), bar);
    }

    #[test]
    fn simple_queue() {
        let pool = mock_pool(4, 4, 0, 1, 0, 0);
        assert_eq!(queued_order(&pool, &[("FOO", 1)], 4), ["FOO"]);
    }

    #[test]
    fn multiple_queue() {
        let pool = mock_pool(4, 4, 0, 1, 0, 0);
        assert_eq!(queued_order(&pool, &[("FOO", 2)], 4), ["FOO", "FOO"]);
    }

    #[test]
    fn limit_queue() {
        let pool = mock_pool(5, 5, 0, 1, 0, 0);
        assert_eq!(queued_order(&pool, &[("FOO", 80)], 4), ["FOO"; 4]);
    }

    #[test]
    fn simple_queue_2() {
        let pool = mock_pool(4, 4, 0, 1, 1, 0);
        assert_split(&pool, &[("FOO", 1), ("BAR", 1)], 4, 1, 1);
    }

    #[test]
    fn multiple_queue_3() {
        let pool = mock_pool(4, 4, 0, 1, 1, 0);
        assert_split(&pool, &[("FOO", 2), ("BAR", 2)], 4, 2, 2);
    }

    #[test]
    fn multiple_queue_4() {
        let pool = mock_pool(4, 4, 0, 2, 1, 0);
        assert_split(&pool, &[("FOO", 3), ("BAR", 3)], 4, 2, 2);
    }

    #[test]
    fn multiple_queue_5() {
        let pool = mock_pool(4, 4, 0, 2, 1, 0);
        assert_split(&pool, &[("FOO", 5), ("BAR", 5)], 5, 2, 3);
    }

    #[test]
    fn multiple_queue_6() {
        let pool = mock_pool(40, 40, 0, 2, 1, 0);
        assert_split(&pool, &[("FOO", 5), ("BAR", 5)], 11, 5, 5);
    }

    #[test]
    fn roundrobin() {
        let pool = mock_pool(4, 4, 0, 2, 2, 0);
        let queue = [("FOO", 5u64), ("BAR", 5u64)];
        // a barrier in f doesnt work as we need to wait for the cleanup
        let wait_until_idle = || {
            while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
                std::thread::yield_now();
            }
        };

        pool.internal
            .lock()
            .unwrap()
            .spawn("FOO", || println!("foo"));
        wait_until_idle();
        assert_eq!(queued_order(&pool, &queue, 1), ["BAR"]);
        // keep order if no new is spawned
        assert_eq!(queued_order(&pool, &queue, 1), ["BAR"]);

        pool.internal
            .lock()
            .unwrap()
            .spawn("BAR", || println!("bar"));
        wait_until_idle();
        assert_eq!(queued_order(&pool, &queue, 1), ["FOO"]);
    }

    #[test]
    #[should_panic]
    fn unconfigured() {
        let pool = mock_pool(4, 4, 0, 2, 1, 0);
        pool.internal
            .lock()
            .unwrap()
            .spawn("UNCONFIGURED", || println!());
    }

    #[test]
    fn correct_spawn_doesnt_panic() {
        let pool = mock_pool(4, 4, 0, 2, 1, 0);
        let mut guard = pool.internal.lock().unwrap();
        guard.spawn("FOO", || println!("foo"));
        guard.spawn("BAR", || println!("bar"));
    }

    #[test]
    fn can_spawn() {
        let pool = mock_pool(4, 4, 0, 2, 1, 0);
        let guard = pool.internal.lock().unwrap();
        for name in ["FOO", "BAR"] {
            assert!(guard.can_spawn(name), "expected {name} to be spawnable");
        }
    }

    #[test]
    fn try_run_works() {
        let pool = mock_pool(4, 4, 0, 2, 1, 0);
        assert!(pool.try_run("FOO", || println!("foo")).is_ok());
        assert!(pool.try_run("BAR", || println!("bar")).is_ok());
    }

    #[test]
    fn try_run_exhausted() {
        let pool = mock_pool(8, 8, 0, 4, 2, 0);
        let func = || loop {
            std::thread::sleep(Duration::from_secs(1))
        };
        // FOO may run 8 / 4 = 2 jobs, BAR 8 / 2 = 4 jobs.
        let steps = [
            ("FOO", true),
            ("BAR", true),
            ("FOO", true),
            ("BAR", true),
            ("FOO", false),
            ("BAR", true),
            ("FOO", false),
            ("BAR", true),
            ("FOO", false),
            ("BAR", false),
            ("FOO", false),
        ];
        for (step, (name, accepted)) in steps.into_iter().enumerate() {
            assert_eq!(
                pool.try_run(name, func).is_ok(),
                accepted,
                "step {step}: try_run({name:?})"
            );
        }
    }

    #[test]
    fn actually_runs_1() {
        let pool = mock_pool(4, 4, 0, 0, 0, 1);
        let barrier = Arc::new(Barrier::new(2));
        let job_barrier = Arc::clone(&barrier);
        pool.try_run("BAZ", move || {
            job_barrier.wait();
        })
        .unwrap();
        barrier.wait();
    }

    #[test]
    fn actually_runs_2() {
        let pool = mock_pool(4, 4, 0, 0, 0, 1);
        let barrier = Arc::new(Barrier::new(2));
        let job_barrier = Arc::clone(&barrier);
        pool.spawn("BAZ", move || {
            job_barrier.wait();
        });
        barrier.wait();
    }

    #[test]
    fn actually_waits() {
        // FOO may only run 4 / 4 = 1 job at a time.
        let pool = mock_pool(4, 4, 0, 4, 0, 1);
        let second_ran = Arc::new(AtomicBool::new(false));
        let first_done = Arc::new(Barrier::new(2));
        let second_done = Arc::new(Barrier::new(2));
        {
            let first_done = Arc::clone(&first_done);
            pool.try_run("FOO", move || {
                first_done.wait();
            })
            .unwrap();
        }
        {
            let second_ran = Arc::clone(&second_ran);
            let second_done = Arc::clone(&second_done);
            pool.spawn("FOO", move || {
                second_ran.store(true, Ordering::SeqCst);
                second_done.wait();
            });
        }
        // in this case we have to sleep
        std::thread::sleep(Duration::from_secs(1));
        assert!(!second_ran.load(Ordering::SeqCst));
        // now finish the first job
        first_done.wait();
        // now wait on the second job to be actually finished
        second_done.wait();
    }

    #[test]
    fn verify_metrics() {
        let pool = mock_pool(4, 4, 2, 1, 0, 4);
        let barrier = Arc::new(Barrier::new(5));
        for name in ["FOO", "BAZ", "FOO", "FOO"] {
            let job_barrier = Arc::clone(&barrier);
            pool.spawn(name, move || {
                job_barrier.wait();
            });
        }
        // now finish all jobs
        barrier.wait();
        // in this case we have to sleep to give it some time to store all the metrics
        std::thread::sleep(Duration::from_secs(2));
        let metrics = pool.take_metrics();
        let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics");
        //its limited to 2, even though we had 3 jobs
        assert_eq!(foo.len(), 2);
        assert!(!metrics.contains_key("BAR"));
        let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics");
        assert_eq!(baz.len(), 1);
    }

    /// Returns a job that sleeps `ms` milliseconds and then bumps `counter`.
    fn work_barrier(counter: &Arc<AtomicU64>, ms: u64) -> impl FnOnce() + use<> {
        let finished = Arc::clone(counter);
        println!("Create work_barrier");
        move || {
            println!(".{}..", ms);
            std::thread::sleep(Duration::from_millis(ms));
            println!(".{}..Done", ms);
            finished.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn verify_that_spawn_doesnt_block_par_iter() {
        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(20)
                .build()
                .unwrap(),
        );
        let pool = SlowJobPool::new(2, 100, Arc::clone(&threadpool));
        pool.configure("BAZ", |_| 2);
        let counter = Arc::new(AtomicU64::new(0));
        let start = Instant::now();

        threadpool.install(|| {
            use rayon::prelude::*;
            (0..100).into_par_iter().for_each(|i| {
                std::thread::sleep(Duration::from_millis(10));
                if i == 50 {
                    pool.spawn("BAZ", work_barrier(&counter, 2000));
                }
                if i == 99 {
                    println!("The first ITER end, at {}ms", start.elapsed().as_millis());
                }
            });
            let elapsed = start.elapsed().as_millis();
            println!("The first ITER finished, at {}ms", elapsed);
            assert!(
                elapsed < 1900,
                "It seems like the par_iter waited on the 2s sleep task to finish"
            );
        });

        while counter.load(Ordering::SeqCst) == 0 {
            println!("waiting for BAZ task to finish");
            std::thread::sleep(Duration::from_secs(1));
        }
    }
}