1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2024-11-09 06:21:06 +01:00

improvement: save member count + sled fixes

This commit is contained in:
Timo Kösters 2021-08-04 21:15:01 +02:00
parent b813c34642
commit 902404d48d
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
6 changed files with 65 additions and 19 deletions

View file

@ -1,3 +1,5 @@
use std::convert::TryInto;
use crate::{database::DatabaseGuard, ConduitResult, Database, Error, Result, Ruma}; use crate::{database::DatabaseGuard, ConduitResult, Database, Error, Result, Ruma};
use ruma::{ use ruma::{
api::{ api::{
@ -21,7 +23,7 @@ use ruma::{
serde::Raw, serde::Raw,
ServerName, UInt, ServerName, UInt,
}; };
use tracing::info; use tracing::{info, warn};
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
use rocket::{get, post, put}; use rocket::{get, post, put};
@ -234,7 +236,15 @@ pub async fn get_public_rooms_filtered_helper(
.name .name
.map(|n| n.to_owned().into())) .map(|n| n.to_owned().into()))
})?, })?,
num_joined_members: (db.rooms.room_members(&room_id).count() as u32).into(), num_joined_members: db
.rooms
.room_joined_count(&room_id)?
.unwrap_or_else(|| {
warn!("Room {} has no member count", room_id);
0
})
.try_into()
.expect("user count should not be that big"),
topic: db topic: db
.rooms .rooms
.room_state_get(&room_id, &EventType::RoomTopic, "")? .room_state_get(&room_id, &EventType::RoomTopic, "")?

View file

@ -24,10 +24,11 @@ use rocket::{
request::{FromRequest, Request}, request::{FromRequest, Request},
Shutdown, State, Shutdown, State,
}; };
use ruma::{DeviceId, ServerName, UserId}; use ruma::{DeviceId, RoomId, ServerName, UserId};
use serde::{de::IgnoredAny, Deserialize}; use serde::{de::IgnoredAny, Deserialize};
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
convert::TryFrom,
fs::{self, remove_dir_all}, fs::{self, remove_dir_all},
io::Write, io::Write,
ops::Deref, ops::Deref,
@ -252,6 +253,7 @@ impl Database {
serverroomids: builder.open_tree("serverroomids")?, serverroomids: builder.open_tree("serverroomids")?,
userroomid_joined: builder.open_tree("userroomid_joined")?, userroomid_joined: builder.open_tree("userroomid_joined")?,
roomuserid_joined: builder.open_tree("roomuserid_joined")?, roomuserid_joined: builder.open_tree("roomuserid_joined")?,
roomid_joinedcount: builder.open_tree("roomid_joinedcount")?,
roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?, roomuseroncejoinedids: builder.open_tree("roomuseroncejoinedids")?,
userroomid_invitestate: builder.open_tree("userroomid_invitestate")?, userroomid_invitestate: builder.open_tree("userroomid_invitestate")?,
roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?, roomuserid_invitecount: builder.open_tree("roomuserid_invitecount")?,
@ -271,8 +273,8 @@ impl Database {
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
referencedevents: builder.open_tree("referencedevents")?, referencedevents: builder.open_tree("referencedevents")?,
pdu_cache: Mutex::new(LruCache::new(1_000_000)), pdu_cache: Mutex::new(LruCache::new(0)),
auth_chain_cache: Mutex::new(LruCache::new(1_000_000)), auth_chain_cache: Mutex::new(LruCache::new(0)),
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
@ -423,6 +425,20 @@ impl Database {
println!("Migration: 4 -> 5 finished"); println!("Migration: 4 -> 5 finished");
} }
if db.globals.database_version()? < 9 { // TODO update to 6
// Set room member count
for (roomid, _) in db.rooms.roomid_shortstatehash.iter() {
let room_id =
RoomId::try_from(utils::string_from_bytes(&roomid).unwrap()).unwrap();
db.rooms.update_joined_count(&room_id)?;
}
db.globals.bump_database_version(6)?;
println!("Migration: 5 -> 6 finished");
}
} }
let guard = db.read().await; let guard = db.read().await;

View file

