1
0
Fork 0
mirror of https://gitlab.com/famedly/conduit.git synced 2025-01-14 21:07:23 +01:00

remove rocksdb

This commit is contained in:
Timo Kösters 2021-08-04 21:17:40 +02:00
parent 902404d48d
commit df727688ef
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
8 changed files with 3 additions and 345 deletions

144
Cargo.lock generated
View file

@ -144,43 +144,12 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453c49e5950bb0eb63bb3df640e31618846c89d5b7faa54040d76e98e0134375"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
]
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]]
name = "blake2b_simd"
version = "0.5.11"
@ -234,15 +203,6 @@ dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db507a7679252d2276ed0dd8113c6875ec56d3089f9225b2b42c30cc1f8e5c89"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
version = "0.1.10"
@ -268,17 +228,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "clang-sys"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "color_quant"
version = "1.1.0"
@ -308,7 +257,6 @@ dependencies = [
"reqwest",
"ring",
"rocket",
"rocksdb",
"ruma",
"rusqlite",
"rust-argon2",
@ -725,12 +673,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "futures"
version = "0.3.16"
@ -1243,40 +1185,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790"
[[package]]
name = "libloading"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a"
dependencies = [
"cfg-if 1.0.0",
"winapi",
]
[[package]]
name = "librocksdb-sys"
version = "6.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c309a9d2470844aceb9a4a098cf5286154d20596868b75a6b36357d2bb9ca25d"
dependencies = [
"bindgen",
"cc",
"glob",
"libc",
]
[[package]]
name = "libsqlite3-sys"
version = "0.22.2"
@ -1445,18 +1359,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"memchr",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -1649,12 +1551,6 @@ dependencies = [
"syn",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "0.8.3"
@ -1817,12 +1713,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]]
name = "rand"
version = "0.7.3"
@ -2122,16 +2012,6 @@ dependencies = [
"uncased",
]
[[package]]
name = "rocksdb"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]]
name = "ruma"
version = "0.2.0"
@ -2415,12 +2295,6 @@ dependencies = [
"crossbeam-utils 0.8.5",
]
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.2.3"
@ -2647,12 +2521,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a568c8f2cd051a4d283bd6eb0343ac214c1b0f1ac19f93e1175b2dee38c73d"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -2864,12 +2732,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tempfile"
version = "3.2.0"
@ -3540,12 +3402,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "yaml-rust"
version = "0.4.5"

View file

@ -26,7 +26,6 @@ ruma = { git = "https://github.com/timokoesters/ruma", rev = "a2d93500e1dbc87e70
tokio = "1.8.2"
# Used for storing data permanently
sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true }
rocksdb = { version = "0.16.0", features = ["multi-threaded-cf"], optional = true }
#sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] }
# Used for the http request / response body type for Ruma endpoints used with reqwest
@ -84,7 +83,6 @@ heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c7
[features]
default = ["conduit_bin", "backend_sqlite"]
backend_sled = ["sled"]
backend_rocksdb = ["rocksdb"]
backend_sqlite = ["sqlite"]
backend_heed = ["heed", "crossbeam"]
sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"]

View file

