mirror of
https://github.com/matrix-construct/construct
synced 2024-06-02 18:18:56 +02:00
ircd::db: additional support: stats, callbacks and merge operator.
This commit is contained in:
parent
7ae5785431
commit
1f6d83b5b1
|
@ -121,11 +121,12 @@ struct opts
|
|||
|
||||
enum op
|
||||
{
|
||||
PUT,
|
||||
GET,
|
||||
SET,
|
||||
MERGE,
|
||||
DELETE,
|
||||
SINGLE_DELETE,
|
||||
DELETE_RANGE,
|
||||
SINGLE_DELETE,
|
||||
};
|
||||
|
||||
struct delta
|
||||
|
@ -135,17 +136,21 @@ struct delta
|
|||
:std::tuple<enum op, string_view, string_view>{op, key, val}
|
||||
{}
|
||||
|
||||
delta(const string_view &key, const string_view &val, const enum op &op = op::PUT)
|
||||
delta(const string_view &key, const string_view &val, const enum op &op = op::SET)
|
||||
:std::tuple<enum op, string_view, string_view>{op, key, val}
|
||||
{}
|
||||
};
|
||||
|
||||
using merge_delta = std::pair<string_view, string_view>;
|
||||
using merge_function = std::function<std::string (const string_view &key, const merge_delta &)>;
|
||||
using update_function = std::function<std::string (const string_view &key, merge_delta &)>;
|
||||
|
||||
struct handle
|
||||
{
|
||||
struct const_iterator;
|
||||
|
||||
private:
|
||||
std::unique_ptr<struct meta> meta;
|
||||
std::shared_ptr<struct meta> meta;
|
||||
std::unique_ptr<rocksdb::DB> d;
|
||||
|
||||
public:
|
||||
|
@ -160,9 +165,6 @@ struct handle
|
|||
const_iterator cbegin(const gopts & = {});
|
||||
const_iterator cend(const gopts & = {});
|
||||
|
||||
// Tests if key exists
|
||||
bool has(const string_view &key, const gopts & = {});
|
||||
|
||||
// Perform a get into a closure. This offers a reference to the data with zero-copy.
|
||||
void operator()(const string_view &key, const closure &func, const gopts & = {});
|
||||
void operator()(const string_view &key, const gopts &, const closure &func);
|
||||
|
@ -170,6 +172,10 @@ struct handle
|
|||
// Perform operations in a sequence as a single transaction.
|
||||
void operator()(const delta &, const sopts & = {});
|
||||
void operator()(const std::initializer_list<delta> &, const sopts & = {});
|
||||
void operator()(const op &, const string_view &key, const string_view &val = {}, const sopts & = {});
|
||||
|
||||
// Tests if key exists
|
||||
bool has(const string_view &key, const gopts & = {});
|
||||
|
||||
// Get data into your buffer. The signed char buffer is null terminated; the unsigned is not.
|
||||
size_t get(const string_view &key, char *const &buf, const size_t &max, const gopts & = {});
|
||||
|
@ -183,7 +189,7 @@ struct handle
|
|||
// Remove data from the db. not_found is never thrown.
|
||||
void del(const string_view &key, const sopts & = {});
|
||||
|
||||
handle(const std::string &name, const opts & = {});
|
||||
handle(const std::string &name, const opts & = {}, merge_function = {});
|
||||
handle();
|
||||
handle(handle &&) noexcept;
|
||||
handle &operator=(handle &&) noexcept;
|
||||
|
@ -195,6 +201,10 @@ struct handle::const_iterator
|
|||
using key_type = string_view;
|
||||
using mapped_type = string_view;
|
||||
using value_type = std::pair<key_type, mapped_type>;
|
||||
using pointer = value_type *;
|
||||
using reference = value_type &;
|
||||
using difference_type = size_t;
|
||||
using iterator_category = std::bidirectional_iterator_tag;
|
||||
|
||||
private:
|
||||
struct state;
|
||||
|
@ -227,8 +237,6 @@ struct handle::const_iterator
|
|||
~const_iterator() noexcept;
|
||||
};
|
||||
|
||||
void write(handle &, const string_view &key, const json::doc &obj, const sopts & = {});
|
||||
|
||||
handle::const_iterator begin(handle &);
|
||||
handle::const_iterator end(handle &);
|
||||
|
||||
|
@ -244,6 +252,24 @@ extern struct log::log log;
|
|||
} // namespace db
|
||||
} // namespace ircd
|
||||
|
||||
namespace ircd {
|
||||
namespace db {
|
||||
namespace json {
|
||||
|
||||
std::string merge_operator(const string_view &, const std::pair<string_view, string_view> &);
|
||||
|
||||
struct obj
|
||||
:handle
|
||||
{
|
||||
obj(const std::string &name, const opts &opts = {})
|
||||
:handle{name, opts, merge_operator}
|
||||
{}
|
||||
};
|
||||
|
||||
} // namespace json
|
||||
} // namespace db
|
||||
} // namespace ircd
|
||||
|
||||
inline ircd::db::handle::const_iterator
|
||||
ircd::db::end(handle &handle)
|
||||
{
|
||||
|
|
358
ircd/db.cc
358
ircd/db.cc
|
@ -22,6 +22,10 @@
|
|||
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/cache.h>
|
||||
#include <rocksdb/merge_operator.h>
|
||||
#include <rocksdb/perf_level.h>
|
||||
#include <rocksdb/listener.h>
|
||||
#include <rocksdb/statistics.h>
|
||||
|
||||
namespace ircd {
|
||||
namespace db {
|
||||
|
@ -31,6 +35,9 @@ struct log::log log
|
|||
"db", 'D' // Database subsystem takes SNOMASK +D
|
||||
};
|
||||
|
||||
const std::string &reflect(const rocksdb::Tickers &);
|
||||
const std::string &reflect(const rocksdb::Histograms &);
|
||||
|
||||
void throw_on_error(const rocksdb::Status &);
|
||||
bool valid(const rocksdb::Iterator &);
|
||||
void valid_or_throw(const rocksdb::Iterator &);
|
||||
|
@ -59,11 +66,44 @@ rocksdb::ReadOptions make_opts(const gopts &, const bool &iterator = false);
|
|||
rocksdb::Options make_opts(const opts &);
|
||||
|
||||
struct meta
|
||||
:rocksdb::Statistics
|
||||
,rocksdb::EventListener
|
||||
,rocksdb::AssociativeMergeOperator
|
||||
{
|
||||
std::string name;
|
||||
std::string path;
|
||||
rocksdb::Options opts;
|
||||
std::shared_ptr<rocksdb::Cache> cache;
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> cols;
|
||||
std::vector<rocksdb::ColumnFamilyHandle *> handles;
|
||||
merge_function merger;
|
||||
|
||||
// Statistics
|
||||
std::array<uint64_t, rocksdb::TICKER_ENUM_MAX> ticker {{0}};
|
||||
std::array<rocksdb::HistogramData, rocksdb::HISTOGRAM_ENUM_MAX> histogram;
|
||||
|
||||
uint64_t getTickerCount(const uint32_t tickerType) const override;
|
||||
void recordTick(const uint32_t tickerType, const uint64_t count) override;
|
||||
void setTickerCount(const uint32_t tickerType, const uint64_t count) override;
|
||||
void histogramData(const uint32_t type, rocksdb::HistogramData *) const override;
|
||||
void measureTime(const uint32_t histogramType, const uint64_t time) override;
|
||||
bool HistEnabledForType(const uint32_t type) const override;
|
||||
|
||||
// EventListener
|
||||
void OnFlushCompleted(rocksdb::DB *, const rocksdb::FlushJobInfo &) override;
|
||||
void OnCompactionCompleted(rocksdb::DB *, const rocksdb::CompactionJobInfo &) override;
|
||||
void OnTableFileDeleted(const rocksdb::TableFileDeletionInfo &) override;
|
||||
void OnTableFileCreated(const rocksdb::TableFileCreationInfo &) override;
|
||||
void OnTableFileCreationStarted(const rocksdb::TableFileCreationBriefInfo &) override;
|
||||
void OnMemTableSealed(const rocksdb::MemTableInfo &) override;
|
||||
void OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnFamilyHandle *) override;
|
||||
|
||||
// AssociativeMergeOperator
|
||||
bool Merge(const rocksdb::Slice &, const rocksdb::Slice *, const rocksdb::Slice &, std::string *, rocksdb::Logger *) const override;
|
||||
const char *Name() const override;
|
||||
|
||||
meta();
|
||||
~meta() noexcept;
|
||||
};
|
||||
|
||||
} // namespace db
|
||||
|
@ -80,54 +120,83 @@ noexcept
|
|||
{
|
||||
}
|
||||
|
||||
void
|
||||
db::write(handle &handle,
|
||||
const string_view &key,
|
||||
const json::doc &obj,
|
||||
const sopts &sopts)
|
||||
{
|
||||
printf("Got this object %zu\n", obj.size());
|
||||
}
|
||||
namespace ircd {
|
||||
namespace db {
|
||||
|
||||
} // namespace db
|
||||
} // namespace ircd
|
||||
|
||||
db::handle::handle(const std::string &name,
|
||||
const opts &opts)
|
||||
const opts &opts,
|
||||
merge_function mf)
|
||||
try
|
||||
:meta{[&name, &opts]
|
||||
:meta{[&name, &opts, &mf]
|
||||
{
|
||||
auto meta(std::make_unique<struct meta>());
|
||||
auto meta(std::make_shared<struct meta>());
|
||||
meta->name = name;
|
||||
meta->path = path(name);
|
||||
meta->opts = make_opts(opts);
|
||||
|
||||
meta->cols =
|
||||
{
|
||||
rocksdb::ColumnFamilyDescriptor { "default", meta->opts },
|
||||
};
|
||||
|
||||
// Setup event and statistics callbacks
|
||||
meta->opts.listeners.emplace_back(meta);
|
||||
//meta->opts.statistics = meta; // broken?
|
||||
|
||||
// Setup performance metric options
|
||||
//rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
|
||||
|
||||
// Setup journal recovery options
|
||||
meta->opts.wal_recovery_mode = rocksdb::WALRecoveryMode::kAbsoluteConsistency;
|
||||
//meta->opts.wal_recovery_mode = rocksdb::WALRecoveryMode::kPointInTimeRecovery;
|
||||
|
||||
// Setup caching options
|
||||
const auto lru_cache_size(opt_val(opts, opt::LRU_CACHE));
|
||||
if(lru_cache_size > 0)
|
||||
meta->cache = rocksdb::NewLRUCache(lru_cache_size);
|
||||
|
||||
meta->opts.row_cache = meta->cache;
|
||||
return std::move(meta);
|
||||
|
||||
if(mf) // Setup user operators
|
||||
{
|
||||
meta->merger = std::move(mf);
|
||||
meta->opts.merge_operator = meta;
|
||||
}
|
||||
|
||||
return meta;
|
||||
}()}
|
||||
,d{[this, &opts]
|
||||
{
|
||||
rocksdb::DB *ptr;
|
||||
auto &name(meta->name);
|
||||
auto &path(meta->path);
|
||||
auto &cols(meta->cols);
|
||||
auto &handles(meta->handles);
|
||||
log.debug("Attempting to open database \"%s\" @ `%s'",
|
||||
name,
|
||||
path);
|
||||
|
||||
rocksdb::DB *ptr;
|
||||
if(has_opt(opts, opt::READ_ONLY))
|
||||
throw_on_error(rocksdb::DB::OpenForReadOnly(meta->opts, meta->path, &ptr));
|
||||
throw_on_error(rocksdb::DB::OpenForReadOnly(meta->opts, path, cols, &handles, &ptr));
|
||||
else
|
||||
throw_on_error(rocksdb::DB::Open(meta->opts, meta->path, &ptr));
|
||||
throw_on_error(rocksdb::DB::Open(meta->opts, path, cols, &handles, &ptr));
|
||||
|
||||
return std::unique_ptr<rocksdb::DB>{ptr};
|
||||
}()}
|
||||
{
|
||||
log.info("Opened database \"%s\" @ `%s' (handle: %p)",
|
||||
meta->name.c_str(),
|
||||
meta->path.c_str(),
|
||||
(const void *)this);
|
||||
log.info("Opened database \"%s\" @ `%s' (handle: %p) columns: %zu",
|
||||
meta->name,
|
||||
meta->path,
|
||||
(const void *)this,
|
||||
meta->handles.size());
|
||||
}
|
||||
catch(const invalid_argument &e)
|
||||
{
|
||||
const bool no_create(has_opt(opts, opt::NO_CREATE));
|
||||
const bool no_existing(has_opt(opts, opt::NO_EXISTING));
|
||||
const char *const helpstr
|
||||
const auto helpstr
|
||||
{
|
||||
no_create? " (The database is missing and will not be created)":
|
||||
no_existing? " (The database already exists but must be fresh)":
|
||||
|
@ -135,14 +204,14 @@ catch(const invalid_argument &e)
|
|||
};
|
||||
|
||||
throw error("Failed to open db '%s': %s%s",
|
||||
name.c_str(),
|
||||
name,
|
||||
e.what(),
|
||||
helpstr);
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
throw error("Failed to open db '%s': %s",
|
||||
name.c_str(),
|
||||
name,
|
||||
e.what());
|
||||
}
|
||||
|
||||
|
@ -177,6 +246,9 @@ noexcept
|
|||
meta->name.c_str(),
|
||||
meta->path.c_str(),
|
||||
(const void *)this);
|
||||
|
||||
for(const auto &handle : meta->handles)
|
||||
d->DestroyColumnFamilyHandle(handle);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -282,17 +354,27 @@ void append(rocksdb::WriteBatch &batch,
|
|||
|
||||
switch(std::get<0>(delta))
|
||||
{
|
||||
case op::PUT: batch.Put(k, v); break;
|
||||
case op::GET: assert(0); break;
|
||||
case op::SET: batch.Put(k, v); break;
|
||||
case op::MERGE: batch.Merge(k, v); break;
|
||||
case op::DELETE: batch.Delete(k); break;
|
||||
case op::SINGLE_DELETE: batch.SingleDelete(k); break;
|
||||
case op::DELETE_RANGE: batch.DeleteRange(k, v); break;
|
||||
case op::SINGLE_DELETE: batch.SingleDelete(k); break;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
} // namespace ircd
|
||||
|
||||
void
|
||||
db::handle::operator()(const op &op,
|
||||
const string_view &key,
|
||||
const string_view &val,
|
||||
const sopts &sopts)
|
||||
{
|
||||
operator()(delta{op, key, val}, sopts);
|
||||
}
|
||||
|
||||
void
|
||||
db::handle::operator()(const std::initializer_list<delta> &deltas,
|
||||
const sopts &sopts)
|
||||
|
@ -380,6 +462,204 @@ db::handle::has(const string_view &key,
|
|||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Meta
|
||||
//
|
||||
|
||||
ircd::db::meta::meta()
|
||||
{
|
||||
}
|
||||
|
||||
ircd::db::meta::~meta()
|
||||
noexcept
|
||||
{
|
||||
}
|
||||
|
||||
const char *
|
||||
ircd::db::meta::Name()
|
||||
const
|
||||
{
|
||||
return "<unnamed>";
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::db::meta::Merge(const rocksdb::Slice &_key,
|
||||
const rocksdb::Slice *const _exist,
|
||||
const rocksdb::Slice &_update,
|
||||
std::string *const newval,
|
||||
rocksdb::Logger *const)
|
||||
const try
|
||||
{
|
||||
const string_view key
|
||||
{
|
||||
_key.data(), _key.size()
|
||||
};
|
||||
|
||||
const string_view exist
|
||||
{
|
||||
_exist? string_view { _exist->data(), _exist->size() } : string_view{}
|
||||
};
|
||||
|
||||
const string_view update
|
||||
{
|
||||
_update.data(), _update.size()
|
||||
};
|
||||
|
||||
if(exist.empty())
|
||||
{
|
||||
*newval = std::string(update);
|
||||
return true;
|
||||
}
|
||||
|
||||
//XXX caching opportunity?
|
||||
*newval = merger(key, {exist, update}); // call the user
|
||||
return true;
|
||||
}
|
||||
catch(const std::bad_function_call &e)
|
||||
{
|
||||
log.critical("merge: missing merge operator (%s)", e.what());
|
||||
return false;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
log.error("merge: %s", e.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
ircd::db::meta::HistEnabledForType(const uint32_t type)
|
||||
const
|
||||
{
|
||||
return type < histogram.size();
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::measureTime(const uint32_t type,
|
||||
const uint64_t time)
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::histogramData(const uint32_t type,
|
||||
rocksdb::HistogramData *const data)
|
||||
const
|
||||
{
|
||||
assert(data);
|
||||
|
||||
const auto &median(data->median);
|
||||
const auto &percentile95(data->percentile95);
|
||||
const auto &percentile88(data->percentile99);
|
||||
const auto &average(data->average);
|
||||
const auto &standard_deviation(data->standard_deviation);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::recordTick(const uint32_t type,
|
||||
const uint64_t count)
|
||||
{
|
||||
ticker.at(type) += count;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::setTickerCount(const uint32_t type,
|
||||
const uint64_t count)
|
||||
{
|
||||
ticker.at(type) = count;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
ircd::db::meta::getTickerCount(const uint32_t type)
|
||||
const
|
||||
{
|
||||
return ticker.at(type);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnFlushCompleted(rocksdb::DB *const db,
|
||||
const rocksdb::FlushJobInfo &info)
|
||||
{
|
||||
log.debug("'%s' @%p: flushed: column[%s] path[%s] tid[%lu] job[%d] writes[slow:%d stop:%d]",
|
||||
name,
|
||||
db,
|
||||
info.cf_name,
|
||||
info.file_path,
|
||||
info.thread_id,
|
||||
info.job_id,
|
||||
info.triggered_writes_slowdown,
|
||||
info.triggered_writes_stop);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnCompactionCompleted(rocksdb::DB *const db,
|
||||
const rocksdb::CompactionJobInfo &info)
|
||||
{
|
||||
log.debug("'%s' @%p: compacted: column[%s] status[%d] tid[%lu] job[%d]",
|
||||
name,
|
||||
db,
|
||||
info.cf_name,
|
||||
int(info.status.code()),
|
||||
info.thread_id,
|
||||
info.job_id);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnTableFileDeleted(const rocksdb::TableFileDeletionInfo &info)
|
||||
{
|
||||
log.debug("'%s': table file deleted: db[%s] path[%s] status[%d] job[%d]",
|
||||
name,
|
||||
info.db_name,
|
||||
info.file_path,
|
||||
int(info.status.code()),
|
||||
info.job_id);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnTableFileCreated(const rocksdb::TableFileCreationInfo &info)
|
||||
{
|
||||
log.debug("'%s': table file created: db[%s] path[%s] status[%d] job[%d]",
|
||||
name,
|
||||
info.db_name,
|
||||
info.file_path,
|
||||
int(info.status.code()),
|
||||
info.job_id);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnTableFileCreationStarted(const rocksdb::TableFileCreationBriefInfo &info)
|
||||
{
|
||||
log.debug("'%s': table file creating: db[%s] column[%s] path[%s] job[%d]",
|
||||
name,
|
||||
info.db_name,
|
||||
info.cf_name,
|
||||
info.file_path,
|
||||
info.job_id);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnMemTableSealed(const rocksdb::MemTableInfo &info)
|
||||
{
|
||||
log.debug("'%s': memory table sealed: column[%s] entries[%lu] deletes[%lu]",
|
||||
name,
|
||||
info.cf_name,
|
||||
info.num_entries,
|
||||
info.num_deletes);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::meta::OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnFamilyHandle *const h)
|
||||
{
|
||||
log.debug("'%s': column family handle deletion started: %p",
|
||||
name,
|
||||
h);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// handle::const_iterator
|
||||
//
|
||||
|
||||
namespace ircd {
|
||||
namespace db {
|
||||
|
||||
|
@ -904,3 +1184,31 @@ db::path(const std::string &name)
|
|||
const auto prefix(path::get(path::DB));
|
||||
return path::build({prefix, name});
|
||||
}
|
||||
|
||||
const std::string &
|
||||
ircd::db::reflect(const rocksdb::Tickers &type)
|
||||
{
|
||||
const auto &names(rocksdb::TickersNameMap);
|
||||
const auto it(std::find_if(begin(names), end(names), [&type]
|
||||
(const auto &pair)
|
||||
{
|
||||
return pair.first == type;
|
||||
}));
|
||||
|
||||
static const auto empty{"<ticker>?????"s};
|
||||
return it != end(names)? it->second : empty;
|
||||
}
|
||||
|
||||
const std::string &
|
||||
ircd::db::reflect(const rocksdb::Histograms &type)
|
||||
{
|
||||
const auto &names(rocksdb::HistogramsNameMap);
|
||||
const auto it(std::find_if(begin(names), end(names), [&type]
|
||||
(const auto &pair)
|
||||
{
|
||||
return pair.first == type;
|
||||
}));
|
||||
|
||||
static const auto empty{"<histogram>?????"s};
|
||||
return it != end(names)? it->second : empty;
|
||||
}
|
||||
|
|
17
ircd/json.cc
17
ircd/json.cc
|
@ -243,6 +243,23 @@ std::ostream &operator<<(std::ostream &, const obj &);
|
|||
} // namespace json
|
||||
} // namespace ircd
|
||||
|
||||
namespace ircd {
|
||||
namespace db {
|
||||
namespace json {
|
||||
|
||||
std::string
|
||||
merge_operator(const string_view &key,
|
||||
const std::pair<string_view, string_view> &delta)
|
||||
{
|
||||
ircd::json::obj obj{delta.first};
|
||||
obj += delta.second;
|
||||
return obj;
|
||||
}
|
||||
|
||||
} // namespace json
|
||||
} // namespace db
|
||||
} // namespace ircd
|
||||
|
||||
ircd::json::printer::printer()
|
||||
{
|
||||
const auto recursor([this](auto &a, auto &b, auto &c)
|
||||
|
|
Loading…
Reference in a new issue