mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-01 05:04:32 +01:00
feat: config option for rocksdb max open files
This commit is contained in:
parent
5b8d2a736e
commit
d434dfb3a5
2 changed files with 28 additions and 7 deletions
|
@ -49,6 +49,8 @@ pub struct Config {
|
||||||
database_path: String,
|
database_path: String,
|
||||||
#[serde(default = "default_db_cache_capacity_mb")]
|
#[serde(default = "default_db_cache_capacity_mb")]
|
||||||
db_cache_capacity_mb: f64,
|
db_cache_capacity_mb: f64,
|
||||||
|
#[serde(default = "default_rocksdb_max_open_files")]
|
||||||
|
rocksdb_max_open_files: i32,
|
||||||
#[serde(default = "default_pdu_cache_capacity")]
|
#[serde(default = "default_pdu_cache_capacity")]
|
||||||
pdu_cache_capacity: u32,
|
pdu_cache_capacity: u32,
|
||||||
#[serde(default = "default_cleanup_second_interval")]
|
#[serde(default = "default_cleanup_second_interval")]
|
||||||
|
@ -127,6 +129,10 @@ fn default_db_cache_capacity_mb() -> f64 {
|
||||||
10.0
|
10.0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_rocksdb_max_open_files() -> i32 {
|
||||||
|
512
|
||||||
|
}
|
||||||
|
|
||||||
fn default_pdu_cache_capacity() -> u32 {
|
fn default_pdu_cache_capacity() -> u32 {
|
||||||
1_000_000
|
1_000_000
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ use std::{future::Future, pin::Pin, sync::Arc, sync::RwLock};
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
|
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
|
||||||
cache_capacity_bytes: usize,
|
cache_capacity_bytes: usize,
|
||||||
|
max_open_files: i32,
|
||||||
cache: rocksdb::Cache,
|
cache: rocksdb::Cache,
|
||||||
old_cfs: Vec<String>,
|
old_cfs: Vec<String>,
|
||||||
}
|
}
|
||||||
|
@ -16,7 +17,11 @@ pub struct RocksDbEngineTree<'a> {
|
||||||
write_lock: RwLock<()>,
|
write_lock: RwLock<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn db_options(cache_capacity_bytes: usize, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
|
fn db_options(
|
||||||
|
cache_capacity_bytes: usize,
|
||||||
|
max_open_files: i32,
|
||||||
|
rocksdb_cache: &rocksdb::Cache,
|
||||||
|
) -> rocksdb::Options {
|
||||||
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
||||||
block_based_options.set_block_cache(rocksdb_cache);
|
block_based_options.set_block_cache(rocksdb_cache);
|
||||||
|
|
||||||
|
@ -36,7 +41,7 @@ fn db_options(cache_capacity_bytes: usize, rocksdb_cache: &rocksdb::Cache) -> ro
|
||||||
//db_opts.set_use_direct_io_for_flush_and_compaction(true);
|
//db_opts.set_use_direct_io_for_flush_and_compaction(true);
|
||||||
db_opts.create_if_missing(true);
|
db_opts.create_if_missing(true);
|
||||||
db_opts.increase_parallelism(num_cpus::get() as i32);
|
db_opts.increase_parallelism(num_cpus::get() as i32);
|
||||||
db_opts.set_max_open_files(512);
|
db_opts.set_max_open_files(max_open_files);
|
||||||
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
||||||
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||||
db_opts.optimize_level_style_compaction(cache_capacity_bytes);
|
db_opts.optimize_level_style_compaction(cache_capacity_bytes);
|
||||||
|
@ -52,7 +57,11 @@ impl DatabaseEngine for Arc<Engine> {
|
||||||
let cache_capacity_bytes = (config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize;
|
let cache_capacity_bytes = (config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize;
|
||||||
let rocksdb_cache = rocksdb::Cache::new_lru_cache(cache_capacity_bytes).unwrap();
|
let rocksdb_cache = rocksdb::Cache::new_lru_cache(cache_capacity_bytes).unwrap();
|
||||||
|
|
||||||
let db_opts = db_options(cache_capacity_bytes, &rocksdb_cache);
|
let db_opts = db_options(
|
||||||
|
cache_capacity_bytes,
|
||||||
|
config.rocksdb_max_open_files,
|
||||||
|
&rocksdb_cache,
|
||||||
|
);
|
||||||
|
|
||||||
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
|
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
|
||||||
&db_opts,
|
&db_opts,
|
||||||
|
@ -66,7 +75,11 @@ impl DatabaseEngine for Arc<Engine> {
|
||||||
cfs.iter().map(|name| {
|
cfs.iter().map(|name| {
|
||||||
rocksdb::ColumnFamilyDescriptor::new(
|
rocksdb::ColumnFamilyDescriptor::new(
|
||||||
name,
|
name,
|
||||||
db_options(cache_capacity_bytes, &rocksdb_cache),
|
db_options(
|
||||||
|
cache_capacity_bytes,
|
||||||
|
config.rocksdb_max_open_files,
|
||||||
|
&rocksdb_cache,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
}),
|
}),
|
||||||
)?;
|
)?;
|
||||||
|
@ -74,6 +87,7 @@ impl DatabaseEngine for Arc<Engine> {
|
||||||
Ok(Arc::new(Engine {
|
Ok(Arc::new(Engine {
|
||||||
rocks: db,
|
rocks: db,
|
||||||
cache_capacity_bytes,
|
cache_capacity_bytes,
|
||||||
|
max_open_files: config.rocksdb_max_open_files,
|
||||||
cache: rocksdb_cache,
|
cache: rocksdb_cache,
|
||||||
old_cfs: cfs,
|
old_cfs: cfs,
|
||||||
}))
|
}))
|
||||||
|
@ -82,9 +96,10 @@ impl DatabaseEngine for Arc<Engine> {
|
||||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>> {
|
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>> {
|
||||||
if !self.old_cfs.contains(&name.to_owned()) {
|
if !self.old_cfs.contains(&name.to_owned()) {
|
||||||
// Create if it didn't exist
|
// Create if it didn't exist
|
||||||
let _ = self
|
let _ = self.rocks.create_cf(
|
||||||
.rocks
|
name,
|
||||||
.create_cf(name, &db_options(self.cache_capacity_bytes, &self.cache));
|
&db_options(self.cache_capacity_bytes, self.max_open_files, &self.cache),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Arc::new(RocksDbEngineTree {
|
Ok(Arc::new(RocksDbEngineTree {
|
||||||
|
|
Loading…
Reference in a new issue