veloren_network_protocol/
metrics.rs

1use crate::types::Sid;
2#[cfg(feature = "metrics")]
3use prometheus::{
4    IntCounterVec, IntGaugeVec, Opts, Registry,
5    core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
6};
7#[cfg(feature = "metrics")]
8use std::collections::HashMap;
9use std::{error::Error, sync::Arc};
10
/// Why a message left the protocol's send or receive bookkeeping.
///
/// The variant is used as the `reason` label value of the `*_out_*` metrics
/// when the `metrics` feature is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoveReason {
    /// The message was removed with reason `Finished`.
    Finished,
    /// The message was removed with reason `Dropped`.
    Dropped,
}
15
/// Use 1 `ProtocolMetrics` per `Network`.
/// It contains all protocol related [`prometheus`] information.
///
/// [`prometheus`]: prometheus
#[cfg(feature = "metrics")]
pub struct ProtocolMetrics {
    // Naming scheme of the fields below:
    //   smsg = send message, rmsg = receive message,
    //   sdata = send data frames, rdata = receive data frames
    //   i = in, o = out
    //   t = total (count), b = byte throughput
    // e.g. smsg_it = sending messages, in (responsibility of protocol), total

    // Send side, labelled by CHANNEL and STREAM unless noted otherwise.
    /// Number of messages added to be sent, by CHANNEL and STREAM.
    smsg_it: IntCounterVec,
    /// Bytes of messages added to be sent, by CHANNEL and STREAM.
    smsg_ib: IntCounterVec,
    /// Number of messages removed from the to-be-sent set, by CHANNEL, STREAM
    /// and REASON (finished/dropped).
    smsg_ot: IntCounterVec,
    /// Bytes of messages removed from the to-be-sent set, by CHANNEL, STREAM
    /// and REASON (finished/dropped).
    smsg_ob: IntCounterVec,
    /// Number of data frames sent, by CHANNEL.
    sdata_frames_t: IntCounterVec,
    /// Bytes of data frames sent, by CHANNEL.
    sdata_frames_b: IntCounterVec,

