diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 103daeb0..ba2ceefa 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1055,10 +1055,10 @@ async fn load_joined_room( .map(|(_, _, v)| v) .collect(); - if services().rooms.edus.typing.last_typing_update(room_id)? > since { + if services().rooms.edus.typing.last_typing_update(room_id).await? > since { edus.push( serde_json::from_str( - &serde_json::to_string(&services().rooms.edus.typing.typings_all(room_id)?) + &serde_json::to_string(&services().rooms.edus.typing.typings_all(room_id).await?) .expect("event is valid, we just created it"), ) .expect("event is valid, we just created it"), diff --git a/src/api/client_server/typing.rs b/src/api/client_server/typing.rs index 43217e1a..eff84051 100644 --- a/src/api/client_server/typing.rs +++ b/src/api/client_server/typing.rs @@ -27,13 +27,13 @@ pub async fn create_typing_event_route( sender_user, &body.room_id, duration.as_millis() as u64 + utils::millis_since_unix_epoch(), - )?; + ).await?; } else { services() .rooms .edus .typing - .typing_remove(sender_user, &body.room_id)?; + .typing_remove(sender_user, &body.room_id).await?; } Ok(create_typing_event::v3::Response {}) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index e0335493..59724b03 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -830,13 +830,13 @@ pub async fn send_transaction_message_route( &typing.user_id, &typing.room_id, 3000 + utils::millis_since_unix_epoch(), - )?; + ).await?; } else { services() .rooms .edus .typing - .typing_remove(&typing.user_id, &typing.room_id)?; + .typing_remove(&typing.user_id, &typing.room_id).await?; } } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 8f9fb0a5..8c97cdd0 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -7,6 +7,7 @@ use lru_cache::LruCache; use tokio::sync::Mutex; use crate::{Config, Result}; +use tokio::sync::RwLock; pub mod account_data; pub mod admin; @@ -65,7 +66,7 @@ impl Services { edus: rooms::edus::Service { presence: rooms::edus::presence::Service { db }, read_receipt: rooms::edus::read_receipt::Service { db }, - typing: rooms::edus::typing::Service { db }, + typing: rooms::edus::typing::Service { db, typing: RwLock::new(BTreeMap::new()), last_typing_update: RwLock::new(BTreeMap::new()) }, }, event_handler: rooms::event_handler::Service, lazy_loading: rooms::lazy_loading::Service { diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs index 7d44f7d7..f3436741 100644 --- a/src/service/rooms/edus/typing/mod.rs +++ b/src/service/rooms/edus/typing/mod.rs @@ -1,48 +1,73 @@ mod data; pub use data::Data; -use ruma::{events::SyncEphemeralRoomEvent, RoomId, UserId}; +use ruma::{events::SyncEphemeralRoomEvent, RoomId, UserId, OwnedRoomId, OwnedUserId}; +use tokio::sync::RwLock; +use std::collections::BTreeMap; -use crate::Result; +use crate::{utils, services, Result}; pub struct Service { pub db: &'static dyn Data, + pub typing: RwLock>>, // u64 is unix timestamp of timeout + pub last_typing_update: RwLock>, // timestamp of the last change to typing users } impl Service { /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// called. - pub fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { - self.db.typing_add(user_id, room_id, timeout) + pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { + self.typing.write().await.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), timeout); + self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + Ok(()) } /// Removes a user from typing before the timeout is reached. - pub fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { - self.db.typing_remove(user_id, room_id) + pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + self.typing.write().await.entry(room_id.to_owned()).or_default().remove(user_id); + self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + Ok(()) } /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { - self.db.typings_maintain(room_id) + async fn typings_maintain(&self, room_id: &RoomId) -> Result<()> { + let current_timestamp = utils::millis_since_unix_epoch(); + let mut removable = Vec::new(); + { + let typing = self.typing.read().await; + let Some(room) = typing.get(room_id) else { return Ok(()); }; + for (user, timeout) in room { + if *timeout < current_timestamp { + removable.push(user.clone()); + } + } + drop(typing); + } + if !removable.is_empty() { + let typing = &mut self.typing.write().await; + let room = typing.entry(room_id.to_owned()).or_default(); + for user in removable { + room.remove(&user); + } + self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?); + } + Ok(()) } /// Returns the count of the last typing update in this room. - pub fn last_typing_update(&self, room_id: &RoomId) -> Result { - self.typings_maintain(room_id)?; - - self.db.last_typing_update(room_id) + pub async fn last_typing_update(&self, room_id: &RoomId) -> Result { + self.typings_maintain(room_id).await?; + Ok(self.last_typing_update.read().await.get(room_id).copied().unwrap_or(0)) } /// Returns a new typing EDU. - pub fn typings_all( + pub async fn typings_all( &self, room_id: &RoomId, ) -> Result> { - let user_ids = self.db.typings_all(room_id)?; - Ok(SyncEphemeralRoomEvent { content: ruma::events::typing::TypingEventContent { - user_ids: user_ids.into_iter().collect(), + user_ids: self.typing.read().await.get(room_id).map(|m| m.keys().cloned().collect()).unwrap_or_default(), }, }) }