1use crate::{Client, ClientType, ServerInfo};
2use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
3use futures_util::future::FutureExt;
4use network::{Network, Participant, Promises};
5use std::time::Duration;
6use tokio::{runtime::Runtime, select, sync::oneshot};
7use tracing::{debug, error, trace, warn};
8
/// Answer to a server-info request issued while a new participant is being
/// initialised.
pub(crate) struct ServerInfoPacket {
    /// Server information; `ConnectionHandler::init_participant` sends it to the
    /// connecting client over the register stream.
    pub info: ServerInfo,
    /// Time value that `ConnectionHandler::init_participant` forwards unchanged to
    /// `Client::new`.
    // NOTE(review): unit and reference point are not visible in this file - confirm
    // with whoever produces this packet.
    pub time: f64,
}
13
/// A client whose initial handshake has completed; this is the item type carried by
/// `ConnectionHandler::client_receiver`.
pub(crate) type IncomingClient = Client;
15
/// Owns the background task that accepts new participants on a `Network` and turns
/// them into `Client`s.
///
/// The task is spawned in `ConnectionHandler::new` and is signalled to stop and then
/// aborted when this value is dropped.
pub(crate) struct ConnectionHandler {
    /// Receiving end of the oneshot channel on which the background task's drop
    /// guard sends the `Network` back. It is stored but never read in this file.
    _network_receiver: oneshot::Receiver<Network>,
    /// Handle of the task running `ConnectionHandler::work`. Always `Some` until
    /// `Drop` takes it to abort the task.
    thread_handle: Option<tokio::task::JoinHandle<()>>,
    /// Yields every client that `init_participant` finished setting up.
    pub client_receiver: Receiver<IncomingClient>,
    /// Yields one reply `Sender` per connecting participant. `init_participant`
    /// waits on the matching receiver, so whoever drains this channel is expected to
    /// answer each request with a `ServerInfoPacket`.
    pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
    /// Used once, in `Drop`, to tell the background task to leave its accept loop.
    /// Always `Some` until then.
    stop_sender: Option<oneshot::Sender<()>>,
}
29
30impl ConnectionHandler {
35 pub fn new(network: Network, runtime: &Runtime) -> Self {
36 let (stop_sender, stop_receiver) = oneshot::channel();
37 let (network_sender, _network_receiver) = oneshot::channel();
38
39 let (client_sender, client_receiver) = unbounded::<IncomingClient>();
40 let (info_requester_sender, info_requester_receiver) =
41 bounded::<Sender<ServerInfoPacket>>(1);
42
43 let thread_handle = Some(runtime.spawn(Self::work(
44 network,
45 client_sender,
46 info_requester_sender,
47 stop_receiver,
48 network_sender,
49 )));
50
51 Self {
52 thread_handle,
53 client_receiver,
54 info_requester_receiver,
55 stop_sender: Some(stop_sender),
56 _network_receiver,
57 }
58 }
59
60 async fn work(
61 network: Network,
62 client_sender: Sender<IncomingClient>,
63 info_requester_sender: Sender<Sender<ServerInfoPacket>>,
64 stop_receiver: oneshot::Receiver<()>,
65 network_sender: oneshot::Sender<Network>,
66 ) {
67 let mut network_sender = Some(network_sender);
70 let mut network = drop_guard::guard(network, move |network| {
71 let _ = network_sender
74 .take()
75 .expect("Only used once in drop")
76 .send(network);
77 });
78 let mut stop_receiver = stop_receiver.fuse();
79 loop {
80 let participant = match select!(
81 _ = &mut stop_receiver => None,
82 p = network.connected().fuse() => Some(p),
83 ) {
84 None => break,
85 Some(Ok(p)) => p,
86 Some(Err(e)) => {
87 error!(
88 ?e,
89 "Stopping Connection Handler, no new connections can be made to server \
90 now!"
91 );
92 break;
93 },
94 };
95
96 let client_sender = client_sender.clone();
97 let info_requester_sender = info_requester_sender.clone();
98
99 match select!(
100 _ = &mut stop_receiver => None,
101 e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e),
102 ) {
103 None => break,
104 Some(Ok(())) => (),
105 Some(Err(e)) => warn!(?e, "drop new participant, because an error occurred"),
106 }
107 }
108 }
109
110 async fn init_participant(
111 mut participant: Participant,
112 client_sender: Sender<IncomingClient>,
113 info_requester_sender: Sender<Sender<ServerInfoPacket>>,
114 ) -> Result<(), Box<dyn std::error::Error>> {
115 debug!("New Participant connected to the server");
116 let (sender, receiver) = bounded(1);
117 info_requester_sender.send(sender)?;
118
119 let reliable = Promises::ORDERED | Promises::CONSISTENCY;
120 let reliablec = reliable | Promises::COMPRESSED;
121
122 let general_stream = participant.open(3, reliablec, 500).await?;
123 let ping_stream = participant.open(2, reliable, 500).await?;
124 let mut register_stream = participant.open(3, reliablec, 500).await?;
125 let character_screen_stream = participant.open(3, reliablec, 500).await?;
126 let in_game_stream = participant.open(3, reliablec, 100_000).await?;
127 let terrain_stream = participant.open(4, reliable, 20_000).await?;
128
129 let server_data = receiver.recv()?;
130
131 register_stream.send(server_data.info)?;
132
133 const TIMEOUT: Duration = Duration::from_secs(5);
134 let client_type = match select!(
135 _ = tokio::time::sleep(TIMEOUT).fuse() => None,
136 t = register_stream.recv::<ClientType>().fuse() => Some(t),
137 ) {
138 None => {
139 debug!("Timeout for incoming client elapsed, aborting connection");
140 return Ok(());
141 },
142 Some(client_type) => client_type?,
143 };
144
145 use network::ParticipantEvent;
146 let connected_from = match select!(
147 _ = tokio::time::sleep(TIMEOUT).fuse() => None,
148 connected_from = participant.fetch_event().fuse() => Some(connected_from),
149 ) {
150 None => {
151 error!("Did not receive initial channel created event. This is a bug!");
152 return Ok(());
153 },
154 Some(Err(err)) => {
155 debug!("Participant error when trying to receive event: {err:?}");
156 return Ok(());
157 },
158 Some(Ok(ParticipantEvent::ChannelDeleted(_))) => {
159 error!(
160 "Received channel deleted event instead of the initial channel created event. \
161 This is a bug!"
162 );
163 return Ok(());
164 },
165 Some(Ok(ParticipantEvent::ChannelCreated(connected_from))) => connected_from,
166 };
167
168 let client = Client::new(
169 client_type,
170 participant,
171 connected_from,
172 server_data.time,
173 None,
174 general_stream,
175 ping_stream,
176 register_stream,
177 character_screen_stream,
178 in_game_stream,
179 terrain_stream,
180 );
181
182 client_sender.send(client)?;
183 Ok(())
184 }
185}
186
187impl Drop for ConnectionHandler {
188 fn drop(&mut self) {
189 let _ = self
190 .stop_sender
191 .take()
192 .expect("`stop_sender` is private, initialized as `Some`, and only updated in Drop")
193 .send(());
194 trace!("aborting ConnectionHandler");
195 self.thread_handle
196 .take()
197 .expect("`thread_handle` is private, initialized as `Some`, and only updated in Drop")
198 .abort();
199 trace!("aborted ConnectionHandler!");
200 }
201}