veloren_network/api.rs
1use crate::{
2 channel::ProtocolsError,
3 message::{Message, partial_eq_bincode},
4 participant::{A2bStreamOpen, S2bShutdownBparticipant},
5 scheduler::{A2sConnect, Scheduler},
6};
7use bytes::Bytes;
8use hashbrown::HashMap;
9#[cfg(feature = "compression")]
10use lz_fear::raw::DecodeError;
11use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
12#[cfg(feature = "metrics")]
13use prometheus::Registry;
14use serde::{Serialize, de::DeserializeOwned};
15use std::{
16 net::SocketAddr,
17 sync::{
18 Arc,
19 atomic::{AtomicBool, Ordering},
20 },
21 time::Duration,
22};
23use tokio::{
24 io,
25 runtime::Runtime,
26 sync::{Mutex, mpsc, oneshot, watch},
27};
28use tracing::*;
29
30type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
31
/// Represents a Tcp, Quic, Udp or Mpsc connection address
#[derive(Clone, Debug)]
pub enum ConnectAddr {
    /// Connect via TCP to the given socket address.
    Tcp(SocketAddr),
    /// Connect via UDP to the given socket address.
    Udp(SocketAddr),
    /// Connect via QUIC to the given socket address using the provided client
    /// configuration. The `String` is presumably the server name used during
    /// the handshake — TODO confirm.
    #[cfg(feature = "quic")]
    Quic(SocketAddr, quinn::ClientConfig, String),
    /// Connect to a local, in-process channel (mpsc); the `u64` presumably
    /// identifies the channel.
    Mpsc(u64),
}

