veloren_server/sys/
entity_sync.rs

1use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
2use crate::{EditableSettings, Tick, client::Client, presence::RegionSubscription};
3use common::{
4    calendar::Calendar,
5    comp::{
6        Collider, ForceUpdate, InventoryUpdateBuffer, Last, Ori, Player, Pos, Presence, Vel,
7        presence::SpectatingEntity,
8    },
9    event::EventBus,
10    link::Is,
11    mounting::Rider,
12    outcome::Outcome,
13    region::{RegionEvent, RegionMap},
14    resources::{PlayerPhysicsSettings, Time, TimeOfDay, TimeScale},
15    terrain::TerrainChunkSize,
16    uid::Uid,
17    vol::RectVolSize,
18};
19use common_base::dev_panic;
20use common_ecs::{Job, Origin, Phase, System};
21use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
22use itertools::Either;
23use specs::{Entities, Join, LendJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
24use vek::*;
25
26/// This system will send physics updates to the client
27#[derive(Default)]
28pub struct Sys;
29
30impl<'a> System<'a> for Sys {
31    type SystemData = (
32        Entities<'a>,
33        Read<'a, Tick>,
34        Read<'a, PlayerPhysicsSettings>,
35        TrackedStorages<'a>,
36        ReadExpect<'a, TimeOfDay>,
37        ReadExpect<'a, Time>,
38        ReadExpect<'a, Calendar>,
39        ReadExpect<'a, TimeScale>,
40        ReadExpect<'a, RegionMap>,
41        ReadExpect<'a, UpdateTrackers>,
42        Write<'a, DeletedEntities>,
43        Read<'a, EventBus<Outcome>>,
44        ReadExpect<'a, EditableSettings>,
45        (
46            ReadStorage<'a, Pos>,
47            ReadStorage<'a, Vel>,
48            ReadStorage<'a, Ori>,
49            ReadStorage<'a, RegionSubscription>,
50            ReadStorage<'a, Player>,
51            ReadStorage<'a, Presence>,
52            ReadStorage<'a, SpectatingEntity>,
53            ReadStorage<'a, Client>,
54            WriteStorage<'a, Last<Pos>>,
55            WriteStorage<'a, Last<Vel>>,
56            WriteStorage<'a, Last<Ori>>,
57            WriteStorage<'a, ForceUpdate>,
58            WriteStorage<'a, InventoryUpdateBuffer>,
59        ),
60    );
61
62    const NAME: &'static str = "entity_sync";
63    const ORIGIN: Origin = Origin::Server;
64    const PHASE: Phase = Phase::Create;
65
66    fn run(
67        job: &mut Job<Self>,
68        (
69            entities,
70            tick,
71            player_physics_settings,
72            tracked_storages,
73            time_of_day,
74            time,
75            calendar,
76            time_scale,
77            region_map,
78            trackers,
79            mut deleted_entities,
80            outcomes,
81            editable_settings,
82            (
83                positions,
84                velocities,
85                orientations,
86                subscriptions,
87                players,
88                presences,
89                spectating_entities,
90                clients,
91                mut last_pos,
92                mut last_vel,
93                mut last_ori,
94                mut force_updates,
95                mut inventory_update_buffers,
96            ),
97        ): Self::SystemData,
98    ) {
99        let tick = tick.0;
100
101        // Storages already provided in `TrackedStorages` that we need to use
102        // for other things besides change detection.
103        let uids = &tracked_storages.uid;
104        let colliders = &tracked_storages.collider;
105        let inventories = &tracked_storages.inventory;
106        let is_rider = &tracked_storages.is_rider;
107
108        // To send entity updates
109        // 1. Iterate through regions
110        // 2. Iterate through region subscribers (ie clients)
111        //     - Collect a list of entity ids for clients who are subscribed to this
112        //       region (hash calc to check each)
113        // 3. Iterate through events from that region
114        //     - For each entity entered event, iterate through the client list and
115        //       check if they are subscribed to the source (hash calc per subscribed
116        //       client per entity event), if not subscribed to the source send a entity
117        //       creation message to that client
118        //     - For each entity left event, iterate through the client list and check
119        //       if they are subscribed to the destination (hash calc per subscribed
120        //       client per entity event)
121        // 4. Iterate through entities in that region
122        // 5. Inform clients of the component changes for that entity
123        //     - Throttle update rate base on distance to each client
124
125        // Sync physics and other components
126        // via iterating through regions (in parallel)
127
128        // Pre-collect regions paired with deleted entity list so we can iterate over
129        // them in parallel below
130        let regions_and_deleted_entities = region_map
131            .iter()
132            .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
133            .collect::<Vec<_>>();
134
135        use rayon::iter::{IntoParallelIterator, ParallelIterator};
136        job.cpu_stats.measure(common_ecs::ParMode::Rayon);
137        common_base::prof_span!(guard, "regions");
138        regions_and_deleted_entities.into_par_iter().for_each_init(
139            || {
140                common_base::prof_span!(guard, "entity sync rayon job");
141                guard
142            },
143            |_guard, (key, region, deleted_entities_in_region)| {
144                // Assemble subscriber list for this region by iterating through clients and
145                // checking if they are subscribed to this region
146                let mut subscribers = (
147                    &clients,
148                    &entities,
149                    presences.maybe(),
150                    &subscriptions,
151                    &positions,
152                )
153                    .join()
154                    .filter_map(|(client, entity, presence, subscription, pos)| {
155                        if presence.is_some() && subscription.regions.contains(&key) {
156                            Some((client, &subscription.regions, entity, *pos))
157                        } else {
158                            None
159                        }
160                    })
161                    .collect::<Vec<_>>();
162
163                for event in region.events() {
164                    match event {
165                        RegionEvent::Entered(id, maybe_key) => {
166                            // Don't process newly created entities here (redundant network
167                            // messages)
168                            if trackers.uid.inserted().contains(*id) {
169                                continue;
170                            }
171                            let entity = entities.entity(*id);
172                            if let Some(pkg) = positions
173                                .get(entity)
174                                .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
175                                .and_then(|(pos, vel, ori)| {
176                                    tracked_storages.create_entity_package(
177                                        entity,
178                                        Some(*pos),
179                                        vel.copied(),
180                                        ori.copied(),
181                                    )
182                                })
183                            {
184                                let create_msg = ServerGeneral::CreateEntity(pkg);
185                                for (client, regions, client_entity, _) in &mut subscribers {
186                                    if maybe_key
187                                    .as_ref()
188                                    .map(|key| !regions.contains(key))
189                                    .unwrap_or(true)
190                                    // Client doesn't need to know about itself
191                                    && *client_entity != entity
192                                    {
193                                        client.send_fallible(create_msg.clone());
194                                    }
195                                }
196                            }
197                        },
198                        RegionEvent::Left(id, maybe_key) => {
199                            // Lookup UID for entity
200                            if let Some(&uid) = uids.get(entities.entity(*id)) {
201                                for (client, regions, _, _) in &mut subscribers {
202                                    if maybe_key
203                                        .as_ref()
204                                        .map(|key| !regions.contains(key))
205                                        .unwrap_or(true)
206                                    {
207                                        // TODO: I suspect it would be more efficient (in terms of
208                                        // bandwidth) to batch messages like this (same in
209                                        // subscription.rs).
210                                        client.send_fallible(ServerGeneral::DeleteEntity(uid));
211                                    }
212                                }
213                            }
214                        },
215                    }
216                }
217
218                // Sync tracked components
219                // Get deleted entities in this region from DeletedEntities
220                let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
221                    &tracked_storages,
222                    region.entities(),
223                    deleted_entities_in_region,
224                );
225                // We lazily initialize the the synchronization messages in case there are no
226                // clients.
227                let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
228                for (client, _, client_entity, _) in &mut subscribers {
229                    let msg = entity_comp_sync.right_or_else(
230                        |(entity_sync_package, comp_sync_package)| {
231                            (
232                                client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
233                                client.prepare(ServerGeneral::CompSync(
234                                    comp_sync_package,
235                                    force_updates.get(*client_entity).map_or(0, |f| f.counter()),
236                                )),
237                            )
238                        },
239                    );
240                    // We don't care much about stream errors here since they could just represent
241                    // network disconnection, which is handled elsewhere.
242                    let _ = client.send_prepared(&msg.0);
243                    let _ = client.send_prepared(&msg.1);
244                    entity_comp_sync = Either::Right(msg);
245                }
246
247                for (client, _, client_entity, client_pos) in &mut subscribers {
248                    let mut comp_sync_package = CompSyncPackage::new();
249
250                    for (_, entity, &uid, (&pos, last_pos), vel, ori, collider) in (
251                        region.entities(),
252                        &entities,
253                        uids,
254                        (&positions, last_pos.mask().maybe()),
255                        (&velocities, last_vel.mask().maybe()).maybe(),
256                        (&orientations, last_vel.mask().maybe()).maybe(),
257                        colliders.maybe(),
258                    )
259                        .join()
260                    {
261                        // Decide how regularly to send physics updates.
262                        let send_now = if client_entity == &entity {
263                            should_sync_client_physics(
264                                entity,
265                                &player_physics_settings,
266                                &players,
267                                &force_updates,
268                                is_rider,
269                                &editable_settings,
270                            )
271                        } else if matches!(collider, Some(Collider::Voxel { .. })) {
272                            // Things with a voxel collider (airships, etc.) need to have very
273                            // stable physics so we always send updated
274                            // for these where we can.
275                            true
276                        } else {
277                            // Throttle update rates for all other entities based on distance to
278                            // client
279                            let distance_sq = client_pos.0.distance_squared(pos.0);
280                            let id_staggered_tick = tick + entity.id() as u64;
281
282                            // More entities farther away so checks start there
283                            if distance_sq > 500.0f32.powi(2) {
284                                id_staggered_tick.is_multiple_of(32)
285                            } else if distance_sq > 300.0f32.powi(2) {
286                                id_staggered_tick.is_multiple_of(16)
287                            } else if distance_sq > 200.0f32.powi(2) {
288                                id_staggered_tick.is_multiple_of(8)
289                            } else if distance_sq > 120.0f32.powi(2) {
290                                id_staggered_tick.is_multiple_of(6)
291                            } else if distance_sq > 64.0f32.powi(2) {
292                                id_staggered_tick.is_multiple_of(3)
293                            } else if distance_sq > 24.0f32.powi(2) {
294                                id_staggered_tick.is_multiple_of(2)
295                            } else {
296                                true
297                            }
298                        };
299
300                        add_physics_components(
301                            send_now,
302                            &mut comp_sync_package,
303                            uid,
304                            pos,
305                            last_pos,
306                            ori,
307                            vel,
308                        );
309                    }
310
311                    // TODO: force update counter only needs to be sent once per frame (and only if
312                    // it changed, although it might not be worth having a separate message for
313                    // optionally sending it since individual messages may have a bandwidth
314                    // overhead), however, here we send it potentially 2 times per subscribed
315                    // region by including it in the `CompSync` message.
316                    client.send_fallible(ServerGeneral::CompSync(
317                        comp_sync_package,
318                        force_updates.get(*client_entity).map_or(0, |f| f.counter()),
319                    ));
320                }
321            },
322        );
323        drop(guard);
324        job.cpu_stats.measure(common_ecs::ParMode::Single);
325
326        // Sync components that are only synced for the client's own entity.
327        for (entity, client, &uid, (maybe_pos, last_pos), vel, ori) in (
328            &entities,
329            &clients,
330            uids,
331            (positions.maybe(), last_pos.mask().maybe()),
332            (&velocities, last_vel.mask().maybe()).maybe(),
333            (&orientations, last_vel.mask().maybe()).maybe(),
334        )
335            .join()
336        {
337            // Include additional components for clients that aren't in a region (e.g. due
338            // to having no position or have sync_me as `false`) since those
339            // won't be synced above.
340            let include_all_comps = region_map.in_region_map(entity);
341
342            let mut comp_sync_package = trackers.create_sync_from_client_package(
343                &tracked_storages,
344                entity,
345                include_all_comps,
346            );
347
348            if include_all_comps && let Some(&pos) = maybe_pos {
349                let send_now = should_sync_client_physics(
350                    entity,
351                    &player_physics_settings,
352                    &players,
353                    &force_updates,
354                    is_rider,
355                    &editable_settings,
356                );
357                add_physics_components(
358                    send_now,
359                    &mut comp_sync_package,
360                    uid,
361                    pos,
362                    last_pos,
363                    ori,
364                    vel,
365                );
366            }
367
368            if !comp_sync_package.is_empty() {
369                client.send_fallible(ServerGeneral::CompSync(
370                    comp_sync_package,
371                    force_updates.get(entity).map_or(0, |f| f.counter()),
372                ));
373            }
374        }
375
376        for (entity, client, spectating_entity) in
377            (&entities, &clients, &spectating_entities).join()
378        {
379            // TODO: If the spectated entity is out of range while a change occurs it will
380            // cause the client to log errors, and those changes will be missed
381            // by the spectating entity.
382            //
383            // Additionally, when we stop spectating we don't delete the components that are
384            // synced for spectators. Leaving stale components on the client.
385            let comp_sync_package = trackers.create_sync_from_spectated_entity_package(
386                &tracked_storages,
387                entity,
388                spectating_entity.0,
389            );
390
391            if !comp_sync_package.is_empty() {
392                client.send_fallible(ServerGeneral::CompSync(
393                    comp_sync_package,
394                    force_updates.get(entity).map_or(0, |f| f.counter()),
395                ));
396            }
397        }
398
399        // Update the last physics components for each entity
400
401        (
402            &entities,
403            &positions,
404            velocities.maybe(),
405            orientations.maybe(),
406            last_pos.entries(),
407            last_vel.entries(),
408            last_ori.entries(),
409        )
410            .lend_join()
411            .for_each(|(_, &pos, vel, ori, last_pos, last_vel, last_ori)| {
412                last_pos.replace(Last(pos));
413                vel.and_then(|&v| last_vel.replace(Last(v)));
414                ori.and_then(|&o| last_ori.replace(Last(o)));
415            });
416
417        // Handle entity deletion in regions that don't exist in RegionMap
418        // (theoretically none)
419        for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
420            for client in (presences.maybe(), &subscriptions, &clients)
421                .join()
422                .filter_map(|(presence, subscription, client)| {
423                    if presence.is_some() && subscription.regions.contains(&region_key) {
424                        Some(client)
425                    } else {
426                        None
427                    }
428                })
429            {
430                for uid in &deleted {
431                    client.send_fallible(ServerGeneral::DeleteEntity(*uid));
432                }
433            }
434        }
435
436        let mut entities_to_remove_buf = Vec::new();
437
438        // Sync inventories
439        for (entity, buf, inventory, client) in (
440            &entities,
441            &mut inventory_update_buffers,
442            inventories.maybe(),
443            clients.maybe(),
444        )
445            .join()
446        {
447            let Some(inventory) = inventory else {
448                dev_panic!(format!(
449                    "Entity without Inventory has InventoryUpdateBuffer component. This is a bug. \
450                     entity={:?}",
451                    entity
452                ));
453                entities_to_remove_buf.push(entity);
454                continue;
455            };
456            let Some(client) = client else {
457                dev_panic!(format!(
458                    "Entity without Client has InventoryUpdateBuffer component. This is a bug. \
459                     entity={:?}",
460                    entity
461                ));
462                entities_to_remove_buf.push(entity);
463                continue;
464            };
465
466            let events = buf.take_events();
467            if !events.is_empty() {
468                client.send_fallible(ServerGeneral::InventoryUpdate(inventory.clone(), events));
469            }
470        }
471
472        // In optimized builds, remove InventoryUpdateBuffer component from entities
473        // that shouldn't have it (builds with debug assertions will panic)
474        for entity in entities_to_remove_buf {
475            inventory_update_buffers.remove(entity);
476        }
477
478        // Consume/clear the current outcomes and convert them to a vec
479        let outcomes = outcomes.recv_all().collect::<Vec<_>>();
480
481        // Sync outcomes
482        for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
483            let is_near = |o_pos: Vec3<f32>| {
484                pos.zip_with(presence, |pos, presence| {
485                    pos.0.xy().distance_squared(o_pos.xy())
486                        < (presence.entity_view_distance.current() as f32
487                            * TerrainChunkSize::RECT_SIZE.x as f32)
488                            .powi(2)
489                })
490            };
491
492            let outcomes = outcomes
493                .iter()
494                .filter(|o| o.get_pos().and_then(is_near).unwrap_or(true))
495                .cloned()
496                .collect::<Vec<_>>();
497
498            if !outcomes.is_empty() {
499                client.send_fallible(ServerGeneral::Outcomes(outcomes));
500            }
501        }
502
503        // Remove all force flags.
504        for force_update in (&mut force_updates).join() {
505            force_update.clear();
506        }
507
508        // Sync resources
509        // TODO: doesn't really belong in this system (rename system or create another
510        // system?)
511        const TOD_SYNC_FREQ: u64 = 100;
512        if tick % TOD_SYNC_FREQ == 0 {
513            let mut tod_lazymsg = None;
514            for client in (&clients).join() {
515                let msg = tod_lazymsg.unwrap_or_else(|| {
516                    client.prepare(ServerGeneral::TimeOfDay(
517                        *time_of_day,
518                        (*calendar).clone(),
519                        *time,
520                        *time_scale,
521                    ))
522                });
523                // We don't care much about stream errors here since they could just represent
524                // network disconnection, which is handled elsewhere.
525                let _ = client.send_prepared(&msg);
526                tod_lazymsg = Some(msg);
527            }
528        }
529    }
530}
531
532/// Determines whether a client should receive an update about its own physics
533/// components.
534fn should_sync_client_physics(
535    entity: specs::Entity,
536    player_physics_settings: &PlayerPhysicsSettings,
537    players: &ReadStorage<'_, Player>,
538    force_updates: &WriteStorage<'_, ForceUpdate>,
539    is_rider: &ReadStorage<'_, Is<Rider>>,
540    editable_settings: &EditableSettings,
541) -> bool {
542    let server_authoritative_physics = players.get(entity).is_none_or(|player| {
543        player_physics_settings
544            .settings
545            .get(&player.uuid())
546            .is_some_and(|settings| settings.server_authoritative_physics_optin())
547            || editable_settings
548                .server_physics_force_list
549                .contains_key(&player.uuid())
550    });
551    // Don't send client physics updates about itself unless force update is
552    // set or the client is subject to
553    // server-authoritative physics
554    force_updates.get(entity).is_some_and(|f| f.is_forced())
555        || server_authoritative_physics
556        || is_rider.contains(entity)
557}
558
559/// Adds physics components if `send_now` is true or `Option<Last<T>>` is
560/// `None`.
561///
562/// If `Last<T>` isn't present, this is recorded as an insertion rather than a
563/// modification.
564fn add_physics_components(
565    send_now: bool,
566    comp_sync_package: &mut CompSyncPackage<common_net::msg::EcsCompPacket>,
567    uid: Uid,
568    pos: Pos,
569    last_pos: Option<u32>,
570    ori: Option<(&Ori, Option<u32>)>,
571    vel: Option<(&Vel, Option<u32>)>,
572) {
573    if last_pos.is_none() {
574        comp_sync_package.comp_inserted(uid, pos);
575    } else if send_now {
576        comp_sync_package.comp_modified(uid, pos);
577    }
578
579    if let Some((v, last_vel)) = vel {
580        if last_vel.is_none() {
581            comp_sync_package.comp_inserted(uid, *v);
582        } else if send_now {
583            comp_sync_package.comp_modified(uid, *v);
584        }
585    }
586
587    if let Some((o, last_ori)) = ori {
588        if last_ori.is_none() {
589            comp_sync_package.comp_inserted(uid, *o);
590        } else if send_now {
591            comp_sync_package.comp_modified(uid, *o);
592        }
593    }
594}