From 9f1267256767cdf2e36d496353563b0aee6ee10c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 18 Aug 2017 16:13:15 -0600 Subject: [PATCH] ircd::db: Fix seek offload procedure. --- ircd/db.cc | 99 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 19 deletions(-) diff --git a/ircd/db.cc b/ircd/db.cc index 7f549979b..9bd862170 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -2081,6 +2081,12 @@ ircd::db::seek(database::column &c, seek(c, p, opts, it); } +// +// Seek with offload-safety in case of blocking IO. +// +// The options for an iterator cannot be changed after the iterator is created. +// This slightly complicates our toggling between blocking and non-blocking queries. +// template void ircd::db::seek(database::column &c, @@ -2089,32 +2095,82 @@ ircd::db::seek(database::column &c, std::unique_ptr &it) { database &d(*c.d); + rocksdb::ColumnFamilyHandle *const &cf(c); - // Start with a non-blocking query - if(!it || opts.read_tier == BLOCKING) - { - opts.read_tier = NON_BLOCKING; - it.reset(d.d->NewIterator(opts, c)); - } + // The ReadOptions created by make_opts(gopts) always sets NON_BLOCKING + // mode. The user should never touch this. Only this function will ever + // deal with iterators in BLOCKING mode. + assert(opts.read_tier == NON_BLOCKING); + if(!it) + it.reset(d.d->NewIterator(opts, cf)); + + // Start with a non-blocking query. _seek_(*it, p); - if(it->status().IsIncomplete()) + + // Indicate a cache miss and blocking is required. + if(!it->status().IsIncomplete()) { - // DB cache miss: reset the iterator to blocking mode and offload it - opts.read_tier = BLOCKING; - it.reset(d.d->NewIterator(opts, c)); - ctx::offload([&it, &p] - { - _seek_(*it, p); - }); + log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE HIT %s", + name(d), + name(c), + sequence(d), + valid(*it), + it->status().ToString()); + return; } - log.debug("'%s':'%s' @%lu SEEK [valid: %d] [%s]", + // DB cache miss: create a blocking iterator and offload it. + rocksdb::ReadOptions blocking_opts(opts); + blocking_opts.fill_cache = true; + blocking_opts.read_tier = BLOCKING; + std::unique_ptr blocking_it + { + d.d->NewIterator(blocking_opts, cf) + }; + + ctx::offload([&blocking_it, &it, &p] + { + // When the non-blocking iterator hit its cache miss in the middle of an + // iteration we have to copy its position to the blocking iterator first + // and then make the next query. TODO: this can be avoided when 'p' is a + // slice and not an increment. + if(valid(*it)) + _seek_(*blocking_it, it->key()); + + _seek_(*blocking_it, p); + }); + + // When the blocking iterator comes back invalid the result is propagated + if(!valid(*blocking_it)) + { + it.reset(rocksdb::NewErrorIterator(blocking_it->status())); + log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s", + name(d), + name(c), + sequence(d), + valid(*it), + it->status().ToString()); + return; + } + + // When the blocking iterator comes back valid the result still has to be + // properly transferred back to the user's non-blocking iterator. RocksDB + // seems to be forcing us to recreate the iterator after it failed with the + // status IsIncomplete(). Regardless of reuse, a non-blocking seek must occur + // to match this iterator with the result -- such a seek may fail again if + // the cache has been hosed between the point the offload took place and this + // seek! To properly handle this case we reenter this seek() function and + // enjoy the safety of offloading again for this edge case. + it.reset(nullptr); + log.debug("'%s':'%s' @%lu SEEK valid[%d] CACHE MISS %s", name(d), name(c), sequence(d), - valid(*it), - opts.read_tier == BLOCKING? "CACHE MISS"s : "CACHE HIT"s); + valid(*blocking_it), + blocking_it->status().ToString()); + + seek(c, blocking_it->key(), opts, it); } void @@ -2338,11 +2394,16 @@ ircd::db::make_opts(const gopts &opts, { rocksdb::ReadOptions ret; ret.snapshot = opts.snapshot; + ret.read_tier = NON_BLOCKING; + //ret.total_order_seek = true; + //ret.iterate_upper_bound = nullptr; if(iterator) + { ret.fill_cache = false; - else - ret.fill_cache = true; + ret.readahead_size = 4_KiB; + } + else ret.fill_cache = true; for(const auto &opt : opts) switch(opt.first) {