pub struct Stream {
local_pid: Pid,
remote_pid: Pid,
sid: Sid,
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
send_closed: Arc<AtomicBool>,
a2b_msg_s: Sender<(Sid, Bytes)>,
b2a_msg_recv_r: Option<Receiver<Bytes>>,
a2b_close_stream_s: Option<UnboundedSender<Sid>>,
}
A Stream represents a channel for sending any number of messages with a
certain priority and Promises. Messages are always sent between two Streams.
Streams are generated by the Participant.
See the open and opened methods of Participant for how to
generate Streams.
Unlike Network and Participant, Streams don’t implement interior
mutability, as multiple threads don’t need access to the same Stream.
Fields§
§local_pid: Pid
§remote_pid: Pid
§sid: Sid
§prio: Prio
§promises: Promises
§guaranteed_bandwidth: Bandwidth
§send_closed: Arc<AtomicBool>
§a2b_msg_s: Sender<(Sid, Bytes)>
§b2a_msg_recv_r: Option<Receiver<Bytes>>
§a2b_close_stream_s: Option<UnboundedSender<Sid>>
Implementations§
impl Stream
pub(crate) fn new( local_pid: Pid, remote_pid: Pid, sid: Sid, prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, send_closed: Arc<AtomicBool>, a2b_msg_s: Sender<(Sid, Bytes)>, b2a_msg_recv_r: Receiver<Bytes>, a2b_close_stream_s: UnboundedSender<Sid>, ) -> Self
pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError>
Use send to send an arbitrary message to the remote side, provided the
remote side has also opened a Stream linked to this one. The message will
be serialized, which is quite slow compared to most other calculations
done. A faster method, send_raw, exists for when extra speed is needed.
The other side needs to use the respective recv function and know the
type that was sent.
send is an exception among the otherwise async methods: because it’s
probably called quite often, it doesn’t wait for execution. This also
means that no feedback is provided; assume that the message was sent
correctly. If an error occurred, the next call will return an error.
If the Participant disconnected, the Stream can no longer be used.
A StreamError will be returned in the error case, e.g. when the Stream
has already been closed.
Note that when a Stream is dropped locally, it will still send all
pending messages, though the drop itself returns immediately. When a
Participant is gracefully shut down, however, all remaining messages
will be sent. If the Stream is dropped from the remote side, no further
messages are sent, because the remote side has no way of listening
to them anyway. If the last channel is destroyed (e.g. by losing
the internet connection or a non-graceful shutdown), pending messages
are also dropped.
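The fire-and-forget behaviour described above can be illustrated with a small, std-only sketch. It is not the crate's implementation: SketchSender, SketchError and the use of std::sync::mpsc are assumptions made for illustration, loosely modelled on the send_closed flag in the struct definition. The point it shows is that send only enqueues and returns, so a failure detected later is reported by the next call.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical error type for this sketch (not the crate's `StreamError`).
#[derive(Debug, PartialEq)]
pub enum SketchError {
    StreamClosed,
}

/// Hypothetical sending half: queues bytes and returns without waiting.
pub struct SketchSender {
    send_closed: Arc<AtomicBool>,
    queue: Sender<Vec<u8>>,
}

impl SketchSender {
    /// Returns the sender, the queue's receiving end, and the shared flag
    /// that a background worker would set once the stream is closed.
    pub fn new() -> (Self, Receiver<Vec<u8>>, Arc<AtomicBool>) {
        let (queue, rx) = channel();
        let send_closed = Arc::new(AtomicBool::new(false));
        let sender = Self {
            send_closed: Arc::clone(&send_closed),
            queue,
        };
        (sender, rx, send_closed)
    }

    /// Non-blocking: checks the flag and enqueues. Delivery is not confirmed.
    pub fn send(&self, msg: &[u8]) -> Result<(), SketchError> {
        if self.send_closed.load(Ordering::Relaxed) {
            return Err(SketchError::StreamClosed);
        }
        self.queue
            .send(msg.to_vec())
            .map_err(|_| SketchError::StreamClosed)
    }
}

fn main() {
    let (stream, rx, closed_flag) = SketchSender::new();
    // The first send returns immediately, without feedback about delivery.
    assert_eq!(stream.send(b"Hello World"), Ok(()));
    // Later, some background worker notices a failure and flips the flag.
    closed_flag.store(true, Ordering::Relaxed);
    // Only the next call reports the problem.
    assert_eq!(stream.send(b"again"), Err(SketchError::StreamClosed));
    // The first message was queued before the failure.
    assert_eq!(rx.try_recv().unwrap(), b"Hello World".to_vec());
}
```

A caller that needs delivery confirmation would have to build an acknowledgement on top, for example by having the remote side reply on its own Stream.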
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    // Send Message
    stream_a.send("Hello World")?;
    drop(network);
})
pub fn send_raw(&self, message: &Message) -> Result<(), StreamError>
This method gives the option to skip multiple calls of bincode and
compress, e.g. when the same Message needs to be sent on
multiple Streams to multiple Participants. Other than that,
the same rules apply as for send.
You need to create a Message via Message::serialize.
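The idea of serializing once and sending the result to several streams can be sketched with the standard library alone. Everything here is hypothetical and only illustrates the pattern: RawMessage stands in for a pre-serialized Message, its serialize function is a placeholder for the expensive bincode and compress step, and std::sync::mpsc senders stand in for Streams.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

/// Hypothetical pre-serialized message; cloning only bumps a reference count.
#[derive(Clone)]
pub struct RawMessage {
    bytes: Arc<[u8]>,
}

impl RawMessage {
    /// Stand-in for the expensive serialize-and-compress step.
    pub fn serialize(text: &str) -> Self {
        Self {
            bytes: Arc::from(text.as_bytes()),
        }
    }

    pub fn as_bytes(&self) -> &[u8] {
        &self.bytes
    }
}

/// Sends the same encoded message to every stream and returns the number
/// of sends that succeeded.
pub fn broadcast(streams: &[Sender<RawMessage>], msg: &RawMessage) -> usize {
    streams
        .iter()
        .filter(|s| s.send(msg.clone()).is_ok())
        .count()
}

fn main() {
    let (tx_a, rx_a) = channel();
    let (tx_b, rx_b) = channel();
    // The payload is encoded exactly once, then shared between both sends.
    let msg = RawMessage::serialize("Hello World");
    let delivered = broadcast(&[tx_a, tx_b], &msg);
    assert_eq!(delivered, 2);
    assert_eq!(rx_a.recv().unwrap().as_bytes(), b"Hello World");
    assert_eq!(rx_b.recv().unwrap().as_bytes(), b"Hello World");
}
```

The saving grows with the number of receivers: the encoding cost is paid once, while each additional send costs only a reference-count increment in this sketch.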
§Example
use tokio::runtime::Runtime;
use bincode;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};

let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut participant_b = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    let mut stream_b = participant_b.opened().await?;
    // Prepare Message by serializing it once
    let msg = Message::serialize("Hello World", stream_a.params());
    // Send same Message to multiple Streams
    stream_a.send_raw(&msg)?;
    stream_b.send_raw(&msg)?;
    drop(network);
})
fn send_raw_move(&self, message: Message) -> Result<(), StreamError>
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError>
Use recv to wait for a Message sent from the remote side by their
Stream. The Message needs to implement DeserializeOwned, and
thus the resulting type must already be known by the receiving side.
If this is not known from the application logic, one could use an enum
and then handle the received message via a match statement.
A StreamError will be returned in the error case, e.g. when the
Stream has already been closed.
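The enum-and-match approach mentioned above might look like the following sketch. To stay self-contained it uses a hand-rolled, hypothetical byte encoding (one tag byte plus payload) instead of serde; with the real crate, the enum would derive Serialize and Deserialize and be passed to send and recv. The names AppMsg, encode, decode and handle are illustrative only.

```rust
/// Hypothetical application protocol: one enum covers every message kind.
#[derive(Debug, PartialEq)]
pub enum AppMsg {
    Ping,
    Chat(String),
}

impl AppMsg {
    /// Toy encoding for this sketch: one tag byte followed by the payload.
    pub fn encode(&self) -> Vec<u8> {
        match self {
            AppMsg::Ping => vec![0],
            AppMsg::Chat(text) => {
                let mut out = vec![1];
                out.extend_from_slice(text.as_bytes());
                out
            }
        }
    }

    /// Returns `None` for empty input, unknown tags, or invalid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        let (tag, payload) = bytes.split_first()?;
        match *tag {
            0 => Some(AppMsg::Ping),
            1 => String::from_utf8(payload.to_vec()).ok().map(AppMsg::Chat),
            _ => None,
        }
    }
}

/// The receiver decodes one known type, then branches on the variant.
pub fn handle(msg: AppMsg) -> String {
    match msg {
        AppMsg::Ping => "pong".to_string(),
        AppMsg::Chat(text) => format!("chat: {text}"),
    }
}

fn main() {
    // Sender side: pick a variant and encode it.
    let wire = AppMsg::Chat("Hello World".into()).encode();
    // Receiver side: always decode as `AppMsg`, then match.
    let received = AppMsg::decode(&wire).expect("valid message");
    println!("{}", handle(received));
}
```

Because both sides agree on a single enum type, the receiver never has to guess which concrete type arrives next on the Stream.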
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    // Recv Message
    println!("{}", stream_a.recv::<String>().await?);
    drop(network);
})
pub async fn recv_raw(&mut self) -> Result<Message, StreamError>
The equivalent of send_raw, but for recv: no bincode or
decompress is executed, for performance reasons.
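A typical use of a raw receive is relaying: a message is passed on untouched, so the decode and encode cost is never paid on the relay. The sketch below shows that pattern with std::sync::mpsc channels carrying plain byte vectors; the relay function and the channel types are assumptions for illustration and do not reflect the crate's internals.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Forwards every currently queued raw message unchanged and returns how
/// many were relayed. The bytes are never decoded.
pub fn relay(from: &Receiver<Vec<u8>>, to: &Sender<Vec<u8>>) -> usize {
    let mut count = 0;
    // `try_iter` yields already queued items without blocking.
    for raw in from.try_iter() {
        if to.send(raw).is_err() {
            // The downstream side is gone; stop relaying.
            break;
        }
        count += 1;
    }
    count
}

fn main() {
    let (in_tx, in_rx) = channel();
    let (out_tx, out_rx) = channel();
    in_tx.send(b"Hello World".to_vec()).unwrap();
    let relayed = relay(&in_rx, &out_tx);
    assert_eq!(relayed, 1);
    assert_eq!(out_rx.try_recv().unwrap(), b"Hello World".to_vec());
}
```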
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    // Recv Message
    let msg = stream_a.recv_raw().await?;
    // Resend Message, without deserializing
    stream_a.send_raw(&msg)?;
    drop(network);
})
pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError>
Use try_recv to check for a Message sent from the remote side by their
Stream. This function does not block and returns immediately. It’s
intended for use in non-async contexts only. Other than that, the
same rules apply as for recv.
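The three possible outcomes of a non-blocking receive (a message, nothing yet, or a closed stream) map naturally onto the Result<Option<M>, StreamError> return type. The following std-only sketch shows that mapping on top of std::sync::mpsc; poll and PollError are hypothetical names, and the real method additionally deserializes the message.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical error type for this sketch (not the crate's `StreamError`).
#[derive(Debug, PartialEq)]
pub enum PollError {
    StreamClosed,
}

/// Non-blocking poll: `Ok(None)` means "nothing yet", `Err` means closed.
pub fn poll<T>(rx: &Receiver<T>) -> Result<Option<T>, PollError> {
    match rx.try_recv() {
        Ok(msg) => Ok(Some(msg)),
        Err(TryRecvError::Empty) => Ok(None),
        Err(TryRecvError::Disconnected) => Err(PollError::StreamClosed),
    }
}

fn main() {
    let (tx, rx) = channel::<String>();
    // Nothing has been sent yet, so the poll returns immediately with `None`.
    assert_eq!(poll(&rx), Ok(None));
    tx.send("Hello World".to_string()).unwrap();
    assert_eq!(poll(&rx), Ok(Some("Hello World".to_string())));
    // Once the sending side is gone and the queue is empty, polling fails.
    drop(tx);
    assert_eq!(poll(&rx), Err(PollError::StreamClosed));
}
```

This shape suits a synchronous game or render loop that checks for messages once per tick and carries on when none are pending.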
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    // Try Recv Message
    println!("{:?}", stream_a.try_recv::<String>()?);
    drop(network);
})
pub fn params(&self) -> StreamParams
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Stream
impl RefUnwindSafe for Stream
impl Send for Stream
impl Sync for Stream
impl !Unpin for Stream
impl UnwindSafe for Stream
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.