veloren_server/
chat.rs

1use chrono::{DateTime, Utc};
2use common::{
3    comp,
4    comp::{ChatType, Content, Group, Player, UnresolvedChatMsg, chat::KillType},
5    uid::IdMaps,
6    uuid::Uuid,
7};
8use serde::{Deserialize, Serialize};
9use specs::{Join, World, WorldExt};
10use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration};
11use tokio::sync::Mutex;
12use tracing::{Instrument, info_span};
13
14#[derive(Clone, Serialize, Deserialize)]
15pub struct PlayerInfo {
16    uuid: Uuid,
17    alias: String,
18}
19
20/// Enum representing death reasons
21///
22/// All variants should be strictly typed, no string content.
23#[derive(Clone, Serialize, Deserialize)]
24pub enum KillSource {
25    Player(PlayerInfo, KillType),
26    NonPlayer(String, KillType),
27    NonExistent(KillType),
28    FallDamage,
29    Suicide,
30    Other,
31}
32
33#[derive(Clone, Serialize, Deserialize)]
34/// partially mapped to common::comp::ChatMsg
35pub enum ChatParties {
36    Online(PlayerInfo),
37    Offline(PlayerInfo),
38    CommandInfo(PlayerInfo),
39    CommandError(PlayerInfo),
40    Kill(KillSource, PlayerInfo),
41    GroupMeta(Vec<PlayerInfo>),
42    Group(PlayerInfo, Vec<PlayerInfo>),
43    Tell(PlayerInfo, PlayerInfo),
44    Say(PlayerInfo),
45    FactionMeta(String),
46    Faction(PlayerInfo, String),
47    Region(PlayerInfo),
48    World(PlayerInfo),
49}
50
51#[derive(Clone, Serialize, Deserialize)]
52pub struct ChatMessage {
53    pub time: DateTime<Utc>,
54    pub parties: ChatParties,
55    pub content: Content,
56}
57
58type MessagesStore = Arc<Mutex<VecDeque<ChatMessage>>>;
59
60/// The chat cache gets it data from the gameserver and will keep it for some
61/// time It will be made available for its consumers, the REST Api
62#[derive(Clone)]
63pub struct ChatCache {
64    pub messages: MessagesStore,
65}
66
67/// Will internally run on tokio and take stress from main loop
68struct ChatForwarder {
69    chat_r: tokio::sync::mpsc::Receiver<ChatMessage>,
70    messages: MessagesStore,
71    keep_duration: chrono::Duration,
72}
73
74pub struct ChatExporter {
75    chat_s: tokio::sync::mpsc::Sender<ChatMessage>,
76}
77
78impl ChatMessage {
79    fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self {
80        ChatMessage {
81            time: Utc::now(),
82            content: chatmsg.content().clone(),
83            parties,
84        }
85    }
86}
87
88impl ChatExporter {
89    pub fn generate(chatmsg: &UnresolvedChatMsg, ecs: &World) -> Option<ChatMessage> {
90        let id_maps = ecs.read_resource::<IdMaps>();
91        let players = ecs.read_storage::<Player>();
92        let player_info_from_uid = |uid| {
93            id_maps
94                .uid_entity(uid)
95                .and_then(|entry| players.get(entry))
96                .map(|player| PlayerInfo {
97                    alias: player.alias.clone(),
98                    uuid: player.uuid(),
99                })
100        };
101        let group_members_from_group = |g| -> Vec<_> {
102            let groups = ecs.read_storage::<Group>();
103            (&players, &groups)
104                .join()
105                .filter_map(|(player, group)| {
106                    if g == group {
107                        Some(PlayerInfo {
108                            alias: player.alias.clone(),
109                            uuid: player.uuid(),
110                        })
111                    } else {
112                        None
113                    }
114                })
115                .collect()
116        };
117
118        match &chatmsg.chat_type {
119            ChatType::Offline(from) => {
120                if let Some(player_info) = player_info_from_uid(*from) {
121                    return Some(ChatMessage::new(chatmsg, ChatParties::Offline(player_info)));
122                }
123            },
124            ChatType::Online(from) => {
125                if let Some(player_info) = player_info_from_uid(*from) {
126                    return Some(ChatMessage::new(chatmsg, ChatParties::Online(player_info)));
127                }
128            },
129            ChatType::Region(from) => {
130                if let Some(player_info) = player_info_from_uid(*from) {
131                    return Some(ChatMessage::new(chatmsg, ChatParties::Region(player_info)));
132                }
133            },
134            ChatType::World(from) => {
135                if let Some(player_info) = player_info_from_uid(*from) {
136                    return Some(ChatMessage::new(chatmsg, ChatParties::World(player_info)));
137                }
138            },
139            ChatType::Say(from) => {
140                if let Some(player_info) = player_info_from_uid(*from) {
141                    return Some(ChatMessage::new(chatmsg, ChatParties::Say(player_info)));
142                }
143            },
144            ChatType::Tell(from, to) => {
145                if let (Some(from_player_info), Some(to_player_info)) =
146                    (player_info_from_uid(*from), player_info_from_uid(*to))
147                {
148                    return Some(ChatMessage::new(
149                        chatmsg,
150                        ChatParties::Tell(from_player_info, to_player_info),
151                    ));
152                }
153            },
154            ChatType::Kill(kill_source, from) => {
155                let kill_source = match kill_source.clone() {
156                    comp::chat::KillSource::Player(uid, t) => {
157                        if let Some(player_info) = player_info_from_uid(uid) {
158                            KillSource::Player(player_info, t)
159                        } else {
160                            return None;
161                        }
162                    },
163                    comp::chat::KillSource::NonPlayer(str, t) => KillSource::NonPlayer(str, t),
164                    comp::chat::KillSource::NonExistent(t) => KillSource::NonExistent(t),
165                    comp::chat::KillSource::FallDamage => KillSource::FallDamage,
166                    comp::chat::KillSource::Suicide => KillSource::Suicide,
167                    comp::chat::KillSource::Other => KillSource::Other,
168                };
169                if let Some(player_info) = player_info_from_uid(*from) {
170                    return Some(ChatMessage::new(
171                        chatmsg,
172                        ChatParties::Kill(kill_source, player_info),
173                    ));
174                }
175            },
176            ChatType::FactionMeta(s) => {
177                return Some(ChatMessage::new(
178                    chatmsg,
179                    ChatParties::FactionMeta(s.clone()),
180                ));
181            },
182            ChatType::Faction(from, s) => {
183                if let Some(player_info) = player_info_from_uid(*from) {
184                    return Some(ChatMessage::new(
185                        chatmsg,
186                        ChatParties::Faction(player_info, s.clone()),
187                    ));
188                }
189            },
190            ChatType::GroupMeta(g) => {
191                let members = group_members_from_group(g);
192                return Some(ChatMessage::new(chatmsg, ChatParties::GroupMeta(members)));
193            },
194            ChatType::Group(from, g) => {
195                let members = group_members_from_group(g);
196                if let Some(player_info) = player_info_from_uid(*from) {
197                    return Some(ChatMessage::new(
198                        chatmsg,
199                        ChatParties::Group(player_info, members),
200                    ));
201                }
202            },
203            _ => (),
204        };
205
206        None
207    }
208
209    pub fn send(&self, msg: ChatMessage) {
210        if let Err(e) = self.chat_s.blocking_send(msg) {
211            tracing::warn!(
212                ?e,
213                "could not export chat message. the tokio sender seems to be broken"
214            );
215        }
216    }
217}
218
219impl ChatForwarder {
220    async fn run(mut self) {
221        while let Some(msg) = self.chat_r.recv().await {
222            let drop_older_than = msg.time.sub(self.keep_duration);
223            let mut messages = self.messages.lock().await;
224            while let Some(msg) = messages.front()
225                && msg.time < drop_older_than
226            {
227                messages.pop_front();
228            }
229            messages.push_back(msg);
230            const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever
231            if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES {
232                let msg_count = messages.len();
233                tracing::debug!(?msg_count, "shrinking cache");
234                messages.shrink_to_fit();
235            }
236        }
237    }
238}
239
240impl ChatCache {
241    pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) {
242        const BUFFER_SIZE: usize = 1_000;
243        let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE);
244        let messages: Arc<Mutex<VecDeque<ChatMessage>>> = Default::default();
245        let messages_clone = Arc::clone(&messages);
246        let keep_duration = chrono::Duration::from_std(keep_duration).unwrap();
247
248        let worker = ChatForwarder {
249            keep_duration,
250            chat_r,
251            messages: messages_clone,
252        };
253
254        runtime.spawn(worker.run().instrument(info_span!("chat_forwarder")));
255
256        (Self { messages }, ChatExporter { chat_s })
257    }
258}