veloren_server/sys/
entity_sync.rs

1use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
2use crate::{EditableSettings, Tick, client::Client, presence::RegionSubscription};
3use common::{
4    calendar::Calendar,
5    comp::{Collider, ForceUpdate, InventoryUpdate, Last, Ori, Player, Pos, Presence, Vel},
6    event::EventBus,
7    link::Is,
8    mounting::Rider,
9    outcome::Outcome,
10    region::{Event as RegionEvent, RegionMap},
11    resources::{PlayerPhysicsSettings, Time, TimeOfDay, TimeScale},
12    terrain::TerrainChunkSize,
13    uid::Uid,
14    vol::RectVolSize,
15};
16use common_ecs::{Job, Origin, Phase, System};
17use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
18use itertools::Either;
19use specs::{Entities, Join, LendJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
20use vek::*;
21
22/// This system will send physics updates to the client
23#[derive(Default)]
24pub struct Sys;
25
26impl<'a> System<'a> for Sys {
27    type SystemData = (
28        Entities<'a>,
29        Read<'a, Tick>,
30        Read<'a, PlayerPhysicsSettings>,
31        TrackedStorages<'a>,
32        ReadExpect<'a, TimeOfDay>,
33        ReadExpect<'a, Time>,
34        ReadExpect<'a, Calendar>,
35        ReadExpect<'a, TimeScale>,
36        ReadExpect<'a, RegionMap>,
37        ReadExpect<'a, UpdateTrackers>,
38        ReadStorage<'a, Pos>,
39        ReadStorage<'a, Vel>,
40        ReadStorage<'a, Ori>,
41        ReadStorage<'a, RegionSubscription>,
42        ReadStorage<'a, Player>,
43        ReadStorage<'a, Presence>,
44        ReadStorage<'a, Client>,
45        WriteStorage<'a, Last<Pos>>,
46        WriteStorage<'a, Last<Vel>>,
47        WriteStorage<'a, Last<Ori>>,
48        WriteStorage<'a, ForceUpdate>,
49        WriteStorage<'a, InventoryUpdate>,
50        Write<'a, DeletedEntities>,
51        Read<'a, EventBus<Outcome>>,
52        ReadExpect<'a, EditableSettings>,
53    );
54
55    const NAME: &'static str = "entity_sync";
56    const ORIGIN: Origin = Origin::Server;
57    const PHASE: Phase = Phase::Create;
58
59    fn run(
60        job: &mut Job<Self>,
61        (
62            entities,
63            tick,
64            player_physics_settings,
65            tracked_storages,
66            time_of_day,
67            time,
68            calendar,
69            time_scale,
70            region_map,
71            trackers,
72            positions,
73            velocities,
74            orientations,
75            subscriptions,
76            players,
77            presences,
78            clients,
79            mut last_pos,
80            mut last_vel,
81            mut last_ori,
82            mut force_updates,
83            mut inventory_updates,
84            mut deleted_entities,
85            outcomes,
86            editable_settings,
87        ): Self::SystemData,
88    ) {
89        let tick = tick.0;
90
91        // Storages already provided in `TrackedStorages` that we need to use
92        // for other things besides change detection.
93        let uids = &tracked_storages.uid;
94        let colliders = &tracked_storages.collider;
95        let inventories = &tracked_storages.inventory;
96        let is_rider = &tracked_storages.is_rider;
97
98        // To send entity updates
99        // 1. Iterate through regions
100        // 2. Iterate through region subscribers (ie clients)
101        //     - Collect a list of entity ids for clients who are subscribed to this
102        //       region (hash calc to check each)
103        // 3. Iterate through events from that region
104        //     - For each entity entered event, iterate through the client list and
105        //       check if they are subscribed to the source (hash calc per subscribed
106        //       client per entity event), if not subscribed to the source send a entity
107        //       creation message to that client
108        //     - For each entity left event, iterate through the client list and check
109        //       if they are subscribed to the destination (hash calc per subscribed
110        //       client per entity event)
111        // 4. Iterate through entities in that region
112        // 5. Inform clients of the component changes for that entity
113        //     - Throttle update rate base on distance to each client
114
115        // Sync physics and other components
116        // via iterating through regions (in parallel)
117
118        // Pre-collect regions paired with deleted entity list so we can iterate over
119        // them in parallel below
120        let regions_and_deleted_entities = region_map
121            .iter()
122            .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
123            .collect::<Vec<_>>();
124
125        use rayon::iter::{IntoParallelIterator, ParallelIterator};
126        job.cpu_stats.measure(common_ecs::ParMode::Rayon);
127        common_base::prof_span!(guard, "regions");
128        regions_and_deleted_entities.into_par_iter().for_each_init(
129            || {
130                common_base::prof_span!(guard, "entity sync rayon job");
131                guard
132            },
133            |_guard, (key, region, deleted_entities_in_region)| {
134                // Assemble subscriber list for this region by iterating through clients and
135                // checking if they are subscribed to this region
136                let mut subscribers = (
137                    &clients,
138                    &entities,
139                    presences.maybe(),
140                    &subscriptions,
141                    &positions,
142                )
143                    .join()
144                    .filter_map(|(client, entity, presence, subscription, pos)| {
145                        if presence.is_some() && subscription.regions.contains(&key) {
146                            Some((client, &subscription.regions, entity, *pos))
147                        } else {
148                            None
149                        }
150                    })
151                    .collect::<Vec<_>>();
152
153                for event in region.events() {
154                    match event {
155                        RegionEvent::Entered(id, maybe_key) => {
156                            // Don't process newly created entities here (redundant network
157                            // messages)
158                            if trackers.uid.inserted().contains(*id) {
159                                continue;
160                            }
161                            let entity = entities.entity(*id);
162                            if let Some(pkg) = positions
163                                .get(entity)
164                                .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
165                                .and_then(|(pos, vel, ori)| {
166                                    tracked_storages.create_entity_package(
167                                        entity,
168                                        Some(*pos),
169                                        vel.copied(),
170                                        ori.copied(),
171                                    )
172                                })
173                            {
174                                let create_msg = ServerGeneral::CreateEntity(pkg);
175                                for (client, regions, client_entity, _) in &mut subscribers {
176                                    if maybe_key
177                                    .as_ref()
178                                    .map(|key| !regions.contains(key))
179                                    .unwrap_or(true)
180                                    // Client doesn't need to know about itself
181                                    && *client_entity != entity
182                                    {
183                                        client.send_fallible(create_msg.clone());
184                                    }
185                                }
186                            }
187                        },
188                        RegionEvent::Left(id, maybe_key) => {
189                            // Lookup UID for entity
190                            if let Some(&uid) = uids.get(entities.entity(*id)) {
191                                for (client, regions, _, _) in &mut subscribers {
192                                    if maybe_key
193                                        .as_ref()
194                                        .map(|key| !regions.contains(key))
195                                        .unwrap_or(true)
196                                    {
197                                        // TODO: I suspect it would be more efficient (in terms of
198                                        // bandwidth) to batch messages like this (same in
199                                        // subscription.rs).
200                                        client.send_fallible(ServerGeneral::DeleteEntity(uid));
201                                    }
202                                }
203                            }
204                        },
205                    }
206                }
207
208                // Sync tracked components
209                // Get deleted entities in this region from DeletedEntities
210                let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
211                    &tracked_storages,
212                    region.entities(),
213                    deleted_entities_in_region,
214                );
215                // We lazily initialize the the synchronization messages in case there are no
216                // clients.
217                let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
218                for (client, _, client_entity, _) in &mut subscribers {
219                    let msg = entity_comp_sync.right_or_else(
220                        |(entity_sync_package, comp_sync_package)| {
221                            (
222                                client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
223                                client.prepare(ServerGeneral::CompSync(
224                                    comp_sync_package,
225                                    force_updates.get(*client_entity).map_or(0, |f| f.counter()),
226                                )),
227                            )
228                        },
229                    );
230                    // We don't care much about stream errors here since they could just represent
231                    // network disconnection, which is handled elsewhere.
232                    let _ = client.send_prepared(&msg.0);
233                    let _ = client.send_prepared(&msg.1);
234                    entity_comp_sync = Either::Right(msg);
235                }
236
237                for (client, _, client_entity, client_pos) in &mut subscribers {
238                    let mut comp_sync_package = CompSyncPackage::new();
239
240                    for (_, entity, &uid, (&pos, last_pos), vel, ori, collider) in (
241                        region.entities(),
242                        &entities,
243                        uids,
244                        (&positions, last_pos.mask().maybe()),
245                        (&velocities, last_vel.mask().maybe()).maybe(),
246                        (&orientations, last_vel.mask().maybe()).maybe(),
247                        colliders.maybe(),
248                    )
249                        .join()
250                    {
251                        // Decide how regularly to send physics updates.
252                        let send_now = if client_entity == &entity {
253                            should_sync_client_physics(
254                                entity,
255                                &player_physics_settings,
256                                &players,
257                                &force_updates,
258                                is_rider,
259                                &editable_settings,
260                            )
261                        } else if matches!(collider, Some(Collider::Voxel { .. })) {
262                            // Things with a voxel collider (airships, etc.) need to have very
263                            // stable physics so we always send updated
264                            // for these where we can.
265                            true
266                        } else {
267                            // Throttle update rates for all other entities based on distance to
268                            // client
269                            let distance_sq = client_pos.0.distance_squared(pos.0);
270                            let id_staggered_tick = tick + entity.id() as u64;
271
272                            // More entities farther away so checks start there
273                            if distance_sq > 500.0f32.powi(2) {
274                                id_staggered_tick % 32 == 0
275                            } else if distance_sq > 300.0f32.powi(2) {
276                                id_staggered_tick % 16 == 0
277                            } else if distance_sq > 200.0f32.powi(2) {
278                                id_staggered_tick % 8 == 0
279                            } else if distance_sq > 120.0f32.powi(2) {
280                                id_staggered_tick % 6 == 0
281                            } else if distance_sq > 64.0f32.powi(2) {
282                                id_staggered_tick % 3 == 0
283                            } else if distance_sq > 24.0f32.powi(2) {
284                                id_staggered_tick % 2 == 0
285                            } else {
286                                true
287                            }
288                        };
289
290                        add_physics_components(
291                            send_now,
292                            &mut comp_sync_package,
293                            uid,
294                            pos,
295                            last_pos,
296                            ori,
297                            vel,
298                        );
299                    }
300
301                    // TODO: force update counter only needs to be sent once per frame (and only if
302                    // it changed, although it might not be worth having a separate message for
303                    // optionally sending it since individual messages may have a bandwidth
304                    // overhead), however, here we send it potentially 2 times per subscribed
305                    // region by including it in the `CompSync` message.
306                    client.send_fallible(ServerGeneral::CompSync(
307                        comp_sync_package,
308                        force_updates.get(*client_entity).map_or(0, |f| f.counter()),
309                    ));
310                }
311            },
312        );
313        drop(guard);
314        job.cpu_stats.measure(common_ecs::ParMode::Single);
315
316        // Sync components that are only synced for the client's own entity.
317        for (entity, client, &uid, (maybe_pos, last_pos), vel, ori) in (
318            &entities,
319            &clients,
320            uids,
321            (positions.maybe(), last_pos.mask().maybe()),
322            (&velocities, last_vel.mask().maybe()).maybe(),
323            (&orientations, last_vel.mask().maybe()).maybe(),
324        )
325            .join()
326        {
327            // Include additional components for clients that aren't in a region (e.g. due
328            // to having no position or have sync_me as `false`) since those
329            // won't be synced above.
330            let include_all_comps = region_map.in_region_map(entity);
331
332            let mut comp_sync_package = trackers.create_sync_from_client_package(
333                &tracked_storages,
334                entity,
335                include_all_comps,
336            );
337
338            if include_all_comps && let Some(&pos) = maybe_pos {
339                let send_now = should_sync_client_physics(
340                    entity,
341                    &player_physics_settings,
342                    &players,
343                    &force_updates,
344                    is_rider,
345                    &editable_settings,
346                );
347                add_physics_components(
348                    send_now,
349                    &mut comp_sync_package,
350                    uid,
351                    pos,
352                    last_pos,
353                    ori,
354                    vel,
355                );
356            }
357
358            if !comp_sync_package.is_empty() {
359                client.send_fallible(ServerGeneral::CompSync(
360                    comp_sync_package,
361                    force_updates.get(entity).map_or(0, |f| f.counter()),
362                ));
363            }
364        }
365
366        // Update the last physics components for each entity
367
368        (
369            &entities,
370            &positions,
371            velocities.maybe(),
372            orientations.maybe(),
373            last_pos.entries(),
374            last_vel.entries(),
375            last_ori.entries(),
376        )
377            .lend_join()
378            .for_each(|(_, &pos, vel, ori, last_pos, last_vel, last_ori)| {
379                last_pos.replace(Last(pos));
380                vel.and_then(|&v| last_vel.replace(Last(v)));
381                ori.and_then(|&o| last_ori.replace(Last(o)));
382            });
383
384        // Handle entity deletion in regions that don't exist in RegionMap
385        // (theoretically none)
386        for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
387            for client in (presences.maybe(), &subscriptions, &clients)
388                .join()
389                .filter_map(|(presence, subscription, client)| {
390                    if presence.is_some() && subscription.regions.contains(&region_key) {
391                        Some(client)
392                    } else {
393                        None
394                    }
395                })
396            {
397                for uid in &deleted {
398                    client.send_fallible(ServerGeneral::DeleteEntity(*uid));
399                }
400            }
401        }
402
403        // Sync inventories
404        for (inventory, update, client) in (inventories, &mut inventory_updates, &clients).join() {
405            client.send_fallible(ServerGeneral::InventoryUpdate(
406                inventory.clone(),
407                update.take_events(),
408            ));
409        }
410
411        // Consume/clear the current outcomes and convert them to a vec
412        let outcomes = outcomes.recv_all().collect::<Vec<_>>();
413
414        // Sync outcomes
415        for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
416            let is_near = |o_pos: Vec3<f32>| {
417                pos.zip_with(presence, |pos, presence| {
418                    pos.0.xy().distance_squared(o_pos.xy())
419                        < (presence.entity_view_distance.current() as f32
420                            * TerrainChunkSize::RECT_SIZE.x as f32)
421                            .powi(2)
422                })
423            };
424
425            let outcomes = outcomes
426                .iter()
427                .filter(|o| o.get_pos().and_then(is_near).unwrap_or(true))
428                .cloned()
429                .collect::<Vec<_>>();
430
431            if !outcomes.is_empty() {
432                client.send_fallible(ServerGeneral::Outcomes(outcomes));
433            }
434        }
435
436        // Remove all force flags.
437        for force_update in (&mut force_updates).join() {
438            force_update.clear();
439        }
440        inventory_updates.clear();
441
442        // Sync resources
443        // TODO: doesn't really belong in this system (rename system or create another
444        // system?)
445        const TOD_SYNC_FREQ: u64 = 100;
446        if tick % TOD_SYNC_FREQ == 0 {
447            let mut tod_lazymsg = None;
448            for client in (&clients).join() {
449                let msg = tod_lazymsg.unwrap_or_else(|| {
450                    client.prepare(ServerGeneral::TimeOfDay(
451                        *time_of_day,
452                        (*calendar).clone(),
453                        *time,
454                        *time_scale,
455                    ))
456                });
457                // We don't care much about stream errors here since they could just represent
458                // network disconnection, which is handled elsewhere.
459                let _ = client.send_prepared(&msg);
460                tod_lazymsg = Some(msg);
461            }
462        }
463    }
464}
465
466/// Determines whether a client should receive an update about its own physics
467/// components.
468fn should_sync_client_physics(
469    entity: specs::Entity,
470    player_physics_settings: &PlayerPhysicsSettings,
471    players: &ReadStorage<'_, Player>,
472    force_updates: &WriteStorage<'_, ForceUpdate>,
473    is_rider: &ReadStorage<'_, Is<Rider>>,
474    editable_settings: &EditableSettings,
475) -> bool {
476    let server_authoritative_physics = players.get(entity).is_none_or(|player| {
477        player_physics_settings
478            .settings
479            .get(&player.uuid())
480            .is_some_and(|settings| settings.server_authoritative_physics_optin())
481            || editable_settings
482                .server_physics_force_list
483                .contains_key(&player.uuid())
484    });
485    // Don't send client physics updates about itself unless force update is
486    // set or the client is subject to
487    // server-authoritative physics
488    force_updates.get(entity).is_some_and(|f| f.is_forced())
489        || server_authoritative_physics
490        || is_rider.contains(entity)
491}
492
493/// Adds physics components if `send_now` is true or `Option<Last<T>>` is
494/// `None`.
495///
496/// If `Last<T>` isn't present, this is recorded as an insertion rather than a
497/// modification.
498fn add_physics_components(
499    send_now: bool,
500    comp_sync_package: &mut CompSyncPackage<common_net::msg::EcsCompPacket>,
501    uid: Uid,
502    pos: Pos,
503    last_pos: Option<u32>,
504    ori: Option<(&Ori, Option<u32>)>,
505    vel: Option<(&Vel, Option<u32>)>,
506) {
507    if last_pos.is_none() {
508        comp_sync_package.comp_inserted(uid, pos);
509    } else if send_now {
510        comp_sync_package.comp_modified(uid, pos);
511    }
512
513    if let Some((v, last_vel)) = vel {
514        if last_vel.is_none() {
515            comp_sync_package.comp_inserted(uid, *v);
516        } else if send_now {
517            comp_sync_package.comp_modified(uid, *v);
518        }
519    }
520
521    if let Some((o, last_ori)) = ori {
522        if last_ori.is_none() {
523            comp_sync_package.comp_inserted(uid, *o);
524        } else if send_now {
525            comp_sync_package.comp_modified(uid, *o);
526        }
527    }
528}