1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2024-11-04 17:29:14 +01:00

improvement: batch inserts for inserting pdus

This commit is contained in:
Timo Kösters 2021-09-14 14:23:43 +02:00
parent 159e22e450
commit 5c02dc7830
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
4 changed files with 30 additions and 23 deletions

View file

@ -1092,14 +1092,13 @@ pub(crate) async fn invite_helper<'a>(
"Could not accept incoming PDU as timeline event.", "Could not accept incoming PDU as timeline event.",
))?; ))?;
for server in db let servers = db
.rooms .rooms
.room_servers(room_id) .room_servers(room_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.filter(|server| &**server != db.globals.server_name()) .filter(|server| &**server != db.globals.server_name());
{
db.sending.send_pdu(&server, &pdu_id)?; db.sending.send_pdu(servers, &pdu_id)?;
}
return Ok(()); return Ok(());
} }

View file

@ -2104,13 +2104,12 @@ impl Rooms {
// where events in the current room state do not exist // where events in the current room state do not exist
self.set_room_state(room_id, statehashid)?; self.set_room_state(room_id, statehashid)?;
for server in self let servers = self
.room_servers(room_id) .room_servers(room_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.filter(|server| &**server != db.globals.server_name()) .filter(|server| &**server != db.globals.server_name());
{
db.sending.send_pdu(&server, &pdu_id)?; db.sending.send_pdu(servers, &pdu_id)?;
}
for appservice in db.appservice.all()? { for appservice in db.appservice.all()? {
if self.appservice_in_room(room_id, &appservice, db)? { if self.appservice_in_room(room_id, &appservice, db)? {

View file

@ -84,7 +84,7 @@ pub enum SendingEventType {
pub struct Sending { pub struct Sending {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync
pub(super) servernameevent_data: Arc<dyn Tree>, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content pub(super) servernameevent_data: Arc<dyn Tree>, // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
pub(super) servercurrentevent_data: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content pub(super) servercurrentevent_data: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(Vec<u8>, Vec<u8>)>, pub sender: mpsc::UnboundedSender<(Vec<u8>, Vec<u8>)>,
@ -423,13 +423,23 @@ impl Sending {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, server, pdu_id))] #[tracing::instrument(skip(self, servers, pdu_id))]
pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> { pub fn send_pdu<I: Iterator<Item = Box<ServerName>>>(
let mut key = server.as_bytes().to_vec(); &self,
key.push(0xff); servers: I,
key.extend_from_slice(pdu_id); pdu_id: &[u8],
self.servernameevent_data.insert(&key, &[])?; ) -> Result<()> {
self.sender.unbounded_send((key, vec![])).unwrap(); let mut batch = servers.map(|server| {
let mut key = server.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pdu_id);
self.sender.unbounded_send((key.clone(), vec![])).unwrap();
(key, Vec::new())
});
self.servernameevent_data.insert_batch(&mut batch)?;
Ok(()) Ok(())
} }

View file

@ -2880,14 +2880,13 @@ async fn create_join_event(
db, db,
)?; )?;
for server in db let servers = db
.rooms .rooms
.room_servers(room_id) .room_servers(room_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.filter(|server| &**server != db.globals.server_name()) .filter(|server| &**server != db.globals.server_name());
{
db.sending.send_pdu(&server, &pdu_id)?; db.sending.send_pdu(servers, &pdu_id)?;
}
db.flush()?; db.flush()?;