1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
use crate::{Client, ClientType, ServerInfo};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use futures_util::future::FutureExt;
use network::{Network, Participant, Promises};
use std::time::Duration;
use tokio::{runtime::Runtime, select, sync::oneshot};
use tracing::{debug, error, trace, warn};

pub(crate) struct ServerInfoPacket {
    pub info: ServerInfo,
    pub time: f64,
}

pub(crate) type IncomingClient = Client;

pub(crate) struct ConnectionHandler {
    /// We never actually use this, but if it's dropped before the network has a
    /// chance to exit, it won't block the main thread, and if it is dropped
    /// after the network thread ends, it will drop the network here (rather
    /// than delaying the network thread).  So it emulates the effects of
    /// storing the network in an Arc, without us losing mutability in the
    /// network thread.
    _network_receiver: oneshot::Receiver<Network>,
    thread_handle: Option<tokio::task::JoinHandle<()>>,
    pub client_receiver: Receiver<IncomingClient>,
    pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
    stop_sender: Option<oneshot::Sender<()>>,
}

/// Instead of waiting the main loop we are handling connections, especially
/// their slow network .await part on a different thread. We need to communicate
/// to the Server main thread sometimes though to get the current server_info
/// and time
impl ConnectionHandler {
    pub fn new(network: Network, runtime: &Runtime) -> Self {
        let (stop_sender, stop_receiver) = oneshot::channel();
        let (network_sender, _network_receiver) = oneshot::channel();

        let (client_sender, client_receiver) = unbounded::<IncomingClient>();
        let (info_requester_sender, info_requester_receiver) =
            bounded::<Sender<ServerInfoPacket>>(1);

        let thread_handle = Some(runtime.spawn(Self::work(
            network,
            client_sender,
            info_requester_sender,
            stop_receiver,
            network_sender,
        )));

        Self {
            thread_handle,
            client_receiver,
            info_requester_receiver,
            stop_sender: Some(stop_sender),
            _network_receiver,
        }
    }

    async fn work(
        network: Network,
        client_sender: Sender<IncomingClient>,
        info_requester_sender: Sender<Sender<ServerInfoPacket>>,
        stop_receiver: oneshot::Receiver<()>,
        network_sender: oneshot::Sender<Network>,
    ) {
        // Emulate the effects of storing the network in an Arc, without losing
        // mutability.
        let mut network_sender = Some(network_sender);
        let mut network = drop_guard::guard(network, move |network| {
            // If the network receiver was already dropped, we just drop the network here,
            // just like Arc, so we don't care about the result.
            let _ = network_sender
                .take()
                .expect("Only used once in drop")
                .send(network);
        });
        let mut stop_receiver = stop_receiver.fuse();
        loop {
            let participant = match select!(
                _ = &mut stop_receiver => None,
                p = network.connected().fuse() => Some(p),
            ) {
                None => break,
                Some(Ok(p)) => p,
                Some(Err(e)) => {
                    error!(
                        ?e,
                        "Stopping Connection Handler, no new connections can be made to server \
                         now!"
                    );
                    break;
                },
            };

            let client_sender = client_sender.clone();
            let info_requester_sender = info_requester_sender.clone();

            match select!(
                _ = &mut stop_receiver => None,
                e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e),
            ) {
                None => break,
                Some(Ok(())) => (),
                Some(Err(e)) => warn!(?e, "drop new participant, because an error occurred"),
            }
        }
    }

    async fn init_participant(
        mut participant: Participant,
        client_sender: Sender<IncomingClient>,
        info_requester_sender: Sender<Sender<ServerInfoPacket>>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        debug!("New Participant connected to the server");
        let (sender, receiver) = bounded(1);
        info_requester_sender.send(sender)?;

        let reliable = Promises::ORDERED | Promises::CONSISTENCY;
        let reliablec = reliable | Promises::COMPRESSED;

        let general_stream = participant.open(3, reliablec, 500).await?;
        let ping_stream = participant.open(2, reliable, 500).await?;
        let mut register_stream = participant.open(3, reliablec, 500).await?;
        let character_screen_stream = participant.open(3, reliablec, 500).await?;
        let in_game_stream = participant.open(3, reliablec, 100_000).await?;
        let terrain_stream = participant.open(4, reliable, 20_000).await?;

        let server_data = receiver.recv()?;

        register_stream.send(server_data.info)?;

        const TIMEOUT: Duration = Duration::from_secs(5);
        let client_type = match select!(
            _ = tokio::time::sleep(TIMEOUT).fuse() => None,
            t = register_stream.recv::<ClientType>().fuse() => Some(t),
        ) {
            None => {
                debug!("Timeout for incoming client elapsed, aborting connection");
                return Ok(());
            },
            Some(client_type) => client_type?,
        };

        use network::ParticipantEvent;
        let connected_from = match select!(
            _ = tokio::time::sleep(TIMEOUT).fuse() => None,
            connected_from = participant.fetch_event().fuse() => Some(connected_from),
        ) {
            None => {
                error!("Did not receive initial channel created event. This is a bug!");
                return Ok(());
            },
            Some(Err(err)) => {
                debug!("Participant error when trying to receive event: {err:?}");
                return Ok(());
            },
            Some(Ok(ParticipantEvent::ChannelDeleted(_))) => {
                error!(
                    "Received channel deleted event instead of the initial channel created event. \
                     This is a bug!"
                );
                return Ok(());
            },
            Some(Ok(ParticipantEvent::ChannelCreated(connected_from))) => connected_from,
        };

        let client = Client::new(
            client_type,
            participant,
            connected_from,
            server_data.time,
            None,
            general_stream,
            ping_stream,
            register_stream,
            character_screen_stream,
            in_game_stream,
            terrain_stream,
        );

        client_sender.send(client)?;
        Ok(())
    }
}

impl Drop for ConnectionHandler {
    fn drop(&mut self) {
        let _ = self
            .stop_sender
            .take()
            .expect("`stop_sender` is private, initialized as `Some`, and only updated in Drop")
            .send(());
        trace!("aborting ConnectionHandler");
        self.thread_handle
            .take()
            .expect("`thread_handle` is private, initialized as `Some`, and only updated in Drop")
            .abort();
        trace!("aborted ConnectionHandler!");
    }
}