0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-02 18:18:56 +02:00

ircd::db: Various improvements to DB subsystem.

This commit is contained in:
Jason Volk 2017-08-23 14:37:47 -06:00
parent 00b27909a0
commit ca608402f5
9 changed files with 498 additions and 293 deletions

View file

@ -63,7 +63,7 @@ extern struct log::log log;
//
// These are forward declarations to objects we may carry a pointer to.
// Users of ircd::db should not be dealing with these types.
// Users of ircd::db should not have to deal directly with these types.
//
namespace rocksdb
{

View file

@ -32,6 +32,11 @@ namespace db {
// for satisfaction. Cells from different columns sharing the same key are composed
// into a `row` (see: row.h).
//
// When composed into a `row` or `object` remember that calls to cell::key() will
// all be the same index key -- not the name of the column the cell is representing
// in the row. You probably want cell::col() when iterating the row to build a
// JSON object's keys when iterating across a row.
//
// NOTE that this cell struct is type-agnostic. The database is capable of storing
// binary data in the key or the value for a cell. The string_view will work with
// both a normal string and binary data, so this class is not a template and offers
@ -69,6 +74,9 @@ struct cell
explicit operator string_view() const; // empty on !valid()
explicit operator string_view(); // reload then empty on !valid()
// [SET] Perform operation on this cell only
void operator()(const op &, const string_view &val = {}, const sopts & = {});
// [SET] assign cell
cell &operator=(const string_view &);
@ -76,9 +84,10 @@ struct cell
bool compare_exchange(string_view &expected, const string_view &desired);
string_view exchange(const string_view &desired);
// [GET] load cell only (returns valid)
bool load(gopts = {});
cell(column, const string_view &index, std::unique_ptr<rocksdb::Iterator>);
cell(column, const string_view &index, std::unique_ptr<rocksdb::Iterator>, gopts = {});
cell(column, const string_view &index, gopts = {});
cell(database &, const string_view &column, const string_view &index, gopts = {});
cell();
@ -92,48 +101,38 @@ struct cell
};
//
// A delta is an element of a database transaction. The cell::delta
// is specific to a key in a column. Use cell deltas to make an
// all-succeed-or-all-fail transaction across many cells in many columns.
// A delta is an element of a database transaction. You can use this to change
// the values of cells. Use cell deltas to make an all-succeed-or-all-fail
// transaction across many cells in various columns at once.
//
struct cell::delta
:std::tuple<op, cell &, string_view>
{
delta(const enum op &op, cell &c, const string_view &val = {})
delta(cell &c, const string_view &val, const enum op &op = op::SET)
:std::tuple<enum op, cell &, string_view>{op, c, val}
{}
delta(cell &c, const string_view &val, const enum op &op = op::SET)
delta(const enum op &op, cell &c, const string_view &val = {})
:std::tuple<enum op, cell &, string_view>{op, c, val}
{}
};
// [SET] Perform operations in a sequence as a single transaction.
void write(const cell::delta &, const sopts & = {});
// [SET] Perform operations in a sequence as a single transaction. No template
// iterators supported yet, just a ptr range good for contiguous sequences like
// vectors and initializer_lists. To support any iterator I think we'll need to
// forward-expose a wrapping of rocksdb::WriteBatch.
void write(const cell::delta *const &begin, const cell::delta *const &end, const sopts & = {});
void write(const std::initializer_list<cell::delta> &, const sopts & = {});
void write(const sopts &, const std::initializer_list<cell::delta> &);
void write(const cell::delta &, const sopts & = {});
// Util
const std::string &name(const cell &);
uint64_t sequence(const cell &);
} // namespace db
} // namespace ircd
inline
uint64_t
ircd::db::sequence(const cell &c)
{
const database::snapshot &ss(c);
return sequence(ss);
}
inline
const std::string &
ircd::db::name(const cell &c)
{
return name(c.c);
}
inline std::ostream &
ircd::db::operator<<(std::ostream &s, const cell &c)
{

View file

@ -95,11 +95,12 @@ struct column
void operator()(const string_view &key, const view_closure &func, const gopts & = {});
void operator()(const string_view &key, const gopts &, const view_closure &func);
// [SET] Perform operations in a sequence as a single transaction.
void operator()(const delta &, const sopts & = {});
// [SET] Perform operations in a sequence as a single transaction. No template iterators
// supported yet, just a ContiguousContainer iteration (and derived convenience overloads)
void operator()(const delta *const &begin, const delta *const &end, const sopts & = {});
void operator()(const std::initializer_list<delta> &, const sopts & = {});
void operator()(const sopts &, const std::initializer_list<delta> &);
void operator()(const op &, const string_view &key, const string_view &val = {}, const sopts & = {});
void operator()(const delta &, const sopts & = {});
explicit column(std::shared_ptr<database::column> c);
column(database::column &c);
@ -110,17 +111,24 @@ struct column
//
// Delta is an element of a transaction. Use column::delta's to atomically
// commit to multiple keys in the same column. Refer to delta.h for the `enum op`
// choices. Refer to cell::delta to transact with multiple cells across
// different columns.
// choices. Refer to cell::delta to transact with multiple cells across different
// columns. Refer to row::delta to transact with entire rows.
//
// Note, for now, unlike cell::delta and row::delta, the column::delta has
// no reference to the column in its tuple. This is why these deltas are executed
// through the member column::operator() and not an overload of db::write().
//
// It is unlikely you will need to work with column deltas directly because
// you may decohere one column from the others participating in a row.
//
struct column::delta
:std::tuple<op, string_view, string_view>
{
delta(const enum op &op, const string_view &key, const string_view &val = {})
delta(const string_view &key, const string_view &val, const enum op &op = op::SET)
:std::tuple<enum op, string_view, string_view>{op, key, val}
{}
delta(const string_view &key, const string_view &val, const enum op &op = op::SET)
delta(const enum op &op, const string_view &key, const string_view &val = {})
:std::tuple<enum op, string_view, string_view>{op, key, val}
{}
};
@ -146,12 +154,12 @@ struct column::const_iterator
const_iterator(std::shared_ptr<database::column>, std::unique_ptr<rocksdb::Iterator> &&, gopts = {});
public:
operator const database::column &() const { return *c; }
operator const database::snapshot &() const { return opts.snapshot; }
explicit operator const gopts &() const { return opts; }
explicit operator const database::snapshot &() const;
explicit operator const database::column &() const;
explicit operator const gopts &() const;
operator database::column &() { return *c; }
explicit operator database::snapshot &() { return opts.snapshot; }
explicit operator database::snapshot &();
explicit operator database::column &();
operator bool() const;
bool operator!() const;
@ -209,27 +217,60 @@ void flush(column &, const bool &blocking = false);
} // namespace db
} // namespace ircd
inline
ircd::db::column::operator database::column &()
inline ircd::db::column::const_iterator::operator
database::column &()
{
return *c;
}
inline
ircd::db::column::operator database &()
inline ircd::db::column::const_iterator::operator
database::snapshot &()
{
return database::get(*c);
return opts.snapshot;
}
inline
ircd::db::column::operator const database::column &()
inline ircd::db::column::const_iterator::operator
const gopts &()
const
{
return opts;
}
inline ircd::db::column::const_iterator::operator
const database::column &()
const
{
return *c;
}
inline
ircd::db::column::operator const database &()
inline ircd::db::column::const_iterator::operator
const database::snapshot &()
const
{
return opts.snapshot;
}
inline ircd::db::column::operator
database::column &()
{
return *c;
}
inline ircd::db::column::operator
database &()
{
return database::get(*c);
}
inline ircd::db::column::operator
const database::column &()
const
{
return *c;
}
inline ircd::db::column::operator
const database &()
const
{
return database::get(*c);

View file

@ -64,6 +64,7 @@ struct database
std::shared_ptr<rocksdb::Cache> cache;
std::map<string_view, std::shared_ptr<column>> columns;
custom_ptr<rocksdb::DB> d;
unique_const_iterator<decltype(dbs)> dbs_it;
operator std::shared_ptr<database>() { return shared_from_this(); }
operator const rocksdb::DB &() const { return *d; }
@ -72,12 +73,12 @@ struct database
const column &operator[](const string_view &) const;
column &operator[](const string_view &);
database(const std::string &name,
const std::string &options,
database(std::string name,
std::string options,
description);
database(const std::string &name,
const std::string &options = {});
database(std::string name,
std::string options = {});
database() = default;
database(database &&) = delete;
@ -148,19 +149,15 @@ struct database::options::map
struct database::snapshot
{
std::weak_ptr<database> d;
std::shared_ptr<const rocksdb::Snapshot> s;
public:
operator const database &() const { return *d.lock(); }
operator const rocksdb::Snapshot *() const { return s.get(); }
operator database &() { return *d.lock(); }
operator bool() const { return bool(s); }
explicit operator bool() const { return bool(s); }
bool operator !() const { return !s; }
snapshot(database &);
explicit snapshot(database &);
snapshot() = default;
~snapshot() noexcept;
};

View file

@ -36,6 +36,10 @@ enum op
SINGLE_DELETE, // (batch.SingleDelete)
};
// Indicates an op uses both a key and value for its operation. Some only use
// a key name so an empty value argument in a delta is okay when false.
bool value_required(const op &op);
using merge_delta = std::pair<string_view, string_view>;
using merge_closure = std::function<std::string (const string_view &key, const merge_delta &)>;
using update_closure = std::function<std::string (const string_view &key, merge_delta &)>;

View file

@ -23,42 +23,11 @@
#pragma once
#define HAVE_IRCD_DB_OBJECT_H
// handler register(username):
//
// (let database value registered = 0)
//
// context A | context B
// |
//0 enter |
//1 if(registered) | enter ; A yields on cache-miss/IO read
//2 | if(registered) ; B resumes hitting cached value A fetched
//3 | bnt return; ; B continues without yield
//4 | registered = time(nullptr); ; B assigns B's value to database
//5 b?t return; | leave ; A resumes [what does if() see?]
//6 registered = time(nullptr); | ; A overwrites B's value
//7 leave | ; ???
#include "value.h"
namespace ircd {
namespace db {
template<class T,
database *const &d>
struct value
{
};
template<database *const &d>
struct value<void, d>
:cell
{
value(const string_view &name,
const string_view &index)
:cell{*d, name, index}
{}
using cell::cell;
};
template<database *const &d>
struct object
:row
@ -68,14 +37,21 @@ struct object
string_view prefix;
string_view index;
object(const string_view &prefix, const string_view &index);
template<class T = string_view> value<d, T> get(const string_view &name);
object(const string_view &index, const string_view &prefix = {}, gopts = {});
object(const string_view &index, const std::initializer_list<json::index::member> &, gopts = {});
object() = default;
};
template<database *const &d>
object<d>::object(const string_view &prefix,
const string_view &index)
:row{[&prefix, &index]() -> row
} // namespace db
} // namespace ircd
template<ircd::db::database *const &d>
ircd::db::object<d>::object(const string_view &index,
const string_view &prefix,
gopts opts)
:row{[&prefix, &index, &opts]() -> row
{
// The prefix is the name of the object we want to find members in.
// This function has to find columns starting with the prefix but not
@ -110,116 +86,27 @@ object<d>::object(const string_view &prefix,
// Clear empty names from the array before passing up to row{}
const auto end(std::remove(names, names + std::distance(low, hi), string_view{}));
const auto count(std::distance(names, end));
return row
return
{
*d, index, vector_view<string_view>(names, count)
*d, index, vector_view<string_view>(names, count), opts
};
}()}
,index{index}
{
}
template<database *const &d>
struct value<string_view ,d>
:value<void, d>
template<ircd::db::database *const &d>
ircd::db::object<d>::object(const string_view &index,
const std::initializer_list<json::index::member> &members,
gopts opts)
:object{index, string_view{}, opts}
{
operator string_view() const
{
return string_view{static_cast<const cell &>(*this)};
}
operator string_view()
{
return string_view{static_cast<cell &>(*this)};
}
value &operator=(const string_view &val)
{
static_cast<cell &>(*this) = val;
return *this;
}
value(const string_view &col,
const string_view &row)
:value<void, d>{col, row}
{}
friend std::ostream &operator<<(std::ostream &s, const value<string_view, d> &v)
{
s << string_view{v};
return s;
}
};
template<class T,
database *const &d>
struct arithmetic_value
:value<void, d>
{
bool compare_exchange(T &expected, const T &desired)
{
const auto ep(reinterpret_cast<const char *>(&expected));
const auto dp(reinterpret_cast<const char *>(&desired));
string_view s{ep, expected? sizeof(T) : 0};
const auto ret(cell::compare_exchange(s, string_view{dp, sizeof(T)}));
expected = !s.empty()? *reinterpret_cast<const T *>(s.data()) : 0;
return ret;
}
T exchange(const T &desired)
{
const auto dp(reinterpret_cast<const char *>(&desired));
const auto ret(cell::exchange(string_view{dp, desired? sizeof(T) : 0}));
return !ret.empty()? *reinterpret_cast<const T *>(ret.data()) : 0;
}
operator T() const
{
const auto val(this->val());
return !val.empty()? *reinterpret_cast<const T *>(val.data()) : 0;
}
operator T()
{
const auto val(this->val());
return !val.empty()? *reinterpret_cast<const T *>(val.data()) : 0;
}
arithmetic_value &operator=(const T &val)
{
cell &cell(*this);
const auto ptr(reinterpret_cast<const char *>(&val));
cell = string_view{ptr, val? sizeof(T) : 0};
return *this;
}
friend std::ostream &operator<<(std::ostream &s, const arithmetic_value &v)
{
s << T(v);
return s;
}
arithmetic_value(const string_view &col,
const string_view &row)
:value<void, d>{col, row}
{}
};
#define IRCD_ARITHMETIC_VALUE(_type_) \
template<database *const &d> \
struct value<_type_, d> \
:arithmetic_value<_type_, d> \
{ \
using arithmetic_value<_type_, d>::arithmetic_value; \
}
IRCD_ARITHMETIC_VALUE(uint64_t);
IRCD_ARITHMETIC_VALUE(int64_t);
IRCD_ARITHMETIC_VALUE(uint32_t);
IRCD_ARITHMETIC_VALUE(int32_t);
IRCD_ARITHMETIC_VALUE(uint16_t);
IRCD_ARITHMETIC_VALUE(int16_t);
} // namespace db
} // namespace ircd
template<ircd::db::database *const &d>
template<class T>
ircd::db::value<d, T>
ircd::db::object<d>::get(const string_view &name)
{
return row::operator[](name);
}

