mirror of
https://github.com/matrix-construct/construct
synced 2024-12-29 08:54:02 +01:00
ircd::db: Improve seek() stack.
This commit is contained in:
parent
061e1fa485
commit
d7c59f4e49
1 changed files with 160 additions and 60 deletions
216
ircd/db.cc
216
ircd/db.cc
|
@ -3206,6 +3206,9 @@ namespace ircd::db
|
|||
static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const rocksdb::Slice &);
|
||||
static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const string_view &);
|
||||
static rocksdb::Iterator &_seek_(rocksdb::Iterator &, const pos &);
|
||||
static std::unique_ptr<rocksdb::Iterator> _seek_offload(database::column &c, const rocksdb::ReadOptions &opts, const std::function<void (rocksdb::Iterator &)> &closure);
|
||||
bool _seek(database::column &, const pos &, const rocksdb::ReadOptions &, std::unique_ptr<rocksdb::Iterator> &it);
|
||||
bool _seek(database::column &, const string_view &, const rocksdb::ReadOptions &, std::unique_ptr<rocksdb::Iterator> &it);
|
||||
}
|
||||
|
||||
std::unique_ptr<rocksdb::Iterator>
|
||||
|
@ -3236,12 +3239,6 @@ ircd::db::seek(database::column &c,
|
|||
return seek(c, p, opts, it);
|
||||
}
|
||||
|
||||
//
|
||||
// Seek with offload-safety in case of blocking IO.
|
||||
//
|
||||
// The options for an iterator cannot be changed after the iterator is created.
|
||||
// This slightly complicates our toggling between blocking and non-blocking queries.
|
||||
//
|
||||
template<class pos>
|
||||
bool
|
||||
ircd::db::seek(database::column &c,
|
||||
|
@ -3249,16 +3246,32 @@ ircd::db::seek(database::column &c,
|
|||
const rocksdb::ReadOptions &opts,
|
||||
std::unique_ptr<rocksdb::Iterator> &it)
|
||||
{
|
||||
database &d(*c.d);
|
||||
rocksdb::ColumnFamilyHandle *const &cf(c);
|
||||
|
||||
// The ReadOptions created by make_opts(gopts) always sets NON_BLOCKING
|
||||
// mode. The user should never touch this. Only this function will ever
|
||||
// deal with iterators in BLOCKING mode.
|
||||
assert(opts.read_tier == NON_BLOCKING);
|
||||
|
||||
if(!it)
|
||||
{
|
||||
database &d(*c.d);
|
||||
rocksdb::ColumnFamilyHandle *const &cf(c);
|
||||
it.reset(d.d->NewIterator(opts, cf));
|
||||
}
|
||||
|
||||
return _seek(c, p, opts, it);
|
||||
}
|
||||
|
||||
//
|
||||
// Seek with offload-safety in case of blocking IO.
|
||||
//
|
||||
// The options for an iterator cannot be changed after the iterator is created.
|
||||
// This slightly complicates our toggling between blocking and non-blocking queries.
|
||||
//
|
||||
bool
|
||||
ircd::db::_seek(database::column &c,
|
||||
const string_view &p,
|
||||
const rocksdb::ReadOptions &opts,
|
||||
std::unique_ptr<rocksdb::Iterator> &it)
|
||||
{
|
||||
database &d(*c.d);
|
||||
const ircd::timer timer;
|
||||
|
||||
// Start with a non-blocking query.
|
||||
_seek_(*it, p);
|
||||
|
@ -3266,17 +3279,143 @@ ircd::db::seek(database::column &c,
|
|||
// Branch for query being fulfilled from cache
|
||||
if(!it->status().IsIncomplete())
|
||||
{
|
||||
log.debug("'%s':'%s' @%lu SEEK %s CACHE HIT %s",
|
||||
log.debug("'%s':'%s' @%lu SEEK[%zu] %s CACHE HIT %s in %ld$us",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
p.size(),
|
||||
valid(*it)? "VALID" : "INVALID",
|
||||
it->status().ToString());
|
||||
it->status().ToString(),
|
||||
timer.at<microseconds>().count());
|
||||
|
||||
return valid(*it);
|
||||
}
|
||||
|
||||
// DB cache miss: create a blocking iterator and offload it.
|
||||
const auto blocking_it
|
||||
{
|
||||
_seek_offload(c, opts, [&p]
|
||||
(rocksdb::Iterator &blocking_it)
|
||||
{
|
||||
_seek_(blocking_it, p);
|
||||
})
|
||||
};
|
||||
|
||||
// When the blocking iterator comes back invalid the result is propagated
|
||||
if(!valid(*blocking_it))
|
||||
{
|
||||
it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
|
||||
log.debug("'%s':'%s' @%lu SEEK[%zu] INVALID CACHE MISS %s in %ld$us",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
p.size(),
|
||||
it->status().ToString(),
|
||||
timer.at<microseconds>().count());
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
it.reset(nullptr);
|
||||
log.debug("'%s':'%s' @%lu SEEK[%zu] VALID CACHE MISS %s in %ld$us",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
p.size(),
|
||||
blocking_it->status().ToString(),
|
||||
timer.at<microseconds>().count());
|
||||
|
||||
return seek(c, slice(blocking_it->key()), opts, it);
|
||||
}
|
||||
|
||||
//
|
||||
// Seek with offload-safety in case of blocking IO.
|
||||
//
|
||||
// The options for an iterator cannot be changed after the iterator is created.
|
||||
// This slightly complicates our toggling between blocking and non-blocking queries.
|
||||
//
|
||||
bool
|
||||
ircd::db::_seek(database::column &c,
|
||||
const pos &p,
|
||||
const rocksdb::ReadOptions &opts,
|
||||
std::unique_ptr<rocksdb::Iterator> &it)
|
||||
{
|
||||
database &d(*c.d);
|
||||
const ircd::timer timer;
|
||||
const bool valid_it
|
||||
{
|
||||
valid(*it)
|
||||
};
|
||||
|
||||
// Start with a non-blocking query.
|
||||
_seek_(*it, p);
|
||||
|
||||
// Branch for query being fulfilled from cache
|
||||
if(!it->status().IsIncomplete())
|
||||
{
|
||||
log.debug("'%s':'%s' @%lu SEEK[%s] %s->%s CACHE HIT %s in %ld$us",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
reflect(p),
|
||||
valid_it? "VALID" : "INVALID",
|
||||
valid(*it)? "VALID" : "INVALID",
|
||||
it->status().ToString(),
|
||||
timer.at<microseconds>().count());
|
||||
|
||||
return valid(*it);
|
||||
}
|
||||
|
||||
const auto blocking_it
|
||||
{
|
||||
_seek_offload(c, opts, [&valid_it, &it, &p]
|
||||
(rocksdb::Iterator &blocking_it)
|
||||
{
|
||||
if(valid_it)
|
||||
_seek_(blocking_it, it->key());
|
||||
|
||||
_seek_(blocking_it, p);
|
||||
})
|
||||
};
|
||||
|
||||
// When the blocking iterator comes back invalid the result is propagated
|
||||
if(!valid(*blocking_it))
|
||||
{
|
||||
it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
|
||||
log.debug("'%s':'%s' @%lu SEEK[%s] %s->%s|INVALID CACHE MISS %s in %ld$us",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
reflect(p),
|
||||
valid_it? "VALID" : "INVALID",
|
||||
valid(*it)? "VALID" : "INVALID",
|
||||
it->status().ToString(),
|
||||
timer.at<microseconds>().count());
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
it.reset(nullptr);
|
||||
log.debug("'%s':'%s' @%lu SEEK[%s] %s->%s|VALID CACHE MISS %s in %ld$us",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
reflect(p),
|
||||
valid_it? "VALID" : "INVALID",
|
||||
valid(*it)? "VALID" : "INVALID",
|
||||
blocking_it->status().ToString(),
|
||||
timer.at<microseconds>().count());
|
||||
|
||||
return seek(c, slice(blocking_it->key()), opts, it);
|
||||
}
|
||||
|
||||
/// DB cache miss: create a blocking iterator and offload it.
|
||||
std::unique_ptr<rocksdb::Iterator>
|
||||
ircd::db::_seek_offload(database::column &c,
|
||||
const rocksdb::ReadOptions &opts,
|
||||
const std::function<void (rocksdb::Iterator &)> &closure)
|
||||
{
|
||||
database &d(*c.d);
|
||||
rocksdb::ColumnFamilyHandle *const &cf(c);
|
||||
rocksdb::ReadOptions blocking_opts(opts);
|
||||
blocking_opts.fill_cache = true;
|
||||
blocking_opts.read_tier = BLOCKING;
|
||||
|
@ -3285,52 +3424,12 @@ ircd::db::seek(database::column &c,
|
|||
d.d->NewIterator(blocking_opts, cf)
|
||||
};
|
||||
|
||||
ctx::offload([&blocking_it, &it, &p]
|
||||
ctx::offload([&closure, &blocking_it]
|
||||
{
|
||||
// When the non-blocking iterator cache missed in the middle of an
|
||||
// iteration we have to copy its position to the blocking iterator first
|
||||
// and then make the next query. In other words, two seeks, because the
|
||||
// original seek (p) may be a `pos` and not a key. TODO: this can be
|
||||
// avoided if we detect 'p' is a slice and not an increment.
|
||||
if(valid(*it))
|
||||
_seek_(*blocking_it, it->key());
|
||||
|
||||
if(!valid(*it) || valid(*blocking_it))
|
||||
_seek_(*blocking_it, p);
|
||||
closure(*blocking_it);
|
||||
});
|
||||
|
||||
// When the blocking iterator comes back invalid the result is propagated
|
||||
if(!valid(*blocking_it))
|
||||
{
|
||||
it.reset(rocksdb::NewErrorIterator(blocking_it->status()));
|
||||
log.debug("'%s':'%s' @%lu SEEK %s CACHE MISS %s",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
valid(*it)? "VALID" : "INVALID",
|
||||
it->status().ToString());
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// When the blocking iterator comes back valid the result still has to be
|
||||
// properly transferred back to the user's non-blocking iterator. RocksDB
|
||||
// seems to be forcing us to recreate the iterator after it failed with the
|
||||
// status IsIncomplete(). Regardless of reuse, a non-blocking seek must occur
|
||||
// to match this iterator with the result -- such a seek may fail again if
|
||||
// the blocking_it's data has been evicted from cache between the point the
|
||||
// offload took place and the seek for the user's iterator. That may be
|
||||
// impossible. But if it ever becomes possible, we reenter this seek()
|
||||
// function and enjoy the safety of offloading to try again.
|
||||
it.reset(nullptr);
|
||||
log.debug("'%s':'%s' @%lu SEEK %s CACHE MISS %s",
|
||||
name(d),
|
||||
name(c),
|
||||
sequence(d),
|
||||
valid(*blocking_it)? "VALID" : "INVALID",
|
||||
blocking_it->status().ToString());
|
||||
|
||||
return seek(c, blocking_it->key(), opts, it);
|
||||
return blocking_it;
|
||||
}
|
||||
|
||||
rocksdb::Iterator &
|
||||
|
@ -3605,6 +3704,7 @@ rocksdb::ReadOptions
|
|||
ircd::db::make_opts(const gopts &opts)
|
||||
{
|
||||
rocksdb::ReadOptions ret;
|
||||
assert(ret.fill_cache);
|
||||
ret.read_tier = NON_BLOCKING;
|
||||
ret.iterate_upper_bound = opts.upper_bound;
|
||||
|
||||
|
|
Loading…
Reference in a new issue