1use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
2use crate::{EditableSettings, Tick, client::Client, presence::RegionSubscription};
3use common::{
4 calendar::Calendar,
5 comp::{
6 Collider, ForceUpdate, InventoryUpdate, Last, Ori, Player, Pos, Presence, Vel,
7 presence::SpectatingEntity,
8 },
9 event::EventBus,
10 link::Is,
11 mounting::Rider,
12 outcome::Outcome,
13 region::{RegionEvent, RegionMap},
14 resources::{PlayerPhysicsSettings, Time, TimeOfDay, TimeScale},
15 terrain::TerrainChunkSize,
16 uid::Uid,
17 vol::RectVolSize,
18};
19use common_ecs::{Job, Origin, Phase, System};
20use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
21use itertools::Either;
22use specs::{Entities, Join, LendJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
23use vek::*;
24
25#[derive(Default)]
27pub struct Sys;
28
29impl<'a> System<'a> for Sys {
30 type SystemData = (
31 Entities<'a>,
32 Read<'a, Tick>,
33 Read<'a, PlayerPhysicsSettings>,
34 TrackedStorages<'a>,
35 ReadExpect<'a, TimeOfDay>,
36 ReadExpect<'a, Time>,
37 ReadExpect<'a, Calendar>,
38 ReadExpect<'a, TimeScale>,
39 ReadExpect<'a, RegionMap>,
40 ReadExpect<'a, UpdateTrackers>,
41 Write<'a, DeletedEntities>,
42 Read<'a, EventBus<Outcome>>,
43 ReadExpect<'a, EditableSettings>,
44 (
45 ReadStorage<'a, Pos>,
46 ReadStorage<'a, Vel>,
47 ReadStorage<'a, Ori>,
48 ReadStorage<'a, RegionSubscription>,
49 ReadStorage<'a, Player>,
50 ReadStorage<'a, Presence>,
51 ReadStorage<'a, SpectatingEntity>,
52 ReadStorage<'a, Client>,
53 WriteStorage<'a, Last<Pos>>,
54 WriteStorage<'a, Last<Vel>>,
55 WriteStorage<'a, Last<Ori>>,
56 WriteStorage<'a, ForceUpdate>,
57 WriteStorage<'a, InventoryUpdate>,
58 ),
59 );
60
61 const NAME: &'static str = "entity_sync";
62 const ORIGIN: Origin = Origin::Server;
63 const PHASE: Phase = Phase::Create;
64
65 fn run(
66 job: &mut Job<Self>,
67 (
68 entities,
69 tick,
70 player_physics_settings,
71 tracked_storages,
72 time_of_day,
73 time,
74 calendar,
75 time_scale,
76 region_map,
77 trackers,
78 mut deleted_entities,
79 outcomes,
80 editable_settings,
81 (
82 positions,
83 velocities,
84 orientations,
85 subscriptions,
86 players,
87 presences,
88 spectating_entities,
89 clients,
90 mut last_pos,
91 mut last_vel,
92 mut last_ori,
93 mut force_updates,
94 mut inventory_updates,
95 ),
96 ): Self::SystemData,
97 ) {
98 let tick = tick.0;
99
100 let uids = &tracked_storages.uid;
103 let colliders = &tracked_storages.collider;
104 let inventories = &tracked_storages.inventory;
105 let is_rider = &tracked_storages.is_rider;
106
107 let regions_and_deleted_entities = region_map
130 .iter()
131 .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
132 .collect::<Vec<_>>();
133
134 use rayon::iter::{IntoParallelIterator, ParallelIterator};
135 job.cpu_stats.measure(common_ecs::ParMode::Rayon);
136 common_base::prof_span!(guard, "regions");
137 regions_and_deleted_entities.into_par_iter().for_each_init(
138 || {
139 common_base::prof_span!(guard, "entity sync rayon job");
140 guard
141 },
142 |_guard, (key, region, deleted_entities_in_region)| {
143 let mut subscribers = (
146 &clients,
147 &entities,
148 presences.maybe(),
149 &subscriptions,
150 &positions,
151 )
152 .join()
153 .filter_map(|(client, entity, presence, subscription, pos)| {
154 if presence.is_some() && subscription.regions.contains(&key) {
155 Some((client, &subscription.regions, entity, *pos))
156 } else {
157 None
158 }
159 })
160 .collect::<Vec<_>>();
161
162 for event in region.events() {
163 match event {
164 RegionEvent::Entered(id, maybe_key) => {
165 if trackers.uid.inserted().contains(*id) {
168 continue;
169 }
170 let entity = entities.entity(*id);
171 if let Some(pkg) = positions
172 .get(entity)
173 .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
174 .and_then(|(pos, vel, ori)| {
175 tracked_storages.create_entity_package(
176 entity,
177 Some(*pos),
178 vel.copied(),
179 ori.copied(),
180 )
181 })
182 {
183 let create_msg = ServerGeneral::CreateEntity(pkg);
184 for (client, regions, client_entity, _) in &mut subscribers {
185 if maybe_key
186 .as_ref()
187 .map(|key| !regions.contains(key))
188 .unwrap_or(true)
189 && *client_entity != entity
191 {
192 client.send_fallible(create_msg.clone());
193 }
194 }
195 }
196 },
197 RegionEvent::Left(id, maybe_key) => {
198 if let Some(&uid) = uids.get(entities.entity(*id)) {
200 for (client, regions, _, _) in &mut subscribers {
201 if maybe_key
202 .as_ref()
203 .map(|key| !regions.contains(key))
204 .unwrap_or(true)
205 {
206 client.send_fallible(ServerGeneral::DeleteEntity(uid));
210 }
211 }
212 }
213 },
214 }
215 }
216
217 let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
220 &tracked_storages,
221 region.entities(),
222 deleted_entities_in_region,
223 );
224 let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
227 for (client, _, client_entity, _) in &mut subscribers {
228 let msg = entity_comp_sync.right_or_else(
229 |(entity_sync_package, comp_sync_package)| {
230 (
231 client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
232 client.prepare(ServerGeneral::CompSync(
233 comp_sync_package,
234 force_updates.get(*client_entity).map_or(0, |f| f.counter()),
235 )),
236 )
237 },
238 );
239 let _ = client.send_prepared(&msg.0);
242 let _ = client.send_prepared(&msg.1);
243 entity_comp_sync = Either::Right(msg);
244 }
245
246 for (client, _, client_entity, client_pos) in &mut subscribers {
247 let mut comp_sync_package = CompSyncPackage::new();
248
249 for (_, entity, &uid, (&pos, last_pos), vel, ori, collider) in (
250 region.entities(),
251 &entities,
252 uids,
253 (&positions, last_pos.mask().maybe()),
254 (&velocities, last_vel.mask().maybe()).maybe(),
255 (&orientations, last_vel.mask().maybe()).maybe(),
256 colliders.maybe(),
257 )
258 .join()
259 {
260 let send_now = if client_entity == &entity {
262 should_sync_client_physics(
263 entity,
264 &player_physics_settings,
265 &players,
266 &force_updates,
267 is_rider,
268 &editable_settings,
269 )
270 } else if matches!(collider, Some(Collider::Voxel { .. })) {
271 true
275 } else {
276 let distance_sq = client_pos.0.distance_squared(pos.0);
279 let id_staggered_tick = tick + entity.id() as u64;
280
281 if distance_sq > 500.0f32.powi(2) {
283 id_staggered_tick % 32 == 0
284 } else if distance_sq > 300.0f32.powi(2) {
285 id_staggered_tick % 16 == 0
286 } else if distance_sq > 200.0f32.powi(2) {
287 id_staggered_tick % 8 == 0
288 } else if distance_sq > 120.0f32.powi(2) {
289 id_staggered_tick % 6 == 0
290 } else if distance_sq > 64.0f32.powi(2) {
291 id_staggered_tick % 3 == 0
292 } else if distance_sq > 24.0f32.powi(2) {
293 id_staggered_tick % 2 == 0
294 } else {
295 true
296 }
297 };
298
299 add_physics_components(
300 send_now,
301 &mut comp_sync_package,
302 uid,
303 pos,
304 last_pos,
305 ori,
306 vel,
307 );
308 }
309
310 client.send_fallible(ServerGeneral::CompSync(
316 comp_sync_package,
317 force_updates.get(*client_entity).map_or(0, |f| f.counter()),
318 ));
319 }
320 },
321 );
322 drop(guard);
323 job.cpu_stats.measure(common_ecs::ParMode::Single);
324
325 for (entity, client, &uid, (maybe_pos, last_pos), vel, ori) in (
327 &entities,
328 &clients,
329 uids,
330 (positions.maybe(), last_pos.mask().maybe()),
331 (&velocities, last_vel.mask().maybe()).maybe(),
332 (&orientations, last_vel.mask().maybe()).maybe(),
333 )
334 .join()
335 {
336 let include_all_comps = region_map.in_region_map(entity);
340
341 let mut comp_sync_package = trackers.create_sync_from_client_package(
342 &tracked_storages,
343 entity,
344 include_all_comps,
345 );
346
347 if include_all_comps && let Some(&pos) = maybe_pos {
348 let send_now = should_sync_client_physics(
349 entity,
350 &player_physics_settings,
351 &players,
352 &force_updates,
353 is_rider,
354 &editable_settings,
355 );
356 add_physics_components(
357 send_now,
358 &mut comp_sync_package,
359 uid,
360 pos,
361 last_pos,
362 ori,
363 vel,
364 );
365 }
366
367 if !comp_sync_package.is_empty() {
368 client.send_fallible(ServerGeneral::CompSync(
369 comp_sync_package,
370 force_updates.get(entity).map_or(0, |f| f.counter()),
371 ));
372 }
373 }
374
375 for (entity, client, spectating_entity) in
376 (&entities, &clients, &spectating_entities).join()
377 {
378 let comp_sync_package = trackers.create_sync_from_spectated_entity_package(
385 &tracked_storages,
386 entity,
387 spectating_entity.0,
388 );
389
390 if !comp_sync_package.is_empty() {
391 client.send_fallible(ServerGeneral::CompSync(
392 comp_sync_package,
393 force_updates.get(entity).map_or(0, |f| f.counter()),
394 ));
395 }
396 }
397
398 (
401 &entities,
402 &positions,
403 velocities.maybe(),
404 orientations.maybe(),
405 last_pos.entries(),
406 last_vel.entries(),
407 last_ori.entries(),
408 )
409 .lend_join()
410 .for_each(|(_, &pos, vel, ori, last_pos, last_vel, last_ori)| {
411 last_pos.replace(Last(pos));
412 vel.and_then(|&v| last_vel.replace(Last(v)));
413 ori.and_then(|&o| last_ori.replace(Last(o)));
414 });
415
416 for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
419 for client in (presences.maybe(), &subscriptions, &clients)
420 .join()
421 .filter_map(|(presence, subscription, client)| {
422 if presence.is_some() && subscription.regions.contains(®ion_key) {
423 Some(client)
424 } else {
425 None
426 }
427 })
428 {
429 for uid in &deleted {
430 client.send_fallible(ServerGeneral::DeleteEntity(*uid));
431 }
432 }
433 }
434
435 for (inventory, update, client) in (inventories, &mut inventory_updates, &clients).join() {
437 client.send_fallible(ServerGeneral::InventoryUpdate(
438 inventory.clone(),
439 update.take_events(),
440 ));
441 }
442
443 let outcomes = outcomes.recv_all().collect::<Vec<_>>();
445
446 for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
448 let is_near = |o_pos: Vec3<f32>| {
449 pos.zip_with(presence, |pos, presence| {
450 pos.0.xy().distance_squared(o_pos.xy())
451 < (presence.entity_view_distance.current() as f32
452 * TerrainChunkSize::RECT_SIZE.x as f32)
453 .powi(2)
454 })
455 };
456
457 let outcomes = outcomes
458 .iter()
459 .filter(|o| o.get_pos().and_then(is_near).unwrap_or(true))
460 .cloned()
461 .collect::<Vec<_>>();
462
463 if !outcomes.is_empty() {
464 client.send_fallible(ServerGeneral::Outcomes(outcomes));
465 }
466 }
467
468 for force_update in (&mut force_updates).join() {
470 force_update.clear();
471 }
472 inventory_updates.clear();
473
474 const TOD_SYNC_FREQ: u64 = 100;
478 if tick % TOD_SYNC_FREQ == 0 {
479 let mut tod_lazymsg = None;
480 for client in (&clients).join() {
481 let msg = tod_lazymsg.unwrap_or_else(|| {
482 client.prepare(ServerGeneral::TimeOfDay(
483 *time_of_day,
484 (*calendar).clone(),
485 *time,
486 *time_scale,
487 ))
488 });
489 let _ = client.send_prepared(&msg);
492 tod_lazymsg = Some(msg);
493 }
494 }
495 }
496}
497
498fn should_sync_client_physics(
501 entity: specs::Entity,
502 player_physics_settings: &PlayerPhysicsSettings,
503 players: &ReadStorage<'_, Player>,
504 force_updates: &WriteStorage<'_, ForceUpdate>,
505 is_rider: &ReadStorage<'_, Is<Rider>>,
506 editable_settings: &EditableSettings,
507) -> bool {
508 let server_authoritative_physics = players.get(entity).is_none_or(|player| {
509 player_physics_settings
510 .settings
511 .get(&player.uuid())
512 .is_some_and(|settings| settings.server_authoritative_physics_optin())
513 || editable_settings
514 .server_physics_force_list
515 .contains_key(&player.uuid())
516 });
517 force_updates.get(entity).is_some_and(|f| f.is_forced())
521 || server_authoritative_physics
522 || is_rider.contains(entity)
523}
524
525fn add_physics_components(
531 send_now: bool,
532 comp_sync_package: &mut CompSyncPackage<common_net::msg::EcsCompPacket>,
533 uid: Uid,
534 pos: Pos,
535 last_pos: Option<u32>,
536 ori: Option<(&Ori, Option<u32>)>,
537 vel: Option<(&Vel, Option<u32>)>,
538) {
539 if last_pos.is_none() {
540 comp_sync_package.comp_inserted(uid, pos);
541 } else if send_now {
542 comp_sync_package.comp_modified(uid, pos);
543 }
544
545 if let Some((v, last_vel)) = vel {
546 if last_vel.is_none() {
547 comp_sync_package.comp_inserted(uid, *v);
548 } else if send_now {
549 comp_sync_package.comp_modified(uid, *v);
550 }
551 }
552
553 if let Some((o, last_ori)) = ori {
554 if last_ori.is_none() {
555 comp_sync_package.comp_inserted(uid, *o);
556 } else if send_now {
557 comp_sync_package.comp_modified(uid, *o);
558 }
559 }
560}