1use crate::comp;
2use common::character::CharacterId;
3
4use crate::persistence::{
5 ConnectionMode, DatabaseSettings, EditableComponents, PersistedComponents, VelorenConnection,
6 character_loader::{
7 CharacterScreenResponse, CharacterScreenResponseKind, CharacterUpdaterMessage,
8 },
9 error::PersistenceError,
10 establish_connection,
11};
12use crossbeam_channel::TryIter;
13use rusqlite::DropBehavior;
14use specs::Entity;
15use std::{
16 collections::HashMap,
17 sync::{
18 Arc, RwLock,
19 atomic::{AtomicBool, Ordering},
20 },
21};
22use tracing::{debug, error, info, trace, warn};
23
24pub type CharacterUpdateData = (
25 CharacterId,
26 comp::SkillSet,
27 comp::Inventory,
28 Vec<PetPersistenceData>,
29 Option<comp::Waypoint>,
30 comp::ability::ActiveAbilities,
31 Option<comp::MapMarker>,
32);
33
34pub type PetPersistenceData = (comp::Pet, comp::Body, comp::Stats);
35
36#[expect(clippy::large_enum_variant)]
37enum CharacterUpdaterAction {
38 BatchUpdate {
39 batch_id: u64,
40 updates: Vec<DatabaseActionKind>,
41 },
42 CreateCharacter {
43 entity: Entity,
44 player_uuid: String,
45 character_alias: String,
46 persisted_components: PersistedComponents,
47 },
48 EditCharacter {
49 entity: Entity,
50 player_uuid: String,
51 character_id: CharacterId,
52 character_alias: String,
53 editable_components: EditableComponents,
54 },
55 DisconnectedSuccess,
56}
57
58#[derive(Clone)]
59enum DatabaseAction {
60 New(DatabaseActionKind),
61 Submitted { batch_id: u64 },
62}
63
64impl DatabaseAction {
65 fn take_new(&mut self, batch_id: u64) -> Option<DatabaseActionKind> {
66 match core::mem::replace(self, Self::Submitted { batch_id }) {
67 Self::New(action) => Some(action),
68 submitted @ Self::Submitted { .. } => {
69 *self = submitted; None
71 },
72 }
73 }
74}
75
76#[derive(Clone)]
77enum DatabaseActionKind {
78 UpdateCharacter(Box<CharacterUpdateData>),
79 DeleteCharacter {
80 requesting_player_uuid: String,
81 character_id: CharacterId,
82 },
83}
84
85pub struct CharacterUpdater {
91 update_tx: Option<crossbeam_channel::Sender<CharacterUpdaterAction>>,
92 response_rx: crossbeam_channel::Receiver<CharacterUpdaterMessage>,
93 handle: Option<std::thread::JoinHandle<()>>,
94 pending_database_actions: HashMap<CharacterId, DatabaseAction>,
97 disconnect_all_clients_requested: Arc<AtomicBool>,
100 last_pending_database_event_id: u64,
101}
102
103impl CharacterUpdater {
104 pub fn new(settings: Arc<RwLock<DatabaseSettings>>) -> rusqlite::Result<Self> {
105 let (update_tx, update_rx) = crossbeam_channel::unbounded::<CharacterUpdaterAction>();
106 let (response_tx, response_rx) = crossbeam_channel::unbounded::<CharacterUpdaterMessage>();
107
108 let disconnect_all_clients_requested = Arc::new(AtomicBool::new(false));
109 let disconnect_all_clients_requested_clone = Arc::clone(&disconnect_all_clients_requested);
110
111 let builder = std::thread::Builder::new().name("persistence_updater".into());
112 let handle = builder
113 .spawn(move || {
114 let mut conn =
117 establish_connection(&settings.read().unwrap(), ConnectionMode::ReadWrite);
118 while let Ok(action) = update_rx.recv() {
119 match action {
120 CharacterUpdaterAction::BatchUpdate { batch_id, updates } => {
121 if disconnect_all_clients_requested_clone.load(Ordering::Relaxed) {
122 debug!(
123 "Skipping persistence due to pending disconnection of all \
124 clients"
125 );
126 continue;
127 }
128 conn.update_log_mode(&settings);
129
130 if let Err(e) = execute_batch_update(updates.into_iter(), &mut conn) {
131 error!(
132 ?e,
133 "Error during character batch update, disconnecting all \
134 clients to avoid loss of data integrity."
135 );
136 disconnect_all_clients_requested_clone
137 .store(true, Ordering::Relaxed);
138 };
139
140 if let Err(e) = response_tx
141 .send(CharacterUpdaterMessage::DatabaseBatchCompletion(batch_id))
142 {
143 error!(?e, "Could not send DatabaseBatchCompletion message");
144 } else {
145 debug!(
146 "Submitted DatabaseBatchCompletion - Batch ID: {}",
147 batch_id
148 );
149 }
150 },
151 CharacterUpdaterAction::CreateCharacter {
152 entity,
153 character_alias,
154 player_uuid,
155 persisted_components,
156 } => {
157 match execute_character_create(
158 entity,
159 character_alias,
160 &player_uuid,
161 persisted_components,
162 &mut conn,
163 ) {
164 Ok(response) => {
165 if let Err(e) = response_tx.send(response) {
166 error!(?e, "Could not send character creation response");
167 } else {
168 debug!(
169 "Processed character create for player {}",
170 player_uuid
171 );
172 }
173 },
174 Err(e) => error!(
175 "Error creating character for player {}, error: {:?}",
176 player_uuid, e
177 ),
178 }
179 },
180 CharacterUpdaterAction::EditCharacter {
181 entity,
182 character_id,
183 character_alias,
184 player_uuid,
185 editable_components,
186 } => {
187 match execute_character_edit(
188 entity,
189 character_id,
190 character_alias,
191 &player_uuid,
192 editable_components,
193 &mut conn,
194 ) {
195 Ok(response) => {
196 if let Err(e) = response_tx.send(response) {
197 error!(?e, "Could not send character edit response");
198 } else {
199 debug!(
200 "Processed character edit for player {}",
201 player_uuid
202 );
203 }
204 },
205 Err(e) => error!(
206 "Error editing character for player {}, error: {:?}",
207 player_uuid, e
208 ),
209 }
210 },
211 CharacterUpdaterAction::DisconnectedSuccess => {
212 info!(
213 "CharacterUpdater received DisconnectedSuccess event, resuming \
214 batch updates"
215 );
216 disconnect_all_clients_requested_clone.store(false, Ordering::Relaxed);
219 },
220 }
221 }
222 })
223 .unwrap();
224
225 Ok(Self {
226 update_tx: Some(update_tx),
227 response_rx,
228 handle: Some(handle),
229 pending_database_actions: HashMap::new(),
230 disconnect_all_clients_requested,
231 last_pending_database_event_id: 0,
232 })
233 }
234
235 pub fn add_pending_logout_update(&mut self, update_data: CharacterUpdateData) {
238 if self
239 .disconnect_all_clients_requested
240 .load(Ordering::Relaxed)
241 {
242 warn!(
243 "Ignoring request to add pending logout update for character ID {} as there is a \
244 disconnection of all clients in progress",
245 update_data.0.0
246 );
247 return;
248 }
249
250 if self.pending_database_actions.contains_key(&update_data.0) {
251 warn!(
252 "Ignoring request to add pending logout update for character ID {} as there is \
253 already a pending delete for this character",
254 update_data.0.0
255 );
256 return;
257 }
258
259 self.pending_database_actions.insert(
260 update_data.0, DatabaseAction::New(DatabaseActionKind::UpdateCharacter(Box::new(update_data))),
262 );
263 }
264
265 pub fn has_pending_database_action(&self, character_id: CharacterId) -> bool {
266 self.pending_database_actions.contains_key(&character_id)
267 }
268
269 pub fn process_batch_completion(&mut self, completed_batch_id: u64) {
270 self.pending_database_actions.retain(|_, event| {
271 !matches!(event, DatabaseAction::Submitted {
272 batch_id,
273 } if completed_batch_id == *batch_id)
274 });
275 debug!(
276 "Processed database batch completion - Batch ID: {}",
277 completed_batch_id
278 )
279 }
280
281 pub fn disconnect_all_clients_requested(&self) -> bool {
284 self.disconnect_all_clients_requested
285 .load(Ordering::Relaxed)
286 }
287
288 pub fn create_character(
289 &mut self,
290 entity: Entity,
291 requesting_player_uuid: String,
292 alias: String,
293 persisted_components: PersistedComponents,
294 ) {
295 if let Err(e) =
296 self.update_tx
297 .as_ref()
298 .unwrap()
299 .send(CharacterUpdaterAction::CreateCharacter {
300 entity,
301 player_uuid: requesting_player_uuid,
302 character_alias: alias,
303 persisted_components,
304 })
305 {
306 error!(?e, "Could not send character creation request");
307 }
308 }
309
310 pub fn edit_character(
311 &mut self,
312 entity: Entity,
313 requesting_player_uuid: String,
314 character_id: CharacterId,
315 alias: String,
316 editable_components: EditableComponents,
317 ) {
318 if let Err(e) =
319 self.update_tx
320 .as_ref()
321 .unwrap()
322 .send(CharacterUpdaterAction::EditCharacter {
323 entity,
324 player_uuid: requesting_player_uuid,
325 character_id,
326 character_alias: alias,
327 editable_components,
328 })
329 {
330 error!(?e, "Could not send character edit request");
331 }
332 }
333
334 fn next_pending_database_event_id(&mut self) -> u64 {
335 self.last_pending_database_event_id += 1;
336 self.last_pending_database_event_id
337 }
338
339 pub fn queue_character_deletion(
340 &mut self,
341 requesting_player_uuid: String,
342 character_id: CharacterId,
343 ) {
344 self.pending_database_actions.insert(
348 character_id,
349 DatabaseAction::New(DatabaseActionKind::DeleteCharacter {
350 requesting_player_uuid,
351 character_id,
352 }),
353 );
354 }
355
356 pub fn batch_update(&mut self, updates: impl Iterator<Item = CharacterUpdateData>) {
358 let batch_id = self.next_pending_database_event_id();
359
360 let existing_pending_actions = self
363 .pending_database_actions
364 .iter_mut()
365 .filter_map(|(_, event)| event.take_new(batch_id));
366
367 let pending_actions = existing_pending_actions
369 .into_iter()
370 .chain(updates.map(|update| DatabaseActionKind::UpdateCharacter(Box::new(update))))
371 .collect::<Vec<DatabaseActionKind>>();
372
373 if !pending_actions.is_empty() {
374 debug!(
375 "Sending persistence update batch ID {} containing {} updates",
376 batch_id,
377 pending_actions.len()
378 );
379 if let Err(e) =
380 self.update_tx
381 .as_ref()
382 .unwrap()
383 .send(CharacterUpdaterAction::BatchUpdate {
384 batch_id,
385 updates: pending_actions,
386 })
387 {
388 error!(?e, "Could not send persistence batch update");
389 }
390 } else {
391 trace!("Skipping persistence batch - no pending updates")
392 }
393 }
394
395 pub fn disconnected_success(&mut self) {
398 self.update_tx
399 .as_ref()
400 .unwrap()
401 .send(CharacterUpdaterAction::DisconnectedSuccess)
402 .expect(
403 "Failed to send DisconnectedSuccess event - not sending this event will prevent \
404 future persistence batches from running",
405 );
406 }
407
408 pub fn messages(&self) -> TryIter<CharacterUpdaterMessage> { self.response_rx.try_iter() }
410}
411
412fn execute_batch_update(
413 updates: impl Iterator<Item = DatabaseActionKind>,
414 connection: &mut VelorenConnection,
415) -> Result<(), PersistenceError> {
416 let mut transaction = connection.connection.transaction()?;
417 transaction.set_drop_behavior(DropBehavior::Rollback);
418 trace!("Transaction started for character batch update");
419 updates.into_iter().try_for_each(|event| match event {
420 DatabaseActionKind::UpdateCharacter(box (
421 character_id,
422 stats,
423 inventory,
424 pets,
425 waypoint,
426 active_abilities,
427 map_marker,
428 )) => super::character::update(
429 character_id,
430 stats,
431 inventory,
432 pets,
433 waypoint,
434 active_abilities,
435 map_marker,
436 &mut transaction,
437 ),
438 DatabaseActionKind::DeleteCharacter {
439 requesting_player_uuid,
440 character_id,
441 } => super::character::delete_character(
442 &requesting_player_uuid,
443 character_id,
444 &mut transaction,
445 ),
446 })?;
447
448 transaction.commit()?;
449
450 trace!("Commit for character batch update completed");
451 Ok(())
452}
453
454fn execute_character_create(
455 entity: Entity,
456 alias: String,
457 requesting_player_uuid: &str,
458 persisted_components: PersistedComponents,
459 connection: &mut VelorenConnection,
460) -> Result<CharacterUpdaterMessage, PersistenceError> {
461 let mut transaction = connection.connection.transaction()?;
462
463 let response = CharacterScreenResponse {
464 target_entity: entity,
465 response_kind: CharacterScreenResponseKind::CharacterCreation(
466 super::character::create_character(
467 requesting_player_uuid,
468 &alias,
469 persisted_components,
470 &mut transaction,
471 ),
472 ),
473 };
474
475 if !response.is_err() {
476 transaction.commit()?;
477 };
478
479 Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
480}
481
482fn execute_character_edit(
483 entity: Entity,
484 character_id: CharacterId,
485 alias: String,
486 requesting_player_uuid: &str,
487 editable_components: EditableComponents,
488 connection: &mut VelorenConnection,
489) -> Result<CharacterUpdaterMessage, PersistenceError> {
490 let mut transaction = connection.connection.transaction()?;
491
492 let response = CharacterScreenResponse {
493 target_entity: entity,
494 response_kind: CharacterScreenResponseKind::CharacterEdit(
495 super::character::edit_character(
496 editable_components,
497 &mut transaction,
498 character_id,
499 requesting_player_uuid,
500 &alias,
501 ),
502 ),
503 };
504
505 if !response.is_err() {
506 transaction.commit()?;
507 };
508
509 Ok(CharacterUpdaterMessage::CharacterScreenResponse(response))
510}
511
512impl Drop for CharacterUpdater {
513 fn drop(&mut self) {
514 drop(self.update_tx.take());
515 if let Err(e) = self.handle.take().unwrap().join() {
516 error!(?e, "Error from joining character update thread");
517 }
518 }
519}