veloren_network_protocol/
prio.rs1use crate::{
2 frame::OTFrame,
3 message::OTMessage,
4 metrics::{ProtocolMetricCache, RemoveReason},
5 types::{Bandwidth, HIGHEST_PRIO, Mid, Prio, Promises, Sid},
6};
7use bytes::Bytes;
8use std::{
9 collections::{HashMap, VecDeque},
10 time::Duration,
11};
12
13#[derive(Debug)]
14struct StreamInfo {
15 pub(crate) guaranteed_bandwidth: Bandwidth,
16 pub(crate) prio: Prio,
17 #[expect(dead_code)]
18 pub(crate) promises: Promises,
19 pub(crate) messages: VecDeque<OTMessage>,
20}
21
22#[derive(Debug)]
27pub(crate) struct PrioManager {
28 streams: HashMap<Sid, StreamInfo>,
29 metrics: ProtocolMetricCache,
30}
31
32impl PrioManager {
35 pub fn new(metrics: ProtocolMetricCache) -> Self {
36 Self {
37 streams: HashMap::new(),
38 metrics,
39 }
40 }
41
42 pub fn open_stream(
43 &mut self,
44 sid: Sid,
45 prio: Prio,
46 promises: Promises,
47 guaranteed_bandwidth: Bandwidth,
48 ) {
49 self.streams.insert(sid, StreamInfo {
50 guaranteed_bandwidth,
51 prio,
52 promises,
53 messages: VecDeque::new(),
54 });
55 }
56
57 pub fn try_close_stream(&mut self, sid: Sid) -> bool {
58 if let Some(si) = self.streams.get(&sid) {
59 if si.messages.is_empty() {
60 self.streams.remove(&sid);
61 return true;
62 }
63 }
64 false
65 }
66
67 pub fn is_empty(&self) -> bool { self.streams.is_empty() }
68
69 pub fn add(&mut self, buffer: Bytes, mid: Mid, sid: Sid) {
70 self.streams
71 .get_mut(&sid)
72 .unwrap()
73 .messages
74 .push_back(OTMessage::new(buffer, mid, sid));
75 }
76
77 pub fn grab(&mut self, bandwidth: Bandwidth, dt: Duration) -> (Vec<(Sid, OTFrame)>, Bandwidth) {
80 let total_bytes = (bandwidth as f64 * dt.as_secs_f64()) as u64;
81 let mut cur_bytes = 0u64;
82 let mut frames = vec![];
83
84 let mut prios = [0u64; (HIGHEST_PRIO + 1) as usize];
85 let metrics = &mut self.metrics;
86
87 let mut process_stream =
88 |sid: &Sid, stream: &mut StreamInfo, mut bandwidth: i64, cur_bytes: &mut u64| {
89 let mut finished = None;
90 'outer: for (i, msg) in stream.messages.iter_mut().enumerate() {
91 while let Some(frame) = msg.next() {
92 let b = if let OTFrame::Data { data, .. } = &frame {
93 crate::frame::TCP_DATA_CNS + 1 + data.len()
94 } else {
95 crate::frame::TCP_DATA_HEADER_CNS + 1
96 } as u64;
97 bandwidth -= b as i64;
98 *cur_bytes += b;
99 frames.push((*sid, frame));
100 if bandwidth <= 0 {
101 break 'outer;
102 }
103 }
104 let (sid, bytes) = msg.get_sid_len();
105 metrics.smsg_ob(sid, RemoveReason::Finished, bytes);
106 finished = Some(i);
107 }
108 if let Some(i) = finished {
109 stream.messages.drain(..=i);
111 }
112 };
113
114 for (sid, stream) in self.streams.iter_mut() {
116 prios[stream.prio as usize] += 1;
117 let stream_byte_cnt = (stream.guaranteed_bandwidth as f64 * dt.as_secs_f64()) as u64;
118 process_stream(sid, stream, stream_byte_cnt as i64, &mut cur_bytes);
119 }
120
121 if cur_bytes < total_bytes {
122 'outer: for prio in 0..=HIGHEST_PRIO {
124 if prios[prio as usize] == 0 {
125 continue;
126 }
127 let per_stream_bytes = ((total_bytes - cur_bytes) / prios[prio as usize]) as i64;
128 for (sid, stream) in self.streams.iter_mut() {
129 if stream.prio != prio {
130 continue;
131 }
132 process_stream(sid, stream, per_stream_bytes, &mut cur_bytes);
133 if cur_bytes >= total_bytes {
134 break 'outer;
135 }
136 }
137 }
138 }
139 (frames, cur_bytes)
140 }
141}