veloren_voxygen/ui/
keyed_jobs.rs

1use common::slowjob::{SlowJob, SlowJobPool};
2use hashbrown::{HashMap, hash_map::Entry};
3use std::{
4    hash::Hash,
5    time::{Duration, Instant},
6};
7
8enum KeyedJobTask<V> {
9    Pending(Instant, Option<SlowJob>),
10    Completed(Instant, V),
11}
12
13pub struct KeyedJobs<K, V> {
14    tx: crossbeam_channel::Sender<(K, V)>,
15    rx: crossbeam_channel::Receiver<(K, V)>,
16    tasks: HashMap<K, KeyedJobTask<V>>,
17    name: &'static str,
18    last_gc: Instant,
19}
20
21const KEYEDJOBS_GC_INTERVAL: Duration = Duration::from_secs(1);
22
23impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Send + Sync + 'static> KeyedJobs<K, V> {
24    pub fn new(name: &'static str) -> Self {
25        let (tx, rx) = crossbeam_channel::unbounded();
26        Self {
27            tx,
28            rx,
29            tasks: HashMap::new(),
30            name,
31            last_gc: Instant::now(),
32        }
33    }
34
35    /// Spawn a task on a specified threadpool. The function is given as a thunk
36    /// so that if work is needed to create captured variables (e.g.
37    /// `Arc::clone`), that only occurs if the task hasn't yet been scheduled.
38    pub fn spawn<F: FnOnce(&K) -> V + Send + Sync + 'static>(
39        &mut self,
40        pool: Option<&SlowJobPool>,
41        k: K,
42        f: impl FnOnce() -> F,
43    ) -> Option<(K, V)> {
44        if let Some(pool) = pool {
45            while let Ok((k2, v)) = self.rx.try_recv() {
46                if k == k2 {
47                    return Some((k, v));
48                } else {
49                    self.tasks
50                        .insert(k2, KeyedJobTask::Completed(Instant::now(), v));
51                }
52            }
53            let now = Instant::now();
54            if now - self.last_gc > KEYEDJOBS_GC_INTERVAL {
55                self.last_gc = now;
56                self.tasks.retain(|_, task| match task {
57                    KeyedJobTask::Completed(at, _) => now - *at < KEYEDJOBS_GC_INTERVAL,
58                    KeyedJobTask::Pending(at, job) => {
59                        let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
60                        if !fresh {
61                            if let Some(job) = job.take() {
62                                // Cancelling a job only fails if the job doesn't exist anymore,
63                                // which means that it completed while we tried to GC its pending
64                                // struct, which means that we'll GC it in the next cycle, so ignore
65                                // the error in this collection.
66                                let _ = pool.cancel(job);
67                            }
68                        }
69                        fresh
70                    },
71                });
72            }
73            match self.tasks.entry(k.clone()) {
74                Entry::Occupied(e) => {
75                    let mut ret = None;
76                    e.replace_entry_with(|_, v| {
77                        if let KeyedJobTask::Completed(_, v) = v {
78                            ret = Some((k, v));
79                            None
80                        } else {
81                            Some(v)
82                        }
83                    });
84                    ret
85                },
86                Entry::Vacant(e) => {
87                    // TODO: consider adding a limit to the number of submitted jobs based on the
88                    // number of available threads, once SlowJobPool supports a notion of
89                    // approximating that
90                    let tx = self.tx.clone();
91                    let f = f();
92                    let job = pool.spawn(self.name, move || {
93                        let v = f(&k);
94                        let _ = tx.send((k, v));
95                    });
96                    e.insert(KeyedJobTask::Pending(Instant::now(), Some(job)));
97                    None
98                },
99            }
100        } else {
101            let v = f()(&k);
102            Some((k, v))
103        }
104    }
105}