    // Receive side, labelled by CHANNEL and STREAM unless noted otherwise.
    /// Number of messages added to be received, by CHANNEL and STREAM.
    rmsg_it: IntCounterVec,
    /// Bytes of messages added to be received, by CHANNEL and STREAM.
    rmsg_ib: IntCounterVec,
    /// Number of messages removed from the to-be-received set, by CHANNEL,
    /// STREAM and REASON (finished/dropped).
    rmsg_ot: IntCounterVec,
    /// Bytes of messages removed from the to-be-received set, by CHANNEL,
    /// STREAM and REASON (finished/dropped).
    rmsg_ob: IntCounterVec,
    /// Number of data frames received, by CHANNEL.
    rdata_frames_t: IntCounterVec,
    /// Bytes of data frames received, by CHANNEL.
    rdata_frames_b: IntCounterVec,
    /// Ping per CHANNEL. //TODO: implement (no code in this file updates it yet)
    ping: IntGaugeVec,
}
61
/// Cache for [`ProtocolMetrics`]: keeps the already-resolved label children of
/// one channel so that updating a metric does not need a label lookup each
/// time. Its `Drop` implementation removes the channel's label values from the
/// shared [`ProtocolMetrics`] again.
///
/// NOTE(review): this type derives `Clone` while `Drop` removes the label
/// values from the shared metrics; dropping one clone therefore appears to
/// remove labels another clone still uses — confirm clones never outlive each
/// other in a way that matters.
///
/// [`ProtocolMetrics`]: crate::ProtocolMetrics
#[cfg(feature = "metrics")]
#[derive(Debug, Clone)]
pub struct ProtocolMetricCache {
    /// Channel key, used as the `channel` label value.
    cid: String,
    /// Shared metric vectors the cached children are taken from.
    m: Arc<ProtocolMetrics>,
    /// Per-stream children, filled lazily by `init_sid`.
    cache: HashMap<Sid, CacheLine>,
    /// Child of `ProtocolMetrics::sdata_frames_t` for this channel.
    sdata_frames_t: GenericCounter<AtomicU64>,
    /// Child of `ProtocolMetrics::sdata_frames_b` for this channel.
    sdata_frames_b: GenericCounter<AtomicU64>,
    /// Child of `ProtocolMetrics::rdata_frames_t` for this channel.
    rdata_frames_t: GenericCounter<AtomicU64>,
    /// Child of `ProtocolMetrics::rdata_frames_b` for this channel.
    rdata_frames_b: GenericCounter<AtomicU64>,
    /// Child of `ProtocolMetrics::ping` for this channel; created but never
    /// read or written in this file, hence the `dead_code` expectation.
    #[expect(dead_code)]
    ping: GenericGauge<AtomicI64>,
}
79
/// Stateless stand-in for the metric cache, compiled when the `metrics`
/// feature is disabled.
#[cfg(not(feature = "metrics"))]
#[derive(Debug, Clone)]
pub struct ProtocolMetricCache {}
83
#[cfg(feature = "metrics")]
impl ProtocolMetrics {
    /// Creates all protocol metric vectors. Nothing is exported until
    /// [`register`](Self::register) is called.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `prometheus` while constructing one
    /// of the metric vectors.
    pub fn new() -> Result<Self, Box<dyn Error>> {
        // Label sets shared by several metrics.
        const PER_STREAM: &[&str] = &["channel", "stream"];
        const PER_STREAM_REASON: &[&str] = &["channel", "stream", "reason"];
        const PER_CHANNEL: &[&str] = &["channel"];

        // All but `ping` are integer counter vectors built the same way.
        let counter = |name: &str, help: &str, labels: &[&str]| {
            IntCounterVec::new(Opts::new(name, help), labels)
        };

        Ok(Self {
            smsg_it: counter(
                "send_messages_in_total",
                "All Messages that are added to this Protocol to be send at stream level",
                PER_STREAM,
            )?,
            smsg_ib: counter(
                "send_messages_in_throughput",
                "All Message bytes that are added to this Protocol to be send at stream level",
                PER_STREAM,
            )?,
            // The `reason` label only ever takes the values produced by
            // `RemoveReason::to_str` (finished/dropped), so the help text says so.
            smsg_ot: counter(
                "send_messages_out_total",
                "All Messages that are removed from this Protocol to be send at stream and \
                 reason(finished/dropped) level",
                PER_STREAM_REASON,
            )?,
            smsg_ob: counter(
                "send_messages_out_throughput",
                "All Message bytes that are removed from this Protocol to be send at stream and \
                 reason(finished/dropped) level",
                PER_STREAM_REASON,
            )?,
            sdata_frames_t: counter(
                "send_data_frames_total",
                "Number of data frames send per channel",
                PER_CHANNEL,
            )?,
            sdata_frames_b: counter(
                "send_data_frames_throughput",
                "Number of data frames bytes send per channel",
                PER_CHANNEL,
            )?,
            rmsg_it: counter(
                "recv_messages_in_total",
                "All Messages that are added to this Protocol to be received at stream level",
                PER_STREAM,
            )?,
            rmsg_ib: counter(
                "recv_messages_in_throughput",
                "All Message bytes that are added to this Protocol to be received at stream level",
                PER_STREAM,
            )?,
            rmsg_ot: counter(
                "recv_messages_out_total",
                "All Messages that are removed from this Protocol to be received at stream and \
                 reason(finished/dropped) level",
                PER_STREAM_REASON,
            )?,
            rmsg_ob: counter(
                "recv_messages_out_throughput",
                "All Message bytes that are removed from this Protocol to be received at stream \
                 and reason(finished/dropped) level",
                PER_STREAM_REASON,
            )?,
            rdata_frames_t: counter(
                "recv_data_frames_total",
                "Number of data frames received per channel",
                PER_CHANNEL,
            )?,
            rdata_frames_b: counter(
                "recv_data_frames_throughput",
                "Number of data frames bytes received per channel",
                PER_CHANNEL,
            )?,
            ping: IntGaugeVec::new(Opts::new("ping", "Ping per channel"), PER_CHANNEL)?,
        })
    }

