mirror of
https://github.com/matrix-construct/construct
synced 2024-12-28 00:14:07 +01:00
ircd::db: Fix seek offload procedure.
This commit is contained in:
parent
fa41c1db5f
commit
9f12672567
1 changed files with 80 additions and 19 deletions
99
ircd/db.cc
99
ircd/db.cc
|
@ -2081,6 +2081,12 @@ ircd::db::seek(database::column &c,
|
|||
seek(c, p, opts, it);
|
||||
}
|
||||
|
||||
//
|
||||
// Seek with offload-safety in case of blocking IO.
|
||||
//
|
||||
// The options for an iterator cannot be changed after the iterator is created.
|
||||
// This slightly complicates our toggling between blocking and non-blocking queries.
|
||||
//
|
||||
template<class pos>
|
||||
void
|
||||
ircd::db::seek(database::column &c,
|
||||
|
@ -2089,32 +2095,82 @@ ircd::db::seek(database::column &c,
|
|||
std::unique_ptr<rocksdb::Iterator> &it)
|
||||
{
|
||||
database &d(*c.d);
|
||||
rocksdb::ColumnFamilyHandle *const &cf(c);
|
||||
|
||||
// Start with a non-blocking query
|
||||
if(!it || opts.read_tier == BLOCKING)
|
||||
{
|
||||
opts.read_tier = NON_BLOCKING;
|
||||
it.reset(d.d->NewIterator(opts, c));
|
||||
}
|
||||
// The ReadOptions created by make_opts(gopts) always sets NON_BLOCKING
|
||||
// mode. The user should never touch this. Only this function will ever
|
||||
// deal with iterators in BLOCKING mode.
|
||||
assert(opts.read_tier == NON_BLOCKING);
|
||||
|
||||
if(!it)
|
||||
it.reset(d.d->NewIterator(opts, cf));
|
||||
|
||||
// Start with a non-blocking query.
|
||||
_seek_(*it, p);
|
||||
if(it->status().IsIncomplete())
|
||||
|
||||
// Indicate a cache miss and blocking is required.
|
||||
if(!it->status().IsIncomplete())
|
||||
{
|
||||
// DB cache miss: reset the iterator to blocking mode and offload it
|
||||
opts.read_tier = BLOCKING;
|
||||
it.reset(d.d->NewIterator(opts, c));
|
||||
ctx::offload([&it, &p]
|
||||
{
|
||||
_seek_(*it, p);
|
||||
});
|
||||
log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE HIT %s",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
valid(*it),
|
||||
it->status().ToString());
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("'%s':'%s' @%lu SEEK [valid: %d] [%s]",
|
||||
// DB cache miss: create a blocking iterator and offload it.
|
||||
rocksdb::ReadOptions blocking_opts(opts);
|
||||
blocking_opts.fill_cache = true;
|
||||
blocking_opts.read_tier = BLOCKING;
|
||||
std::unique_ptr<rocksdb::Iterator> blocking_it
|
||||
{
|
||||
d.d->NewIterator(blocking_opts, cf)
|
||||
};
|
||||
|
||||
ctx::offload([&blocking_it, &it, &p]
|
||||
{
|
||||
// When the non-blocking iterator hit its cache miss in the middle of an
|
||||
// iteration we have to copy its position to the blocking iterator first
|
||||
// and then make the next query. TODO: this can be avoided when 'p' is a
|
||||
// slice and not an increment.
|
||||
if(valid(*it))
|
||||
_seek_(*blocking_it, it->key());
|
||||
|
||||
_seek_(*blocking_it, p);
|
||||
});
|
||||
|
||||
// When the blocking iterator comes back invalid the result is propagated
|
||||
if(!valid(*blocking_it))
|
||||
{
|
||||
it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
|
||||
log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
valid(*it),
|
||||
it->status().ToString());
|
||||
return;
|
||||
}
|
||||
|
||||
// When the blocking iterator comes back valid the result still has to be
|
||||
// properly transferred back to the user's non-blocking iterator. RocksDB
|
||||
// seems to be forcing us to recreate the iterator after it failed with the
|
||||
// status IsIncomplete(). Regardless of reuse, a non-blocking seek must occur
|
||||
// to match this iterator with the result -- such a seek may fail again if
|
||||
// the cache has been hosed between the point the offload took place and this
|
||||
// seek! To properly handle this case we reenter this seek() function and
|
||||
// enjoy the safety of offloading again for this edge case.
|
||||
it.reset(nullptr);
|
||||
log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
valid(*it),
|
||||
opts.read_tier == BLOCKING? "CACHE MISS"s : "CACHE HIT"s);
|
||||
valid(*blocking_it),
|
||||
blocking_it->status().ToString());
|
||||
|
||||
seek(c, blocking_it->key(), opts, it);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -2338,11 +2394,16 @@ ircd::db::make_opts(const gopts &opts,
|
|||
{
|
||||
rocksdb::ReadOptions ret;
|
||||
ret.snapshot = opts.snapshot;
|
||||
ret.read_tier = NON_BLOCKING;
|
||||
//ret.total_order_seek = true;
|
||||
//ret.iterate_upper_bound = nullptr;
|
||||
|
||||
if(iterator)
|
||||
{
|
||||
ret.fill_cache = false;
|
||||
else
|
||||
ret.fill_cache = true;
|
||||
ret.readahead_size = 4_KiB;
|
||||
}
|
||||
else ret.fill_cache = true;
|
||||
|
||||
for(const auto &opt : opts) switch(opt.first)
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue