1use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
2use crate::{EditableSettings, Tick, client::Client, presence::RegionSubscription};
3use common::{
4 calendar::Calendar,
5 comp::{
6 Collider, ForceUpdate, InventoryUpdateBuffer, Last, Ori, Player, Pos, Presence, Vel,
7 presence::SpectatingEntity,
8 },
9 event::EventBus,
10 link::Is,
11 mounting::Rider,
12 outcome::Outcome,
13 region::{RegionEvent, RegionMap},
14 resources::{PlayerPhysicsSettings, Time, TimeOfDay, TimeScale},
15 terrain::TerrainChunkSize,
16 uid::Uid,
17 vol::RectVolSize,
18};
19use common_base::dev_panic;
20use common_ecs::{Job, Origin, Phase, System};
21use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
22use itertools::Either;
23use specs::{Entities, Join, LendJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
24use vek::*;
25
/// Server-side ECS system registered under the name `entity_sync`.
///
/// Its `run` implementation sends entity creation/deletion, component sync,
/// physics, inventory, outcome and time-of-day messages to connected clients.
#[derive(Default)]
pub struct Sys;
29
30impl<'a> System<'a> for Sys {
31 type SystemData = (
32 Entities<'a>,
33 Read<'a, Tick>,
34 Read<'a, PlayerPhysicsSettings>,
35 TrackedStorages<'a>,
36 ReadExpect<'a, TimeOfDay>,
37 ReadExpect<'a, Time>,
38 ReadExpect<'a, Calendar>,
39 ReadExpect<'a, TimeScale>,
40 ReadExpect<'a, RegionMap>,
41 ReadExpect<'a, UpdateTrackers>,
42 Write<'a, DeletedEntities>,
43 Read<'a, EventBus<Outcome>>,
44 ReadExpect<'a, EditableSettings>,
45 (
46 ReadStorage<'a, Pos>,
47 ReadStorage<'a, Vel>,
48 ReadStorage<'a, Ori>,
49 ReadStorage<'a, RegionSubscription>,
50 ReadStorage<'a, Player>,
51 ReadStorage<'a, Presence>,
52 ReadStorage<'a, SpectatingEntity>,
53 ReadStorage<'a, Client>,
54 WriteStorage<'a, Last<Pos>>,
55 WriteStorage<'a, Last<Vel>>,
56 WriteStorage<'a, Last<Ori>>,
57 WriteStorage<'a, ForceUpdate>,
58 WriteStorage<'a, InventoryUpdateBuffer>,
59 ),
60 );
61
62 const NAME: &'static str = "entity_sync";
63 const ORIGIN: Origin = Origin::Server;
64 const PHASE: Phase = Phase::Create;
65
66 fn run(
67 job: &mut Job<Self>,
68 (
69 entities,
70 tick,
71 player_physics_settings,
72 tracked_storages,
73 time_of_day,
74 time,
75 calendar,
76 time_scale,
77 region_map,
78 trackers,
79 mut deleted_entities,
80 outcomes,
81 editable_settings,
82 (
83 positions,
84 velocities,
85 orientations,
86 subscriptions,
87 players,
88 presences,
89 spectating_entities,
90 clients,
91 mut last_pos,
92 mut last_vel,
93 mut last_ori,
94 mut force_updates,
95 mut inventory_update_buffers,
96 ),
97 ): Self::SystemData,
98 ) {
99 let tick = tick.0;
100
101 let uids = &tracked_storages.uid;
104 let colliders = &tracked_storages.collider;
105 let inventories = &tracked_storages.inventory;
106 let is_rider = &tracked_storages.is_rider;
107
108 let regions_and_deleted_entities = region_map
131 .iter()
132 .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
133 .collect::<Vec<_>>();
134
135 use rayon::iter::{IntoParallelIterator, ParallelIterator};
136 job.cpu_stats.measure(common_ecs::ParMode::Rayon);
137 common_base::prof_span!(guard, "regions");
138 regions_and_deleted_entities.into_par_iter().for_each_init(
139 || {
140 common_base::prof_span!(guard, "entity sync rayon job");
141 guard
142 },
143 |_guard, (key, region, deleted_entities_in_region)| {
144 let mut subscribers = (
147 &clients,
148 &entities,
149 presences.maybe(),
150 &subscriptions,
151 &positions,
152 )
153 .join()
154 .filter_map(|(client, entity, presence, subscription, pos)| {
155 if presence.is_some() && subscription.regions.contains(&key) {
156 Some((client, &subscription.regions, entity, *pos))
157 } else {
158 None
159 }
160 })
161 .collect::<Vec<_>>();
162
163 for event in region.events() {
164 match event {
165 RegionEvent::Entered(id, maybe_key) => {
166 if trackers.uid.inserted().contains(*id) {
169 continue;
170 }
171 let entity = entities.entity(*id);
172 if let Some(pkg) = positions
173 .get(entity)
174 .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
175 .and_then(|(pos, vel, ori)| {
176 tracked_storages.create_entity_package(
177 entity,
178 Some(*pos),
179 vel.copied(),
180 ori.copied(),
181 )
182 })
183 {
184 let create_msg = ServerGeneral::CreateEntity(pkg);
185 for (client, regions, client_entity, _) in &mut subscribers {
186 if maybe_key
187 .as_ref()
188 .map(|key| !regions.contains(key))
189 .unwrap_or(true)
190 && *client_entity != entity
192 {
193 client.send_fallible(create_msg.clone());
194 }
195 }
196 }
197 },
198 RegionEvent::Left(id, maybe_key) => {
199 if let Some(&uid) = uids.get(entities.entity(*id)) {
201 for (client, regions, _, _) in &mut subscribers {
202 if maybe_key
203 .as_ref()
204 .map(|key| !regions.contains(key))
205 .unwrap_or(true)
206 {
207 client.send_fallible(ServerGeneral::DeleteEntity(uid));
211 }
212 }
213 }
214 },
215 }
216 }
217
218 let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
221 &tracked_storages,
222 region.entities(),
223 deleted_entities_in_region,
224 );
225 let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
228 for (client, _, client_entity, _) in &mut subscribers {
229 let msg = entity_comp_sync.right_or_else(
230 |(entity_sync_package, comp_sync_package)| {
231 (
232 client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
233 client.prepare(ServerGeneral::CompSync(
234 comp_sync_package,
235 force_updates.get(*client_entity).map_or(0, |f| f.counter()),
236 )),
237 )
238 },
239 );
240 let _ = client.send_prepared(&msg.0);
243 let _ = client.send_prepared(&msg.1);
244 entity_comp_sync = Either::Right(msg);
245 }
246
247 for (client, _, client_entity, client_pos) in &mut subscribers {
248 let mut comp_sync_package = CompSyncPackage::new();
249
250 for (_, entity, &uid, (&pos, last_pos), vel, ori, collider) in (
251 region.entities(),
252 &entities,
253 uids,
254 (&positions, last_pos.mask().maybe()),
255 (&velocities, last_vel.mask().maybe()).maybe(),
256 (&orientations, last_vel.mask().maybe()).maybe(),
257 colliders.maybe(),
258 )
259 .join()
260 {
261 let send_now = if client_entity == &entity {
263 should_sync_client_physics(
264 entity,
265 &player_physics_settings,
266 &players,
267 &force_updates,
268 is_rider,
269 &editable_settings,
270 )
271 } else if matches!(collider, Some(Collider::Voxel { .. })) {
272 true
276 } else {
277 let distance_sq = client_pos.0.distance_squared(pos.0);
280 let id_staggered_tick = tick + entity.id() as u64;
281
282 if distance_sq > 500.0f32.powi(2) {
284 id_staggered_tick.is_multiple_of(32)
285 } else if distance_sq > 300.0f32.powi(2) {
286 id_staggered_tick.is_multiple_of(16)
287 } else if distance_sq > 200.0f32.powi(2) {
288 id_staggered_tick.is_multiple_of(8)
289 } else if distance_sq > 120.0f32.powi(2) {
290 id_staggered_tick.is_multiple_of(6)
291 } else if distance_sq > 64.0f32.powi(2) {
292 id_staggered_tick.is_multiple_of(3)
293 } else if distance_sq > 24.0f32.powi(2) {
294 id_staggered_tick.is_multiple_of(2)
295 } else {
296 true
297 }
298 };
299
300 add_physics_components(
301 send_now,
302 &mut comp_sync_package,
303 uid,
304 pos,
305 last_pos,
306 ori,
307 vel,
308 );
309 }
310
311 client.send_fallible(ServerGeneral::CompSync(
317 comp_sync_package,
318 force_updates.get(*client_entity).map_or(0, |f| f.counter()),
319 ));
320 }
321 },
322 );
323 drop(guard);
324 job.cpu_stats.measure(common_ecs::ParMode::Single);
325
326 for (entity, client, &uid, (maybe_pos, last_pos), vel, ori) in (
328 &entities,
329 &clients,
330 uids,
331 (positions.maybe(), last_pos.mask().maybe()),
332 (&velocities, last_vel.mask().maybe()).maybe(),
333 (&orientations, last_vel.mask().maybe()).maybe(),
334 )
335 .join()
336 {
337 let include_all_comps = region_map.in_region_map(entity);
341
342 let mut comp_sync_package = trackers.create_sync_from_client_package(
343 &tracked_storages,
344 entity,
345 include_all_comps,
346 );
347
348 if include_all_comps && let Some(&pos) = maybe_pos {
349 let send_now = should_sync_client_physics(
350 entity,
351 &player_physics_settings,
352 &players,
353 &force_updates,
354 is_rider,
355 &editable_settings,
356 );
357 add_physics_components(
358 send_now,
359 &mut comp_sync_package,
360 uid,
361 pos,
362 last_pos,
363 ori,
364 vel,
365 );
366 }
367
368 if !comp_sync_package.is_empty() {
369 client.send_fallible(ServerGeneral::CompSync(
370 comp_sync_package,
371 force_updates.get(entity).map_or(0, |f| f.counter()),
372 ));
373 }
374 }
375
376 for (entity, client, spectating_entity) in
377 (&entities, &clients, &spectating_entities).join()
378 {
379 let comp_sync_package = trackers.create_sync_from_spectated_entity_package(
386 &tracked_storages,
387 entity,
388 spectating_entity.0,
389 );
390
391 if !comp_sync_package.is_empty() {
392 client.send_fallible(ServerGeneral::CompSync(
393 comp_sync_package,
394 force_updates.get(entity).map_or(0, |f| f.counter()),
395 ));
396 }
397 }
398
399 (
402 &entities,
403 &positions,
404 velocities.maybe(),
405 orientations.maybe(),
406 last_pos.entries(),
407 last_vel.entries(),
408 last_ori.entries(),
409 )
410 .lend_join()
411 .for_each(|(_, &pos, vel, ori, last_pos, last_vel, last_ori)| {
412 last_pos.replace(Last(pos));
413 vel.and_then(|&v| last_vel.replace(Last(v)));
414 ori.and_then(|&o| last_ori.replace(Last(o)));
415 });
416
417 for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
420 for client in (presences.maybe(), &subscriptions, &clients)
421 .join()
422 .filter_map(|(presence, subscription, client)| {
423 if presence.is_some() && subscription.regions.contains(®ion_key) {
424 Some(client)
425 } else {
426 None
427 }
428 })
429 {
430 for uid in &deleted {
431 client.send_fallible(ServerGeneral::DeleteEntity(*uid));
432 }
433 }
434 }
435
436 let mut entities_to_remove_buf = Vec::new();
437
438 for (entity, buf, inventory, client) in (
440 &entities,
441 &mut inventory_update_buffers,
442 inventories.maybe(),
443 clients.maybe(),
444 )
445 .join()
446 {
447 let Some(inventory) = inventory else {
448 dev_panic!(format!(
449 "Entity without Inventory has InventoryUpdateBuffer component. This is a bug. \
450 entity={:?}",
451 entity
452 ));
453 entities_to_remove_buf.push(entity);
454 continue;
455 };
456 let Some(client) = client else {
457 dev_panic!(format!(
458 "Entity without Client has InventoryUpdateBuffer component. This is a bug. \
459 entity={:?}",
460 entity
461 ));
462 entities_to_remove_buf.push(entity);
463 continue;
464 };
465
466 let events = buf.take_events();
467 if !events.is_empty() {
468 client.send_fallible(ServerGeneral::InventoryUpdate(inventory.clone(), events));
469 }
470 }
471
472 for entity in entities_to_remove_buf {
475 inventory_update_buffers.remove(entity);
476 }
477
478 let outcomes = outcomes.recv_all().collect::<Vec<_>>();
480
481 for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
483 let is_near = |o_pos: Vec3<f32>| {
484 pos.zip_with(presence, |pos, presence| {
485 pos.0.xy().distance_squared(o_pos.xy())
486 < (presence.entity_view_distance.current() as f32
487 * TerrainChunkSize::RECT_SIZE.x as f32)
488 .powi(2)
489 })
490 };
491
492 let outcomes = outcomes
493 .iter()
494 .filter(|o| o.get_pos().and_then(is_near).unwrap_or(true))
495 .cloned()
496 .collect::<Vec<_>>();
497
498 if !outcomes.is_empty() {
499 client.send_fallible(ServerGeneral::Outcomes(outcomes));
500 }
501 }
502
503 for force_update in (&mut force_updates).join() {
505 force_update.clear();
506 }
507
508 const TOD_SYNC_FREQ: u64 = 100;
512 if tick % TOD_SYNC_FREQ == 0 {
513 let mut tod_lazymsg = None;
514 for client in (&clients).join() {
515 let msg = tod_lazymsg.unwrap_or_else(|| {
516 client.prepare(ServerGeneral::TimeOfDay(
517 *time_of_day,
518 (*calendar).clone(),
519 *time,
520 *time_scale,
521 ))
522 });
523 let _ = client.send_prepared(&msg);
526 tod_lazymsg = Some(msg);
527 }
528 }
529 }
530}
531
532fn should_sync_client_physics(
535 entity: specs::Entity,
536 player_physics_settings: &PlayerPhysicsSettings,
537 players: &ReadStorage<'_, Player>,
538 force_updates: &WriteStorage<'_, ForceUpdate>,
539 is_rider: &ReadStorage<'_, Is<Rider>>,
540 editable_settings: &EditableSettings,
541) -> bool {
542 let server_authoritative_physics = players.get(entity).is_none_or(|player| {
543 player_physics_settings
544 .settings
545 .get(&player.uuid())
546 .is_some_and(|settings| settings.server_authoritative_physics_optin())
547 || editable_settings
548 .server_physics_force_list
549 .contains_key(&player.uuid())
550 });
551 force_updates.get(entity).is_some_and(|f| f.is_forced())
555 || server_authoritative_physics
556 || is_rider.contains(entity)
557}
558
559fn add_physics_components(
565 send_now: bool,
566 comp_sync_package: &mut CompSyncPackage<common_net::msg::EcsCompPacket>,
567 uid: Uid,
568 pos: Pos,
569 last_pos: Option<u32>,
570 ori: Option<(&Ori, Option<u32>)>,
571 vel: Option<(&Vel, Option<u32>)>,
572) {
573 if last_pos.is_none() {
574 comp_sync_package.comp_inserted(uid, pos);
575 } else if send_now {
576 comp_sync_package.comp_modified(uid, pos);
577 }
578
579 if let Some((v, last_vel)) = vel {
580 if last_vel.is_none() {
581 comp_sync_package.comp_inserted(uid, *v);
582 } else if send_now {
583 comp_sync_package.comp_modified(uid, *v);
584 }
585 }
586
587 if let Some((o, last_ori)) = ori {
588 if last_ori.is_none() {
589 comp_sync_package.comp_inserted(uid, *o);
590 } else if send_now {
591 comp_sync_package.comp_modified(uid, *o);
592 }
593 }
594}