Struct veloren_network::Stream

source ·
pub struct Stream {
    local_pid: Pid,
    remote_pid: Pid,
    sid: Sid,
    prio: Prio,
    promises: Promises,
    guaranteed_bandwidth: Bandwidth,
    send_closed: Arc<AtomicBool>,
    a2b_msg_s: Sender<(Sid, Bytes)>,
    b2a_msg_recv_r: Option<Receiver<Bytes>>,
    a2b_close_stream_s: Option<UnboundedSender<Sid>>,
}
Expand description

Streams represents a channel to send n messages with a certain priority and Promises. messages need always to be send between 2 Streams.

Streams are generated by the Participant. Look at the open and opened method of Participant on how to generate Streams

Unlike Network and Participant, Streams don’t implement interior mutability, as multiple threads don’t need access to the same Stream.

Fields§

§local_pid: Pid§remote_pid: Pid§sid: Sid§prio: Prio§promises: Promises§guaranteed_bandwidth: Bandwidth§send_closed: Arc<AtomicBool>§a2b_msg_s: Sender<(Sid, Bytes)>§b2a_msg_recv_r: Option<Receiver<Bytes>>§a2b_close_stream_s: Option<UnboundedSender<Sid>>

Implementations§

source§

impl Stream

source

pub(crate) fn new( local_pid: Pid, remote_pid: Pid, sid: Sid, prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, send_closed: Arc<AtomicBool>, a2b_msg_s: Sender<(Sid, Bytes)>, b2a_msg_recv_r: Receiver<Bytes>, a2b_close_stream_s: UnboundedSender<Sid> ) -> Self

source

pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError>

use to send a arbitrary message to the remote side, by having the remote side also opened a Stream linked to this. the message will be Serialized, which actually is quite slow compared to most other calculations done. A faster method send_raw exists, when extra speed is needed. The other side needs to use the respective recv function and know the type send.

send is an exception to the async messages, as it’s probably called quite often so it doesn’t wait for execution. Which also means, that no feedback is provided. It’s to assume that the Message got send correctly. If a error occurred, the next call will return an Error. If the Participant disconnected it will also be unable to be used any more. A StreamError will be returned in the error case, e.g. when the Stream got closed already.

Note when a Stream is dropped locally, it will still send all messages, though the drop will return immediately, however, when a Participant gets gracefully shut down, all remaining messages will be send. If the Stream is dropped from remote side no further messages are send, because the remote side has no way of listening to them either way. If the last channel is destroyed (e.g. by losing the internet connection or non-graceful shutdown, pending messages are also dropped.

Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    //Send  Message
    stream_a.send("Hello World")?;
    drop(network);
})
source

pub fn send_raw(&self, message: &Message) -> Result<(), StreamError>

This methods give the option to skip multiple calls of bincode and compress, e.g. in case the same Message needs to send on multiple Streams to multiple Participants. Other then that, the same rules apply than for send. You need to create a Message via Message::serialize.

Example
use tokio::runtime::Runtime;
use bincode;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};

let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut participant_b = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    let mut stream_b = participant_b.opened().await?;

    //Prepare Message and decode it
    let msg = Message::serialize("Hello World", stream_a.params());
    //Send same Message to multiple Streams
    stream_a.send_raw(&msg);
    stream_b.send_raw(&msg);
    drop(network);
})
source

fn send_raw_move(&self, message: Message) -> Result<(), StreamError>

source

pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError>

use recv to wait on a Message send from the remote side by their Stream. The Message needs to implement DeserializeOwned and thus, the resulting type must already be known by the receiving side. If this is not know from the Application logic, one could use a Enum and then handle the received message via a match state.

A StreamError will be returned in the error case, e.g. when the Stream got closed already.

Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    //Recv  Message
    println!("{}", stream_a.recv::<String>().await?);
    drop(network);
})
source

pub async fn recv_raw(&mut self) -> Result<Message, StreamError>

the equivalent like send_raw but for recv, no bincode or decompress is executed for performance reasons.

Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    //Recv  Message
    let msg = stream_a.recv_raw().await?;
    //Resend Message, without deserializing
    stream_a.send_raw(&msg)?;
    drop(network);
})
source

pub fn try_recv<M: DeserializeOwned>( &mut self ) -> Result<Option<M>, StreamError>

use try_recv to check for a Message send from the remote side by their Stream. This function does not block and returns immediately. It’s intended for use in non-async context only. Other then that, the same rules apply than for recv.

Example
use tokio::runtime::Runtime;
use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};

// Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
    let mut participant_a = network.connected().await?;
    let mut stream_a = participant_a.opened().await?;
    //Try Recv  Message
    println!("{:?}", stream_a.try_recv::<String>()?);
    drop(network);
})
source

pub fn params(&self) -> StreamParams

Trait Implementations§

source§

impl Debug for Stream

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl Drop for Stream

source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl !RefUnwindSafe for Stream

§

impl Send for Stream

§

impl Sync for Stream

§

impl !Unpin for Stream

§

impl !UnwindSafe for Stream

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> Pointable for T

§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more