veloren_common_ecs/
system.rs

1use crate::metrics::SysMetrics;
2use specs::{ReadExpect, RunNow};
3use std::{collections::HashMap, time::Instant};
4
/// Describes on how many threads a unit of code ran.
///
/// Report `Rayon` when the code ran on the rayon threadpool and `Exact` when
/// you know precisely how many threads were in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParMode {
    /// Job is not running at all
    None,
    /// Running single threaded
    Single,
    /// Running on the rayon threadpool
    Rayon,
    /// Running on exactly this many threads
    Exact(u32),
}
15
// TODO: make use of the phase of a system for advanced scheduling and logging
/// Phase a system declares through `System::PHASE`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Create,
    Review,
    Apply,
}
23
// TODO: make use of the origin of the system for better logging
/// Part of the project a system originates from (see `System::ORIGIN`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Origin {
    Common,
    Client,
    Server,
    /// A frontend, identified by its name
    Frontend(&'static str),
}
32
33impl Origin {
34    fn name(&self) -> &'static str {
35        match self {
36            Origin::Common => "Common",
37            Origin::Client => "Client",
38            Origin::Server => "Server",
39            Origin::Frontend(name) => name,
40        }
41    }
42}
43
44#[derive(Default, Debug, Clone)]
45pub struct CpuTimeline {
46    /// measurements for a System
47    /// - The first entry will always be ParMode::Single, as when the
48    ///   System::run is executed, we run single threaded until we start a
49    ///   Rayon::ParIter or similar
50    /// - The last entry will contain the end time of the System. To mark the
51    ///   End it will always contain ParMode::None, which means from that point
52    ///   on 0 CPU threads work in this system
53    measures: Vec<(Instant, ParMode)>,
54}
55
/// Processed per-system statistics: how many threads a system effectively
/// occupied over the course of a tick.
///
/// `Debug` and `Clone` are derived so the stats can be logged and copied just
/// like the `CpuTimeline` they are computed from.
#[derive(Debug, Default, Clone)]
pub struct CpuTimeStats {
    /// Pairs of `(ns, usage)`: `usage` threads are in use starting from `ns`
    /// (nanoseconds relative to the start of the tick work) until the next
    /// entry. The first entry will always be at 0, the last entry will always
    /// be at `dt`.
    measures: Vec<(/* ns */ u64, /* usage */ f32)>,
}
62
63/// Parallel Mode tells us how much you are scaling. `None` means your code
64/// isn't running. `Single` means you are running single threaded.
65/// `Rayon` means you are running on the rayon threadpool.
66impl ParMode {
67    fn threads(&self, rayon_threads: u32) -> u32 {
68        match self {
69            ParMode::None => 0,
70            ParMode::Single => 1,
71            ParMode::Rayon => rayon_threads,
72            ParMode::Exact(u) => *u,
73        }
74    }
75}
76
77impl CpuTimeline {
78    fn reset(&mut self) {
79        self.measures.clear();
80        self.measures.push((Instant::now(), ParMode::Single));
81    }
82
83    /// Start a new measurement. par will be covering the parallelisation AFTER
84    /// this statement, till the next / end of the System.
85    pub fn measure(&mut self, par: ParMode) { self.measures.push((Instant::now(), par)); }
86
87    fn end(&mut self) -> std::time::Duration {
88        let end = Instant::now();
89        self.measures.push((end, ParMode::None));
90        end.duration_since(
91            self.measures
92                .first()
93                .expect("We just pushed onto the vector.")
94                .0,
95        )
96    }
97
98    fn get(&self, time: Instant) -> ParMode {
99        match self.measures.binary_search_by_key(&time, |&(a, _)| a) {
100            Ok(id) => self.measures[id].1,
101            Err(0) => ParMode::None, /* not yet started */
102            Err(id) => self.measures[id - 1].1,
103        }
104    }
105}
106
107impl CpuTimeStats {
108    pub fn length_ns(&self) -> u64 { self.end_ns() - self.start_ns() }
109
110    pub fn start_ns(&self) -> u64 {
111        self.measures
112            .iter()
113            .find(|e| e.1 > 0.001)
114            .unwrap_or(&(0, 0.0))
115            .0
116    }
117
118    pub fn end_ns(&self) -> u64 { self.measures.last().unwrap_or(&(0, 0.0)).0 }
119
120    pub fn avg_threads(&self) -> f32 {
121        let mut sum = 0.0;
122        for w in self.measures.windows(2) {
123            let len = w[1].0 - w[0].0;
124            let h = w[0].1;
125            sum += len as f32 * h;
126        }
127        sum / (self.length_ns() as f32)
128    }
129}
130
131/// The Idea is to transform individual timelines per system to a map of all
132/// cores and what they (prob) are working on.
133///
134/// # Example
135///
136/// - Input: 3 services, 0 and 1 are 100% parallel and 2 is single threaded. `-`
137///   means no work for *0.5s*. `#` means full work for *0.5s*. We see the first
138///   service starts after 1s and runs for 3s The second one starts a sec later
139///   and runs for 4s. The last service runs 2.5s after the tick start and runs
140///   for 1s. Read left to right.
141/// ```ignore
142/// [--######------]
143/// [----########--]
144/// [-----##-------]
145/// ```
146///
147/// - Output: a Map that calculates where our 6 cores are spending their time.
148///   Here each number means 50% of a core is working on it. A '-' represents an
149///   idling core. We start with all 6 cores idling. Then all cores start to
150///   work on task 0. 2s in, task1 starts and we have to split cores. 2.5s in
151///   task2 starts. We have 6 physical threads but work to fill 13. Later task 2
152///   and task 0 will finish their work and give more threads for task 1 to work
153///   on. Read top to bottom
154/// ```ignore
155/// 0-1s     [------------]
156/// 1-2s     [000000000000]
157/// 2-2.5s   [000000111111]
158/// 2.5-3.5s [000001111122]
159/// 3.5-4s   [000000111111]
160/// 4-6s     [111111111111]
161/// 6s..     [------------]
162/// ```
163pub fn gen_stats(
164    timelines: &HashMap<String, CpuTimeline>,
165    tick_work_start: Instant,
166    rayon_threads: u32,
167    physical_threads: u32,
168) -> HashMap<String, CpuTimeStats> {
169    let mut result = HashMap::new();
170    let mut all = timelines
171        .iter()
172        .flat_map(|(s, t)| {
173            let mut stat = CpuTimeStats::default();
174            stat.measures.push((0, 0.0));
175            result.insert(s.clone(), stat);
176            t.measures.iter().map(|e| &e.0)
177        })
178        .collect::<Vec<_>>();
179
180    all.sort();
181    all.dedup();
182    for time in all {
183        let relative_time = time.duration_since(tick_work_start).as_nanos() as u64;
184        // get all parallelisation at this particular time
185        let individual_cores_wanted = timelines
186            .iter()
187            .map(|(k, t)| (k, t.get(*time).threads(rayon_threads)))
188            .collect::<Vec<_>>();
189        let total = individual_cores_wanted
190            .iter()
191            .map(|(_, a)| a)
192            .sum::<u32>()
193            .max(1) as f32;
194        let total_or_max = total.max(physical_threads as f32);
195        // update ALL states
196        for individual in individual_cores_wanted.iter() {
197            let actual = (individual.1 as f32 / total_or_max) * physical_threads as f32;
198            if let Some(p) = result.get_mut(individual.0) {
199                if p.measures
200                    .last()
201                    .map(|last| (last.1 - actual).abs())
202                    .unwrap_or(0.0)
203                    > 0.0001
204                {
205                    p.measures.push((relative_time, actual));
206                }
207            } else {
208                tracing::warn!("Invariant violation: keys in both hashmaps should be the same.");
209            }
210        }
211    }
212    result
213}
214
215/// This trait wraps around specs::System and does additional veloren tasks like
216/// metrics collection
217///
218/// ```
219/// use specs::Read;
220/// pub use veloren_common_ecs::{Job, Origin, ParMode, Phase, System};
221/// # use std::time::Duration;
222/// pub struct Sys;
223/// impl<'a> System<'a> for Sys {
224///     type SystemData = (Read<'a, ()>, Read<'a, ()>);
225///
226///     const NAME: &'static str = "example";
227///     const ORIGIN: Origin = Origin::Frontend("voxygen");
228///     const PHASE: Phase = Phase::Create;
229///
230///     fn run(job: &mut Job<Self>, (_read, _read2): Self::SystemData) {
231///         std::thread::sleep(Duration::from_millis(100));
232///         job.cpu_stats.measure(ParMode::Rayon);
233///         std::thread::sleep(Duration::from_millis(500));
234///         job.cpu_stats.measure(ParMode::Single);
235///         std::thread::sleep(Duration::from_millis(40));
236///     }
237/// }
238/// ```
239pub trait System<'a> {
240    const NAME: &'static str;
241    const PHASE: Phase;
242    const ORIGIN: Origin;
243
244    type SystemData: specs::SystemData<'a>;
245    fn run(job: &mut Job<Self>, data: Self::SystemData);
246    fn sys_name() -> String { format!("{}_{}_sys", Self::ORIGIN.name(), Self::NAME) }
247}
248
249pub fn dispatch<'a, 'b, T>(builder: &mut specs::DispatcherBuilder<'a, 'b>, dep: &[&str])
250where
251    T: for<'c> System<'c> + Send + 'a + Default,
252{
253    builder.add(Job::<T>::default(), &T::sys_name(), dep);
254}
255
256pub fn run_now<'a, 'b, T>(world: &'a specs::World)
257where
258    T: for<'c> System<'c> + Send + 'a + Default,
259{
260    Job::<T>::default().run_now(world);
261}
262
263/// This Struct will wrap the System in order to avoid the can only impl trait
264/// for local defined structs error It also contains the cpu measurements
265pub struct Job<T>
266where
267    T: ?Sized,
268{
269    pub own: Box<T>,
270    pub cpu_stats: CpuTimeline,
271}
272
273impl<'a, T> specs::System<'a> for Job<T>
274where
275    T: System<'a>,
276{
277    type SystemData = (T::SystemData, ReadExpect<'a, SysMetrics>);
278
279    fn run(&mut self, data: Self::SystemData) {
280        common_base::span!(_guard, "run", &format!("{}::Sys::run", T::NAME));
281        self.cpu_stats.reset();
282        T::run(self, data.0);
283        let millis = self.cpu_stats.end().as_millis();
284        let name = T::NAME;
285        if millis > 500 {
286            tracing::warn!(?millis, ?name, "slow system execution");
287        }
288        data.1
289            .stats
290            .lock()
291            .unwrap()
292            .insert(T::NAME.to_string(), self.cpu_stats.clone());
293    }
294}
295
296impl<'a, T> Default for Job<T>
297where
298    T: System<'a> + Default,
299{
300    fn default() -> Self {
301        Self {
302            own: Box::<T>::default(),
303            cpu_stats: CpuTimeline::default(),
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use float_cmp::approx_eq;
    use std::time::Duration;

    /// Builds one timeline per entry of `durations`, keyed by the index of the
    /// entry as string. An entry `(start, end, mode)` produces a timeline that
    /// runs in `mode` from `start` ms after `tick_start` and switches to
    /// `ParMode::None` at `end` ms after `tick_start`.
    fn mock_timelines(
        tick_start: Instant,
        durations: Vec<(u64, u64, ParMode)>,
    ) -> HashMap<String, CpuTimeline> {
        // resolve the relative milliseconds into absolute instants
        let job = durations
            .iter()
            .enumerate()
            .map(|(i, (s, e, p))| {
                (
                    i,
                    tick_start + Duration::from_millis(*s),
                    tick_start + Duration::from_millis(*e),
                    *p,
                )
            })
            .collect::<Vec<_>>();

        job.iter()
            .map(|(i, f, s, p)| {
                (i.to_string(), CpuTimeline {
                    measures: vec![(*f, *p), (*s, ParMode::None)],
                })
            })
            .collect()
    }

    /// A single rayon job running from 0.5s to 1.5s gets all threads for
    /// itself.
    #[test]
    fn single() {
        const RAYON_THREADS: u32 = 4;
        const PHYSICAL_THREADS: u32 = RAYON_THREADS;
        let tick_start = Instant::now();
        let job_d = vec![(500, 1500, ParMode::Rayon)];
        let timelines = mock_timelines(tick_start, job_d);

        let stats = gen_stats(&timelines, tick_start, RAYON_THREADS, PHYSICAL_THREADS);

        const THREADS: f32 = PHYSICAL_THREADS as f32;

        let s = &stats["0"];
        let measures = &s.measures;
        assert_eq!(measures.len(), 3);
        assert_eq!(measures[0].0, 0);
        assert!(approx_eq!(f32, measures[0].1, 0.0));
        assert_eq!(measures[1].0, 500000000);
        assert!(approx_eq!(f32, measures[1].1, THREADS));
        assert_eq!(measures[2].0, 1500000000);
        assert!(approx_eq!(f32, measures[2].1, 0.0));
        assert_eq!(s.start_ns(), 500000000);
        assert_eq!(s.end_ns(), 1500000000);
        assert_eq!(s.length_ns(), 1000000000);
        assert!(approx_eq!(f32, s.avg_threads(), THREADS));
    }

    /// Two single threaded jobs which do not overlap in time do not influence
    /// each other.
    #[test]
    fn two_jobs() {
        const RAYON_THREADS: u32 = 8;
        const PHYSICAL_THREADS: u32 = RAYON_THREADS;
        let tick_start = Instant::now();
        let job_d = vec![(2000, 3000, ParMode::Single), (5000, 6500, ParMode::Single)];
        let timelines = mock_timelines(tick_start, job_d);

        let stats = gen_stats(&timelines, tick_start, RAYON_THREADS, PHYSICAL_THREADS);

        let s = &stats["0"];
        let measures = &s.measures;
        assert_eq!(measures.len(), 3);
        assert_eq!(measures[0].0, 0);
        assert!(approx_eq!(f32, measures[0].1, 0.0));
        assert_eq!(measures[1].0, 2000000000);
        assert!(approx_eq!(f32, measures[1].1, 1.0));
        assert_eq!(measures[2].0, 3000000000);
        assert!(approx_eq!(f32, measures[2].1, 0.0));
        assert_eq!(s.start_ns(), 2000000000);
        assert_eq!(s.end_ns(), 3000000000);
        assert_eq!(s.length_ns(), 1000000000);
        assert!(approx_eq!(f32, s.avg_threads(), 1.0));

        let s = &stats["1"];
        let measures = &s.measures;
        assert_eq!(measures.len(), 3);
        assert_eq!(measures[0].0, 0);
        assert!(approx_eq!(f32, measures[0].1, 0.0));
        assert_eq!(measures[1].0, 5000000000);
        assert!(approx_eq!(f32, measures[1].1, 1.0));
        assert_eq!(measures[2].0, 6500000000);
        assert!(approx_eq!(f32, measures[2].1, 0.0));
        assert_eq!(s.start_ns(), 5000000000);
        assert_eq!(s.end_ns(), 6500000000);
        assert_eq!(s.length_ns(), 1500000000);
        assert!(approx_eq!(f32, s.avg_threads(), 1.0));
    }

    /// Two rayon jobs and one single threaded job overlap, so the 6 physical
    /// threads are shared proportionally between up to 13 wanted threads.
    #[test]
    fn generate_stats() {
        const RAYON_THREADS: u32 = 6;
        const PHYSICAL_THREADS: u32 = RAYON_THREADS;
        let tick_start = Instant::now();
        let job_d = vec![
            (2000, 5000, ParMode::Rayon),
            (3000, 7000, ParMode::Rayon),
            (3500, 4500, ParMode::Single),
        ];
        let timelines = mock_timelines(tick_start, job_d);

        let stats = gen_stats(&timelines, tick_start, RAYON_THREADS, PHYSICAL_THREADS);

        const THREADS: f32 = PHYSICAL_THREADS as f32;

        // job 0: alone, then shared with job 1, then shared with job 1 and 2
        let s = &stats["0"];
        let measures = &s.measures;
        assert_eq!(measures.len(), 6);
        assert_eq!(measures[0].0, 0);
        assert!(approx_eq!(f32, measures[0].1, 0.0));
        assert_eq!(measures[1].0, 2000000000);
        assert!(approx_eq!(f32, measures[1].1, THREADS));
        assert_eq!(measures[2].0, 3000000000);
        assert!(approx_eq!(f32, measures[2].1, THREADS / 2.0));
        assert_eq!(measures[3].0, 3500000000);
        assert!(approx_eq!(
            f32,
            measures[3].1,
            THREADS * THREADS / (THREADS * 2.0 + 1.0)
        ));
        assert_eq!(measures[4].0, 4500000000);
        assert!(approx_eq!(f32, measures[4].1, THREADS / 2.0));
        assert_eq!(measures[5].0, 5000000000);
        assert!(approx_eq!(f32, measures[5].1, 0.0));
        assert_eq!(s.start_ns(), 2000000000);
        assert_eq!(s.end_ns(), 5000000000);
        assert_eq!(s.length_ns(), 3000000000);
        assert!(approx_eq!(f32, s.avg_threads(), 3.923077));

        // job 1: starts shared and gets all threads once job 0 has finished
        let s = &stats["1"];
        let measures = &s.measures;
        assert_eq!(measures.len(), 6);
        assert_eq!(measures[0].0, 0);
        assert!(approx_eq!(f32, measures[0].1, 0.0));
        assert_eq!(measures[1].0, 3000000000);
        assert!(approx_eq!(f32, measures[1].1, THREADS / 2.0));
        assert_eq!(measures[2].0, 3500000000);
        assert!(approx_eq!(
            f32,
            measures[2].1,
            THREADS * THREADS / (THREADS * 2.0 + 1.0)
        ));
        assert_eq!(measures[3].0, 4500000000);
        assert!(approx_eq!(f32, measures[3].1, THREADS / 2.0));
        assert_eq!(measures[4].0, 5000000000);
        assert!(approx_eq!(f32, measures[4].1, THREADS));
        assert_eq!(measures[5].0, 7000000000);
        assert!(approx_eq!(f32, measures[5].1, 0.0));
        assert_eq!(s.start_ns(), 3000000000);
        assert_eq!(s.end_ns(), 7000000000);
        assert_eq!(s.length_ns(), 4000000000);
        assert!(approx_eq!(f32, s.avg_threads(), 4.4423075));

        // job 2: single threaded, always competing with both rayon jobs
        let s = &stats["2"];
        let measures = &s.measures;
        assert_eq!(measures.len(), 3);
        assert_eq!(measures[0].0, 0);
        assert!(approx_eq!(f32, measures[0].1, 0.0));
        assert_eq!(measures[1].0, 3500000000);
        assert!(approx_eq!(
            f32,
            measures[1].1,
            THREADS / (THREADS * 2.0 + 1.0)
        ));
        assert_eq!(measures[2].0, 4500000000);
        assert!(approx_eq!(f32, measures[2].1, 0.0));
        assert_eq!(s.start_ns(), 3500000000);
        assert_eq!(s.end_ns(), 4500000000);
        assert_eq!(s.length_ns(), 1000000000);
        assert!(approx_eq!(f32, s.avg_threads(), 0.4615385));
    }
}