1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use common::slowjob::{SlowJob, SlowJobPool};
use hashbrown::{hash_map::Entry, HashMap};
use std::{
    hash::Hash,
    time::{Duration, Instant},
};

enum KeyedJobTask<V> {
    Pending(Instant, Option<SlowJob>),
    Completed(Instant, V),
}

pub struct KeyedJobs<K, V> {
    tx: crossbeam_channel::Sender<(K, V)>,
    rx: crossbeam_channel::Receiver<(K, V)>,
    tasks: HashMap<K, KeyedJobTask<V>>,
    name: &'static str,
    last_gc: Instant,
}

const KEYEDJOBS_GC_INTERVAL: Duration = Duration::from_secs(1);

impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Send + Sync + 'static> KeyedJobs<K, V> {
    pub fn new(name: &'static str) -> Self {
        let (tx, rx) = crossbeam_channel::unbounded();
        Self {
            tx,
            rx,
            tasks: HashMap::new(),
            name,
            last_gc: Instant::now(),
        }
    }

    /// Spawn a task on a specified threadpool. The function is given as a thunk
    /// so that if work is needed to create captured variables (e.g.
    /// `Arc::clone`), that only occurs if the task hasn't yet been scheduled.
    pub fn spawn<F: FnOnce(&K) -> V + Send + Sync + 'static>(
        &mut self,
        pool: Option<&SlowJobPool>,
        k: K,
        f: impl FnOnce() -> F,
    ) -> Option<(K, V)> {
        if let Some(pool) = pool {
            while let Ok((k2, v)) = self.rx.try_recv() {
                if k == k2 {
                    return Some((k, v));
                } else {
                    self.tasks
                        .insert(k2, KeyedJobTask::Completed(Instant::now(), v));
                }
            }
            let now = Instant::now();
            if now - self.last_gc > KEYEDJOBS_GC_INTERVAL {
                self.last_gc = now;
                self.tasks.retain(|_, task| match task {
                    KeyedJobTask::Completed(at, _) => now - *at < KEYEDJOBS_GC_INTERVAL,
                    KeyedJobTask::Pending(at, job) => {
                        let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
                        if !fresh {
                            if let Some(job) = job.take() {
                                // Cancelling a job only fails if the job doesn't exist anymore,
                                // which means that it completed while we tried to GC its pending
                                // struct, which means that we'll GC it in the next cycle, so ignore
                                // the error in this collection.
                                let _ = pool.cancel(job);
                            }
                        }
                        fresh
                    },
                });
            }
            match self.tasks.entry(k.clone()) {
                Entry::Occupied(e) => {
                    let mut ret = None;
                    e.replace_entry_with(|_, v| {
                        if let KeyedJobTask::Completed(_, v) = v {
                            ret = Some((k, v));
                            None
                        } else {
                            Some(v)
                        }
                    });
                    ret
                },
                Entry::Vacant(e) => {
                    // TODO: consider adding a limit to the number of submitted jobs based on the
                    // number of available threads, once SlowJobPool supports a notion of
                    // approximating that
                    let tx = self.tx.clone();
                    let f = f();
                    let job = pool.spawn(self.name, move || {
                        let v = f(&k);
                        let _ = tx.send((k, v));
                    });
                    e.insert(KeyedJobTask::Pending(Instant::now(), Some(job)));
                    None
                },
            }
        } else {
            let v = f()(&k);
            Some((k, v))
        }
    }
}