From fc9c952ba86f6fc9ae55cecb153809180ff517c3 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 8 Jun 2020 18:19:55 -0700 Subject: [PATCH] ircd::db: Add parallel read suite to internal interface. --- ircd/db.cc | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ ircd/db.h | 3 ++ 2 files changed, 94 insertions(+) diff --git a/ircd/db.cc b/ircd/db.cc index 427730917..1b1913796 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -7964,6 +7964,97 @@ ircd::db::_seek(database::column &c, return ret; } +// +// parallel read suite +// + +namespace ircd::db +{ + static void _seek(const vector_view<_read_op> &, const vector_view &, const vector_view &, const rocksdb::ReadOptions &); +} + +bool +ircd::db::_read(const vector_view<_read_op> &op, + const rocksdb::ReadOptions &ropts, + const _read_closure &closure) +{ + assert(op.size() >= 1); + assert(op.size() <= IOV_MAX); + const size_t &num + { + op.size() + }; + + rocksdb::Status status[num]; + rocksdb::PinnableSlice val[num]; + _seek(op, {status, num}, {val, num}, ropts); + if(closure) for(size_t i(0); i < op.size(); ++i) + { + const column::delta &delta + { + std::get<1>(op[i]), slice(val[i]) + }; + + if(!closure(std::get<0>(op[i]), delta, status[i])) + return false; + } + + return true; +} + +void +ircd::db::_seek(const vector_view<_read_op> &op, + const vector_view &ret, + const vector_view &val, + const rocksdb::ReadOptions &ropts) +{ + assert(ret.size() == op.size()); + assert(ret.size() == val.size()); + + const ctx::stack_usage_assertion sua; + const ctx::uninterruptible::nothrow ui; + + assert(op.size() >= 1); + database &d(std::get<0>(op[0])); + const size_t &num + { + op.size() + }; + + rocksdb::Slice key[num]; + std::transform(begin(op), end(op), key, [] + (const auto &op) + { + return slice(std::get<1>(op)); + }); + + rocksdb::ColumnFamilyHandle *cf[num]; + std::transform(begin(op), end(op), cf, [] + (auto &op_) + { + auto &op(const_cast<_read_op &>(op_)); + database::column &c(std::get(op)); + return static_cast(c); + }); + + #ifdef RB_DEBUG_DB_SEEK + const ircd::timer timer; + #endif + + d.d->MultiGet(ropts, num, cf, key, val.data(), ret.data()); + + #ifdef RB_DEBUG_DB_SEEK + log::debug + { + log, "[%s] %lu:%lu SEEK parallel:%zu in %ld$us", + name(d), + sequence(d), + sequence(opts.snapshot), + ret.size(), + timer.at().count(), + }; + #endif +} // // iterator seek suite diff --git a/ircd/db.h b/ircd/db.h index 9ba8ef207..96070757a 100644 --- a/ircd/db.h +++ b/ircd/db.h @@ -121,6 +121,9 @@ namespace ircd::db std::pair operator*(const rocksdb::Iterator &); // [GET] read suite + using _read_op = std::tuple; + using _read_closure = std::function; + bool _read(const vector_view<_read_op> &, const rocksdb::ReadOptions &, const _read_closure & = {}); size_t _read(std::nothrow_t, column &, const string_view &key, const rocksdb::ReadOptions &, const column::view_closure & = {}); size_t _read(column &, const string_view &key, const rocksdb::ReadOptions &, const column::view_closure & = {});