veloren_server/sys/
chunk_serialize.rs

1use crate::{
2    Tick,
3    chunk_serialize::{ChunkSendEntry, SerializedChunk},
4    client::Client,
5    metrics::NetworkRequestMetrics,
6};
7use common::{comp::Presence, event::EventBus, slowjob::SlowJobPool, terrain::TerrainGrid};
8use common_ecs::{Job, Origin, Phase, System};
9use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
10use hashbrown::{HashMap, hash_map::Entry};
11use network::StreamParams;
12use specs::{Entity, Read, ReadExpect, ReadStorage};
13use std::sync::Arc;
14
/// System that prepares terrain chunks for delivery to clients.
///
/// Each run gathers the chunk send requests that are currently queued,
/// groups them per chunk, and then triggers SlowJobs that perform the
/// serialisation.
#[derive(Default)]
pub struct Sys;
20impl<'a> System<'a> for Sys {
21    type SystemData = (
22        Read<'a, Tick>,
23        ReadStorage<'a, Client>,
24        ReadStorage<'a, Presence>,
25        ReadExpect<'a, EventBus<ChunkSendEntry>>,
26        ReadExpect<'a, NetworkRequestMetrics>,
27        ReadExpect<'a, SlowJobPool>,
28        ReadExpect<'a, TerrainGrid>,
29        ReadExpect<'a, crossbeam_channel::Sender<SerializedChunk>>,
30    );
31
32    const NAME: &'static str = "chunk_serialize";
33    const ORIGIN: Origin = Origin::Server;
34    const PHASE: Phase = Phase::Create;
35
36    fn run(
37        _job: &mut Job<Self>,
38        (
39            tick,
40            clients,
41            presences,
42            chunk_send_queues_bus,
43            network_metrics,
44            slow_jobs,
45            terrain,
46            chunk_sender,
47        ): Self::SystemData,
48    ) {
49        // Only operate twice per second
50        //TODO: move out of this system and now even spawn this.
51        if tick.0.rem_euclid(15) != 0 {
52            return;
53        }
54
55        struct Metadata {
56            recipients: Vec<Entity>,
57            lossy_compression: bool,
58            params: StreamParams,
59        }
60
61        // collect all deduped entities that request a chunk
62        let mut chunks = HashMap::<_, Metadata>::new();
63        let mut requests = 0u64;
64        let mut distinct_requests = 0u64;
65
66        for queue_entry in chunk_send_queues_bus.recv_all() {
67            let entry = chunks.entry(queue_entry.chunk_key);
68            let meta = match entry {
69                Entry::Vacant(ve) => {
70                    match clients.get(queue_entry.entity).map(|c| c.terrain_params()) {
71                        Some(params) => {
72                            distinct_requests += 1;
73                            ve.insert(Metadata {
74                                recipients: Vec::new(),
75                                lossy_compression: true,
76                                params,
77                            })
78                        },
79                        None => continue,
80                    }
81                },
82                Entry::Occupied(oe) => oe.into_mut(),
83            };
84
85            // We decide here, to ONLY send lossy compressed data If all clients want those.
86            // If at least 1 client here does not want lossy we don't compress it twice.
87            // It would just be too expensive for the server
88            meta.lossy_compression = meta.lossy_compression
89                && presences
90                    .get(queue_entry.entity)
91                    .map(|p| p.lossy_terrain_compression)
92                    .unwrap_or(true);
93            meta.recipients.push(queue_entry.entity);
94            requests += 1;
95        }
96
97        network_metrics
98            .chunks_serialisation_requests
99            .inc_by(requests);
100        network_metrics
101            .chunks_distinct_serialisation_requests
102            .inc_by(distinct_requests);
103
104        // Trigger serialization in a SlowJob
105        const CHUNK_SIZE: usize = 10; // trigger one job per 10 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this
106        let mut chunks_iter = chunks
107            .into_iter()
108            .filter_map(|(chunk_key, meta)| {
109                terrain
110                    .get_key_arc_real(chunk_key)
111                    .map(|chunk| (Arc::clone(chunk), chunk_key, meta))
112            })
113            .peekable();
114
115        while chunks_iter.peek().is_some() {
116            let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect();
117            let chunk_sender = chunk_sender.clone();
118            slow_jobs.spawn("CHUNK_SERIALIZER", move || {
119                for (chunk, chunk_key, mut meta) in chunks {
120                    let msg = Client::prepare_chunk_update_msg(
121                        ServerGeneral::TerrainChunkUpdate {
122                            key: chunk_key,
123                            chunk: Ok(SerializedTerrainChunk::via_heuristic(
124                                &chunk,
125                                meta.lossy_compression,
126                            )),
127                        },
128                        &meta.params,
129                    );
130                    meta.recipients.sort_unstable();
131                    meta.recipients.dedup();
132                    if let Err(e) = chunk_sender.send(SerializedChunk {
133                        lossy_compression: meta.lossy_compression,
134                        msg,
135                        recipients: meta.recipients,
136                    }) {
137                        tracing::warn!(?e, "cannot send serialized chunk to sender");
138                        break;
139                    };
140                }
141            });
142        }
143    }
144}