1use crate::{
2 RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
3 error::ProtocolError,
4 event::ProtocolEvent,
5 frame::{ITFrame, InitFrame, OTFrame},
6 handshake::{ReliableDrain, ReliableSink},
7 message::{ALLOC_BLOCK, ITMessage},
8 metrics::{ProtocolMetricCache, RemoveReason},
9 prio::PrioManager,
10 types::{Bandwidth, Mid, Promises, Sid},
11};
12use async_trait::async_trait;
13use bytes::BytesMut;
14use hashbrown::HashMap;
15use std::time::{Duration, Instant};
16use tracing::info;
17#[cfg(feature = "trace_pedantic")]
18use tracing::trace;
19
20#[derive(Debug)]
24pub struct TcpSendProtocol<D>
25where
26 D: UnreliableDrain<DataFormat = BytesMut>,
27{
28 buffer: BytesMut,
29 store: PrioManager,
30 next_mid: Mid,
31 closing_streams: Vec<Sid>,
32 notify_closing_streams: Vec<Sid>,
33 pending_shutdown: bool,
34 drain: D,
35 #[expect(dead_code)]
36 last: Instant,
37 metrics: ProtocolMetricCache,
38}
39
40#[derive(Debug)]
44pub struct TcpRecvProtocol<S>
45where
46 S: UnreliableSink<DataFormat = BytesMut>,
47{
48 buffer: BytesMut,
49 itmsg_allocator: BytesMut,
50 incoming: HashMap<Mid, ITMessage>,
51 sink: S,
52 metrics: ProtocolMetricCache,
53}
54
55impl<D> TcpSendProtocol<D>
56where
57 D: UnreliableDrain<DataFormat = BytesMut>,
58{
59 pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self {
60 Self {
61 buffer: BytesMut::new(),
62 store: PrioManager::new(metrics.clone()),
63 next_mid: 0u64,
64 closing_streams: vec![],
65 notify_closing_streams: vec![],
66 pending_shutdown: false,
67 drain,
68 last: Instant::now(),
69 metrics,
70 }
71 }
72
73 pub fn supported_promises() -> Promises {
76 Promises::ORDERED
77 | Promises::CONSISTENCY
78 | Promises::GUARANTEED_DELIVERY
79 | Promises::COMPRESSED
80 }
81}
82
83impl<S> TcpRecvProtocol<S>
84where
85 S: UnreliableSink<DataFormat = BytesMut>,
86{
87 pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self {
88 Self {
89 buffer: BytesMut::new(),
90 itmsg_allocator: BytesMut::with_capacity(ALLOC_BLOCK),
91 incoming: HashMap::new(),
92 sink,
93 metrics,
94 }
95 }
96}
97
98#[async_trait]
99impl<D> SendProtocol for TcpSendProtocol<D>
100where
101 D: UnreliableDrain<DataFormat = BytesMut>,
102{
103 type CustomErr = D::CustomErr;
104
105 fn notify_from_recv(&mut self, event: ProtocolEvent) {
106 match event {
107 ProtocolEvent::OpenStream {
108 sid,
109 prio,
110 promises,
111 guaranteed_bandwidth,
112 } => {
113 self.store
114 .open_stream(sid, prio, promises, guaranteed_bandwidth);
115 },
116 ProtocolEvent::CloseStream { sid } if !self.store.try_close_stream(sid) => {
117 #[cfg(feature = "trace_pedantic")]
118 trace!(?sid, "hold back notify close stream");
119 self.notify_closing_streams.push(sid);
120 },
121 _ => {},
122 }
123 }
124
125 async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError<Self::CustomErr>> {
126 #[cfg(feature = "trace_pedantic")]
127 trace!(?event, "send");
128 match event {
129 ProtocolEvent::OpenStream {
130 sid,
131 prio,
132 promises,
133 guaranteed_bandwidth,
134 } => {
135 self.store
136 .open_stream(sid, prio, promises, guaranteed_bandwidth);
137 event.to_frame().write_bytes(&mut self.buffer);
138 self.drain.send(self.buffer.split()).await?;
139 },
140 ProtocolEvent::CloseStream { sid } => {
141 if self.store.try_close_stream(sid) {
142 event.to_frame().write_bytes(&mut self.buffer);
143 self.drain.send(self.buffer.split()).await?;
144 } else {
145 #[cfg(feature = "trace_pedantic")]
146 trace!(?sid, "hold back close stream");
147 self.closing_streams.push(sid);
148 }
149 },
150 ProtocolEvent::Shutdown => {
151 if self.store.is_empty() {
152 event.to_frame().write_bytes(&mut self.buffer);
153 self.drain.send(self.buffer.split()).await?;
154 } else {
155 #[cfg(feature = "trace_pedantic")]
156 trace!("hold back shutdown");
157 self.pending_shutdown = true;
158 }
159 },
160 ProtocolEvent::Message { data, sid } => {
161 self.metrics.smsg_ib(sid, data.len() as u64);
162 self.store.add(data, self.next_mid, sid);
163 self.next_mid += 1;
164 },
165 }
166 Ok(())
167 }
168
169 async fn flush(
170 &mut self,
171 bandwidth: Bandwidth,
172 dt: Duration,
173 ) -> Result<Bandwidth, ProtocolError<Self::CustomErr>> {
174 let (frames, total_bytes) = self.store.grab(bandwidth, dt);
175 self.buffer.reserve(total_bytes as usize);
176 let mut data_frames = 0;
177 let mut data_bandwidth = 0;
178 for (_, frame) in frames {
179 if let OTFrame::Data { mid: _, data } = &frame {
180 data_bandwidth += data.len();
181 data_frames += 1;
182 }
183 frame.write_bytes(&mut self.buffer);
184 }
185 self.drain.send(self.buffer.split()).await?;
186 self.metrics
187 .sdata_frames_b(data_frames, data_bandwidth as u64);
188
189 let mut finished_streams = vec![];
190 for (i, &sid) in self.closing_streams.iter().enumerate() {
191 if self.store.try_close_stream(sid) {
192 #[cfg(feature = "trace_pedantic")]
193 trace!(?sid, "close stream, as it's now empty");
194 OTFrame::CloseStream { sid }.write_bytes(&mut self.buffer);
195 self.drain.send(self.buffer.split()).await?;
196 finished_streams.push(i);
197 }
198 }
199 for i in finished_streams.iter().rev() {
200 self.closing_streams.remove(*i);
201 }
202
203 let mut finished_streams = vec![];
204 for (i, sid) in self.notify_closing_streams.iter().enumerate() {
205 if self.store.try_close_stream(*sid) {
206 #[cfg(feature = "trace_pedantic")]
207 trace!(?sid, "close stream, as it's now empty");
208 finished_streams.push(i);
209 }
210 }
211 for i in finished_streams.iter().rev() {
212 self.notify_closing_streams.remove(*i);
213 }
214
215 if self.pending_shutdown && self.store.is_empty() {
216 #[cfg(feature = "trace_pedantic")]
217 trace!("shutdown, as it's now empty");
218 OTFrame::Shutdown {}.write_bytes(&mut self.buffer);
219 self.drain.send(self.buffer.split()).await?;
220 self.pending_shutdown = false;
221 }
222 Ok(data_bandwidth as u64)
223 }
224}
225
226#[async_trait]
227impl<S> RecvProtocol for TcpRecvProtocol<S>
228where
229 S: UnreliableSink<DataFormat = BytesMut>,
230{
231 type CustomErr = S::CustomErr;
232
233 async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError<Self::CustomErr>> {
234 'outer: loop {
235 loop {
236 match ITFrame::read_frame(&mut self.buffer) {
237 Ok(Some(frame)) => {
238 #[cfg(feature = "trace_pedantic")]
239 trace!(?frame, "recv");
240 match frame {
241 ITFrame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown),
242 ITFrame::OpenStream {
243 sid,
244 prio,
245 promises,
246 guaranteed_bandwidth,
247 } => {
248 break 'outer Ok(ProtocolEvent::OpenStream {
249 sid,
250 prio: prio.min(crate::types::HIGHEST_PRIO),
251 promises,
252 guaranteed_bandwidth,
253 });
254 },
255 ITFrame::CloseStream { sid } => {
256 break 'outer Ok(ProtocolEvent::CloseStream { sid });
257 },
258 ITFrame::DataHeader { sid, mid, length } => {
259 let m = ITMessage::new(sid, length, &mut self.itmsg_allocator);
260 self.metrics.rmsg_ib(sid, length);
261 self.incoming.insert(mid, m);
262 },
263 ITFrame::Data { mid, data } => {
264 self.metrics.rdata_frames_b(data.len() as u64);
265 let m = match self.incoming.get_mut(&mid) {
266 Some(m) => m,
267 None => {
268 info!(
269 ?mid,
270 "protocol violation by remote side: send Data before \
271 Header"
272 );
273 break 'outer Err(ProtocolError::Violated);
274 },
275 };
276 m.data.extend_from_slice(&data);
277 if m.data.len() == m.length as usize {
278 let m = self.incoming.remove(&mid).unwrap();
280 self.metrics.rmsg_ob(
281 m.sid,
282 RemoveReason::Finished,
283 m.data.len() as u64,
284 );
285 break 'outer Ok(ProtocolEvent::Message {
286 sid: m.sid,
287 data: m.data.freeze(),
288 });
289 }
290 },
291 };
292 },
293 Ok(None) => break, Err(()) => return Err(ProtocolError::Violated),
295 }
296 }
297 let chunk = self.sink.recv().await?;
298 if self.buffer.is_empty() {
299 self.buffer = chunk;
300 } else {
301 self.buffer.extend_from_slice(&chunk);
302 }
303 }
304 }
305}
306
307#[async_trait]
308impl<D> ReliableDrain for TcpSendProtocol<D>
309where
310 D: UnreliableDrain<DataFormat = BytesMut>,
311{
312 type CustomErr = D::CustomErr;
313
314 async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError<Self::CustomErr>> {
315 let mut buffer = BytesMut::with_capacity(500);
316 frame.write_bytes(&mut buffer);
317 self.drain.send(buffer).await
318 }
319}
320
321#[async_trait]
322impl<S> ReliableSink for TcpRecvProtocol<S>
323where
324 S: UnreliableSink<DataFormat = BytesMut>,
325{
326 type CustomErr = S::CustomErr;
327
328 async fn recv(&mut self) -> Result<InitFrame, ProtocolError<Self::CustomErr>> {
329 while self.buffer.len() < 100 {
330 let chunk = self.sink.recv().await?;
331 self.buffer.extend_from_slice(&chunk);
332 if let Some(frame) = InitFrame::read_frame(&mut self.buffer) {
333 return Ok(frame);
334 }
335 }
336 Err(ProtocolError::Violated)
337 }
338}
339
/// In-memory drain/sink pair used by the tests in place of a real TCP connection.
#[cfg(test)]
mod test_utils {
    use super::*;
    use crate::metrics::{ProtocolMetricCache, ProtocolMetrics};
    use async_channel::*;
    use std::sync::Arc;