    /// Registers every metric vector of `self` with `registry`.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first registration error. Vectors registered
    /// before the failing one are not unregistered again.
    pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
        let counters = [
            &self.smsg_it,
            &self.smsg_ib,
            &self.smsg_ot,
            &self.smsg_ob,
            &self.sdata_frames_t,
            &self.sdata_frames_b,
            &self.rmsg_it,
            &self.rmsg_ib,
            &self.rmsg_ot,
            &self.rmsg_ob,
            &self.rdata_frames_t,
            &self.rdata_frames_b,
        ];
        for counter in counters {
            // The registry takes ownership of a boxed collector, so hand it a clone.
            registry.register(Box::new(IntCounterVec::clone(counter)))?;
        }
        registry.register(Box::new(self.ping.clone()))?;
        Ok(())
    }
}
212
/// Stateless stand-in for the protocol metrics, compiled when the `metrics`
/// feature is disabled.
#[cfg(not(feature = "metrics"))]
pub struct ProtocolMetrics {}
215
/// Resolved metric children of a single stream, as created by
/// `ProtocolMetricCache::init_sid`.
///
/// The two-element arrays hold one counter per [`RemoveReason`], indexed by
/// `RemoveReason::i()` (`Finished` = 0, `Dropped` = 1).
#[cfg(feature = "metrics")]
#[derive(Debug, Clone)]
pub(crate) struct CacheLine {
    /// Messages added to be sent (count).
    pub smsg_it: GenericCounter<AtomicU64>,
    /// Messages added to be sent (bytes).
    pub smsg_ib: GenericCounter<AtomicU64>,
    /// Messages removed from sending (count), per reason.
    pub smsg_ot: [GenericCounter<AtomicU64>; 2],
    /// Messages removed from sending (bytes), per reason.
    pub smsg_ob: [GenericCounter<AtomicU64>; 2],
    /// Messages added to be received (count).
    pub rmsg_it: GenericCounter<AtomicU64>,
    /// Messages added to be received (bytes).
    pub rmsg_ib: GenericCounter<AtomicU64>,
    /// Messages removed from receiving (count), per reason.
    pub rmsg_ot: [GenericCounter<AtomicU64>; 2],
    /// Messages removed from receiving (bytes), per reason.
    pub rmsg_ob: [GenericCounter<AtomicU64>; 2],
}
228
#[cfg(feature = "metrics")]
impl ProtocolMetricCache {
    /// Builds the cache for the channel identified by `channel_key`, resolving
    /// the per-channel children of `metrics` up front. Per-stream children are
    /// resolved lazily on first use.
    pub fn new(channel_key: &str, metrics: Arc<ProtocolMetrics>) -> Self {
        let channel = [channel_key];
        Self {
            cid: channel_key.to_string(),
            cache: HashMap::new(),
            sdata_frames_t: metrics.sdata_frames_t.with_label_values(&channel),
            sdata_frames_b: metrics.sdata_frames_b.with_label_values(&channel),
            rdata_frames_t: metrics.rdata_frames_t.with_label_values(&channel),
            rdata_frames_b: metrics.rdata_frames_b.with_label_values(&channel),
            ping: metrics.ping.with_label_values(&channel),
            // Moved last: the lookups above still need to borrow `metrics`.
            m: metrics,
        }
    }

    /// Returns the cached children for `sid`, resolving and storing them first
    /// if this stream has not been seen before.
    pub(crate) fn init_sid(&mut self, sid: Sid) -> &CacheLine {
        let channel = self.cid.as_str();
        let metrics = &self.m;
        self.cache.entry(sid).or_insert_with_key(|key| {
            let stream = key.to_string();
            let by_stream = [channel, stream.as_str()];
            // One child per removal reason; the array order has to agree with
            // `RemoveReason::i()`.
            let by_reason = |vec: &IntCounterVec| {
                [RemoveReason::Finished, RemoveReason::Dropped].map(|reason| {
                    vec.with_label_values(&[channel, stream.as_str(), reason.to_str()])
                })
            };
            CacheLine {
                smsg_it: metrics.smsg_it.with_label_values(&by_stream),
                smsg_ib: metrics.smsg_ib.with_label_values(&by_stream),
                smsg_ot: by_reason(&metrics.smsg_ot),
                smsg_ob: by_reason(&metrics.smsg_ob),
                rmsg_it: metrics.rmsg_it.with_label_values(&by_stream),
                rmsg_ib: metrics.rmsg_ib.with_label_values(&by_stream),
                rmsg_ot: by_reason(&metrics.rmsg_ot),
                rmsg_ob: by_reason(&metrics.rmsg_ob),
            }
        })
    }

