Skip to main content

veloren_network_protocol/
tcp.rs

1use crate::{
2    RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
3    error::ProtocolError,
4    event::ProtocolEvent,
5    frame::{ITFrame, InitFrame, OTFrame},
6    handshake::{ReliableDrain, ReliableSink},
7    message::{ALLOC_BLOCK, ITMessage},
8    metrics::{ProtocolMetricCache, RemoveReason},
9    prio::PrioManager,
10    types::{Bandwidth, Mid, Promises, Sid},
11};
12use async_trait::async_trait;
13use bytes::BytesMut;
14use hashbrown::HashMap;
15use std::time::{Duration, Instant};
16use tracing::info;
17#[cfg(feature = "trace_pedantic")]
18use tracing::trace;
19
20/// TCP implementation of [`SendProtocol`]
21///
22/// [`SendProtocol`]: crate::SendProtocol
23#[derive(Debug)]
24pub struct TcpSendProtocol<D>
25where
26    D: UnreliableDrain<DataFormat = BytesMut>,
27{
28    buffer: BytesMut,
29    store: PrioManager,
30    next_mid: Mid,
31    closing_streams: Vec<Sid>,
32    notify_closing_streams: Vec<Sid>,
33    pending_shutdown: bool,
34    drain: D,
35    #[expect(dead_code)]
36    last: Instant,
37    metrics: ProtocolMetricCache,
38}
39
40/// TCP implementation of [`RecvProtocol`]
41///
42/// [`RecvProtocol`]: crate::RecvProtocol
43#[derive(Debug)]
44pub struct TcpRecvProtocol<S>
45where
46    S: UnreliableSink<DataFormat = BytesMut>,
47{
48    buffer: BytesMut,
49    itmsg_allocator: BytesMut,
50    incoming: HashMap<Mid, ITMessage>,
51    sink: S,
52    metrics: ProtocolMetricCache,
53}
54
55impl<D> TcpSendProtocol<D>
56where
57    D: UnreliableDrain<DataFormat = BytesMut>,
58{
59    pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self {
60        Self {
61            buffer: BytesMut::new(),
62            store: PrioManager::new(metrics.clone()),
63            next_mid: 0u64,
64            closing_streams: vec![],
65            notify_closing_streams: vec![],
66            pending_shutdown: false,
67            drain,
68            last: Instant::now(),
69            metrics,
70        }
71    }
72
73    /// returns all promises that this Protocol can take care of
74    /// If you open a Stream anyway, unsupported promises are ignored.
75    pub fn supported_promises() -> Promises {
76        Promises::ORDERED
77            | Promises::CONSISTENCY
78            | Promises::GUARANTEED_DELIVERY
79            | Promises::COMPRESSED
80    }
81}
82
83impl<S> TcpRecvProtocol<S>
84where
85    S: UnreliableSink<DataFormat = BytesMut>,
86{
87    pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self {
88        Self {
89            buffer: BytesMut::new(),
90            itmsg_allocator: BytesMut::with_capacity(ALLOC_BLOCK),
91            incoming: HashMap::new(),
92            sink,
93            metrics,
94        }
95    }
96}
97
98#[async_trait]
99impl<D> SendProtocol for TcpSendProtocol<D>
100where
101    D: UnreliableDrain<DataFormat = BytesMut>,
102{
103    type CustomErr = D::CustomErr;
104
105    fn notify_from_recv(&mut self, event: ProtocolEvent) {
106        match event {
107            ProtocolEvent::OpenStream {
108                sid,
109                prio,
110                promises,
111                guaranteed_bandwidth,
112            } => {
113                self.store
114                    .open_stream(sid, prio, promises, guaranteed_bandwidth);
115            },
116            ProtocolEvent::CloseStream { sid } if !self.store.try_close_stream(sid) => {
117                #[cfg(feature = "trace_pedantic")]
118                trace!(?sid, "hold back notify close stream");
119                self.notify_closing_streams.push(sid);
120            },
121            _ => {},
122        }
123    }
124
125    async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError<Self::CustomErr>> {
126        #[cfg(feature = "trace_pedantic")]
127        trace!(?event, "send");
128        match event {
129            ProtocolEvent::OpenStream {
130                sid,
131                prio,
132                promises,
133                guaranteed_bandwidth,
134            } => {
135                self.store
136                    .open_stream(sid, prio, promises, guaranteed_bandwidth);
137                event.to_frame().write_bytes(&mut self.buffer);
138                self.drain.send(self.buffer.split()).await?;
139            },
140            ProtocolEvent::CloseStream { sid } => {
141                if self.store.try_close_stream(sid) {
142                    event.to_frame().write_bytes(&mut self.buffer);
143                    self.drain.send(self.buffer.split()).await?;
144                } else {
145                    #[cfg(feature = "trace_pedantic")]
146                    trace!(?sid, "hold back close stream");
147                    self.closing_streams.push(sid);
148                }
149            },
150            ProtocolEvent::Shutdown => {
151                if self.store.is_empty() {
152                    event.to_frame().write_bytes(&mut self.buffer);
153                    self.drain.send(self.buffer.split()).await?;
154                } else {
155                    #[cfg(feature = "trace_pedantic")]
156                    trace!("hold back shutdown");
157                    self.pending_shutdown = true;
158                }
159            },
160            ProtocolEvent::Message { data, sid } => {
161                self.metrics.smsg_ib(sid, data.len() as u64);
162                self.store.add(data, self.next_mid, sid);
163                self.next_mid += 1;
164            },
165        }
166        Ok(())
167    }
168
169    async fn flush(
170        &mut self,
171        bandwidth: Bandwidth,
172        dt: Duration,
173    ) -> Result</* actual */ Bandwidth, ProtocolError<Self::CustomErr>> {
174        let (frames, total_bytes) = self.store.grab(bandwidth, dt);
175        self.buffer.reserve(total_bytes as usize);
176        let mut data_frames = 0;
177        let mut data_bandwidth = 0;
178        for (_, frame) in frames {
179            if let OTFrame::Data { mid: _, data } = &frame {
180                data_bandwidth += data.len();
181                data_frames += 1;
182            }
183            frame.write_bytes(&mut self.buffer);
184        }
185        self.drain.send(self.buffer.split()).await?;
186        self.metrics
187            .sdata_frames_b(data_frames, data_bandwidth as u64);
188
189        let mut finished_streams = vec![];
190        for (i, &sid) in self.closing_streams.iter().enumerate() {
191            if self.store.try_close_stream(sid) {
192                #[cfg(feature = "trace_pedantic")]
193                trace!(?sid, "close stream, as it's now empty");
194                OTFrame::CloseStream { sid }.write_bytes(&mut self.buffer);
195                self.drain.send(self.buffer.split()).await?;
196                finished_streams.push(i);
197            }
198        }
199        for i in finished_streams.iter().rev() {
200            self.closing_streams.remove(*i);
201        }
202
203        let mut finished_streams = vec![];
204        for (i, sid) in self.notify_closing_streams.iter().enumerate() {
205            if self.store.try_close_stream(*sid) {
206                #[cfg(feature = "trace_pedantic")]
207                trace!(?sid, "close stream, as it's now empty");
208                finished_streams.push(i);
209            }
210        }
211        for i in finished_streams.iter().rev() {
212            self.notify_closing_streams.remove(*i);
213        }
214
215        if self.pending_shutdown && self.store.is_empty() {
216            #[cfg(feature = "trace_pedantic")]
217            trace!("shutdown, as it's now empty");
218            OTFrame::Shutdown {}.write_bytes(&mut self.buffer);
219            self.drain.send(self.buffer.split()).await?;
220            self.pending_shutdown = false;
221        }
222        Ok(data_bandwidth as u64)
223    }
224}
225
226#[async_trait]
227impl<S> RecvProtocol for TcpRecvProtocol<S>
228where
229    S: UnreliableSink<DataFormat = BytesMut>,
230{
231    type CustomErr = S::CustomErr;
232
233    async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError<Self::CustomErr>> {
234        'outer: loop {
235            loop {
236                match ITFrame::read_frame(&mut self.buffer) {
237                    Ok(Some(frame)) => {
238                        #[cfg(feature = "trace_pedantic")]
239                        trace!(?frame, "recv");
240                        match frame {
241                            ITFrame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown),
242                            ITFrame::OpenStream {
243                                sid,
244                                prio,
245                                promises,
246                                guaranteed_bandwidth,
247                            } => {
248                                break 'outer Ok(ProtocolEvent::OpenStream {
249                                    sid,
250                                    prio: prio.min(crate::types::HIGHEST_PRIO),
251                                    promises,
252                                    guaranteed_bandwidth,
253                                });
254                            },
255                            ITFrame::CloseStream { sid } => {
256                                break 'outer Ok(ProtocolEvent::CloseStream { sid });
257                            },
258                            ITFrame::DataHeader { sid, mid, length } => {
259                                let m = ITMessage::new(sid, length, &mut self.itmsg_allocator);
260                                self.metrics.rmsg_ib(sid, length);
261                                self.incoming.insert(mid, m);
262                            },
263                            ITFrame::Data { mid, data } => {
264                                self.metrics.rdata_frames_b(data.len() as u64);
265                                let m = match self.incoming.get_mut(&mid) {
266                                    Some(m) => m,
267                                    None => {
268                                        info!(
269                                            ?mid,
270                                            "protocol violation by remote side: send Data before \
271                                             Header"
272                                        );
273                                        break 'outer Err(ProtocolError::Violated);
274                                    },
275                                };
276                                m.data.extend_from_slice(&data);
277                                if m.data.len() == m.length as usize {
278                                    // finished, yay
279                                    let m = self.incoming.remove(&mid).unwrap();
280                                    self.metrics.rmsg_ob(
281                                        m.sid,
282                                        RemoveReason::Finished,
283                                        m.data.len() as u64,
284                                    );
285                                    break 'outer Ok(ProtocolEvent::Message {
286                                        sid: m.sid,
287                                        data: m.data.freeze(),
288                                    });
289                                }
290                            },
291                        };
292                    },
293                    Ok(None) => break, //inner => read more data
294                    Err(()) => return Err(ProtocolError::Violated),
295                }
296            }
297            let chunk = self.sink.recv().await?;
298            if self.buffer.is_empty() {
299                self.buffer = chunk;
300            } else {
301                self.buffer.extend_from_slice(&chunk);
302            }
303        }
304    }
305}
306
307#[async_trait]
308impl<D> ReliableDrain for TcpSendProtocol<D>
309where
310    D: UnreliableDrain<DataFormat = BytesMut>,
311{
312    type CustomErr = D::CustomErr;
313
314    async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError<Self::CustomErr>> {
315        let mut buffer = BytesMut::with_capacity(500);
316        frame.write_bytes(&mut buffer);
317        self.drain.send(buffer).await
318    }
319}
320
321#[async_trait]
322impl<S> ReliableSink for TcpRecvProtocol<S>
323where
324    S: UnreliableSink<DataFormat = BytesMut>,
325{
326    type CustomErr = S::CustomErr;
327
328    async fn recv(&mut self) -> Result<InitFrame, ProtocolError<Self::CustomErr>> {
329        while self.buffer.len() < 100 {
330            let chunk = self.sink.recv().await?;
331            self.buffer.extend_from_slice(&chunk);
332            if let Some(frame) = InitFrame::read_frame(&mut self.buffer) {
333                return Ok(frame);
334            }
335        }
336        Err(ProtocolError::Violated)
337    }
338}
339
#[cfg(test)]
mod test_utils {
    // TCP protocol emulated on top of in-memory channels, for tests only.
    use super::*;
    use crate::metrics::{ProtocolMetricCache, ProtocolMetrics};
    use async_channel::*;
    use std::sync::Arc;