    /// Drain that forwards every chunk into an `async_channel` sender.
    pub struct TcpDrain {
        pub sender: Sender<BytesMut>,
    }

    /// Sink that yields the chunks arriving on an `async_channel` receiver.
    pub struct TcpSink {
        pub receiver: Receiver<BytesMut>,
    }

    /// Creates two connected send/recv protocol pairs.
    ///
    /// Both directions use a bounded channel of capacity `cap`. Whatever the sender of one pair
    /// writes is read by the receiver of the other pair. When `metrics` is `None` a fresh
    /// metric cache named `"tcp"` is created; all four halves share clones of the same cache.
    pub fn tcp_bound(
        cap: usize,
        metrics: Option<ProtocolMetricCache>,
    ) -> [(TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>); 2] {
        let (first_tx, second_rx) = bounded(cap);
        let (second_tx, first_rx) = bounded(cap);
        let cache = match metrics {
            Some(cache) => cache,
            None => ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap())),
        };

        let first = (
            TcpSendProtocol::new(TcpDrain { sender: first_tx }, cache.clone()),
            TcpRecvProtocol::new(TcpSink { receiver: first_rx }, cache.clone()),
        );
        let second = (
            TcpSendProtocol::new(TcpDrain { sender: second_tx }, cache.clone()),
            TcpRecvProtocol::new(TcpSink { receiver: second_rx }, cache),
        );
        [first, second]
    }

    #[async_trait]
    impl UnreliableDrain for TcpDrain {
        type CustomErr = ();
        type DataFormat = BytesMut;

        /// Pushes `data` into the channel; a channel error is reported as `Custom(())`.
        async fn send(
            &mut self,
            data: Self::DataFormat,
        ) -> Result<(), ProtocolError<Self::CustomErr>> {
            match self.sender.send(data).await {
                Ok(()) => Ok(()),
                Err(_) => Err(ProtocolError::Custom(())),
            }
        }
    }

    #[async_trait]
    impl UnreliableSink for TcpSink {
        type CustomErr = ();
        type DataFormat = BytesMut;

        /// Awaits the next chunk from the channel; a channel error is reported as `Custom(())`.
        async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError<Self::CustomErr>> {
            match self.receiver.recv().await {
                Ok(chunk) => Ok(chunk),
                Err(_) => Err(ProtocolError::Custom(())),
            }
        }
    }
}
407
/// Tests for the TCP send/recv protocol halves, run over the in-memory channels of `test_utils`.
#[cfg(test)]
mod tests {
    use crate::{
        InitProtocol, ProtocolEvent, RecvProtocol, SendProtocol,
        error::ProtocolError,
        frame::OTFrame,
        metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason},
        tcp::test_utils::*,
        types::{Pid, Promises, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, Sid},
    };
    use bytes::{Bytes, BytesMut};
    use std::{sync::Arc, time::Duration};

    /// Both sides initialize concurrently and each learns the pid and secret of the other.
    #[tokio::test]
    async fn handshake_all_good() {
        let [mut first, mut second] = tcp_bound(10, None);
        let first_task =
            tokio::spawn(async move { first.initialize(true, Pid::fake(2), 1337).await });
        let second_task =
            tokio::spawn(async move { second.initialize(false, Pid::fake(3), 42).await });
        let (first_result, second_result) = tokio::join!(first_task, second_task);
        assert_eq!(
            first_result.unwrap(),
            Ok((Pid::fake(3), STREAM_ID_OFFSET1, 42))
        );
        assert_eq!(
            second_result.unwrap(),
            Ok((Pid::fake(2), STREAM_ID_OFFSET2, 1337))
        );
    }

    /// An `OpenStream` event arrives unchanged on the other side.
    #[tokio::test]
    async fn open_stream() {
        let [local, remote] = tcp_bound(10, None);
        let (mut tx, mut rx) = (local.0, remote.1);
        let sent = ProtocolEvent::OpenStream {
            sid: Sid::new(10),
            prio: 0u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx.send(sent.clone()).await.unwrap();
        let received = rx.recv().await.unwrap();
        assert_eq!(sent, received);
    }

    /// Two small messages are delivered one after the other.
    #[tokio::test]
    async fn send_short_msg() {
        let [local, remote] = tcp_bound(10, None);
        let (mut tx, mut rx) = (local.0, remote.1);
        let open = ProtocolEvent::OpenStream {
            sid: Sid::new(10),
            prio: 3u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        let first_msg = ProtocolEvent::Message {
            sid: Sid::new(10),
            data: Bytes::from(&[188u8; 600][..]),
        };
        tx.send(first_msg.clone()).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let received = rx.recv().await.unwrap();
        assert_eq!(first_msg, received);

        let second_msg = ProtocolEvent::Message {
            sid: Sid::new(10),
            data: Bytes::from(&[7u8; 30][..]),
        };
        tx.send(second_msg.clone()).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let received = rx.recv().await.unwrap();
        assert_eq!(second_msg, received)
    }

    /// A 500_000 byte message is delivered and the metrics record its frames and bytes.
    #[tokio::test]
    async fn send_long_msg() {
        let mut metrics =
            ProtocolMetricCache::new("long_tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let sid = Sid::new(1);
        let [local, remote] = tcp_bound(10000, Some(metrics.clone()));
        let (mut tx, mut rx) = (local.0, remote.1);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        let msg = ProtocolEvent::Message {
            sid,
            data: Bytes::from(&[99u8; 500_000][..]),
        };
        tx.send(msg.clone()).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        let received = rx.recv().await.unwrap();
        assert_eq!(msg, received);

        metrics.assert_msg(sid, 1, RemoveReason::Finished);
        metrics.assert_msg_bytes(sid, 500_000, RemoveReason::Finished);
        metrics.assert_data_frames(358);
        metrics.assert_data_frames_bytes(500_000);
    }

    /// A message queued before `CloseStream` is still delivered, followed by the close.
    #[tokio::test]
    async fn msg_finishes_after_close() {
        let sid = Sid::new(1);
        let [local, remote] = tcp_bound(10000, None);
        let (mut tx, mut rx) = (local.0, remote.1);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 0,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        let msg = ProtocolEvent::Message {
            sid,
            data: Bytes::from(&[99u8; 500_000][..]),
        };
        tx.send(msg).await.unwrap();
        let close = ProtocolEvent::CloseStream { sid };
        tx.send(close).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::Message { .. }));
        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::CloseStream { .. }));
    }

    /// A message queued before `Shutdown` is still delivered, followed by close and shutdown.
    #[tokio::test]
    async fn msg_finishes_after_shutdown() {
        let sid = Sid::new(1);
        let [local, remote] = tcp_bound(10000, None);
        let (mut tx, mut rx) = (local.0, remote.1);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 0,
        };
        tx.send(open).await.unwrap();
        let _ = rx.recv().await.unwrap();

        let msg = ProtocolEvent::Message {
            sid,
            data: Bytes::from(&[99u8; 500_000][..]),
        };
        tx.send(msg).await.unwrap();
        let shutdown = ProtocolEvent::Shutdown {};
        tx.send(shutdown).await.unwrap();
        let close = ProtocolEvent::CloseStream { sid };
        tx.send(close).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::Message { .. }));
        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::CloseStream { .. }));
        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::Shutdown));
    }

    /// Everything flushed before the sender is dropped can still be received.
    #[tokio::test]
    async fn msg_finishes_after_drop() {
        let sid = Sid::new(1);
        let [local, remote] = tcp_bound(10000, None);
        let (mut tx, mut rx) = (local.0, remote.1);
        let open = ProtocolEvent::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 0,
        };
        tx.send(open).await.unwrap();

        let first_msg = ProtocolEvent::Message {
            sid,
            data: Bytes::from(&[99u8; 500_000][..]),
        };
        tx.send(first_msg).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();

        let second_msg = ProtocolEvent::Message {
            sid,
            data: Bytes::from(&[100u8; 500_000][..]),
        };
        tx.send(second_msg).await.unwrap();
        tx.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
        drop(tx);

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::OpenStream { .. }));
        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::Message { .. }));
        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::Message { .. }));
    }

    /// Frames of one message may be split over several chunks at frame boundaries.
    #[tokio::test]
    async fn header_and_data_in_seperate_msg() {
        let sid = Sid::new(1);
        let (chunk_tx, chunk_rx) = async_channel::bounded(10);
        let m = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let mut rx = super::TcpRecvProtocol::new(TcpSink { receiver: chunk_rx }, m.clone());

        const DATA1: &[u8; 69] =
            b"We need to make sure that its okay to send OPEN_STREAM and DATA_HEAD ";
        const DATA2: &[u8; 95] = b"in one chunk and (DATA and CLOSE_STREAM) in the second chunk. and then keep the connection open";

        // First chunk: OpenStream followed by the header of the message.
        let mut scratch = BytesMut::with_capacity(1500);
        OTFrame::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        }
        .write_bytes(&mut scratch);
        OTFrame::DataHeader {
            mid: 99,
            sid,
            length: (DATA1.len() + DATA2.len()) as u64,
        }
        .write_bytes(&mut scratch);
        chunk_tx.send(scratch.split()).await.unwrap();

        // Second chunk: both data frames followed by CloseStream.
        OTFrame::Data {
            mid: 99,
            data: Bytes::from(&DATA1[..]),
        }
        .write_bytes(&mut scratch);
        OTFrame::Data {
            mid: 99,
            data: Bytes::from(&DATA2[..]),
        }
        .write_bytes(&mut scratch);
        OTFrame::CloseStream { sid }.write_bytes(&mut scratch);
        chunk_tx.send(scratch.split()).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::OpenStream { .. }));

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::Message { .. }));

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::CloseStream { .. }));
    }

    /// Dropping the sending side of the channel makes a pending `recv` fail with `Custom(())`.
    #[tokio::test]
    async fn drop_sink_while_recv() {
        let sid = Sid::new(1);
        let (chunk_tx, chunk_rx) = async_channel::bounded(10);
        let m = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
        let mut rx = super::TcpRecvProtocol::new(TcpSink { receiver: chunk_rx }, m.clone());

        let mut scratch = BytesMut::with_capacity(1500);
        OTFrame::OpenStream {
            sid,
            prio: 5u8,
            promises: Promises::COMPRESSED,
            guaranteed_bandwidth: 1_000_000,
        }
        .write_bytes(&mut scratch);
        chunk_tx.send(scratch.split()).await.unwrap();
        let received = rx.recv().await.unwrap();
        assert!(matches!(received, ProtocolEvent::OpenStream { .. }));

        let pending_recv = tokio::spawn(async move { rx.recv().await });
        drop(chunk_tx);

        let outcome = pending_recv.await.unwrap();
        assert_eq!(outcome, Err(ProtocolError::Custom(())));
    }

    /// Sending on a stream the remote opened, without notifying the local sender first, panics.
    #[tokio::test]
    #[should_panic]
    async fn send_on_stream_from_remote_without_notify() {
        let [mut local, mut remote] = tcp_bound(10, None);
        let open = ProtocolEvent::OpenStream {
            sid: Sid::new(10),
            prio: 3u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        local.0.send(open).await.unwrap();
        let _ = remote.1.recv().await.unwrap();

        let msg = ProtocolEvent::Message {
            sid: Sid::new(10),
            data: Bytes::from(&[188u8; 600][..]),
        };
        remote.0.send(msg.clone()).await.unwrap();
        remote
            .0
            .flush(1_000_000, Duration::from_secs(1))
            .await
            .unwrap();
        let received = local.1.recv().await.unwrap();
        assert_eq!(msg, received);
    }

    /// After `notify_from_recv`, the remote side can send on the stream opened by the local side.
    #[tokio::test]
    async fn send_on_stream_from_remote() {
        let [mut local, mut remote] = tcp_bound(10, None);
        let open = ProtocolEvent::OpenStream {
            sid: Sid::new(10),
            prio: 3u8,
            promises: Promises::ORDERED,
            guaranteed_bandwidth: 1_000_000,
        };
        local.0.send(open).await.unwrap();
        let opened = remote.1.recv().await.unwrap();
        remote.0.notify_from_recv(opened);

        let msg = ProtocolEvent::Message {
            sid: Sid::new(10),
            data: Bytes::from(&[188u8; 600][..]),
        };
        remote.0.send(msg.clone()).await.unwrap();
        remote
            .0
            .flush(1_000_000, Duration::from_secs(1))
            .await
            .unwrap();
        let received = local.1.recv().await.unwrap();
        assert_eq!(msg, received);
    }
}