    /// Records one message of `bytes` bytes added for sending on stream `sid`.
    pub(crate) fn smsg_ib(&mut self, sid: Sid, bytes: u64) {
        let CacheLine {
            smsg_it, smsg_ib, ..
        } = self.init_sid(sid);
        smsg_it.inc();
        smsg_ib.inc_by(bytes);
    }

    /// Records one message of `bytes` bytes removed from sending on stream
    /// `sid` for the given `reason`.
    pub(crate) fn smsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) {
        let slot = reason.i();
        let CacheLine {
            smsg_ot, smsg_ob, ..
        } = self.init_sid(sid);
        smsg_ot[slot].inc();
        smsg_ob[slot].inc_by(bytes);
    }

    /// Records `cnt` sent data frames totalling `bytes` bytes on this channel.
    pub(crate) fn sdata_frames_b(&mut self, cnt: u64, bytes: u64) {
        let Self {
            sdata_frames_t,
            sdata_frames_b,
            ..
        } = self;
        sdata_frames_t.inc_by(cnt);
        sdata_frames_b.inc_by(bytes);
    }

    /// Records one message of `bytes` bytes added for receiving on stream
    /// `sid`.
    pub(crate) fn rmsg_ib(&mut self, sid: Sid, bytes: u64) {
        let CacheLine {
            rmsg_it, rmsg_ib, ..
        } = self.init_sid(sid);
        rmsg_it.inc();
        rmsg_ib.inc_by(bytes);
    }

    /// Records one message of `bytes` bytes removed from receiving on stream
    /// `sid` for the given `reason`.
    pub(crate) fn rmsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) {
        let slot = reason.i();
        let CacheLine {
            rmsg_ot, rmsg_ob, ..
        } = self.init_sid(sid);
        rmsg_ot[slot].inc();
        rmsg_ob[slot].inc_by(bytes);
    }

    /// Records a single received data frame of `bytes` bytes on this channel.
    pub(crate) fn rdata_frames_b(&mut self, bytes: u64) {
        let Self {
            rdata_frames_t,
            rdata_frames_b,
            ..
        } = self;
        rdata_frames_t.inc();
        rdata_frames_b.inc_by(bytes);
    }

    /// Test helper: asserts that stream `sid` saw exactly `cnt` messages come
    /// in and leave with `reason`, on both the send and the receive side.
    #[cfg(test)]
    pub(crate) fn assert_msg(&mut self, sid: Sid, cnt: u64, reason: RemoveReason) {
        let slot = reason.i();
        let line = self.init_sid(sid);
        let observed = [
            &line.smsg_it,
            &line.smsg_ot[slot],
            &line.rmsg_it,
            &line.rmsg_ot[slot],
        ];
        for counter in observed {
            assert_eq!(counter.get(), cnt);
        }
    }

    /// Test helper: asserts that stream `sid` saw exactly `bytes` message bytes
    /// come in and leave with `reason`, on both the send and the receive side.
    #[cfg(test)]
    pub(crate) fn assert_msg_bytes(&mut self, sid: Sid, bytes: u64, reason: RemoveReason) {
        let slot = reason.i();
        let line = self.init_sid(sid);
        let observed = [
            &line.smsg_ib,
            &line.smsg_ob[slot],
            &line.rmsg_ib,
            &line.rmsg_ob[slot],
        ];
        for counter in observed {
            assert_eq!(counter.get(), bytes);
        }
    }

    /// Test helper: asserts that exactly `cnt` data frames were both sent and
    /// received on this channel.
    #[cfg(test)]
    pub(crate) fn assert_data_frames(&mut self, cnt: u64) {
        for counter in [&self.sdata_frames_t, &self.rdata_frames_t] {
            assert_eq!(counter.get(), cnt);
        }
    }

    /// Test helper: asserts that exactly `bytes` data frame bytes were both
    /// sent and received on this channel.
    #[cfg(test)]
    pub(crate) fn assert_data_frames_bytes(&mut self, bytes: u64) {
        for counter in [&self.sdata_frames_b, &self.rdata_frames_b] {
            assert_eq!(counter.get(), bytes);
        }
    }
}
346
#[cfg(feature = "metrics")]
impl Drop for ProtocolMetricCache {
    /// Removes every label combination this cache created from the shared
    /// metric vectors: first the per-stream ones for each cached stream, then
    /// the per-channel ones. Removal errors are deliberately ignored.
    fn drop(&mut self) {
        let channel = self.cid.as_str();
        let metrics = &self.m;
        let reasons = [
            RemoveReason::Finished.to_str(),
            RemoveReason::Dropped.to_str(),
        ];
        // (vectors labelled by stream, vectors labelled by stream + reason),
        // once for the send side and once for the receive side.
        let directions = [
            (
                [&metrics.smsg_it, &metrics.smsg_ib],
                [&metrics.smsg_ot, &metrics.smsg_ob],
            ),
            (
                [&metrics.rmsg_it, &metrics.rmsg_ib],
                [&metrics.rmsg_ot, &metrics.rmsg_ob],
            ),
        ];

        for stream in self.cache.drain().map(|(sid, _line)| sid.to_string()) {
            let stream = stream.as_str();
            for (per_stream, per_reason) in directions {
                for vec in per_stream {
                    let _ = vec.remove_label_values(&[channel, stream]);
                }
                for vec in per_reason {
                    for reason in reasons {
                        let _ = vec.remove_label_values(&[channel, stream, reason]);
                    }
                }
            }
        }

        let by_channel = [channel];
        let _ = metrics.ping.remove_label_values(&by_channel);
        let frame_counters = [
            &metrics.sdata_frames_t,
            &metrics.sdata_frames_b,
            &metrics.rdata_frames_t,
            &metrics.rdata_frames_b,
        ];
        for vec in frame_counters {
            let _ = vec.remove_label_values(&by_channel);
        }
    }
}
376
#[cfg(feature = "metrics")]
impl std::fmt::Debug for ProtocolMetrics {
    /// Writes the fixed placeholder `ProtocolMetrics()`; none of the contained
    /// metric vectors are printed.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ProtocolMetrics()")
    }
}
384
// No-op counterpart of the metrics-enabled `ProtocolMetricCache` API, compiled
// when the `metrics` feature is disabled. Every method ignores its arguments.
#[cfg(not(feature = "metrics"))]
impl ProtocolMetricCache {
    /// Creates the empty cache; both arguments are ignored.
    pub fn new(_channel_key: &str, _metrics: Arc<ProtocolMetrics>) -> Self { Self {} }