    /// Drain that pushes every chunk into a channel.
    pub struct TcpDrain {
        pub sender: Sender<BytesMut>,
    }

    /// Sink that pops chunks from a channel.
    pub struct TcpSink {
        pub receiver: Receiver<BytesMut>,
    }

    /// Builds two connected protocol endpoints on top of bounded channels.
    ///
    /// Each array element is one endpoint as `(send half, recv half)`; what
    /// one endpoint sends is received by the other. `cap` is the capacity of
    /// each channel. If `metrics` is `None`, a fresh cache named `"tcp"` is
    /// created and shared by all four halves.
    pub fn tcp_bound(
        cap: usize,
        metrics: Option<ProtocolMetricCache>,
    ) -> [(TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>); 2] {
        let (first_tx, second_rx) = bounded(cap);
        let (second_tx, first_rx) = bounded(cap);
        let shared = match metrics {
            Some(given) => given,
            None => ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap())),
        };
        let first = (
            TcpSendProtocol::new(TcpDrain { sender: first_tx }, shared.clone()),
            TcpRecvProtocol::new(TcpSink { receiver: first_rx }, shared.clone()),
        );
        let second = (
            TcpSendProtocol::new(TcpDrain { sender: second_tx }, shared.clone()),
            TcpRecvProtocol::new(TcpSink { receiver: second_rx }, shared),
        );
        [first, second]
    }

    #[async_trait]
    impl UnreliableDrain for TcpDrain {
        type CustomErr = ();
        type DataFormat = BytesMut;

        /// Forwards `data` into the channel; a failed send is reported as
        /// `ProtocolError::Custom(())`.
        async fn send(
            &mut self,
            data: Self::DataFormat,
        ) -> Result<(), ProtocolError<Self::CustomErr>> {
            match self.sender.send(data).await {
                Ok(()) => Ok(()),
                Err(_) => Err(ProtocolError::Custom(())),
            }
        }
    }

    #[async_trait]
    impl UnreliableSink for TcpSink {
        type CustomErr = ();
        type DataFormat = BytesMut;

        /// Takes the next chunk from the channel; a failed receive is
        /// reported as `ProtocolError::Custom(())`.
        async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError<Self::CustomErr>> {
            match self.receiver.recv().await {
                Ok(chunk) => Ok(chunk),
                Err(_) => Err(ProtocolError::Custom(())),
            }
        }
    }
}
407
#[cfg(test)]
mod tests {
    use crate::{
        InitProtocol, ProtocolEvent, RecvProtocol, SendProtocol,
        error::ProtocolError,
        frame::OTFrame,
        metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason},
        tcp::test_utils::*,
        types::{Pid, Promises, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, Sid},
    };
    use bytes::{Bytes, BytesMut};
    use std::{sync::Arc, time::Duration};

    /// Builds a `Message` event on `sid` whose payload is `len` copies of
    /// `byte`.
    fn filled_message(sid: Sid, byte: u8, len: usize) -> ProtocolEvent {
        ProtocolEvent::Message {
            sid,
            data: Bytes::from(vec![byte; len]),
        }
    }

    /// Both endpoints complete the handshake and learn the other side's
    /// pid and secret.
    #[tokio::test]
    async fn handshake_all_good() {
        let [mut first, mut second] = tcp_bound(10, None);
        let first_task =
            tokio::spawn(async move { first.initialize(true, Pid::fake(2), 1337).await });
        let second_task =
            tokio::spawn(async move { second.initialize(false, Pid::fake(3), 42).await });
        let (first_res, second_res) = tokio::join!(first_task, second_task);
        assert_eq!(
            first_res.unwrap(),
            Ok((Pid::fake(3), STREAM_ID_OFFSET1, 42))
        );
        assert_eq!(
            second_res.unwrap(),
            Ok((Pid::fake(2), STREAM_ID_OFFSET2, 1337))
        );
    }

    /// An `OpenStream` event arrives unchanged on the other side.
    #[tokio::test]
    async fn open_stream() {
        let [(mut tx, _idle_rx), (_idle_tx, mut rx)] = tcp_bound(10, None);
        let sent = ProtocolEvent::OpenStream {
            sid: Sid::new(10),
            prio: 0u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx.send(sent.clone()).await.unwrap();
        let got = rx.recv().await.unwrap();
        assert_eq!(sent, got);
    }

    /// Two small messages are delivered one after another.
    #[tokio::test]
    async fn send_short_msg() {
        let [(mut tx, _idle_rx), (_idle_tx, mut rx)] = tcp_bound(10, None);
        let sid = Sid::new(10);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 3u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        let first = filled_message(sid, 188, 600);
        tx.send(first.clone()).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let got = rx.recv().await.unwrap();
        assert_eq!(first, got);

        // 2nd short message
        let second = filled_message(sid, 7, 30);
        tx.send(second.clone()).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let got = rx.recv().await.unwrap();
        assert_eq!(second, got);
    }

    /// A large message is split into data frames and the metrics reflect it.
    #[tokio::test]
    async fn send_long_msg() {
        let mut metrics =
            ProtocolMetricCache::new("long_tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let sid = Sid::new(1);
        let [(mut tx, _idle_rx), (_idle_tx, mut rx)] = tcp_bound(10000, Some(metrics.clone()));
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        let sent = filled_message(sid, 99, 500_000);
        tx.send(sent.clone()).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let got = rx.recv().await.unwrap();
        assert_eq!(sent, got);

        metrics.assert_msg(sid, 1, RemoveReason::Finished);
        metrics.assert_msg_bytes(sid, 500_000, RemoveReason::Finished);
        metrics.assert_data_frames(358);
        metrics.assert_data_frames_bytes(500_000);
    }

    /// A close requested while a message is queued is sent after the message.
    #[tokio::test]
    async fn msg_finishes_after_close() {
        let sid = Sid::new(1);
        let [(mut tx, _idle_rx), (_idle_tx, mut rx)] = tcp_bound(10000, None);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 0,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        tx.send(filled_message(sid, 99, 500_000)).await.unwrap();
        tx.send(ProtocolEvent::CloseStream { sid }).await.unwrap();
        //send
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();

        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::Message { .. }));
        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::CloseStream { .. }));
    }

    /// A shutdown requested while a message is queued is sent last.
    #[tokio::test]
    async fn msg_finishes_after_shutdown() {
        let sid = Sid::new(1);
        let [(mut tx, _idle_rx), (_idle_tx, mut rx)] = tcp_bound(10000, None);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 0,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        tx.send(filled_message(sid, 99, 500_000)).await.unwrap();
        tx.send(ProtocolEvent::Shutdown {}).await.unwrap();
        tx.send(ProtocolEvent::CloseStream { sid }).await.unwrap();
        //send
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();

        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::Message { .. }));
        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::CloseStream { .. }));
        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::Shutdown));
    }

    /// Everything flushed before the sender is dropped can still be received.
    #[tokio::test]
    async fn msg_finishes_after_drop() {
        let sid = Sid::new(1);
        let [(mut tx, _idle_rx), (_idle_tx, mut rx)] = tcp_bound(10000, None);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 0,
        };
        tx.send(open).await.unwrap();

        tx.send(filled_message(sid, 99, 500_000)).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        tx.send(filled_message(sid, 100, 500_000)).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        drop(tx);

        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::OpenStream { .. }));
        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::Message { .. }));
        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::Message { .. }));
    }

    /// Frames of one message may be spread over several chunks.
    #[tokio::test]
    async fn header_and_data_in_seperate_msg() {
        const DATA1: &[u8; 69] =
            b"We need to make sure that its okay to send OPEN_STREAM and DATA_HEAD ";
        const DATA2: &[u8; 95] = b"in one chunk and (DATA and CLOSE_STREAM) in the second chunk. and then keep the connection open";

        let sid = Sid::new(1);
        let (chunk_tx, chunk_rx) = async_channel::bounded(10);
        let m = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let mut rx = super::TcpRecvProtocol::new(TcpSink { receiver: chunk_rx }, m.clone());

        // first chunk: OpenStream + DataHeader
        let mut scratch = BytesMut::with_capacity(1500);
        OTFrame::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        }
        .write_bytes(&mut scratch);
        OTFrame::DataHeader {
            mid: 99,
            sid,
            length: (DATA1.len() + DATA2.len()) as u64,
        }
        .write_bytes(&mut scratch);
        chunk_tx.send(scratch.split()).await.unwrap();

        // second chunk: both Data frames + CloseStream
        for part in [&DATA1[..], &DATA2[..]] {
            OTFrame::Data {
                mid: 99,
                data: Bytes::from(part),
            }
            .write_bytes(&mut scratch);
        }
        OTFrame::CloseStream { sid }.write_bytes(&mut scratch);
        chunk_tx.send(scratch.split()).await.unwrap();

        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::OpenStream { .. }));

        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::Message { .. }));

        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::CloseStream { .. }));
    }

    /// Dropping the channel sender while `recv` waits yields a custom error.
    #[tokio::test]
    async fn drop_sink_while_recv() {
        let sid = Sid::new(1);
        let (chunk_tx, chunk_rx) = async_channel::bounded(10);
        let m = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let mut rx = super::TcpRecvProtocol::new(TcpSink { receiver: chunk_rx }, m.clone());

        let mut scratch = BytesMut::with_capacity(1500);
        OTFrame::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        }
        .write_bytes(&mut scratch);
        chunk_tx.send(scratch.split()).await.unwrap();
        let got = rx.recv().await.unwrap();
        assert!(matches!(got, ProtocolEvent::OpenStream { .. }));

        let waiting = tokio::spawn(async move { rx.recv().await });
        drop(chunk_tx);

        let outcome = waiting.await.unwrap();
        assert_eq!(outcome, Err(ProtocolError::Custom(())));
    }

    /// Sending on a stream the remote opened, without notifying the send
    /// half first, is expected to panic.
    #[tokio::test]
    #[should_panic]
    async fn send_on_stream_from_remote_without_notify() {
        //remote opens stream
        //we send on it
        let [(mut tx_a, mut rx_a), (mut tx_b, mut rx_b)] = tcp_bound(10, None);
        let sid = Sid::new(10);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 3u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx_a.send(open).await.unwrap();
        let _ = rx_b.recv().await.unwrap();

        let sent = filled_message(sid, 188, 600);
        tx_b.send(sent.clone()).await.unwrap();
        tx_b.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let got = rx_a.recv().await.unwrap();
        assert_eq!(sent, got);
    }

    /// After `notify_from_recv`, the send half can use a stream that the
    /// remote opened.
    #[tokio::test]
    async fn send_on_stream_from_remote() {
        //remote opens stream
        //we send on it
        let [(mut tx_a, mut rx_a), (mut tx_b, mut rx_b)] = tcp_bound(10, None);
        let sid = Sid::new(10);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 3u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx_a.send(open).await.unwrap();
        let opened = rx_b.recv().await.unwrap();
        tx_b.notify_from_recv(opened);

        let sent = filled_message(sid, 188, 600);
        tx_b.send(sent.clone()).await.unwrap();
        tx_b.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let got = rx_a.recv().await.unwrap();
        assert_eq!(sent, got);
    }
}
721}