0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

ircd::db: Support constant-time column lookup by id; various cleanup/fixes.

This commit is contained in:
Jason Volk 2017-09-18 19:17:36 -07:00
parent 469d286175
commit b27a2a6423
4 changed files with 271 additions and 135 deletions

View file

@ -62,6 +62,7 @@ namespace ircd::db
// Information about a column // Information about a column
const std::string &name(const column &); const std::string &name(const column &);
uint32_t id(const column &);
size_t file_count(column &); size_t file_count(column &);
size_t bytes(column &); size_t bytes(column &);

View file

@ -68,12 +68,14 @@ struct ircd::db::database
std::string name; std::string name;
std::string path; std::string path;
std::string optstr;
std::shared_ptr<struct logs> logs; std::shared_ptr<struct logs> logs;
std::shared_ptr<struct stats> stats; std::shared_ptr<struct stats> stats;
std::shared_ptr<struct events> events; std::shared_ptr<struct events> events;
std::shared_ptr<struct mergeop> mergeop; std::shared_ptr<struct mergeop> mergeop;
std::shared_ptr<rocksdb::Cache> cache; std::shared_ptr<rocksdb::Cache> cache;
std::map<string_view, std::shared_ptr<column>> columns; std::map<std::string, size_t, std::less<>> column_names;
std::vector<std::shared_ptr<column>> columns;
custom_ptr<rocksdb::DB> d; custom_ptr<rocksdb::DB> d;
unique_const_iterator<decltype(dbs)> dbs_it; unique_const_iterator<decltype(dbs)> dbs_it;
@ -81,8 +83,10 @@ struct ircd::db::database
operator const rocksdb::DB &() const { return *d; } operator const rocksdb::DB &() const { return *d; }
operator rocksdb::DB &() { return *d; } operator rocksdb::DB &() { return *d; }
const column &operator[](const string_view &) const; const column &operator[](const uint32_t &id) const;
column &operator[](const string_view &); const column &operator[](const string_view &name) const;
column &operator[](const uint32_t &id);
column &operator[](const string_view &name);
// [SET] Perform operations in a sequence as a single transaction. // [SET] Perform operations in a sequence as a single transaction.
void operator()(const sopts &, const delta *const &begin, const delta *const &end); void operator()(const sopts &, const delta *const &begin, const delta *const &end);

View file

@ -39,7 +39,8 @@ namespace ircd::db
// Indicates an op uses both a key and value for its operation. Some only use // Indicates an op uses both a key and value for its operation. Some only use
// a key name so an empty value argument in a delta is okay when false. // a key name so an empty value argument in a delta is okay when false.
bool value_required(const op &op); bool value_required(const op &);
string_view reflect(const op &);
} }
/// Update a database cell without `cell`, `column` or row `references`. /// Update a database cell without `cell`, `column` or row `references`.

View file

