1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2024-12-26 07:54:21 +01:00

Merge branch 'rocksdb' into 'next'

improvement: use simpler rocksdb config

See merge request famedly/conduit!602
This commit is contained in:
Timo Kösters 2024-03-22 06:47:51 +00:00
commit bdae9ceccf
5 changed files with 289 additions and 256 deletions

436
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -121,6 +121,7 @@ optional = true
features = [
"multi-threaded-cf",
"zstd",
"lz4",
]
[target.'cfg(unix)'.dependencies]

View file

@ -29,7 +29,7 @@ use std::{
time::Duration,
};
use tokio::sync::watch::Sender;
use tracing::error;
use tracing::{error, info};
/// # `GET /_matrix/client/r0/sync`
///
@ -99,6 +99,8 @@ pub async fn sync_events_route(
o.insert((body.since.clone(), rx.clone()));
info!("Sync started for {sender_user}");
tokio::spawn(sync_helper_wrapper(
sender_user.clone(),
sender_device.clone(),

View file

@ -48,6 +48,9 @@ pub async fn search_users_route(
return None;
}
// It's a matching user, but is the sender allowed to see them?
let mut user_visible = false;
let user_is_in_public_rooms = services()
.rooms
.state_cache
@ -69,22 +72,26 @@ pub async fn search_users_route(
});
if user_is_in_public_rooms {
return Some(user);
user_visible = true;
} else {
let user_is_in_shared_rooms = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id])
.ok()?
.next()
.is_some();
if user_is_in_shared_rooms {
user_visible = true;
}
}
let user_is_in_shared_rooms = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id])
.ok()?
.next()
.is_some();
if user_is_in_shared_rooms {
return Some(user);
if !user_visible {
return None;
}
None
Some(user)
});
let results = users.by_ref().take(limit).collect();

View file

@ -23,29 +23,23 @@ pub struct RocksDbEngineTree<'a> {
fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
let mut block_based_options = rocksdb::BlockBasedOptions::default();
block_based_options.set_block_cache(rocksdb_cache);
// "Difference of spinning disk"
// https://zhangyuchi.gitbooks.io/rocksdbbook/content/RocksDB-Tuning-Guide.html
block_based_options.set_bloom_filter(10.0, false);
block_based_options.set_block_size(4 * 1024);
block_based_options.set_cache_index_and_filter_blocks(true);
block_based_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
block_based_options.set_optimize_filters_for_memory(true);
let mut db_opts = rocksdb::Options::default();
db_opts.set_block_based_table_factory(&block_based_options);
db_opts.set_optimize_filters_for_hits(true);
db_opts.set_skip_stats_update_on_db_open(true);
db_opts.set_level_compaction_dynamic_level_bytes(true);
db_opts.set_target_file_size_base(256 * 1024 * 1024);
//db_opts.set_compaction_readahead_size(2 * 1024 * 1024);
//db_opts.set_use_direct_reads(true);
//db_opts.set_use_direct_io_for_flush_and_compaction(true);
db_opts.create_if_missing(true);
db_opts.increase_parallelism(num_cpus::get() as i32);
db_opts.set_max_open_files(max_open_files);
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
db_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.optimize_level_style_compaction(10 * 1024 * 1024);
// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
db_opts.set_level_compaction_dynamic_level_bytes(true);
db_opts.set_max_background_jobs(6);
db_opts.set_bytes_per_sync(1048576);
@ -59,9 +53,6 @@ fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::O
// restored via federation.
db_opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
db_opts.set_prefix_extractor(prefix_extractor);
db_opts
}
@ -147,12 +138,17 @@ impl RocksDbEngineTree<'_> {
impl KvTree for RocksDbEngineTree<'_> {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.db.rocks.get_cf(&self.cf(), key)?)
let readoptions = rocksdb::ReadOptions::default();
Ok(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?)
}
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
let writeoptions = rocksdb::WriteOptions::default();
let lock = self.write_lock.read().unwrap();
self.db.rocks.put_cf(&self.cf(), key, value)?;
self.db
.rocks
.put_cf_opt(&self.cf(), key, value, &writeoptions)?;
drop(lock);
self.watchers.wake(key);
@ -161,22 +157,31 @@ impl KvTree for RocksDbEngineTree<'_> {
}
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
let writeoptions = rocksdb::WriteOptions::default();
for (key, value) in iter {
self.db.rocks.put_cf(&self.cf(), key, value)?;
self.db
.rocks
.put_cf_opt(&self.cf(), key, value, &writeoptions)?;
}
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> {
Ok(self.db.rocks.delete_cf(&self.cf(), key)?)
let writeoptions = rocksdb::WriteOptions::default();
Ok(self
.db
.rocks
.delete_cf_opt(&self.cf(), key, &writeoptions)?)
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let readoptions = rocksdb::ReadOptions::default();
Box::new(
self.db
.rocks
.iterator_cf(&self.cf(), rocksdb::IteratorMode::Start)
.iterator_cf_opt(&self.cf(), readoptions, rocksdb::IteratorMode::Start)
.map(|r| r.unwrap())
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
)
@ -187,11 +192,14 @@ impl KvTree for RocksDbEngineTree<'_> {
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let readoptions = rocksdb::ReadOptions::default();
Box::new(
self.db
.rocks
.iterator_cf(
.iterator_cf_opt(
&self.cf(),
readoptions,
rocksdb::IteratorMode::From(
from,
if backwards {
@ -207,23 +215,33 @@ impl KvTree for RocksDbEngineTree<'_> {
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
let readoptions = rocksdb::ReadOptions::default();
let writeoptions = rocksdb::WriteOptions::default();
let lock = self.write_lock.write().unwrap();
let old = self.db.rocks.get_cf(&self.cf(), key)?;
let old = self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?;
let new = utils::increment(old.as_deref()).unwrap();
self.db.rocks.put_cf(&self.cf(), key, &new)?;
self.db
.rocks
.put_cf_opt(&self.cf(), key, &new, &writeoptions)?;
drop(lock);
Ok(new)
}
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
let readoptions = rocksdb::ReadOptions::default();
let writeoptions = rocksdb::WriteOptions::default();
let lock = self.write_lock.write().unwrap();
for key in iter {
let old = self.db.rocks.get_cf(&self.cf(), &key)?;
let old = self.db.rocks.get_cf_opt(&self.cf(), &key, &readoptions)?;
let new = utils::increment(old.as_deref()).unwrap();
self.db.rocks.put_cf(&self.cf(), key, new)?;
self.db
.rocks
.put_cf_opt(&self.cf(), key, new, &writeoptions)?;
}
drop(lock);
@ -235,11 +253,14 @@ impl KvTree for RocksDbEngineTree<'_> {
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let readoptions = rocksdb::ReadOptions::default();
Box::new(
self.db
.rocks
.iterator_cf(
.iterator_cf_opt(
&self.cf(),
readoptions,
rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
)
.map(|r| r.unwrap())