pub struct Network {
    local_pid: Pid,
    participant_disconnect_sender: Arc<Mutex<HashMap<Pid, Arc<Mutex<Option<UnboundedSender<(Pid, (Duration, Sender<Result<(), ParticipantError>>))>>>>>>>,
    listen_sender: UnboundedSender<(ListenAddr, Sender<Result<()>>)>,
    connect_sender: UnboundedSender<(ConnectAddr, Sender<Result<Participant, NetworkConnectError>>)>,
    connected_receiver: UnboundedReceiver<Participant>,
    shutdown_network_s: Option<Sender<Sender<()>>>,
}

Use the Network to create connections to other Participants

The Network is the single source that handles all connections in your application. You can share it across multiple threads in an Arc, as all commands use interior mutability.

The Network has methods to connect to other Participants actively via their ConnectAddr, or listen passively for connected Participants via ListenAddr.

To guarantee a clean shutdown, the Runtime MUST NOT be dropped before the Network.
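One way to satisfy this ordering is to rely on Rust's rule that local variables are dropped in reverse declaration order, so a runtime declared before the network outlives it. The sketch below uses only the standard library; `Tracked` and `drop_order` are hypothetical stand-ins and not part of this crate.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in that records its name in a shared log when dropped.
struct Tracked {
    name: &'static str,
    log: Arc<Mutex<Vec<&'static str>>>,
}

impl Drop for Tracked {
    fn drop(&mut self) {
        self.log.lock().unwrap().push(self.name);
    }
}

/// Declares a stand-in "runtime" before a stand-in "network" and returns
/// the order in which both were dropped at the end of the inner scope.
fn drop_order() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    {
        let _runtime = Tracked { name: "runtime", log: Arc::clone(&log) };
        let _network = Tracked { name: "network", log: Arc::clone(&log) };
        // Scope ends here: locals are dropped in reverse declaration order.
    }
    let order = log.lock().unwrap().clone();
    order
}

fn main() {
    println!("{:?}", drop_order());
}
```

Declaring the Runtime first, as the examples on this page do, gives the same ordering without an explicit drop call.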

Examples

use tokio::runtime::Runtime;
use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};

// Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async{
    network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
    let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
    drop(network);
})

Fields

local_pid: Pid
participant_disconnect_sender: Arc<Mutex<HashMap<Pid, Arc<Mutex<Option<UnboundedSender<(Pid, (Duration, Sender<Result<(), ParticipantError>>))>>>>>>>
listen_sender: UnboundedSender<(ListenAddr, Sender<Result<()>>)>
connect_sender: UnboundedSender<(ConnectAddr, Sender<Result<Participant, NetworkConnectError>>)>
connected_receiver: UnboundedReceiver<Participant>
shutdown_network_s: Option<Sender<Sender<()>>>
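The field types suggest a request/reply pattern: each command travels over a channel together with a one-time reply sender, which is what lets methods take `&self`. The following is a minimal sketch of that idea, assuming std threads and `std::sync::mpsc` in place of the async channels the crate actually uses; `Handle`, `ListenCmd` and `spawn_handle` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical command: an address plus a one-time reply channel.
type ListenCmd = (String, Sender<Result<(), String>>);

/// Stand-in handle that only holds the sending half of the command channel.
struct Handle {
    listen_sender: Mutex<Sender<ListenCmd>>,
}

impl Handle {
    /// Sends a command through `&self` and blocks until the worker replies.
    fn listen(&self, addr: &str) -> Result<(), String> {
        let (reply_s, reply_r) = channel();
        self.listen_sender
            .lock()
            .unwrap()
            .send((addr.to_string(), reply_s))
            .map_err(|_| "worker stopped".to_string())?;
        reply_r
            .recv()
            .map_err(|_| "worker dropped the reply".to_string())?
    }
}

/// Spawns a worker that owns all mutable state and answers each command.
fn spawn_handle() -> Arc<Handle> {
    let (listen_sender, listen_receiver) = channel::<ListenCmd>();
    thread::spawn(move || {
        let mut bound: Vec<String> = Vec::new();
        for (addr, reply) in listen_receiver {
            let result = if bound.contains(&addr) {
                Err(format!("{addr} already in use"))
            } else {
                bound.push(addr);
                Ok(())
            };
            // The caller may have gone away; ignore a failed reply.
            let _ = reply.send(result);
        }
    });
    Arc::new(Handle { listen_sender: Mutex::new(listen_sender) })
}

fn main() {
    let handle = spawn_handle();
    let other = Arc::clone(&handle);
    let first = thread::spawn(move || other.listen("127.0.0.1:2000"));
    println!("{:?}", first.join().unwrap());
    println!("{:?}", handle.listen("127.0.0.1:2000"));
}
```

Because the worker thread owns the mutable state, the handle itself needs no `&mut` access and can be shared in an Arc.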

Implementations

impl Network

pub fn new(participant_id: Pid, runtime: &Runtime) -> Self

Generates a new Network to handle all connections in an Application

Arguments
  • participant_id - provide it by calling Pid::new(). Usually you don’t want to reuse a Pid for two Networks.
  • runtime - provide a Runtime. It is used internally to spawn tasks and is needed for cleanup in the non-async Drop. All network-related components must be dropped before the runtime is stopped. Dropping the runtime while a shutdown is still in progress leaves the network in a bad state, which might cause a panic.
Result
  • Self - returns a Network that can be sent to multiple areas of your code, including multiple threads. This is the base struct of this crate.
Examples
use tokio::runtime::Runtime;
use veloren_network::{Network, Pid};

let runtime = Runtime::new().unwrap();
let network = Network::new(Pid::new(), &runtime);

Usually you create only a single Network per application, except when client and server run in the same application, in which case you will want two. However, there is no technical limit on creating more.

pub fn new_with_registry( participant_id: Pid, runtime: &Runtime, registry: &Registry ) -> Self

See new.

Additional Arguments
  • registry - provide a Registry to collect Prometheus metrics from this Network. To run without metrics, use new, which takes no registry. Metrics are collected via the prometheus crate.
Examples
use prometheus::Registry;
use tokio::runtime::Runtime;
use veloren_network::{Network, Pid};

let runtime = Runtime::new().unwrap();
let registry = Registry::new();
let network = Network::new_with_registry(Pid::new(), &runtime, &registry);

fn internal_new( participant_id: Pid, runtime: &Runtime, registry: Option<&Registry> ) -> Self

pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError>

Starts listening on a ListenAddr. When the method returns, the Network is either ready to accept incoming connections or has returned a NetworkError (e.g. port already in use). Call connected to asynchronously wait for a Participant to connect. You can call listen on multiple addresses, e.g. to support multiple protocols or NICs.

Examples
use tokio::runtime::Runtime;
use veloren_network::{Network, Pid, ListenAddr};

// Create a Network, listen locally on port `2000` TCP and port `2001` UDP
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network
        .listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
        .await?;
    network
        .listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
        .await?;
    drop(network);
})
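Which interfaces a listener covers depends on the address it is given: under the usual socket conventions `127.0.0.1` is loopback only, while `0.0.0.0` covers all interfaces. The std-only sketch below classifies an address string the same way the examples parse theirs; the helper `scope` is hypothetical and not part of this crate.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Describes which interfaces a textual socket address would cover when bound.
fn scope(addr: &str) -> Result<&'static str, AddrParseError> {
    // Same parse step as `"127.0.0.1:2000".parse()` in the examples.
    let parsed: SocketAddr = addr.parse()?;
    Ok(if parsed.ip().is_unspecified() {
        "all interfaces"
    } else if parsed.ip().is_loopback() {
        "loopback only"
    } else {
        "one specific interface"
    })
}

fn main() {
    for addr in ["127.0.0.1:2000", "0.0.0.0:2000", "not-an-address"] {
        match scope(addr) {
            Ok(s) => println!("{addr}: {s}"),
            Err(e) => println!("{addr}: {e}"),
        }
    }
}
```

Handling the parse error explicitly, instead of calling unwrap, is the safer choice when the address comes from user input or configuration.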

pub async fn connect( &self, address: ConnectAddr ) -> Result<Participant, NetworkError>

Starts a connection to a ConnectAddr. When the method returns, the Network has returned either a Participant ready to open Streams on or a NetworkError (e.g. can’t connect, or invalid handshake).

Examples

use tokio::runtime::Runtime;
use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};

// Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
let runtime = Runtime::new().unwrap();
let network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    let p1 = network
        .connect(ConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
        .await?;
    let p2 = network
        .connect(ConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
        .await?;
    assert_eq!(&p1, &p2);
})?;
drop(network);

Usually the Network guarantees that an operation on a Participant succeeds, e.g. by retrying automatically, unless it fails completely, e.g. because the remote disconnected. If two ConnectAddrs you connect to belong to the same Participant, you get the same Participant as a result. This is useful e.g. when connecting to the same Participant via multiple protocols.
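The "same Participant for the same remote" behaviour can be pictured as a lookup keyed by the remote's Pid. This is only an illustrative sketch with hypothetical stand-in types (`Pid` as `u64`, a bare `Participant`, and a `ParticipantTable`); it is not how the crate is documented to implement it.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for the crate's `Pid`.
type Pid = u64;

/// Hypothetical stand-in for the crate's `Participant`.
#[derive(Debug)]
struct Participant {
    remote_pid: Pid,
}

/// Maps each remote Pid to the single shared Participant for that remote.
#[derive(Default)]
struct ParticipantTable {
    participants: HashMap<Pid, Arc<Participant>>,
}

impl ParticipantTable {
    /// Returns the existing participant for `remote_pid`, or creates one.
    /// `remote_pid` stands for the identity learned during the handshake.
    fn connect(&mut self, remote_pid: Pid) -> Arc<Participant> {
        Arc::clone(
            self.participants
                .entry(remote_pid)
                .or_insert_with(|| Arc::new(Participant { remote_pid })),
        )
    }
}

fn main() {
    let mut table = ParticipantTable::default();
    // Two connections, e.g. via TCP and UDP, that reach the same remote.
    let p1 = table.connect(7);
    let p2 = table.connect(7);
    println!("same participant: {}", Arc::ptr_eq(&p1, &p2));
    println!("remote pid: {}", p1.remote_pid);
}
```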

pub async fn connected(&mut self) -> Result<Participant, NetworkError>

Returns a Participant created from a ListenAddr you called listen on before. This function returns either a working Participant ready to open Streams on or a NetworkError (e.g. the Network got closed).

Examples
use tokio::runtime::Runtime;
use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};

// Create a Network, listen on port `2020` TCP and print the Pid of each Participant that connects
let runtime = Runtime::new().unwrap();
let mut network = Network::new(Pid::new(), &runtime);
runtime.block_on(async {
    network
        .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
        .await?;
    while let Ok(participant) = network.connected().await {
        println!("Participant connected: {}", participant.remote_pid());
    }
    drop(network);
})

async fn shutdown_mgr( local_pid: Pid, shutdown_network_r: Receiver<Sender<()>>, participant_disconnect_sender: Arc<Mutex<HashMap<Pid, Arc<Mutex<Option<UnboundedSender<(Pid, (Duration, Sender<Result<(), ParticipantError>>))>>>>>>>, shutdown_scheduler_s: Sender<()> )

Uses a manager task to handle shutdown smoothly, rather than doing the work in Drop.
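The type `Option<Sender<Sender<()>>>` of shutdown_network_s hints at a handshake: Drop sends an acknowledgement channel to the manager and waits for the reply. Below is a minimal sketch of that handshake, assuming std threads and `std::sync::mpsc` instead of the async tasks the crate uses; `Handle`, `run` and the log messages are hypothetical, and this `shutdown_mgr` is a simplified stand-in for the real function.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for a handle whose cleanup runs in a background manager.
struct Handle {
    shutdown_s: Option<Sender<Sender<()>>>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        // `take` ensures the shutdown request is sent at most once.
        if let Some(shutdown_s) = self.shutdown_s.take() {
            let (ack_s, ack_r) = channel();
            if shutdown_s.send(ack_s).is_ok() {
                // Block until the manager confirms that cleanup finished.
                let _ = ack_r.recv();
            }
        }
    }
}

/// Waits for a shutdown request, runs cleanup, then acknowledges it.
fn shutdown_mgr(shutdown_r: Receiver<Sender<()>>, log: Arc<Mutex<Vec<&'static str>>>) {
    if let Ok(ack_s) = shutdown_r.recv() {
        log.lock().unwrap().push("participants disconnected");
        let _ = ack_s.send(());
    }
}

/// Creates a handle, drops it, and returns the recorded event order.
fn run() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let (shutdown_s, shutdown_r) = channel();
    let mgr_log = Arc::clone(&log);
    let mgr = thread::spawn(move || shutdown_mgr(shutdown_r, mgr_log));
    drop(Handle { shutdown_s: Some(shutdown_s) });
    log.lock().unwrap().push("handle dropped");
    mgr.join().unwrap();
    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    println!("{:?}", run());
}
```

Keeping the cleanup logic in the manager means Drop only has to send one message and wait, which is why the manager's executor must still be alive at that point.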

Trait Implementations

impl Drop for Network

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.