1use crate::comp;
2use common::{character::CharacterId, event::PermanentChange};
3
4use crate::persistence::{
5 ConnectionMode, DatabaseSettings, EditableComponents, PersistedComponents, VelorenConnection,
6 character_loader::{
7 CharacterScreenResponse, CharacterScreenResponseKind, CharacterUpdaterMessage,
8 },
9 error::PersistenceError,
10 establish_connection,
11};
12use crossbeam_channel::TryIter;
13use rusqlite::DropBehavior;
14use specs::Entity;
15use std::{
16 collections::HashMap,
17 sync::{
18 Arc, RwLock,
19 atomic::{AtomicBool, Ordering},
20 },
21};
22use tracing::{debug, error, info, trace, warn};
23
24pub type CharacterUpdateData = (
25 CharacterId,
26 comp::SkillSet,
27 comp::Inventory,
28 Vec<PetPersistenceData>,
29 Option<comp::Waypoint>,
30 comp::ability::ActiveAbilities,
31 Option<comp::MapMarker>,
32);
33
34pub type PetPersistenceData = (comp::Pet, comp::Body, comp::Stats);
35
36#[expect(clippy::large_enum_variant)]
37enum CharacterUpdaterAction {
38 BatchUpdate {
39 batch_id: u64,
40 updates: Vec<DatabaseActionKind>,
41 },
42 CreateCharacter {
43 entity: Entity,
44 player_uuid: String,
45 character_alias: String,
46 persisted_components: PersistedComponents,
47 },
48 EditCharacter {
49 entity: Entity,
50 player_uuid: String,
51 character_id: CharacterId,
52 character_alias: Option<String>,
53 editable_components: EditableComponents,
54 trusted_change: Option<PermanentChange>,
55 },
56 DisconnectedSuccess,
57}
58
59#[derive(Clone)]
60enum DatabaseAction {
61 New(DatabaseActionKind),
62 Submitted { batch_id: u64 },
63}
64
65impl DatabaseAction {
66 fn take_new(&mut self, batch_id: u64) -> Option<DatabaseActionKind> {
67 match core::mem::replace(self, Self::Submitted { batch_id }) {
68 Self::New(action) => Some(action),
69 submitted @ Self::Submitted { .. } => {
70 *self = submitted; None
72 },
73 }
74 }
75}
76
77#[derive(Clone)]
78enum DatabaseActionKind {
79 UpdateCharacter(Box<CharacterUpdateData>),
80 DeleteCharacter {
81 requesting_player_uuid: String,
82 character_id: CharacterId,
83 },
84}
85
86pub struct CharacterUpdater {
92 update_tx: Option<crossbeam_channel::Sender<CharacterUpdaterAction>>,
93 response_rx: crossbeam_channel::Receiver<CharacterUpdaterMessage>,
94 handle: Option<std::thread::JoinHandle<()>>,
95 pending_database_actions: HashMap<CharacterId, DatabaseAction>,
98 disconnect_all_clients_requested: Arc<AtomicBool>,
101 last_pending_database_event_id: u64,
102}
103
104impl CharacterUpdater {
105 pub fn new(settings: Arc<RwLock<DatabaseSettings>>) -> rusqlite::Result<Self> {
106 let (update_tx, update_rx) = crossbeam_channel::unbounded::<CharacterUpdaterAction>();
107 let (response_tx, response_rx) = crossbeam_channel::unbounded::<CharacterUpdaterMessage>();
108
109 let disconnect_all_clients_requested = Arc::new(AtomicBool::new(false));
110 let disconnect_all_clients_requested_clone = Arc::clone(&disconnect_all_clients_requested);
111
112 let builder = std::thread::Builder::new().name("persistence_updater".into());
113 let handle = builder
114 .spawn(move || {
115 let mut conn =
118 establish_connection(&settings.read().unwrap(), ConnectionMode::ReadWrite);
119 while let Ok(action) = update_rx.recv() {
120 match action {
121 CharacterUpdaterAction::BatchUpdate { batch_id, updates } => {
122 if disconnect_all_clients_requested_clone.load(Ordering::Relaxed) {
123 debug!(
124 "Skipping persistence due to pending disconnection of all \
125 clients"
126 );
127 continue;
128 }
129 conn.update_log_mode(&settings);
130
131 if let Err(e) = execute_batch_update(updates.into_iter(), &mut conn) {
132 error!(
133 ?e,
134 "Error during character batch update, disconnecting all \
135 clients to avoid loss of data integrity."
136 );
137 disconnect_all_clients_requested_clone
138 .store(true, Ordering::Relaxed);
139 };
140
141 if let Err(e) = response_tx
142 .send(CharacterUpdaterMessage::DatabaseBatchCompletion(batch_id))
143 {
144 error!(?e, "Could not send DatabaseBatchCompletion message");
145 } else {
146 debug!(
147 "Submitted DatabaseBatchCompletion - Batch ID: {}",
148 batch_id
149 );
150 }
151 },
152 CharacterUpdaterAction::CreateCharacter {
153 entity,
154 character_alias,
155 player_uuid,
156 persisted_components,
157 } => {
158 match execute_character_create(
159 entity,
160 character_alias,
161 &player_uuid,
162 persisted_components,
163 &mut conn,
164 ) {
165 Ok(response) => {
166 if let Err(e) = response_tx.send(response) {
167 error!(?e, "Could not send character creation response");
168 } else {
169 debug!(
170 "Processed character create for player {}",
171 player_uuid
172 );
173 }
174 },
175 Err(e) => error!(
176 "Error creating character for player {}, error: {:?}",
177 player_uuid, e
178 ),
179 }
180 },
181 CharacterUpdaterAction::EditCharacter {
182 entity,
183 character_id,
184 character_alias,
185 player_uuid,
186 editable_components,
187 trusted_change,
188 } => {
189 match execute_character_edit(
190 entity,
191 character_id,
192 character_alias,
193 &player_uuid,
194 editable_components,
195 trusted_change,
196 &mut conn,
197 ) {
198 Ok(response) => {
199 if let Err(e) = response_tx.send(response) {
200 error!(?e, "Could not send character edit response");
201 } else {
202 debug!(
203 "Processed character edit for player {}",
204 player_uuid
205 );
206 }
207 },
208 Err(e) => error!(
209 "Error editing character for player {}, error: {:?}",
210 player_uuid, e
211 ),
212 }
213 },
214 CharacterUpdaterAction::DisconnectedSuccess => {
215 info!(
216 "CharacterUpdater received DisconnectedSuccess event, resuming \
217 batch updates"
218 );
219 disconnect_all_clients_requested_clone.store(false, Ordering::Relaxed);
222 },
223 }
224 }
225 })
226 .unwrap();
227
228 Ok(Self {
229 update_tx: Some(update_tx),
230 response_rx,
231 handle: Some(handle),
232 pending_database_actions: HashMap::new(),
233 disconnect_all_clients_requested,
234 last_pending_database_event_id: 0,
235 })
236 }
237
238 pub fn add_pending_logout_update(&mut self, update_data: CharacterUpdateData) {
241 if self
242 .disconnect_all_clients_requested
243 .load(Ordering::Relaxed)
244 {
245 warn!(
246 "Ignoring request to add pending logout update for character ID {} as there is a \
247 disconnection of all clients in progress",
248 update_data.0.0
249 );
250 return;
251 }
252
253 if self.pending_database_actions.contains_key(&update_data.0) {
254 warn!(
255 "Ignoring request to add pending logout update for character ID {} as there is \
256 already a pending delete for this character",
257 update_data.0.0
258 );
259 return;
260 }
261
262 self.pending_database_actions.insert(
263 update_data.0, DatabaseAction::New(DatabaseActionKind::UpdateCharacter(Box::new(update_data))),
265 );
266 }
267
268 pub fn has_pending_database_action(&self, character_id: CharacterId) -> bool {
269 self.pending_database_actions.contains_key(&character_id)
270 }
271
272 pub fn process_batch_completion(&mut self, completed_batch_id: u64) {
273 self.pending_database_actions.retain(|_, event| {
274 !matches!(event, DatabaseAction::Submitted {
275 batch_id,
276 } if completed_batch_id == *batch_id)
277 });
278 debug!(
279 "Processed database batch completion - Batch ID: {}",
280 completed_batch_id
281 )
282 }
283
284 pub fn disconnect_all_clients_requested(&self) -> bool {
287 self.disconnect_all_clients_requested
288 .load(Ordering::Relaxed)
289 }
290
291 pub fn create_character(
292 &mut self,
293 entity: Entity,
294 requesting_player_uuid: String,
295 alias: String,
296 persisted_components: PersistedComponents,
297 ) {
298 if let Err(e) =
299 self.update_tx
300 .as_ref()
301 .unwrap()
302 .send(CharacterUpdaterAction::CreateCharacter {
303 entity,
304 player_uuid: requesting_player_uuid,
305 character_alias: alias,
306 persisted_components,
307 })
308 {
309 error!(?e, "Could not send character creation request");
310 }
311 }
312
313 pub fn edit_character(
314 &mut self,
315 entity: Entity,
316 requesting_player_uuid: String,
317 character_id: CharacterId,
318 alias: Option<String>,
319 editable_components: EditableComponents,
320 trusted_change: Option<PermanentChange>,
321 ) {
322 if let Err(e) =
323 self.update_tx
324 .as_ref()
325 .unwrap()
326 .send(CharacterUpdaterAction::EditCharacter {
327 entity,
328 player_uuid: requesting_player_uuid,
329 character_id,
330 character_alias: alias,
331 editable_components,
332 trusted_change,
333 })
334 {
335 error!(?e, "Could not send character edit request");
336 }
337 }
338
339 fn next_pending_database_event_id(&mut self) -> u64 {
340 self.last_pending_database_event_id += 1;
341 self.last_pending_database_event_id
342 }
343
344 pub fn queue_character_deletion(
345 &mut self,
346 requesting_player_uuid: String,
347 character_id: CharacterId,
348 ) {
349 self.pending_database_actions.insert(
353 character_id,
354 DatabaseAction::New(DatabaseActionKind::DeleteCharacter {
355 requesting_player_uuid,
356 character_id,
357 }),
358 );
359 }
360
361 pub fn batch_update(&mut self, updates: impl Iterator<Item = CharacterUpdateData>) {
363 let batch_id = self.next_pending_database_event_id();
364
365 let existing_pending_actions = self
368 .pending_database_actions
369 .iter_mut()
370 .filter_map(|(_, event)| event.take_new(batch_id));
371
372 let pending_actions = existing_pending_actions
374 .into_iter()
375 .chain(updates.map(|update| DatabaseActionKind::UpdateCharacter(Box::new(update))))
376 .collect::<Vec<DatabaseActionKind>>();
377
378 if !pending_actions.is_empty() {
379 debug!(
380 "Sending persistence update batch ID {} containing {} updates",
381 batch_id,
382 pending_actions.len()
383 );
384 if let Err(e) =
385 self.update_tx
386 .as_ref()
387 .unwrap()
388 .send(CharacterUpdaterAction::BatchUpdate {
389 batch_id,
390 updates: pending_actions,
391 })
392 {
393 error!(?e, "Could not send persistence batch update");
394 }
395 } else {
396 trace!("Skipping persistence batch - no pending updates")
397 }
398 }
399
400 pub fn disconnected_success(&mut self) {
403 self.update_tx
404 .as_ref()
405 .unwrap()
406 .send(CharacterUpdaterAction::DisconnectedSuccess)
407 .expect(
408 "Failed to send DisconnectedSuccess event - not sending this event will prevent \
409 future persistence batches from running",
410 );
411 }
412
413 pub fn messages(&self) -> TryIter<CharacterUpdaterMessage> { self.response_rx.try_iter() }
415}
416
417fn execute_batch_update(
418 updates: impl Iterator<Item = DatabaseActionKind>,
419 connection: &mut VelorenConnection,
420) -> Result<(), PersistenceError> {
421 let mut transaction = connection.connection.transaction()?;
422 transaction.set_drop_behavior(DropBehavior::Rollback);
423 trace!("Transaction started for character batch update");
424 updates.into_iter().try_for_each(|event| match event {
425 DatabaseActionKind::UpdateCharacter(box (
426 character_id,
427 stats,
428 inventory,
429 pets,
430 waypoint,
431 active_abilities,
432 map_marker,
433 )) => super::character::update(
434 character_id,
435 stats,
436 inventory,
437 pets,
438 waypoint,
439 active_abilities,
440 map_marker,
441 &mut transaction,
442 ),
443 DatabaseActionKind::DeleteCharacter {
444 requesting_player_uuid,
445 character_id,
446 } => super::character::delete_character(
447 &requesting_player_uuid,
448 character_id,
449 &mut transaction,
450 ),
451 })?;
452
453 transaction.commit()?;
454
455 trace!("Commit for character batch update completed");
456 Ok(())
457}
458
459fn execute_character_create(
460 entity: Entity,
461 alias: String,
462 requesting_player_uuid: &str,
463 persisted_components: PersistedComponents,
464 connection: &mut VelorenConnection,
465) -> Result<CharacterUpdaterMessage, PersistenceError> {
466 let mut transaction = connection.connection.transaction()?;
467
468 let response = CharacterScreenResponse {
469 target_entity: entity,
470 response_kind: CharacterScreenResponseKind::CharacterCreation(
471 super::character::create_character(
472 requesting_player_uuid,
473 &alias,
474 persisted_components,
475 &mut transaction,
476 ),
477 ),
478 };
479
480 if !response.is_err() {
481 transaction.commit()?;
482 };
483
484 Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
485}
486
487fn execute_character_edit(
488 entity: Entity,
489 character_id: CharacterId,
490 alias: Option<String>,
491 requesting_player_uuid: &str,
492 editable_components: EditableComponents,
493 trusted_change: Option<PermanentChange>,
494 connection: &mut VelorenConnection,
495) -> Result<CharacterUpdaterMessage, PersistenceError> {
496 let mut transaction = connection.connection.transaction()?;
497
498 let response = CharacterScreenResponse {
499 target_entity: entity,
500 response_kind: CharacterScreenResponseKind::CharacterEdit(
501 super::character::edit_character(
502 editable_components,
503 trusted_change,
504 &mut transaction,
505 character_id,
506 requesting_player_uuid,
507 alias.as_deref(),
508 ),
509 ),
510 };
511
512 if !response.is_err() {
513 transaction.commit()?;
514 };
515
516 Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
517}
518
519impl Drop for CharacterUpdater {
520 fn drop(&mut self) {
521 drop(self.update_tx.take());
522 if let Err(e) = self.handle.take().unwrap().join() {
523 error!(?e, "Error from joining character update thread");
524 }
525 }
526}