From 6db0659c06a97b040454bcca56d26615db022254 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 16 Aug 2018 04:48:21 -0700 Subject: [PATCH] ircd::db: Enable port; minimal context pool state. --- include/ircd/db/database/env/state.h | 33 ++++- ircd/db.cc | 195 ++++++++++++++++++++------- ircd/db.h | 7 + 3 files changed, 183 insertions(+), 52 deletions(-) diff --git a/include/ircd/db/database/env/state.h b/include/ircd/db/database/env/state.h index 51071f93b..194f65d18 100644 --- a/include/ircd/db/database/env/state.h +++ b/include/ircd/db/database/env/state.h @@ -13,8 +13,37 @@ struct ircd::db::database::env::state { + struct task; + + /// Backreference to database database &d; - state(database *const &d); - ~state() noexcept; + /// Convenience alias of the number of pools + static constexpr const size_t POOLS + { + rocksdb::Env::Priority::TOTAL + }; + + /// Track of background tasks. + std::array, POOLS> tasks; + + /// The background task pools. + std::array pool + {{ + // name of pool stack size initial workers + { "bottom", 128_KiB, 1 }, + { "low", 128_KiB, 1 }, + { "high", 128_KiB, 1 }, + }}; + + state(database *const &d) + :d{*d} + {} +}; + +struct ircd::db::database::env::state::task +{ + void (*func)(void *arg); + void (*cancel)(void *arg); + void *arg; }; diff --git a/ircd/db.cc b/ircd/db.cc index bac7d5286..1f9a0a5d7 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -46,6 +46,7 @@ #include #include +#define IRCD_DB_PORT // RocksDB port linktime-overriding interfaces (experimental). #ifdef IRCD_DB_PORT @@ -2230,31 +2231,88 @@ ircd::db::database::env::Schedule(void (*f)(void* arg), noexcept { #ifdef RB_DEBUG_DB_ENV - log.debug("'%s': schedule func:%p a:%p tag:%p u:%p prio:%s", - d.name, - f, - a, - tag, - u, - reflect(prio)); + log::debug + { + log, "'%s': schedule func:%p a:%p tag:%p u:%p prio:%s", + d.name, + f, + a, + tag, + u, + reflect(prio) + }; #endif - return defaults.Schedule(f, a, prio, tag, u); + assert(st); + auto &pool + { + st->pool.at(prio) + }; + + auto &tasks + { + st->tasks.at(prio) + }; + + tasks.emplace_back(state::task + { + f, u, a + }); + + pool([this, &tasks] + { + assert(this->st); + if(tasks.empty()) + return; + + const auto task + { + std::move(tasks.front()) + }; + + tasks.pop_front(); + + #ifdef RB_DEBUG_DB_ENV + log::debug + { + log, "'%s': func:%p arg:%p", + this->d.name, + task.func, + task.arg, + }; + #endif + + // Execute the task + task.func(task.arg); + }); } int -ircd::db::database::env::UnSchedule(void* tag, - Priority pri) +ircd::db::database::env::UnSchedule(void *const tag, + const Priority prio) noexcept { #ifdef RB_DEBUG_DB_ENV - log.debug("'%s': unschedule tag:%p prio:%s", - d.name, - tag, - reflect(pri)); + log::debug + { + log, "'%s': unschedule tag:%p prio:%s", + d.name, + tag, + reflect(prio) + }; #endif - return defaults.UnSchedule(tag, pri); + assert(st); + auto &tasks + { + st->tasks.at(prio) + }; + + size_t i(0); + for(auto it(begin(tasks)); it != end(tasks); it = tasks.erase(it), ++i) + it->cancel(it->arg); + + return i; } void @@ -2263,12 +2321,16 @@ ircd::db::database::env::StartThread(void (*f)(void*), noexcept { #ifdef RB_DEBUG_DB_ENV - log.debug("'%s': start thread func:%p a:%p", - d.name, - f, - a); + log::debug + { + log, "'%s': start thread func:%p a:%p", + d.name, + f, + a + }; #endif + assert(0); return defaults.StartThread(f, a); } @@ -2288,46 +2350,81 @@ noexcept } unsigned int -ircd::db::database::env::GetThreadPoolQueueLen(Priority pri) +ircd::db::database::env::GetThreadPoolQueueLen(Priority prio) const noexcept { #ifdef RB_DEBUG_DB_ENV - log.debug("'%s': get thread pool queue len prio:%s", - d.name, - reflect(pri)); + log::debug + { + log, "'%s': get thread pool queue len prio:%s", + d.name, + reflect(prio) + }; #endif - return defaults.GetThreadPoolQueueLen(pri); + assert(st); + const auto &pool + { + st->pool.at(prio) + }; + + return pool.queued(); } void ircd::db::database::env::SetBackgroundThreads(int num, - Priority pri) + Priority prio) noexcept { #ifdef RB_DEBUG_DB_ENV - log.debug("'%s': set background threads num:%d prio:%s", - d.name, - num, - reflect(pri)); + log::debug + { + log, "'%s': set background threads num:%d prio:%s", + d.name, + num, + reflect(prio) + }; #endif - return defaults.SetBackgroundThreads(num, pri); + assert(st); + auto &pool + { + st->pool.at(prio) + }; + + const auto &size + { + ssize_t(pool.size()) + }; + + if(size > num) + pool.del(size - num); + else if(size < num) + pool.add(num - size); } void ircd::db::database::env::IncBackgroundThreadsIfNeeded(int num, - Priority pri) + Priority prio) noexcept { #ifdef RB_DEBUG_DB_ENV - log.debug("'%s': increase background threads num:%d prio:%s", - d.name, - num, - reflect(pri)); + log::debug + { + log, "'%s': increase background threads num:%d prio:%s", + d.name, + num, + reflect(prio) + }; #endif - return defaults.IncBackgroundThreadsIfNeeded(num, pri); + assert(st); + auto &pool + { + st->pool.at(prio) + }; + + pool.add(num); } void @@ -2387,11 +2484,11 @@ const noexcept }; #endif - return defaults.GetThreadID(); + return ctx::this_ctx::id(); } int -ircd::db::database::env::GetBackgroundThreads(Priority pri) +ircd::db::database::env::GetBackgroundThreads(Priority prio) noexcept { #ifdef RB_DEBUG_DB_ENV @@ -2399,11 +2496,17 @@ noexcept { log, "'%s': get background threads prio:%s", d.name, - reflect(pri) + reflect(prio) }; #endif - return defaults.GetBackgroundThreads(pri); + assert(st); + const auto &pool + { + st->pool.at(prio) + }; + + return pool.size(); } // @@ -3246,6 +3349,7 @@ rocksdb::port::CondVar::Wait() assert(mu); assert_main_thread(); + mu->AssertHeld(); std::unique_lockmu)> l { mu->mu, std::adopt_lock @@ -3267,6 +3371,7 @@ rocksdb::port::CondVar::TimedWait(uint64_t abs_time_us) assert(mu); assert_main_thread(); + mu->AssertHeld(); const std::chrono::microseconds us(abs_time_us); const std::chrono::steady_clock::time_point tp(us); std::unique_lockmu)> l @@ -3312,16 +3417,6 @@ rocksdb::port::CondVar::SignalAll() // db/database/env/state.h // -ircd::db::database::env::state::state(database *const &d) -:d{*d} -{ -} - -ircd::db::database::env::state::~state() -noexcept -{ -} - /////////////////////////////////////////////////////////////////////////////// // // db/txn.h diff --git a/ircd/db.h b/ircd/db.h index a65559b8a..29aca8be8 100644 --- a/ircd/db.h +++ b/ircd/db.h @@ -23,6 +23,13 @@ /// //#define RB_DEBUG_DB_SEEK +/// Uncomment or -D this #define to enable extensive log messages for the +/// experimental db environment-port implementation. This is only useful +/// for developers working on the port impl and want to debug all locking +/// and unlocking etc. +/// +//#define RB_DEBUG_DB_PORT + namespace ircd::db { struct throw_on_error;