veloren_server/persistence/
character_updater.rs

1use crate::comp;
2use common::{character::CharacterId, event::PermanentChange};
3
4use crate::persistence::{
5    ConnectionMode, DatabaseSettings, EditableComponents, PersistedComponents, VelorenConnection,
6    character_loader::{
7        CharacterScreenResponse, CharacterScreenResponseKind, CharacterUpdaterMessage,
8    },
9    error::PersistenceError,
10    establish_connection,
11};
12use crossbeam_channel::TryIter;
13use rusqlite::DropBehavior;
14use specs::Entity;
15use std::{
16    collections::HashMap,
17    sync::{
18        Arc, RwLock,
19        atomic::{AtomicBool, Ordering},
20    },
21};
22use tracing::{debug, error, info, trace, warn};
23
24pub type CharacterUpdateData = (
25    CharacterId,
26    comp::SkillSet,
27    comp::Inventory,
28    Vec<PetPersistenceData>,
29    Option<comp::Waypoint>,
30    comp::ability::ActiveAbilities,
31    Option<comp::MapMarker>,
32);
33
34pub type PetPersistenceData = (comp::Pet, comp::Body, comp::Stats);
35
36#[expect(clippy::large_enum_variant)]
37enum CharacterUpdaterAction {
38    BatchUpdate {
39        batch_id: u64,
40        updates: Vec<DatabaseActionKind>,
41    },
42    CreateCharacter {
43        entity: Entity,
44        player_uuid: String,
45        character_alias: String,
46        persisted_components: PersistedComponents,
47    },
48    EditCharacter {
49        entity: Entity,
50        player_uuid: String,
51        character_id: CharacterId,
52        character_alias: Option<String>,
53        editable_components: EditableComponents,
54        trusted_change: Option<PermanentChange>,
55    },
56    DisconnectedSuccess,
57}
58
59#[derive(Clone)]
60enum DatabaseAction {
61    New(DatabaseActionKind),
62    Submitted { batch_id: u64 },
63}
64
65impl DatabaseAction {
66    fn take_new(&mut self, batch_id: u64) -> Option<DatabaseActionKind> {
67        match core::mem::replace(self, Self::Submitted { batch_id }) {
68            Self::New(action) => Some(action),
69            submitted @ Self::Submitted { .. } => {
70                *self = submitted; // restore old batch_id
71                None
72            },
73        }
74    }
75}
76
77#[derive(Clone)]
78enum DatabaseActionKind {
79    UpdateCharacter(Box<CharacterUpdateData>),
80    DeleteCharacter {
81        requesting_player_uuid: String,
82        character_id: CharacterId,
83    },
84}
85
86/// A unidirectional messaging resource for saving characters in a
87/// background thread.
88///
89/// This is used to make updates to a character and their persisted components,
90/// such as inventory, loadout, etc...
91pub struct CharacterUpdater {
92    update_tx: Option<crossbeam_channel::Sender<CharacterUpdaterAction>>,
93    response_rx: crossbeam_channel::Receiver<CharacterUpdaterMessage>,
94    handle: Option<std::thread::JoinHandle<()>>,
95    /// Pending actions to be performed during the next persistence batch, such
96    /// as updates for recently logged out players and character deletions
97    pending_database_actions: HashMap<CharacterId, DatabaseAction>,
98    /// Will disconnect all characters (without persistence) on the next tick if
99    /// set to true
100    disconnect_all_clients_requested: Arc<AtomicBool>,
101    last_pending_database_event_id: u64,
102}
103
104impl CharacterUpdater {
105    pub fn new(settings: Arc<RwLock<DatabaseSettings>>) -> rusqlite::Result<Self> {
106        let (update_tx, update_rx) = crossbeam_channel::unbounded::<CharacterUpdaterAction>();
107        let (response_tx, response_rx) = crossbeam_channel::unbounded::<CharacterUpdaterMessage>();
108
109        let disconnect_all_clients_requested = Arc::new(AtomicBool::new(false));
110        let disconnect_all_clients_requested_clone = Arc::clone(&disconnect_all_clients_requested);
111
112        let builder = std::thread::Builder::new().name("persistence_updater".into());
113        let handle = builder
114            .spawn(move || {
115                // Unwrap here is safe as there is no code that can panic when the write lock is
116                // taken that could cause the RwLock to become poisoned.
117                let mut conn =
118                    establish_connection(&settings.read().unwrap(), ConnectionMode::ReadWrite);
119                while let Ok(action) = update_rx.recv() {
120                    match action {
121                        CharacterUpdaterAction::BatchUpdate { batch_id, updates } => {
122                            if disconnect_all_clients_requested_clone.load(Ordering::Relaxed) {
123                                debug!(
124                                    "Skipping persistence due to pending disconnection of all \
125                                     clients"
126                                );
127                                continue;
128                            }
129                            conn.update_log_mode(&settings);
130
131                            if let Err(e) = execute_batch_update(updates.into_iter(), &mut conn) {
132                                error!(
133                                    ?e,
134                                    "Error during character batch update, disconnecting all \
135                                     clients to avoid loss of data integrity."
136                                );
137                                disconnect_all_clients_requested_clone
138                                    .store(true, Ordering::Relaxed);
139                            };
140
141                            if let Err(e) = response_tx
142                                .send(CharacterUpdaterMessage::DatabaseBatchCompletion(batch_id))
143                            {
144                                error!(?e, "Could not send DatabaseBatchCompletion message");
145                            } else {
146                                debug!(
147                                    "Submitted DatabaseBatchCompletion - Batch ID: {}",
148                                    batch_id
149                                );
150                            }
151                        },
152                        CharacterUpdaterAction::CreateCharacter {
153                            entity,
154                            character_alias,
155                            player_uuid,
156                            persisted_components,
157                        } => {
158                            match execute_character_create(
159                                entity,
160                                character_alias,
161                                &player_uuid,
162                                persisted_components,
163                                &mut conn,
164                            ) {
165                                Ok(response) => {
166                                    if let Err(e) = response_tx.send(response) {
167                                        error!(?e, "Could not send character creation response");
168                                    } else {
169                                        debug!(
170                                            "Processed character create for player {}",
171                                            player_uuid
172                                        );
173                                    }
174                                },
175                                Err(e) => error!(
176                                    "Error creating character for player {}, error: {:?}",
177                                    player_uuid, e
178                                ),
179                            }
180                        },
181                        CharacterUpdaterAction::EditCharacter {
182                            entity,
183                            character_id,
184                            character_alias,
185                            player_uuid,
186                            editable_components,
187                            trusted_change,
188                        } => {
189                            match execute_character_edit(
190                                entity,
191                                character_id,
192                                character_alias,
193                                &player_uuid,
194                                editable_components,
195                                trusted_change,
196                                &mut conn,
197                            ) {
198                                Ok(response) => {
199                                    if let Err(e) = response_tx.send(response) {
200                                        error!(?e, "Could not send character edit response");
201                                    } else {
202                                        debug!(
203                                            "Processed character edit for player {}",
204                                            player_uuid
205                                        );
206                                    }
207                                },
208                                Err(e) => error!(
209                                    "Error editing character for player {}, error: {:?}",
210                                    player_uuid, e
211                                ),
212                            }
213                        },
214                        CharacterUpdaterAction::DisconnectedSuccess => {
215                            info!(
216                                "CharacterUpdater received DisconnectedSuccess event, resuming \
217                                 batch updates"
218                            );
219                            // Reset the disconnection request as we have had confirmation that all
220                            // clients have been disconnected
221                            disconnect_all_clients_requested_clone.store(false, Ordering::Relaxed);
222                        },
223                    }
224                }
225            })
226            .unwrap();
227
228        Ok(Self {
229            update_tx: Some(update_tx),
230            response_rx,
231            handle: Some(handle),
232            pending_database_actions: HashMap::new(),
233            disconnect_all_clients_requested,
234            last_pending_database_event_id: 0,
235        })
236    }
237
238    /// Adds a character to the list of characters that have recently logged out
239    /// and will be persisted in the next batch update.
240    pub fn add_pending_logout_update(&mut self, update_data: CharacterUpdateData) {
241        if self
242            .disconnect_all_clients_requested
243            .load(Ordering::Relaxed)
244        {
245            warn!(
246                "Ignoring request to add pending logout update for character ID {} as there is a \
247                 disconnection of all clients in progress",
248                update_data.0.0
249            );
250            return;
251        }
252
253        if self.pending_database_actions.contains_key(&update_data.0) {
254            warn!(
255                "Ignoring request to add pending logout update for character ID {} as there is \
256                 already a pending delete for this character",
257                update_data.0.0
258            );
259            return;
260        }
261
262        self.pending_database_actions.insert(
263            update_data.0, // CharacterId
264            DatabaseAction::New(DatabaseActionKind::UpdateCharacter(Box::new(update_data))),
265        );
266    }
267
268    pub fn has_pending_database_action(&self, character_id: CharacterId) -> bool {
269        self.pending_database_actions.contains_key(&character_id)
270    }
271
272    pub fn process_batch_completion(&mut self, completed_batch_id: u64) {
273        self.pending_database_actions.retain(|_, event| {
274            !matches!(event, DatabaseAction::Submitted {
275                    batch_id,
276            } if completed_batch_id == *batch_id)
277        });
278        debug!(
279            "Processed database batch completion - Batch ID: {}",
280            completed_batch_id
281        )
282    }
283
284    /// Returns a value indicating whether there is a pending request to
285    /// disconnect all clients due to a batch update transaction failure
286    pub fn disconnect_all_clients_requested(&self) -> bool {
287        self.disconnect_all_clients_requested
288            .load(Ordering::Relaxed)
289    }
290
291    pub fn create_character(
292        &mut self,
293        entity: Entity,
294        requesting_player_uuid: String,
295        alias: String,
296        persisted_components: PersistedComponents,
297    ) {
298        if let Err(e) =
299            self.update_tx
300                .as_ref()
301                .unwrap()
302                .send(CharacterUpdaterAction::CreateCharacter {
303                    entity,
304                    player_uuid: requesting_player_uuid,
305                    character_alias: alias,
306                    persisted_components,
307                })
308        {
309            error!(?e, "Could not send character creation request");
310        }
311    }
312
313    pub fn edit_character(
314        &mut self,
315        entity: Entity,
316        requesting_player_uuid: String,
317        character_id: CharacterId,
318        alias: Option<String>,
319        editable_components: EditableComponents,
320        trusted_change: Option<PermanentChange>,
321    ) {
322        if let Err(e) =
323            self.update_tx
324                .as_ref()
325                .unwrap()
326                .send(CharacterUpdaterAction::EditCharacter {
327                    entity,
328                    player_uuid: requesting_player_uuid,
329                    character_id,
330                    character_alias: alias,
331                    editable_components,
332                    trusted_change,
333                })
334        {
335            error!(?e, "Could not send character edit request");
336        }
337    }
338
339    fn next_pending_database_event_id(&mut self) -> u64 {
340        self.last_pending_database_event_id += 1;
341        self.last_pending_database_event_id
342    }
343
344    pub fn queue_character_deletion(
345        &mut self,
346        requesting_player_uuid: String,
347        character_id: CharacterId,
348    ) {
349        // Insert the delete as a pending database action - if the player has recently
350        // logged out this will replace their pending update with a delete which
351        // is fine, as the user has actively chosen to delete the character.
352        self.pending_database_actions.insert(
353            character_id,
354            DatabaseAction::New(DatabaseActionKind::DeleteCharacter {
355                requesting_player_uuid,
356                character_id,
357            }),
358        );
359    }
360
361    /// Updates a collection of characters based on their id and components
362    pub fn batch_update(&mut self, updates: impl Iterator<Item = CharacterUpdateData>) {
363        let batch_id = self.next_pending_database_event_id();
364
365        // Collect any new updates, ignoring updates from a previous update that are
366        // still pending completion
367        let existing_pending_actions = self
368            .pending_database_actions
369            .iter_mut()
370            .filter_map(|(_, event)| event.take_new(batch_id));
371
372        // Combine the pending actions with the updates for logged in characters
373        let pending_actions = existing_pending_actions
374            .into_iter()
375            .chain(updates.map(|update| DatabaseActionKind::UpdateCharacter(Box::new(update))))
376            .collect::<Vec<DatabaseActionKind>>();
377
378        if !pending_actions.is_empty() {
379            debug!(
380                "Sending persistence update batch ID {} containing {} updates",
381                batch_id,
382                pending_actions.len()
383            );
384            if let Err(e) =
385                self.update_tx
386                    .as_ref()
387                    .unwrap()
388                    .send(CharacterUpdaterAction::BatchUpdate {
389                        batch_id,
390                        updates: pending_actions,
391                    })
392            {
393                error!(?e, "Could not send persistence batch update");
394            }
395        } else {
396            trace!("Skipping persistence batch - no pending updates")
397        }
398    }
399
400    /// Indicates to the batch update thread that a requested disconnection of
401    /// all clients has been processed
402    pub fn disconnected_success(&mut self) {
403        self.update_tx
404            .as_ref()
405            .unwrap()
406            .send(CharacterUpdaterAction::DisconnectedSuccess)
407            .expect(
408                "Failed to send DisconnectedSuccess event - not sending this event will prevent \
409                 future persistence batches from running",
410            );
411    }
412
413    /// Returns a non-blocking iterator over CharacterUpdaterMessage messages
414    pub fn messages(&self) -> TryIter<CharacterUpdaterMessage> { self.response_rx.try_iter() }
415}
416
417fn execute_batch_update(
418    updates: impl Iterator<Item = DatabaseActionKind>,
419    connection: &mut VelorenConnection,
420) -> Result<(), PersistenceError> {
421    let mut transaction = connection.connection.transaction()?;
422    transaction.set_drop_behavior(DropBehavior::Rollback);
423    trace!("Transaction started for character batch update");
424    updates.into_iter().try_for_each(|event| match event {
425        DatabaseActionKind::UpdateCharacter(box (
426            character_id,
427            stats,
428            inventory,
429            pets,
430            waypoint,
431            active_abilities,
432            map_marker,
433        )) => super::character::update(
434            character_id,
435            stats,
436            inventory,
437            pets,
438            waypoint,
439            active_abilities,
440            map_marker,
441            &mut transaction,
442        ),
443        DatabaseActionKind::DeleteCharacter {
444            requesting_player_uuid,
445            character_id,
446        } => super::character::delete_character(
447            &requesting_player_uuid,
448            character_id,
449            &mut transaction,
450        ),
451    })?;
452
453    transaction.commit()?;
454
455    trace!("Commit for character batch update completed");
456    Ok(())
457}
458
459fn execute_character_create(
460    entity: Entity,
461    alias: String,
462    requesting_player_uuid: &str,
463    persisted_components: PersistedComponents,
464    connection: &mut VelorenConnection,
465) -> Result<CharacterUpdaterMessage, PersistenceError> {
466    let mut transaction = connection.connection.transaction()?;
467
468    let response = CharacterScreenResponse {
469        target_entity: entity,
470        response_kind: CharacterScreenResponseKind::CharacterCreation(
471            super::character::create_character(
472                requesting_player_uuid,
473                &alias,
474                persisted_components,
475                &mut transaction,
476            ),
477        ),
478    };
479
480    if !response.is_err() {
481        transaction.commit()?;
482    };
483
484    Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
485}
486
487fn execute_character_edit(
488    entity: Entity,
489    character_id: CharacterId,
490    alias: Option<String>,
491    requesting_player_uuid: &str,
492    editable_components: EditableComponents,
493    trusted_change: Option<PermanentChange>,
494    connection: &mut VelorenConnection,
495) -> Result<CharacterUpdaterMessage, PersistenceError> {
496    let mut transaction = connection.connection.transaction()?;
497
498    let response = CharacterScreenResponse {
499        target_entity: entity,
500        response_kind: CharacterScreenResponseKind::CharacterEdit(
501            super::character::edit_character(
502                editable_components,
503                trusted_change,
504                &mut transaction,
505                character_id,
506                requesting_player_uuid,
507                alias.as_deref(),
508            ),
509        ),
510    };
511
512    if !response.is_err() {
513        transaction.commit()?;
514    };
515
516    Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
517}
518
519impl Drop for CharacterUpdater {
520    fn drop(&mut self) {
521        drop(self.update_tx.take());
522        if let Err(e) = self.handle.take().unwrap().join() {
523            error!(?e, "Error from joining character update thread");
524        }
525    }
526}