veloren_server/
connection_handler.rs

1use crate::{Client, ClientType, ServerInfo};
2use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
3use futures_util::future::FutureExt;
4use network::{Network, Participant, Promises};
5use std::time::Duration;
6use tokio::{runtime::Runtime, select, sync::oneshot};
7use tracing::{debug, error, trace, warn};
8
9pub(crate) struct ServerInfoPacket {
10    pub info: ServerInfo,
11    pub time: f64,
12}
13
14pub(crate) type IncomingClient = Client;
15
16pub(crate) struct ConnectionHandler {
17    /// We never actually use this, but if it's dropped before the network has a
18    /// chance to exit, it won't block the main thread, and if it is dropped
19    /// after the network thread ends, it will drop the network here (rather
20    /// than delaying the network thread).  So it emulates the effects of
21    /// storing the network in an Arc, without us losing mutability in the
22    /// network thread.
23    _network_receiver: oneshot::Receiver<Network>,
24    thread_handle: Option<tokio::task::JoinHandle<()>>,
25    pub client_receiver: Receiver<IncomingClient>,
26    pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
27    stop_sender: Option<oneshot::Sender<()>>,
28}
29
30/// Instead of waiting the main loop we are handling connections, especially
31/// their slow network .await part on a different thread. We need to communicate
32/// to the Server main thread sometimes though to get the current server_info
33/// and time
34impl ConnectionHandler {
35    pub fn new(network: Network, runtime: &Runtime) -> Self {
36        let (stop_sender, stop_receiver) = oneshot::channel();
37        let (network_sender, _network_receiver) = oneshot::channel();
38
39        let (client_sender, client_receiver) = unbounded::<IncomingClient>();
40        let (info_requester_sender, info_requester_receiver) =
41            bounded::<Sender<ServerInfoPacket>>(1);
42
43        let thread_handle = Some(runtime.spawn(Self::work(
44            network,
45            client_sender,
46            info_requester_sender,
47            stop_receiver,
48            network_sender,
49        )));
50
51        Self {
52            thread_handle,
53            client_receiver,
54            info_requester_receiver,
55            stop_sender: Some(stop_sender),
56            _network_receiver,
57        }
58    }
59
60    async fn work(
61        network: Network,
62        client_sender: Sender<IncomingClient>,
63        info_requester_sender: Sender<Sender<ServerInfoPacket>>,
64        stop_receiver: oneshot::Receiver<()>,
65        network_sender: oneshot::Sender<Network>,
66    ) {
67        // Emulate the effects of storing the network in an Arc, without losing
68        // mutability.
69        let mut network_sender = Some(network_sender);
70        let mut network = drop_guard::guard(network, move |network| {
71            // If the network receiver was already dropped, we just drop the network here,
72            // just like Arc, so we don't care about the result.
73            let _ = network_sender
74                .take()
75                .expect("Only used once in drop")
76                .send(network);
77        });
78        let mut stop_receiver = stop_receiver.fuse();
79        loop {
80            let participant = match select!(
81                _ = &mut stop_receiver => None,
82                p = network.connected().fuse() => Some(p),
83            ) {
84                None => break,
85                Some(Ok(p)) => p,
86                Some(Err(e)) => {
87                    error!(
88                        ?e,
89                        "Stopping Connection Handler, no new connections can be made to server \
90                         now!"
91                    );
92                    break;
93                },
94            };
95
96            let client_sender = client_sender.clone();
97            let info_requester_sender = info_requester_sender.clone();
98
99            match select!(
100                _ = &mut stop_receiver => None,
101                e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e),
102            ) {
103                None => break,
104                Some(Ok(())) => (),
105                Some(Err(e)) => warn!(?e, "drop new participant, because an error occurred"),
106            }
107        }
108    }
109
110    async fn init_participant(
111        mut participant: Participant,
112        client_sender: Sender<IncomingClient>,
113        info_requester_sender: Sender<Sender<ServerInfoPacket>>,
114    ) -> Result<(), Box<dyn std::error::Error>> {
115        debug!("New Participant connected to the server");
116        let (sender, receiver) = bounded(1);
117        info_requester_sender.send(sender)?;
118
119        let reliable = Promises::ORDERED | Promises::CONSISTENCY;
120        let reliablec = reliable | Promises::COMPRESSED;
121
122        let general_stream = participant.open(3, reliablec, 500).await?;
123        let ping_stream = participant.open(2, reliable, 500).await?;
124        let mut register_stream = participant.open(3, reliablec, 500).await?;
125        let character_screen_stream = participant.open(3, reliablec, 500).await?;
126        let in_game_stream = participant.open(3, reliablec, 100_000).await?;
127        let terrain_stream = participant.open(4, reliable, 20_000).await?;
128
129        let server_data = receiver.recv()?;
130
131        register_stream.send(server_data.info)?;
132
133        const TIMEOUT: Duration = Duration::from_secs(5);
134        let client_type = match select!(
135            _ = tokio::time::sleep(TIMEOUT).fuse() => None,
136            t = register_stream.recv::<ClientType>().fuse() => Some(t),
137        ) {
138            None => {
139                debug!("Timeout for incoming client elapsed, aborting connection");
140                return Ok(());
141            },
142            Some(client_type) => client_type?,
143        };
144
145        use network::ParticipantEvent;
146        let connected_from = match select!(
147            _ = tokio::time::sleep(TIMEOUT).fuse() => None,
148            connected_from = participant.fetch_event().fuse() => Some(connected_from),
149        ) {
150            None => {
151                error!("Did not receive initial channel created event. This is a bug!");
152                return Ok(());
153            },
154            Some(Err(err)) => {
155                debug!("Participant error when trying to receive event: {err:?}");
156                return Ok(());
157            },
158            Some(Ok(ParticipantEvent::ChannelDeleted(_))) => {
159                error!(
160                    "Received channel deleted event instead of the initial channel created event. \
161                     This is a bug!"
162                );
163                return Ok(());
164            },
165            Some(Ok(ParticipantEvent::ChannelCreated(connected_from))) => connected_from,
166        };
167
168        let client = Client::new(
169            client_type,
170            participant,
171            connected_from,
172            server_data.time,
173            None,
174            general_stream,
175            ping_stream,
176            register_stream,
177            character_screen_stream,
178            in_game_stream,
179            terrain_stream,
180        );
181
182        client_sender.send(client)?;
183        Ok(())
184    }
185}
186
187impl Drop for ConnectionHandler {
188    fn drop(&mut self) {
189        let _ = self
190            .stop_sender
191            .take()
192            .expect("`stop_sender` is private, initialized as `Some`, and only updated in Drop")
193            .send(());
194        trace!("aborting ConnectionHandler");
195        self.thread_handle
196            .take()
197            .expect("`thread_handle` is private, initialized as `Some`, and only updated in Drop")
198            .abort();
199        trace!("aborted ConnectionHandler!");
200    }
201}