veloren_network_protocol/
tcp.rs

1use crate::{
2    RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
3    error::ProtocolError,
4    event::ProtocolEvent,
5    frame::{ITFrame, InitFrame, OTFrame},
6    handshake::{ReliableDrain, ReliableSink},
7    message::{ALLOC_BLOCK, ITMessage},
8    metrics::{ProtocolMetricCache, RemoveReason},
9    prio::PrioManager,
10    types::{Bandwidth, Mid, Promises, Sid},
11};
12use async_trait::async_trait;
13use bytes::BytesMut;
14use hashbrown::HashMap;
15use std::time::{Duration, Instant};
16use tracing::info;
17#[cfg(feature = "trace_pedantic")]
18use tracing::trace;
19
20/// TCP implementation of [`SendProtocol`]
21///
22/// [`SendProtocol`]: crate::SendProtocol
23#[derive(Debug)]
24pub struct TcpSendProtocol<D>
25where
26    D: UnreliableDrain<DataFormat = BytesMut>,
27{
28    buffer: BytesMut,
29    store: PrioManager,
30    next_mid: Mid,
31    closing_streams: Vec<Sid>,
32    notify_closing_streams: Vec<Sid>,
33    pending_shutdown: bool,
34    drain: D,
35    #[expect(dead_code)]
36    last: Instant,
37    metrics: ProtocolMetricCache,
38}
39
40/// TCP implementation of [`RecvProtocol`]
41///
42/// [`RecvProtocol`]: crate::RecvProtocol
43#[derive(Debug)]
44pub struct TcpRecvProtocol<S>
45where
46    S: UnreliableSink<DataFormat = BytesMut>,
47{
48    buffer: BytesMut,
49    itmsg_allocator: BytesMut,
50    incoming: HashMap<Mid, ITMessage>,
51    sink: S,
52    metrics: ProtocolMetricCache,
53}
54
55impl<D> TcpSendProtocol<D>
56where
57    D: UnreliableDrain<DataFormat = BytesMut>,
58{
59    pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self {
60        Self {
61            buffer: BytesMut::new(),
62            store: PrioManager::new(metrics.clone()),
63            next_mid: 0u64,
64            closing_streams: vec![],
65            notify_closing_streams: vec![],
66            pending_shutdown: false,
67            drain,
68            last: Instant::now(),
69            metrics,
70        }
71    }
72
73    /// returns all promises that this Protocol can take care of
74    /// If you open a Stream anyway, unsupported promises are ignored.
75    pub fn supported_promises() -> Promises {
76        Promises::ORDERED
77            | Promises::CONSISTENCY
78            | Promises::GUARANTEED_DELIVERY
79            | Promises::COMPRESSED
80    }
81}
82
83impl<S> TcpRecvProtocol<S>
84where
85    S: UnreliableSink<DataFormat = BytesMut>,
86{
87    pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self {
88        Self {
89            buffer: BytesMut::new(),
90            itmsg_allocator: BytesMut::with_capacity(ALLOC_BLOCK),
91            incoming: HashMap::new(),
92            sink,
93            metrics,
94        }
95    }
96}
97
98#[async_trait]
99impl<D> SendProtocol for TcpSendProtocol<D>
100where
101    D: UnreliableDrain<DataFormat = BytesMut>,
102{
103    type CustomErr = D::CustomErr;
104
105    fn notify_from_recv(&mut self, event: ProtocolEvent) {
106        match event {
107            ProtocolEvent::OpenStream {
108                sid,
109                prio,
110                promises,
111                guaranteed_bandwidth,
112            } => {
113                self.store
114                    .open_stream(sid, prio, promises, guaranteed_bandwidth);
115            },
116            ProtocolEvent::CloseStream { sid } => {
117                if !self.store.try_close_stream(sid) {
118                    #[cfg(feature = "trace_pedantic")]
119                    trace!(?sid, "hold back notify close stream");
120                    self.notify_closing_streams.push(sid);
121                }
122            },
123            _ => {},
124        }
125    }
126
127    async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError<Self::CustomErr>> {
128        #[cfg(feature = "trace_pedantic")]
129        trace!(?event, "send");
130        match event {
131            ProtocolEvent::OpenStream {
132                sid,
133                prio,
134                promises,
135                guaranteed_bandwidth,
136            } => {
137                self.store
138                    .open_stream(sid, prio, promises, guaranteed_bandwidth);
139                event.to_frame().write_bytes(&mut self.buffer);
140                self.drain.send(self.buffer.split()).await?;
141            },
142            ProtocolEvent::CloseStream { sid } => {
143                if self.store.try_close_stream(sid) {
144                    event.to_frame().write_bytes(&mut self.buffer);
145                    self.drain.send(self.buffer.split()).await?;
146                } else {
147                    #[cfg(feature = "trace_pedantic")]
148                    trace!(?sid, "hold back close stream");
149                    self.closing_streams.push(sid);
150                }
151            },
152            ProtocolEvent::Shutdown => {
153                if self.store.is_empty() {
154                    event.to_frame().write_bytes(&mut self.buffer);
155                    self.drain.send(self.buffer.split()).await?;
156                } else {
157                    #[cfg(feature = "trace_pedantic")]
158                    trace!("hold back shutdown");
159                    self.pending_shutdown = true;
160                }
161            },
162            ProtocolEvent::Message { data, sid } => {
163                self.metrics.smsg_ib(sid, data.len() as u64);
164                self.store.add(data, self.next_mid, sid);
165                self.next_mid += 1;
166            },
167        }
168        Ok(())
169    }
170
171    async fn flush(
172        &mut self,
173        bandwidth: Bandwidth,
174        dt: Duration,
175    ) -> Result</* actual */ Bandwidth, ProtocolError<Self::CustomErr>> {
176        let (frames, total_bytes) = self.store.grab(bandwidth, dt);
177        self.buffer.reserve(total_bytes as usize);
178        let mut data_frames = 0;
179        let mut data_bandwidth = 0;
180        for (_, frame) in frames {
181            if let OTFrame::Data { mid: _, data } = &frame {
182                data_bandwidth += data.len();
183                data_frames += 1;
184            }
185            frame.write_bytes(&mut self.buffer);
186        }
187        self.drain.send(self.buffer.split()).await?;
188        self.metrics
189            .sdata_frames_b(data_frames, data_bandwidth as u64);
190
191        let mut finished_streams = vec![];
192        for (i, &sid) in self.closing_streams.iter().enumerate() {
193            if self.store.try_close_stream(sid) {
194                #[cfg(feature = "trace_pedantic")]
195                trace!(?sid, "close stream, as it's now empty");
196                OTFrame::CloseStream { sid }.write_bytes(&mut self.buffer);
197                self.drain.send(self.buffer.split()).await?;
198                finished_streams.push(i);
199            }
200        }
201        for i in finished_streams.iter().rev() {
202            self.closing_streams.remove(*i);
203        }
204
205        let mut finished_streams = vec![];
206        for (i, sid) in self.notify_closing_streams.iter().enumerate() {
207            if self.store.try_close_stream(*sid) {
208                #[cfg(feature = "trace_pedantic")]
209                trace!(?sid, "close stream, as it's now empty");
210                finished_streams.push(i);
211            }
212        }
213        for i in finished_streams.iter().rev() {
214            self.notify_closing_streams.remove(*i);
215        }
216
217        if self.pending_shutdown && self.store.is_empty() {
218            #[cfg(feature = "trace_pedantic")]
219            trace!("shutdown, as it's now empty");
220            OTFrame::Shutdown {}.write_bytes(&mut self.buffer);
221            self.drain.send(self.buffer.split()).await?;
222            self.pending_shutdown = false;
223        }
224        Ok(data_bandwidth as u64)
225    }
226}
227
228#[async_trait]
229impl<S> RecvProtocol for TcpRecvProtocol<S>
230where
231    S: UnreliableSink<DataFormat = BytesMut>,
232{
233    type CustomErr = S::CustomErr;
234
235    async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError<Self::CustomErr>> {
236        'outer: loop {
237            loop {
238                match ITFrame::read_frame(&mut self.buffer) {
239                    Ok(Some(frame)) => {
240                        #[cfg(feature = "trace_pedantic")]
241                        trace!(?frame, "recv");
242                        match frame {
243                            ITFrame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown),
244                            ITFrame::OpenStream {
245                                sid,
246                                prio,
247                                promises,
248                                guaranteed_bandwidth,
249                            } => {
250                                break 'outer Ok(ProtocolEvent::OpenStream {
251                                    sid,
252                                    prio: prio.min(crate::types::HIGHEST_PRIO),
253                                    promises,
254                                    guaranteed_bandwidth,
255                                });
256                            },
257                            ITFrame::CloseStream { sid } => {
258                                break 'outer Ok(ProtocolEvent::CloseStream { sid });
259                            },
260                            ITFrame::DataHeader { sid, mid, length } => {
261                                let m = ITMessage::new(sid, length, &mut self.itmsg_allocator);
262                                self.metrics.rmsg_ib(sid, length);
263                                self.incoming.insert(mid, m);
264                            },
265                            ITFrame::Data { mid, data } => {
266                                self.metrics.rdata_frames_b(data.len() as u64);
267                                let m = match self.incoming.get_mut(&mid) {
268                                    Some(m) => m,
269                                    None => {
270                                        info!(
271                                            ?mid,
272                                            "protocol violation by remote side: send Data before \
273                                             Header"
274                                        );
275                                        break 'outer Err(ProtocolError::Violated);
276                                    },
277                                };
278                                m.data.extend_from_slice(&data);
279                                if m.data.len() == m.length as usize {
280                                    // finished, yay
281                                    let m = self.incoming.remove(&mid).unwrap();
282                                    self.metrics.rmsg_ob(
283                                        m.sid,
284                                        RemoveReason::Finished,
285                                        m.data.len() as u64,
286                                    );
287                                    break 'outer Ok(ProtocolEvent::Message {
288                                        sid: m.sid,
289                                        data: m.data.freeze(),
290                                    });
291                                }
292                            },
293                        };
294                    },
295                    Ok(None) => break, //inner => read more data
296                    Err(()) => return Err(ProtocolError::Violated),
297                }
298            }
299            let chunk = self.sink.recv().await?;
300            if self.buffer.is_empty() {
301                self.buffer = chunk;
302            } else {
303                self.buffer.extend_from_slice(&chunk);
304            }
305        }
306    }
307}
308
309#[async_trait]
310impl<D> ReliableDrain for TcpSendProtocol<D>
311where
312    D: UnreliableDrain<DataFormat = BytesMut>,
313{
314    type CustomErr = D::CustomErr;
315
316    async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError<Self::CustomErr>> {
317        let mut buffer = BytesMut::with_capacity(500);
318        frame.write_bytes(&mut buffer);
319        self.drain.send(buffer).await
320    }
321}
322
323#[async_trait]
324impl<S> ReliableSink for TcpRecvProtocol<S>
325where
326    S: UnreliableSink<DataFormat = BytesMut>,
327{
328    type CustomErr = S::CustomErr;
329
330    async fn recv(&mut self) -> Result<InitFrame, ProtocolError<Self::CustomErr>> {
331        while self.buffer.len() < 100 {
332            let chunk = self.sink.recv().await?;
333            self.buffer.extend_from_slice(&chunk);
334            if let Some(frame) = InitFrame::read_frame(&mut self.buffer) {
335                return Ok(frame);
336            }
337        }
338        Err(ProtocolError::Violated)
339    }
340}
341
#[cfg(test)]
mod test_utils {
    //! TCP protocol running on top of in-memory channels, for tests only.
    use super::*;
    use crate::metrics::{ProtocolMetricCache, ProtocolMetrics};
    use async_channel::*;
    use std::sync::Arc;