impl ConnectAddr {
    /// Returns the socket address for the network based protocols (TCP, UDP
    /// and QUIC), and `None` for a local channel (mpsc), which has no socket
    /// address.
    pub fn socket_addr(&self) -> Option<SocketAddr> {
        match self {
            Self::Tcp(addr) | Self::Udp(addr) => Some(*addr),
            Self::Mpsc(_) => None,
            #[cfg(feature = "quic")]
            Self::Quic(addr, _, _) => Some(*addr),
        }
    }
}
55
/// Address a [`Network`] can listen on: Tcp, Quic, Udp or a local Mpsc
/// channel.
#[derive(Clone, Debug)]
pub enum ListenAddr {
    /// Accept TCP connections on the given socket address.
    Tcp(SocketAddr),
    /// Accept UDP traffic on the given socket address.
    Udp(SocketAddr),
    /// Accept QUIC connections on the given socket address using the provided
    /// server configuration.
    #[cfg(feature = "quic")]
    Quic(SocketAddr, quinn::ServerConfig),
    /// Listen on a local, in-process channel (mpsc); the `u64` presumably
    /// identifies the channel.
    Mpsc(u64),
}
65
66/// A Participant can throw different events, you are obligated to carefully
67/// empty the queue from time to time.
68#[derive(Clone, Debug)]
69pub enum ParticipantEvent {
70 ChannelCreated(ConnectAddr),
71 ChannelDeleted(ConnectAddr),
72}
73
74/// `Participants` are generated by the [`Network`] and represent a connection
75/// to a remote Participant. Look at the [`connect`] and [`connected`] method of
76/// [`Networks`] on how to generate `Participants`
77///
78/// [`Networks`]: crate::api::Network
79/// [`connect`]: Network::connect
80/// [`connected`]: Network::connected
81pub struct Participant {
82 local_pid: Pid,
83 remote_pid: Pid,
84 a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
85 b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
86 b2a_event_r: mpsc::UnboundedReceiver<ParticipantEvent>,
87 b2a_bandwidth_stats_r: watch::Receiver<f32>,
88 a2s_disconnect_s: A2sDisconnect,
89}
90
91/// `Streams` represents a channel to send `n` messages with a certain priority
92/// and [`Promises`]. messages need always to be send between 2 `Streams`.
93///
94/// `Streams` are generated by the [`Participant`].
95/// Look at the [`open`] and [`opened`] method of [`Participant`] on how to
96/// generate `Streams`
97///
98/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
99/// mutability, as multiple threads don't need access to the same `Stream`.
100///
101/// [`Networks`]: crate::api::Network
102/// [`open`]: Participant::open
103/// [`opened`]: Participant::opened
104#[derive(Debug)]
105pub struct Stream {
106 local_pid: Pid,
107 remote_pid: Pid,
108 sid: Sid,
109 #[expect(dead_code)]
110 prio: Prio,
111 promises: Promises,
112 #[expect(dead_code)]
113 guaranteed_bandwidth: Bandwidth,
114 send_closed: Arc<AtomicBool>,
115 a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
116 b2a_msg_recv_r: Option<async_channel::Receiver<Bytes>>,
117 a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
118}
119
120/// Error type thrown by [`Networks`](Network) methods
121#[derive(Debug)]
122pub enum NetworkError {
123 NetworkClosed,
124 ListenFailed(io::Error),
125 ConnectFailed(NetworkConnectError),
126}
127
128/// Error type thrown by [`Networks`](Network) connect
129#[derive(Debug)]
130pub enum NetworkConnectError {
131 /// Either a Pid UUID clash or you are trying to hijack a connection
132 InvalidSecret,
133 Handshake(InitProtocolError<ProtocolsError>),
134 Io(io::Error),
135}
136
/// Error returned by [`Participant`] methods.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ParticipantError {
    /// The participant is no longer reachable: it was closed by the remote
    /// side, or it has already been shut down locally (e.g. together with its
    /// `Network`).
    ParticipantDisconnected,
    /// The underlying protocol failed and wasn't able to recover. Expect some
    /// data loss; unfortunately there is no method to get the exact messages
    /// that failed. This is also returned when the local side tries to do
    /// something while the remote side gracefully disconnects.
    ProtocolFailedUnrecoverable,
}
148
149/// Error type thrown by [`Streams`](Stream) methods
150/// A Compression Error should only happen if a client sends malicious code.
151/// A Deserialize Error probably means you are expecting Type X while you
152/// actually got send type Y.
153#[derive(Debug)]
154pub enum StreamError {
155 StreamClosed,
156 #[cfg(feature = "compression")]
157 Compression(DecodeError),
158 Deserialize(bincode::Error),
159}
160
161/// All Parameters of a Stream, can be used to generate RawMessages
162#[derive(Debug, Clone)]
163pub struct StreamParams {
164 pub(crate) promises: Promises,
165}
166
167/// Use the `Network` to create connections to other [`Participants`]
168///
169/// The `Network` is the single source that handles all connections in your
170/// Application. You can pass it around multiple threads in an
171/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
172///
173/// The `Network` has methods to [`connect`] to other [`Participants`] actively
174/// via their [`ConnectAddr`], or [`listen`] passively for [`connected`]
175/// [`Participants`] via [`ListenAddr`].
176///
177/// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be dropped before
178/// the Network.
179///
180/// # Examples
181/// ```rust
182/// use tokio::runtime::Runtime;
183/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};
184///
185/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
186/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
187/// let runtime = Runtime::new().unwrap();
188/// let mut network = Network::new(Pid::new(), &runtime);
189/// runtime.block_on(async{
190/// # //setup pseudo database!
191/// # let database = Network::new(Pid::new(), &runtime);
192/// # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
193/// network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
194/// let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
195/// drop(network);
196/// # drop(database);
197/// # Ok(())
198/// })
199/// # }
200/// ```
201///
202/// [`Participants`]: crate::api::Participant
203/// [`Runtime`]: tokio::runtime::Runtime
204/// [`connect`]: Network::connect
205/// [`listen`]: Network::listen
206/// [`connected`]: Network::connected
207/// [`ConnectAddr`]: crate::api::ConnectAddr
208/// [`ListenAddr`]: crate::api::ListenAddr
209pub struct Network {
210 local_pid: Pid,
211 participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
212 listen_sender: mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>,
213 connect_sender: mpsc::UnboundedSender<A2sConnect>,
214 connected_receiver: mpsc::UnboundedReceiver<Participant>,
215 shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
216}
217
218impl Network {
219 /// Generates a new `Network` to handle all connections in an Application
220 ///
221 /// # Arguments
222 /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
223 /// don't want to reuse a Pid for 2 `Networks`
224 /// * `runtime` - provide a [`Runtime`], it's used to internally spawn
225 /// tasks. It is necessary to clean up in the non-async `Drop`. **All**
226 /// network related components **must** be dropped before the runtime is
227 /// stopped. dropping the runtime while a shutdown is still in progress
228 /// leaves the network in a bad state which might cause a panic!
229 ///
230 /// # Result
231 /// * `Self` - returns a `Network` which can be `Send` to multiple areas of
232 /// your code, including multiple threads. This is the base strct of this
233 /// crate.
234 ///
235 /// # Examples
236 /// ```rust
237 /// use tokio::runtime::Runtime;
238 /// use veloren_network::{Network, Pid};
239 ///
240 /// let runtime = Runtime::new().unwrap();
241 /// let network = Network::new(Pid::new(), &runtime);
242 /// ```
243 ///
244 /// Usually you only create a single `Network` for an application,
245 /// except when client and server are in the same application, then you
246 /// will want 2. However there are no technical limitations from
247 /// creating more.
248 ///
249 /// [`Pid::new()`]: network_protocol::Pid::new
250 /// [`Runtime`]: tokio::runtime::Runtime
251 pub fn new(participant_id: Pid, runtime: &Runtime) -> Self {
252 Self::internal_new(
253 participant_id,
254 runtime,
255 #[cfg(feature = "metrics")]
256 None,
257 )
258 }
259
260 /// See [`new`]
261 ///
262 /// # additional Arguments
263 /// * `registry` - Provide a Registry in order to collect Prometheus metrics
264 /// by this `Network`, `None` will deactivate Tracing. Tracing is done via
265 /// [`prometheus`]
266 ///
267 /// # Examples
268 /// ```rust
269 /// use prometheus::Registry;
270 /// use tokio::runtime::Runtime;
271 /// use veloren_network::{Network, Pid};
272 ///
273 /// let runtime = Runtime::new().unwrap();
274 /// let registry = Registry::new();
275 /// let network = Network::new_with_registry(Pid::new(), &runtime, ®istry);
276 /// ```
277 /// [`new`]: crate::api::Network::new
278 #[cfg(feature = "metrics")]
279 pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self {
280 Self::internal_new(participant_id, runtime, Some(registry))
281 }
282
283 fn internal_new(
284 participant_id: Pid,
285 runtime: &Runtime,
286 #[cfg(feature = "metrics")] registry: Option<&Registry>,
287 ) -> Self {
288 let p = participant_id;
289 let span = info_span!("network", ?p);
290 span.in_scope(|| trace!("Starting Network"));
291 let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
292 Scheduler::new(
293 participant_id,
294 #[cfg(feature = "metrics")]
295 registry,
296 );
297 let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new()));
298 let (shutdown_network_s, shutdown_network_r) = oneshot::channel();
299 let f = Self::shutdown_mgr(
300 p,
301 shutdown_network_r,
302 Arc::clone(&participant_disconnect_sender),
303 shutdown_sender,
304 );
305 runtime.spawn(f);
306 runtime.spawn(
307 async move {
308 trace!("Starting scheduler in own thread");
309 scheduler.run().await;
310 trace!("Stopping scheduler and his own thread");
311 }
312 .instrument(info_span!("network", ?p)),
313 );
314 Self {
315 local_pid: participant_id,
316 participant_disconnect_sender,
317 listen_sender,
318 connect_sender,
319 connected_receiver,
320 shutdown_network_s: Some(shutdown_network_s),
321 }
322 }
323
324 /// starts listening on an [`ListenAddr`].
325 /// When the method returns the `Network` is ready to listen for incoming
326 /// connections OR has returned a [`NetworkError`] (e.g. port already used).
327 /// You can call [`connected`] to asynchrony wait for a [`Participant`] to
328 /// connect. You can call `listen` on multiple addresses, e.g. to
329 /// support multiple Protocols or NICs.
330 ///
331 /// # Examples
332 /// ```ignore
333 /// use tokio::runtime::Runtime;
334 /// use veloren_network::{Network, Pid, ListenAddr};
335 ///
336 /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
337 /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
338 /// let runtime = Runtime::new().unwrap();
339 /// let mut network = Network::new(Pid::new(), &runtime);
340 /// runtime.block_on(async {
341 /// network
342 /// .listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
343 /// .await?;
344 /// network
345 /// .listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
346 /// .await?;
347 /// drop(network);
348 /// # Ok(())
349 /// })
350 /// # }
351 /// ```
352 ///
353 /// [`connected`]: Network::connected
354 /// [`ListenAddr`]: crate::api::ListenAddr
355 #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
356 pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> {
357 let (s2a_result_s, s2a_result_r) = oneshot::channel::<io::Result<()>>();
358 debug!(?address, "listening on address");
359 self.listen_sender.send((address, s2a_result_s))?;
360 match s2a_result_r.await? {
361 //waiting guarantees that we either listened successfully or get an error like port in
362 // use
363 Ok(()) => Ok(()),
364 Err(e) => Err(NetworkError::ListenFailed(e)),
365 }
366 }
367
368 /// starts connection to an [`ConnectAddr`].
369 /// When the method returns the Network either returns a [`Participant`]
370 /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
371 /// can't connect, or invalid Handshake) # Examples
372 /// ```ignore
373 /// use tokio::runtime::Runtime;
374 /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
375 ///
376 /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
377 /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
378 /// let runtime = Runtime::new().unwrap();
379 /// let network = Network::new(Pid::new(), &runtime);
380 /// # let remote = Network::new(Pid::new(), &runtime);
381 /// runtime.block_on(async {
382 /// # remote.listen(ListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
383 /// # remote.listen(ListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
384 /// let p1 = network
385 /// .connect(ConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
386 /// .await?;
387 /// # //this doesn't work yet, so skip the test
388 /// # //TODO fixme!
389 /// # return Ok(());
390 /// let p2 = network
391 /// .connect(ConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
392 /// .await?;
393 /// assert_eq!(&p1, &p2);
394 /// # Ok(())
395 /// })?;
396 /// drop(network);
397 /// # drop(remote);
398 /// # Ok(())
399 /// # }
400 /// ```
401 /// Usually the `Network` guarantees that a operation on a [`Participant`]
402 /// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
403 /// disconnecting from the remote. If 2 [`ConnectAddr] you
404 /// `connect` to belongs to the same [`Participant`], you get the same
405 /// [`Participant`] as a result. This is useful e.g. by connecting to
406 /// the same [`Participant`] via multiple Protocols.
407 ///
408 /// [`Streams`]: crate::api::Stream
409 /// [`ConnectAddr`]: crate::api::ConnectAddr
410 #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
411 pub async fn connect(&self, address: ConnectAddr) -> Result<Participant, NetworkError> {
412 let (pid_sender, pid_receiver) =
413 oneshot::channel::<Result<Participant, NetworkConnectError>>();
414 debug!(?address, "Connect to address");
415 self.connect_sender.send((address, pid_sender))?;
416 let participant = match pid_receiver.await? {
417 Ok(p) => p,
418 Err(e) => return Err(NetworkError::ConnectFailed(e)),
419 };
420 let remote_pid = participant.remote_pid;
421 trace!(?remote_pid, "connected");
422 self.participant_disconnect_sender
423 .lock()
424 .await
425 .insert(remote_pid, Arc::clone(&participant.a2s_disconnect_s));
426 Ok(participant)
427 }
428
429 /// Returns a [`Participant`] created from a [`ListenAddr`] you
430 /// called [`listen`] on before. This function will either return a
431 /// working [`Participant`] ready to open [`Streams`] on OR has returned
432 /// a [`NetworkError`] (e.g. Network got closed)
433 ///
434 /// # Examples
435 /// ```rust
436 /// use tokio::runtime::Runtime;
437 /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
438 ///
439 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
440 /// // Create a Network, listen on port `2020` TCP and opens returns their Pid
441 /// let runtime = Runtime::new().unwrap();
442 /// let mut network = Network::new(Pid::new(), &runtime);
443 /// # let remote = Network::new(Pid::new(), &runtime);
444 /// runtime.block_on(async {
445 /// network
446 /// .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
447 /// .await?;
448 /// # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
449 /// while let Ok(participant) = network.connected().await {
450 /// println!("Participant connected: {}", participant.remote_pid());
451 /// # //skip test here as it would be a endless loop
452 /// # break;
453 /// }
454 /// drop(network);
455 /// # drop(remote);
456 /// # Ok(())
457 /// })
458 /// # }
459 /// ```
460 ///
461 /// [`Streams`]: crate::api::Stream
462 /// [`listen`]: crate::api::Network::listen
463 /// [`ListenAddr`]: crate::api::ListenAddr
464 #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
465 pub async fn connected(&mut self) -> Result<Participant, NetworkError> {
466 let participant = self
467 .connected_receiver
468 .recv()
469 .await
470 .ok_or(NetworkError::NetworkClosed)?;
471 self.participant_disconnect_sender.lock().await.insert(
472 participant.remote_pid,
473 Arc::clone(&participant.a2s_disconnect_s),
474 );
475 Ok(participant)
476 }
477
478 /// Use a mgr to handle shutdown smoothly and not in `Drop`
479 #[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))]
480 async fn shutdown_mgr(
481 local_pid: Pid,
482 shutdown_network_r: oneshot::Receiver<oneshot::Sender<()>>,
483 participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
484 shutdown_scheduler_s: oneshot::Sender<()>,
485 ) {
486 trace!("waiting for shutdown triggerNetwork");
487 let return_s = shutdown_network_r.await;
488 trace!("Shutting down Participants of Network");
489 let mut finished_receiver_list = vec![];
490
491 for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() {
492 match a2s_disconnect_s.lock().await.take() {
493 Some(a2s_disconnect_s) => {
494 trace!(?remote_pid, "Participants will be closed");
495 let (finished_sender, finished_receiver) = oneshot::channel();
496 finished_receiver_list.push((remote_pid, finished_receiver));
497 // If the channel was already dropped, we can assume that the other side
498 // already released its resources.
499 let _ = a2s_disconnect_s
500 .send((remote_pid, (Duration::from_secs(10), finished_sender)));
501 },
502 None => trace!(?remote_pid, "Participant already disconnected gracefully"),
503 }
504 }
505 //wait after close is requested for all
506 for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
507 match finished_receiver.await {
508 Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
509 Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
510 Err(e) => warn!(
511 ?remote_pid,
512 ?e,
513 "Failed to get a message back from the scheduler, seems like the network is \
514 already closed"
515 ),
516 }
517 }
518
519 trace!("Participants have shut down - next: Scheduler");
520 if let Err(()) = shutdown_scheduler_s.send(()) {
521 error!("Scheduler is closed, but nobody other should be able to close it")
522 };
523 if let Ok(return_s) = return_s {
524 if return_s.send(()).is_err() {
525 warn!("Network::drop stopped after a timeout and didn't wait for our shutdown");
526 };
527 }
528 debug!("Network has shut down");
529 }
530}
531
532impl Participant {
533 pub(crate) fn new(
534 local_pid: Pid,
535 remote_pid: Pid,
536 a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
537 b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
538 b2a_event_r: mpsc::UnboundedReceiver<ParticipantEvent>,
539 b2a_bandwidth_stats_r: watch::Receiver<f32>,
540 a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
541 ) -> Self {
542 Self {
543 local_pid,
544 remote_pid,
545 a2b_open_stream_s,
546 b2a_stream_opened_r,
547 b2a_event_r,
548 b2a_bandwidth_stats_r,
549 a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
550 }
551 }
552
553 /// Opens a [`Stream`] on this `Participant` with a certain Priority and
554 /// [`Promises`]
555 ///
556 /// # Arguments
557 /// * `prio` - defines which stream is processed first when limited on
558 /// bandwidth. See [`Prio`] for documentation.
559 /// * `promises` - use a combination of you preferred [`Promises`], see the
560 /// link for further documentation. You can combine them, e.g.
561 /// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
562 /// guarantee that those promises are met.
563 /// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
564 /// stream. When excess bandwidth is available it will be used. See
565 /// [`Bandwidth`] for details.
566 ///
567 /// A [`ParticipantError`] might be thrown if the `Participant` is already
568 /// closed. [`Streams`] can be created without a answer from the remote
569 /// side, resulting in very fast creation and closing latency.
570 ///
571 /// # Examples
572 /// ```rust
573 /// use tokio::runtime::Runtime;
574 /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
575 ///
576 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
577 /// // Create a Network, connect on port 2100 and open a stream
578 /// let runtime = Runtime::new().unwrap();
579 /// let network = Network::new(Pid::new(), &runtime);
580 /// # let remote = Network::new(Pid::new(), &runtime);
581 /// runtime.block_on(async {
582 /// # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
583 /// let p1 = network
584 /// .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
585 /// .await?;
586 /// let _s1 = p1
587 /// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
588 /// .await?;
589 /// drop(network);
590 /// # drop(remote);
591 /// # Ok(())
592 /// })
593 /// # }
594 /// ```
595 ///
596 /// [`Prio`]: network_protocol::Prio
597 /// [`Bandwidth`]: network_protocol::Bandwidth
598 /// [`Promises`]: network_protocol::Promises
599 /// [`Streams`]: crate::api::Stream
600 #[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))]
601 pub async fn open(
602 &self,
603 prio: u8,
604 promises: Promises,
605 bandwidth: Bandwidth,
606 ) -> Result<Stream, ParticipantError> {
607 debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
608 let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
609 if let Err(e) =
610 self.a2b_open_stream_s
611 .send((prio, promises, bandwidth, p2a_return_stream_s))
612 {
613 debug!(?e, "bParticipant is already closed, notifying");
614 return Err(ParticipantError::ParticipantDisconnected);
615 }
616 match p2a_return_stream_r.await {
617 Ok(stream) => {
618 let sid = stream.sid;
619 trace!(?sid, "opened stream");
620 Ok(stream)
621 },
622 Err(_) => {
623 debug!("p2a_return_stream_r failed, closing participant");
624 Err(ParticipantError::ParticipantDisconnected)
625 },
626 }
627 }
628
629 /// Use this method to handle [`Streams`] opened from remote site, like the
630 /// [`connected`] method of [`Network`]. This is the associated method
631 /// to [`open`]. It's guaranteed that the order of [`open`] and `opened`
632 /// is equal. The `nth` [`Streams`] on one side will represent the `nth` on
633 /// the other side. A [`ParticipantError`] might be thrown if the
634 /// `Participant` is already closed.
635 ///
636 /// # Examples
637 /// ```rust
638 /// use tokio::runtime::Runtime;
639 /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises};
640 ///
641 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
642 /// // Create a Network, connect on port 2110 and wait for the other side to open a stream
643 /// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream.
644 /// let runtime = Runtime::new().unwrap();
645 /// let mut network = Network::new(Pid::new(), &runtime);
646 /// # let mut remote = Network::new(Pid::new(), &runtime);
647 /// runtime.block_on(async {
648 /// # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
649 /// let mut p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
650 /// # let p2 = remote.connected().await?;
651 /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
652 /// let _s1 = p1.opened().await?;
653 /// drop(network);
654 /// # drop(remote);
655 /// # Ok(())
656 /// })
657 /// # }
658 /// ```
659 ///
660 /// [`Streams`]: crate::api::Stream
661 /// [`connected`]: Network::connected
662 /// [`open`]: Participant::open
663 #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
664 pub async fn opened(&mut self) -> Result<Stream, ParticipantError> {
665 match self.b2a_stream_opened_r.recv().await {
666 Some(stream) => {
667 let sid = stream.sid;
668 debug!(?sid, "Receive opened stream");
669 Ok(stream)
670 },
671 None => {
672 debug!("stream_opened_receiver failed, closing participant");
673 Err(ParticipantError::ParticipantDisconnected)
674 },
675 }
676 }
677
678 /// disconnecting a `Participant` in a async way.
679 /// Use this rather than `Participant::Drop` if you want to close multiple
680 /// `Participants`.
681 ///
682 /// This function will wait for all [`Streams`] to properly close, including
683 /// all messages to be send before closing. If an error occurs with one
684 /// of the messages.
685 /// Except if the remote side already dropped the `Participant`
686 /// simultaneously, then messages won't be send
687 ///
688 /// There is NO `disconnected` function in `Participant`, if a `Participant`
689 /// is no longer reachable (e.g. as the network cable was unplugged) the
690 /// `Participant` will fail all action, but needs to be manually
691 /// disconnected, using this function.
692 ///
693 /// # Examples
694 /// ```rust
695 /// use tokio::runtime::Runtime;
696 /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
697 ///
698 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
699 /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
700 /// let runtime = Runtime::new().unwrap();
701 /// let mut network = Network::new(Pid::new(), &runtime);
702 /// # let mut remote = Network::new(Pid::new(), &runtime);
703 /// let err = runtime.block_on(async {
704 /// network
705 /// .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
706 /// .await?;
707 /// # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
708 /// while let Ok(participant) = network.connected().await {
709 /// println!("Participant connected: {}", participant.remote_pid());
710 /// participant.disconnect().await?;
711 /// # //skip test here as it would be a endless loop
712 /// # break;
713 /// }
714 /// # Ok(())
715 /// });
716 /// drop(network);
717 /// # drop(remote);
718 /// # err
719 /// # }
720 /// ```
721 ///
722 /// [`Streams`]: crate::api::Stream
723 #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
724 pub async fn disconnect(self) -> Result<(), ParticipantError> {
725 // Remove, Close and try_unwrap error when unwrap fails!
726 debug!("Closing participant from network");
727
728 //Streams will be closed by BParticipant
729 match self.a2s_disconnect_s.lock().await.take() {
730 Some(a2s_disconnect_s) => {
731 let (finished_sender, finished_receiver) = oneshot::channel();
732 // Participant is connecting to Scheduler here, not as usual
733 // Participant<->BParticipant
734
735 // If this is already dropped, we can assume the other side already freed its
736 // resources.
737 let _ = a2s_disconnect_s
738 .send((self.remote_pid, (Duration::from_secs(120), finished_sender)));
739 match finished_receiver.await {
740 Ok(res) => {
741 match res {
742 Ok(()) => trace!("Participant is now closed"),
743 Err(ref e) => {
744 trace!(?e, "Error occurred during shutdown of participant")
745 },
746 };
747 res
748 },
749 Err(e) => {
750 //this is a bug. but as i am Participant i can't destroy the network
751 error!(
752 ?e,
753 "Failed to get a message back from the scheduler, seems like the \
754 network is already closed"
755 );
756 Err(ParticipantError::ProtocolFailedUnrecoverable)
757 },
758 }
759 },
760 None => {
761 warn!(
762 "seems like you are trying to disconnecting a participant after the network \
763 was already dropped. It was already dropped with the network!"
764 );
765 Err(ParticipantError::ParticipantDisconnected)
766 },
767 }
768 }
769
770 /// Use this method to query [`ParticipantEvent`]. Those are internal events
771 /// from the network crate that will get reported to the frontend.
772 /// E.g. Creation and Deletion of Channels.
773 ///
774 /// Make sure to call this function from time to time to not let events
775 /// stack up endlessly and create a memory leak.
776 ///
777 /// # Examples
778 /// ```rust
779 /// use tokio::runtime::Runtime;
780 /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises, ParticipantEvent};
781 ///
782 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
783 /// // Create a Network, connect on port 2040 and wait for the other side to open a stream
784 /// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream.
785 /// let runtime = Runtime::new().unwrap();
786 /// let mut network = Network::new(Pid::new(), &runtime);
787 /// # let mut remote = Network::new(Pid::new(), &runtime);
788 /// runtime.block_on(async {
789 /// # remote.listen(ListenAddr::Tcp("127.0.0.1:2040".parse().unwrap())).await?;
790 /// let mut p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2040".parse().unwrap())).await?;
791 /// # let p2 = remote.connected().await?;
792 /// let event = p1.fetch_event().await?;
793 /// drop(network);
794 /// # drop(remote);
795 /// # Ok(())
796 /// })
797 /// # }
798 /// ```
799 ///
800 /// [`ParticipantEvent`]: crate::api::ParticipantEvent
801 pub async fn fetch_event(&mut self) -> Result<ParticipantEvent, ParticipantError> {
802 match self.b2a_event_r.recv().await {
803 Some(event) => Ok(event),
804 None => {
805 debug!("event_receiver failed, closing participant");
806 Err(ParticipantError::ParticipantDisconnected)
807 },
808 }
809 }
810
811 /// use `try_fetch_event` to check for a [`ParticipantEvent`] . This
812 /// function does not block and returns immediately. It's intended for
813 /// use in non-async context only. Other then that, the same rules apply
814 /// than for [`fetch_event`].
815 ///
816 /// [`ParticipantEvent`]: crate::api::ParticipantEvent
817 /// [`fetch_event`]: Participant::fetch_event
818 pub fn try_fetch_event(&mut self) -> Result<Option<ParticipantEvent>, ParticipantError> {
819 match self.b2a_event_r.try_recv() {
820 Ok(event) => Ok(Some(event)),
821 Err(mpsc::error::TryRecvError::Empty) => Ok(None),
822 Err(mpsc::error::TryRecvError::Disconnected) => {
823 Err(ParticipantError::ParticipantDisconnected)
824 },
825 }
826 }
827
828 /// Returns the current approximation on the maximum bandwidth available.
829 /// This WILL fluctuate based on the amount/size of send messages.
830 pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }
831
832 /// Returns the remote [`Pid`](network_protocol::Pid)
833 pub fn remote_pid(&self) -> Pid { self.remote_pid }
834}
835
836impl Stream {
837 pub(crate) fn new(
838 local_pid: Pid,
839 remote_pid: Pid,
840 sid: Sid,
841 prio: Prio,
842 promises: Promises,
843 guaranteed_bandwidth: Bandwidth,
844 send_closed: Arc<AtomicBool>,
845 a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
846 b2a_msg_recv_r: async_channel::Receiver<Bytes>,
847 a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
848 ) -> Self {
849 Self {
850 local_pid,
851 remote_pid,
852 sid,
853 prio,
854 promises,
855 guaranteed_bandwidth,
856 send_closed,
857 a2b_msg_s,
858 b2a_msg_recv_r: Some(b2a_msg_recv_r),
859 a2b_close_stream_s: Some(a2b_close_stream_s),
860 }
861 }
862
863 /// use to send a arbitrary message to the remote side, by having the remote
864 /// side also opened a `Stream` linked to this. the message will be
865 /// [`Serialized`], which actually is quite slow compared to most other
866 /// calculations done. A faster method [`send_raw`] exists, when extra
867 /// speed is needed. The other side needs to use the respective [`recv`]
868 /// function and know the type send.
869 ///
870 /// `send` is an exception to the `async` messages, as it's probably called
871 /// quite often so it doesn't wait for execution. Which also means, that
872 /// no feedback is provided. It's to assume that the Message got `send`
873 /// correctly. If a error occurred, the next call will return an Error.
874 /// If the [`Participant`] disconnected it will also be unable to be used
875 /// any more. A [`StreamError`] will be returned in the error case, e.g.
876 /// when the `Stream` got closed already.
877 ///
878 /// Note when a `Stream` is dropped locally, it will still send all
879 /// messages, though the `drop` will return immediately, however, when a
880 /// [`Participant`] gets gracefully shut down, all remaining messages
881 /// will be send. If the `Stream` is dropped from remote side no further
882 /// messages are send, because the remote side has no way of listening
883 /// to them either way. If the last channel is destroyed (e.g. by losing
884 /// the internet connection or non-graceful shutdown, pending messages
885 /// are also dropped.
886 ///
887 /// # Example
888 /// ```
889 /// # use veloren_network::Promises;
890 /// use tokio::runtime::Runtime;
891 /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
892 ///
893 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
894 /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
895 /// let runtime = Runtime::new().unwrap();
896 /// let mut network = Network::new(Pid::new(), &runtime);
897 /// # let remote = Network::new(Pid::new(), &runtime);
898 /// runtime.block_on(async {
899 /// network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
900 /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
901 /// # // keep it alive
902 /// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
903 /// let mut participant_a = network.connected().await?;
904 /// let mut stream_a = participant_a.opened().await?;
905 /// //Send Message
906 /// stream_a.send("Hello World")?;
907 /// drop(network);
908 /// # drop(remote);
909 /// # Ok(())
910 /// })
911 /// # }
912 /// ```
913 ///
914 /// [`send_raw`]: Stream::send_raw
915 /// [`recv`]: Stream::recv
916 /// [`Serialized`]: Serialize
917 #[inline]
918 pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
919 self.send_raw_move(Message::serialize(&msg, self.params()))
920 }
921
922 /// This methods give the option to skip multiple calls of [`bincode`] and
923 /// [`compress`], e.g. in case the same Message needs to send on
924 /// multiple `Streams` to multiple [`Participants`]. Other then that,
925 /// the same rules apply than for [`send`].
926 /// You need to create a Message via [`Message::serialize`].
927 ///
928 /// # Example
929 /// ```rust
930 /// # use veloren_network::Promises;
931 /// use tokio::runtime::Runtime;
932 /// use bincode;
933 /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
934 ///
935 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
936 /// let runtime = Runtime::new().unwrap();
937 /// let mut network = Network::new(Pid::new(), &runtime);
938 /// # let remote1 = Network::new(Pid::new(), &runtime);
939 /// # let remote2 = Network::new(Pid::new(), &runtime);
940 /// runtime.block_on(async {
941 /// network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
942 /// # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
943 /// # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
944 /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
945 /// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
946 /// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
947 /// let mut participant_a = network.connected().await?;
948 /// let mut participant_b = network.connected().await?;
949 /// let mut stream_a = participant_a.opened().await?;
950 /// let mut stream_b = participant_b.opened().await?;
951 ///
952 /// //Prepare Message and decode it
953 /// let msg = Message::serialize("Hello World", stream_a.params());
954 /// //Send same Message to multiple Streams
955 /// stream_a.send_raw(&msg);
956 /// stream_b.send_raw(&msg);
957 /// drop(network);
958 /// # drop(remote1);
959 /// # drop(remote2);
960 /// # Ok(())
961 /// })
962 /// # }
963 /// ```
964 ///
965 /// [`send`]: Stream::send
966 /// [`Participants`]: crate::api::Participant
967 /// [`compress`]: lz_fear::raw::compress2
968 /// [`Message::serialize`]: crate::message::Message::serialize
969 #[inline]
970 pub fn send_raw(&self, message: &Message) -> Result<(), StreamError> {
971 self.send_raw_move(Message {
972 data: message.data.clone(),
973 #[cfg(feature = "compression")]
974 compressed: message.compressed,
975 })
976 }
977
978 fn send_raw_move(&self, message: Message) -> Result<(), StreamError> {
979 if self.send_closed.load(Ordering::Relaxed) {
980 return Err(StreamError::StreamClosed);
981 }
982 #[cfg(debug_assertions)]
983 message.verify(self.params());
984 self.a2b_msg_s.send((self.sid, message.data))?;
985 Ok(())
986 }
987
988 /// use `recv` to wait on a Message send from the remote side by their
989 /// `Stream`. The Message needs to implement [`DeserializeOwned`] and
990 /// thus, the resulting type must already be known by the receiving side.
991 /// If this is not know from the Application logic, one could use a `Enum`
992 /// and then handle the received message via a `match` state.
993 ///
994 /// A [`StreamError`] will be returned in the error case, e.g. when the
995 /// `Stream` got closed already.
996 ///
997 /// # Example
998 /// ```
999 /// # use veloren_network::Promises;
1000 /// use tokio::runtime::Runtime;
1001 /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
1002 ///
1003 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1004 /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
1005 /// let runtime = Runtime::new().unwrap();
1006 /// let mut network = Network::new(Pid::new(), &runtime);
1007 /// # let remote = Network::new(Pid::new(), &runtime);
1008 /// runtime.block_on(async {
1009 /// network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
1010 /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
1011 /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
1012 /// # stream_p.send("Hello World");
1013 /// let mut participant_a = network.connected().await?;
1014 /// let mut stream_a = participant_a.opened().await?;
1015 /// //Recv Message
1016 /// println!("{}", stream_a.recv::<String>().await?);
1017 /// drop(network);
1018 /// # drop(remote);
1019 /// # Ok(())
1020 /// })
1021 /// # }
1022 /// ```
1023 #[inline]
1024 pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
1025 self.recv_raw().await?.deserialize()
1026 }
1027
1028 /// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] or
1029 /// [`decompress`] is executed for performance reasons.
1030 ///
1031 /// # Example
1032 /// ```
1033 /// # use veloren_network::Promises;
1034 /// use tokio::runtime::Runtime;
1035 /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
1036 ///
1037 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1038 /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
1039 /// let runtime = Runtime::new().unwrap();
1040 /// let mut network = Network::new(Pid::new(), &runtime);
1041 /// # let remote = Network::new(Pid::new(), &runtime);
1042 /// runtime.block_on(async {
1043 /// network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
1044 /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
1045 /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
1046 /// # stream_p.send("Hello World");
1047 /// let mut participant_a = network.connected().await?;
1048 /// let mut stream_a = participant_a.opened().await?;
1049 /// //Recv Message
1050 /// let msg = stream_a.recv_raw().await?;
1051 /// //Resend Message, without deserializing
1052 /// stream_a.send_raw(&msg)?;
1053 /// drop(network);
1054 /// # drop(remote);
1055 /// # Ok(())
1056 /// })
1057 /// # }
1058 /// ```
1059 ///
1060 /// [`send_raw`]: Stream::send_raw
1061 /// [`recv`]: Stream::recv
1062 /// [`decompress`]: lz_fear::raw::decompress_raw
1063 pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
1064 match &mut self.b2a_msg_recv_r {
1065 Some(b2a_msg_recv_r) => {
1066 match b2a_msg_recv_r.recv().await {
1067 Ok(data) => Ok(Message {
1068 data,
1069 #[cfg(feature = "compression")]
1070 compressed: self.promises.contains(Promises::COMPRESSED),
1071 }),
1072 Err(_) => {
1073 self.b2a_msg_recv_r = None; //prevent panic
1074 Err(StreamError::StreamClosed)
1075 },
1076 }
1077 },
1078 None => Err(StreamError::StreamClosed),
1079 }
1080 }
1081
1082 /// use `try_recv` to check for a Message send from the remote side by their
1083 /// `Stream`. This function does not block and returns immediately. It's
1084 /// intended for use in non-async context only. Other then that, the
1085 /// same rules apply than for [`recv`].
1086 ///
1087 /// # Example
1088 /// ```
1089 /// # use veloren_network::Promises;
1090 /// use tokio::runtime::Runtime;
1091 /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
1092 ///
1093 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1094 /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
1095 /// let runtime = Runtime::new().unwrap();
1096 /// let mut network = Network::new(Pid::new(), &runtime);
1097 /// # let remote = Network::new(Pid::new(), &runtime);
1098 /// runtime.block_on(async {
1099 /// network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
1100 /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
1101 /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
1102 /// # stream_p.send("Hello World");
1103 /// # std::thread::sleep(std::time::Duration::from_secs(1));
1104 /// let mut participant_a = network.connected().await?;
1105 /// let mut stream_a = participant_a.opened().await?;
1106 /// //Try Recv Message
1107 /// println!("{:?}", stream_a.try_recv::<String>()?);
1108 /// drop(network);
1109 /// # drop(remote);
1110 /// # Ok(())
1111 /// })
1112 /// # }
1113 /// ```
1114 ///
1115 /// [`recv`]: Stream::recv
1116 #[inline]
1117 pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
1118 match &mut self.b2a_msg_recv_r {
1119 Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_recv() {
1120 Ok(data) => Ok(Some(
1121 Message {
1122 data,
1123 #[cfg(feature = "compression")]
1124 compressed: self.promises.contains(Promises::COMPRESSED),
1125 }
1126 .deserialize()?,
1127 )),
1128 Err(async_channel::TryRecvError::Empty) => Ok(None),
1129 Err(async_channel::TryRecvError::Closed) => {
1130 self.b2a_msg_recv_r = None; //prevent panic
1131 Err(StreamError::StreamClosed)
1132 },
1133 },
1134 None => Err(StreamError::StreamClosed),
1135 }
1136 }
1137
1138 pub fn params(&self) -> StreamParams {
1139 StreamParams {
1140 promises: self.promises,
1141 }
1142 }
1143}
1144
1145impl PartialEq for Participant {
1146 fn eq(&self, other: &Self) -> bool {
1147 //don't check local_pid, 2 Participant from different network should match if
1148 // they are the "same"
1149 self.remote_pid == other.remote_pid
1150 }
1151}
1152
1153fn actively_wait<T, F>(name: &'static str, mut finished_receiver: oneshot::Receiver<T>, f: F)
1154where
1155 F: FnOnce(T) + Send + 'static,
1156 T: Send + 'static,
1157{
1158 const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";
1159
1160 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1161 // When in Async Context WE MUST NOT SYNC BLOCK (as a deadlock might occur as
1162 // other is queued behind). And we CANNOT join our Future_Handle
1163 trace!("async context detected, defer shutdown");
1164 handle.spawn(async move {
1165 match finished_receiver.await {
1166 Ok(data) => f(data),
1167 Err(e) => error!("{}{}: {}", name, CHANNEL_ERR, e),
1168 }
1169 });
1170 } else {
1171 let mut cnt = 0;
1172 loop {
1173 use tokio::sync::oneshot::error::TryRecvError;
1174 match finished_receiver.try_recv() {
1175 Ok(data) => {
1176 f(data);
1177 break;
1178 },
1179 Err(TryRecvError::Closed) => panic!("{}{}", name, CHANNEL_ERR),
1180 Err(TryRecvError::Empty) => {
1181 trace!("actively sleeping");
1182 cnt += 1;
1183 if cnt > 10 {
1184 error!("Timeout waiting for shutdown, dropping");
1185 break;
1186 }
1187 std::thread::sleep(Duration::from_millis(100) * cnt);
1188 },
1189 }
1190 }
1191 };
1192}
1193
1194impl Drop for Network {
1195 #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
1196 fn drop(&mut self) {
1197 trace!("Dropping Network");
1198 let (finished_sender, finished_receiver) = oneshot::channel();
1199 match self
1200 .shutdown_network_s
1201 .take()
1202 .unwrap()
1203 .send(finished_sender)
1204 {
1205 Err(e) => warn!(?e, "Runtime seems to be dropped already"),
1206 Ok(()) => actively_wait("network", finished_receiver, |()| {
1207 info!("Network dropped gracefully")
1208 }),
1209 };
1210 }
1211}
1212
1213impl Drop for Participant {
1214 #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
1215 #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
1216 fn drop(&mut self) {
1217 const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
1218 outgoing messages, dropping remaining";
1219 const SCHEDULER_ERR: &str =
1220 "Something is wrong in internal scheduler coding or you dropped the runtime to early";
1221 // ignore closed, as we need to send it even though we disconnected the
1222 // participant from network
1223 debug!("Shutting down Participant");
1224
1225 match self.a2s_disconnect_s.try_lock() {
1226 Err(e) => debug!(?e, "Participant is being dropped by Network right now"),
1227 Ok(mut s) => match s.take() {
1228 None => info!("Participant already has been shutdown gracefully"),
1229 Some(a2s_disconnect_s) => {
1230 debug!("Disconnect from Scheduler");
1231 let (finished_sender, finished_receiver) = oneshot::channel();
1232 match a2s_disconnect_s
1233 .send((self.remote_pid, (Duration::from_secs(10), finished_sender)))
1234 {
1235 Err(e) => warn!(?e, SCHEDULER_ERR),
1236 Ok(()) => {
1237 actively_wait("participant", finished_receiver, |d| match d {
1238 Ok(()) => info!("Participant dropped gracefully"),
1239 Err(e) => error!(?e, SHUTDOWN_ERR),
1240 });
1241 },
1242 }
1243 },
1244 },
1245 }
1246 }
1247}
1248
1249impl Drop for Stream {
1250 #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
1251 #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
1252
1253 fn drop(&mut self) {
1254 // send if closed is unnecessary but doesn't hurt, we must not crash
1255 let sid = self.sid;
1256 if !self.send_closed.load(Ordering::Relaxed) {
1257 debug!(?sid, "Shutting down Stream");
1258 if let Err(e) = self.a2b_close_stream_s.take().unwrap().send(self.sid) {
1259 debug!(
1260 ?e,
1261 "bparticipant part of a gracefully shutdown was already closed"
1262 );
1263 }
1264 } else {
1265 trace!(?sid, "Stream Drop not needed");
1266 }
1267 }
1268}
1269
1270impl std::fmt::Debug for Participant {
1271 #[inline]
1272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1273 write!(
1274 f,
1275 "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
1276 &self.local_pid, &self.remote_pid,
1277 )
1278 }
1279}
1280
1281impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
1282 fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
1283}
1284
1285impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
1286 fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
1287}
1288
1289impl<T> From<mpsc::error::SendError<T>> for NetworkError {
1290 fn from(_err: mpsc::error::SendError<T>) -> Self { NetworkError::NetworkClosed }
1291}
1292
1293impl From<oneshot::error::RecvError> for NetworkError {
1294 fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed }
1295}
1296
1297impl From<io::Error> for NetworkError {
1298 fn from(_err: io::Error) -> Self { NetworkError::NetworkClosed }
1299}
1300
1301impl From<Box<bincode::ErrorKind>> for StreamError {
1302 fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::Deserialize(err) }
1303}
1304
1305impl core::fmt::Display for StreamError {
1306 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1307 match self {
1308 StreamError::StreamClosed => write!(f, "stream closed"),
1309 #[cfg(feature = "compression")]
1310 StreamError::Compression(err) => write!(f, "compression error on message: {}", err),
1311 StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err),
1312 }
1313 }
1314}
1315
1316impl core::fmt::Display for ParticipantError {
1317 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1318 match self {
1319 ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
1320 ParticipantError::ProtocolFailedUnrecoverable => {
1321 write!(f, "underlying protocol failed unrecoverable")
1322 },
1323 }
1324 }
1325}
1326
1327impl core::fmt::Display for NetworkError {
1328 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1329 match self {
1330 NetworkError::NetworkClosed => write!(f, "Network closed"),
1331 NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
1332 NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
1333 }
1334 }
1335}
1336
1337impl core::fmt::Display for NetworkConnectError {
1338 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1339 match self {
1340 NetworkConnectError::Io(e) => write!(f, "Io error: {}", e),
1341 NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e),
1342 NetworkConnectError::InvalidSecret => {
1343 write!(f, "You specified the wrong secret on your second channel")
1344 },
1345 }
1346 }
1347}
1348
1349/// implementing PartialEq as it's super convenient in tests
1350impl PartialEq for StreamError {
1351 fn eq(&self, other: &Self) -> bool {
1352 match self {
1353 StreamError::StreamClosed => match other {
1354 StreamError::StreamClosed => true,
1355 #[cfg(feature = "compression")]
1356 StreamError::Compression(_) => false,
1357 StreamError::Deserialize(_) => false,
1358 },
1359 #[cfg(feature = "compression")]
1360 StreamError::Compression(err) => match other {
1361 StreamError::StreamClosed => false,
1362 #[cfg(feature = "compression")]
1363 StreamError::Compression(other_err) => err == other_err,
1364 StreamError::Deserialize(_) => false,
1365 },
1366 StreamError::Deserialize(err) => match other {
1367 StreamError::StreamClosed => false,
1368 #[cfg(feature = "compression")]
1369 StreamError::Compression(_) => false,
1370 StreamError::Deserialize(other_err) => partial_eq_bincode(err, other_err),
1371 },
1372 }
1373 }
1374}
1375
1376impl std::error::Error for StreamError {}
1377impl std::error::Error for ParticipantError {}
1378impl std::error::Error for NetworkError {}
1379impl std::error::Error for NetworkConnectError {}