diff --git a/include/ircd/db/db.h b/include/ircd/db/db.h index b25d0a039..707fd57f3 100644 --- a/include/ircd/db/db.h +++ b/include/ircd/db/db.h @@ -56,6 +56,7 @@ namespace ircd::db #include "row.h" #include "json.h" #include "txn.h" +#include "prefetcher.h" #include "stats.h" // diff --git a/include/ircd/db/prefetcher.h b/include/ircd/db/prefetcher.h new file mode 100644 index 000000000..24634049a --- /dev/null +++ b/include/ircd/db/prefetcher.h @@ -0,0 +1,57 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2019 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_DB_PREFETCHER_H + +namespace ircd::db +{ + struct prefetcher extern *prefetcher; +} + +struct ircd::db::prefetcher +{ + struct request; + using closure = std::function; + + ctx::dock dock; + std::deque queue; + ctx::context context; + size_t handles {0}; + size_t request_workers {0}; + size_t request_counter {0}; + size_t handles_counter {0}; + size_t fetched_counter {0}; + size_t cancels_counter {0}; + + size_t wait_pending(); + void request_handle(request &); + void request_worker(); + void handle(); + void worker(); + + public: + size_t cancel(const closure &); + size_t cancel(database &); // Cancel all for db + size_t cancel(column &); // Cancel all for column + + bool operator()(column &, const string_view &key, const gopts &); + + prefetcher(); + ~prefetcher() noexcept; +}; + +struct ircd::db::prefetcher::request +{ + std::string key; + database *d {nullptr}; + steady_point start; + uint32_t cid {0}; +}; diff --git a/ircd/db.cc b/ircd/db.cc index 4ef45f33c..44eacdc5a 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -2358,175 +2358,6 @@ catch(const std::exception &e) return false; } -/////////////////////////////////////////////////////////////////////////////// -// -// db/stats.h -// - -std::string -ircd::db::string(const rocksdb::IOStatsContext &ic, - const bool &all) -{ - const bool exclude_zeros(!all); - return ic.ToString(exclude_zeros); -} - -const rocksdb::IOStatsContext & -ircd::db::iostats_current() -{ - const auto *const &ret - { - rocksdb::get_iostats_context() - }; - - if(unlikely(!ret)) - throw error - { - "IO counters are not available on this thread." - }; - - return *ret; -} - -std::string -ircd::db::string(const rocksdb::PerfContext &pc, - const bool &all) -{ - const bool exclude_zeros(!all); - return pc.ToString(exclude_zeros); -} - -const rocksdb::PerfContext & -ircd::db::perf_current() -{ - const auto *const &ret - { - rocksdb::get_perf_context() - }; - - if(unlikely(!ret)) - throw error - { - "Performance counters are not available on this thread." - }; - - return *ret; -} - -void -ircd::db::perf_level(const uint &level) -{ - if(level >= rocksdb::PerfLevel::kOutOfBounds) - throw error - { - "Perf level of '%u' is invalid; maximum is '%u'", - level, - uint(rocksdb::PerfLevel::kOutOfBounds) - }; - - rocksdb::SetPerfLevel(rocksdb::PerfLevel(level)); -} - -uint -ircd::db::perf_level() -{ - return rocksdb::GetPerfLevel(); -} - -// -// ticker -// - -uint64_t -ircd::db::ticker(const database &d, - const string_view &key) -{ - return ticker(d, ticker_id(key)); -} - -uint64_t -ircd::db::ticker(const database &d, - const uint32_t &id) -{ - return d.stats->getTickerCount(id); -} - -uint32_t -ircd::db::ticker_id(const string_view &key) -{ - for(const auto &pair : rocksdb::TickersNameMap) - if(key == pair.second) - return pair.first; - - throw std::out_of_range - { - "No ticker with that key" - }; -} - -ircd::string_view -ircd::db::ticker_id(const uint32_t &id) -{ - for(const auto &pair : rocksdb::TickersNameMap) - if(id == pair.first) - return pair.second; - - return {}; -} - -decltype(ircd::db::ticker_max) -ircd::db::ticker_max -{ - rocksdb::TICKER_ENUM_MAX -}; - -// -// histogram -// - -const struct ircd::db::histogram & -ircd::db::histogram(const database &d, - const string_view &key) -{ - return histogram(d, histogram_id(key)); -} - -const struct ircd::db::histogram & -ircd::db::histogram(const database &d, - const uint32_t &id) -{ - return d.stats->histogram.at(id); -} - -uint32_t -ircd::db::histogram_id(const string_view &key) -{ - for(const auto &pair : rocksdb::HistogramsNameMap) - if(key == pair.second) - return pair.first; - - throw std::out_of_range - { - "No histogram with that key" - }; -} - -ircd::string_view -ircd::db::histogram_id(const uint32_t &id) -{ - for(const auto &pair : rocksdb::HistogramsNameMap) - if(id == pair.first) - return pair.second; - - return {}; -} - -decltype(ircd::db::histogram_max) -ircd::db::histogram_max -{ - rocksdb::HISTOGRAM_ENUM_MAX -}; - /////////////////////////////////////////////////////////////////////////////// // // database::stats (db/database/stats.h) internal @@ -3808,6 +3639,439 @@ ircd::db::database::wal::info::operator=(const rocksdb::LogFile &lf) return *this; } +/////////////////////////////////////////////////////////////////////////////// +// +// db/stats.h +// + +std::string +ircd::db::string(const rocksdb::IOStatsContext &ic, + const bool &all) +{ + const bool exclude_zeros(!all); + return ic.ToString(exclude_zeros); +} + +const rocksdb::IOStatsContext & +ircd::db::iostats_current() +{ + const auto *const &ret + { + rocksdb::get_iostats_context() + }; + + if(unlikely(!ret)) + throw error + { + "IO counters are not available on this thread." + }; + + return *ret; +} + +std::string +ircd::db::string(const rocksdb::PerfContext &pc, + const bool &all) +{ + const bool exclude_zeros(!all); + return pc.ToString(exclude_zeros); +} + +const rocksdb::PerfContext & +ircd::db::perf_current() +{ + const auto *const &ret + { + rocksdb::get_perf_context() + }; + + if(unlikely(!ret)) + throw error + { + "Performance counters are not available on this thread." + }; + + return *ret; +} + +void +ircd::db::perf_level(const uint &level) +{ + if(level >= rocksdb::PerfLevel::kOutOfBounds) + throw error + { + "Perf level of '%u' is invalid; maximum is '%u'", + level, + uint(rocksdb::PerfLevel::kOutOfBounds) + }; + + rocksdb::SetPerfLevel(rocksdb::PerfLevel(level)); +} + +uint +ircd::db::perf_level() +{ + return rocksdb::GetPerfLevel(); +} + +// +// ticker +// + +uint64_t +ircd::db::ticker(const database &d, + const string_view &key) +{ + return ticker(d, ticker_id(key)); +} + +uint64_t +ircd::db::ticker(const database &d, + const uint32_t &id) +{ + return d.stats->getTickerCount(id); +} + +uint32_t +ircd::db::ticker_id(const string_view &key) +{ + for(const auto &pair : rocksdb::TickersNameMap) + if(key == pair.second) + return pair.first; + + throw std::out_of_range + { + "No ticker with that key" + }; +} + +ircd::string_view +ircd::db::ticker_id(const uint32_t &id) +{ + for(const auto &pair : rocksdb::TickersNameMap) + if(id == pair.first) + return pair.second; + + return {}; +} + +decltype(ircd::db::ticker_max) +ircd::db::ticker_max +{ + rocksdb::TICKER_ENUM_MAX +}; + +// +// histogram +// + +const struct ircd::db::histogram & +ircd::db::histogram(const database &d, + const string_view &key) +{ + return histogram(d, histogram_id(key)); +} + +const struct ircd::db::histogram & +ircd::db::histogram(const database &d, + const uint32_t &id) +{ + return d.stats->histogram.at(id); +} + +uint32_t +ircd::db::histogram_id(const string_view &key) +{ + for(const auto &pair : rocksdb::HistogramsNameMap) + if(key == pair.second) + return pair.first; + + throw std::out_of_range + { + "No histogram with that key" + }; +} + +ircd::string_view +ircd::db::histogram_id(const uint32_t &id) +{ + for(const auto &pair : rocksdb::HistogramsNameMap) + if(id == pair.first) + return pair.second; + + return {}; +} + +decltype(ircd::db::histogram_max) +ircd::db::histogram_max +{ + rocksdb::HISTOGRAM_ENUM_MAX +}; + +/////////////////////////////////////////////////////////////////////////////// +// +// db/prefetcher.h +// + +decltype(ircd::db::prefetcher) +ircd::db::prefetcher; + +// +// db::prefetcher +// +ircd::db::prefetcher::prefetcher() +:context +{ + "db.prefetcher", + 128_KiB, + context::POST, + std::bind(&prefetcher::worker, this) +} +{ +} + +ircd::db::prefetcher::~prefetcher() +noexcept +{ + while(!queue.empty()) + { + log::warning + { + log, "Prefetcher waiting for %zu requests to clear...", + queue.size(), + }; + + dock.wait_for(seconds(5), [this] + { + return queue.empty(); + }); + } + + assert(queue.empty()); +} + +bool +ircd::db::prefetcher::operator()(column &c, + const string_view &key, + const gopts &opts) +{ + auto &d + { + static_cast(c) + }; + + const auto start + { + #ifdef IRCD_DB_DEBUG_PREFETCH + now() + #else + steady_point{} + #endif + }; + + queue.emplace_back(request + { + key, &d, now(), db::id(c) + }); + + ++request_counter; + dock.notify_one(); + return true; +} + +size_t +ircd::db::prefetcher::cancel(column &c) +{ + return cancel([&c] + (const auto &request) + { + return request.cid == id(c); + }); +} + +size_t +ircd::db::prefetcher::cancel(database &d) +{ + return cancel([&d] + (const auto &request) + { + return request.d == std::addressof(d); + }); +} + +size_t +ircd::db::prefetcher::cancel(const closure &closure) +{ + const auto e + { + std::remove_if(begin(queue), end(queue), closure) + }; + + const ssize_t remain + { + std::distance(begin(queue), e) + }; + + assert(remain >= 0); + const ssize_t canceled + { + ssize_t(queue.size()) - remain + }; + + assert(canceled >= 0); + queue.resize(remain); + cancels_counter += canceled; + + if(canceled) + dock.notify_all(); + + return canceled; +} + +void +ircd::db::prefetcher::worker() +try +{ + while(1) + { + dock.wait([this] + { + return !queue.empty() && request_counter > handles_counter; + }); + + handle(); + } +} +catch(const std::exception &e) +{ + log::critical + { + log, "prefetcher worker: %s", + e.what() + }; +} + +void +ircd::db::prefetcher::handle() +{ + const scope_count handles + { + this->handles + }; + + auto handler + { + std::bind(&prefetcher::request_worker, this) + }; + + db::request(std::move(handler)); + ++handles_counter; +} + +void +ircd::db::prefetcher::request_worker() +{ + const ctx::scope_notify notify + { + this->dock + }; + + const scope_count request_workers + { + this->request_workers + }; + + if(unlikely(queue.empty())) + return; + + auto request + { + std::move(queue.front()) + }; + + queue.pop_front(); + request_handle(request); + ++fetched_counter; +} + +void +ircd::db::prefetcher::request_handle(request &request) +try +{ + assert(request.d); + db::column column + { + (*request.d)[request.cid] + }; + + const bool has + { + db::has(column, request.key) + }; + + #ifdef IRCD_DB_DEBUG_PREFETCH + char pbuf[32]; + log::debug + { + log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s", + name(*request.d), + name(column), + has? false : true, + queue.size(), + this->handles, + this->request_workers, + this->request_counter, + this->handles_counter, + this->fetched_counter, + this->cancels_counter, + pretty(pbuf, now() - request.start, 1), + }; + #endif +} +catch(const not_found &e) +{ + assert(request.d); + log::dwarning + { + log, "[%s][%u] :%s", + name(*request.d), + request.cid, + e.what(), + }; +} +catch(const std::exception &e) +{ + assert(request.d); + log::error + { + log, "[%s][%u] :%s", + name(*request.d), + request.cid, + e.what(), + }; +} + +size_t +ircd::db::prefetcher::wait_pending() +{ + const size_t fetched_counter + { + this->fetched_counter + }; + + const size_t fetched_target + { + fetched_counter + request_workers + }; + + dock.wait([this, &fetched_target] + { + return this->fetched_counter >= fetched_target; + }); + + assert(fetched_target >= fetched_counter); + return fetched_target - fetched_counter; +} /////////////////////////////////////////////////////////////////////////////// // @@ -5852,267 +6116,6 @@ ircd::db::prefetch(column &column, return (*prefetcher)(column, key, gopts); } -// -// db::prefetcher -// - -decltype(ircd::db::prefetcher) -ircd::db::prefetcher; - -ircd::db::prefetcher::prefetcher() -:context -{ - "db.prefetcher", - 128_KiB, - context::POST, - std::bind(&prefetcher::worker, this) -} -{ -} - -ircd::db::prefetcher::~prefetcher() -noexcept -{ - while(!queue.empty()) - { - log::warning - { - log, "Prefetcher waiting for %zu requests to clear...", - queue.size(), - }; - - dock.wait_for(seconds(5), [this] - { - return queue.empty(); - }); - } - - assert(queue.empty()); -} - -bool -ircd::db::prefetcher::operator()(column &c, - const string_view &key, - const gopts &opts) -{ - auto &d - { - static_cast(c) - }; - - const auto start - { - #ifdef IRCD_DB_DEBUG_PREFETCH - now() - #else - steady_point{} - #endif - }; - - queue.emplace_back(request - { - key, &d, now(), db::id(c) - }); - - ++request_counter; - dock.notify_one(); - return true; -} - -size_t -ircd::db::prefetcher::cancel(column &c) -{ - return cancel([&c] - (const auto &request) - { - return request.cid == id(c); - }); -} - -size_t -ircd::db::prefetcher::cancel(database &d) -{ - return cancel([&d] - (const auto &request) - { - return request.d == std::addressof(d); - }); -} - -size_t -ircd::db::prefetcher::cancel(const closure &closure) -{ - const auto e - { - std::remove_if(begin(queue), end(queue), closure) - }; - - const ssize_t remain - { - std::distance(begin(queue), e) - }; - - assert(remain >= 0); - const ssize_t canceled - { - ssize_t(queue.size()) - remain - }; - - assert(canceled >= 0); - queue.resize(remain); - cancels_counter += canceled; - - if(canceled) - dock.notify_all(); - - return canceled; -} - -void -ircd::db::prefetcher::worker() -try -{ - while(1) - { - dock.wait([this] - { - return !queue.empty() && request_counter > handles_counter; - }); - - handle(); - } -} -catch(const std::exception &e) -{ - log::critical - { - log, "prefetcher worker: %s", - e.what() - }; -} - -void -ircd::db::prefetcher::handle() -{ - const scope_count handles - { - this->handles - }; - - auto handler - { - std::bind(&prefetcher::request_worker, this) - }; - - db::request(std::move(handler)); - ++handles_counter; -} - -void -ircd::db::prefetcher::request_worker() -{ - const ctx::scope_notify notify - { - this->dock - }; - - const scope_count request_workers - { - this->request_workers - }; - - if(unlikely(queue.empty())) - return; - - auto request - { - std::move(queue.front()) - }; - - queue.pop_front(); - request_handle(request); - ++fetched_counter; -} - -void -ircd::db::prefetcher::request_handle(request &request) -try -{ - assert(request.d); - db::column column - { - (*request.d)[request.cid] - }; - - const bool has - { - db::has(column, request.key) - }; - - #ifdef IRCD_DB_DEBUG_PREFETCH - char pbuf[32]; - log::debug - { - log, "[%s][%s] completed fetch:%b queue:%zu h:%zu rw:%zu rc:%zu hc:%zu fc:%zu cc:%zu in %s", - name(*request.d), - name(column), - has? false : true, - queue.size(), - this->handles, - this->request_workers, - this->request_counter, - this->handles_counter, - this->fetched_counter, - this->cancels_counter, - pretty(pbuf, now() - request.start, 1), - }; - #endif -} -catch(const not_found &e) -{ - assert(request.d); - log::dwarning - { - log, "[%s][%u] :%s", - name(*request.d), - request.cid, - e.what(), - }; -} -catch(const std::exception &e) -{ - assert(request.d); - log::error - { - log, "[%s][%u] :%s", - name(*request.d), - request.cid, - e.what(), - }; -} - -size_t -ircd::db::prefetcher::wait_pending() -{ - const size_t fetched_counter - { - this->fetched_counter - }; - - const size_t fetched_target - { - fetched_counter + request_workers - }; - - dock.wait([this, &fetched_target] - { - return this->fetched_counter >= fetched_target; - }); - - assert(fetched_target >= fetched_counter); - return fetched_target - fetched_counter; -} - // // db::cached // diff --git a/ircd/db.h b/ircd/db.h index d385bc0a8..f376d035b 100644 --- a/ircd/db.h +++ b/ircd/db.h @@ -56,7 +56,6 @@ namespace ircd::db { struct throw_on_error; struct error_to_status; - struct prefetcher; constexpr const auto BLOCKING { rocksdb::ReadTier::kReadAllTier }; constexpr const auto NON_BLOCKING { rocksdb::ReadTier::kBlockCacheTier }; @@ -68,7 +67,6 @@ namespace ircd::db extern ctx::pool::opts request_pool_opts; extern ctx::pool request; extern ctx::mutex write_mutex; - extern db::prefetcher *prefetcher; // reflections string_view reflect(const rocksdb::Status::Severity &); @@ -187,47 +185,3 @@ struct ircd::db::txn::handler ,cb{cb} {} }; - -// -// prefetcher -// - -struct ircd::db::prefetcher -{ - struct request; - using closure = std::function; - - ctx::dock dock; - std::deque queue; - ctx::context context; - size_t handles {0}; - size_t request_workers {0}; - size_t request_counter {0}; - size_t handles_counter {0}; - size_t fetched_counter {0}; - size_t cancels_counter {0}; - - size_t wait_pending(); - void request_handle(request &); - void request_worker(); - void handle(); - void worker(); - - public: - size_t cancel(const closure &); - size_t cancel(database &); // Cancel all for db - size_t cancel(column &); // Cancel all for column - - bool operator()(column &, const string_view &key, const gopts &); - - prefetcher(); - ~prefetcher() noexcept; -}; - -struct ircd::db::prefetcher::request -{ - std::string key; - database *d {nullptr}; - steady_point start; - uint32_t cid {0}; -};