    /// Drain that pushes every buffer into a channel.
    pub struct TcpDrain {
        pub sender: Sender<BytesMut>,
    }

    /// Sink that pops buffers from a channel.
    pub struct TcpSink {
        pub receiver: Receiver<BytesMut>,
    }

    /// Emulates a TCP connection with two bounded channels of capacity `cap`.
    ///
    /// Returns both endpoints as `(send, recv)` pairs; whatever one
    /// endpoint sends is received by the other one. When `metrics` is `None`
    /// a fresh metric cache named `"tcp"` is created and shared by all
    /// protocol halves.
    pub fn tcp_bound(
        cap: usize,
        metrics: Option<ProtocolMetricCache>,
    ) -> [(TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>); 2] {
        let (first_tx, second_rx) = bounded(cap);
        let (second_tx, first_rx) = bounded(cap);
        let cache = match metrics {
            Some(cache) => cache,
            None => ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap())),
        };
        let first = (
            TcpSendProtocol::new(TcpDrain { sender: first_tx }, cache.clone()),
            TcpRecvProtocol::new(TcpSink { receiver: first_rx }, cache.clone()),
        );
        let second = (
            TcpSendProtocol::new(TcpDrain { sender: second_tx }, cache.clone()),
            TcpRecvProtocol::new(TcpSink { receiver: second_rx }, cache),
        );
        [first, second]
    }

    #[async_trait]
    impl UnreliableDrain for TcpDrain {
        type CustomErr = ();
        type DataFormat = BytesMut;

        /// Forwards `data` into the channel; a failed send is reported as
        /// `ProtocolError::Custom(())`.
        async fn send(
            &mut self,
            data: Self::DataFormat,
        ) -> Result<(), ProtocolError<Self::CustomErr>> {
            match self.sender.send(data).await {
                Ok(()) => Ok(()),
                Err(_) => Err(ProtocolError::Custom(())),
            }
        }
    }

    #[async_trait]
    impl UnreliableSink for TcpSink {
        type CustomErr = ();
        type DataFormat = BytesMut;

        /// Waits for the next buffer from the channel; a failed receive is
        /// reported as `ProtocolError::Custom(())`.
        async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError<Self::CustomErr>> {
            match self.receiver.recv().await {
                Ok(chunk) => Ok(chunk),
                Err(_) => Err(ProtocolError::Custom(())),
            }
        }
    }
}
409
#[cfg(test)]
mod tests {
    use crate::{
        InitProtocol, ProtocolEvent, RecvProtocol, SendProtocol,
        error::ProtocolError,
        frame::OTFrame,
        metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason},
        tcp::test_utils::*,
        types::{Pid, Promises, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, Sid},
    };
    use bytes::{Bytes, BytesMut};
    use std::{sync::Arc, time::Duration};

    /// Bandwidth handed to every `flush` call of these tests.
    const FLUSH_BANDWIDTH: u64 = 1_000_000;
    /// Time delta handed to every `flush` call of these tests.
    const FLUSH_DT: Duration = Duration::from_secs(1);

    /// Builds an `OpenStream` event out of its parts.
    fn open_stream_event(
        sid: Sid,
        prio: u8,
        promises: Promises,
        guaranteed_bandwidth: u64,
    ) -> ProtocolEvent {
        ProtocolEvent::OpenStream {
            sid,
            prio,
            promises,
            guaranteed_bandwidth,
        }
    }

    /// Builds a `Message` event whose payload is `len` times `byte`.
    fn message_of(sid: Sid, byte: u8, len: usize) -> ProtocolEvent {
        ProtocolEvent::Message {
            sid,
            data: Bytes::from(vec![byte; len]),
        }
    }

    #[tokio::test]
    async fn handshake_all_good() {
        let [mut side_a, mut side_b] = tcp_bound(10, None);
        let task_a = tokio::spawn(async move { side_a.initialize(true, Pid::fake(2), 1337).await });
        let task_b = tokio::spawn(async move { side_b.initialize(false, Pid::fake(3), 42).await });
        let (result_a, result_b) = tokio::join!(task_a, task_b);
        assert_eq!(
            result_a.unwrap(),
            Ok((Pid::fake(3), STREAM_ID_OFFSET1, 42))
        );
        assert_eq!(
            result_b.unwrap(),
            Ok((Pid::fake(2), STREAM_ID_OFFSET2, 1337))
        );
    }

    #[tokio::test]
    async fn open_stream() {
        // the unused halves are bound to names so they stay alive for the whole test
        let [(mut sender, _peer_rx), (_peer_tx, mut receiver)] = tcp_bound(10, None);
        let open = open_stream_event(Sid::new(10), 0, Promises::ORDERED, 1_000_000);
        sender.send(open.clone()).await.unwrap();
        let received = receiver.recv().await.unwrap();
        assert_eq!(open, received);
    }

    #[tokio::test]
    async fn send_short_msg() {
        let [(mut sender, _peer_rx), (_peer_tx, mut receiver)] = tcp_bound(10, None);
        let sid = Sid::new(10);
        let open = open_stream_event(sid, 3, Promises::ORDERED, 1_000_000);
        sender.send(open).await.unwrap();
        let _ = receiver.recv().await.unwrap();
        // a short message followed by a 2nd, even shorter one
        for (byte, len) in [(188u8, 600usize), (7, 30)] {
            let message = message_of(sid, byte, len);
            sender.send(message.clone()).await.unwrap();
            sender.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();
            let received = receiver.recv().await.unwrap();
            assert_eq!(message, received);
        }
    }

    #[tokio::test]
    async fn send_long_msg() {
        let mut metrics =
            ProtocolMetricCache::new("long_tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let sid = Sid::new(1);
        let [(mut sender, _peer_rx), (_peer_tx, mut receiver)] =
            tcp_bound(10000, Some(metrics.clone()));
        let open = open_stream_event(sid, 5, Promises::COMPRESSED, 1_000_000);
        sender.send(open).await.unwrap();
        let _ = receiver.recv().await.unwrap();

        let message = message_of(sid, 99, 500_000);
        sender.send(message.clone()).await.unwrap();
        sender.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();
        let received = receiver.recv().await.unwrap();
        assert_eq!(message, received);

        metrics.assert_msg(sid, 1, RemoveReason::Finished);
        metrics.assert_msg_bytes(sid, 500_000, RemoveReason::Finished);
        metrics.assert_data_frames(358);
        metrics.assert_data_frames_bytes(500_000);
    }

    #[tokio::test]
    async fn msg_finishes_after_close() {
        let sid = Sid::new(1);
        let [(mut sender, _peer_rx), (_peer_tx, mut receiver)] = tcp_bound(10000, None);
        let open = open_stream_event(sid, 5, Promises::COMPRESSED, 0);
        sender.send(open).await.unwrap();
        let _ = receiver.recv().await.unwrap();

        sender.send(message_of(sid, 99, 500_000)).await.unwrap();
        sender
            .send(ProtocolEvent::CloseStream { sid })
            .await
            .unwrap();
        //send
        sender.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();

        let first = receiver.recv().await.unwrap();
        assert!(matches!(first, ProtocolEvent::Message { .. }));
        let second = receiver.recv().await.unwrap();
        assert!(matches!(second, ProtocolEvent::CloseStream { .. }));
    }

    #[tokio::test]
    async fn msg_finishes_after_shutdown() {
        let sid = Sid::new(1);
        let [(mut sender, _peer_rx), (_peer_tx, mut receiver)] = tcp_bound(10000, None);
        let open = open_stream_event(sid, 5, Promises::COMPRESSED, 0);
        sender.send(open).await.unwrap();
        let _ = receiver.recv().await.unwrap();

        sender.send(message_of(sid, 99, 500_000)).await.unwrap();
        sender.send(ProtocolEvent::Shutdown).await.unwrap();
        sender
            .send(ProtocolEvent::CloseStream { sid })
            .await
            .unwrap();
        //send
        sender.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();

        let first = receiver.recv().await.unwrap();
        assert!(matches!(first, ProtocolEvent::Message { .. }));
        let second = receiver.recv().await.unwrap();
        assert!(matches!(second, ProtocolEvent::CloseStream { .. }));
        let third = receiver.recv().await.unwrap();
        assert!(matches!(third, ProtocolEvent::Shutdown));
    }

    #[tokio::test]
    async fn msg_finishes_after_drop() {
        let sid = Sid::new(1);
        let [(mut sender, _peer_rx), (_peer_tx, mut receiver)] = tcp_bound(10000, None);
        let open = open_stream_event(sid, 5, Promises::COMPRESSED, 0);
        sender.send(open).await.unwrap();

        for byte in [99u8, 100] {
            sender.send(message_of(sid, byte, 500_000)).await.unwrap();
            sender.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();
        }
        drop(sender);

        let first = receiver.recv().await.unwrap();
        assert!(matches!(first, ProtocolEvent::OpenStream { .. }));
        let second = receiver.recv().await.unwrap();
        assert!(matches!(second, ProtocolEvent::Message { .. }));
        let third = receiver.recv().await.unwrap();
        assert!(matches!(third, ProtocolEvent::Message { .. }));
    }

    #[tokio::test]
    async fn header_and_data_in_seperate_msg() {
        const DATA1: &[u8; 69] =
            b"We need to make sure that its okay to send OPEN_STREAM and DATA_HEAD ";
        const DATA2: &[u8; 95] = b"in one chunk and (DATA and CLOSE_STREAM) in the second chunk. and then keep the connection open";

        let sid = Sid::new(1);
        let (chunk_tx, chunk_rx) = async_channel::bounded(10);
        let metrics = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let mut receiver =
            super::TcpRecvProtocol::new(TcpSink { receiver: chunk_rx }, metrics.clone());

        // first chunk: OpenStream + DataHeader
        let mut wire = BytesMut::with_capacity(1500);
        let open = OTFrame::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        };
        open.write_bytes(&mut wire);
        let header = OTFrame::DataHeader {
            mid: 99,
            sid,
            length: (DATA1.len() + DATA2.len()) as u64,
        };
        header.write_bytes(&mut wire);
        chunk_tx.send(wire.split()).await.unwrap();

        // second chunk: both Data frames + CloseStream
        let first_part = OTFrame::Data {
            mid: 99,
            data: Bytes::from(&DATA1[..]),
        };
        first_part.write_bytes(&mut wire);
        let second_part = OTFrame::Data {
            mid: 99,
            data: Bytes::from(&DATA2[..]),
        };
        second_part.write_bytes(&mut wire);
        OTFrame::CloseStream { sid }.write_bytes(&mut wire);
        chunk_tx.send(wire.split()).await.unwrap();

        let first = receiver.recv().await.unwrap();
        assert!(matches!(first, ProtocolEvent::OpenStream { .. }));

        let second = receiver.recv().await.unwrap();
        assert!(matches!(second, ProtocolEvent::Message { .. }));

        let third = receiver.recv().await.unwrap();
        assert!(matches!(third, ProtocolEvent::CloseStream { .. }));
    }

    #[tokio::test]
    async fn drop_sink_while_recv() {
        let sid = Sid::new(1);
        let (chunk_tx, chunk_rx) = async_channel::bounded(10);
        let metrics = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let mut receiver =
            super::TcpRecvProtocol::new(TcpSink { receiver: chunk_rx }, metrics.clone());

        let mut wire = BytesMut::with_capacity(1500);
        let open = OTFrame::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        };
        open.write_bytes(&mut wire);
        chunk_tx.send(wire.split()).await.unwrap();
        let first = receiver.recv().await.unwrap();
        assert!(matches!(first, ProtocolEvent::OpenStream { .. }));

        // the next recv is pending while the sending end of the channel goes away
        let pending = tokio::spawn(async move { receiver.recv().await });
        drop(chunk_tx);

        let outcome = pending.await.unwrap();
        assert_eq!(outcome, Err(ProtocolError::Custom(())));
    }

    #[tokio::test]
    #[should_panic]
    async fn send_on_stream_from_remote_without_notify() {
        //remote opens stream
        //we send on it
        let [mut remote, mut local] = tcp_bound(10, None);
        let sid = Sid::new(10);
        let open = open_stream_event(sid, 3, Promises::ORDERED, 1_000_000);
        remote.0.send(open).await.unwrap();
        let _ = local.1.recv().await.unwrap();

        let message = message_of(sid, 188, 600);
        local.0.send(message.clone()).await.unwrap();
        local.0.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();
        let received = remote.1.recv().await.unwrap();
        assert_eq!(message, received);
    }

    #[tokio::test]
    async fn send_on_stream_from_remote() {
        //remote opens stream
        //we send on it
        let [mut remote, mut local] = tcp_bound(10, None);
        let sid = Sid::new(10);
        let open = open_stream_event(sid, 3, Promises::ORDERED, 1_000_000);
        remote.0.send(open).await.unwrap();
        let opened = local.1.recv().await.unwrap();
        local.0.notify_from_recv(opened);

        let message = message_of(sid, 188, 600);
        local.0.send(message.clone()).await.unwrap();
        local.0.flush(FLUSH_BANDWIDTH, FLUSH_DT).await.unwrap();
        let received = remote.1.recv().await.unwrap();
        assert_eq!(message, received);
    }
}
723}