1use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
2use crate::{EditableSettings, Tick, client::Client, presence::RegionSubscription};
3use common::{
4 calendar::Calendar,
5 comp::{Collider, ForceUpdate, InventoryUpdate, Last, Ori, Player, Pos, Presence, Vel},
6 event::EventBus,
7 link::Is,
8 mounting::Rider,
9 outcome::Outcome,
10 region::{Event as RegionEvent, RegionMap},
11 resources::{PlayerPhysicsSettings, Time, TimeOfDay, TimeScale},
12 terrain::TerrainChunkSize,
13 uid::Uid,
14 vol::RectVolSize,
15};
16use common_ecs::{Job, Origin, Phase, System};
17use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
18use itertools::Either;
19use specs::{Entities, Join, LendJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
20use vek::*;
21
22#[derive(Default)]
24pub struct Sys;
25
26impl<'a> System<'a> for Sys {
27 type SystemData = (
28 Entities<'a>,
29 Read<'a, Tick>,
30 Read<'a, PlayerPhysicsSettings>,
31 TrackedStorages<'a>,
32 ReadExpect<'a, TimeOfDay>,
33 ReadExpect<'a, Time>,
34 ReadExpect<'a, Calendar>,
35 ReadExpect<'a, TimeScale>,
36 ReadExpect<'a, RegionMap>,
37 ReadExpect<'a, UpdateTrackers>,
38 ReadStorage<'a, Pos>,
39 ReadStorage<'a, Vel>,
40 ReadStorage<'a, Ori>,
41 ReadStorage<'a, RegionSubscription>,
42 ReadStorage<'a, Player>,
43 ReadStorage<'a, Presence>,
44 ReadStorage<'a, Client>,
45 WriteStorage<'a, Last<Pos>>,
46 WriteStorage<'a, Last<Vel>>,
47 WriteStorage<'a, Last<Ori>>,
48 WriteStorage<'a, ForceUpdate>,
49 WriteStorage<'a, InventoryUpdate>,
50 Write<'a, DeletedEntities>,
51 Read<'a, EventBus<Outcome>>,
52 ReadExpect<'a, EditableSettings>,
53 );
54
55 const NAME: &'static str = "entity_sync";
56 const ORIGIN: Origin = Origin::Server;
57 const PHASE: Phase = Phase::Create;
58
59 fn run(
60 job: &mut Job<Self>,
61 (
62 entities,
63 tick,
64 player_physics_settings,
65 tracked_storages,
66 time_of_day,
67 time,
68 calendar,
69 time_scale,
70 region_map,
71 trackers,
72 positions,
73 velocities,
74 orientations,
75 subscriptions,
76 players,
77 presences,
78 clients,
79 mut last_pos,
80 mut last_vel,
81 mut last_ori,
82 mut force_updates,
83 mut inventory_updates,
84 mut deleted_entities,
85 outcomes,
86 editable_settings,
87 ): Self::SystemData,
88 ) {
89 let tick = tick.0;
90
91 let uids = &tracked_storages.uid;
94 let colliders = &tracked_storages.collider;
95 let inventories = &tracked_storages.inventory;
96 let is_rider = &tracked_storages.is_rider;
97
98 let regions_and_deleted_entities = region_map
121 .iter()
122 .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
123 .collect::<Vec<_>>();
124
125 use rayon::iter::{IntoParallelIterator, ParallelIterator};
126 job.cpu_stats.measure(common_ecs::ParMode::Rayon);
127 common_base::prof_span!(guard, "regions");
128 regions_and_deleted_entities.into_par_iter().for_each_init(
129 || {
130 common_base::prof_span!(guard, "entity sync rayon job");
131 guard
132 },
133 |_guard, (key, region, deleted_entities_in_region)| {
134 let mut subscribers = (
137 &clients,
138 &entities,
139 presences.maybe(),
140 &subscriptions,
141 &positions,
142 )
143 .join()
144 .filter_map(|(client, entity, presence, subscription, pos)| {
145 if presence.is_some() && subscription.regions.contains(&key) {
146 Some((client, &subscription.regions, entity, *pos))
147 } else {
148 None
149 }
150 })
151 .collect::<Vec<_>>();
152
153 for event in region.events() {
154 match event {
155 RegionEvent::Entered(id, maybe_key) => {
156 if trackers.uid.inserted().contains(*id) {
159 continue;
160 }
161 let entity = entities.entity(*id);
162 if let Some(pkg) = positions
163 .get(entity)
164 .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
165 .and_then(|(pos, vel, ori)| {
166 tracked_storages.create_entity_package(
167 entity,
168 Some(*pos),
169 vel.copied(),
170 ori.copied(),
171 )
172 })
173 {
174 let create_msg = ServerGeneral::CreateEntity(pkg);
175 for (client, regions, client_entity, _) in &mut subscribers {
176 if maybe_key
177 .as_ref()
178 .map(|key| !regions.contains(key))
179 .unwrap_or(true)
180 && *client_entity != entity
182 {
183 client.send_fallible(create_msg.clone());
184 }
185 }
186 }
187 },
188 RegionEvent::Left(id, maybe_key) => {
189 if let Some(&uid) = uids.get(entities.entity(*id)) {
191 for (client, regions, _, _) in &mut subscribers {
192 if maybe_key
193 .as_ref()
194 .map(|key| !regions.contains(key))
195 .unwrap_or(true)
196 {
197 client.send_fallible(ServerGeneral::DeleteEntity(uid));
201 }
202 }
203 }
204 },
205 }
206 }
207
208 let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
211 &tracked_storages,
212 region.entities(),
213 deleted_entities_in_region,
214 );
215 let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
218 for (client, _, client_entity, _) in &mut subscribers {
219 let msg = entity_comp_sync.right_or_else(
220 |(entity_sync_package, comp_sync_package)| {
221 (
222 client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
223 client.prepare(ServerGeneral::CompSync(
224 comp_sync_package,
225 force_updates.get(*client_entity).map_or(0, |f| f.counter()),
226 )),
227 )
228 },
229 );
230 let _ = client.send_prepared(&msg.0);
233 let _ = client.send_prepared(&msg.1);
234 entity_comp_sync = Either::Right(msg);
235 }
236
237 for (client, _, client_entity, client_pos) in &mut subscribers {
238 let mut comp_sync_package = CompSyncPackage::new();
239
240 for (_, entity, &uid, (&pos, last_pos), vel, ori, collider) in (
241 region.entities(),
242 &entities,
243 uids,
244 (&positions, last_pos.mask().maybe()),
245 (&velocities, last_vel.mask().maybe()).maybe(),
246 (&orientations, last_vel.mask().maybe()).maybe(),
247 colliders.maybe(),
248 )
249 .join()
250 {
251 let send_now = if client_entity == &entity {
253 should_sync_client_physics(
254 entity,
255 &player_physics_settings,
256 &players,
257 &force_updates,
258 is_rider,
259 &editable_settings,
260 )
261 } else if matches!(collider, Some(Collider::Voxel { .. })) {
262 true
266 } else {
267 let distance_sq = client_pos.0.distance_squared(pos.0);
270 let id_staggered_tick = tick + entity.id() as u64;
271
272 if distance_sq > 500.0f32.powi(2) {
274 id_staggered_tick % 32 == 0
275 } else if distance_sq > 300.0f32.powi(2) {
276 id_staggered_tick % 16 == 0
277 } else if distance_sq > 200.0f32.powi(2) {
278 id_staggered_tick % 8 == 0
279 } else if distance_sq > 120.0f32.powi(2) {
280 id_staggered_tick % 6 == 0
281 } else if distance_sq > 64.0f32.powi(2) {
282 id_staggered_tick % 3 == 0
283 } else if distance_sq > 24.0f32.powi(2) {
284 id_staggered_tick % 2 == 0
285 } else {
286 true
287 }
288 };
289
290 add_physics_components(
291 send_now,
292 &mut comp_sync_package,
293 uid,
294 pos,
295 last_pos,
296 ori,
297 vel,
298 );
299 }
300
301 client.send_fallible(ServerGeneral::CompSync(
307 comp_sync_package,
308 force_updates.get(*client_entity).map_or(0, |f| f.counter()),
309 ));
310 }
311 },
312 );
313 drop(guard);
314 job.cpu_stats.measure(common_ecs::ParMode::Single);
315
316 for (entity, client, &uid, (maybe_pos, last_pos), vel, ori) in (
318 &entities,
319 &clients,
320 uids,
321 (positions.maybe(), last_pos.mask().maybe()),
322 (&velocities, last_vel.mask().maybe()).maybe(),
323 (&orientations, last_vel.mask().maybe()).maybe(),
324 )
325 .join()
326 {
327 let include_all_comps = region_map.in_region_map(entity);
331
332 let mut comp_sync_package = trackers.create_sync_from_client_package(
333 &tracked_storages,
334 entity,
335 include_all_comps,
336 );
337
338 if include_all_comps && let Some(&pos) = maybe_pos {
339 let send_now = should_sync_client_physics(
340 entity,
341 &player_physics_settings,
342 &players,
343 &force_updates,
344 is_rider,
345 &editable_settings,
346 );
347 add_physics_components(
348 send_now,
349 &mut comp_sync_package,
350 uid,
351 pos,
352 last_pos,
353 ori,
354 vel,
355 );
356 }
357
358 if !comp_sync_package.is_empty() {
359 client.send_fallible(ServerGeneral::CompSync(
360 comp_sync_package,
361 force_updates.get(entity).map_or(0, |f| f.counter()),
362 ));
363 }
364 }
365
366 (
369 &entities,
370 &positions,
371 velocities.maybe(),
372 orientations.maybe(),
373 last_pos.entries(),
374 last_vel.entries(),
375 last_ori.entries(),
376 )
377 .lend_join()
378 .for_each(|(_, &pos, vel, ori, last_pos, last_vel, last_ori)| {
379 last_pos.replace(Last(pos));
380 vel.and_then(|&v| last_vel.replace(Last(v)));
381 ori.and_then(|&o| last_ori.replace(Last(o)));
382 });
383
384 for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
387 for client in (presences.maybe(), &subscriptions, &clients)
388 .join()
389 .filter_map(|(presence, subscription, client)| {
390 if presence.is_some() && subscription.regions.contains(®ion_key) {
391 Some(client)
392 } else {
393 None
394 }
395 })
396 {
397 for uid in &deleted {
398 client.send_fallible(ServerGeneral::DeleteEntity(*uid));
399 }
400 }
401 }
402
403 for (inventory, update, client) in (inventories, &mut inventory_updates, &clients).join() {
405 client.send_fallible(ServerGeneral::InventoryUpdate(
406 inventory.clone(),
407 update.take_events(),
408 ));
409 }
410
411 let outcomes = outcomes.recv_all().collect::<Vec<_>>();
413
414 for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
416 let is_near = |o_pos: Vec3<f32>| {
417 pos.zip_with(presence, |pos, presence| {
418 pos.0.xy().distance_squared(o_pos.xy())
419 < (presence.entity_view_distance.current() as f32
420 * TerrainChunkSize::RECT_SIZE.x as f32)
421 .powi(2)
422 })
423 };
424
425 let outcomes = outcomes
426 .iter()
427 .filter(|o| o.get_pos().and_then(is_near).unwrap_or(true))
428 .cloned()
429 .collect::<Vec<_>>();
430
431 if !outcomes.is_empty() {
432 client.send_fallible(ServerGeneral::Outcomes(outcomes));
433 }
434 }
435
436 for force_update in (&mut force_updates).join() {
438 force_update.clear();
439 }
440 inventory_updates.clear();
441
442 const TOD_SYNC_FREQ: u64 = 100;
446 if tick % TOD_SYNC_FREQ == 0 {
447 let mut tod_lazymsg = None;
448 for client in (&clients).join() {
449 let msg = tod_lazymsg.unwrap_or_else(|| {
450 client.prepare(ServerGeneral::TimeOfDay(
451 *time_of_day,
452 (*calendar).clone(),
453 *time,
454 *time_scale,
455 ))
456 });
457 let _ = client.send_prepared(&msg);
460 tod_lazymsg = Some(msg);
461 }
462 }
463 }
464}
465
466fn should_sync_client_physics(
469 entity: specs::Entity,
470 player_physics_settings: &PlayerPhysicsSettings,
471 players: &ReadStorage<'_, Player>,
472 force_updates: &WriteStorage<'_, ForceUpdate>,
473 is_rider: &ReadStorage<'_, Is<Rider>>,
474 editable_settings: &EditableSettings,
475) -> bool {
476 let server_authoritative_physics = players.get(entity).is_none_or(|player| {
477 player_physics_settings
478 .settings
479 .get(&player.uuid())
480 .is_some_and(|settings| settings.server_authoritative_physics_optin())
481 || editable_settings
482 .server_physics_force_list
483 .contains_key(&player.uuid())
484 });
485 force_updates.get(entity).is_some_and(|f| f.is_forced())
489 || server_authoritative_physics
490 || is_rider.contains(entity)
491}
492
493fn add_physics_components(
499 send_now: bool,
500 comp_sync_package: &mut CompSyncPackage<common_net::msg::EcsCompPacket>,
501 uid: Uid,
502 pos: Pos,
503 last_pos: Option<u32>,
504 ori: Option<(&Ori, Option<u32>)>,
505 vel: Option<(&Vel, Option<u32>)>,
506) {
507 if last_pos.is_none() {
508 comp_sync_package.comp_inserted(uid, pos);
509 } else if send_now {
510 comp_sync_package.comp_modified(uid, pos);
511 }
512
513 if let Some((v, last_vel)) = vel {
514 if last_vel.is_none() {
515 comp_sync_package.comp_inserted(uid, *v);
516 } else if send_now {
517 comp_sync_package.comp_modified(uid, *v);
518 }
519 }
520
521 if let Some((o, last_ori)) = ori {
522 if last_ori.is_none() {
523 comp_sync_package.comp_inserted(uid, *o);
524 } else if send_now {
525 comp_sync_package.comp_modified(uid, *o);
526 }
527 }
528}