veloren_server/sys/
entity_sync.rs

1use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
2use crate::{EditableSettings, Tick, client::Client, presence::RegionSubscription};
3use common::{
4    calendar::Calendar,
5    comp::{
6        Collider, ForceUpdate, InventoryUpdate, Last, Ori, Player, Pos, Presence, Vel,
7        presence::SpectatingEntity,
8    },
9    event::EventBus,
10    link::Is,
11    mounting::Rider,
12    outcome::Outcome,
13    region::{RegionEvent, RegionMap},
14    resources::{PlayerPhysicsSettings, Time, TimeOfDay, TimeScale},
15    terrain::TerrainChunkSize,
16    uid::Uid,
17    vol::RectVolSize,
18};
19use common_ecs::{Job, Origin, Phase, System};
20use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
21use itertools::Either;
22use specs::{Entities, Join, LendJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
23use vek::*;
24
/// Server-side ECS system that pushes entity state to connected clients.
///
/// Besides physics components (`Pos`, `Vel`, `Ori`), a run of this system also
/// sends entity creation/deletion messages, tracked component changes,
/// inventory updates, outcomes and (periodically) the time of day.
#[derive(Default)]
pub struct Sys;
28
29impl<'a> System<'a> for Sys {
30    type SystemData = (
31        Entities<'a>,
32        Read<'a, Tick>,
33        Read<'a, PlayerPhysicsSettings>,
34        TrackedStorages<'a>,
35        ReadExpect<'a, TimeOfDay>,
36        ReadExpect<'a, Time>,
37        ReadExpect<'a, Calendar>,
38        ReadExpect<'a, TimeScale>,
39        ReadExpect<'a, RegionMap>,
40        ReadExpect<'a, UpdateTrackers>,
41        Write<'a, DeletedEntities>,
42        Read<'a, EventBus<Outcome>>,
43        ReadExpect<'a, EditableSettings>,
44        (
45            ReadStorage<'a, Pos>,
46            ReadStorage<'a, Vel>,
47            ReadStorage<'a, Ori>,
48            ReadStorage<'a, RegionSubscription>,
49            ReadStorage<'a, Player>,
50            ReadStorage<'a, Presence>,
51            ReadStorage<'a, SpectatingEntity>,
52            ReadStorage<'a, Client>,
53            WriteStorage<'a, Last<Pos>>,
54            WriteStorage<'a, Last<Vel>>,
55            WriteStorage<'a, Last<Ori>>,
56            WriteStorage<'a, ForceUpdate>,
57            WriteStorage<'a, InventoryUpdate>,
58        ),
59    );
60
61    const NAME: &'static str = "entity_sync";
62    const ORIGIN: Origin = Origin::Server;
63    const PHASE: Phase = Phase::Create;
64
65    fn run(
66        job: &mut Job<Self>,
67        (
68            entities,
69            tick,
70            player_physics_settings,
71            tracked_storages,
72            time_of_day,
73            time,
74            calendar,
75            time_scale,
76            region_map,
77            trackers,
78            mut deleted_entities,
79            outcomes,
80            editable_settings,
81            (
82                positions,
83                velocities,
84                orientations,
85                subscriptions,
86                players,
87                presences,
88                spectating_entities,
89                clients,
90                mut last_pos,
91                mut last_vel,
92                mut last_ori,
93                mut force_updates,
94                mut inventory_updates,
95            ),
96        ): Self::SystemData,
97    ) {
98        let tick = tick.0;
99
100        // Storages already provided in `TrackedStorages` that we need to use
101        // for other things besides change detection.
102        let uids = &tracked_storages.uid;
103        let colliders = &tracked_storages.collider;
104        let inventories = &tracked_storages.inventory;
105        let is_rider = &tracked_storages.is_rider;
106
107        // To send entity updates
108        // 1. Iterate through regions
109        // 2. Iterate through region subscribers (ie clients)
110        //     - Collect a list of entity ids for clients who are subscribed to this
111        //       region (hash calc to check each)
112        // 3. Iterate through events from that region
113        //     - For each entity entered event, iterate through the client list and
114        //       check if they are subscribed to the source (hash calc per subscribed
115        //       client per entity event), if not subscribed to the source send a entity
116        //       creation message to that client
117        //     - For each entity left event, iterate through the client list and check
118        //       if they are subscribed to the destination (hash calc per subscribed
119        //       client per entity event)
120        // 4. Iterate through entities in that region
121        // 5. Inform clients of the component changes for that entity
122        //     - Throttle update rate base on distance to each client
123
124        // Sync physics and other components
125        // via iterating through regions (in parallel)
126
127        // Pre-collect regions paired with deleted entity list so we can iterate over
128        // them in parallel below
129        let regions_and_deleted_entities = region_map
130            .iter()
131            .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
132            .collect::<Vec<_>>();
133
134        use rayon::iter::{IntoParallelIterator, ParallelIterator};
135        job.cpu_stats.measure(common_ecs::ParMode::Rayon);
136        common_base::prof_span!(guard, "regions");
137        regions_and_deleted_entities.into_par_iter().for_each_init(
138            || {
139                common_base::prof_span!(guard, "entity sync rayon job");
140                guard
141            },
142            |_guard, (key, region, deleted_entities_in_region)| {
143                // Assemble subscriber list for this region by iterating through clients and
144                // checking if they are subscribed to this region
145                let mut subscribers = (
146                    &clients,
147                    &entities,
148                    presences.maybe(),
149                    &subscriptions,
150                    &positions,
151                )
152                    .join()
153                    .filter_map(|(client, entity, presence, subscription, pos)| {
154                        if presence.is_some() && subscription.regions.contains(&key) {
155                            Some((client, &subscription.regions, entity, *pos))
156                        } else {
157                            None
158                        }
159                    })
160                    .collect::<Vec<_>>();
161
162                for event in region.events() {
163                    match event {
164                        RegionEvent::Entered(id, maybe_key) => {
165                            // Don't process newly created entities here (redundant network
166                            // messages)
167                            if trackers.uid.inserted().contains(*id) {
168                                continue;
169                            }
170                            let entity = entities.entity(*id);
171                            if let Some(pkg) = positions
172                                .get(entity)
173                                .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
174                                .and_then(|(pos, vel, ori)| {
175                                    tracked_storages.create_entity_package(
176                                        entity,
177                                        Some(*pos),
178                                        vel.copied(),
179                                        ori.copied(),
180                                    )
181                                })
182                            {
183                                let create_msg = ServerGeneral::CreateEntity(pkg);
184                                for (client, regions, client_entity, _) in &mut subscribers {
185                                    if maybe_key
186                                    .as_ref()
187                                    .map(|key| !regions.contains(key))
188                                    .unwrap_or(true)
189                                    // Client doesn't need to know about itself
190                                    && *client_entity != entity
191                                    {
192                                        client.send_fallible(create_msg.clone());
193                                    }
194                                }
195                            }
196                        },
197                        RegionEvent::Left(id, maybe_key) => {
198                            // Lookup UID for entity
199                            if let Some(&uid) = uids.get(entities.entity(*id)) {
200                                for (client, regions, _, _) in &mut subscribers {
201                                    if maybe_key
202                                        .as_ref()
203                                        .map(|key| !regions.contains(key))
204                                        .unwrap_or(true)
205                                    {
206                                        // TODO: I suspect it would be more efficient (in terms of
207                                        // bandwidth) to batch messages like this (same in
208                                        // subscription.rs).
209                                        client.send_fallible(ServerGeneral::DeleteEntity(uid));
210                                    }
211                                }
212                            }
213                        },
214                    }
215                }
216
217                // Sync tracked components
218                // Get deleted entities in this region from DeletedEntities
219                let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
220                    &tracked_storages,
221                    region.entities(),
222                    deleted_entities_in_region,
223                );
224                // We lazily initialize the the synchronization messages in case there are no
225                // clients.
226                let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
227                for (client, _, client_entity, _) in &mut subscribers {
228                    let msg = entity_comp_sync.right_or_else(
229                        |(entity_sync_package, comp_sync_package)| {
230                            (
231                                client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
232                                client.prepare(ServerGeneral::CompSync(
233                                    comp_sync_package,
234                                    force_updates.get(*client_entity).map_or(0, |f| f.counter()),
235                                )),
236                            )
237                        },
238                    );
239                    // We don't care much about stream errors here since they could just represent
240                    // network disconnection, which is handled elsewhere.
241                    let _ = client.send_prepared(&msg.0);
242                    let _ = client.send_prepared(&msg.1);
243                    entity_comp_sync = Either::Right(msg);
244                }
245
246                for (client, _, client_entity, client_pos) in &mut subscribers {
247                    let mut comp_sync_package = CompSyncPackage::new();
248
249                    for (_, entity, &uid, (&pos, last_pos), vel, ori, collider) in (
250                        region.entities(),
251                        &entities,
252                        uids,
253                        (&positions, last_pos.mask().maybe()),
254                        (&velocities, last_vel.mask().maybe()).maybe(),
255                        (&orientations, last_vel.mask().maybe()).maybe(),
256                        colliders.maybe(),
257                    )
258                        .join()
259                    {
260                        // Decide how regularly to send physics updates.
261                        let send_now = if client_entity == &entity {
262                            should_sync_client_physics(
263                                entity,
264                                &player_physics_settings,
265                                &players,
266                                &force_updates,
267                                is_rider,
268                                &editable_settings,
269                            )
270                        } else if matches!(collider, Some(Collider::Voxel { .. })) {
271                            // Things with a voxel collider (airships, etc.) need to have very
272                            // stable physics so we always send updated
273                            // for these where we can.
274                            true
275                        } else {
276                            // Throttle update rates for all other entities based on distance to
277                            // client
278                            let distance_sq = client_pos.0.distance_squared(pos.0);
279                            let id_staggered_tick = tick + entity.id() as u64;
280
281                            // More entities farther away so checks start there
282                            if distance_sq > 500.0f32.powi(2) {
283                                id_staggered_tick % 32 == 0
284                            } else if distance_sq > 300.0f32.powi(2) {
285                                id_staggered_tick % 16 == 0
286                            } else if distance_sq > 200.0f32.powi(2) {
287                                id_staggered_tick % 8 == 0
288                            } else if distance_sq > 120.0f32.powi(2) {
289                                id_staggered_tick % 6 == 0
290                            } else if distance_sq > 64.0f32.powi(2) {
291                                id_staggered_tick % 3 == 0
292                            } else if distance_sq > 24.0f32.powi(2) {
293                                id_staggered_tick % 2 == 0
294                            } else {
295                                true
296                            }
297                        };
298
299                        add_physics_components(
300                            send_now,
301                            &mut comp_sync_package,
302                            uid,
303                            pos,
304                            last_pos,
305                            ori,
306                            vel,
307                        );
308                    }
309
310                    // TODO: force update counter only needs to be sent once per frame (and only if
311                    // it changed, although it might not be worth having a separate message for
312                    // optionally sending it since individual messages may have a bandwidth
313                    // overhead), however, here we send it potentially 2 times per subscribed
314                    // region by including it in the `CompSync` message.
315                    client.send_fallible(ServerGeneral::CompSync(
316                        comp_sync_package,
317                        force_updates.get(*client_entity).map_or(0, |f| f.counter()),
318                    ));
319                }
320            },
321        );
322        drop(guard);
323        job.cpu_stats.measure(common_ecs::ParMode::Single);
324
325        // Sync components that are only synced for the client's own entity.
326        for (entity, client, &uid, (maybe_pos, last_pos), vel, ori) in (
327            &entities,
328            &clients,
329            uids,
330            (positions.maybe(), last_pos.mask().maybe()),
331            (&velocities, last_vel.mask().maybe()).maybe(),
332            (&orientations, last_vel.mask().maybe()).maybe(),
333        )
334            .join()
335        {
336            // Include additional components for clients that aren't in a region (e.g. due
337            // to having no position or have sync_me as `false`) since those
338            // won't be synced above.
339            let include_all_comps = region_map.in_region_map(entity);
340
341            let mut comp_sync_package = trackers.create_sync_from_client_package(
342                &tracked_storages,
343                entity,
344                include_all_comps,
345            );
346
347            if include_all_comps && let Some(&pos) = maybe_pos {
348                let send_now = should_sync_client_physics(
349                    entity,
350                    &player_physics_settings,
351                    &players,
352                    &force_updates,
353                    is_rider,
354                    &editable_settings,
355                );
356                add_physics_components(
357                    send_now,
358                    &mut comp_sync_package,
359                    uid,
360                    pos,
361                    last_pos,
362                    ori,
363                    vel,
364                );
365            }
366
367            if !comp_sync_package.is_empty() {
368                client.send_fallible(ServerGeneral::CompSync(
369                    comp_sync_package,
370                    force_updates.get(entity).map_or(0, |f| f.counter()),
371                ));
372            }
373        }
374
375        for (entity, client, spectating_entity) in
376            (&entities, &clients, &spectating_entities).join()
377        {
378            // TODO: If the spectated entity is out of range while a change occurs it will
379            // cause the client to log errors, and those changes will be missed
380            // by the spectating entity.
381            //
382            // Additionally, when we stop spectating we don't delete the components that are
383            // synced for spectators. Leaving stale components on the client.
384            let comp_sync_package = trackers.create_sync_from_spectated_entity_package(
385                &tracked_storages,
386                entity,
387                spectating_entity.0,
388            );
389
390            if !comp_sync_package.is_empty() {
391                client.send_fallible(ServerGeneral::CompSync(
392                    comp_sync_package,
393                    force_updates.get(entity).map_or(0, |f| f.counter()),
394                ));
395            }
396        }
397
398        // Update the last physics components for each entity
399
400        (
401            &entities,
402            &positions,
403            velocities.maybe(),
404            orientations.maybe(),
405            last_pos.entries(),
406            last_vel.entries(),
407            last_ori.entries(),
408        )
409            .lend_join()
410            .for_each(|(_, &pos, vel, ori, last_pos, last_vel, last_ori)| {
411                last_pos.replace(Last(pos));
412                vel.and_then(|&v| last_vel.replace(Last(v)));
413                ori.and_then(|&o| last_ori.replace(Last(o)));
414            });
415
416        // Handle entity deletion in regions that don't exist in RegionMap
417        // (theoretically none)
418        for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
419            for client in (presences.maybe(), &subscriptions, &clients)
420                .join()
421                .filter_map(|(presence, subscription, client)| {
422                    if presence.is_some() && subscription.regions.contains(&region_key) {
423                        Some(client)
424                    } else {
425                        None
426                    }
427                })
428            {
429                for uid in &deleted {
430                    client.send_fallible(ServerGeneral::DeleteEntity(*uid));
431                }
432            }
433        }
434
435        // Sync inventories
436        for (inventory, update, client) in (inventories, &mut inventory_updates, &clients).join() {
437            client.send_fallible(ServerGeneral::InventoryUpdate(
438                inventory.clone(),
439                update.take_events(),
440            ));
441        }
442
443        // Consume/clear the current outcomes and convert them to a vec
444        let outcomes = outcomes.recv_all().collect::<Vec<_>>();
445
446        // Sync outcomes
447        for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
448            let is_near = |o_pos: Vec3<f32>| {
449                pos.zip_with(presence, |pos, presence| {
450                    pos.0.xy().distance_squared(o_pos.xy())
451                        < (presence.entity_view_distance.current() as f32
452                            * TerrainChunkSize::RECT_SIZE.x as f32)
453                            .powi(2)
454                })
455            };
456
457            let outcomes = outcomes
458                .iter()
459                .filter(|o| o.get_pos().and_then(is_near).unwrap_or(true))
460                .cloned()
461                .collect::<Vec<_>>();
462
463            if !outcomes.is_empty() {
464                client.send_fallible(ServerGeneral::Outcomes(outcomes));
465            }
466        }
467
468        // Remove all force flags.
469        for force_update in (&mut force_updates).join() {
470            force_update.clear();
471        }
472        inventory_updates.clear();
473
474        // Sync resources
475        // TODO: doesn't really belong in this system (rename system or create another
476        // system?)
477        const TOD_SYNC_FREQ: u64 = 100;
478        if tick % TOD_SYNC_FREQ == 0 {
479            let mut tod_lazymsg = None;
480            for client in (&clients).join() {
481                let msg = tod_lazymsg.unwrap_or_else(|| {
482                    client.prepare(ServerGeneral::TimeOfDay(
483                        *time_of_day,
484                        (*calendar).clone(),
485                        *time,
486                        *time_scale,
487                    ))
488                });
489                // We don't care much about stream errors here since they could just represent
490                // network disconnection, which is handled elsewhere.
491                let _ = client.send_prepared(&msg);
492                tod_lazymsg = Some(msg);
493            }
494        }
495    }
496}
497
498/// Determines whether a client should receive an update about its own physics
499/// components.
500fn should_sync_client_physics(
501    entity: specs::Entity,
502    player_physics_settings: &PlayerPhysicsSettings,
503    players: &ReadStorage<'_, Player>,
504    force_updates: &WriteStorage<'_, ForceUpdate>,
505    is_rider: &ReadStorage<'_, Is<Rider>>,
506    editable_settings: &EditableSettings,
507) -> bool {
508    let server_authoritative_physics = players.get(entity).is_none_or(|player| {
509        player_physics_settings
510            .settings
511            .get(&player.uuid())
512            .is_some_and(|settings| settings.server_authoritative_physics_optin())
513            || editable_settings
514                .server_physics_force_list
515                .contains_key(&player.uuid())
516    });
517    // Don't send client physics updates about itself unless force update is
518    // set or the client is subject to
519    // server-authoritative physics
520    force_updates.get(entity).is_some_and(|f| f.is_forced())
521        || server_authoritative_physics
522        || is_rider.contains(entity)
523}
524
525/// Adds physics components if `send_now` is true or `Option<Last<T>>` is
526/// `None`.
527///
528/// If `Last<T>` isn't present, this is recorded as an insertion rather than a
529/// modification.
530fn add_physics_components(
531    send_now: bool,
532    comp_sync_package: &mut CompSyncPackage<common_net::msg::EcsCompPacket>,
533    uid: Uid,
534    pos: Pos,
535    last_pos: Option<u32>,
536    ori: Option<(&Ori, Option<u32>)>,
537    vel: Option<(&Vel, Option<u32>)>,
538) {
539    if last_pos.is_none() {
540        comp_sync_package.comp_inserted(uid, pos);
541    } else if send_now {
542        comp_sync_package.comp_modified(uid, pos);
543    }
544
545    if let Some((v, last_vel)) = vel {
546        if last_vel.is_none() {
547            comp_sync_package.comp_inserted(uid, *v);
548        } else if send_now {
549            comp_sync_package.comp_modified(uid, *v);
550        }
551    }
552
553    if let Some((o, last_ori)) = ori {
554        if last_ori.is_none() {
555            comp_sync_package.comp_inserted(uid, *o);
556        } else if send_now {
557            comp_sync_package.comp_modified(uid, *o);
558        }
559    }
560}