veloren_server/sys/
chunk_serialize.rs1use crate::{
2 Tick,
3 chunk_serialize::{ChunkSendEntry, SerializedChunk},
4 client::Client,
5 metrics::NetworkRequestMetrics,
6};
7use common::{comp::Presence, event::EventBus, slowjob::SlowJobPool, terrain::TerrainGrid};
8use common_ecs::{Job, Origin, Phase, System};
9use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
10use hashbrown::{HashMap, hash_map::Entry};
11use network::StreamParams;
12use specs::{Entity, Read, ReadExpect, ReadStorage};
13use std::sync::Arc;
14
15#[derive(Default)]
19pub struct Sys;
20impl<'a> System<'a> for Sys {
21 type SystemData = (
22 Read<'a, Tick>,
23 ReadStorage<'a, Client>,
24 ReadStorage<'a, Presence>,
25 ReadExpect<'a, EventBus<ChunkSendEntry>>,
26 ReadExpect<'a, NetworkRequestMetrics>,
27 ReadExpect<'a, SlowJobPool>,
28 ReadExpect<'a, TerrainGrid>,
29 ReadExpect<'a, crossbeam_channel::Sender<SerializedChunk>>,
30 );
31
32 const NAME: &'static str = "chunk_serialize";
33 const ORIGIN: Origin = Origin::Server;
34 const PHASE: Phase = Phase::Create;
35
36 fn run(
37 _job: &mut Job<Self>,
38 (
39 tick,
40 clients,
41 presences,
42 chunk_send_queues_bus,
43 network_metrics,
44 slow_jobs,
45 terrain,
46 chunk_sender,
47 ): Self::SystemData,
48 ) {
49 if tick.0.rem_euclid(15) != 0 {
52 return;
53 }
54
55 struct Metadata {
56 recipients: Vec<Entity>,
57 lossy_compression: bool,
58 params: StreamParams,
59 }
60
61 let mut chunks = HashMap::<_, Metadata>::new();
63 let mut requests = 0u64;
64 let mut distinct_requests = 0u64;
65
66 for queue_entry in chunk_send_queues_bus.recv_all() {
67 let entry = chunks.entry(queue_entry.chunk_key);
68 let meta = match entry {
69 Entry::Vacant(ve) => {
70 match clients.get(queue_entry.entity).map(|c| c.terrain_params()) {
71 Some(params) => {
72 distinct_requests += 1;
73 ve.insert(Metadata {
74 recipients: Vec::new(),
75 lossy_compression: true,
76 params,
77 })
78 },
79 None => continue,
80 }
81 },
82 Entry::Occupied(oe) => oe.into_mut(),
83 };
84
85 meta.lossy_compression = meta.lossy_compression
89 && presences
90 .get(queue_entry.entity)
91 .map(|p| p.lossy_terrain_compression)
92 .unwrap_or(true);
93 meta.recipients.push(queue_entry.entity);
94 requests += 1;
95 }
96
97 network_metrics
98 .chunks_serialisation_requests
99 .inc_by(requests);
100 network_metrics
101 .chunks_distinct_serialisation_requests
102 .inc_by(distinct_requests);
103
104 const CHUNK_SIZE: usize = 10; let mut chunks_iter = chunks
107 .into_iter()
108 .filter_map(|(chunk_key, meta)| {
109 terrain
110 .get_key_arc_real(chunk_key)
111 .map(|chunk| (Arc::clone(chunk), chunk_key, meta))
112 })
113 .peekable();
114
115 while chunks_iter.peek().is_some() {
116 let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect();
117 let chunk_sender = chunk_sender.clone();
118 slow_jobs.spawn("CHUNK_SERIALIZER", move || {
119 for (chunk, chunk_key, mut meta) in chunks {
120 let msg = Client::prepare_chunk_update_msg(
121 ServerGeneral::TerrainChunkUpdate {
122 key: chunk_key,
123 chunk: Ok(SerializedTerrainChunk::via_heuristic(
124 &chunk,
125 meta.lossy_compression,
126 )),
127 },
128 &meta.params,
129 );
130 meta.recipients.sort_unstable();
131 meta.recipients.dedup();
132 if let Err(e) = chunk_sender.send(SerializedChunk {
133 lossy_compression: meta.lossy_compression,
134 msg,
135 recipients: meta.recipients,
136 }) {
137 tracing::warn!(?e, "cannot send serialized chunk to sender");
138 break;
139 };
140 }
141 });
142 }
143 }
144}