mirror of
https://github.com/matrix-construct/construct
synced 2024-11-16 15:00:51 +01:00
ircd::db: Add parallel read suite to internal interface.
This commit is contained in:
parent
4a1f04823f
commit
fc9c952ba8
2 changed files with 94 additions and 0 deletions
91
ircd/db.cc
91
ircd/db.cc
|
@ -7964,6 +7964,97 @@ ircd::db::_seek(database::column &c,
|
|||
return ret;
|
||||
}
|
||||
|
||||
//
|
||||
// parallel read suite
|
||||
//
|
||||
|
||||
namespace ircd::db
|
||||
{
|
||||
static void _seek(const vector_view<_read_op> &, const vector_view<rocksdb::Status> &, const vector_view<rocksdb::PinnableSlice> &, const rocksdb::ReadOptions &);
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::db::_read(const vector_view<_read_op> &op,
|
||||
const rocksdb::ReadOptions &ropts,
|
||||
const _read_closure &closure)
|
||||
{
|
||||
assert(op.size() >= 1);
|
||||
assert(op.size() <= IOV_MAX);
|
||||
const size_t &num
|
||||
{
|
||||
op.size()
|
||||
};
|
||||
|
||||
rocksdb::Status status[num];
|
||||
rocksdb::PinnableSlice val[num];
|
||||
_seek(op, {status, num}, {val, num}, ropts);
|
||||
if(closure) for(size_t i(0); i < op.size(); ++i)
|
||||
{
|
||||
const column::delta &delta
|
||||
{
|
||||
std::get<1>(op[i]), slice(val[i])
|
||||
};
|
||||
|
||||
if(!closure(std::get<0>(op[i]), delta, status[i]))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::db::_seek(const vector_view<_read_op> &op,
|
||||
const vector_view<rocksdb::Status> &ret,
|
||||
const vector_view<rocksdb::PinnableSlice> &val,
|
||||
const rocksdb::ReadOptions &ropts)
|
||||
{
|
||||
assert(ret.size() == op.size());
|
||||
assert(ret.size() == val.size());
|
||||
|
||||
const ctx::stack_usage_assertion sua;
|
||||
const ctx::uninterruptible::nothrow ui;
|
||||
|
||||
assert(op.size() >= 1);
|
||||
database &d(std::get<0>(op[0]));
|
||||
const size_t &num
|
||||
{
|
||||
op.size()
|
||||
};
|
||||
|
||||
rocksdb::Slice key[num];
|
||||
std::transform(begin(op), end(op), key, []
|
||||
(const auto &op)
|
||||
{
|
||||
return slice(std::get<1>(op));
|
||||
});
|
||||
|
||||
rocksdb::ColumnFamilyHandle *cf[num];
|
||||
std::transform(begin(op), end(op), cf, []
|
||||
(auto &op_)
|
||||
{
|
||||
auto &op(const_cast<_read_op &>(op_));
|
||||
database::column &c(std::get<column>(op));
|
||||
return static_cast<rocksdb::ColumnFamilyHandle *>(c);
|
||||
});
|
||||
|
||||
#ifdef RB_DEBUG_DB_SEEK
|
||||
const ircd::timer timer;
|
||||
#endif
|
||||
|
||||
d.d->MultiGet(ropts, num, cf, key, val.data(), ret.data());
|
||||
|
||||
#ifdef RB_DEBUG_DB_SEEK
|
||||
log::debug
|
||||
{
|
||||
log, "[%s] %lu:%lu SEEK parallel:%zu in %ld$us",
|
||||
name(d),
|
||||
sequence(d),
|
||||
sequence(opts.snapshot),
|
||||
ret.size(),
|
||||
timer.at<microseconds>().count(),
|
||||
};
|
||||
#endif
|
||||
}
|
||||
|
||||
//
|
||||
// iterator seek suite
|
||||
|
|
|
@ -121,6 +121,9 @@ namespace ircd::db
|
|||
std::pair<string_view, string_view> operator*(const rocksdb::Iterator &);
|
||||
|
||||
// [GET] read suite
|
||||
using _read_op = std::tuple<column, string_view>;
|
||||
using _read_closure = std::function<bool (column &, const column::delta &, const rocksdb::Status &)>;
|
||||
bool _read(const vector_view<_read_op> &, const rocksdb::ReadOptions &, const _read_closure & = {});
|
||||
size_t _read(std::nothrow_t, column &, const string_view &key, const rocksdb::ReadOptions &, const column::view_closure & = {});
|
||||
size_t _read(column &, const string_view &key, const rocksdb::ReadOptions &, const column::view_closure & = {});
|
||||
|
||||
|
|
Loading…
Reference in a new issue