1use crate::types::Sid;
2#[cfg(feature = "metrics")]
3use prometheus::{
4 IntCounterVec, IntGaugeVec, Opts, Registry,
5 core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
6};
7#[cfg(feature = "metrics")]
8use std::collections::HashMap;
9use std::{error::Error, sync::Arc};
10
11pub enum RemoveReason {
12 Finished,
13 Dropped,
14}
15
16#[cfg(feature = "metrics")]
21pub struct ProtocolMetrics {
22 smsg_it: IntCounterVec,
30 smsg_ib: IntCounterVec,
32 smsg_ot: IntCounterVec,
35 smsg_ob: IntCounterVec,
38 sdata_frames_t: IntCounterVec,
40 sdata_frames_b: IntCounterVec,
42
43 rmsg_it: IntCounterVec,
46 rmsg_ib: IntCounterVec,
48 rmsg_ot: IntCounterVec,
51 rmsg_ob: IntCounterVec,
54 rdata_frames_t: IntCounterVec,
56 rdata_frames_b: IntCounterVec,
58 ping: IntGaugeVec,
60}
61
62#[cfg(feature = "metrics")]
67#[derive(Debug, Clone)]
68pub struct ProtocolMetricCache {
69 cid: String,
70 m: Arc<ProtocolMetrics>,
71 cache: HashMap<Sid, CacheLine>,
72 sdata_frames_t: GenericCounter<AtomicU64>,
73 sdata_frames_b: GenericCounter<AtomicU64>,
74 rdata_frames_t: GenericCounter<AtomicU64>,
75 rdata_frames_b: GenericCounter<AtomicU64>,
76 #[expect(dead_code)]
77 ping: GenericGauge<AtomicI64>,
78}
79
80#[cfg(not(feature = "metrics"))]
81#[derive(Debug, Clone)]
82pub struct ProtocolMetricCache {}
83
84#[cfg(feature = "metrics")]
85impl ProtocolMetrics {
86 pub fn new() -> Result<Self, Box<dyn Error>> {
87 let smsg_it = IntCounterVec::new(
88 Opts::new(
89 "send_messages_in_total",
90 "All Messages that are added to this Protocol to be send at stream level",
91 ),
92 &["channel", "stream"],
93 )?;
94 let smsg_ib = IntCounterVec::new(
95 Opts::new(
96 "send_messages_in_throughput",
97 "All Message bytes that are added to this Protocol to be send at stream level",
98 ),
99 &["channel", "stream"],
100 )?;
101 let smsg_ot = IntCounterVec::new(
102 Opts::new(
103 "send_messages_out_total",
104 "All Messages that are removed from this Protocol to be send at stream and \
105 reason(finished/canceled) level",
106 ),
107 &["channel", "stream", "reason"],
108 )?;
109 let smsg_ob = IntCounterVec::new(
110 Opts::new(
111 "send_messages_out_throughput",
112 "All Message bytes that are removed from this Protocol to be send at stream and \
113 reason(finished/canceled) level",
114 ),
115 &["channel", "stream", "reason"],
116 )?;
117 let sdata_frames_t = IntCounterVec::new(
118 Opts::new(
119 "send_data_frames_total",
120 "Number of data frames send per channel",
121 ),
122 &["channel"],
123 )?;
124 let sdata_frames_b = IntCounterVec::new(
125 Opts::new(
126 "send_data_frames_throughput",
127 "Number of data frames bytes send per channel",
128 ),
129 &["channel"],
130 )?;
131
132 let rmsg_it = IntCounterVec::new(
133 Opts::new(
134 "recv_messages_in_total",
135 "All Messages that are added to this Protocol to be received at stream level",
136 ),
137 &["channel", "stream"],
138 )?;
139 let rmsg_ib = IntCounterVec::new(
140 Opts::new(
141 "recv_messages_in_throughput",
142 "All Message bytes that are added to this Protocol to be received at stream level",
143 ),
144 &["channel", "stream"],
145 )?;
146 let rmsg_ot = IntCounterVec::new(
147 Opts::new(
148 "recv_messages_out_total",
149 "All Messages that are removed from this Protocol to be received at stream and \
150 reason(finished/canceled) level",
151 ),
152 &["channel", "stream", "reason"],
153 )?;
154 let rmsg_ob = IntCounterVec::new(
155 Opts::new(
156 "recv_messages_out_throughput",
157 "All Message bytes that are removed from this Protocol to be received at stream \
158 and reason(finished/canceled) level",
159 ),
160 &["channel", "stream", "reason"],
161 )?;
162 let rdata_frames_t = IntCounterVec::new(
163 Opts::new(
164 "recv_data_frames_total",
165 "Number of data frames received per channel",
166 ),
167 &["channel"],
168 )?;
169 let rdata_frames_b = IntCounterVec::new(
170 Opts::new(
171 "recv_data_frames_throughput",
172 "Number of data frames bytes received per channel",
173 ),
174 &["channel"],
175 )?;
176 let ping = IntGaugeVec::new(Opts::new("ping", "Ping per channel"), &["channel"])?;
177
178 Ok(Self {
179 smsg_it,
180 smsg_ib,
181 smsg_ot,
182 smsg_ob,
183 sdata_frames_t,
184 sdata_frames_b,
185 rmsg_it,
186 rmsg_ib,
187 rmsg_ot,
188 rmsg_ob,
189 rdata_frames_t,
190 rdata_frames_b,
191 ping,
192 })
193 }
194
195 pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
196 registry.register(Box::new(self.smsg_it.clone()))?;
197 registry.register(Box::new(self.smsg_ib.clone()))?;
198 registry.register(Box::new(self.smsg_ot.clone()))?;
199 registry.register(Box::new(self.smsg_ob.clone()))?;
200 registry.register(Box::new(self.sdata_frames_t.clone()))?;
201 registry.register(Box::new(self.sdata_frames_b.clone()))?;
202 registry.register(Box::new(self.rmsg_it.clone()))?;
203 registry.register(Box::new(self.rmsg_ib.clone()))?;
204 registry.register(Box::new(self.rmsg_ot.clone()))?;
205 registry.register(Box::new(self.rmsg_ob.clone()))?;
206 registry.register(Box::new(self.rdata_frames_t.clone()))?;
207 registry.register(Box::new(self.rdata_frames_b.clone()))?;
208 registry.register(Box::new(self.ping.clone()))?;
209 Ok(())
210 }
211}
212
213#[cfg(not(feature = "metrics"))]
214pub struct ProtocolMetrics {}
215
216#[cfg(feature = "metrics")]
217#[derive(Debug, Clone)]
218pub(crate) struct CacheLine {
219 pub smsg_it: GenericCounter<AtomicU64>,
220 pub smsg_ib: GenericCounter<AtomicU64>,
221 pub smsg_ot: [GenericCounter<AtomicU64>; 2],
222 pub smsg_ob: [GenericCounter<AtomicU64>; 2],
223 pub rmsg_it: GenericCounter<AtomicU64>,
224 pub rmsg_ib: GenericCounter<AtomicU64>,
225 pub rmsg_ot: [GenericCounter<AtomicU64>; 2],
226 pub rmsg_ob: [GenericCounter<AtomicU64>; 2],
227}
228
229#[cfg(feature = "metrics")]
230impl ProtocolMetricCache {
231 pub fn new(channel_key: &str, metrics: Arc<ProtocolMetrics>) -> Self {
232 let cid = channel_key.to_string();
233 let sdata_frames_t = metrics.sdata_frames_t.with_label_values(&[&cid]);
234 let sdata_frames_b = metrics.sdata_frames_b.with_label_values(&[&cid]);
235 let rdata_frames_t = metrics.rdata_frames_t.with_label_values(&[&cid]);
236 let rdata_frames_b = metrics.rdata_frames_b.with_label_values(&[&cid]);
237 let ping = metrics.ping.with_label_values(&[&cid]);
238 Self {
239 cid,
240 m: metrics,
241 cache: HashMap::new(),
242 sdata_frames_t,
243 sdata_frames_b,
244 rdata_frames_t,
245 rdata_frames_b,
246 ping,
247 }
248 }
249
250 pub(crate) fn init_sid(&mut self, sid: Sid) -> &CacheLine {
251 let cid = &self.cid;
252 let m = &self.m;
253 self.cache.entry(sid).or_insert_with_key(|sid| {
254 let s = sid.to_string();
255 let finished = RemoveReason::Finished.to_str();
256 let dropped = RemoveReason::Dropped.to_str();
257 CacheLine {
258 smsg_it: m.smsg_it.with_label_values(&[cid, &s]),
259 smsg_ib: m.smsg_ib.with_label_values(&[cid, &s]),
260 smsg_ot: [
261 m.smsg_ot.with_label_values(&[cid, &s, finished]),
262 m.smsg_ot.with_label_values(&[cid, &s, dropped]),
263 ],
264 smsg_ob: [
265 m.smsg_ob.with_label_values(&[cid, &s, finished]),
266 m.smsg_ob.with_label_values(&[cid, &s, dropped]),
267 ],
268 rmsg_it: m.rmsg_it.with_label_values(&[cid, &s]),
269 rmsg_ib: m.rmsg_ib.with_label_values(&[cid, &s]),
270 rmsg_ot: [
271 m.rmsg_ot.with_label_values(&[cid, &s, finished]),
272 m.rmsg_ot.with_label_values(&[cid, &s, dropped]),
273 ],
274 rmsg_ob: [
275 m.rmsg_ob.with_label_values(&[cid, &s, finished]),
276 m.rmsg_ob.with_label_values(&[cid, &s, dropped]),
277 ],
278 }
279 })
280 }
281
282 pub(crate) fn smsg_ib(&mut self, sid: Sid, bytes: u64) {
283 let line = self.init_sid(sid);
284 line.smsg_it.inc();
285 line.smsg_ib.inc_by(bytes);
286 }
287
288 pub(crate) fn smsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) {
289 let line = self.init_sid(sid);
290 line.smsg_ot[reason.i()].inc();
291 line.smsg_ob[reason.i()].inc_by(bytes);
292 }
293
294 pub(crate) fn sdata_frames_b(&mut self, cnt: u64, bytes: u64) {
295 self.sdata_frames_t.inc_by(cnt);
296 self.sdata_frames_b.inc_by(bytes);
297 }
298
299 pub(crate) fn rmsg_ib(&mut self, sid: Sid, bytes: u64) {
300 let line = self.init_sid(sid);
301 line.rmsg_it.inc();
302 line.rmsg_ib.inc_by(bytes);
303 }
304
305 pub(crate) fn rmsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) {
306 let line = self.init_sid(sid);
307 line.rmsg_ot[reason.i()].inc();
308 line.rmsg_ob[reason.i()].inc_by(bytes);
309 }
310
311 pub(crate) fn rdata_frames_b(&mut self, bytes: u64) {
312 self.rdata_frames_t.inc();
313 self.rdata_frames_b.inc_by(bytes);
314 }
315
316 #[cfg(test)]
317 pub(crate) fn assert_msg(&mut self, sid: Sid, cnt: u64, reason: RemoveReason) {
318 let line = self.init_sid(sid);
319 assert_eq!(line.smsg_it.get(), cnt);
320 assert_eq!(line.smsg_ot[reason.i()].get(), cnt);
321 assert_eq!(line.rmsg_it.get(), cnt);
322 assert_eq!(line.rmsg_ot[reason.i()].get(), cnt);
323 }
324
325 #[cfg(test)]
326 pub(crate) fn assert_msg_bytes(&mut self, sid: Sid, bytes: u64, reason: RemoveReason) {
327 let line = self.init_sid(sid);
328 assert_eq!(line.smsg_ib.get(), bytes);
329 assert_eq!(line.smsg_ob[reason.i()].get(), bytes);
330 assert_eq!(line.rmsg_ib.get(), bytes);
331 assert_eq!(line.rmsg_ob[reason.i()].get(), bytes);
332 }
333
334 #[cfg(test)]
335 pub(crate) fn assert_data_frames(&mut self, cnt: u64) {
336 assert_eq!(self.sdata_frames_t.get(), cnt);
337 assert_eq!(self.rdata_frames_t.get(), cnt);
338 }
339
340 #[cfg(test)]
341 pub(crate) fn assert_data_frames_bytes(&mut self, bytes: u64) {
342 assert_eq!(self.sdata_frames_b.get(), bytes);
343 assert_eq!(self.rdata_frames_b.get(), bytes);
344 }
345}
346
347#[cfg(feature = "metrics")]
348impl Drop for ProtocolMetricCache {
349 fn drop(&mut self) {
350 let cid = &self.cid;
351 let m = &self.m;
352 let finished = RemoveReason::Finished.to_str();
353 let dropped = RemoveReason::Dropped.to_str();
354 for (sid, _) in self.cache.drain() {
355 let s = sid.to_string();
356 let _ = m.smsg_it.remove_label_values(&[cid, &s]);
357 let _ = m.smsg_ib.remove_label_values(&[cid, &s]);
358 let _ = m.smsg_ot.remove_label_values(&[cid, &s, finished]);
359 let _ = m.smsg_ot.remove_label_values(&[cid, &s, dropped]);
360 let _ = m.smsg_ob.remove_label_values(&[cid, &s, finished]);
361 let _ = m.smsg_ob.remove_label_values(&[cid, &s, dropped]);
362 let _ = m.rmsg_it.remove_label_values(&[cid, &s]);
363 let _ = m.rmsg_ib.remove_label_values(&[cid, &s]);
364 let _ = m.rmsg_ot.remove_label_values(&[cid, &s, finished]);
365 let _ = m.rmsg_ot.remove_label_values(&[cid, &s, dropped]);
366 let _ = m.rmsg_ob.remove_label_values(&[cid, &s, finished]);
367 let _ = m.rmsg_ob.remove_label_values(&[cid, &s, dropped]);
368 }
369 let _ = m.ping.remove_label_values(&[cid]);
370 let _ = m.sdata_frames_t.remove_label_values(&[cid]);
371 let _ = m.sdata_frames_b.remove_label_values(&[cid]);
372 let _ = m.rdata_frames_t.remove_label_values(&[cid]);
373 let _ = m.rdata_frames_b.remove_label_values(&[cid]);
374 }
375}
376
377#[cfg(feature = "metrics")]
378impl std::fmt::Debug for ProtocolMetrics {
379 #[inline]
380 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381 write!(f, "ProtocolMetrics()")
382 }
383}
384
385#[cfg(not(feature = "metrics"))]
386impl ProtocolMetricCache {
387 pub fn new(_channel_key: &str, _metrics: Arc<ProtocolMetrics>) -> Self { Self {} }
388
389 pub(crate) fn smsg_ib(&mut self, _sid: Sid, _b: u64) {}
390
391 pub(crate) fn smsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {}
392
393 pub(crate) fn sdata_frames_b(&mut self, _cnt: u64, _b: u64) {}
394
395 pub(crate) fn rmsg_ib(&mut self, _sid: Sid, _b: u64) {}
396
397 pub(crate) fn rmsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {}
398
399 pub(crate) fn rdata_frames_b(&mut self, _b: u64) {}
400}
401
402#[cfg(not(feature = "metrics"))]
403impl ProtocolMetrics {
404 pub fn new() -> Result<Self, Box<dyn Error>> { Ok(Self {}) }
405}
406
407impl RemoveReason {
408 #[cfg(feature = "metrics")]
409 fn to_str(&self) -> &str {
410 match self {
411 RemoveReason::Finished => "Finished",
412 RemoveReason::Dropped => "Dropped",
413 }
414 }
415
416 #[cfg(feature = "metrics")]
417 pub(crate) fn i(&self) -> usize {
418 match self {
419 RemoveReason::Finished => 0,
420 RemoveReason::Dropped => 1,
421 }
422 }
423}