Struct veloren_network::api::Stream
source · pub struct Stream {
local_pid: Pid,
remote_pid: Pid,
sid: Sid,
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
send_closed: Arc<AtomicBool>,
a2b_msg_s: Sender<(Sid, Bytes)>,
b2a_msg_recv_r: Option<Receiver<Bytes>>,
a2b_close_stream_s: Option<UnboundedSender<Sid>>,
}
Expand description
Streams
represents a channel to send n
messages with a certain priority
and Promises
. messages need always to be send between 2 Streams
.
Streams
are generated by the Participant
.
Look at the open
and opened
method of Participant
on how to
generate Streams
Unlike Network
and Participant
, Streams
don’t implement interior
mutability, as multiple threads don’t need access to the same Stream
.
Fields§
§local_pid: Pid
§remote_pid: Pid
§sid: Sid
§prio: Prio
§promises: Promises
§guaranteed_bandwidth: Bandwidth
§send_closed: Arc<AtomicBool>
§a2b_msg_s: Sender<(Sid, Bytes)>
§b2a_msg_recv_r: Option<Receiver<Bytes>>
§a2b_close_stream_s: Option<UnboundedSender<Sid>>
Implementations§
source§impl Stream
impl Stream
pub(crate) fn new( local_pid: Pid, remote_pid: Pid, sid: Sid, prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, send_closed: Arc<AtomicBool>, a2b_msg_s: Sender<(Sid, Bytes)>, b2a_msg_recv_r: Receiver<Bytes>, a2b_close_stream_s: UnboundedSender<Sid>, ) -> Self
sourcepub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError>
pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError>
use to send a arbitrary message to the remote side, by having the remote
side also opened a Stream
linked to this. the message will be
Serialized
, which actually is quite slow compared to most other
calculations done. A faster method send_raw
exists, when extra
speed is needed. The other side needs to use the respective recv
function and know the type send.
send
is an exception to the async
messages, as it’s probably called
quite often so it doesn’t wait for execution. Which also means, that
no feedback is provided. It’s to assume that the Message got send
correctly. If a error occurred, the next call will return an Error.
If the Participant
disconnected it will also be unable to be used
any more. A StreamError
will be returned in the error case, e.g.
when the Stream
got closed already.
Note when a Stream
is dropped locally, it will still send all
messages, though the drop
will return immediately, however, when a
Participant
gets gracefully shut down, all remaining messages
will be send. If the Stream
is dropped from remote side no further
messages are send, because the remote side has no way of listening
to them either way. If the last channel is destroyed (e.g. by losing
the internet connection or non-graceful shutdown, pending messages
are also dropped.
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
// Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
let mut participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?;
//Send Message
stream_a.send("Hello World")?;
drop(network);
})
sourcepub fn send_raw(&self, message: &Message) -> Result<(), StreamError>
pub fn send_raw(&self, message: &Message) -> Result<(), StreamError>
This methods give the option to skip multiple calls of bincode
and
compress
, e.g. in case the same Message needs to send on
multiple Streams
to multiple Participants
. Other then that,
the same rules apply than for send
.
You need to create a Message via Message::serialize
.
§Example
use tokio::runtime::Runtime;
use bincode;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
let mut participant_a = network.connected().await?;
let mut participant_b = network.connected().await?;
let mut stream_a = participant_a.opened().await?;
let mut stream_b = participant_b.opened().await?;
//Prepare Message and decode it
let msg = Message::serialize("Hello World", stream_a.params());
//Send same Message to multiple Streams
stream_a.send_raw(&msg);
stream_b.send_raw(&msg);
drop(network);
})
fn send_raw_move(&self, message: Message) -> Result<(), StreamError>
sourcepub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError>
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError>
use recv
to wait on a Message send from the remote side by their
Stream
. The Message needs to implement DeserializeOwned
and
thus, the resulting type must already be known by the receiving side.
If this is not know from the Application logic, one could use a Enum
and then handle the received message via a match
state.
A StreamError
will be returned in the error case, e.g. when the
Stream
got closed already.
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
// Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
let mut participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?;
//Recv Message
println!("{}", stream_a.recv::<String>().await?);
drop(network);
})
sourcepub async fn recv_raw(&mut self) -> Result<Message, StreamError>
pub async fn recv_raw(&mut self) -> Result<Message, StreamError>
the equivalent like send_raw
but for recv
, no bincode
or
decompress
is executed for performance reasons.
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
// Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
let mut participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?;
//Recv Message
let msg = stream_a.recv_raw().await?;
//Resend Message, without deserializing
stream_a.send_raw(&msg)?;
drop(network);
})
sourcepub fn try_recv<M: DeserializeOwned>(
&mut self,
) -> Result<Option<M>, StreamError>
pub fn try_recv<M: DeserializeOwned>( &mut self, ) -> Result<Option<M>, StreamError>
use try_recv
to check for a Message send from the remote side by their
Stream
. This function does not block and returns immediately. It’s
intended for use in non-async context only. Other then that, the
same rules apply than for recv
.
§Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
// Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
let mut participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?;
//Try Recv Message
println!("{:?}", stream_a.try_recv::<String>()?);
drop(network);
})
pub fn params(&self) -> StreamParams
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Stream
impl RefUnwindSafe for Stream
impl Send for Stream
impl Sync for Stream
impl !Unpin for Stream
impl UnwindSafe for Stream
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more