veloren_network/api.rs

1use crate::{
2    channel::ProtocolsError,
3    message::{Message, partial_eq_bincode},
4    participant::{A2bStreamOpen, S2bShutdownBparticipant},
5    scheduler::{A2sConnect, Scheduler},
6};
7use bytes::Bytes;
8use hashbrown::HashMap;
9#[cfg(feature = "compression")]
10use lz_fear::raw::DecodeError;
11use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
12#[cfg(feature = "metrics")]
13use prometheus::Registry;
14use serde::{Serialize, de::DeserializeOwned};
15use std::{
16    net::SocketAddr,
17    sync::{
18        Arc,
19        atomic::{AtomicBool, Ordering},
20    },
21    time::Duration,
22};
23use tokio::{
24    io,
25    runtime::Runtime,
26    sync::{Mutex, mpsc, oneshot, watch},
27};
28use tracing::*;
29
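/// Shared handle used to ask the scheduler to shut down a participant.
/// The inner `Option` is taken once the participant has already been disconnected.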
30type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
31
32/// Represents a Tcp, Quic, Udp or Mpsc connection address
33#[derive(Clone, Debug)]
34pub enum ConnectAddr {
35    Tcp(SocketAddr),
36    Udp(SocketAddr),
37    #[cfg(feature = "quic")]
38    Quic(SocketAddr, quinn::ClientConfig, String),
39    Mpsc(u64),
40}
41
42impl ConnectAddr {
    /// Returns `Some` with the socket address if the protocol is TCP, UDP or
    /// QUIC, and `None` if the protocol is a local channel (mpsc).
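    ///
    /// A minimal sketch (the addresses are only illustrative):
    /// ```
    /// use veloren_network::ConnectAddr;
    ///
    /// let tcp = ConnectAddr::Tcp("127.0.0.1:2999".parse().unwrap());
    /// assert!(tcp.socket_addr().is_some());
    /// assert!(ConnectAddr::Mpsc(0).socket_addr().is_none());
    /// ```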
45    pub fn socket_addr(&self) -> Option<SocketAddr> {
46        match self {
47            Self::Tcp(addr) => Some(*addr),
48            Self::Udp(addr) => Some(*addr),
49            Self::Mpsc(_) => None,
50            #[cfg(feature = "quic")]
51            Self::Quic(addr, _, _) => Some(*addr),
52        }
53    }
54}
55
56/// Represents a Tcp, Quic, Udp or Mpsc listen address
57#[derive(Clone, Debug)]
58pub enum ListenAddr {
59    Tcp(SocketAddr),
60    Udp(SocketAddr),
61    #[cfg(feature = "quic")]
62    Quic(SocketAddr, quinn::ServerConfig),
63    Mpsc(u64),
64}
65
/// A Participant can emit different events; you are obligated to empty the
/// queue from time to time, e.g. via [`fetch_event`](Participant::fetch_event).
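///
/// A rough sketch of draining these events (`participant` here stands for an
/// already-connected [`Participant`]):
/// ```ignore
/// match participant.try_fetch_event()? {
///     Some(ParticipantEvent::ChannelCreated(addr)) => println!("channel created: {:?}", addr),
///     Some(ParticipantEvent::ChannelDeleted(addr)) => println!("channel deleted: {:?}", addr),
///     None => (),
/// }
/// ```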
68#[derive(Clone, Debug)]
69pub enum ParticipantEvent {
70    ChannelCreated(ConnectAddr),
71    ChannelDeleted(ConnectAddr),
72}
73
/// A `Participant` is generated by the [`Network`] and represents a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] methods
/// of [`Networks`] for how to obtain a `Participant`.
77///
78/// [`Networks`]: crate::api::Network
79/// [`connect`]: Network::connect
80/// [`connected`]: Network::connected
81pub struct Participant {
82    local_pid: Pid,
83    remote_pid: Pid,
84    a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
85    b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
86    b2a_event_r: mpsc::UnboundedReceiver<ParticipantEvent>,
87    b2a_bandwidth_stats_r: watch::Receiver<f32>,
88    a2s_disconnect_s: A2sDisconnect,
89}
90
/// A `Stream` represents a channel to send `n` messages with a certain priority
/// and [`Promises`]. Messages are always sent between 2 `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] methods of [`Participant`] for how to
/// generate `Streams`.
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
100///
101/// [`Networks`]: crate::api::Network
102/// [`open`]: Participant::open
103/// [`opened`]: Participant::opened
104#[derive(Debug)]
105pub struct Stream {
106    local_pid: Pid,
107    remote_pid: Pid,
108    sid: Sid,
109    #[expect(dead_code)]
110    prio: Prio,
111    promises: Promises,
112    #[expect(dead_code)]
113    guaranteed_bandwidth: Bandwidth,
114    send_closed: Arc<AtomicBool>,
115    a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
116    b2a_msg_recv_r: Option<async_channel::Receiver<Bytes>>,
117    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
118}
119
120/// Error type thrown by [`Networks`](Network) methods
121#[derive(Debug)]
122pub enum NetworkError {
123    NetworkClosed,
124    ListenFailed(io::Error),
125    ConnectFailed(NetworkConnectError),
126}
127
128/// Error type thrown by [`Networks`](Network) connect
129#[derive(Debug)]
130pub enum NetworkConnectError {
131    /// Either a Pid UUID clash or you are trying to hijack a connection
132    InvalidSecret,
133    Handshake(InitProtocolError<ProtocolsError>),
134    Io(io::Error),
135}
136
137/// Error type thrown by [`Participants`](Participant) methods
138#[derive(Debug, PartialEq, Eq, Clone)]
139pub enum ParticipantError {
    /// Participant was closed by the remote side
    ParticipantDisconnected,
    /// Underlying protocol failed and wasn't able to recover; expect some data
    /// loss. Unfortunately, there is no method to get the exact messages
    /// that failed. This is also returned when the local side tries to do
    /// something while the remote side gracefully disconnects.
146    ProtocolFailedUnrecoverable,
147}
148
/// Error type thrown by [`Streams`](Stream) methods.
/// A Compression error should only happen if a client sends malicious data.
/// A Deserialize error probably means you are expecting type X while you
/// actually got sent type Y.
153#[derive(Debug)]
154pub enum StreamError {
155    StreamClosed,
156    #[cfg(feature = "compression")]
157    Compression(DecodeError),
158    Deserialize(bincode::Error),
159}
160
/// All parameters of a `Stream`; can be used to generate raw messages via
/// [`Message::serialize`]
162#[derive(Debug, Clone)]
163pub struct StreamParams {
164    pub(crate) promises: Promises,
165}
166
167/// Use the `Network` to create connections to other [`Participants`]
168///
/// The `Network` is the single source that handles all connections in your
/// application. You can pass it around to multiple threads in an
/// [`Arc`](std::sync::Arc), as all methods use interior mutability.
172///
173/// The `Network` has methods to [`connect`] to other [`Participants`] actively
174/// via their [`ConnectAddr`], or [`listen`] passively for [`connected`]
175/// [`Participants`] via [`ListenAddr`].
176///
/// To guarantee a clean shutdown, the [`Runtime`] MUST NOT be dropped before
/// the `Network`.
179///
180/// # Examples
181/// ```rust
182/// use tokio::runtime::Runtime;
183/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};
184///
185/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
186/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
187/// let runtime = Runtime::new().unwrap();
188/// let mut network = Network::new(Pid::new(), &runtime);
189/// runtime.block_on(async{
190///     # //setup pseudo database!
191///     # let database = Network::new(Pid::new(), &runtime);
192///     # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
193///     network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
194///     let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
195///     drop(network);
196///     # drop(database);
197///     # Ok(())
198/// })
199/// # }
200/// ```
201///
202/// [`Participants`]: crate::api::Participant
203/// [`Runtime`]: tokio::runtime::Runtime
204/// [`connect`]: Network::connect
205/// [`listen`]: Network::listen
206/// [`connected`]: Network::connected
207/// [`ConnectAddr`]: crate::api::ConnectAddr
208/// [`ListenAddr`]: crate::api::ListenAddr
209pub struct Network {
210    local_pid: Pid,
211    participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
212    listen_sender: mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>,
213    connect_sender: mpsc::UnboundedSender<A2sConnect>,
214    connected_receiver: mpsc::UnboundedReceiver<Participant>,
215    shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
216}
217
218impl Network {
219    /// Generates a new `Network` to handle all connections in an Application
220    ///
221    /// # Arguments
222    /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
223    ///   don't want to reuse a Pid for 2 `Networks`
    /// * `runtime` - provide a [`Runtime`]; it's used to internally spawn
    ///   tasks. It is necessary to clean up in the non-async `Drop`. **All**
    ///   network-related components **must** be dropped before the runtime is
    ///   stopped. Dropping the runtime while a shutdown is still in progress
    ///   leaves the network in a bad state, which might cause a panic!
229    ///
230    /// # Result
    /// * `Self` - returns a `Network` which can be sent to multiple areas of
    ///   your code, including multiple threads. This is the base struct of this
    ///   crate.
234    ///
235    /// # Examples
236    /// ```rust
237    /// use tokio::runtime::Runtime;
238    /// use veloren_network::{Network, Pid};
239    ///
240    /// let runtime = Runtime::new().unwrap();
241    /// let network = Network::new(Pid::new(), &runtime);
242    /// ```
243    ///
244    /// Usually you only create a single `Network` for an application,
245    /// except when client and server are in the same application, then you
    /// will want 2. However, there are no technical limitations preventing you from
247    /// creating more.
248    ///
249    /// [`Pid::new()`]: network_protocol::Pid::new
250    /// [`Runtime`]: tokio::runtime::Runtime
251    pub fn new(participant_id: Pid, runtime: &Runtime) -> Self {
252        Self::internal_new(
253            participant_id,
254            runtime,
255            #[cfg(feature = "metrics")]
256            None,
257        )
258    }
259
260    /// See [`new`]
261    ///
    /// # Additional Arguments
    /// * `registry` - provide a Registry in order to collect Prometheus metrics
    ///   with this `Network`; without a registry (see [`new`]) metrics are
    ///   deactivated. Metrics collection is done via [`prometheus`]
266    ///
267    /// # Examples
268    /// ```rust
269    /// use prometheus::Registry;
270    /// use tokio::runtime::Runtime;
271    /// use veloren_network::{Network, Pid};
272    ///
273    /// let runtime = Runtime::new().unwrap();
274    /// let registry = Registry::new();
275    /// let network = Network::new_with_registry(Pid::new(), &runtime, &registry);
276    /// ```
277    /// [`new`]: crate::api::Network::new
278    #[cfg(feature = "metrics")]
279    pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self {
280        Self::internal_new(participant_id, runtime, Some(registry))
281    }
282
283    fn internal_new(
284        participant_id: Pid,
285        runtime: &Runtime,
286        #[cfg(feature = "metrics")] registry: Option<&Registry>,
287    ) -> Self {
288        let p = participant_id;
289        let span = info_span!("network", ?p);
290        span.in_scope(|| trace!("Starting Network"));
291        let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
292            Scheduler::new(
293                participant_id,
294                #[cfg(feature = "metrics")]
295                registry,
296            );
297        let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new()));
298        let (shutdown_network_s, shutdown_network_r) = oneshot::channel();
299        let f = Self::shutdown_mgr(
300            p,
301            shutdown_network_r,
302            Arc::clone(&participant_disconnect_sender),
303            shutdown_sender,
304        );
305        runtime.spawn(f);
306        runtime.spawn(
307            async move {
308                trace!("Starting scheduler in own thread");
309                scheduler.run().await;
                trace!("Stopping scheduler and its own thread");
311            }
312            .instrument(info_span!("network", ?p)),
313        );
314        Self {
315            local_pid: participant_id,
316            participant_disconnect_sender,
317            listen_sender,
318            connect_sender,
319            connected_receiver,
320            shutdown_network_s: Some(shutdown_network_s),
321        }
322    }
323
    /// Starts listening on a [`ListenAddr`].
    /// When the method returns, the `Network` is ready to listen for incoming
    /// connections OR has returned a [`NetworkError`] (e.g. port already in use).
    /// You can call [`connected`] to asynchronously wait for a [`Participant`] to
    /// connect. You can call `listen` on multiple addresses, e.g. to
    /// support multiple protocols or NICs.
330    ///
331    /// # Examples
332    /// ```ignore
333    /// use tokio::runtime::Runtime;
334    /// use veloren_network::{Network, Pid, ListenAddr};
335    ///
336    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
337    /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
338    /// let runtime = Runtime::new().unwrap();
339    /// let mut network = Network::new(Pid::new(), &runtime);
340    /// runtime.block_on(async {
341    ///     network
342    ///         .listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
343    ///         .await?;
344    ///     network
345    ///         .listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
346    ///         .await?;
347    ///     drop(network);
348    ///     # Ok(())
349    /// })
350    /// # }
351    /// ```
352    ///
353    /// [`connected`]: Network::connected
354    /// [`ListenAddr`]: crate::api::ListenAddr
355    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
356    pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> {
357        let (s2a_result_s, s2a_result_r) = oneshot::channel::<io::Result<()>>();
358        debug!(?address, "listening on address");
359        self.listen_sender.send((address, s2a_result_s))?;
360        match s2a_result_r.await? {
361            //waiting guarantees that we either listened successfully or get an error like port in
362            // use
363            Ok(()) => Ok(()),
364            Err(e) => Err(NetworkError::ListenFailed(e)),
365        }
366    }
367
    /// Starts a connection to a [`ConnectAddr`].
    /// When the method returns, the Network either returns a [`Participant`]
    /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
    /// can't connect, or invalid handshake).
    ///
    /// # Examples
372    /// ```ignore
373    /// use tokio::runtime::Runtime;
374    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
375    ///
376    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
377    /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
378    /// let runtime = Runtime::new().unwrap();
379    /// let network = Network::new(Pid::new(), &runtime);
380    /// # let remote = Network::new(Pid::new(), &runtime);
381    /// runtime.block_on(async {
382    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
383    ///     # remote.listen(ListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
384    ///     let p1 = network
385    ///         .connect(ConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
386    ///         .await?;
387    ///     # //this doesn't work yet, so skip the test
388    ///     # //TODO fixme!
389    ///     # return Ok(());
390    ///     let p2 = network
391    ///         .connect(ConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
392    ///         .await?;
393    ///     assert_eq!(&p1, &p2);
394    ///     # Ok(())
395    /// })?;
396    /// drop(network);
397    /// # drop(remote);
398    /// # Ok(())
399    /// # }
400    /// ```
    /// Usually the `Network` guarantees that an operation on a [`Participant`]
    /// succeeds, e.g. by automatic retrying, unless it fails completely, e.g. by
    /// disconnecting from the remote. If 2 [`ConnectAddr`]s you
    /// `connect` to belong to the same [`Participant`], you get the same
    /// [`Participant`] as a result. This is useful e.g. when connecting to
    /// the same [`Participant`] via multiple protocols.
407    ///
408    /// [`Streams`]: crate::api::Stream
409    /// [`ConnectAddr`]: crate::api::ConnectAddr
410    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
411    pub async fn connect(&self, address: ConnectAddr) -> Result<Participant, NetworkError> {
412        let (pid_sender, pid_receiver) =
413            oneshot::channel::<Result<Participant, NetworkConnectError>>();
414        debug!(?address, "Connect to address");
415        self.connect_sender.send((address, pid_sender))?;
416        let participant = match pid_receiver.await? {
417            Ok(p) => p,
418            Err(e) => return Err(NetworkError::ConnectFailed(e)),
419        };
420        let remote_pid = participant.remote_pid;
421        trace!(?remote_pid, "connected");
422        self.participant_disconnect_sender
423            .lock()
424            .await
425            .insert(remote_pid, Arc::clone(&participant.a2s_disconnect_s));
426        Ok(participant)
427    }
428
    /// Returns a [`Participant`] created from a [`ListenAddr`] you
    /// called [`listen`] on before. This function will either return a
    /// working [`Participant`] ready to open [`Streams`] on, OR return
    /// a [`NetworkError`] (e.g. Network got closed).
433    ///
434    /// # Examples
435    /// ```rust
436    /// use tokio::runtime::Runtime;
437    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
438    ///
439    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2020` TCP and print the Pid of connecting participants
441    /// let runtime = Runtime::new().unwrap();
442    /// let mut network = Network::new(Pid::new(), &runtime);
443    /// # let remote = Network::new(Pid::new(), &runtime);
444    /// runtime.block_on(async {
445    ///     network
446    ///         .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
447    ///         .await?;
448    ///     # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
449    ///     while let Ok(participant) = network.connected().await {
450    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         # //skip test here as it would be an endless loop
452    ///         # break;
453    ///     }
454    ///     drop(network);
455    ///     # drop(remote);
456    ///     # Ok(())
457    /// })
458    /// # }
459    /// ```
460    ///
461    /// [`Streams`]: crate::api::Stream
462    /// [`listen`]: crate::api::Network::listen
463    /// [`ListenAddr`]: crate::api::ListenAddr
464    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
465    pub async fn connected(&mut self) -> Result<Participant, NetworkError> {
466        let participant = self
467            .connected_receiver
468            .recv()
469            .await
470            .ok_or(NetworkError::NetworkClosed)?;
471        self.participant_disconnect_sender.lock().await.insert(
472            participant.remote_pid,
473            Arc::clone(&participant.a2s_disconnect_s),
474        );
475        Ok(participant)
476    }
477
478    /// Use a mgr to handle shutdown smoothly and not in `Drop`
479    #[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))]
480    async fn shutdown_mgr(
481        local_pid: Pid,
482        shutdown_network_r: oneshot::Receiver<oneshot::Sender<()>>,
483        participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
484        shutdown_scheduler_s: oneshot::Sender<()>,
485    ) {
        trace!("waiting for shutdown trigger of Network");
487        let return_s = shutdown_network_r.await;
488        trace!("Shutting down Participants of Network");
489        let mut finished_receiver_list = vec![];
490
491        for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() {
492            match a2s_disconnect_s.lock().await.take() {
493                Some(a2s_disconnect_s) => {
494                    trace!(?remote_pid, "Participants will be closed");
495                    let (finished_sender, finished_receiver) = oneshot::channel();
496                    finished_receiver_list.push((remote_pid, finished_receiver));
497                    // If the channel was already dropped, we can assume that the other side
498                    // already released its resources.
499                    let _ = a2s_disconnect_s
500                        .send((remote_pid, (Duration::from_secs(10), finished_sender)));
501                },
502                None => trace!(?remote_pid, "Participant already disconnected gracefully"),
503            }
504        }
505        //wait after close is requested for all
506        for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
507            match finished_receiver.await {
508                Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
509                Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
510                Err(e) => warn!(
511                    ?remote_pid,
512                    ?e,
513                    "Failed to get a message back from the scheduler, seems like the network is \
514                     already closed"
515                ),
516            }
517        }
518
519        trace!("Participants have shut down - next: Scheduler");
520        if let Err(()) = shutdown_scheduler_s.send(()) {
            error!("Scheduler is closed, but nobody else should be able to close it")
522        };
523        if let Ok(return_s) = return_s {
524            if return_s.send(()).is_err() {
525                warn!("Network::drop stopped after a timeout and didn't wait for our shutdown");
526            };
527        }
528        debug!("Network has shut down");
529    }
530}
531
532impl Participant {
533    pub(crate) fn new(
534        local_pid: Pid,
535        remote_pid: Pid,
536        a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
537        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
538        b2a_event_r: mpsc::UnboundedReceiver<ParticipantEvent>,
539        b2a_bandwidth_stats_r: watch::Receiver<f32>,
540        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
541    ) -> Self {
542        Self {
543            local_pid,
544            remote_pid,
545            a2b_open_stream_s,
546            b2a_stream_opened_r,
547            b2a_event_r,
548            b2a_bandwidth_stats_r,
549            a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
550        }
551    }
552
553    /// Opens a [`Stream`] on this `Participant` with a certain Priority and
554    /// [`Promises`]
555    ///
556    /// # Arguments
557    /// * `prio` - defines which stream is processed first when limited on
558    ///   bandwidth. See [`Prio`] for documentation.
    /// * `promises` - use a combination of your preferred [`Promises`], see the
560    ///   link for further documentation. You can combine them, e.g.
561    ///   `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
562    ///   guarantee that those promises are met.
563    /// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
564    ///   stream. When excess bandwidth is available it will be used. See
565    ///   [`Bandwidth`] for details.
566    ///
567    /// A [`ParticipantError`] might be thrown if the `Participant` is already
    /// closed. [`Streams`] can be created without an answer from the remote
569    /// side, resulting in very fast creation and closing latency.
570    ///
571    /// # Examples
572    /// ```rust
573    /// use tokio::runtime::Runtime;
574    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
575    ///
576    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
577    /// // Create a Network, connect on port 2100 and open a stream
578    /// let runtime = Runtime::new().unwrap();
579    /// let network = Network::new(Pid::new(), &runtime);
580    /// # let remote = Network::new(Pid::new(), &runtime);
581    /// runtime.block_on(async {
582    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
583    ///     let p1 = network
584    ///         .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
585    ///         .await?;
586    ///     let _s1 = p1
587    ///         .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
588    ///         .await?;
589    ///     drop(network);
590    ///     # drop(remote);
591    ///     # Ok(())
592    /// })
593    /// # }
594    /// ```
595    ///
596    /// [`Prio`]: network_protocol::Prio
597    /// [`Bandwidth`]: network_protocol::Bandwidth
598    /// [`Promises`]: network_protocol::Promises
599    /// [`Streams`]: crate::api::Stream
600    #[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))]
601    pub async fn open(
602        &self,
603        prio: u8,
604        promises: Promises,
605        bandwidth: Bandwidth,
606    ) -> Result<Stream, ParticipantError> {
607        debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
608        let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
609        if let Err(e) =
610            self.a2b_open_stream_s
611                .send((prio, promises, bandwidth, p2a_return_stream_s))
612        {
613            debug!(?e, "bParticipant is already closed, notifying");
614            return Err(ParticipantError::ParticipantDisconnected);
615        }
616        match p2a_return_stream_r.await {
617            Ok(stream) => {
618                let sid = stream.sid;
619                trace!(?sid, "opened stream");
620                Ok(stream)
621            },
622            Err(_) => {
623                debug!("p2a_return_stream_r failed, closing participant");
624                Err(ParticipantError::ParticipantDisconnected)
625            },
626        }
627    }
628
    /// Use this method to handle [`Streams`] opened from the remote side, like the
    /// [`connected`] method of [`Network`]. This is the counterpart method
    /// to [`open`]. It's guaranteed that the order of [`open`] and `opened`
    /// is equal. The `nth` [`Stream`] on one side will represent the `nth` on
    /// the other side. A [`ParticipantError`] might be thrown if the
    /// `Participant` is already closed.
635    ///
636    /// # Examples
637    /// ```rust
638    /// use tokio::runtime::Runtime;
639    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises};
640    ///
641    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
642    /// // Create a Network, connect on port 2110 and wait for the other side to open a stream
    /// // Note: It's quite unusual to actively connect but then wait on a stream to be opened; usually the application taking the initiative wants to also create the first Stream.
644    /// let runtime = Runtime::new().unwrap();
645    /// let mut network = Network::new(Pid::new(), &runtime);
646    /// # let mut remote = Network::new(Pid::new(), &runtime);
647    /// runtime.block_on(async {
648    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
649    ///     let mut p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
650    ///     # let p2 = remote.connected().await?;
651    ///     # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
652    ///     let _s1 = p1.opened().await?;
653    ///     drop(network);
654    ///     # drop(remote);
655    ///     # Ok(())
656    /// })
657    /// # }
658    /// ```
659    ///
660    /// [`Streams`]: crate::api::Stream
661    /// [`connected`]: Network::connected
662    /// [`open`]: Participant::open
663    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
664    pub async fn opened(&mut self) -> Result<Stream, ParticipantError> {
665        match self.b2a_stream_opened_r.recv().await {
666            Some(stream) => {
667                let sid = stream.sid;
668                debug!(?sid, "Receive opened stream");
669                Ok(stream)
670            },
671            None => {
672                debug!("stream_opened_receiver failed, closing participant");
673                Err(ParticipantError::ParticipantDisconnected)
674            },
675        }
676    }
677
    /// Disconnects a `Participant` in an async way.
    /// Use this rather than `Participant::Drop` if you want to close multiple
    /// `Participants`.
    ///
    /// This function will wait for all [`Streams`] to close properly, including
    /// all messages to be sent before closing; if an error occurs with one
    /// of the messages it is reported in the returned `Result`.
    /// The exception is when the remote side already dropped the `Participant`
    /// simultaneously; then the remaining messages won't be sent.
    ///
    /// There is NO `disconnected` function in `Participant`: if a `Participant`
    /// is no longer reachable (e.g. because the network cable was unplugged), the
    /// `Participant` will fail all actions, but needs to be manually
    /// disconnected using this function.
692    ///
693    /// # Examples
694    /// ```rust
695    /// use tokio::runtime::Runtime;
696    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
697    ///
698    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2030` TCP, print the Pid of connecting participants and close the connection.
700    /// let runtime = Runtime::new().unwrap();
701    /// let mut network = Network::new(Pid::new(), &runtime);
702    /// # let mut remote = Network::new(Pid::new(), &runtime);
703    /// let err = runtime.block_on(async {
704    ///     network
705    ///         .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
706    ///         .await?;
707    ///     # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
708    ///     while let Ok(participant) = network.connected().await {
709    ///         println!("Participant connected: {}", participant.remote_pid());
710    ///         participant.disconnect().await?;
    ///         # //skip test here as it would be an endless loop
712    ///         # break;
713    ///     }
714    ///     # Ok(())
715    /// });
716    /// drop(network);
717    /// # drop(remote);
718    /// # err
719    /// # }
720    /// ```
721    ///
722    /// [`Streams`]: crate::api::Stream
723    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
724    pub async fn disconnect(self) -> Result<(), ParticipantError> {
725        // Remove, Close and try_unwrap error when unwrap fails!
726        debug!("Closing participant from network");
727
728        //Streams will be closed by BParticipant
729        match self.a2s_disconnect_s.lock().await.take() {
730            Some(a2s_disconnect_s) => {
731                let (finished_sender, finished_receiver) = oneshot::channel();
732                // Participant is connecting to Scheduler here, not as usual
733                // Participant<->BParticipant
734
735                // If this is already dropped, we can assume the other side already freed its
736                // resources.
737                let _ = a2s_disconnect_s
738                    .send((self.remote_pid, (Duration::from_secs(120), finished_sender)));
739                match finished_receiver.await {
740                    Ok(res) => {
741                        match res {
742                            Ok(()) => trace!("Participant is now closed"),
743                            Err(ref e) => {
744                                trace!(?e, "Error occurred during shutdown of participant")
745                            },
746                        };
747                        res
748                    },
749                    Err(e) => {
750                        //this is a bug. but as i am Participant i can't destroy the network
751                        error!(
752                            ?e,
753                            "Failed to get a message back from the scheduler, seems like the \
754                             network is already closed"
755                        );
756                        Err(ParticipantError::ProtocolFailedUnrecoverable)
757                    },
758                }
759            },
760            None => {
761                warn!(
                    "seems like you are trying to disconnect a participant after the network \
763                     was already dropped. It was already dropped with the network!"
764                );
765                Err(ParticipantError::ParticipantDisconnected)
766            },
767        }
768    }
769
    /// Use this method to query [`ParticipantEvent`]s. Those are internal events
    /// from the network crate that will get reported to the frontend,
    /// e.g. the creation and deletion of channels.
773    ///
774    /// Make sure to call this function from time to time to not let events
775    /// stack up endlessly and create a memory leak.
776    ///
777    /// # Examples
778    /// ```rust
779    /// use tokio::runtime::Runtime;
780    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises, ParticipantEvent};
781    ///
782    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2040 and fetch the first ParticipantEvent
785    /// let runtime = Runtime::new().unwrap();
786    /// let mut network = Network::new(Pid::new(), &runtime);
787    /// # let mut remote = Network::new(Pid::new(), &runtime);
788    /// runtime.block_on(async {
789    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2040".parse().unwrap())).await?;
790    ///     let mut p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2040".parse().unwrap())).await?;
791    ///     # let p2 = remote.connected().await?;
792    ///     let event = p1.fetch_event().await?;
793    ///     drop(network);
794    ///     # drop(remote);
795    ///     # Ok(())
796    /// })
797    /// # }
798    /// ```
799    ///
800    /// [`ParticipantEvent`]: crate::api::ParticipantEvent
801    pub async fn fetch_event(&mut self) -> Result<ParticipantEvent, ParticipantError> {
802        match self.b2a_event_r.recv().await {
803            Some(event) => Ok(event),
804            None => {
805                debug!("event_receiver failed, closing participant");
806                Err(ParticipantError::ParticipantDisconnected)
807            },
808        }
809    }
810
    /// Use `try_fetch_event` to check for a [`ParticipantEvent`]. This
    /// function does not block and returns immediately. It's intended for
    /// use in non-async contexts only. Other than that, the same rules apply
    /// as for [`fetch_event`].
815    ///
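    /// # Examples
    /// A minimal sketch (port `2041` and the setup are only illustrative),
    /// polling for an event without blocking:
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let runtime = Runtime::new().unwrap();
    /// let mut network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2041".parse().unwrap())).await?;
    ///     let mut p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2041".parse().unwrap())).await?;
    ///     // `Ok(None)` simply means no event has been queued yet
    ///     let _maybe_event = p1.try_fetch_event()?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///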
816    /// [`ParticipantEvent`]: crate::api::ParticipantEvent
817    /// [`fetch_event`]: Participant::fetch_event
818    pub fn try_fetch_event(&mut self) -> Result<Option<ParticipantEvent>, ParticipantError> {
819        match self.b2a_event_r.try_recv() {
820            Ok(event) => Ok(Some(event)),
821            Err(mpsc::error::TryRecvError::Empty) => Ok(None),
822            Err(mpsc::error::TryRecvError::Disconnected) => {
823                Err(ParticipantError::ParticipantDisconnected)
824            },
825        }
826    }
827
    /// Returns the current approximation of the maximum bandwidth available.
    /// This WILL fluctuate based on the amount/size of sent messages.
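    ///
    /// A rough sketch (`participant` stands for an already-connected `Participant`):
    /// ```ignore
    /// let estimated_bandwidth: f32 = participant.bandwidth();
    /// ```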
830    pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }
831
832    /// Returns the remote [`Pid`](network_protocol::Pid)
833    pub fn remote_pid(&self) -> Pid { self.remote_pid }
834}
835
836impl Stream {
837    pub(crate) fn new(
838        local_pid: Pid,
839        remote_pid: Pid,
840        sid: Sid,
841        prio: Prio,
842        promises: Promises,
843        guaranteed_bandwidth: Bandwidth,
844        send_closed: Arc<AtomicBool>,
845        a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
846        b2a_msg_recv_r: async_channel::Receiver<Bytes>,
847        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
848    ) -> Self {
849        Self {
850            local_pid,
851            remote_pid,
852            sid,
853            prio,
854            promises,
855            guaranteed_bandwidth,
856            send_closed,
857            a2b_msg_s,
858            b2a_msg_recv_r: Some(b2a_msg_recv_r),
859            a2b_close_stream_s: Some(a2b_close_stream_s),
860        }
861    }
862
    /// Use this to send an arbitrary message to the remote side, which must have
    /// also opened a `Stream` linked to this one. The message will be
    /// [`Serialized`], which actually is quite slow compared to most other
    /// calculations done. A faster method, [`send_raw`], exists for when extra
    /// speed is needed. The other side needs to use the respective [`recv`]
    /// function and know the type sent.
    ///
    /// `send` is an exception among the `async` methods, as it's probably called
    /// quite often, so it doesn't wait for execution. This also means that
    /// no feedback is provided. It is assumed that the message got sent
    /// correctly. If an error occurred, the next call will return an Error.
    /// If the [`Participant`] disconnected, it will also be unable to be used
    /// any more. A [`StreamError`] will be returned in the error case, e.g.
    /// when the `Stream` got closed already.
    ///
    /// Note that when a `Stream` is dropped locally, it will still send all
    /// pending messages, though the `drop` itself returns immediately; when a
    /// [`Participant`] gets gracefully shut down, all remaining messages
    /// will be sent. If the `Stream` is dropped from the remote side, no further
    /// messages are sent, because the remote side has no way of listening
    /// to them anyway. If the last channel is destroyed (e.g. by losing
    /// the internet connection or a non-graceful shutdown), pending messages
    /// are also dropped.
886    ///
887    /// # Example
888    /// ```
889    /// # use veloren_network::Promises;
890    /// use tokio::runtime::Runtime;
891    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
892    ///
893    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
894    /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
895    /// let runtime = Runtime::new().unwrap();
896    /// let mut network = Network::new(Pid::new(), &runtime);
897    /// # let remote = Network::new(Pid::new(), &runtime);
898    /// runtime.block_on(async {
899    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
900    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
901    ///     # // keep it alive
902    ///     # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
903    ///     let mut participant_a = network.connected().await?;
904    ///     let mut stream_a = participant_a.opened().await?;
905    ///     //Send  Message
906    ///     stream_a.send("Hello World")?;
907    ///     drop(network);
908    ///     # drop(remote);
909    ///     # Ok(())
910    /// })
911    /// # }
912    /// ```
913    ///
914    /// [`send_raw`]: Stream::send_raw
915    /// [`recv`]: Stream::recv
916    /// [`Serialized`]: Serialize
917    #[inline]
918    pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
919        self.send_raw_move(Message::serialize(&msg, self.params()))
920    }
921
    /// This method gives the option to skip multiple calls of [`bincode`] and
    /// [`compress`], e.g. in case the same Message needs to be sent on
    /// multiple `Streams` to multiple [`Participants`]. Other than that,
    /// the same rules apply as for [`send`].
    /// You need to create a Message via [`Message::serialize`].
927    ///
928    /// # Example
929    /// ```rust
930    /// # use veloren_network::Promises;
931    /// use tokio::runtime::Runtime;
932    /// use bincode;
933    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
934    ///
935    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
936    /// let runtime = Runtime::new().unwrap();
937    /// let mut network = Network::new(Pid::new(), &runtime);
938    /// # let remote1 = Network::new(Pid::new(), &runtime);
939    /// # let remote2 = Network::new(Pid::new(), &runtime);
940    /// runtime.block_on(async {
941    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
942    ///     # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
943    ///     # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
944    ///     # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
945    ///     # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
946    ///     # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
947    ///     let mut participant_a = network.connected().await?;
948    ///     let mut participant_b = network.connected().await?;
949    ///     let mut stream_a = participant_a.opened().await?;
950    ///     let mut stream_b = participant_b.opened().await?;
951    ///
    ///     //Prepare and encode the Message once
953    ///     let msg = Message::serialize("Hello World", stream_a.params());
954    ///     //Send same Message to multiple Streams
955    ///     stream_a.send_raw(&msg);
956    ///     stream_b.send_raw(&msg);
957    ///     drop(network);
958    ///     # drop(remote1);
959    ///     # drop(remote2);
960    ///     # Ok(())
961    /// })
962    /// # }
963    /// ```
964    ///
965    /// [`send`]: Stream::send
966    /// [`Participants`]: crate::api::Participant
967    /// [`compress`]: lz_fear::raw::compress2
968    /// [`Message::serialize`]: crate::message::Message::serialize
969    #[inline]
970    pub fn send_raw(&self, message: &Message) -> Result<(), StreamError> {
971        self.send_raw_move(Message {
972            data: message.data.clone(),
973            #[cfg(feature = "compression")]
974            compressed: message.compressed,
975        })
976    }
977
978    fn send_raw_move(&self, message: Message) -> Result<(), StreamError> {
979        if self.send_closed.load(Ordering::Relaxed) {
980            return Err(StreamError::StreamClosed);
981        }
982        #[cfg(debug_assertions)]
983        message.verify(self.params());
984        self.a2b_msg_s.send((self.sid, message.data))?;
985        Ok(())
986    }
987
    /// Use `recv` to wait on a message sent from the remote side via their
    /// `Stream`. The message needs to implement [`DeserializeOwned`] and
    /// thus, the resulting type must already be known by the receiving side.
    /// If this is not known from the application logic, one could use an `enum`
    /// and then handle the received message via a `match` statement.
993    ///
994    /// A [`StreamError`] will be returned in the error case, e.g. when the
995    /// `Stream` got closed already.
996    ///
997    /// # Example
998    /// ```
999    /// # use veloren_network::Promises;
1000    /// use tokio::runtime::Runtime;
1001    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
1002    ///
1003    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1004    /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
1005    /// let runtime = Runtime::new().unwrap();
1006    /// let mut network = Network::new(Pid::new(), &runtime);
1007    /// # let remote = Network::new(Pid::new(), &runtime);
1008    /// runtime.block_on(async {
1009    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
1010    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
1011    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
1012    ///     # stream_p.send("Hello World");
1013    ///     let mut participant_a = network.connected().await?;
1014    ///     let mut stream_a = participant_a.opened().await?;
1015    ///     //Recv  Message
1016    ///     println!("{}", stream_a.recv::<String>().await?);
1017    ///     drop(network);
1018    ///     # drop(remote);
1019    ///     # Ok(())
1020    /// })
1021    /// # }
1022    /// ```
1023    #[inline]
1024    pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
1025        self.recv_raw().await?.deserialize()
1026    }
1027
    /// The equivalent of [`send_raw`] but for [`recv`]: no [`bincode`] or
    /// [`decompress`] is executed, for performance reasons.
1030    ///
1031    /// # Example
1032    /// ```
1033    /// # use veloren_network::Promises;
1034    /// use tokio::runtime::Runtime;
1035    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
1036    ///
1037    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1038    /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
1039    /// let runtime = Runtime::new().unwrap();
1040    /// let mut network = Network::new(Pid::new(), &runtime);
1041    /// # let remote = Network::new(Pid::new(), &runtime);
1042    /// runtime.block_on(async {
1043    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
1044    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
1045    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
1046    ///     # stream_p.send("Hello World");
1047    ///     let mut participant_a = network.connected().await?;
1048    ///     let mut stream_a = participant_a.opened().await?;
1049    ///     //Recv  Message
1050    ///     let msg = stream_a.recv_raw().await?;
1051    ///     //Resend Message, without deserializing
1052    ///     stream_a.send_raw(&msg)?;
1053    ///     drop(network);
1054    ///     # drop(remote);
1055    ///     # Ok(())
1056    /// })
1057    /// # }
1058    /// ```
1059    ///
1060    /// [`send_raw`]: Stream::send_raw
1061    /// [`recv`]: Stream::recv
1062    /// [`decompress`]: lz_fear::raw::decompress_raw
1063    pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
1064        match &mut self.b2a_msg_recv_r {
1065            Some(b2a_msg_recv_r) => {
1066                match b2a_msg_recv_r.recv().await {
1067                    Ok(data) => Ok(Message {
1068                        data,
1069                        #[cfg(feature = "compression")]
1070                        compressed: self.promises.contains(Promises::COMPRESSED),
1071                    }),
1072                    Err(_) => {
1073                        self.b2a_msg_recv_r = None; //prevent panic
1074                        Err(StreamError::StreamClosed)
1075                    },
1076                }
1077            },
1078            None => Err(StreamError::StreamClosed),
1079        }
1080    }
1081
    /// Use `try_recv` to check for a message sent from the remote side via their
    /// `Stream`. This function does not block and returns immediately. It's
    /// intended for use in non-async contexts only. Other than that, the
    /// same rules apply as for [`recv`].
1086    ///
1087    /// # Example
1088    /// ```
1089    /// # use veloren_network::Promises;
1090    /// use tokio::runtime::Runtime;
1091    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
1092    ///
1093    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1094    /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
1095    /// let runtime = Runtime::new().unwrap();
1096    /// let mut network = Network::new(Pid::new(), &runtime);
1097    /// # let remote = Network::new(Pid::new(), &runtime);
1098    /// runtime.block_on(async {
1099    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
1100    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
1101    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
1102    ///     # stream_p.send("Hello World");
1103    ///     # std::thread::sleep(std::time::Duration::from_secs(1));
1104    ///     let mut participant_a = network.connected().await?;
1105    ///     let mut stream_a = participant_a.opened().await?;
1106    ///     //Try Recv  Message
1107    ///     println!("{:?}", stream_a.try_recv::<String>()?);
1108    ///     drop(network);
1109    ///     # drop(remote);
1110    ///     # Ok(())
1111    /// })
1112    /// # }
1113    /// ```
1114    ///
1115    /// [`recv`]: Stream::recv
1116    #[inline]
1117    pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
1118        match &mut self.b2a_msg_recv_r {
1119            Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_recv() {
1120                Ok(data) => Ok(Some(
1121                    Message {
1122                        data,
1123                        #[cfg(feature = "compression")]
1124                        compressed: self.promises.contains(Promises::COMPRESSED),
1125                    }
1126                    .deserialize()?,
1127                )),
1128                Err(async_channel::TryRecvError::Empty) => Ok(None),
1129                Err(async_channel::TryRecvError::Closed) => {
1130                    self.b2a_msg_recv_r = None; //prevent panic
1131                    Err(StreamError::StreamClosed)
1132                },
1133            },
1134            None => Err(StreamError::StreamClosed),
1135        }
1136    }
1137
    /// Returns the [`StreamParams`] of this `Stream`; [`Message::serialize`]
    /// takes these to pre-serialize a message for
    /// [`send_raw`](Stream::send_raw).
    pub fn params(&self) -> StreamParams {
1139        StreamParams {
1140            promises: self.promises,
1141        }
1142    }
1143}
1144
1145impl PartialEq for Participant {
1146    fn eq(&self, other: &Self) -> bool {
        //don't check local_pid; 2 Participants from different networks should match if
1148        // they are the "same"
1149        self.remote_pid == other.remote_pid
1150    }
1151}
1152
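/// Waits for `finished_receiver` and then runs `f` on the received value.
/// Inside an async context the wait is deferred onto the runtime; otherwise it
/// polls with an increasing sleep and gives up after a bounded number of tries.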
1153fn actively_wait<T, F>(name: &'static str, mut finished_receiver: oneshot::Receiver<T>, f: F)
1154where
1155    F: FnOnce(T) + Send + 'static,
1156    T: Send + 'static,
1157{
1158    const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";
1159
1160    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        // When in an async context WE MUST NOT SYNC BLOCK (as a deadlock might occur
        // because others are queued behind). And we CANNOT join our Future_Handle
1163        trace!("async context detected, defer shutdown");
1164        handle.spawn(async move {
1165            match finished_receiver.await {
1166                Ok(data) => f(data),
1167                Err(e) => error!("{}{}: {}", name, CHANNEL_ERR, e),
1168            }
1169        });
1170    } else {
1171        let mut cnt = 0;
1172        loop {
1173            use tokio::sync::oneshot::error::TryRecvError;
1174            match finished_receiver.try_recv() {
1175                Ok(data) => {
1176                    f(data);
1177                    break;
1178                },
1179                Err(TryRecvError::Closed) => panic!("{}{}", name, CHANNEL_ERR),
1180                Err(TryRecvError::Empty) => {
1181                    trace!("actively sleeping");
1182                    cnt += 1;
1183                    if cnt > 10 {
1184                        error!("Timeout waiting for shutdown, dropping");
1185                        break;
1186                    }
1187                    std::thread::sleep(Duration::from_millis(100) * cnt);
1188                },
1189            }
1190        }
1191    };
1192}
1193
1194impl Drop for Network {
1195    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
1196    fn drop(&mut self) {
1197        trace!("Dropping Network");
1198        let (finished_sender, finished_receiver) = oneshot::channel();
1199        match self
1200            .shutdown_network_s
1201            .take()
1202            .unwrap()
1203            .send(finished_sender)
1204        {
1205            Err(e) => warn!(?e, "Runtime seems to be dropped already"),
1206            Ok(()) => actively_wait("network", finished_receiver, |()| {
1207                info!("Network dropped gracefully")
1208            }),
1209        };
1210    }
1211}
1212
1213impl Drop for Participant {
1214    #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
1215    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
1216    fn drop(&mut self) {
1217        const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
1218                                    outgoing messages, dropping remaining";
1219        const SCHEDULER_ERR: &str =
            "Something is wrong in internal scheduler coding or you dropped the runtime too early";
1221        // ignore closed, as we need to send it even though we disconnected the
1222        // participant from network
1223        debug!("Shutting down Participant");
1224
1225        match self.a2s_disconnect_s.try_lock() {
1226            Err(e) => debug!(?e, "Participant is being dropped by Network right now"),
1227            Ok(mut s) => match s.take() {
1228                None => info!("Participant already has been shutdown gracefully"),
1229                Some(a2s_disconnect_s) => {
1230                    debug!("Disconnect from Scheduler");
1231                    let (finished_sender, finished_receiver) = oneshot::channel();
1232                    match a2s_disconnect_s
1233                        .send((self.remote_pid, (Duration::from_secs(10), finished_sender)))
1234                    {
1235                        Err(e) => warn!(?e, SCHEDULER_ERR),
1236                        Ok(()) => {
1237                            actively_wait("participant", finished_receiver, |d| match d {
1238                                Ok(()) => info!("Participant dropped gracefully"),
1239                                Err(e) => error!(?e, SHUTDOWN_ERR),
1240                            });
1241                        },
1242                    }
1243                },
1244            },
1245        }
1246    }
1247}
1248
1249impl Drop for Stream {
1250    #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
1251    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
1252
1253    fn drop(&mut self) {
1254        // send if closed is unnecessary but doesn't hurt, we must not crash
1255        let sid = self.sid;
1256        if !self.send_closed.load(Ordering::Relaxed) {
1257            debug!(?sid, "Shutting down Stream");
1258            if let Err(e) = self.a2b_close_stream_s.take().unwrap().send(self.sid) {
1259                debug!(
1260                    ?e,
1261                    "bparticipant part of a gracefully shutdown was already closed"
1262                );
1263            }
1264        } else {
1265            trace!(?sid, "Stream Drop not needed");
1266        }
1267    }
1268}
1269
1270impl std::fmt::Debug for Participant {
1271    #[inline]
1272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1273        write!(
1274            f,
1275            "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
1276            &self.local_pid, &self.remote_pid,
1277        )
1278    }
1279}
1280
1281impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
1282    fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
1283}
1284
1285impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
1286    fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
1287}
1288
1289impl<T> From<mpsc::error::SendError<T>> for NetworkError {
1290    fn from(_err: mpsc::error::SendError<T>) -> Self { NetworkError::NetworkClosed }
1291}
1292
1293impl From<oneshot::error::RecvError> for NetworkError {
1294    fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed }
1295}
1296
1297impl From<io::Error> for NetworkError {
1298    fn from(_err: io::Error) -> Self { NetworkError::NetworkClosed }
1299}
1300
1301impl From<Box<bincode::ErrorKind>> for StreamError {
1302    fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::Deserialize(err) }
1303}
1304
1305impl core::fmt::Display for StreamError {
1306    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1307        match self {
1308            StreamError::StreamClosed => write!(f, "stream closed"),
1309            #[cfg(feature = "compression")]
1310            StreamError::Compression(err) => write!(f, "compression error on message: {}", err),
1311            StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err),
1312        }
1313    }
1314}
1315
1316impl core::fmt::Display for ParticipantError {
1317    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1318        match self {
1319            ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
1320            ParticipantError::ProtocolFailedUnrecoverable => {
1321                write!(f, "underlying protocol failed unrecoverable")
1322            },
1323        }
1324    }
1325}
1326
1327impl core::fmt::Display for NetworkError {
1328    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1329        match self {
1330            NetworkError::NetworkClosed => write!(f, "Network closed"),
1331            NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
1332            NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
1333        }
1334    }
1335}
1336
1337impl core::fmt::Display for NetworkConnectError {
1338    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1339        match self {
1340            NetworkConnectError::Io(e) => write!(f, "Io error: {}", e),
1341            NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e),
1342            NetworkConnectError::InvalidSecret => {
1343                write!(f, "You specified the wrong secret on your second channel")
1344            },
1345        }
1346    }
1347}
1348
1349/// implementing PartialEq as it's super convenient in tests
1350impl PartialEq for StreamError {
1351    fn eq(&self, other: &Self) -> bool {
1352        match self {
1353            StreamError::StreamClosed => match other {
1354                StreamError::StreamClosed => true,
1355                #[cfg(feature = "compression")]
1356                StreamError::Compression(_) => false,
1357                StreamError::Deserialize(_) => false,
1358            },
1359            #[cfg(feature = "compression")]
1360            StreamError::Compression(err) => match other {
1361                StreamError::StreamClosed => false,
1362                #[cfg(feature = "compression")]
1363                StreamError::Compression(other_err) => err == other_err,
1364                StreamError::Deserialize(_) => false,
1365            },
1366            StreamError::Deserialize(err) => match other {
1367                StreamError::StreamClosed => false,
1368                #[cfg(feature = "compression")]
1369                StreamError::Compression(_) => false,
1370                StreamError::Deserialize(other_err) => partial_eq_bincode(err, other_err),
1371            },
1372        }
1373    }
1374}
1375
1376impl std::error::Error for StreamError {}
1377impl std::error::Error for ParticipantError {}
1378impl std::error::Error for NetworkError {}
1379impl std::error::Error for NetworkConnectError {}