View file

@ -47,7 +47,10 @@ enum class set
struct sopts
:optlist<set>
{
template<class... list> sopts(list&&... l): optlist<set>{std::forward<list>(l)...} {}
template<class... list>
sopts(list&&... l)
:optlist<set>{std::forward<list>(l)...}
{}
};
enum class get
@ -66,7 +69,10 @@ struct gopts
{
database::snapshot snapshot;
template<class... list> gopts(list&&... l): optlist<get>{std::forward<list>(l)...} {}
template<class... list>
gopts(list&&... l)
:optlist<get>{std::forward<list>(l)...}
{}
};
} // namespace db

View file

@ -26,9 +26,13 @@
namespace ircd {
namespace db {
// A row is a collection of cells from different columns which all share the same
// A `row` is a collection of cells from different columns which all share the same
// key. This is an interface for dealing with those cells in the aggregate.
//
// Note that in a `row` each `cell` comes from a different `column`, but `cell::key()`
// will all return the same index value across the whole `row`. To get the names
// of the columns themselves to build ex. the key name of a JSON key-value pair,
// use `cell::col()`, which will be different for each `cell` across the `row`.
struct row
{
struct delta;
@ -73,7 +77,7 @@ struct row
row(database &,
const string_view &key = {},
const vector_view<string_view> &columns = {},
const gopts &opts = {});
gopts opts = {});
friend size_t trim(row &, const std::function<bool (cell &)> &);
friend size_t trim(row &, const string_view &key); // remove invalid or not equal
@ -138,6 +142,34 @@ struct row::iterator
friend bool operator!=(const iterator &, const iterator &);
};
//
// A delta is an element of a database transaction. You can use this to make
// an all-succeed-or-all-fail commitment to multiple rows at once. It is also
// useful to make a commitment on a single row as a convenient way to compose
// all of a row's cells together.
//
struct row::delta
:std::tuple<op, row &>
{
delta(row &r, const enum op &op = op::SET)
:std::tuple<enum op, row &>{op, r}
{}
delta(const enum op &op, row &r)
:std::tuple<enum op, row &>{op, r}
{}
};
// [SET] Perform operations in a sequence as a single transaction. No template
// iterators supported yet, just a ptr range good for contiguous sequences.
void write(const row::delta *const &begin, const row::delta *const &end, const sopts & = {});
void write(const std::initializer_list<row::delta> &, const sopts & = {});
void write(const sopts &, const std::initializer_list<row::delta> &);
void write(const row::delta &, const sopts & = {});
// [SET] Delete row from DB (convenience to an op::DELETE delta)
void del(row &, const sopts & = {});
} // namespace db
} // namespace ircd

