From b27a2a6423aa31d7aae25cfed58c248516be0c59 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 18 Sep 2017 19:17:36 -0700 Subject: [PATCH] ircd::db: Support constant-time column lookup by id; various cleanup/fixes. --- include/ircd/db/column.h | 1 + include/ircd/db/database.h | 10 +- include/ircd/db/delta.h | 3 +- ircd/db.cc | 392 ++++++++++++++++++++++++------------- 4 files changed, 271 insertions(+), 135 deletions(-) diff --git a/include/ircd/db/column.h b/include/ircd/db/column.h index 7456efc39..26ab12e7b 100644 --- a/include/ircd/db/column.h +++ b/include/ircd/db/column.h @@ -62,6 +62,7 @@ namespace ircd::db // Information about a column const std::string &name(const column &); + uint32_t id(const column &); size_t file_count(column &); size_t bytes(column &); diff --git a/include/ircd/db/database.h b/include/ircd/db/database.h index e60f8ca77..d1e6190d6 100644 --- a/include/ircd/db/database.h +++ b/include/ircd/db/database.h @@ -68,12 +68,14 @@ struct ircd::db::database std::string name; std::string path; + std::string optstr; std::shared_ptr logs; std::shared_ptr stats; std::shared_ptr events; std::shared_ptr mergeop; std::shared_ptr cache; - std::map> columns; + std::map> column_names; + std::vector> columns; custom_ptr d; unique_const_iterator dbs_it; @@ -81,8 +83,10 @@ struct ircd::db::database operator const rocksdb::DB &() const { return *d; } operator rocksdb::DB &() { return *d; } - const column &operator[](const string_view &) const; - column &operator[](const string_view &); + const column &operator[](const uint32_t &id) const; + const column &operator[](const string_view &name) const; + column &operator[](const uint32_t &id); + column &operator[](const string_view &name); // [SET] Perform operations in a sequence as a single transaction. void operator()(const sopts &, const delta *const &begin, const delta *const &end); diff --git a/include/ircd/db/delta.h b/include/ircd/db/delta.h index 57c9b7138..9fdf52b7b 100644 --- a/include/ircd/db/delta.h +++ b/include/ircd/db/delta.h @@ -39,7 +39,8 @@ namespace ircd::db // Indicates an op uses both a key and value for its operation. Some only use // a key name so an empty value argument in a delta is okay when false. - bool value_required(const op &op); + bool value_required(const op &); + string_view reflect(const op &); } /// Update a database cell without `cell`, `column` or row `references`. diff --git a/ircd/db.cc b/ircd/db.cc index 77ff34da8..67bfeb112 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -55,6 +55,8 @@ namespace ircd::db rocksdb::WriteOptions make_opts(const sopts &); rocksdb::ReadOptions make_opts(const gopts &, const bool &iterator = false); bool optstr_find_and_remove(std::string &optstr, const std::string &what); + rocksdb::DBOptions make_dbopts(std::string &optstr, bool *read_only = nullptr, bool *fsck = nullptr); + template rocksdb::DBOptions make_dbopts(const std::string &, args&&...); // Validation functors bool valid(const rocksdb::Iterator &); @@ -77,6 +79,8 @@ namespace ircd::db std::pair operator*(const rocksdb::Iterator &); // [SET] writebatch suite + std::string debug(const rocksdb::WriteBatch &); + bool has(const rocksdb::WriteBatch &, const op &); void commit(database &, rocksdb::WriteBatch &, const rocksdb::WriteOptions &); void commit(database &, rocksdb::WriteBatch &, const sopts &); void append(rocksdb::WriteBatch &, column &, const column::delta &delta); @@ -214,7 +218,7 @@ struct ircd::db::database::column database *d; std::type_index key_type; std::type_index mapped_type; - struct descriptor descriptor; + database::descriptor descriptor; comparator cmp; prefix_transform prefix; custom_ptr handle; @@ -228,13 +232,15 @@ struct ircd::db::database::column operator rocksdb::ColumnFamilyHandle *(); operator database &(); - explicit column(database *const &d, struct descriptor); + explicit column(database *const &d, const database::descriptor &); column() = delete; column(column &&) = delete; column(const column &) = delete; column &operator=(column &&) = delete; column &operator=(const column &) = delete; ~column() noexcept; + + friend void flush(column &, const bool &blocking); }; /////////////////////////////////////////////////////////////////////////////// @@ -352,7 +358,7 @@ ircd::db::database::database(std::string name, ircd::db::database::database(std::string name, std::string optstr, - description description) + const description &description) try :name { @@ -362,6 +368,10 @@ try { db::path(this->name) } +,optstr +{ + std::move(optstr) +} ,logs { std::make_shared(this) @@ -385,28 +395,33 @@ try const auto lru_cache_size{64_MiB}; return rocksdb::NewLRUCache(lru_cache_size); }()} -,d{[this, &description, &optstr] +,column_names{[this, &description] +{ + // Existing columns + const auto opts(make_dbopts(std::string(this->optstr))); + + std::set existing; + for(auto &column_name : db::column_names(path, opts)) + existing.emplace(std::move(column_name)); + + decltype(this->column_names) ret; + for(const auto &descriptor : description) + { + existing.erase(descriptor.name); + ret.emplace(descriptor.name, -1); + } + + for(const auto &remain : existing) + throw error("Failed to describe existing column '%s'", remain); + + return ret; +}()} +,d{[this, &description] () -> custom_ptr { - // RocksDB doesn't parse a read_only option, so we allow that to be added - // to open the database as read_only and then remove that from the string. - const bool read_only - { - optstr_find_and_remove(optstr, "read_only=true;"s) - }; - - // We also allow the user to specify fsck=true to run a repair operation on - // the db. This may be expensive to do by default every startup. - const bool fsck - { - optstr_find_and_remove(optstr, "fsck=true;"s) - }; - - // Generate RocksDB options from string - rocksdb::DBOptions opts - { - options(optstr) - }; + bool fsck{false}; + bool read_only{false}; + auto opts(make_dbopts(this->optstr, &read_only, &fsck)); // Setup sundry opts.create_if_missing = true; @@ -435,36 +450,25 @@ try opts.row_cache = this->cache; // Setup column families - for(auto &desc : description) + for(const auto &desc : description) { - const auto c(std::make_shared(this, std::move(desc))); - columns.emplace(c->name, c); + const auto c + { + std::make_shared(this, desc) + }; + + columns.emplace_back(c); } - // Existing columns - const auto column_names - { - db::column_names(path, opts) - }; - - // Specified column descriptors have to describe all existing columns - for(const auto &name : column_names) - if(!columns.count(name)) - throw error("Failed to describe existing column '%s'", name); - // Setup the database closer. const auto deleter([this](rocksdb::DB *const d) noexcept { - throw_on_error + const auto seq { - d->SyncWAL() // blocking + d->GetLatestSequenceNumber() }; - columns.clear(); - //rocksdb::CancelAllBackgroundWork(d, true); // true = blocking - //throw_on_error(d->PauseBackgroundWork()); - const auto seq(d->GetLatestSequenceNumber()); delete d; log.info("'%s': closed database @ `%s' seq[%zu]", @@ -478,9 +482,9 @@ try std::vector handles; std::vector columns(this->columns.size()); std::transform(begin(this->columns), end(this->columns), begin(columns), [] - (const auto &pair) + (const auto &column) { - return static_cast(*pair.second); + return static_cast(*column); }); if(fsck && fs::is_dir(path)) @@ -517,7 +521,17 @@ try }; for(const auto &handle : handles) - this->columns.at(handle->GetName())->handle.reset(handle); + { + this->columns.at(handle->GetID())->handle.reset(handle); + this->column_names.at(handle->GetName()) = handle->GetID(); + } + + for(size_t i(0); i < this->columns.size(); ++i) + if(db::id(*this->columns[i]) != i) + throw error("Columns misaligned: expecting id[%zd] got id[%u] '%s'", + i, + db::id(*this->columns[i]), + db::name(*this->columns[i])); return { ptr, deleter }; }()} @@ -543,6 +557,8 @@ catch(const std::exception &e) ircd::db::database::~database() noexcept { + //rocksdb::CancelAllBackgroundWork(d, true); // true = blocking + //throw_on_error(d->PauseBackgroundWork()); const auto background_errors { property(*this, rocksdb::DB::Properties::kBackgroundErrors) @@ -552,6 +568,12 @@ noexcept name, path, background_errors); + + columns.clear(); + log.debug("'%s': closed columns; synchronizing to hardware...", + name); + + sync(*this); } void @@ -614,28 +636,53 @@ ircd::db::database::operator()(const sopts &sopts, ircd::db::database::column & ircd::db::database::operator[](const string_view &name) +{ + const auto it{column_names.find(name)}; + if(unlikely(it == std::end(column_names))) + throw schema_error("'%s': column '%s' is not available or specified in schema", + this->name, + name); + + return operator[](it->second); +} + +ircd::db::database::column & +ircd::db::database::operator[](const uint32_t &id) try { - return *columns.at(name); + return *columns.at(id); } catch(const std::out_of_range &e) { - throw schema_error("'%s': column '%s' is not available or specified in schema", + throw schema_error("'%s': column id[%u] is not available or specified in schema", this->name, - name); + id); } const ircd::db::database::column & ircd::db::database::operator[](const string_view &name) +const +{ + const auto it{column_names.find(name)}; + if(unlikely(it == std::end(column_names))) + throw schema_error("'%s': column '%s' is not available or specified in schema", + this->name, + name); + + return operator[](it->second); +} + +const ircd::db::database::column & +ircd::db::database::operator[](const uint32_t &id) const try { - return *columns.at(name); + return *columns.at(id); } catch(const std::out_of_range &e) { - throw schema_error("'%s': column '%s' is not available or specified in schema", + throw schema_error("'%s': column id[%u] is not available or specified in schema", this->name, - name); + id); } ircd::db::database & @@ -796,16 +843,65 @@ const // database::column // -ircd::db::database::column::column(database *const &d, - struct descriptor descriptor) -:rocksdb::ColumnFamilyDescriptor +void +ircd::db::flush(database::column &c, + const bool &blocking) { - descriptor.name, database::options(descriptor.options) + database &d(*c.d); + rocksdb::FlushOptions opts; + opts.wait = blocking; + log.debug("'%s':'%s' @%lu FLUSH", + name(d), + name(c), + sequence(d)); + + throw_on_error + { + d.d->Flush(opts, c) + }; } + +void +ircd::db::drop(database::column &c) +{ + if(!c.handle) + return; + + throw_on_error + { + c.d->d->DropColumnFamily(c.handle.get()) + }; +} + +uint32_t +ircd::db::id(const database::column &c) +{ + if(!c.handle) + return -1; + + return c.handle->GetID(); +} + +const std::string & +ircd::db::name(const database::column &c) +{ + return c.name; +} + +// +// database::column +// + +ircd::db::database::column::column(database *const &d, + const database::descriptor &descriptor) +:rocksdb::ColumnFamilyDescriptor +( + descriptor.name, database::options{descriptor.options} +) ,d{d} ,key_type{descriptor.type.first} ,mapped_type{descriptor.type.second} -,descriptor{std::move(descriptor)} +,descriptor{descriptor} ,cmp{d, this->descriptor.cmp} ,prefix{d, this->descriptor.prefix} ,handle @@ -817,8 +913,6 @@ ircd::db::database::column::column(database *const &d, } } { - assert(d->columns.count(this->name) == 0); - if(!this->descriptor.cmp.less) { if(key_type == typeid(string_view)) @@ -851,6 +945,8 @@ ircd::db::database::column::column(database *const &d, ircd::db::database::column::~column() noexcept { + if(handle) + flush(*this, false); } ircd::db::database::column::operator @@ -879,33 +975,6 @@ const return handle.get(); } -void -ircd::db::drop(database::column &c) -{ - if(!c.handle) - return; - - throw_on_error - { - c.d->d->DropColumnFamily(c.handle.get()) - }; -} - -uint32_t -ircd::db::id(const database::column &c) -{ - if(!c.handle) - return -1; - - return c.handle->GetID(); -} - -const std::string & -ircd::db::name(const database::column &c) -{ - return c.name; -} - const std::string & ircd::db::name(const database &d) { @@ -1735,7 +1804,7 @@ ircd::db::row::row(database &d, std::transform(begin(d.columns), end(d.columns), begin(colptr), [&colnames] (const auto &p) { - return p.second.get(); + return p.get(); }); else std::transform(begin(colnames), end(colnames), begin(colptr), [&d] @@ -1933,6 +2002,13 @@ ircd::db::file_count(column &column) return cfm.file_count; } +uint32_t +ircd::db::id(const column &column) +{ + const database::column &c(column); + return id(c); +} + const std::string & ircd::db::name(const column &column) { @@ -1944,20 +2020,8 @@ void ircd::db::flush(column &column, const bool &blocking) { - database &d(column); database::column &c(column); - - rocksdb::FlushOptions opts; - opts.wait = blocking; - log.debug("'%s':'%s' @%lu FLUSH", - name(d), - name(c), - sequence(d)); - - throw_on_error - { - d.d->Flush(opts, c) - }; + flush(c, blocking); } void @@ -2423,11 +2487,6 @@ ircd::db::merge_operator(const string_view &key, // writebatch // -namespace ircd::db -{ - template const char *info(char (&buf)[SIZE], const rocksdb::WriteBatch &); -} - void ircd::db::append(rocksdb::WriteBatch &batch, const cell::delta &delta) @@ -2475,11 +2534,10 @@ ircd::db::commit(database &d, rocksdb::WriteBatch &batch, const rocksdb::WriteOptions &opts) { - char info_buf[96]; log.debug("'%s' @%lu COMMIT %s", d.name, sequence(d), - info(info_buf, batch)); + debug(batch)); throw_on_error { @@ -2487,25 +2545,48 @@ ircd::db::commit(database &d, }; } -template -const char * -ircd::db::info(char (&buf)[SIZE], - const rocksdb::WriteBatch &batch) +std::string +ircd::db::debug(const rocksdb::WriteBatch &batch) { - snprintf(buf, SIZE, "%d deltas; size: %zuB :%s%s%s%s%s%s%s%s%s", - batch.Count(), - batch.GetDataSize(), - batch.HasPut()? " PUT" : "", - batch.HasDelete()? " DELETE" : "", - batch.HasSingleDelete()? " SINGLE_DELETE" : "", - batch.HasDeleteRange()? " DELETE_RANGE" : "", - batch.HasMerge()? " MERGE" : "", - batch.HasBeginPrepare()? " BEGIN_PREPARE" : "", - batch.HasEndPrepare()? " END_PREPARE" : "", - batch.HasCommit()? " COMMIT" : "", - batch.HasRollback()? " ROLLBACK" : ""); + std::string ret; + ret.resize(511, char()); - return buf; + const auto size + { + snprintf(const_cast(ret.data()), ret.size() + 1, + "%d deltas; size: %zuB :%s%s%s%s%s%s%s%s%s", + batch.Count(), + batch.GetDataSize(), + batch.HasPut()? " PUT" : "", + batch.HasDelete()? " DELETE" : "", + batch.HasSingleDelete()? " SINGLE_DELETE" : "", + batch.HasDeleteRange()? " DELETE_RANGE" : "", + batch.HasMerge()? " MERGE" : "", + batch.HasBeginPrepare()? " BEGIN_PREPARE" : "", + batch.HasEndPrepare()? " END_PREPARE" : "", + batch.HasCommit()? " COMMIT" : "", + batch.HasRollback()? " ROLLBACK" : "") + }; + + ret.resize(size); + return ret; +} + +bool +ircd::db::has(const rocksdb::WriteBatch &wb, + const op &op) +{ + switch(op) + { + case op::GET: assert(0); return false; + case op::SET: return wb.HasPut(); + case op::MERGE: return wb.HasMerge(); + case op::DELETE: return wb.HasDelete(); + case op::DELETE_RANGE: return wb.HasDeleteRange(); + case op::SINGLE_DELETE: return wb.HasSingleDelete(); + } + + return false; } /////////////////////////////////////////////////////////////////////////////// @@ -2693,7 +2774,7 @@ std::vector ircd::db::column_names(const std::string &path, const std::string &options) { - return column_names(path, database::options(options)); + return column_names(path, database::options{options}); } std::vector @@ -2868,6 +2949,39 @@ const // Misc // +template +rocksdb::DBOptions +ircd::db::make_dbopts(const std::string &optstr, + args&&... a) +{ + std::string _optstr(optstr); + return make_dbopts(_optstr, std::forward(a)...); +} + +rocksdb::DBOptions +ircd::db::make_dbopts(std::string &optstr, + bool *const read_only, + bool *const fsck) +{ + // RocksDB doesn't parse a read_only option, so we allow that to be added + // to open the database as read_only and then remove that from the string. + if(read_only) + *read_only = optstr_find_and_remove(optstr, "read_only=true;"s); + + // We also allow the user to specify fsck=true to run a repair operation on + // the db. This may be expensive to do by default every startup. + if(fsck) + *fsck = optstr_find_and_remove(optstr, "fsck=true;"s); + + // Generate RocksDB options from string + rocksdb::DBOptions opts + { + database::options(optstr) + }; + + return opts; +} + bool ircd::db::optstr_find_and_remove(std::string &optstr, const std::string &what) @@ -3146,14 +3260,30 @@ ircd::db::reflect(const pos &pos) { switch(pos) { - case pos::NEXT: return "NEXT"s; - case pos::PREV: return "PREV"s; - case pos::FRONT: return "FRONT"s; - case pos::BACK: return "BACK"s; - case pos::END: return "END"s; + case pos::NEXT: return "NEXT"; + case pos::PREV: return "PREV"; + case pos::FRONT: return "FRONT"; + case pos::BACK: return "BACK"; + case pos::END: return "END"; } - return "?????"s; + return "?????"; +} + +ircd::string_view +ircd::db::reflect(const op &op) +{ + switch(op) + { + case op::GET: return "GET"; + case op::SET: return "SET"; + case op::MERGE: return "MERGE"; + case op::DELETE_RANGE: return "DELETE_RANGE"; + case op::DELETE: return "DELETE"; + case op::SINGLE_DELETE: return "SINGLE_DELETE"; + } + + return "?????"; } bool