1#[expect(deprecated)] use std::hash::SipHasher;
2use std::{
3 hash::{Hash, Hasher},
4 io::{self, ErrorKind},
5 net::SocketAddr,
6 sync::{Arc, Mutex},
7 time::{Duration, Instant},
8};
9
10use protocol::Parcel;
11use rand::{RngExt, rng};
12use tokio::{net::UdpSocket, sync::watch};
13use tracing::{debug, error, trace};
14
15use crate::{
16 proto::{
17 Init, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, QueryServerRequest, QueryServerResponse,
18 RawQueryServerRequest, RawQueryServerResponse, ServerInfo, VELOREN_HEADER, VERSION,
19 },
20 ratelimit::{RateLimiter, ReducedIpAddr},
21};
22
23const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300);
24
25pub struct QueryServer {
26 addr: SocketAddr,
27 server_info: watch::Receiver<ServerInfo>,
28 settings: protocol::Settings,
29 ratelimit: RateLimiter,
30}
31
32#[derive(Default, Clone, Copy, Debug)]
33pub struct Metrics {
34 pub received_packets: u32,
35 pub dropped_packets: u32,
36 pub invalid_packets: u32,
37 pub proccessing_errors: u32,
38 pub info_requests: u32,
39 pub init_requests: u32,
40 pub sent_responses: u32,
41 pub failed_responses: u32,
42 pub timed_out_responses: u32,
43 pub ratelimited: u32,
44}
45
46impl QueryServer {
47 pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>, ratelimit: u16) -> Self {
48 Self {
49 addr,
50 server_info,
51 ratelimit: RateLimiter::new(ratelimit),
52 settings: Default::default(),
53 }
54 }
55
56 pub async fn run(&mut self, metrics: Arc<Mutex<Metrics>>) -> Result<(), tokio::io::Error> {
63 let mut socket = UdpSocket::bind(self.addr).await?;
64
65 let gen_secret = || {
66 let mut rng = rng();
67 (rng.random::<u64>(), rng.random::<u64>())
68 };
69 let mut secrets = gen_secret();
70 let mut last_secret_refresh = Instant::now();
71
72 let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
73 loop {
74 let (len, remote_addr) = match socket.recv_from(&mut *buf).await {
75 Ok(v) => v,
76 Err(e) if e.kind() == ErrorKind::NotConnected => {
77 error!(
78 ?e,
79 "Query server connection was closed, re-binding to socket..."
80 );
81 socket = UdpSocket::bind(self.addr).await?;
82 continue;
83 },
84 err => {
85 debug!(?err, "Error while receiving from query server socket");
86 continue;
87 },
88 };
89
90 let mut new_metrics = Metrics {
91 received_packets: 1,
92 ..Default::default()
93 };
94
95 let raw_msg_buf = &buf[..len];
96 let msg_buf = if Self::validate_datagram(raw_msg_buf) {
97 &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())]
99 } else {
100 if let Ok(mut metrics) = metrics.lock() {
101 metrics.dropped_packets += 1;
102 }
103 continue;
104 };
105
106 self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket)
107 .await;
108
109 if let Ok(mut metrics) = metrics.lock() {
111 *metrics += new_metrics;
112 }
113
114 {
115 let now = Instant::now();
116 if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL {
117 last_secret_refresh = now;
118 secrets = gen_secret();
119 }
120
121 self.ratelimit.maintain(now);
122 }
123 }
124 }
125
126 fn validate_datagram(data: &[u8]) -> bool {
128 let len = data.len();
129 if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
131 trace!(?len, "Datagram too short");
132 false
133 } else if len > MAX_REQUEST_SIZE {
134 trace!(?len, "Datagram too large");
135 false
136 } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
137 trace!(?len, "Datagram header invalid");
138 false
139 } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
141 trace!(
142 "Datagram has invalid version {:?}, current {VERSION:?}",
143 &data[..2]
144 );
145 false
146 } else {
147 true
148 }
149 }
150
151 async fn process_datagram(
152 &mut self,
153 datagram: &[u8],
154 remote: SocketAddr,
155 secrets: (u64, u64),
156 metrics: &mut Metrics,
157 socket: &UdpSocket,
158 ) {
159 let Ok(RawQueryServerRequest {
160 p: client_p,
161 request,
162 }) =
163 <RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
164 else {
165 metrics.invalid_packets += 1;
166 return;
167 };
168
169 trace!(?request, "Received packet");
170
171 #[expect(deprecated)]
172 let real_p = {
173 let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
183 ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
184 hasher.finish()
185 };
186
187 if real_p != client_p {
188 Self::send_response(
189 RawQueryServerResponse::Init(Init {
190 p: real_p,
191 max_supported_version: VERSION,
192 }),
193 remote,
194 socket,
195 metrics,
196 )
197 .await;
198
199 return;
200 }
201
202 if !self.ratelimit.can_request(remote.ip().into()) {
203 trace!("Ratelimited request");
204 metrics.ratelimited += 1;
205 return;
206 }
207
208 match request {
209 QueryServerRequest::Init => {
210 metrics.init_requests += 1;
211 Self::send_response(
212 RawQueryServerResponse::Init(Init {
213 p: real_p,
214 max_supported_version: VERSION,
215 }),
216 remote,
217 socket,
218 metrics,
219 )
220 .await;
221 },
222 QueryServerRequest::ServerInfo => {
223 metrics.info_requests += 1;
224 let server_info = *self.server_info.borrow();
225 Self::send_response(
226 RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
227 remote,
228 socket,
229 metrics,
230 )
231 .await;
232 },
233 }
234 }
235
236 async fn send_response(
237 response: RawQueryServerResponse,
238 addr: SocketAddr,
239 socket: &UdpSocket,
240 metrics: &mut Metrics,
241 ) {
242 match <RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) {
245 Ok(data) => {
246 if data.len() > MAX_RESPONSE_SIZE {
247 error!(
248 ?MAX_RESPONSE_SIZE,
249 "Attempted to send a response larger than the maximum allowed size (size: \
250 {}, response: {response:?})",
251 data.len()
252 );
253 #[cfg(debug_assertions)]
254 panic!(
255 "Attempted to send a response larger than the maximum allowed size (size: \
256 {}, max: {}, response: {response:?})",
257 data.len(),
258 MAX_RESPONSE_SIZE
259 );
260 }
261
262 match socket.send_to(&data, addr).await {
263 Ok(_) => {
264 metrics.sent_responses += 1;
265 },
266 Err(err) => {
267 metrics.failed_responses += 1;
268 debug!(?err, "Failed to send query server response");
269 },
270 }
271 },
272 Err(error) => {
273 trace!(?error, "Failed to serialize response");
274 #[cfg(debug_assertions)]
275 panic!("Serializing response failed: {error:?} ({response:?})");
276 },
277 }
278 }
279}
280
281impl std::ops::AddAssign for Metrics {
282 fn add_assign(
283 &mut self,
284 Self {
285 received_packets,
286 dropped_packets,
287 invalid_packets,
288 proccessing_errors,
289 info_requests,
290 init_requests,
291 sent_responses,
292 failed_responses,
293 timed_out_responses,
294 ratelimited,
295 }: Self,
296 ) {
297 self.received_packets += received_packets;
298 self.dropped_packets += dropped_packets;
299 self.invalid_packets += invalid_packets;
300 self.proccessing_errors += proccessing_errors;
301 self.info_requests += info_requests;
302 self.init_requests += init_requests;
303 self.sent_responses += sent_responses;
304 self.failed_responses += failed_responses;
305 self.timed_out_responses += timed_out_responses;
306 self.ratelimited += ratelimited;
307 }
308}
309
310impl Metrics {
311 pub fn reset(&mut self) -> Self { std::mem::take(self) }
315}