@ -55,6 +55,8 @@ namespace ircd::db
rocksdb::WriteOptions make_opts(const sopts &); rocksdb::WriteOptions make_opts(const sopts &);
rocksdb::ReadOptions make_opts(const gopts &, const bool &iterator = false); rocksdb::ReadOptions make_opts(const gopts &, const bool &iterator = false);
bool optstr_find_and_remove(std::string &optstr, const std::string &what); bool optstr_find_and_remove(std::string &optstr, const std::string &what);
rocksdb::DBOptions make_dbopts(std::string &optstr, bool *read_only = nullptr, bool *fsck = nullptr);
template<class... args> rocksdb::DBOptions make_dbopts(const std::string &, args&&...);
// Validation functors // Validation functors
bool valid(const rocksdb::Iterator &); bool valid(const rocksdb::Iterator &);
@ -77,6 +79,8 @@ namespace ircd::db
std::pair<string_view, string_view> operator*(const rocksdb::Iterator &); std::pair<string_view, string_view> operator*(const rocksdb::Iterator &);
// [SET] writebatch suite // [SET] writebatch suite
std::string debug(const rocksdb::WriteBatch &);
bool has(const rocksdb::WriteBatch &, const op &);
void commit(database &, rocksdb::WriteBatch &, const rocksdb::WriteOptions &); void commit(database &, rocksdb::WriteBatch &, const rocksdb::WriteOptions &);
void commit(database &, rocksdb::WriteBatch &, const sopts &); void commit(database &, rocksdb::WriteBatch &, const sopts &);
void append(rocksdb::WriteBatch &, column &, const column::delta &delta); void append(rocksdb::WriteBatch &, column &, const column::delta &delta);
@ -214,7 +218,7 @@ struct ircd::db::database::column
database *d; database *d;
std::type_index key_type; std::type_index key_type;
std::type_index mapped_type; std::type_index mapped_type;
struct descriptor descriptor; database::descriptor descriptor;
comparator cmp; comparator cmp;
prefix_transform prefix; prefix_transform prefix;
custom_ptr<rocksdb::ColumnFamilyHandle> handle; custom_ptr<rocksdb::ColumnFamilyHandle> handle;
@ -228,13 +232,15 @@ struct ircd::db::database::column
operator rocksdb::ColumnFamilyHandle *(); operator rocksdb::ColumnFamilyHandle *();
operator database &(); operator database &();
explicit column(database *const &d, struct descriptor); explicit column(database *const &d, const database::descriptor &);
column() = delete; column() = delete;
column(column &&) = delete; column(column &&) = delete;
column(const column &) = delete; column(const column &) = delete;
column &operator=(column &&) = delete; column &operator=(column &&) = delete;
column &operator=(const column &) = delete; column &operator=(const column &) = delete;
~column() noexcept; ~column() noexcept;
friend void flush(column &, const bool &blocking);
}; };
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -352,7 +358,7 @@ ircd::db::database::database(std::string name,
ircd::db::database::database(std::string name, ircd::db::database::database(std::string name,
std::string optstr, std::string optstr,
description description) const description &description)
try try
:name :name
{ {
@ -362,6 +368,10 @@ try
{ {
db::path(this->name) db::path(this->name)
} }
,optstr
{
std::move(optstr)
}
,logs ,logs
{ {
std::make_shared<struct logs>(this) std::make_shared<struct logs>(this)
@ -385,28 +395,33 @@ try
const auto lru_cache_size{64_MiB}; const auto lru_cache_size{64_MiB};
return rocksdb::NewLRUCache(lru_cache_size); return rocksdb::NewLRUCache(lru_cache_size);
}()} }()}
,d{[this, &description, &optstr] ,column_names{[this, &description]
{
// Existing columns
const auto opts(make_dbopts(std::string(this->optstr)));
std::set<std::string> existing;
for(auto &column_name : db::column_names(path, opts))
existing.emplace(std::move(column_name));
decltype(this->column_names) ret;
for(const auto &descriptor : description)
{
existing.erase(descriptor.name);
ret.emplace(descriptor.name, -1);
}
for(const auto &remain : existing)
throw error("Failed to describe existing column '%s'", remain);
return ret;
}()}
,d{[this, &description]
() -> custom_ptr<rocksdb::DB> () -> custom_ptr<rocksdb::DB>
{ {
// RocksDB doesn't parse a read_only option, so we allow that to be added bool fsck{false};
// to open the database as read_only and then remove that from the string. bool read_only{false};
const bool read_only auto opts(make_dbopts(this->optstr, &read_only, &fsck));
{
optstr_find_and_remove(optstr, "read_only=true;"s)
};
// We also allow the user to specify fsck=true to run a repair operation on
// the db. This may be expensive to do by default every startup.
const bool fsck
{
optstr_find_and_remove(optstr, "fsck=true;"s)
};
// Generate RocksDB options from string
rocksdb::DBOptions opts
{
options(optstr)
};
// Setup sundry // Setup sundry
opts.create_if_missing = true; opts.create_if_missing = true;
@ -435,36 +450,25 @@ try
opts.row_cache = this->cache; opts.row_cache = this->cache;
// Setup column families // Setup column families
for(auto &desc : description) for(const auto &desc : description)
{ {
const auto c(std::make_shared<column>(this, std::move(desc))); const auto c
columns.emplace(c->name, c);
}
// Existing columns
const auto column_names
{ {
db::column_names(path, opts) std::make_shared<column>(this, desc)
}; };
// Specified column descriptors have to describe all existing columns columns.emplace_back(c);
for(const auto &name : column_names) }
if(!columns.count(name))
throw error("Failed to describe existing column '%s'", name);
// Setup the database closer. // Setup the database closer.
const auto deleter([this](rocksdb::DB *const d) const auto deleter([this](rocksdb::DB *const d)
noexcept noexcept
{ {
throw_on_error const auto seq
{ {
d->SyncWAL() // blocking d->GetLatestSequenceNumber()
}; };
columns.clear();
//rocksdb::CancelAllBackgroundWork(d, true); // true = blocking
//throw_on_error(d->PauseBackgroundWork());
const auto seq(d->GetLatestSequenceNumber());
delete d; delete d;
log.info("'%s': closed database @ `%s' seq[%zu]", log.info("'%s': closed database @ `%s' seq[%zu]",
@ -478,9 +482,9 @@ try
std::vector<rocksdb::ColumnFamilyHandle *> handles; std::vector<rocksdb::ColumnFamilyHandle *> handles;
std::vector<rocksdb::ColumnFamilyDescriptor> columns(this->columns.size()); std::vector<rocksdb::ColumnFamilyDescriptor> columns(this->columns.size());
std::transform(begin(this->columns), end(this->columns), begin(columns), [] std::transform(begin(this->columns), end(this->columns), begin(columns), []
(const auto &pair) (const auto &column)
{ {
return static_cast<const rocksdb::ColumnFamilyDescriptor &>(*pair.second); return static_cast<const rocksdb::ColumnFamilyDescriptor &>(*column);
}); });
if(fsck && fs::is_dir(path)) if(fsck && fs::is_dir(path))
@ -517,7 +521,17 @@ try
}; };
for(const auto &handle : handles) for(const auto &handle : handles)
this->columns.at(handle->GetName())->handle.reset(handle); {
this->columns.at(handle->GetID())->handle.reset(handle);
this->column_names.at(handle->GetName()) = handle->GetID();
}
for(size_t i(0); i < this->columns.size(); ++i)
if(db::id(*this->columns[i]) != i)
throw error("Columns misaligned: expecting id[%zd] got id[%u] '%s'",
i,
db::id(*this->columns[i]),
db::name(*this->columns[i]));
return { ptr, deleter }; return { ptr, deleter };
}()} }()}
@ -543,6 +557,8 @@ catch(const std::exception &e)
ircd::db::database::~database() ircd::db::database::~database()
noexcept noexcept
{ {
//rocksdb::CancelAllBackgroundWork(d, true); // true = blocking
//throw_on_error(d->PauseBackgroundWork());
const auto background_errors const auto background_errors
{ {
property<uint64_t>(*this, rocksdb::DB::Properties::kBackgroundErrors) property<uint64_t>(*this, rocksdb::DB::Properties::kBackgroundErrors)
@ -552,6 +568,12 @@ noexcept
name, name,
path, path,
background_errors); background_errors);
columns.clear();
log.debug("'%s': closed columns; synchronizing to hardware...",
name);
sync(*this);
} }
void void
@ -614,28 +636,53 @@ ircd::db::database::operator()(const sopts &sopts,
ircd::db::database::column & ircd::db::database::column &
ircd::db::database::operator[](const string_view &name) ircd::db::database::operator[](const string_view &name)
try
{
return *columns.at(name);
}
catch(const std::out_of_range &e)
{ {
const auto it{column_names.find(name)};
if(unlikely(it == std::end(column_names)))
throw schema_error("'%s': column '%s' is not available or specified in schema", throw schema_error("'%s': column '%s' is not available or specified in schema",
this->name, this->name,
name); name);
return operator[](it->second);
}
ircd::db::database::column &
ircd::db::database::operator[](const uint32_t &id)
try
{
return *columns.at(id);
}
catch(const std::out_of_range &e)
{
throw schema_error("'%s': column id[%u] is not available or specified in schema",
this->name,
id);
} }
const ircd::db::database::column & const ircd::db::database::column &
ircd::db::database::operator[](const string_view &name) ircd::db::database::operator[](const string_view &name)
const try const
{
return *columns.at(name);
}
catch(const std::out_of_range &e)
{ {
const auto it{column_names.find(name)};
if(unlikely(it == std::end(column_names)))
throw schema_error("'%s': column '%s' is not available or specified in schema", throw schema_error("'%s': column '%s' is not available or specified in schema",
this->name, this->name,
name); name);
return operator[](it->second);
}
const ircd::db::database::column &
ircd::db::database::operator[](const uint32_t &id)
const try
{
return *columns.at(id);
}
catch(const std::out_of_range &e)
{
throw schema_error("'%s': column id[%u] is not available or specified in schema",
this->name,
id);
} }
ircd::db::database & ircd::db::database &
@ -796,16 +843,65 @@ const
// database::column // database::column
// //
ircd::db::database::column::column(database *const &d, void
struct descriptor descriptor) ircd::db::flush(database::column &c,
:rocksdb::ColumnFamilyDescriptor const bool &blocking)
{ {
descriptor.name, database::options(descriptor.options) database &d(*c.d);
rocksdb::FlushOptions opts;
opts.wait = blocking;
log.debug("'%s':'%s' @%lu FLUSH",
name(d),
name(c),
sequence(d));
throw_on_error
{
d.d->Flush(opts, c)
};
} }
/// Drop the column family behind `c` from its database.
///
/// A column without a handle is left alone. Otherwise the status returned by
/// DropColumnFamily() is handed to throw_on_error.
void
ircd::db::drop(database::column &c)
{
	if(c.handle)
		throw_on_error{c.d->d->DropColumnFamily(c.handle.get())};
}
/// Numeric id of a column as reported by its handle's GetID().
///
/// When the column has no handle the result is -1 converted to uint32_t.
uint32_t
ircd::db::id(const database::column &c)
{
	return c.handle?
		c.handle->GetID():
		static_cast<uint32_t>(-1);
}
/// Name of a column; a reference to the column's own `name` member, so it
/// is only valid while the column object is.
const std::string &
ircd::db::name(const database::column &c)
{
	const std::string &ret(c.name);
	return ret;
}
//
// database::column
//
ircd::db::database::column::column(database *const &d,
const database::descriptor &descriptor)
:rocksdb::ColumnFamilyDescriptor
(
descriptor.name, database::options{descriptor.options}
)
,d{d} ,d{d}
,key_type{descriptor.type.first} ,key_type{descriptor.type.first}
,mapped_type{descriptor.type.second} ,mapped_type{descriptor.type.second}
,descriptor{std::move(descriptor)} ,descriptor{descriptor}
,cmp{d, this->descriptor.cmp} ,cmp{d, this->descriptor.cmp}
,prefix{d, this->descriptor.prefix} ,prefix{d, this->descriptor.prefix}
,handle ,handle
@ -817,8 +913,6 @@ ircd::db::database::column::column(database *const &d,
} }
} }
{ {
assert(d->columns.count(this->name) == 0);
if(!this->descriptor.cmp.less) if(!this->descriptor.cmp.less)
{ {
if(key_type == typeid(string_view)) if(key_type == typeid(string_view))
@ -851,6 +945,8 @@ ircd::db::database::column::column(database *const &d,
ircd::db::database::column::~column() ircd::db::database::column::~column()
noexcept noexcept
{ {
if(handle)
flush(*this, false);
} }
ircd::db::database::column::operator ircd::db::database::column::operator
@ -879,33 +975,6 @@ const
return handle.get(); return handle.get();
} }
void
ircd::db::drop(database::column &c)
{
if(!c.handle)
return;
throw_on_error
{
c.d->d->DropColumnFamily(c.handle.get())
};
}
uint32_t
ircd::db::id(const database::column &c)
{
if(!c.handle)
return -1;
return c.handle->GetID();
}
const std::string &
ircd::db::name(const database::column &c)
{
return c.name;
}
const std::string & const std::string &
ircd::db::name(const database &d) ircd::db::name(const database &d)
{ {
@ -1735,7 +1804,7 @@ ircd::db::row::row(database &d,
std::transform(begin(d.columns), end(d.columns), begin(colptr), [&colnames] std::transform(begin(d.columns), end(d.columns), begin(colptr), [&colnames]
(const auto &p) (const auto &p)
{ {
return p.second.get(); return p.get();
}); });
else else
std::transform(begin(colnames), end(colnames), begin(colptr), [&d] std::transform(begin(colnames), end(colnames), begin(colptr), [&d]
@ -1933,6 +2002,13 @@ ircd::db::file_count(column &column)
return cfm.file_count; return cfm.file_count;
} }
uint32_t
ircd::db::id(const column &column)
{
const database::column &c(column);
return id(c);
}
const std::string & const std::string &
ircd::db::name(const column &column) ircd::db::name(const column &column)
{ {
@ -1944,20 +2020,8 @@ void
ircd::db::flush(column &column, ircd::db::flush(column &column,
const bool &blocking) const bool &blocking)
{ {
database &d(column);
database::column &c(column); database::column &c(column);
flush(c, blocking);
rocksdb::FlushOptions opts;
opts.wait = blocking;
log.debug("'%s':'%s' @%lu FLUSH",
name(d),
name(c),
sequence(d));
throw_on_error
{
d.d->Flush(opts, c)
};
} }
void void
@ -2423,11 +2487,6 @@ ircd::db::merge_operator(const string_view &key,
// writebatch // writebatch
// //
namespace ircd::db
{
template<size_t SIZE> const char *info(char (&buf)[SIZE], const rocksdb::WriteBatch &);
}
void void
ircd::db::append(rocksdb::WriteBatch &batch, ircd::db::append(rocksdb::WriteBatch &batch,
const cell::delta &delta) const cell::delta &delta)
@ -2475,11 +2534,10 @@ ircd::db::commit(database &d,
rocksdb::WriteBatch &batch, rocksdb::WriteBatch &batch,
const rocksdb::WriteOptions &opts) const rocksdb::WriteOptions &opts)
{ {
char info_buf[96];
log.debug("'%s' @%lu COMMIT %s", log.debug("'%s' @%lu COMMIT %s",
d.name, d.name,
sequence(d), sequence(d),
info(info_buf, batch)); debug(batch));
throw_on_error throw_on_error
{ {
@ -2487,12 +2545,16 @@ ircd::db::commit(database &d,
}; };
} }
template<size_t SIZE> std::string
const char * ircd::db::debug(const rocksdb::WriteBatch &batch)
ircd::db::info(char (&buf)[SIZE],
const rocksdb::WriteBatch &batch)
{ {
snprintf(buf, SIZE, "%d deltas; size: %zuB :%s%s%s%s%s%s%s%s%s", std::string ret;
ret.resize(511, char());
const auto size
{
snprintf(const_cast<char *>(ret.data()), ret.size() + 1,
"%d deltas; size: %zuB :%s%s%s%s%s%s%s%s%s",
batch.Count(), batch.Count(),
batch.GetDataSize(), batch.GetDataSize(),
batch.HasPut()? " PUT" : "", batch.HasPut()? " PUT" : "",
@ -2503,9 +2565,28 @@ ircd::db::info(char (&buf)[SIZE],
batch.HasBeginPrepare()? " BEGIN_PREPARE" : "", batch.HasBeginPrepare()? " BEGIN_PREPARE" : "",
batch.HasEndPrepare()? " END_PREPARE" : "", batch.HasEndPrepare()? " END_PREPARE" : "",
batch.HasCommit()? " COMMIT" : "", batch.HasCommit()? " COMMIT" : "",
batch.HasRollback()? " ROLLBACK" : ""); batch.HasRollback()? " ROLLBACK" : "")
};
return buf; ret.resize(size);
return ret;
}
/// Query whether a write batch contains an operation of the given kind.
///
/// Each op maps onto the matching rocksdb::WriteBatch::Has*() query. op::GET
/// is not a batch operation: it trips an assertion and yields false. Any
/// value not covered by the switch also yields false.
bool
ircd::db::has(const rocksdb::WriteBatch &wb,
              const op &op)
{
	bool ret{false};
	switch(op)
	{
		case op::SET:            ret = wb.HasPut();            break;
		case op::MERGE:          ret = wb.HasMerge();          break;
		case op::DELETE:         ret = wb.HasDelete();         break;
		case op::DELETE_RANGE:   ret = wb.HasDeleteRange();    break;
		case op::SINGLE_DELETE:  ret = wb.HasSingleDelete();   break;
		case op::GET:            assert(0);                    break;
	}

	return ret;
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -2693,7 +2774,7 @@ std::vector<std::string>
ircd::db::column_names(const std::string &path, ircd::db::column_names(const std::string &path,
const std::string &options) const std::string &options)
{ {
return column_names(path, database::options(options)); return column_names(path, database::options{options});
} }
std::vector<std::string> std::vector<std::string>
@ -2868,6 +2949,39 @@ const
// Misc // Misc
// //
/// Convenience overload for callers holding a const (or temporary) option
/// string. The mutable-string overload edits its argument in place, so a
/// private copy is made here and the remaining arguments are forwarded
/// untouched; the caller's string is never modified.
template<class... args>
rocksdb::DBOptions
ircd::db::make_dbopts(const std::string &optstr,
                      args&&... a)
{
	std::string scratch{optstr};
	return make_dbopts(scratch, std::forward<args>(a)...);
}
/// Build the RocksDB DBOptions from an option string.
///
/// The tokens "read_only=true;" and "fsck=true;" are extensions handled here
/// and not by RocksDB. They are always removed from `optstr` before the
/// remainder is handed to database::options, whether or not the caller asked
/// for the corresponding flag. Stripping them only when the out-pointer was
/// non-null let a call with the default null pointers pass those tokens
/// through to the options parser.
///
/// optstr    - option string; modified in place (the two tokens are removed).
/// read_only - if non-null, receives whether "read_only=true;" was present.
/// fsck      - if non-null, receives whether "fsck=true;" was present.
rocksdb::DBOptions
ircd::db::make_dbopts(std::string &optstr,
                      bool *const read_only,
                      bool *const fsck)
{
	// RocksDB doesn't parse a read_only option, so we allow that to be added
	// to open the database as read_only and then remove that from the string.
	const bool had_read_only
	{
		optstr_find_and_remove(optstr, "read_only=true;"s)
	};

	if(read_only)
		*read_only = had_read_only;

	// We also allow the user to specify fsck=true to run a repair operation on
	// the db. This may be expensive to do by default every startup.
	const bool had_fsck
	{
		optstr_find_and_remove(optstr, "fsck=true;"s)
	};

	if(fsck)
		*fsck = had_fsck;

	// Generate RocksDB options from what is left of the string
	rocksdb::DBOptions opts
	{
		database::options(optstr)
	};

	return opts;
}
bool bool
ircd::db::optstr_find_and_remove(std::string &optstr, ircd::db::optstr_find_and_remove(std::string &optstr,
const std::string &what) const std::string &what)
@ -3146,14 +3260,30 @@ ircd::db::reflect(const pos &pos)
{ {
switch(pos) switch(pos)
{ {
case pos::NEXT: return "NEXT"s; case pos::NEXT: return "NEXT";
case pos::PREV: return "PREV"s; case pos::PREV: return "PREV";
case pos::FRONT: return "FRONT"s; case pos::FRONT: return "FRONT";
case pos::BACK: return "BACK"s; case pos::BACK: return "BACK";
case pos::END: return "END"s; case pos::END: return "END";
} }
return "?????"s; return "?????";
}
/// Human-readable name of a database operation type.
///
/// Yields the enumerator's spelling for each known op and "?????" for any
/// value the switch does not cover.
ircd::string_view
ircd::db::reflect(const op &op)
{
	switch(op)
	{
		case op::GET:             return "GET";
		case op::SET:             return "SET";
		case op::MERGE:           return "MERGE";
		case op::DELETE:          return "DELETE";
		case op::DELETE_RANGE:    return "DELETE_RANGE";
		case op::SINGLE_DELETE:   return "SINGLE_DELETE";
	}

	return "?????";
}
bool bool