1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-01-17 02:42:08 +01:00

fix: use db options for column families too

This commit is contained in:
Timo Kösters 2022-01-10 21:20:29 +01:00
parent 0bb7d76dec
commit b96822b617
No known key found for this signature in database
GPG key ID: 356E705610F626D5

View file

@ -4,6 +4,7 @@ use std::{future::Future, pin::Pin, sync::Arc, collections::HashMap, sync::RwLoc
pub struct Engine { pub struct Engine {
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>, rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
cache_capacity_bytes: usize,
cache: rocksdb::Cache, cache: rocksdb::Cache,
old_cfs: Vec<String>, old_cfs: Vec<String>,
} }
@ -15,23 +16,31 @@ pub struct RocksDbEngineTree<'a> {
write_lock: RwLock<()> write_lock: RwLock<()>
} }
fn db_options(cache_capacity_bytes: usize, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
let mut block_based_options = rocksdb::BlockBasedOptions::default();
block_based_options.set_block_cache(rocksdb_cache);
let mut db_opts = rocksdb::Options::default();
db_opts.set_block_based_table_factory(&block_based_options);
db_opts.create_if_missing(true);
db_opts.increase_parallelism(num_cpus::get() as i32);
db_opts.set_max_open_files(512);
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.optimize_level_style_compaction(cache_capacity_bytes);
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
db_opts.set_prefix_extractor(prefix_extractor);
db_opts
}
impl DatabaseEngine for Arc<Engine> { impl DatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Self> { fn open(config: &Config) -> Result<Self> {
let rocksdb_cache = let cache_capacity_bytes = (config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize;
rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize) let rocksdb_cache = rocksdb::Cache::new_lru_cache(cache_capacity_bytes).unwrap();
.unwrap();
let mut block_based_options = rocksdb::BlockBasedOptions::default(); let db_opts = db_options(cache_capacity_bytes, &rocksdb_cache);
block_based_options.set_block_cache(&rocksdb_cache);
let mut db_opts = rocksdb::Options::default();
db_opts.set_block_based_table_factory(&block_based_options);
db_opts.create_if_missing(true);
db_opts.increase_parallelism(num_cpus::get() as i32);
db_opts.set_max_open_files(512);
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.optimize_level_style_compaction((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize);
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf( let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
&db_opts, &db_opts,
@ -43,16 +52,16 @@ impl DatabaseEngine for Arc<Engine> {
&db_opts, &db_opts,
&config.database_path, &config.database_path,
cfs.iter().map(|name| { cfs.iter().map(|name| {
let mut options = rocksdb::Options::default(); rocksdb::ColumnFamilyDescriptor::new(
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); name,
options.set_prefix_extractor(prefix_extractor); db_options(cache_capacity_bytes, &rocksdb_cache),
)
rocksdb::ColumnFamilyDescriptor::new(name, options)
}), }),
)?; )?;
Ok(Arc::new(Engine { Ok(Arc::new(Engine {
rocks: db, rocks: db,
cache_capacity_bytes,
cache: rocksdb_cache, cache: rocksdb_cache,
old_cfs: cfs, old_cfs: cfs,
})) }))
@ -61,11 +70,9 @@ impl DatabaseEngine for Arc<Engine> {
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>> { fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>> {
if !self.old_cfs.contains(&name.to_owned()) { if !self.old_cfs.contains(&name.to_owned()) {
// Create if it didn't exist // Create if it didn't exist
let mut options = rocksdb::Options::default(); let _ = self
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); .rocks
options.set_prefix_extractor(prefix_extractor); .create_cf(name, &db_options(self.cache_capacity_bytes, &self.cache));
let _ = self.rocks.create_cf(name, &options);
} }
Ok(Arc::new(RocksDbEngineTree { Ok(Arc::new(RocksDbEngineTree {