1use crate::metrics::SysMetrics;
2use specs::{ReadExpect, RunNow};
3use std::{collections::HashMap, time::Instant};
4
/// How many threads a stretch of work is considered to occupy.
///
/// `ParMode::threads` turns a mode into a concrete thread count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParMode {
    /// Counts as zero threads.
    None,
    /// Counts as exactly one thread.
    Single,
    /// Counts as the number of rayon threads supplied by the caller.
    Rayon,
    /// Counts as exactly the given number of threads.
    Exact(u32),
}
15
/// Phase a system declares through `System::PHASE`.
///
/// NOTE(review): this file only declares the variants; what each phase means
/// for scheduling is defined elsewhere — confirm before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Create,
    Review,
    Apply,
}
23
/// Where a system comes from.
///
/// The origin's name is used as the prefix of `System::sys_name()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Origin {
    Common,
    Client,
    Server,
    /// A frontend, identified by the given name.
    Frontend(&'static str),
}
32
33impl Origin {
34 fn name(&self) -> &'static str {
35 match self {
36 Origin::Common => "Common",
37 Origin::Client => "Client",
38 Origin::Server => "Server",
39 Origin::Frontend(name) => name,
40 }
41 }
42}
43
44#[derive(Default, Debug, Clone)]
45pub struct CpuTimeline {
46 measures: Vec<(Instant, ParMode)>,
54}
55
/// Per-job thread usage over a tick, as produced by `gen_stats`.
///
/// Derives `Debug` and `Clone` (in addition to `Default`) so the statistics
/// can be logged and copied like the sibling `CpuTimeline`.
#[derive(Debug, Clone, Default)]
pub struct CpuTimeStats {
    /// `(nanoseconds, threads)` samples: the thread share that applies from
    /// the given offset until the next sample.
    measures: Vec<(u64, f32)>,
}
62
63impl ParMode {
67 fn threads(&self, rayon_threads: u32) -> u32 {
68 match self {
69 ParMode::None => 0,
70 ParMode::Single => 1,
71 ParMode::Rayon => rayon_threads,
72 ParMode::Exact(u) => *u,
73 }
74 }
75}
76
77impl CpuTimeline {
78 fn reset(&mut self) {
79 self.measures.clear();
80 self.measures.push((Instant::now(), ParMode::Single));
81 }
82
83 pub fn measure(&mut self, par: ParMode) { self.measures.push((Instant::now(), par)); }
86
87 fn end(&mut self) -> std::time::Duration {
88 let end = Instant::now();
89 self.measures.push((end, ParMode::None));
90 end.duration_since(
91 self.measures
92 .first()
93 .expect("We just pushed onto the vector.")
94 .0,
95 )
96 }
97
98 fn get(&self, time: Instant) -> ParMode {
99 match self.measures.binary_search_by_key(&time, |&(a, _)| a) {
100 Ok(id) => self.measures[id].1,
101 Err(0) => ParMode::None, Err(id) => self.measures[id - 1].1,
103 }
104 }
105}
106
107impl CpuTimeStats {
108 pub fn length_ns(&self) -> u64 { self.end_ns() - self.start_ns() }
109
110 pub fn start_ns(&self) -> u64 {
111 self.measures
112 .iter()
113 .find(|e| e.1 > 0.001)
114 .unwrap_or(&(0, 0.0))
115 .0
116 }
117
118 pub fn end_ns(&self) -> u64 { self.measures.last().unwrap_or(&(0, 0.0)).0 }
119
120 pub fn avg_threads(&self) -> f32 {
121 let mut sum = 0.0;
122 for w in self.measures.windows(2) {
123 let len = w[1].0 - w[0].0;
124 let h = w[0].1;
125 sum += len as f32 * h;
126 }
127 sum / (self.length_ns() as f32)
128 }
129}
130
131pub fn gen_stats(
164 timelines: &HashMap<String, CpuTimeline>,
165 tick_work_start: Instant,
166 rayon_threads: u32,
167 physical_threads: u32,
168) -> HashMap<String, CpuTimeStats> {
169 let mut result = HashMap::new();
170 let mut all = timelines
171 .iter()
172 .flat_map(|(s, t)| {
173 let mut stat = CpuTimeStats::default();
174 stat.measures.push((0, 0.0));
175 result.insert(s.clone(), stat);
176 t.measures.iter().map(|e| &e.0)
177 })
178 .collect::<Vec<_>>();
179
180 all.sort();
181 all.dedup();
182 for time in all {
183 let relative_time = time.duration_since(tick_work_start).as_nanos() as u64;
184 let individual_cores_wanted = timelines
186 .iter()
187 .map(|(k, t)| (k, t.get(*time).threads(rayon_threads)))
188 .collect::<Vec<_>>();
189 let total = individual_cores_wanted
190 .iter()
191 .map(|(_, a)| a)
192 .sum::<u32>()
193 .max(1) as f32;
194 let total_or_max = total.max(physical_threads as f32);
195 for individual in individual_cores_wanted.iter() {
197 let actual = (individual.1 as f32 / total_or_max) * physical_threads as f32;
198 if let Some(p) = result.get_mut(individual.0) {
199 if p.measures
200 .last()
201 .map(|last| (last.1 - actual).abs())
202 .unwrap_or(0.0)
203 > 0.0001
204 {
205 p.measures.push((relative_time, actual));
206 }
207 } else {
208 tracing::warn!("Invariant violation: keys in both hashmaps should be the same.");
209 }
210 }
211 }
212 result
213}
214
215pub trait System<'a> {
240 const NAME: &'static str;
241 const PHASE: Phase;
242 const ORIGIN: Origin;
243
244 type SystemData: specs::SystemData<'a>;
245 fn run(job: &mut Job<Self>, data: Self::SystemData);
246 fn sys_name() -> String { format!("{}_{}_sys", Self::ORIGIN.name(), Self::NAME) }
247}
248
249pub fn dispatch<'a, 'b, T>(builder: &mut specs::DispatcherBuilder<'a, 'b>, dep: &[&str])
250where
251 T: for<'c> System<'c> + Send + 'a + Default,
252{
253 builder.add(Job::<T>::default(), &T::sys_name(), dep);
254}
255
256pub fn run_now<'a, 'b, T>(world: &'a specs::World)
257where
258 T: for<'c> System<'c> + Send + 'a + Default,
259{
260 Job::<T>::default().run_now(world);
261}
262
263pub struct Job<T>
266where
267 T: ?Sized,
268{
269 pub own: Box<T>,
270 pub cpu_stats: CpuTimeline,
271}
272
273impl<'a, T> specs::System<'a> for Job<T>
274where
275 T: System<'a>,
276{
277 type SystemData = (T::SystemData, ReadExpect<'a, SysMetrics>);
278
279 fn run(&mut self, data: Self::SystemData) {
280 common_base::span!(_guard, "run", &format!("{}::Sys::run", T::NAME));
281 self.cpu_stats.reset();
282 T::run(self, data.0);
283 let millis = self.cpu_stats.end().as_millis();
284 let name = T::NAME;
285 if millis > 500 {
286 tracing::warn!(?millis, ?name, "slow system execution");
287 }
288 data.1
289 .stats
290 .lock()
291 .unwrap()
292 .insert(T::NAME.to_string(), self.cpu_stats.clone());
293 }
294}
295
296impl<'a, T> Default for Job<T>
297where
298 T: System<'a> + Default,
299{
300 fn default() -> Self {
301 Self {
302 own: Box::<T>::default(),
303 cpu_stats: CpuTimeline::default(),
304 }
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use float_cmp::approx_eq;
    use std::time::Duration;

    /// Expected statistics for one job.
    struct Expected<'a> {
        measures: &'a [(u64, f32)],
        start_ns: u64,
        end_ns: u64,
        length_ns: u64,
        avg_threads: f32,
    }

    /// Builds one two-sample timeline per `(start_ms, end_ms, mode)` entry,
    /// keyed by the entry's index: the mode applies from `start_ms` and the
    /// job goes idle at `end_ms` (both relative to `tick_start`).
    fn mock_timelines(
        tick_start: Instant,
        durations: Vec<(u64, u64, ParMode)>,
    ) -> HashMap<String, CpuTimeline> {
        durations
            .iter()
            .enumerate()
            .map(|(idx, &(start_ms, end_ms, par))| {
                let begin = tick_start + Duration::from_millis(start_ms);
                let finish = tick_start + Duration::from_millis(end_ms);
                let timeline = CpuTimeline {
                    measures: vec![(begin, par), (finish, ParMode::None)],
                };
                (idx.to_string(), timeline)
            })
            .collect()
    }

    /// Checks every sample and every derived value of `stat` against
    /// `expected`.
    fn assert_stats(stat: &CpuTimeStats, expected: Expected) {
        let measures = &stat.measures;
        assert_eq!(measures.len(), expected.measures.len());
        for (got, want) in measures.iter().zip(expected.measures) {
            assert_eq!(got.0, want.0);
            assert!(approx_eq!(f32, got.1, want.1));
        }
        assert_eq!(stat.start_ns(), expected.start_ns);
        assert_eq!(stat.end_ns(), expected.end_ns);
        assert_eq!(stat.length_ns(), expected.length_ns);
        assert!(approx_eq!(f32, stat.avg_threads(), expected.avg_threads));
    }

    #[test]
    fn single() {
        const RAYON_THREADS: u32 = 4;
        const PHYSICAL_THREADS: u32 = RAYON_THREADS;
        const THREADS: f32 = PHYSICAL_THREADS as f32;

        let tick_start = Instant::now();
        let timelines = mock_timelines(tick_start, vec![(500, 1500, ParMode::Rayon)]);

        let stats = gen_stats(&timelines, tick_start, RAYON_THREADS, PHYSICAL_THREADS);

        assert_stats(&stats["0"], Expected {
            measures: &[(0, 0.0), (500_000_000, THREADS), (1_500_000_000, 0.0)],
            start_ns: 500_000_000,
            end_ns: 1_500_000_000,
            length_ns: 1_000_000_000,
            avg_threads: THREADS,
        });
    }

    #[test]
    fn two_jobs() {
        const RAYON_THREADS: u32 = 8;
        const PHYSICAL_THREADS: u32 = RAYON_THREADS;

        let tick_start = Instant::now();
        let timelines = mock_timelines(tick_start, vec![
            (2000, 3000, ParMode::Single),
            (5000, 6500, ParMode::Single),
        ]);

        let stats = gen_stats(&timelines, tick_start, RAYON_THREADS, PHYSICAL_THREADS);

        assert_stats(&stats["0"], Expected {
            measures: &[(0, 0.0), (2_000_000_000, 1.0), (3_000_000_000, 0.0)],
            start_ns: 2_000_000_000,
            end_ns: 3_000_000_000,
            length_ns: 1_000_000_000,
            avg_threads: 1.0,
        });

        assert_stats(&stats["1"], Expected {
            measures: &[(0, 0.0), (5_000_000_000, 1.0), (6_500_000_000, 0.0)],
            start_ns: 5_000_000_000,
            end_ns: 6_500_000_000,
            length_ns: 1_500_000_000,
            avg_threads: 1.0,
        });
    }

    #[test]
    fn generate_stats() {
        const RAYON_THREADS: u32 = 6;
        const PHYSICAL_THREADS: u32 = RAYON_THREADS;
        const THREADS: f32 = PHYSICAL_THREADS as f32;

        let tick_start = Instant::now();
        let timelines = mock_timelines(tick_start, vec![
            (2000, 5000, ParMode::Rayon),
            (3000, 7000, ParMode::Rayon),
            (3500, 4500, ParMode::Single),
        ]);

        let stats = gen_stats(&timelines, tick_start, RAYON_THREADS, PHYSICAL_THREADS);

        assert_stats(&stats["0"], Expected {
            measures: &[
                (0, 0.0),
                (2_000_000_000, THREADS),
                (3_000_000_000, THREADS / 2.0),
                (3_500_000_000, THREADS * THREADS / (THREADS * 2.0 + 1.0)),
                (4_500_000_000, THREADS / 2.0),
                (5_000_000_000, 0.0),
            ],
            start_ns: 2_000_000_000,
            end_ns: 5_000_000_000,
            length_ns: 3_000_000_000,
            avg_threads: 3.923077,
        });

        assert_stats(&stats["1"], Expected {
            measures: &[
                (0, 0.0),
                (3_000_000_000, THREADS / 2.0),
                (3_500_000_000, THREADS * THREADS / (THREADS * 2.0 + 1.0)),
                (4_500_000_000, THREADS / 2.0),
                (5_000_000_000, THREADS),
                (7_000_000_000, 0.0),
            ],
            start_ns: 3_000_000_000,
            end_ns: 7_000_000_000,
            length_ns: 4_000_000_000,
            avg_threads: 4.4423075,
        });

        assert_stats(&stats["2"], Expected {
            measures: &[
                (0, 0.0),
                (3_500_000_000, THREADS / (THREADS * 2.0 + 1.0)),
                (4_500_000_000, 0.0),
            ],
            start_ns: 3_500_000_000,
            end_ns: 4_500_000_000,
            length_ns: 1_000_000_000,
            avg_threads: 0.4615385,
        });
    }
}