1use crate::api::{ConnectAddr, ListenAddr};
2use network_protocol::{Cid, Pid};
3#[cfg(feature = "metrics")]
4use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
5use std::{error::Error, net::SocketAddr};
6
7#[derive(Clone, Debug, Hash, PartialEq, Eq)]
8pub(crate) enum ProtocolInfo {
9 Tcp(SocketAddr),
10 Udp(SocketAddr),
11 #[cfg(feature = "quic")]
12 Quic(SocketAddr),
13 Mpsc(u64),
14}
15
16impl From<ListenAddr> for ProtocolInfo {
17 fn from(other: ListenAddr) -> ProtocolInfo {
18 match other {
19 ListenAddr::Tcp(s) => ProtocolInfo::Tcp(s),
20 ListenAddr::Udp(s) => ProtocolInfo::Udp(s),
21 #[cfg(feature = "quic")]
22 ListenAddr::Quic(s, _) => ProtocolInfo::Quic(s),
23 ListenAddr::Mpsc(s) => ProtocolInfo::Mpsc(s),
24 }
25 }
26}
27
28#[cfg(feature = "metrics")]
30pub struct NetworkMetrics {
31 pub listen_requests_total: IntCounterVec,
32 pub connect_requests_total: IntCounterVec,
33 pub incoming_connections_total: IntCounterVec,
34 pub failed_handshakes_total: IntCounter,
35 pub participants_connected_total: IntCounter,
36 pub participants_disconnected_total: IntCounter,
37 pub participants_channel_ids: IntGaugeVec,
39 pub participants_bandwidth: IntGaugeVec,
41 pub channels_connected_total: IntCounterVec,
43 pub channels_disconnected_total: IntCounterVec,
44 pub streams_opened_total: IntCounterVec,
46 pub streams_closed_total: IntCounterVec,
47 pub network_info: IntGauge,
48}
49
50#[cfg(not(feature = "metrics"))]
51pub struct NetworkMetrics {}
52
53#[cfg(feature = "metrics")]
54impl NetworkMetrics {
55 pub fn new(local_pid: &Pid) -> Result<Self, Box<dyn Error>> {
56 let listen_requests_total = IntCounterVec::new(
57 Opts::new(
58 "listen_requests_total",
59 "Shows the number of listen requests to the scheduler",
60 ),
61 &["protocol"],
62 )?;
63 let connect_requests_total = IntCounterVec::new(
64 Opts::new(
65 "connect_requests_total",
66 "Shows the number of connect requests to the scheduler",
67 ),
68 &["protocol"],
69 )?;
70 let incoming_connections_total = IntCounterVec::new(
71 Opts::new(
72 "incoming_connections_total",
73 "Shows the number of external requests to the scheduler",
74 ),
75 &["protocol"],
76 )?;
77 let failed_handshakes_total = IntCounter::with_opts(Opts::new(
78 "failed_handshakes_total",
79 "Shows the number of failed handshakes",
80 ))?;
81 let participants_connected_total = IntCounter::with_opts(Opts::new(
82 "participants_connected_total",
83 "Shows the number of participants connected to the network",
84 ))?;
85 let participants_disconnected_total = IntCounter::with_opts(Opts::new(
86 "participants_disconnected_total",
87 "Shows the number of participants disconnected to the network",
88 ))?;
89 let participants_channel_ids = IntGaugeVec::new(
90 Opts::new(
91 "participants_channel_ids",
92 "Channel numbers belonging to a Participant in the network",
93 ),
94 &["participant", "no"],
95 )?;
96 let participants_bandwidth = IntGaugeVec::new(
97 Opts::new(
98 "participants_bandwidth",
99 "max upload possible to Participant",
100 ),
101 &["participant"],
102 )?;
103 let channels_connected_total = IntCounterVec::new(
104 Opts::new(
105 "channels_connected_total",
106 "Number of all channels currently connected on the network",
107 ),
108 &["participant"],
109 )?;
110 let channels_disconnected_total = IntCounterVec::new(
111 Opts::new(
112 "channels_disconnected_total",
113 "Number of all channels currently disconnected on the network",
114 ),
115 &["participant"],
116 )?;
117 let streams_opened_total = IntCounterVec::new(
118 Opts::new(
119 "streams_opened_total",
120 "Number of all streams currently open on the network",
121 ),
122 &["participant"],
123 )?;
124 let streams_closed_total = IntCounterVec::new(
125 Opts::new(
126 "streams_closed_total",
127 "Number of all streams currently open on the network",
128 ),
129 &["participant"],
130 )?;
131 let opts = Opts::new("network_info", "Static Network information")
132 .const_label(
133 "version",
134 format!(
135 "{}.{}.{}",
136 &network_protocol::VELOREN_NETWORK_VERSION[0],
137 &network_protocol::VELOREN_NETWORK_VERSION[1],
138 &network_protocol::VELOREN_NETWORK_VERSION[2]
139 ),
140 )
141 .const_label("local_pid", format!("{}", &local_pid));
142 let network_info = IntGauge::with_opts(opts)?;
143
144 Ok(Self {
145 listen_requests_total,
146 connect_requests_total,
147 incoming_connections_total,
148 failed_handshakes_total,
149 participants_connected_total,
150 participants_disconnected_total,
151 participants_channel_ids,
152 participants_bandwidth,
153 channels_connected_total,
154 channels_disconnected_total,
155 streams_opened_total,
156 streams_closed_total,
157 network_info,
158 })
159 }
160
161 pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
162 registry.register(Box::new(self.listen_requests_total.clone()))?;
163 registry.register(Box::new(self.connect_requests_total.clone()))?;
164 registry.register(Box::new(self.incoming_connections_total.clone()))?;
165 registry.register(Box::new(self.failed_handshakes_total.clone()))?;
166 registry.register(Box::new(self.participants_connected_total.clone()))?;
167 registry.register(Box::new(self.participants_disconnected_total.clone()))?;
168 registry.register(Box::new(self.participants_channel_ids.clone()))?;
169 registry.register(Box::new(self.participants_bandwidth.clone()))?;
170 registry.register(Box::new(self.channels_connected_total.clone()))?;
171 registry.register(Box::new(self.channels_disconnected_total.clone()))?;
172 registry.register(Box::new(self.streams_opened_total.clone()))?;
173 registry.register(Box::new(self.streams_closed_total.clone()))?;
174 registry.register(Box::new(self.network_info.clone()))?;
175 Ok(())
176 }
177
178 pub(crate) fn connect_requests_cache(&self, protocol: &ListenAddr) -> IntCounter {
179 self.incoming_connections_total
180 .with_label_values(&[protocollisten_name(protocol)])
181 }
182
183 pub(crate) fn channels_connected(&self, remote_p: &str, no: usize, cid: Cid) {
184 self.channels_connected_total
185 .with_label_values(&[remote_p])
186 .inc();
187 self.participants_channel_ids
188 .with_label_values(&[remote_p, &no.to_string()])
189 .set(cid as i64);
190 }
191
192 pub(crate) fn channels_disconnected(&self, remote_p: &str) {
193 self.channels_disconnected_total
194 .with_label_values(&[remote_p])
195 .inc();
196 }
197
198 pub(crate) fn participant_bandwidth(&self, remote_p: &str, bandwidth: f32) {
199 self.participants_bandwidth
200 .with_label_values(&[remote_p])
201 .set(bandwidth as i64);
202 }
203
204 pub(crate) fn streams_opened(&self, remote_p: &str) {
205 self.streams_opened_total
206 .with_label_values(&[remote_p])
207 .inc();
208 }
209
210 pub(crate) fn streams_closed(&self, remote_p: &str) {
211 self.streams_closed_total
212 .with_label_values(&[remote_p])
213 .inc();
214 }
215
216 pub(crate) fn listen_request(&self, protocol: &ListenAddr) {
217 self.listen_requests_total
218 .with_label_values(&[protocollisten_name(protocol)])
219 .inc();
220 }
221
222 pub(crate) fn connect_request(&self, protocol: &ConnectAddr) {
223 self.connect_requests_total
224 .with_label_values(&[protocolconnect_name(protocol)])
225 .inc();
226 }
227
228 pub(crate) fn cleanup_participant(&self, remote_p: &str) {
229 for no in 0..5 {
230 let _ = self
231 .participants_channel_ids
232 .remove_label_values(&[remote_p, &no.to_string()]);
233 }
234 let _ = self
235 .channels_connected_total
236 .remove_label_values(&[remote_p]);
237 let _ = self
238 .channels_disconnected_total
239 .remove_label_values(&[remote_p]);
240 let _ = self.participants_bandwidth.remove_label_values(&[remote_p]);
241 let _ = self.streams_opened_total.remove_label_values(&[remote_p]);
242 let _ = self.streams_closed_total.remove_label_values(&[remote_p]);
243 }
244}
245
246#[cfg(feature = "metrics")]
247fn protocolconnect_name(protocol: &ConnectAddr) -> &str {
248 match protocol {
249 ConnectAddr::Tcp(_) => "tcp",
250 ConnectAddr::Udp(_) => "udp",
251 ConnectAddr::Mpsc(_) => "mpsc",
252 #[cfg(feature = "quic")]
253 ConnectAddr::Quic(_, _, _) => "quic",
254 }
255}
256
257#[cfg(feature = "metrics")]
258fn protocollisten_name(protocol: &ListenAddr) -> &str {
259 match protocol {
260 ListenAddr::Tcp(_) => "tcp",
261 ListenAddr::Udp(_) => "udp",
262 ListenAddr::Mpsc(_) => "mpsc",
263 #[cfg(feature = "quic")]
264 ListenAddr::Quic(_, _) => "quic",
265 }
266}
267
268#[cfg(not(feature = "metrics"))]
269impl NetworkMetrics {
270 pub fn new(_local_pid: &Pid) -> Result<Self, Box<dyn Error>> { Ok(Self {}) }
271
272 pub(crate) fn channels_connected(&self, _remote_p: &str, _no: usize, _cid: Cid) {}
273
274 pub(crate) fn channels_disconnected(&self, _remote_p: &str) {}
275
276 pub(crate) fn participant_bandwidth(&self, _remote_p: &str, _bandwidth: f32) {}
277
278 pub(crate) fn streams_opened(&self, _remote_p: &str) {}
279
280 pub(crate) fn streams_closed(&self, _remote_p: &str) {}
281
282 pub(crate) fn listen_request(&self, _protocol: &ListenAddr) {}
283
284 pub(crate) fn connect_request(&self, _protocol: &ConnectAddr) {}
285
286 pub(crate) fn cleanup_participant(&self, _remote_p: &str) {}
287}
288
289impl std::fmt::Debug for NetworkMetrics {
290 #[inline]
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 write!(f, "NetworkMetrics()")
293 }
294}