1use chrono::{DateTime, Utc};
2use common::{
3 comp,
4 comp::{ChatType, Content, Group, Player, UnresolvedChatMsg, chat::KillType},
5 uid::IdMaps,
6 uuid::Uuid,
7};
8use serde::{Deserialize, Serialize};
9use specs::{Join, World, WorldExt};
10use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration};
11use tokio::sync::Mutex;
12use tracing::{Instrument, info_span};
13
14#[derive(Clone, Serialize, Deserialize)]
15pub struct PlayerInfo {
16 uuid: Uuid,
17 alias: String,
18}
19
20#[derive(Clone, Serialize, Deserialize)]
24pub enum KillSource {
25 Player(PlayerInfo, KillType),
26 NonPlayer(String, KillType),
27 NonExistent(KillType),
28 FallDamage,
29 Suicide,
30 Other,
31}
32
33#[derive(Clone, Serialize, Deserialize)]
34pub enum ChatParties {
36 Online(PlayerInfo),
37 Offline(PlayerInfo),
38 CommandInfo(PlayerInfo),
39 CommandError(PlayerInfo),
40 Kill(KillSource, PlayerInfo),
41 GroupMeta(Vec<PlayerInfo>),
42 Group(PlayerInfo, Vec<PlayerInfo>),
43 Tell(PlayerInfo, PlayerInfo),
44 Say(PlayerInfo),
45 FactionMeta(String),
46 Faction(PlayerInfo, String),
47 Region(PlayerInfo),
48 World(PlayerInfo),
49}
50
51#[derive(Clone, Serialize, Deserialize)]
52pub struct ChatMessage {
53 pub time: DateTime<Utc>,
54 pub parties: ChatParties,
55 pub content: Content,
56}
57
58type MessagesStore = Arc<Mutex<VecDeque<ChatMessage>>>;
59
60#[derive(Clone)]
63pub struct ChatCache {
64 pub messages: MessagesStore,
65}
66
67struct ChatForwarder {
69 chat_r: tokio::sync::mpsc::Receiver<ChatMessage>,
70 messages: MessagesStore,
71 keep_duration: chrono::Duration,
72}
73
74pub struct ChatExporter {
75 chat_s: tokio::sync::mpsc::Sender<ChatMessage>,
76}
77
78impl ChatMessage {
79 fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self {
80 ChatMessage {
81 time: Utc::now(),
82 content: chatmsg.content().clone(),
83 parties,
84 }
85 }
86}
87
88impl ChatExporter {
89 pub fn generate(chatmsg: &UnresolvedChatMsg, ecs: &World) -> Option<ChatMessage> {
90 let id_maps = ecs.read_resource::<IdMaps>();
91 let players = ecs.read_storage::<Player>();
92 let player_info_from_uid = |uid| {
93 id_maps
94 .uid_entity(uid)
95 .and_then(|entry| players.get(entry))
96 .map(|player| PlayerInfo {
97 alias: player.alias.clone(),
98 uuid: player.uuid(),
99 })
100 };
101 let group_members_from_group = |g| -> Vec<_> {
102 let groups = ecs.read_storage::<Group>();
103 (&players, &groups)
104 .join()
105 .filter_map(|(player, group)| {
106 if g == group {
107 Some(PlayerInfo {
108 alias: player.alias.clone(),
109 uuid: player.uuid(),
110 })
111 } else {
112 None
113 }
114 })
115 .collect()
116 };
117
118 match &chatmsg.chat_type {
119 ChatType::Offline(from) => {
120 if let Some(player_info) = player_info_from_uid(*from) {
121 return Some(ChatMessage::new(chatmsg, ChatParties::Offline(player_info)));
122 }
123 },
124 ChatType::Online(from) => {
125 if let Some(player_info) = player_info_from_uid(*from) {
126 return Some(ChatMessage::new(chatmsg, ChatParties::Online(player_info)));
127 }
128 },
129 ChatType::Region(from) => {
130 if let Some(player_info) = player_info_from_uid(*from) {
131 return Some(ChatMessage::new(chatmsg, ChatParties::Region(player_info)));
132 }
133 },
134 ChatType::World(from) => {
135 if let Some(player_info) = player_info_from_uid(*from) {
136 return Some(ChatMessage::new(chatmsg, ChatParties::World(player_info)));
137 }
138 },
139 ChatType::Say(from) => {
140 if let Some(player_info) = player_info_from_uid(*from) {
141 return Some(ChatMessage::new(chatmsg, ChatParties::Say(player_info)));
142 }
143 },
144 ChatType::Tell(from, to) => {
145 if let (Some(from_player_info), Some(to_player_info)) =
146 (player_info_from_uid(*from), player_info_from_uid(*to))
147 {
148 return Some(ChatMessage::new(
149 chatmsg,
150 ChatParties::Tell(from_player_info, to_player_info),
151 ));
152 }
153 },
154 ChatType::Kill(kill_source, from) => {
155 let kill_source = match kill_source.clone() {
156 comp::chat::KillSource::Player(uid, t) => {
157 if let Some(player_info) = player_info_from_uid(uid) {
158 KillSource::Player(player_info, t)
159 } else {
160 return None;
161 }
162 },
163 comp::chat::KillSource::NonPlayer(str, t) => KillSource::NonPlayer(str, t),
164 comp::chat::KillSource::NonExistent(t) => KillSource::NonExistent(t),
165 comp::chat::KillSource::FallDamage => KillSource::FallDamage,
166 comp::chat::KillSource::Suicide => KillSource::Suicide,
167 comp::chat::KillSource::Other => KillSource::Other,
168 };
169 if let Some(player_info) = player_info_from_uid(*from) {
170 return Some(ChatMessage::new(
171 chatmsg,
172 ChatParties::Kill(kill_source, player_info),
173 ));
174 }
175 },
176 ChatType::FactionMeta(s) => {
177 return Some(ChatMessage::new(
178 chatmsg,
179 ChatParties::FactionMeta(s.clone()),
180 ));
181 },
182 ChatType::Faction(from, s) => {
183 if let Some(player_info) = player_info_from_uid(*from) {
184 return Some(ChatMessage::new(
185 chatmsg,
186 ChatParties::Faction(player_info, s.clone()),
187 ));
188 }
189 },
190 ChatType::GroupMeta(g) => {
191 let members = group_members_from_group(g);
192 return Some(ChatMessage::new(chatmsg, ChatParties::GroupMeta(members)));
193 },
194 ChatType::Group(from, g) => {
195 let members = group_members_from_group(g);
196 if let Some(player_info) = player_info_from_uid(*from) {
197 return Some(ChatMessage::new(
198 chatmsg,
199 ChatParties::Group(player_info, members),
200 ));
201 }
202 },
203 _ => (),
204 };
205
206 None
207 }
208
209 pub fn send(&self, msg: ChatMessage) {
210 if let Err(e) = self.chat_s.blocking_send(msg) {
211 tracing::warn!(
212 ?e,
213 "could not export chat message. the tokio sender seems to be broken"
214 );
215 }
216 }
217}
218
219impl ChatForwarder {
220 async fn run(mut self) {
221 while let Some(msg) = self.chat_r.recv().await {
222 let drop_older_than = msg.time.sub(self.keep_duration);
223 let mut messages = self.messages.lock().await;
224 while let Some(msg) = messages.front()
225 && msg.time < drop_older_than
226 {
227 messages.pop_front();
228 }
229 messages.push_back(msg);
230 const MAX_CACHE_MESSAGES: usize = 10_000; if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES {
232 let msg_count = messages.len();
233 tracing::debug!(?msg_count, "shrinking cache");
234 messages.shrink_to_fit();
235 }
236 }
237 }
238}
239
240impl ChatCache {
241 pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) {
242 const BUFFER_SIZE: usize = 1_000;
243 let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE);
244 let messages: Arc<Mutex<VecDeque<ChatMessage>>> = Default::default();
245 let messages_clone = Arc::clone(&messages);
246 let keep_duration = chrono::Duration::from_std(keep_duration).unwrap();
247
248 let worker = ChatForwarder {
249 keep_duration,
250 chat_r,
251 messages: messages_clone,
252 };
253
254 runtime.spawn(worker.run().instrument(info_span!("chat_forwarder")));
255
256 (Self { messages }, ChatExporter { chat_s })
257 }
258}