Skip to main content

veloren_query_server/
server.rs

1#[expect(deprecated)] use std::hash::SipHasher;
2use std::{
3    hash::{Hash, Hasher},
4    io::{self, ErrorKind},
5    net::SocketAddr,
6    sync::{Arc, Mutex},
7    time::{Duration, Instant},
8};
9
10use protocol::Parcel;
11use rand::{RngExt, rng};
12use tokio::{net::UdpSocket, sync::watch};
13use tracing::{debug, error, trace};
14
15use crate::{
16    proto::{
17        Init, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, QueryServerRequest, QueryServerResponse,
18        RawQueryServerRequest, RawQueryServerResponse, ServerInfo, VELOREN_HEADER, VERSION,
19    },
20    ratelimit::{RateLimiter, ReducedIpAddr},
21};
22
23const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300);
24
25pub struct QueryServer {
26    addr: SocketAddr,
27    server_info: watch::Receiver<ServerInfo>,
28    settings: protocol::Settings,
29    ratelimit: RateLimiter,
30}
31
32#[derive(Default, Clone, Copy, Debug)]
33pub struct Metrics {
34    pub received_packets: u32,
35    pub dropped_packets: u32,
36    pub invalid_packets: u32,
37    pub proccessing_errors: u32,
38    pub info_requests: u32,
39    pub init_requests: u32,
40    pub sent_responses: u32,
41    pub failed_responses: u32,
42    pub timed_out_responses: u32,
43    pub ratelimited: u32,
44}
45
46impl QueryServer {
47    pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>, ratelimit: u16) -> Self {
48        Self {
49            addr,
50            server_info,
51            ratelimit: RateLimiter::new(ratelimit),
52            settings: Default::default(),
53        }
54    }
55
56    /// This produces TRACE level logs for any packet received on the assigned
57    /// port. To prevent potentially unfettered log spam, disable the TRACE
58    /// level for this crate (when outside of debugging contexts).
59    ///
60    /// NOTE: TRACE and DEBUG levels are disabled by default for this crate when
61    /// using `veloren-common-frontend`.
62    pub async fn run(&mut self, metrics: Arc<Mutex<Metrics>>) -> Result<(), tokio::io::Error> {
63        let mut socket = UdpSocket::bind(self.addr).await?;
64
65        let gen_secret = || {
66            let mut rng = rng();
67            (rng.random::<u64>(), rng.random::<u64>())
68        };
69        let mut secrets = gen_secret();
70        let mut last_secret_refresh = Instant::now();
71
72        let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
73        loop {
74            let (len, remote_addr) = match socket.recv_from(&mut *buf).await {
75                Ok(v) => v,
76                Err(e) if e.kind() == ErrorKind::NotConnected => {
77                    error!(
78                        ?e,
79                        "Query server connection was closed, re-binding to socket..."
80                    );
81                    socket = UdpSocket::bind(self.addr).await?;
82                    continue;
83                },
84                err => {
85                    debug!(?err, "Error while receiving from query server socket");
86                    continue;
87                },
88            };
89
90            let mut new_metrics = Metrics {
91                received_packets: 1,
92                ..Default::default()
93            };
94
95            let raw_msg_buf = &buf[..len];
96            let msg_buf = if Self::validate_datagram(raw_msg_buf) {
97                // Require 2 extra bytes for version (currently unused)
98                &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())]
99            } else {
100                if let Ok(mut metrics) = metrics.lock() {
101                    metrics.dropped_packets += 1;
102                }
103                continue;
104            };
105
106            self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket)
107                .await;
108
109            // Update metrics at the end of eath packet
110            if let Ok(mut metrics) = metrics.lock() {
111                *metrics += new_metrics;
112            }
113
114            {
115                let now = Instant::now();
116                if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL {
117                    last_secret_refresh = now;
118                    secrets = gen_secret();
119                }
120
121                self.ratelimit.maintain(now);
122            }
123        }
124    }
125
126    // Header must be discarded after this validation passes
127    fn validate_datagram(data: &[u8]) -> bool {
128        let len = data.len();
129        // Require 2 extra bytes for version (currently unused)
130        if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
131            trace!(?len, "Datagram too short");
132            false
133        } else if len > MAX_REQUEST_SIZE {
134            trace!(?len, "Datagram too large");
135            false
136        } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
137            trace!(?len, "Datagram header invalid");
138            false
139        // TODO: Allow lower versions once proper versioning is added.
140        } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
141            trace!(
142                "Datagram has invalid version {:?}, current {VERSION:?}",
143                &data[..2]
144            );
145            false
146        } else {
147            true
148        }
149    }
150
151    async fn process_datagram(
152        &mut self,
153        datagram: &[u8],
154        remote: SocketAddr,
155        secrets: (u64, u64),
156        metrics: &mut Metrics,
157        socket: &UdpSocket,
158    ) {
159        let Ok(RawQueryServerRequest {
160            p: client_p,
161            request,
162        }) =
163            <RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
164        else {
165            metrics.invalid_packets += 1;
166            return;
167        };
168
169        trace!(?request, "Received packet");
170
171        #[expect(deprecated)]
172        let real_p = {
173            // Use SipHash-2-4 to compute the `p` value from a server specific
174            // secret and the client's address.
175            //
176            // This is used to verify that packets are from an entity that can
177            // receive packets at the given address.
178            //
179            // Only use the first 64 bits from Ipv6 addresses since the latter
180            // 64 bits can change very frequently (as much as for every
181            // request).
182            let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
183            ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
184            hasher.finish()
185        };
186
187        if real_p != client_p {
188            Self::send_response(
189                RawQueryServerResponse::Init(Init {
190                    p: real_p,
191                    max_supported_version: VERSION,
192                }),
193                remote,
194                socket,
195                metrics,
196            )
197            .await;
198
199            return;
200        }
201
202        if !self.ratelimit.can_request(remote.ip().into()) {
203            trace!("Ratelimited request");
204            metrics.ratelimited += 1;
205            return;
206        }
207
208        match request {
209            QueryServerRequest::Init => {
210                metrics.init_requests += 1;
211                Self::send_response(
212                    RawQueryServerResponse::Init(Init {
213                        p: real_p,
214                        max_supported_version: VERSION,
215                    }),
216                    remote,
217                    socket,
218                    metrics,
219                )
220                .await;
221            },
222            QueryServerRequest::ServerInfo => {
223                metrics.info_requests += 1;
224                let server_info = *self.server_info.borrow();
225                Self::send_response(
226                    RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
227                    remote,
228                    socket,
229                    metrics,
230                )
231                .await;
232            },
233        }
234    }
235
236    async fn send_response(
237        response: RawQueryServerResponse,
238        addr: SocketAddr,
239        socket: &UdpSocket,
240        metrics: &mut Metrics,
241    ) {
242        // TODO: Once more versions are added, send the packet in the same version as
243        // the request here.
244        match <RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) {
245            Ok(data) => {
246                if data.len() > MAX_RESPONSE_SIZE {
247                    error!(
248                        ?MAX_RESPONSE_SIZE,
249                        "Attempted to send a response larger than the maximum allowed size (size: \
250                         {}, response: {response:?})",
251                        data.len()
252                    );
253                    #[cfg(debug_assertions)]
254                    panic!(
255                        "Attempted to send a response larger than the maximum allowed size (size: \
256                         {}, max: {}, response: {response:?})",
257                        data.len(),
258                        MAX_RESPONSE_SIZE
259                    );
260                }
261
262                match socket.send_to(&data, addr).await {
263                    Ok(_) => {
264                        metrics.sent_responses += 1;
265                    },
266                    Err(err) => {
267                        metrics.failed_responses += 1;
268                        debug!(?err, "Failed to send query server response");
269                    },
270                }
271            },
272            Err(error) => {
273                trace!(?error, "Failed to serialize response");
274                #[cfg(debug_assertions)]
275                panic!("Serializing response failed: {error:?} ({response:?})");
276            },
277        }
278    }
279}
280
281impl std::ops::AddAssign for Metrics {
282    fn add_assign(
283        &mut self,
284        Self {
285            received_packets,
286            dropped_packets,
287            invalid_packets,
288            proccessing_errors,
289            info_requests,
290            init_requests,
291            sent_responses,
292            failed_responses,
293            timed_out_responses,
294            ratelimited,
295        }: Self,
296    ) {
297        self.received_packets += received_packets;
298        self.dropped_packets += dropped_packets;
299        self.invalid_packets += invalid_packets;
300        self.proccessing_errors += proccessing_errors;
301        self.info_requests += info_requests;
302        self.init_requests += init_requests;
303        self.sent_responses += sent_responses;
304        self.failed_responses += failed_responses;
305        self.timed_out_responses += timed_out_responses;
306        self.ratelimited += ratelimited;
307    }
308}
309
310impl Metrics {
311    /// Resets all metrics to 0 and returns previous ones
312    ///
313    /// Used by the consumer of the metrics.
314    pub fn reset(&mut self) -> Self { std::mem::take(self) }
315}