@ -125,9 +125,6 @@ fn default_log() -> String {
#[cfg(feature = "sled")]
pub type Engine = abstraction::sled::Engine;
#[cfg(feature = "rocksdb")]
pub type Engine = abstraction::rocksdb::Engine;
#[cfg(feature = "sqlite")]
pub type Engine = abstraction::sqlite::Engine;
@ -426,7 +423,8 @@ impl Database {
println!("Migration: 4 -> 5 finished");
}
if db.globals.database_version()? < 9 { // TODO update to 6
if db.globals.database_version()? < 6 {
// TODO update to 6
// Set room member count
for (roomid, _) in db.rooms.roomid_shortstatehash.iter() {
let room_id =

View file

@ -3,9 +3,6 @@ use crate::Result;
use std::{future::Future, pin::Pin, sync::Arc};
#[cfg(feature = "rocksdb")]
pub mod rocksdb;
#[cfg(feature = "sled")]
pub mod sled;

View file

@ -1,176 +0,0 @@
use super::super::Config;
use crate::{utils, Result};
use std::{future::Future, pin::Pin, sync::Arc};
use super::{DatabaseEngine, Tree};
use std::{collections::HashMap, sync::RwLock};
pub struct Engine(rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>);
pub struct RocksDbEngineTree<'a> {
db: Arc<Engine>,
name: &'a str,
watchers: RwLock<HashMap<Vec<u8>, Vec<tokio::sync::oneshot::Sender<()>>>>,
}
impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> {
let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
db_opts.set_max_open_files(16);
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
db_opts.set_target_file_size_base(256 << 20);
db_opts.set_write_buffer_size(256 << 20);
let mut block_based_options = rocksdb::BlockBasedOptions::default();
block_based_options.set_block_size(512 << 10);
db_opts.set_block_based_table_factory(&block_based_options);
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
&db_opts,
&config.database_path,
)
.unwrap_or_default();
let mut options = rocksdb::Options::default();
options.set_merge_operator_associative("increment", utils::increment_rocksdb);
let db = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::open_cf_descriptors(
&db_opts,
&config.database_path,
cfs.iter()
.map(|name| rocksdb::ColumnFamilyDescriptor::new(name, options.clone())),
)?;
Ok(Arc::new(Engine(db)))
}
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
let mut options = rocksdb::Options::default();
options.set_merge_operator_associative("increment", utils::increment_rocksdb);
// Create if it doesn't exist
let _ = self.0.create_cf(name, &options);
Ok(Arc::new(RocksDbEngineTree {
name,
db: Arc::clone(self),
watchers: RwLock::new(HashMap::new()),
}))
}
}
impl RocksDbEngineTree<'_> {
fn cf(&self) -> rocksdb::BoundColumnFamily<'_> {
self.db.0.cf_handle(self.name).unwrap()
}
}
impl Tree for RocksDbEngineTree<'_> {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.db.0.get_cf(self.cf(), key)?)
}
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
let watchers = self.watchers.read().unwrap();
let mut triggered = Vec::new();
for length in 0..=key.len() {
if watchers.contains_key(&key[..length]) {
triggered.push(&key[..length]);
}
}
drop(watchers);
if !triggered.is_empty() {
let mut watchers = self.watchers.write().unwrap();
for prefix in triggered {
if let Some(txs) = watchers.remove(prefix) {
for tx in txs {
let _ = tx.send(());
}
}
}
}
Ok(self.db.0.put_cf(self.cf(), key, value)?)
}
fn remove(&self, key: &[u8]) -> Result<()> {
Ok(self.db.0.delete_cf(self.cf(), key)?)
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + Sync + 'a> {
Box::new(
self.db
.0
.iterator_cf(self.cf(), rocksdb::IteratorMode::Start),
)
}
fn iter_from<'a>(
&'a self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
Box::new(self.db.0.iterator_cf(
self.cf(),
rocksdb::IteratorMode::From(
from,
if backwards {
rocksdb::Direction::Reverse
} else {
rocksdb::Direction::Forward
},
),
))
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
let stats = rocksdb::perf::get_memory_usage_stats(Some(&[&self.db.0]), None).unwrap();
dbg!(stats.mem_table_total);
dbg!(stats.mem_table_unflushed);
dbg!(stats.mem_table_readers_total);
dbg!(stats.cache_total);
// TODO: atomic?
let old = self.get(key)?;
let new = utils::increment(old.as_deref()).unwrap();
self.insert(key, &new)?;
Ok(new)
}
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
Box::new(
self.db
.0
.iterator_cf(
self.cf(),
rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
)
.take_while(move |(k, _)| k.starts_with(&prefix)),
)
}
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.watchers
.write()
.unwrap()
.entry(prefix.to_vec())
.or_default()
.push(tx);
Box::pin(async move {
// Tx is never destroyed
rx.await.unwrap();
})
}
}

View file

@ -30,12 +30,6 @@ pub enum Error {
#[from]
source: sled::Error,
},
#[cfg(feature = "rocksdb")]
#[error("There was a problem with the connection to the rocksdb database: {source}")]
RocksDbError {
#[from]
source: rocksdb::Error,
},
#[cfg(feature = "sqlite")]
#[error("There was a problem with the connection to the sqlite database: {source}")]
SqliteError {

View file

@ -1,6 +1,6 @@
use crate::{
client_server::{self, claim_keys_helper, get_keys_helper},
database::{DatabaseGuard},
database::DatabaseGuard,
utils, ConduitResult, Database, Error, PduEvent, Result, Ruma,
};
use get_profile_information::v1::ProfileField;

View file

@ -17,15 +17,6 @@ pub fn millis_since_unix_epoch() -> u64 {
.as_millis() as u64
}
#[cfg(feature = "rocksdb")]
pub fn increment_rocksdb(
_new_key: &[u8],
old: Option<&[u8]>,
_operands: &mut rocksdb::MergeOperands,
) -> Option<Vec<u8>> {
increment(old)
}
pub fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
let number = match old.map(|bytes| bytes.try_into()) {
Some(Ok(bytes)) => {