View file

@ -55,6 +55,7 @@ string_view slice(const rocksdb::Slice &);
// Frequently used get options and set options are separate from the string/map system
rocksdb::WriteOptions make_opts(const sopts &);
rocksdb::ReadOptions make_opts(const gopts &, const bool &iterator = false);
bool optstr_find_and_remove(std::string &optstr, const std::string &what);
enum class pos
{
@ -229,13 +230,27 @@ database::dbs
} // namespace db
} // namespace ircd
///////////////////////////////////////////////////////////////////////////////
//
// init
//
namespace ircd {
namespace db {
static void init_directory();
static void init_version();
} // namespace db
} // namespace ircd
static char ircd_db_version[64];
const char *const ircd::db::version(ircd_db_version);
// Renders a version string from the defines included here.
__attribute__((constructor))
static void
version_init()
ircd::db::init_version()
{
snprintf(ircd_db_version, sizeof(ircd_db_version), "%d.%d.%d",
ROCKSDB_MAJOR,
@ -243,6 +258,33 @@ version_init()
ROCKSDB_PATCH);
}
static void
ircd::db::init_directory()
try
{
const auto dbdir(fs::get(fs::DB));
if(fs::mkdir(dbdir))
log.warning("Created new database directory at `%s'", dbdir);
else
log.info("Using database directory at `%s'", dbdir);
}
catch(const fs::error &e)
{
log.error("Cannot start database system: %s", e.what());
if(ircd::debugmode)
throw;
}
ircd::db::init::init()
{
init_directory();
}
ircd::db::init::~init()
noexcept
{
}
///////////////////////////////////////////////////////////////////////////////
//
// database
@ -291,22 +333,22 @@ ircd::db::shared_from(const database::column &column)
// database
//
ircd::db::database::database(const std::string &name,
const std::string &optstr)
ircd::db::database::database(std::string name,
std::string optstr)
:database
{
name, optstr, {}
std::move(name), std::move(optstr), {}
}
{
}
ircd::db::database::database(const std::string &name,
const std::string &optstr,
ircd::db::database::database(std::string name,
std::string optstr,
description description)
try
:name
{
name
std::move(name)
}
,path
{
@ -338,17 +380,31 @@ try
,d{[this, &description, &optstr]
() -> custom_ptr<rocksdb::DB>
{
// RocksDB doesn't parse a read_only option, so we allow that to be added
// to open the database as read_only and then remove that from the string.
const bool read_only
{
optstr_find_and_remove(optstr, "read_only=true;"s)
};
// We also allow the user to specify fsck=true to run a repair operation on
// the db. This may be expensive to do by default every startup.
const bool fsck
{
optstr_find_and_remove(optstr, "fsck=true;"s)
};
// Generate RocksDB options from string
rocksdb::DBOptions opts
{
options(optstr)
};
// Setup sundry
opts.error_if_exists = false;
opts.create_if_missing = true;
opts.create_missing_column_families = true;
opts.max_file_opening_threads = 0;
opts.use_fsync = true;
//opts.use_fsync = true;
// Setup logging
logs->SetInfoLogLevel(ircd::debugmode? rocksdb::DEBUG_LEVEL : rocksdb::WARN_LEVEL);
@ -418,53 +474,76 @@ try
{
return static_cast<const rocksdb::ColumnFamilyDescriptor &>(*pair.second);
});
/*
if(fs::is_dir(path))
if(fsck && fs::is_dir(path))
{
log.info("Checking database @ `%s' columns[%zu]", path, columns.size());
throw_on_error(rocksdb::RepairDB(path, opts, columns));
log.info("Database @ `%s' check complete", path, columns.size());
log.info("Checking database @ `%s' columns[%zu]",
path,
columns.size());
throw_on_error
{
rocksdb::RepairDB(path, opts, columns)
};
log.info("Database @ `%s' check complete",
path,
columns.size());
}
//if(has_opt(opts, opt::READ_ONLY))
// throw_on_error(rocksdb::DB::OpenForReadOnly(*this->opts, path, columns, &handles, &ptr));
//else
*/
// Announce attempt before usual point where exceptions are thrown
log.debug("Opening database \"%s\" @ `%s' columns[%zu]",
this->name,
path,
columns.size());
throw_on_error
{
rocksdb::DB::Open(opts, path, columns, &handles, &ptr)
};
if(read_only)
throw_on_error
{
rocksdb::DB::OpenForReadOnly(opts, path, columns, &handles, &ptr)
};
else
throw_on_error
{
rocksdb::DB::Open(opts, path, columns, &handles, &ptr)
};
for(const auto &handle : handles)
this->columns.at(handle->GetName())->handle.reset(handle);
return { ptr, deleter };
}()}
,dbs_it
{
dbs, dbs.emplace(string_view{this->name}, this).first
}
{
log.info("'%s': Opened database @ `%s' (handle: %p) columns[%zu] seq[%zu]",
name,
this->name,
path,
(const void *)this,
columns.size(),
d->GetLatestSequenceNumber());
dbs.emplace(string_view{this->name}, this);
}
catch(const std::exception &e)
{
throw error("Failed to open db '%s': %s", name, e.what());
throw error("Failed to open db '%s': %s",
this->name,
e.what());
}
ircd::db::database::~database()
noexcept
{
log.debug("'%s': closing database @ `%s'", name, path);
dbs.erase(name);
const auto background_errors
{
property<uint64_t>(*this, rocksdb::DB::Properties::kBackgroundErrors)
};
log.debug("'%s': closing database @ `%s' (background errors: %lu)",
name,
path,
background_errors);
}
ircd::db::database::column &
@ -475,7 +554,9 @@ try
}
catch(const std::out_of_range &e)
{
throw error("'%s': column '%s' is not available or specified in schema", this->name, name);
throw schema_error("'%s': column '%s' is not available or specified in schema",
this->name,
name);
}
const ircd::db::database::column &
@ -486,7 +567,9 @@ const try
}
catch(const std::out_of_range &e)
{
throw error("'%s': column '%s' is not available or specified in schema", this->name, name);
throw schema_error("'%s': column '%s' is not available or specified in schema",
this->name,
name);
}
ircd::db::database &
@ -522,8 +605,8 @@ ircd::db::database::comparator::Equal(const Slice &a,
const
{
assert(bool(user.equal));
const string_view sa{a.data(), a.size()};
const string_view sb{b.data(), b.size()};
const string_view sa{slice(a)};
const string_view sb{slice(b)};
return user.equal(sa, sb);
}
@ -533,8 +616,8 @@ ircd::db::database::comparator::Compare(const Slice &a,
const
{
assert(bool(user.less));
const string_view sa{a.data(), a.size()};
const string_view sb{b.data(), b.size()};
const string_view sa{slice(a)};
const string_view sb{slice(b)};
return user.less(sa, sb)? -1:
user.less(sb, sa)? 1:
0;
@ -644,6 +727,8 @@ ircd::db::database::column::column(database *const &d,
this->options.comparator = &this->cmp;
//this->options.prefix_extractor = std::shared_ptr<const rocksdb::SliceTransform>(rocksdb::NewNoopTransform());
//if(d->mergeop->merger)
// this->options.merge_operator = d->mergeop;
@ -728,20 +813,30 @@ ircd::db::sequence(const database::snapshot &s)
}
ircd::db::database::snapshot::snapshot(database &d)
:d{weak_from(d)}
,s{[this, &d]() -> const rocksdb::Snapshot *
:s
{
return d.d->GetSnapshot();
}()
,[this](const rocksdb::Snapshot *const s)
{
if(!s)
return;
d.d->GetSnapshot(),
[dp(weak_from(d))](const rocksdb::Snapshot *const s)
{
if(!s)
return;
const auto d(this->d.lock());
d->d->ReleaseSnapshot(s);
}}
const auto d(dp.lock());
log.debug("'%s' @%p: snapshot(@%p) release seq[%lu]",
db::name(*d),
d->d.get(),
s,
s->GetSequenceNumber());
d->d->ReleaseSnapshot(s);
}
}
{
log.debug("'%s' @%p: snapshot(@%p) seq[%lu]",
db::name(d),
d.d.get(),
s.get(),
sequence(*this));
}
ircd::db::database::snapshot::~snapshot()
@ -806,6 +901,10 @@ ircd::db::database::logs::Logv(const rocksdb::InfoLogLevel level,
lstrip(buf, ' ')
};
// Skip the options for now
if(startswith(str, "Options"))
return;
log(translate(level), "'%s': (rdb) %s", d->name, str);
}
@ -1018,6 +1117,26 @@ ircd::db::database::events::OnColumnFamilyHandleDeletionStarted(rocksdb::ColumnF
// db/cell.h
//
uint64_t
ircd::db::sequence(const cell &c)
{
const database::snapshot &ss(c);
return sequence(database::snapshot(c));
}
const std::string &
ircd::db::name(const cell &c)
{
return name(c.c);
}
void
ircd::db::write(const cell::delta &delta,
const sopts &sopts)
{
write(&delta, &delta + 1, sopts);
}
void
ircd::db::write(const sopts &sopts,
const std::initializer_list<cell::delta> &deltas)
@ -1029,34 +1148,37 @@ void
ircd::db::write(const std::initializer_list<cell::delta> &deltas,
const sopts &sopts)
{
if(!deltas.size())
write(std::begin(deltas), std::end(deltas), sopts);
}
void
ircd::db::write(const cell::delta *const &begin,
const cell::delta *const &end,
const sopts &sopts)
{
if(begin == end)
return;
auto &front(*std::begin(deltas));
// Find the database through one of the cell's columns. cell::deltas
// may come from different columns so we do nothing else with this.
auto &front(*begin);
column &c(std::get<cell &>(front).c);
database &d(c);
rocksdb::WriteBatch batch;
for(const auto &delta : deltas)
append(batch, delta);
auto opts(make_opts(sopts));
throw_on_error
std::for_each(begin, end, [&batch]
(const cell::delta &delta)
{
d.d->Write(opts, &batch)
};
}
append(batch, delta);
});
void
ircd::db::write(const cell::delta &delta,
const sopts &sopts)
{
column &c(std::get<cell &>(delta).c);
database &d(c);
rocksdb::WriteBatch batch;
append(batch, delta);
auto opts(make_opts(sopts));
log.debug("'%s' @%lu PUT %zu cell deltas",
name(d),
sequence(d),
std::distance(begin, end));
// Commitment
throw_on_error
{
d.d->Write(opts, &batch)
@ -1104,9 +1226,11 @@ ircd::db::cell::cell(column column,
ircd::db::cell::cell(column column,
const string_view &index,
std::unique_ptr<rocksdb::Iterator> it)
std::unique_ptr<rocksdb::Iterator> it,
gopts opts)
:c{std::move(column)}
,index{index}
,ss{std::move(opts.snapshot)}
,it{std::move(it)}
{
}
@ -1158,12 +1282,11 @@ ircd::db::cell::load(gopts opts)
while(tit && tit->Valid())
{
auto batchres(tit->GetBatch());
std::cout << "seq: " << batchres.sequence;
//std::cout << "seq: " << batchres.sequence;
if(batchres.writeBatchPtr)
{
auto &batch(*batchres.writeBatchPtr);
std::cout << " count " << batch.Count() << " ds: " << batch.GetDataSize()
<< " " << batch.Data() << std::endl;
//std::cout << " count " << batch.Count() << " ds: " << batch.GetDataSize() << " " << batch.Data() << std::endl;
}
tit->Next();
@ -1207,12 +1330,22 @@ ircd::db::cell::operator=(const string_view &s)
return *this;
}
ircd::db::cell::operator string_view()
void
ircd::db::cell::operator()(const op &op,
const string_view &val,
const sopts &sopts)
{
write(cell::delta{op, *this, val}, sopts);
}
ircd::db::cell::operator
string_view()
{
return val();
}
ircd::db::cell::operator string_view()
ircd::db::cell::operator
string_view()
const
{
return val();
@ -1262,10 +1395,82 @@ const
// db/row.h
//
void
ircd::db::del(row &row,
const sopts &sopts)
{
write(row::delta{op::DELETE, row}, sopts);
}
void
ircd::db::write(const row::delta &delta,
const sopts &sopts)
{
write(&delta, &delta + 1, sopts);
}
void
ircd::db::write(const sopts &sopts,
const std::initializer_list<row::delta> &deltas)
{
write(deltas, sopts);
}
void
ircd::db::write(const std::initializer_list<row::delta> &deltas,
const sopts &sopts)
{
write(std::begin(deltas), std::end(deltas), sopts);
}
void
ircd::db::write(const row::delta *const &begin,
const row::delta *const &end,
const sopts &sopts)
{
// Count the total number of cells for this transaction.
const auto cells
{
std::accumulate(begin, end, size_t(0), []
(auto ret, const row::delta &delta)
{
const auto &row(std::get<row &>(delta));
return ret += row.size();
})
};
//TODO: allocator?
std::vector<cell::delta> deltas;
deltas.reserve(cells);
// Compose all of the cells from all of the rows into a single txn
std::for_each(begin, end, [&deltas]
(const auto &delta)
{
auto &row(std::get<row &>(delta));
const auto &op(std::get<op>(delta));
std::for_each(std::begin(row), std::end(row), [&deltas, &op]
(auto &cell)
{
// For operations like DELETE which don't require a value in
// the delta, we can skip a potentially expensive load of the cell.
const auto value
{
value_required(op)? cell.val() : string_view{}
};
deltas.emplace_back(op, cell, value);
});
});
// Commitment
write(&deltas.front(), &deltas.front() + deltas.size(), sopts);
}
ircd::db::row::row(database &d,
const string_view &key,
const vector_view<string_view> &colnames,
const gopts &opts)
gopts opts)
:its{[this, &d, &key, &colnames, &opts]
{
using std::end;
@ -1273,11 +1478,15 @@ ircd::db::row::row(database &d,
using rocksdb::Iterator;
using rocksdb::ColumnFamilyHandle;
if(!opts.snapshot)
opts.snapshot = database::snapshot(d);
const rocksdb::ReadOptions options
{
make_opts(opts)
};
//TODO: allocator
std::vector<database::column *> colptr
{
colnames.empty()? d.columns.size() : colnames.size()
@ -1296,6 +1505,7 @@ ircd::db::row::row(database &d,
return &d[name];
});
//TODO: allocator
std::vector<ColumnFamilyHandle *> handles(colptr.size());
std::transform(begin(colptr), end(colptr), begin(handles), []
(database::column *const &ptr)
@ -1303,6 +1513,7 @@ ircd::db::row::row(database &d,
return ptr->handle.get();
});
//TODO: does this block?
std::vector<Iterator *> iterators;
throw_on_error
{
@ -1313,7 +1524,7 @@ ircd::db::row::row(database &d,
for(size_t i(0); i < ret.size(); ++i)
{
std::unique_ptr<Iterator> it(iterators.at(i));
ret[i] = cell { *colptr.at(i), key, std::move(it) };
ret[i] = cell { *colptr.at(i), key, std::move(it), opts };
}
return ret;
@ -1724,12 +1935,10 @@ ircd::db::has(column &column,
}
void
ircd::db::column::operator()(const op &op,
const string_view &key,
const string_view &val,
ircd::db::column::operator()(const delta &delta,
const sopts &sopts)
{
operator()(delta{op, key, val}, sopts);
operator()(&delta, &delta + 1, sopts);
}
void
@ -1743,27 +1952,29 @@ void
ircd::db::column::operator()(const std::initializer_list<delta> &deltas,
const sopts &sopts)
{
rocksdb::WriteBatch batch;
for(const auto &delta : deltas)
append(batch, *this, delta);
database &d(*this);
auto opts(make_opts(sopts));
throw_on_error
{
d.d->Write(opts, &batch)
};
operator()(std::begin(deltas), std::end(deltas), sopts);
}
void
ircd::db::column::operator()(const delta &delta,
ircd::db::column::operator()(const delta *const &begin,
const delta *const &end,
const sopts &sopts)
{
rocksdb::WriteBatch batch;
append(batch, *this, delta);
database &d(*this);
rocksdb::WriteBatch batch;
std::for_each(begin, end, [this, &batch]
(const delta &delta)
{
append(batch, *this, delta);
});
auto opts(make_opts(sopts));
log.debug("'%s' @%lu PUT %zu column deltas",
name(d),
sequence(d),
std::distance(begin, end));
throw_on_error
{
d.d->Write(opts, &batch)
@ -1950,7 +2161,7 @@ const
if(!it)
return true;
if(!it->Valid())
if(!valid(*it))
return true;
return false;
@ -2057,11 +2268,10 @@ ircd::db::seek(column &column,
const string_view &key,
const gopts &opts)
{
using rocksdb::Iterator;
database &d(column);
database::column &c(column);
std::unique_ptr<Iterator> ret;
std::unique_ptr<rocksdb::Iterator> ret;
seek(c, key, opts, ret);
return std::move(ret);
}
@ -2388,6 +2598,18 @@ const
return ret;
}
bool
ircd::db::optstr_find_and_remove(std::string &optstr,
const std::string &what)
{
const auto pos(optstr.find(what));
if(pos == std::string::npos)
return false;
optstr.erase(pos, what.size());
return true;
}
rocksdb::ReadOptions
ircd::db::make_opts(const gopts &opts,
const bool &iterator)
@ -2605,3 +2827,20 @@ ircd::db::reflect(const rocksdb::Histograms &type)
static const auto empty{"<histogram>?????"s};
return it != end(names)? it->second : empty;
}
bool
ircd::db::value_required(const op &op)
{
switch(op)
{
case op::SET:
case op::MERGE:
case op::DELETE_RANGE:
return true;
case op::GET:
case op::DELETE:
case op::SINGLE_DELETE:
return false;
}
}