diff --git a/include/ircd/db/prefetcher.h b/include/ircd/db/prefetcher.h index 24634049a..452eb105d 100644 --- a/include/ircd/db/prefetcher.h +++ b/include/ircd/db/prefetcher.h @@ -24,15 +24,19 @@ struct ircd::db::prefetcher ctx::dock dock; std::deque queue; ctx::context context; - size_t handles {0}; size_t request_workers {0}; size_t request_counter {0}; + size_t directs_counter {0}; size_t handles_counter {0}; + size_t handled_counter {0}; + size_t fetches_counter {0}; size_t fetched_counter {0}; size_t cancels_counter {0}; + size_t cache_hits {0}; size_t wait_pending(); void request_handle(request &); + size_t request_cleanup() noexcept; void request_worker(); void handle(); void worker(); @@ -50,8 +54,22 @@ struct ircd::db::prefetcher struct ircd::db::prefetcher::request { - std::string key; - database *d {nullptr}; - steady_point start; - uint32_t cid {0}; + database *d {nullptr}; // database instance + uint32_t cid {0}; // column ID + uint32_t len {0}; // length of key + steady_point snd; // submitted by user + steady_point req; // request sent to database + steady_point fin; // result from database + char key[208] alignas(16); // key buffer + + explicit operator string_view() const noexcept; + + request(database &d, const column &c, const string_view &key) noexcept; + request() = default; }; + +static_assert +( + sizeof(ircd::db::prefetcher::request) == 256, + "struct ircd::db::prefetcher::request fell out of alignment" +); diff --git a/ircd/db.cc b/ircd/db.cc index 7e5b88325..b16fa3cf2 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -3860,22 +3860,30 @@ ircd::db::prefetcher::operator()(column &c, static_cast(c) }; - const auto start + if(db::cached(c, key, opts)) { - #ifdef IRCD_DB_DEBUG_PREFETCH - now() - #else - steady_point{} - #endif - }; - - queue.emplace_back(request - { - key, &d, now(), db::id(c) - }); + ++cache_hits; + return false; + } + const ctx::critical_assertion ca; + queue.emplace_back(d, c, key); + queue.back().snd = now(); ++request_counter; - dock.notify_one(); + + // Branch here based on whether it's not possible to directly dispatch + // a db::request worker. If all request workers are busy we notify our own + // prefetcher worker, and then it blocks on submitting to the request + // worker instead of us blocking here. This is done to avoid use and growth + // of any request pool queue, and allow for more direct submission. + if(db::request.wouldblock()) + { + dock.notify_one(); + return true; + } + + ++directs_counter; + this->handle(); return true; } @@ -3902,25 +3910,25 @@ ircd::db::prefetcher::cancel(database &d) size_t ircd::db::prefetcher::cancel(const closure &closure) { - const auto e + size_t canceled(0); + for(auto &request : queue) { - std::remove_if(begin(queue), end(queue), closure) - }; + // already finished + if(request.fin != steady_point::min()) + continue; - const ssize_t remain - { - std::distance(begin(queue), e) - }; + // in progress; can't cancel + if(request.req != steady_point::min()) + continue; - assert(remain >= 0); - const ssize_t canceled - { - ssize_t(queue.size()) - remain - }; + // allow user to accept or reject + if(!closure(request)) + continue; - assert(canceled >= 0); - queue.resize(remain); - cancels_counter += canceled; + // cancel by precociously setting the finish time. + request.fin = now(); + ++canceled; + } if(canceled) dock.notify_all(); @@ -3936,7 +3944,13 @@ try { dock.wait([this] { - return !queue.empty() && request_counter > handles_counter; + if(queue.empty()) + return false; + + if(request_counter <= handles_counter) + return false; + + return true; }); handle(); @@ -3954,18 +3968,14 @@ catch(const std::exception &e) void ircd::db::prefetcher::handle() { - const scope_count handles - { - this->handles - }; - auto handler { std::bind(&prefetcher::request_worker, this) }; - db::request(std::move(handler)); ++handles_counter; + db::request(std::move(handler)); + ++handled_counter; } void @@ -3981,17 +3991,65 @@ ircd::db::prefetcher::request_worker() this->request_workers }; - if(unlikely(queue.empty())) - return; - - auto request + // Garbage collection of the queue invoked unconditionally on unwind. + const unwind cleanup_on_leave { - std::move(queue.front()) + std::bind(&prefetcher::request_cleanup, this) }; - queue.pop_front(); - request_handle(request); + // GC the queue here to get rid of any cancelled requests which have + // arrived at the front so they don't become our request. + const size_t cleanup_on_enter + { + request_cleanup() + }; + + // Find the first request in the queue which does not have its req + // timestamp sent. + auto request + { + std::find_if(begin(queue), end(queue), [] + (const auto &request) + { + return request.req == steady_point::min(); + }) + }; + + if(request == end(queue)) + return; + + request->req = now(); + assert(request->fin == steady_point::min()); + ++fetches_counter; + request_handle(*request); ++fetched_counter; + + #ifdef IRCD_DB_DEBUG_PREFETCH + log::debug + { + log, "prefetcher ch:%zu rc:%zu hc:%zu fc:%zu dc:%zu cc:%zu queue:%zu rw:%zu", + cache_hits, + request_counter, + handles_counter, + fetches_counter, + directs_counter, + cancels_counter, + queue.size(), + this->request_workers, + }; + #endif +} + +size_t +ircd::db::prefetcher::request_cleanup() +noexcept +{ + size_t removed(0); + const ctx::critical_assertion ca; + for(; !queue.empty() && queue.front().fin != steady_point::min(); ++removed) + queue.pop_front(); + + return removed; } void @@ -4004,44 +4062,45 @@ try (*request.d)[request.cid] }; - const bool has + const string_view key { - db::has(column, request.key) + request }; + const auto it + { + seek(column, key, gopts{}) + }; + + const bool lte + { + valid_lte(*it, key) + }; + + const ctx::critical_assertion ca; + request.fin = now(); + #ifdef IRCD_DB_DEBUG_PREFETCH - char pbuf[32]; + char pbuf[3][32]; log::debug { - log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s", + log, "[%s][%s] completed prefetch len:%zu lte:%b snd-req:%s req-fin:%s snd-fin:%s queue:%zu", name(*request.d), name(column), - has? false : true, + size(key), + lte, + pretty(pbuf[0], request.req - request.snd, 1), + pretty(pbuf[1], request.fin - request.req, 1), + pretty(pbuf[2], request.fin - request.snd, 1), queue.size(), - this->handles, - this->request_workers, - this->request_counter, - this->handles_counter, - this->fetched_counter, - this->cancels_counter, - pretty(pbuf, now() - request.start, 1), }; #endif } -catch(const not_found &e) -{ - assert(request.d); - log::dwarning - { - log, "[%s][%u] :%s", - name(*request.d), - request.cid, - e.what(), - }; -} catch(const std::exception &e) { assert(request.d); + request.fin = now(); + log::error { log, "[%s][%u] :%s", @@ -4050,6 +4109,11 @@ catch(const std::exception &e) e.what(), }; } +catch(...) +{ + request.fin = now(); + throw; +} size_t ircd::db::prefetcher::wait_pending() @@ -4073,6 +4137,57 @@ ircd::db::prefetcher::wait_pending() return fetched_target - fetched_counter; } +// +// prefetcher::request +// + +ircd::db::prefetcher::request::request(database &d, + const column &c, + const string_view &key) +noexcept +:d +{ + std::addressof(d) +} +,cid +{ + db::id(c) +} +,len +{ + uint32_t(std::min(size(key), sizeof(this->key))) +} +,snd +{ + steady_point::min() +} +,req +{ + steady_point::min() +} +,fin +{ + steady_point::min() +} +{ + const size_t &len + { + buffer::copy(this->key, key) + }; + + assert(this->len == len); +} + +ircd::db::prefetcher::request::operator +ircd::string_view() +const noexcept +{ + return + { + key, len + }; +} + /////////////////////////////////////////////////////////////////////////////// // // db/txn.h @@ -6109,9 +6224,6 @@ ircd::db::prefetch(column &column, const string_view &key, const gopts &gopts) { - if(cached(column, key, gopts)) - return false; - static construction instance { [] { prefetcher = new struct prefetcher(); }