diff --git a/include/ircd/db/compactor.h b/include/ircd/db/compactor.h new file mode 100644 index 000000000..9c9445a70 --- /dev/null +++ b/include/ircd/db/compactor.h @@ -0,0 +1,36 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_DB_COMPACTOR_H + +namespace ircd::db +{ + struct compactor; +} + +/// Compaction callback +/// +/// return db::op::GET (0) from callback for no-op +/// return db::op::DELETE from callback to delete this kv. +/// return db::op::SET from callback if replacement modified. +/// return db::op::DELETE_RANGE from callback if skip_until modified. +/// +struct ircd::db::compactor +{ + using prototype = db::op (const int &level, + const string_view &key, + const string_view &val, + std::string *const replace, + std::string *const skip_until); + + std::function value; + std::function merge; +}; diff --git a/include/ircd/db/database/column.h b/include/ircd/db/database/column.h index 5224cf401..ed21c538c 100644 --- a/include/ircd/db/database/column.h +++ b/include/ircd/db/database/column.h @@ -37,6 +37,7 @@ struct ircd::db::database::column final database::descriptor descriptor; comparator cmp; prefix_transform prefix; + compaction_filter cfilter; rocksdb::BlockBasedTableOptions table_opts; custom_ptr handle; diff --git a/include/ircd/db/database/compaction_filter.h b/include/ircd/db/database/compaction_filter.h new file mode 100644 index 000000000..b8bcb2e23 --- /dev/null +++ b/include/ircd/db/database/compaction_filter.h @@ -0,0 +1,34 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_DB_DATABASE_COMPACTION_FILTER_H + +// This file is not part of the standard include stack because it requires +// RocksDB symbols which we cannot forward declare. It is used internally +// and does not need to be included by general users of IRCd. + +struct ircd::db::database::compaction_filter +final +:rocksdb::CompactionFilter +{ + using Slice = rocksdb::Slice; + + column *c; + database *d; + db::compactor user; + + const char *Name() const override; + bool IgnoreSnapshots() const override; + Decision FilterV2(const int level, const Slice &key, const ValueType v, const Slice &oldval, std::string *newval, std::string *skipuntil) const override; + + compaction_filter(column *const &c, db::compactor); + ~compaction_filter() noexcept override; +}; diff --git a/include/ircd/db/database/database.h b/include/ircd/db/database/database.h index 9e447cfb6..0e23399ec 100644 --- a/include/ircd/db/database/database.h +++ b/include/ircd/db/database/database.h @@ -76,6 +76,7 @@ struct ircd::db::database struct snapshot; struct comparator; struct prefix_transform; + struct compaction_filter; struct column; struct env; struct cache; diff --git a/include/ircd/db/database/descriptor.h b/include/ircd/db/database/descriptor.h index 2e95632db..ae1dc9005 100644 --- a/include/ircd/db/database/descriptor.h +++ b/include/ircd/db/database/descriptor.h @@ -60,4 +60,7 @@ struct ircd::db::database::descriptor /// blocks will read and cache unrelated data if values are smaller /// than this size. size_t block_size { 512 }; + + /// User given compaction callback surface. + db::compactor compactor {}; }; diff --git a/include/ircd/db/db.h b/include/ircd/db/db.h index a1a8f787d..769d1287d 100644 --- a/include/ircd/db/db.h +++ b/include/ircd/db/db.h @@ -61,6 +61,7 @@ enum class ircd::db::pos #include "delta.h" #include "comparator.h" +#include "compactor.h" #include "prefix.h" #include "merge.h" #include "database/rocksdb.h" diff --git a/ircd/db.cc b/ircd/db.cc index 586aa14f3..d303d40d9 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -26,10 +26,12 @@ #include #include #include +#include // ircd::db interfaces requiring complete RocksDB (frontside). #include #include +#include #include #include #include @@ -1389,6 +1391,7 @@ ircd::db::database::column::column(database *const &d, ,descriptor{descriptor} ,cmp{d, this->descriptor.cmp} ,prefix{d, this->descriptor.prefix} +,cfilter{this, this->descriptor.compactor} ,handle { nullptr, [this](rocksdb::ColumnFamilyHandle *const handle) @@ -1426,6 +1429,9 @@ ircd::db::database::column::column(database *const &d, &this->prefix, [](const rocksdb::SliceTransform *) {} }; + // Set the compaction filter + this->options.compaction_filter = &this->cfilter; + // // Table options // @@ -1457,6 +1463,9 @@ ircd::db::database::column::column(database *const &d, // Misc options // + // More stats reported by the rocksdb.stats property. + this->options.report_bg_io_stats = true; + // Set the compaction style; we don't override this in the descriptor yet. this->options.compaction_style = rocksdb::kCompactionStyleLevel; @@ -2244,6 +2253,105 @@ noexcept } +/////////////////////////////////////////////////////////////////////////////// +// +// database::compaction_filter +// + +ircd::db::database::compaction_filter::compaction_filter(column *const &c, + db::compactor user) +:c{c} +,d{c->d} +,user{std::move(user)} +{ +} + +ircd::db::database::compaction_filter::~compaction_filter() +noexcept +{ +} + +rocksdb::CompactionFilter::Decision +ircd::db::database::compaction_filter::FilterV2(const int level, + const Slice &key, + const ValueType type, + const Slice &oldval, + std::string *const newval, + std::string *const skip) +const +{ + #ifdef RB_DEBUG_DB_ENV + const auto typestr + { + type == kValue? + "VALUE"_sv: + type == kMergeOperand? + "MERGE"_sv: + "BLOB"_sv + }; + + log::debug + { + log, "'%s':'%s': compaction level:%d key:%zu@%p type:%s old:%zu@%p new:%p skip:%p", + d->name, + c->name, + level, + size(key), + data(key), + typestr, + size(oldval), + data(oldval), + (const void *)newval, + (const void *)skipuntil + }; + #endif + + db::op ret + { + db::op::GET + }; + + switch(type) + { + case ValueType::kValue: + if(user.value) + ret = user.value(level, slice(key), slice(oldval), newval, skip); + break; + + case ValueType::kMergeOperand: + if(user.merge) + ret = user.merge(level, slice(key), slice(oldval), newval, skip); + break; + + case ValueType::kBlobIndex: + break; + } + + switch(ret) + { + default: + case db::op::GET: return Decision::kKeep; + case db::op::SET: return Decision::kChangeValue; + case db::op::DELETE: return Decision::kRemove; + case db::op::DELETE_RANGE: return Decision::kRemoveAndSkipUntil; + } +} + +bool +ircd::db::database::compaction_filter::IgnoreSnapshots() +const +{ + return false; +} + +const char * +ircd::db::database::compaction_filter::Name() +const +{ + assert(c); + return db::name(*c).c_str(); +} + /////////////////////////////////////////////////////////////////////////////// // // database::env