@ -39,12 +39,21 @@ impl Tree for SledEngineTree {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, iter))]
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
for (key, value) in iter {
self.0.insert(key, value)?;
}
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> { fn remove(&self, key: &[u8]) -> Result<()> {
self.0.remove(key)?; self.0.remove(key)?;
Ok(()) Ok(())
} }
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
Box::new( Box::new(
self.0 self.0
.iter() .iter()
@ -62,7 +71,7 @@ impl Tree for SledEngineTree {
&self, &self,
from: &[u8], from: &[u8],
backwards: bool, backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> { ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
let iter = if backwards { let iter = if backwards {
self.0.range(..=from) self.0.range(..=from)
} else { } else {
@ -95,7 +104,7 @@ impl Tree for SledEngineTree {
fn scan_prefix<'a>( fn scan_prefix<'a>(
&'a self, &'a self,
prefix: Vec<u8>, prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> { ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let iter = self let iter = self
.0 .0
.scan_prefix(prefix) .scan_prefix(prefix)

View file

@ -55,7 +55,6 @@ impl Engine {
conn.pragma_update(Some(Main), "journal_mode", &"WAL")?; conn.pragma_update(Some(Main), "journal_mode", &"WAL")?;
conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?; conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?;
conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?; conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?;
conn.pragma_update(Some(Main), "wal_autocheckpoint", &0)?;
Ok(conn) Ok(conn)
} }

View file

@ -55,6 +55,7 @@ pub struct Rooms {
pub(super) userroomid_joined: Arc<dyn Tree>, pub(super) userroomid_joined: Arc<dyn Tree>,
pub(super) roomuserid_joined: Arc<dyn Tree>, pub(super) roomuserid_joined: Arc<dyn Tree>,
pub(super) roomid_joinedcount: Arc<dyn Tree>,
pub(super) roomuseroncejoinedids: Arc<dyn Tree>, pub(super) roomuseroncejoinedids: Arc<dyn Tree>,
pub(super) userroomid_invitestate: Arc<dyn Tree>, // InviteState = Vec<Raw<Pdu>> pub(super) userroomid_invitestate: Arc<dyn Tree>, // InviteState = Vec<Raw<Pdu>>
pub(super) roomuserid_invitecount: Arc<dyn Tree>, // InviteCount = Count pub(super) roomuserid_invitecount: Arc<dyn Tree>, // InviteCount = Count
@ -1906,9 +1907,18 @@ impl Rooms {
_ => {} _ => {}
} }
self.update_joined_count(room_id)?;
Ok(()) Ok(())
} }
pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> {
self.roomid_joinedcount.insert(
room_id.as_bytes(),
&(self.room_members(&room_id).count() as u64).to_be_bytes(),
)
}
pub async fn leave_room( pub async fn leave_room(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -2370,6 +2380,17 @@ impl Rooms {
}) })
} }
pub fn room_joined_count(&self, room_id: &RoomId) -> Result<Option<u64>> {
Ok(self
.roomid_joinedcount
.get(room_id.as_bytes())?
.map(|b| {
utils::u64_from_bytes(&b)
.map_err(|_| Error::bad_database("Invalid joinedcount in db."))
})
.transpose()?)
}
/// Returns an iterator over all User IDs who ever joined a room. /// Returns an iterator over all User IDs who ever joined a room.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn room_useroncejoined<'a>( pub fn room_useroncejoined<'a>(

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
client_server::{self, claim_keys_helper, get_keys_helper}, client_server::{self, claim_keys_helper, get_keys_helper},
database::{abstraction::sqlite::MILLI, DatabaseGuard}, database::{DatabaseGuard},
utils, ConduitResult, Database, Error, PduEvent, Result, Ruma, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma,
}; };
use get_profile_information::v1::ProfileField; use get_profile_information::v1::ProfileField;
@ -1736,20 +1736,11 @@ fn get_auth_chain(starting_events: Vec<EventId>, db: &Database) -> Result<HashSe
full_auth_chain.extend(cached.iter().cloned()); full_auth_chain.extend(cached.iter().cloned());
} else { } else {
drop(cache); drop(cache);
let start = Instant::now();
let auth_chain = get_auth_chain_recursive(&event_id, HashSet::new(), db)?; let auth_chain = get_auth_chain_recursive(&event_id, HashSet::new(), db)?;
let elapsed = start.elapsed();
if elapsed > MILLI {
println!("auth chain for {} took {:?}", &event_id, elapsed)
}
cache = db.rooms.auth_chain_cache(); cache = db.rooms.auth_chain_cache();
cache.insert(vec![event_id.clone()], auth_chain.clone()); cache.insert(vec![event_id.clone()], auth_chain.clone());
full_auth_chain.extend(auth_chain); full_auth_chain.extend(auth_chain);
}; };
} }
cache.insert(starting_events, full_auth_chain.clone()); cache.insert(starting_events, full_auth_chain.clone());