    /// No-op: nothing is recorded for messages added for sending.
    pub(crate) fn smsg_ib(&mut self, _sid: Sid, _b: u64) {}

    /// No-op: nothing is recorded for messages removed from sending.
    pub(crate) fn smsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {}

    /// No-op: nothing is recorded for sent data frames.
    pub(crate) fn sdata_frames_b(&mut self, _cnt: u64, _b: u64) {}

    /// No-op: nothing is recorded for messages added for receiving.
    pub(crate) fn rmsg_ib(&mut self, _sid: Sid, _b: u64) {}

    /// No-op: nothing is recorded for messages removed from receiving.
    pub(crate) fn rmsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {}

    /// No-op: nothing is recorded for received data frames.
    pub(crate) fn rdata_frames_b(&mut self, _b: u64) {}
}
401
#[cfg(not(feature = "metrics"))]
impl ProtocolMetrics {
    /// Creates the empty stand-in; always returns `Ok`. The `Result` return
    /// type matches the signature of the metrics-enabled constructor.
    pub fn new() -> Result<Self, Box<dyn Error>> { Ok(Self {}) }
}
406
407impl RemoveReason {
408    #[cfg(feature = "metrics")]
409    fn to_str(&self) -> &str {
410        match self {
411            RemoveReason::Finished => "Finished",
412            RemoveReason::Dropped => "Dropped",
413        }
414    }
415
416    #[cfg(feature = "metrics")]
417    pub(crate) fn i(&self) -> usize {
418        match self {
419            RemoveReason::Finished => 0,
420            RemoveReason::Dropped => 1,
421        }
422    }
423}