1use crate::{
2 RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
3 error::ProtocolError,
4 event::ProtocolEvent,
5 frame::{ITFrame, InitFrame, OTFrame},
6 handshake::{ReliableDrain, ReliableSink},
7 message::{ALLOC_BLOCK, ITMessage},
8 metrics::{ProtocolMetricCache, RemoveReason},
9 prio::PrioManager,
10 types::{Bandwidth, Mid, Promises, Sid},
11};
12use async_trait::async_trait;
13use bytes::BytesMut;
14use hashbrown::HashMap;
15use std::time::{Duration, Instant};
16use tracing::info;
17#[cfg(feature = "trace_pedantic")]
18use tracing::trace;
19
20#[derive(Debug)]
24pub struct TcpSendProtocol<D>
25where
26 D: UnreliableDrain<DataFormat = BytesMut>,
27{
28 buffer: BytesMut,
29 store: PrioManager,
30 next_mid: Mid,
31 closing_streams: Vec<Sid>,
32 notify_closing_streams: Vec<Sid>,
33 pending_shutdown: bool,
34 drain: D,
35 #[expect(dead_code)]
36 last: Instant,
37 metrics: ProtocolMetricCache,
38}
39
40#[derive(Debug)]
44pub struct TcpRecvProtocol<S>
45where
46 S: UnreliableSink<DataFormat = BytesMut>,
47{
48 buffer: BytesMut,
49 itmsg_allocator: BytesMut,
50 incoming: HashMap<Mid, ITMessage>,
51 sink: S,
52 metrics: ProtocolMetricCache,
53}
54
55impl<D> TcpSendProtocol<D>
56where
57 D: UnreliableDrain<DataFormat = BytesMut>,
58{
59 pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self {
60 Self {
61 buffer: BytesMut::new(),
62 store: PrioManager::new(metrics.clone()),
63 next_mid: 0u64,
64 closing_streams: vec![],
65 notify_closing_streams: vec![],
66 pending_shutdown: false,
67 drain,
68 last: Instant::now(),
69 metrics,
70 }
71 }
72
73 pub fn supported_promises() -> Promises {
76 Promises::ORDERED
77 | Promises::CONSISTENCY
78 | Promises::GUARANTEED_DELIVERY
79 | Promises::COMPRESSED
80 }
81}
82
83impl<S> TcpRecvProtocol<S>
84where
85 S: UnreliableSink<DataFormat = BytesMut>,
86{
87 pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self {
88 Self {
89 buffer: BytesMut::new(),
90 itmsg_allocator: BytesMut::with_capacity(ALLOC_BLOCK),
91 incoming: HashMap::new(),
92 sink,
93 metrics,
94 }
95 }
96}
97
98#[async_trait]
99impl<D> SendProtocol for TcpSendProtocol<D>
100where
101 D: UnreliableDrain<DataFormat = BytesMut>,
102{
103 type CustomErr = D::CustomErr;
104
105 fn notify_from_recv(&mut self, event: ProtocolEvent) {
106 match event {
107 ProtocolEvent::OpenStream {
108 sid,
109 prio,
110 promises,
111 guaranteed_bandwidth,
112 } => {
113 self.store
114 .open_stream(sid, prio, promises, guaranteed_bandwidth);
115 },
116 ProtocolEvent::CloseStream { sid } => {
117 if !self.store.try_close_stream(sid) {
118 #[cfg(feature = "trace_pedantic")]
119 trace!(?sid, "hold back notify close stream");
120 self.notify_closing_streams.push(sid);
121 }
122 },
123 _ => {},
124 }
125 }
126
127 async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError<Self::CustomErr>> {
128 #[cfg(feature = "trace_pedantic")]
129 trace!(?event, "send");
130 match event {
131 ProtocolEvent::OpenStream {
132 sid,
133 prio,
134 promises,
135 guaranteed_bandwidth,
136 } => {
137 self.store
138 .open_stream(sid, prio, promises, guaranteed_bandwidth);
139 event.to_frame().write_bytes(&mut self.buffer);
140 self.drain.send(self.buffer.split()).await?;
141 },
142 ProtocolEvent::CloseStream { sid } => {
143 if self.store.try_close_stream(sid) {
144 event.to_frame().write_bytes(&mut self.buffer);
145 self.drain.send(self.buffer.split()).await?;
146 } else {
147 #[cfg(feature = "trace_pedantic")]
148 trace!(?sid, "hold back close stream");
149 self.closing_streams.push(sid);
150 }
151 },
152 ProtocolEvent::Shutdown => {
153 if self.store.is_empty() {
154 event.to_frame().write_bytes(&mut self.buffer);
155 self.drain.send(self.buffer.split()).await?;
156 } else {
157 #[cfg(feature = "trace_pedantic")]
158 trace!("hold back shutdown");
159 self.pending_shutdown = true;
160 }
161 },
162 ProtocolEvent::Message { data, sid } => {
163 self.metrics.smsg_ib(sid, data.len() as u64);
164 self.store.add(data, self.next_mid, sid);
165 self.next_mid += 1;
166 },
167 }
168 Ok(())
169 }
170
171 async fn flush(
172 &mut self,
173 bandwidth: Bandwidth,
174 dt: Duration,
175 ) -> Result<Bandwidth, ProtocolError<Self::CustomErr>> {
176 let (frames, total_bytes) = self.store.grab(bandwidth, dt);
177 self.buffer.reserve(total_bytes as usize);
178 let mut data_frames = 0;
179 let mut data_bandwidth = 0;
180 for (_, frame) in frames {
181 if let OTFrame::Data { mid: _, data } = &frame {
182 data_bandwidth += data.len();
183 data_frames += 1;
184 }
185 frame.write_bytes(&mut self.buffer);
186 }
187 self.drain.send(self.buffer.split()).await?;
188 self.metrics
189 .sdata_frames_b(data_frames, data_bandwidth as u64);
190
191 let mut finished_streams = vec![];
192 for (i, &sid) in self.closing_streams.iter().enumerate() {
193 if self.store.try_close_stream(sid) {
194 #[cfg(feature = "trace_pedantic")]
195 trace!(?sid, "close stream, as it's now empty");
196 OTFrame::CloseStream { sid }.write_bytes(&mut self.buffer);
197 self.drain.send(self.buffer.split()).await?;
198 finished_streams.push(i);
199 }
200 }
201 for i in finished_streams.iter().rev() {
202 self.closing_streams.remove(*i);
203 }
204
205 let mut finished_streams = vec![];
206 for (i, sid) in self.notify_closing_streams.iter().enumerate() {
207 if self.store.try_close_stream(*sid) {
208 #[cfg(feature = "trace_pedantic")]
209 trace!(?sid, "close stream, as it's now empty");
210 finished_streams.push(i);
211 }
212 }
213 for i in finished_streams.iter().rev() {
214 self.notify_closing_streams.remove(*i);
215 }
216
217 if self.pending_shutdown && self.store.is_empty() {
218 #[cfg(feature = "trace_pedantic")]
219 trace!("shutdown, as it's now empty");
220 OTFrame::Shutdown {}.write_bytes(&mut self.buffer);
221 self.drain.send(self.buffer.split()).await?;
222 self.pending_shutdown = false;
223 }
224 Ok(data_bandwidth as u64)
225 }
226}
227
228#[async_trait]
229impl<S> RecvProtocol for TcpRecvProtocol<S>
230where
231 S: UnreliableSink<DataFormat = BytesMut>,
232{
233 type CustomErr = S::CustomErr;
234
235 async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError<Self::CustomErr>> {
236 'outer: loop {
237 loop {
238 match ITFrame::read_frame(&mut self.buffer) {
239 Ok(Some(frame)) => {
240 #[cfg(feature = "trace_pedantic")]
241 trace!(?frame, "recv");
242 match frame {
243 ITFrame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown),
244 ITFrame::OpenStream {
245 sid,
246 prio,
247 promises,
248 guaranteed_bandwidth,
249 } => {
250 break 'outer Ok(ProtocolEvent::OpenStream {
251 sid,
252 prio: prio.min(crate::types::HIGHEST_PRIO),
253 promises,
254 guaranteed_bandwidth,
255 });
256 },
257 ITFrame::CloseStream { sid } => {
258 break 'outer Ok(ProtocolEvent::CloseStream { sid });
259 },
260 ITFrame::DataHeader { sid, mid, length } => {
261 let m = ITMessage::new(sid, length, &mut self.itmsg_allocator);
262 self.metrics.rmsg_ib(sid, length);
263 self.incoming.insert(mid, m);
264 },
265 ITFrame::Data { mid, data } => {
266 self.metrics.rdata_frames_b(data.len() as u64);
267 let m = match self.incoming.get_mut(&mid) {
268 Some(m) => m,
269 None => {
270 info!(
271 ?mid,
272 "protocol violation by remote side: send Data before \
273 Header"
274 );
275 break 'outer Err(ProtocolError::Violated);
276 },
277 };
278 m.data.extend_from_slice(&data);
279 if m.data.len() == m.length as usize {
280 let m = self.incoming.remove(&mid).unwrap();
282 self.metrics.rmsg_ob(
283 m.sid,
284 RemoveReason::Finished,
285 m.data.len() as u64,
286 );
287 break 'outer Ok(ProtocolEvent::Message {
288 sid: m.sid,
289 data: m.data.freeze(),
290 });
291 }
292 },
293 };
294 },
295 Ok(None) => break, Err(()) => return Err(ProtocolError::Violated),
297 }
298 }
299 let chunk = self.sink.recv().await?;
300 if self.buffer.is_empty() {
301 self.buffer = chunk;
302 } else {
303 self.buffer.extend_from_slice(&chunk);
304 }
305 }
306 }
307}
308
309#[async_trait]
310impl<D> ReliableDrain for TcpSendProtocol<D>
311where
312 D: UnreliableDrain<DataFormat = BytesMut>,
313{
314 type CustomErr = D::CustomErr;
315
316 async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError<Self::CustomErr>> {
317 let mut buffer = BytesMut::with_capacity(500);
318 frame.write_bytes(&mut buffer);
319 self.drain.send(buffer).await
320 }
321}
322
323#[async_trait]
324impl<S> ReliableSink for TcpRecvProtocol<S>
325where
326 S: UnreliableSink<DataFormat = BytesMut>,
327{
328 type CustomErr = S::CustomErr;
329
330 async fn recv(&mut self) -> Result<InitFrame, ProtocolError<Self::CustomErr>> {
331 while self.buffer.len() < 100 {
332 let chunk = self.sink.recv().await?;
333 self.buffer.extend_from_slice(&chunk);
334 if let Some(frame) = InitFrame::read_frame(&mut self.buffer) {
335 return Ok(frame);
336 }
337 }
338 Err(ProtocolError::Violated)
339 }
340}
341
342#[cfg(test)]
343mod test_utils {
344 use super::*;
346 use crate::metrics::{ProtocolMetricCache, ProtocolMetrics};
347 use async_channel::*;
348 use std::sync::Arc;
349
350 pub struct TcpDrain {
351 pub sender: Sender<BytesMut>,
352 }
353
354 pub struct TcpSink {
355 pub receiver: Receiver<BytesMut>,
356 }
357
358 pub fn tcp_bound(
360 cap: usize,
361 metrics: Option<ProtocolMetricCache>,
362 ) -> [(TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>); 2] {
363 let (s1, r1) = bounded(cap);
364 let (s2, r2) = bounded(cap);
365 let m = metrics.unwrap_or_else(|| {
366 ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()))
367 });
368 [
369 (
370 TcpSendProtocol::new(TcpDrain { sender: s1 }, m.clone()),
371 TcpRecvProtocol::new(TcpSink { receiver: r2 }, m.clone()),
372 ),
373 (
374 TcpSendProtocol::new(TcpDrain { sender: s2 }, m.clone()),
375 TcpRecvProtocol::new(TcpSink { receiver: r1 }, m),
376 ),
377 ]
378 }
379
380 #[async_trait]
381 impl UnreliableDrain for TcpDrain {
382 type CustomErr = ();
383 type DataFormat = BytesMut;
384
385 async fn send(
386 &mut self,
387 data: Self::DataFormat,
388 ) -> Result<(), ProtocolError<Self::CustomErr>> {
389 self.sender
390 .send(data)
391 .await
392 .map_err(|_| ProtocolError::Custom(()))
393 }
394 }
395
396 #[async_trait]
397 impl UnreliableSink for TcpSink {
398 type CustomErr = ();
399 type DataFormat = BytesMut;
400
401 async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError<Self::CustomErr>> {
402 self.receiver
403 .recv()
404 .await
405 .map_err(|_| ProtocolError::Custom(()))
406 }
407 }
408}
409
410#[cfg(test)]
411mod tests {
412 use crate::{
413 InitProtocol, ProtocolEvent, RecvProtocol, SendProtocol,
414 error::ProtocolError,
415 frame::OTFrame,
416 metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason},
417 tcp::test_utils::*,
418 types::{Pid, Promises, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, Sid},
419 };
420 use bytes::{Bytes, BytesMut};
421 use std::{sync::Arc, time::Duration};
422
423 #[tokio::test]
424 async fn handshake_all_good() {
425 let [mut p1, mut p2] = tcp_bound(10, None);
426 let r1 = tokio::spawn(async move { p1.initialize(true, Pid::fake(2), 1337).await });
427 let r2 = tokio::spawn(async move { p2.initialize(false, Pid::fake(3), 42).await });
428 let (r1, r2) = tokio::join!(r1, r2);
429 assert_eq!(r1.unwrap(), Ok((Pid::fake(3), STREAM_ID_OFFSET1, 42)));
430 assert_eq!(r2.unwrap(), Ok((Pid::fake(2), STREAM_ID_OFFSET2, 1337)));
431 }
432
433 #[tokio::test]
434 async fn open_stream() {
435 let [p1, p2] = tcp_bound(10, None);
436 let (mut s, mut r) = (p1.0, p2.1);
437 let event = ProtocolEvent::OpenStream {
438 sid: Sid::new(10),
439 prio: 0u8,
440 promises: Promises::ORDERED,
441 guaranteed_bandwidth: 1_000_000,
442 };
443 s.send(event.clone()).await.unwrap();
444 let e = r.recv().await.unwrap();
445 assert_eq!(event, e);
446 }
447
448 #[tokio::test]
449 async fn send_short_msg() {
450 let [p1, p2] = tcp_bound(10, None);
451 let (mut s, mut r) = (p1.0, p2.1);
452 let event = ProtocolEvent::OpenStream {
453 sid: Sid::new(10),
454 prio: 3u8,
455 promises: Promises::ORDERED,
456 guaranteed_bandwidth: 1_000_000,
457 };
458 s.send(event).await.unwrap();
459 let _ = r.recv().await.unwrap();
460 let event = ProtocolEvent::Message {
461 sid: Sid::new(10),
462 data: Bytes::from(&[188u8; 600][..]),
463 };
464 s.send(event.clone()).await.unwrap();
465 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
466 let e = r.recv().await.unwrap();
467 assert_eq!(event, e);
468 let event = ProtocolEvent::Message {
470 sid: Sid::new(10),
471 data: Bytes::from(&[7u8; 30][..]),
472 };
473 s.send(event.clone()).await.unwrap();
474 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
475 let e = r.recv().await.unwrap();
476 assert_eq!(event, e)
477 }
478
479 #[tokio::test]
480 async fn send_long_msg() {
481 let mut metrics =
482 ProtocolMetricCache::new("long_tcp", Arc::new(ProtocolMetrics::new().unwrap()));
483 let sid = Sid::new(1);
484 let [p1, p2] = tcp_bound(10000, Some(metrics.clone()));
485 let (mut s, mut r) = (p1.0, p2.1);
486 let event = ProtocolEvent::OpenStream {
487 sid,
488 prio: 5u8,
489 promises: Promises::COMPRESSED,
490 guaranteed_bandwidth: 1_000_000,
491 };
492 s.send(event).await.unwrap();
493 let _ = r.recv().await.unwrap();
494 let event = ProtocolEvent::Message {
495 sid,
496 data: Bytes::from(&[99u8; 500_000][..]),
497 };
498 s.send(event.clone()).await.unwrap();
499 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
500 let e = r.recv().await.unwrap();
501 assert_eq!(event, e);
502 metrics.assert_msg(sid, 1, RemoveReason::Finished);
503 metrics.assert_msg_bytes(sid, 500_000, RemoveReason::Finished);
504 metrics.assert_data_frames(358);
505 metrics.assert_data_frames_bytes(500_000);
506 }
507
508 #[tokio::test]
509 async fn msg_finishes_after_close() {
510 let sid = Sid::new(1);
511 let [p1, p2] = tcp_bound(10000, None);
512 let (mut s, mut r) = (p1.0, p2.1);
513 let event = ProtocolEvent::OpenStream {
514 sid,
515 prio: 5u8,
516 promises: Promises::COMPRESSED,
517 guaranteed_bandwidth: 0,
518 };
519 s.send(event).await.unwrap();
520 let _ = r.recv().await.unwrap();
521 let event = ProtocolEvent::Message {
522 sid,
523 data: Bytes::from(&[99u8; 500_000][..]),
524 };
525 s.send(event).await.unwrap();
526 let event = ProtocolEvent::CloseStream { sid };
527 s.send(event).await.unwrap();
528 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
530 let e = r.recv().await.unwrap();
531 assert!(matches!(e, ProtocolEvent::Message { .. }));
532 let e = r.recv().await.unwrap();
533 assert!(matches!(e, ProtocolEvent::CloseStream { .. }));
534 }
535
536 #[tokio::test]
537 async fn msg_finishes_after_shutdown() {
538 let sid = Sid::new(1);
539 let [p1, p2] = tcp_bound(10000, None);
540 let (mut s, mut r) = (p1.0, p2.1);
541 let event = ProtocolEvent::OpenStream {
542 sid,
543 prio: 5u8,
544 promises: Promises::COMPRESSED,
545 guaranteed_bandwidth: 0,
546 };
547 s.send(event).await.unwrap();
548 let _ = r.recv().await.unwrap();
549 let event = ProtocolEvent::Message {
550 sid,
551 data: Bytes::from(&[99u8; 500_000][..]),
552 };
553 s.send(event).await.unwrap();
554 let event = ProtocolEvent::Shutdown {};
555 s.send(event).await.unwrap();
556 let event = ProtocolEvent::CloseStream { sid };
557 s.send(event).await.unwrap();
558 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
560 let e = r.recv().await.unwrap();
561 assert!(matches!(e, ProtocolEvent::Message { .. }));
562 let e = r.recv().await.unwrap();
563 assert!(matches!(e, ProtocolEvent::CloseStream { .. }));
564 let e = r.recv().await.unwrap();
565 assert!(matches!(e, ProtocolEvent::Shutdown));
566 }
567
568 #[tokio::test]
569 async fn msg_finishes_after_drop() {
570 let sid = Sid::new(1);
571 let [p1, p2] = tcp_bound(10000, None);
572 let (mut s, mut r) = (p1.0, p2.1);
573 let event = ProtocolEvent::OpenStream {
574 sid,
575 prio: 5u8,
576 promises: Promises::COMPRESSED,
577 guaranteed_bandwidth: 0,
578 };
579 s.send(event).await.unwrap();
580 let event = ProtocolEvent::Message {
581 sid,
582 data: Bytes::from(&[99u8; 500_000][..]),
583 };
584 s.send(event).await.unwrap();
585 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
586 let event = ProtocolEvent::Message {
587 sid,
588 data: Bytes::from(&[100u8; 500_000][..]),
589 };
590 s.send(event).await.unwrap();
591 s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
592 drop(s);
593 let e = r.recv().await.unwrap();
594 assert!(matches!(e, ProtocolEvent::OpenStream { .. }));
595 let e = r.recv().await.unwrap();
596 assert!(matches!(e, ProtocolEvent::Message { .. }));
597 let e = r.recv().await.unwrap();
598 assert!(matches!(e, ProtocolEvent::Message { .. }));
599 }
600
601 #[tokio::test]
602 async fn header_and_data_in_seperate_msg() {
603 let sid = Sid::new(1);
604 let (s, r) = async_channel::bounded(10);
605 let m = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
606 let mut r = super::TcpRecvProtocol::new(TcpSink { receiver: r }, m.clone());
607
608 const DATA1: &[u8; 69] =
609 b"We need to make sure that its okay to send OPEN_STREAM and DATA_HEAD ";
610 const DATA2: &[u8; 95] = b"in one chunk and (DATA and CLOSE_STREAM) in the second chunk. and then keep the connection open";
611 let mut bytes = BytesMut::with_capacity(1500);
612 OTFrame::OpenStream {
613 sid,
614 prio: 5u8,
615 promises: Promises::COMPRESSED,
616 guaranteed_bandwidth: 1_000_000,
617 }
618 .write_bytes(&mut bytes);
619 OTFrame::DataHeader {
620 mid: 99,
621 sid,
622 length: (DATA1.len() + DATA2.len()) as u64,
623 }
624 .write_bytes(&mut bytes);
625 s.send(bytes.split()).await.unwrap();
626
627 OTFrame::Data {
628 mid: 99,
629 data: Bytes::from(&DATA1[..]),
630 }
631 .write_bytes(&mut bytes);
632 OTFrame::Data {
633 mid: 99,
634 data: Bytes::from(&DATA2[..]),
635 }
636 .write_bytes(&mut bytes);
637 OTFrame::CloseStream { sid }.write_bytes(&mut bytes);
638 s.send(bytes.split()).await.unwrap();
639
640 let e = r.recv().await.unwrap();
641 assert!(matches!(e, ProtocolEvent::OpenStream { .. }));
642
643 let e = r.recv().await.unwrap();
644 assert!(matches!(e, ProtocolEvent::Message { .. }));
645
646 let e = r.recv().await.unwrap();
647 assert!(matches!(e, ProtocolEvent::CloseStream { .. }));
648 }
649
650 #[tokio::test]
651 async fn drop_sink_while_recv() {
652 let sid = Sid::new(1);
653 let (s, r) = async_channel::bounded(10);
654 let m = ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()));
655 let mut r = super::TcpRecvProtocol::new(TcpSink { receiver: r }, m.clone());
656
657 let mut bytes = BytesMut::with_capacity(1500);
658 OTFrame::OpenStream {
659 sid,
660 prio: 5u8,
661 promises: Promises::COMPRESSED,
662 guaranteed_bandwidth: 1_000_000,
663 }
664 .write_bytes(&mut bytes);
665 s.send(bytes.split()).await.unwrap();
666 let e = r.recv().await.unwrap();
667 assert!(matches!(e, ProtocolEvent::OpenStream { .. }));
668
669 let e = tokio::spawn(async move { r.recv().await });
670 drop(s);
671
672 let e = e.await.unwrap();
673 assert_eq!(e, Err(ProtocolError::Custom(())));
674 }
675
676 #[tokio::test]
677 #[should_panic]
678 async fn send_on_stream_from_remote_without_notify() {
679 let [mut p1, mut p2] = tcp_bound(10, None);
682 let event = ProtocolEvent::OpenStream {
683 sid: Sid::new(10),
684 prio: 3u8,
685 promises: Promises::ORDERED,
686 guaranteed_bandwidth: 1_000_000,
687 };
688 p1.0.send(event).await.unwrap();
689 let _ = p2.1.recv().await.unwrap();
690 let event = ProtocolEvent::Message {
691 sid: Sid::new(10),
692 data: Bytes::from(&[188u8; 600][..]),
693 };
694 p2.0.send(event.clone()).await.unwrap();
695 p2.0.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
696 let e = p1.1.recv().await.unwrap();
697 assert_eq!(event, e);
698 }
699
700 #[tokio::test]
701 async fn send_on_stream_from_remote() {
702 let [mut p1, mut p2] = tcp_bound(10, None);
705 let event = ProtocolEvent::OpenStream {
706 sid: Sid::new(10),
707 prio: 3u8,
708 promises: Promises::ORDERED,
709 guaranteed_bandwidth: 1_000_000,
710 };
711 p1.0.send(event).await.unwrap();
712 let e = p2.1.recv().await.unwrap();
713 p2.0.notify_from_recv(e);
714 let event = ProtocolEvent::Message {
715 sid: Sid::new(10),
716 data: Bytes::from(&[188u8; 600][..]),
717 };
718 p2.0.send(event.clone()).await.unwrap();
719 p2.0.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
720 let e = p1.1.recv().await.unwrap();
721 assert_eq!(event, e);
722 }
723}