From 5c02dc783066e98b445206c18cebe86972620de2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 14 Sep 2021 14:23:43 +0200 Subject: [PATCH] improvement: batch inserts for inserting pdus --- src/client_server/membership.rs | 9 ++++----- src/database/rooms.rs | 9 ++++----- src/database/sending.rs | 26 ++++++++++++++++++-------- src/server_server.rs | 9 ++++----- 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 10c052ea..146af791 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -1092,14 +1092,13 @@ pub(crate) async fn invite_helper<'a>( "Could not accept incoming PDU as timeline event.", ))?; - for server in db + let servers = db .rooms .room_servers(room_id) .filter_map(|r| r.ok()) - .filter(|server| &**server != db.globals.server_name()) - { - db.sending.send_pdu(&server, &pdu_id)?; - } + .filter(|server| &**server != db.globals.server_name()); + + db.sending.send_pdu(servers, &pdu_id)?; return Ok(()); } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index b272a5ce..ec03e3ac 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -2104,13 +2104,12 @@ impl Rooms { // where events in the current room state do not exist self.set_room_state(room_id, statehashid)?; - for server in self + let servers = self .room_servers(room_id) .filter_map(|r| r.ok()) - .filter(|server| &**server != db.globals.server_name()) - { - db.sending.send_pdu(&server, &pdu_id)?; - } + .filter(|server| &**server != db.globals.server_name()); + + db.sending.send_pdu(servers, &pdu_id)?; for appservice in db.appservice.all()? { if self.appservice_in_room(room_id, &appservice, db)? { diff --git a/src/database/sending.rs b/src/database/sending.rs index c14f5819..70ff1b6d 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -84,7 +84,7 @@ pub enum SendingEventType { pub struct Sending { /// The state for a given state hash. pub(super) servername_educount: Arc, // EduCount: Count of last EDU sync - pub(super) servernameevent_data: Arc, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content + pub(super) servernameevent_data: Arc, // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content pub(super) servercurrentevent_data: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content pub(super) maximum_requests: Arc, pub sender: mpsc::UnboundedSender<(Vec, Vec)>, @@ -423,13 +423,23 @@ impl Sending { Ok(()) } - #[tracing::instrument(skip(self, server, pdu_id))] - pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> { - let mut key = server.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(pdu_id); - self.servernameevent_data.insert(&key, &[])?; - self.sender.unbounded_send((key, vec![])).unwrap(); + #[tracing::instrument(skip(self, servers, pdu_id))] + pub fn send_pdu>>( + &self, + servers: I, + pdu_id: &[u8], + ) -> Result<()> { + let mut batch = servers.map(|server| { + let mut key = server.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(pdu_id); + + self.sender.unbounded_send((key.clone(), vec![])).unwrap(); + + (key, Vec::new()) + }); + + self.servernameevent_data.insert_batch(&mut batch)?; Ok(()) } diff --git a/src/server_server.rs b/src/server_server.rs index 1d9ba61d..2b8b06c7 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -2880,14 +2880,13 @@ async fn create_join_event( db, )?; - for server in db + let servers = db .rooms .room_servers(room_id) .filter_map(|r| r.ok()) - .filter(|server| &**server != db.globals.server_name()) - { - db.sending.send_pdu(&server, &pdu_id)?; - } + .filter(|server| &**server != db.globals.server_name()); + + db.sending.send_pdu(servers, &pdu_id)?; db.flush()?;