veloren_server/persistence/
character_updater.rs

1use crate::comp;
2use common::character::CharacterId;
3
4use crate::persistence::{
5    ConnectionMode, DatabaseSettings, EditableComponents, PersistedComponents, VelorenConnection,
6    character_loader::{
7        CharacterScreenResponse, CharacterScreenResponseKind, CharacterUpdaterMessage,
8    },
9    error::PersistenceError,
10    establish_connection,
11};
12use crossbeam_channel::TryIter;
13use rusqlite::DropBehavior;
14use specs::Entity;
15use std::{
16    collections::HashMap,
17    sync::{
18        Arc, RwLock,
19        atomic::{AtomicBool, Ordering},
20    },
21};
22use tracing::{debug, error, info, trace, warn};
23
24pub type CharacterUpdateData = (
25    CharacterId,
26    comp::SkillSet,
27    comp::Inventory,
28    Vec<PetPersistenceData>,
29    Option<comp::Waypoint>,
30    comp::ability::ActiveAbilities,
31    Option<comp::MapMarker>,
32);
33
34pub type PetPersistenceData = (comp::Pet, comp::Body, comp::Stats);
35
36#[expect(clippy::large_enum_variant)]
37enum CharacterUpdaterAction {
38    BatchUpdate {
39        batch_id: u64,
40        updates: Vec<DatabaseActionKind>,
41    },
42    CreateCharacter {
43        entity: Entity,
44        player_uuid: String,
45        character_alias: String,
46        persisted_components: PersistedComponents,
47    },
48    EditCharacter {
49        entity: Entity,
50        player_uuid: String,
51        character_id: CharacterId,
52        character_alias: String,
53        editable_components: EditableComponents,
54    },
55    DisconnectedSuccess,
56}
57
58#[derive(Clone)]
59enum DatabaseAction {
60    New(DatabaseActionKind),
61    Submitted { batch_id: u64 },
62}
63
64impl DatabaseAction {
65    fn take_new(&mut self, batch_id: u64) -> Option<DatabaseActionKind> {
66        match core::mem::replace(self, Self::Submitted { batch_id }) {
67            Self::New(action) => Some(action),
68            submitted @ Self::Submitted { .. } => {
69                *self = submitted; // restore old batch_id
70                None
71            },
72        }
73    }
74}
75
76#[derive(Clone)]
77enum DatabaseActionKind {
78    UpdateCharacter(Box<CharacterUpdateData>),
79    DeleteCharacter {
80        requesting_player_uuid: String,
81        character_id: CharacterId,
82    },
83}
84
85/// A unidirectional messaging resource for saving characters in a
86/// background thread.
87///
88/// This is used to make updates to a character and their persisted components,
89/// such as inventory, loadout, etc...
90pub struct CharacterUpdater {
91    update_tx: Option<crossbeam_channel::Sender<CharacterUpdaterAction>>,
92    response_rx: crossbeam_channel::Receiver<CharacterUpdaterMessage>,
93    handle: Option<std::thread::JoinHandle<()>>,
94    /// Pending actions to be performed during the next persistence batch, such
95    /// as updates for recently logged out players and character deletions
96    pending_database_actions: HashMap<CharacterId, DatabaseAction>,
97    /// Will disconnect all characters (without persistence) on the next tick if
98    /// set to true
99    disconnect_all_clients_requested: Arc<AtomicBool>,
100    last_pending_database_event_id: u64,
101}
102
103impl CharacterUpdater {
104    pub fn new(settings: Arc<RwLock<DatabaseSettings>>) -> rusqlite::Result<Self> {
105        let (update_tx, update_rx) = crossbeam_channel::unbounded::<CharacterUpdaterAction>();
106        let (response_tx, response_rx) = crossbeam_channel::unbounded::<CharacterUpdaterMessage>();
107
108        let disconnect_all_clients_requested = Arc::new(AtomicBool::new(false));
109        let disconnect_all_clients_requested_clone = Arc::clone(&disconnect_all_clients_requested);
110
111        let builder = std::thread::Builder::new().name("persistence_updater".into());
112        let handle = builder
113            .spawn(move || {
114                // Unwrap here is safe as there is no code that can panic when the write lock is
115                // taken that could cause the RwLock to become poisoned.
116                let mut conn =
117                    establish_connection(&settings.read().unwrap(), ConnectionMode::ReadWrite);
118                while let Ok(action) = update_rx.recv() {
119                    match action {
120                        CharacterUpdaterAction::BatchUpdate { batch_id, updates } => {
121                            if disconnect_all_clients_requested_clone.load(Ordering::Relaxed) {
122                                debug!(
123                                    "Skipping persistence due to pending disconnection of all \
124                                     clients"
125                                );
126                                continue;
127                            }
128                            conn.update_log_mode(&settings);
129
130                            if let Err(e) = execute_batch_update(updates.into_iter(), &mut conn) {
131                                error!(
132                                    ?e,
133                                    "Error during character batch update, disconnecting all \
134                                     clients to avoid loss of data integrity."
135                                );
136                                disconnect_all_clients_requested_clone
137                                    .store(true, Ordering::Relaxed);
138                            };
139
140                            if let Err(e) = response_tx
141                                .send(CharacterUpdaterMessage::DatabaseBatchCompletion(batch_id))
142                            {
143                                error!(?e, "Could not send DatabaseBatchCompletion message");
144                            } else {
145                                debug!(
146                                    "Submitted DatabaseBatchCompletion - Batch ID: {}",
147                                    batch_id
148                                );
149                            }
150                        },
151                        CharacterUpdaterAction::CreateCharacter {
152                            entity,
153                            character_alias,
154                            player_uuid,
155                            persisted_components,
156                        } => {
157                            match execute_character_create(
158                                entity,
159                                character_alias,
160                                &player_uuid,
161                                persisted_components,
162                                &mut conn,
163                            ) {
164                                Ok(response) => {
165                                    if let Err(e) = response_tx.send(response) {
166                                        error!(?e, "Could not send character creation response");
167                                    } else {
168                                        debug!(
169                                            "Processed character create for player {}",
170                                            player_uuid
171                                        );
172                                    }
173                                },
174                                Err(e) => error!(
175                                    "Error creating character for player {}, error: {:?}",
176                                    player_uuid, e
177                                ),
178                            }
179                        },
180                        CharacterUpdaterAction::EditCharacter {
181                            entity,
182                            character_id,
183                            character_alias,
184                            player_uuid,
185                            editable_components,
186                        } => {
187                            match execute_character_edit(
188                                entity,
189                                character_id,
190                                character_alias,
191                                &player_uuid,
192                                editable_components,
193                                &mut conn,
194                            ) {
195                                Ok(response) => {
196                                    if let Err(e) = response_tx.send(response) {
197                                        error!(?e, "Could not send character edit response");
198                                    } else {
199                                        debug!(
200                                            "Processed character edit for player {}",
201                                            player_uuid
202                                        );
203                                    }
204                                },
205                                Err(e) => error!(
206                                    "Error editing character for player {}, error: {:?}",
207                                    player_uuid, e
208                                ),
209                            }
210                        },
211                        CharacterUpdaterAction::DisconnectedSuccess => {
212                            info!(
213                                "CharacterUpdater received DisconnectedSuccess event, resuming \
214                                 batch updates"
215                            );
216                            // Reset the disconnection request as we have had confirmation that all
217                            // clients have been disconnected
218                            disconnect_all_clients_requested_clone.store(false, Ordering::Relaxed);
219                        },
220                    }
221                }
222            })
223            .unwrap();
224
225        Ok(Self {
226            update_tx: Some(update_tx),
227            response_rx,
228            handle: Some(handle),
229            pending_database_actions: HashMap::new(),
230            disconnect_all_clients_requested,
231            last_pending_database_event_id: 0,
232        })
233    }
234
235    /// Adds a character to the list of characters that have recently logged out
236    /// and will be persisted in the next batch update.
237    pub fn add_pending_logout_update(&mut self, update_data: CharacterUpdateData) {
238        if self
239            .disconnect_all_clients_requested
240            .load(Ordering::Relaxed)
241        {
242            warn!(
243                "Ignoring request to add pending logout update for character ID {} as there is a \
244                 disconnection of all clients in progress",
245                update_data.0.0
246            );
247            return;
248        }
249
250        if self.pending_database_actions.contains_key(&update_data.0) {
251            warn!(
252                "Ignoring request to add pending logout update for character ID {} as there is \
253                 already a pending delete for this character",
254                update_data.0.0
255            );
256            return;
257        }
258
259        self.pending_database_actions.insert(
260            update_data.0, // CharacterId
261            DatabaseAction::New(DatabaseActionKind::UpdateCharacter(Box::new(update_data))),
262        );
263    }
264
265    pub fn has_pending_database_action(&self, character_id: CharacterId) -> bool {
266        self.pending_database_actions.contains_key(&character_id)
267    }
268
269    pub fn process_batch_completion(&mut self, completed_batch_id: u64) {
270        self.pending_database_actions.retain(|_, event| {
271            !matches!(event, DatabaseAction::Submitted {
272                    batch_id,
273            } if completed_batch_id == *batch_id)
274        });
275        debug!(
276            "Processed database batch completion - Batch ID: {}",
277            completed_batch_id
278        )
279    }
280
281    /// Returns a value indicating whether there is a pending request to
282    /// disconnect all clients due to a batch update transaction failure
283    pub fn disconnect_all_clients_requested(&self) -> bool {
284        self.disconnect_all_clients_requested
285            .load(Ordering::Relaxed)
286    }
287
288    pub fn create_character(
289        &mut self,
290        entity: Entity,
291        requesting_player_uuid: String,
292        alias: String,
293        persisted_components: PersistedComponents,
294    ) {
295        if let Err(e) =
296            self.update_tx
297                .as_ref()
298                .unwrap()
299                .send(CharacterUpdaterAction::CreateCharacter {
300                    entity,
301                    player_uuid: requesting_player_uuid,
302                    character_alias: alias,
303                    persisted_components,
304                })
305        {
306            error!(?e, "Could not send character creation request");
307        }
308    }
309
310    pub fn edit_character(
311        &mut self,
312        entity: Entity,
313        requesting_player_uuid: String,
314        character_id: CharacterId,
315        alias: String,
316        editable_components: EditableComponents,
317    ) {
318        if let Err(e) =
319            self.update_tx
320                .as_ref()
321                .unwrap()
322                .send(CharacterUpdaterAction::EditCharacter {
323                    entity,
324                    player_uuid: requesting_player_uuid,
325                    character_id,
326                    character_alias: alias,
327                    editable_components,
328                })
329        {
330            error!(?e, "Could not send character edit request");
331        }
332    }
333
334    fn next_pending_database_event_id(&mut self) -> u64 {
335        self.last_pending_database_event_id += 1;
336        self.last_pending_database_event_id
337    }
338
339    pub fn queue_character_deletion(
340        &mut self,
341        requesting_player_uuid: String,
342        character_id: CharacterId,
343    ) {
344        // Insert the delete as a pending database action - if the player has recently
345        // logged out this will replace their pending update with a delete which
346        // is fine, as the user has actively chosen to delete the character.
347        self.pending_database_actions.insert(
348            character_id,
349            DatabaseAction::New(DatabaseActionKind::DeleteCharacter {
350                requesting_player_uuid,
351                character_id,
352            }),
353        );
354    }
355
356    /// Updates a collection of characters based on their id and components
357    pub fn batch_update(&mut self, updates: impl Iterator<Item = CharacterUpdateData>) {
358        let batch_id = self.next_pending_database_event_id();
359
360        // Collect any new updates, ignoring updates from a previous update that are
361        // still pending completion
362        let existing_pending_actions = self
363            .pending_database_actions
364            .iter_mut()
365            .filter_map(|(_, event)| event.take_new(batch_id));
366
367        // Combine the pending actions with the updates for logged in characters
368        let pending_actions = existing_pending_actions
369            .into_iter()
370            .chain(updates.map(|update| DatabaseActionKind::UpdateCharacter(Box::new(update))))
371            .collect::<Vec<DatabaseActionKind>>();
372
373        if !pending_actions.is_empty() {
374            debug!(
375                "Sending persistence update batch ID {} containing {} updates",
376                batch_id,
377                pending_actions.len()
378            );
379            if let Err(e) =
380                self.update_tx
381                    .as_ref()
382                    .unwrap()
383                    .send(CharacterUpdaterAction::BatchUpdate {
384                        batch_id,
385                        updates: pending_actions,
386                    })
387            {
388                error!(?e, "Could not send persistence batch update");
389            }
390        } else {
391            trace!("Skipping persistence batch - no pending updates")
392        }
393    }
394
395    /// Indicates to the batch update thread that a requested disconnection of
396    /// all clients has been processed
397    pub fn disconnected_success(&mut self) {
398        self.update_tx
399            .as_ref()
400            .unwrap()
401            .send(CharacterUpdaterAction::DisconnectedSuccess)
402            .expect(
403                "Failed to send DisconnectedSuccess event - not sending this event will prevent \
404                 future persistence batches from running",
405            );
406    }
407
408    /// Returns a non-blocking iterator over CharacterLoaderResponse messages
409    pub fn messages(&self) -> TryIter<CharacterUpdaterMessage> { self.response_rx.try_iter() }
410}
411
412fn execute_batch_update(
413    updates: impl Iterator<Item = DatabaseActionKind>,
414    connection: &mut VelorenConnection,
415) -> Result<(), PersistenceError> {
416    let mut transaction = connection.connection.transaction()?;
417    transaction.set_drop_behavior(DropBehavior::Rollback);
418    trace!("Transaction started for character batch update");
419    updates.into_iter().try_for_each(|event| match event {
420        DatabaseActionKind::UpdateCharacter(box (
421            character_id,
422            stats,
423            inventory,
424            pets,
425            waypoint,
426            active_abilities,
427            map_marker,
428        )) => super::character::update(
429            character_id,
430            stats,
431            inventory,
432            pets,
433            waypoint,
434            active_abilities,
435            map_marker,
436            &mut transaction,
437        ),
438        DatabaseActionKind::DeleteCharacter {
439            requesting_player_uuid,
440            character_id,
441        } => super::character::delete_character(
442            &requesting_player_uuid,
443            character_id,
444            &mut transaction,
445        ),
446    })?;
447
448    transaction.commit()?;
449
450    trace!("Commit for character batch update completed");
451    Ok(())
452}
453
454fn execute_character_create(
455    entity: Entity,
456    alias: String,
457    requesting_player_uuid: &str,
458    persisted_components: PersistedComponents,
459    connection: &mut VelorenConnection,
460) -> Result<CharacterUpdaterMessage, PersistenceError> {
461    let mut transaction = connection.connection.transaction()?;
462
463    let response = CharacterScreenResponse {
464        target_entity: entity,
465        response_kind: CharacterScreenResponseKind::CharacterCreation(
466            super::character::create_character(
467                requesting_player_uuid,
468                &alias,
469                persisted_components,
470                &mut transaction,
471            ),
472        ),
473    };
474
475    if !response.is_err() {
476        transaction.commit()?;
477    };
478
479    Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
480}
481
482fn execute_character_edit(
483    entity: Entity,
484    character_id: CharacterId,
485    alias: String,
486    requesting_player_uuid: &str,
487    editable_components: EditableComponents,
488    connection: &mut VelorenConnection,
489) -> Result<CharacterUpdaterMessage, PersistenceError> {
490    let mut transaction = connection.connection.transaction()?;
491
492    let response = CharacterScreenResponse {
493        target_entity: entity,
494        response_kind: CharacterScreenResponseKind::CharacterEdit(
495            super::character::edit_character(
496                editable_components,
497                &mut transaction,
498                character_id,
499                requesting_player_uuid,
500                &alias,
501            ),
502        ),
503    };
504
505    if !response.is_err() {
506        transaction.commit()?;
507    };
508
509    Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
510}
511
512impl Drop for CharacterUpdater {
513    fn drop(&mut self) {
514        drop(self.update_tx.take());
515        if let Err(e) = self.handle.take().unwrap().join() {
516            error!(?e, "Error from joining character update thread");
517        }
518    }
519}