veloren_network_protocol/
prio.rs

1use crate::{
2    frame::OTFrame,
3    message::OTMessage,
4    metrics::{ProtocolMetricCache, RemoveReason},
5    types::{Bandwidth, HIGHEST_PRIO, Mid, Prio, Promises, Sid},
6};
7use bytes::Bytes;
8use std::{
9    collections::{HashMap, VecDeque},
10    time::Duration,
11};
12
13#[derive(Debug)]
14struct StreamInfo {
15    pub(crate) guaranteed_bandwidth: Bandwidth,
16    pub(crate) prio: Prio,
17    #[expect(dead_code)]
18    pub(crate) promises: Promises,
19    pub(crate) messages: VecDeque<OTMessage>,
20}
21
22/// Responsible for queueing messages.
23/// every stream has a guaranteed bandwidth and a prio 0-7.
24/// when `n` Bytes are available in the buffer, first the guaranteed bandwidth
25/// is used. Then remaining bandwidth is used to fill up the prios.
26#[derive(Debug)]
27pub(crate) struct PrioManager {
28    streams: HashMap<Sid, StreamInfo>,
29    metrics: ProtocolMetricCache,
30}
31
32// Send everything ONCE, then keep it till it's confirmed
33
34impl PrioManager {
35    pub fn new(metrics: ProtocolMetricCache) -> Self {
36        Self {
37            streams: HashMap::new(),
38            metrics,
39        }
40    }
41
42    pub fn open_stream(
43        &mut self,
44        sid: Sid,
45        prio: Prio,
46        promises: Promises,
47        guaranteed_bandwidth: Bandwidth,
48    ) {
49        self.streams.insert(sid, StreamInfo {
50            guaranteed_bandwidth,
51            prio,
52            promises,
53            messages: VecDeque::new(),
54        });
55    }
56
57    pub fn try_close_stream(&mut self, sid: Sid) -> bool {
58        if let Some(si) = self.streams.get(&sid) {
59            if si.messages.is_empty() {
60                self.streams.remove(&sid);
61                return true;
62            }
63        }
64        false
65    }
66
67    pub fn is_empty(&self) -> bool { self.streams.is_empty() }
68
69    pub fn add(&mut self, buffer: Bytes, mid: Mid, sid: Sid) {
70        self.streams
71            .get_mut(&sid)
72            .unwrap()
73            .messages
74            .push_back(OTMessage::new(buffer, mid, sid));
75    }
76
77    /// bandwidth might be extended, as for technical reasons
78    /// guaranteed_bandwidth is used and frames are always 1400 bytes.
79    pub fn grab(&mut self, bandwidth: Bandwidth, dt: Duration) -> (Vec<(Sid, OTFrame)>, Bandwidth) {
80        let total_bytes = (bandwidth as f64 * dt.as_secs_f64()) as u64;
81        let mut cur_bytes = 0u64;
82        let mut frames = vec![];
83
84        let mut prios = [0u64; (HIGHEST_PRIO + 1) as usize];
85        let metrics = &mut self.metrics;
86
87        let mut process_stream =
88            |sid: &Sid, stream: &mut StreamInfo, mut bandwidth: i64, cur_bytes: &mut u64| {
89                let mut finished = None;
90                'outer: for (i, msg) in stream.messages.iter_mut().enumerate() {
91                    while let Some(frame) = msg.next() {
92                        let b = if let OTFrame::Data { data, .. } = &frame {
93                            crate::frame::TCP_DATA_CNS + 1 + data.len()
94                        } else {
95                            crate::frame::TCP_DATA_HEADER_CNS + 1
96                        } as u64;
97                        bandwidth -= b as i64;
98                        *cur_bytes += b;
99                        frames.push((*sid, frame));
100                        if bandwidth <= 0 {
101                            break 'outer;
102                        }
103                    }
104                    let (sid, bytes) = msg.get_sid_len();
105                    metrics.smsg_ob(sid, RemoveReason::Finished, bytes);
106                    finished = Some(i);
107                }
108                if let Some(i) = finished {
109                    //cleanup
110                    stream.messages.drain(..=i);
111                }
112            };
113
114        // Add guaranteed bandwidth
115        for (sid, stream) in self.streams.iter_mut() {
116            prios[stream.prio as usize] += 1;
117            let stream_byte_cnt = (stream.guaranteed_bandwidth as f64 * dt.as_secs_f64()) as u64;
118            process_stream(sid, stream, stream_byte_cnt as i64, &mut cur_bytes);
119        }
120
121        if cur_bytes < total_bytes {
122            // Add optional bandwidth
123            'outer: for prio in 0..=HIGHEST_PRIO {
124                if prios[prio as usize] == 0 {
125                    continue;
126                }
127                let per_stream_bytes = ((total_bytes - cur_bytes) / prios[prio as usize]) as i64;
128                for (sid, stream) in self.streams.iter_mut() {
129                    if stream.prio != prio {
130                        continue;
131                    }
132                    process_stream(sid, stream, per_stream_bytes, &mut cur_bytes);
133                    if cur_bytes >= total_bytes {
134                        break 'outer;
135                    }
136                }
137            }
138        }
139        (frames, cur_bytes)
140    }
141}