diff --git a/include/ircd/db/database/env/state.h b/include/ircd/db/database/env/state.h index bba7ec160..40b0ac94f 100644 --- a/include/ircd/db/database/env/state.h +++ b/include/ircd/db/database/env/state.h @@ -14,25 +14,41 @@ struct ircd::db::database::env::state { struct task; + struct pool; - /// Backreference to database - database &d; - - /// Convenience alias of the number of pools static constexpr const size_t POOLS { rocksdb::Env::Priority::TOTAL }; - static conf::item pool_stack_size; + database &d; + std::array, POOLS> pool; - std::array, POOLS> tasks; - std::array pool; - - state(database *const &d); + state(database *const &); ~state() noexcept; }; +struct ircd::db::database::env::state::pool +{ + using Priority = rocksdb::Env::Priority; + + static conf::item stack_size; + + database &d; + Priority pri; + std::deque tasks; + ctx::pool p; + + size_t cancel(void *const &tag); + void operator()(task &&); + + void wait(); + void join(); + + pool(database &, const Priority &); + ~pool() noexcept; +}; + struct ircd::db::database::env::state::task { void (*func)(void *arg); diff --git a/ircd/db.cc b/ircd/db.cc index d07ea4a2d..d2f45ee5d 100644 --- a/ircd/db.cc +++ b/ircd/db.cc @@ -3415,6 +3415,11 @@ st{std::make_unique(d)} ircd::db::database::env::~env() noexcept { + log::debug + { + log, "'%s': Shutting down environment...", + d.name + }; } rocksdb::Status @@ -4181,47 +4186,13 @@ noexcept try assert(st); auto &pool { - st->pool.at(prio) + *st->pool.at(prio) }; - auto &tasks - { - st->tasks.at(prio) - }; - - tasks.emplace_back(state::task + pool(state::task { f, u, a }); - - pool([this, &tasks] - { - const ctx::uninterruptible::nothrow ui; - - assert(this->st); - if(tasks.empty()) - return; - - const auto task - { - std::move(tasks.front()) - }; - - tasks.pop_front(); - - #ifdef RB_DEBUG_DB_ENV - log::debug - { - log, "'%s': func:%p arg:%p", - this->d.name, - task.func, - task.arg, - }; - #endif - - // Execute the task - task.func(task.arg); - }); } catch(const std::exception &e) { @@ -4255,16 +4226,12 @@ noexcept try #endif assert(st); - auto &tasks + auto &pool { - st->tasks.at(prio) + *st->pool.at(prio) }; - size_t i(0); - for(auto it(begin(tasks)); it != end(tasks); it = tasks.erase(it), ++i) - it->cancel(it->arg); - - return i; + return pool.cancel(tag); } catch(const std::exception &e) { @@ -4319,7 +4286,8 @@ noexcept try assert(st); for(auto &pool : st->pool) - pool.join(); + if(pool) + pool->join(); } catch(const std::exception &e) { @@ -4349,10 +4317,10 @@ const noexcept try assert(st); const auto &pool { - st->pool.at(prio) + *st->pool.at(prio) }; - return pool.queued(); + return pool.tasks.size(); } catch(const std::exception &e) { @@ -4374,35 +4342,29 @@ noexcept try #ifdef RB_DEBUG_DB_ENV log::debug { - log, "'%s': set background threads num:%d prio:%s", + log, "'%s': set background threads prio:%s num:%d", d.name, - num, - reflect(prio) + reflect(prio), + num }; #endif assert(st); auto &pool { - st->pool.at(prio) + *st->pool.at(prio) }; - const auto &size - { - ssize_t(pool.size()) - }; - - if(size > num) - pool.del(size - num); - else if(size < num) - pool.add(num - size); + pool.p.set(num); } catch(const std::exception &e) { log::critical { - log, "'%s': set background threads :%s", + log, "'%s': set background threads prio:%s num:%d :%s", d.name, + reflect(prio), + num, e.what() }; } @@ -4427,10 +4389,10 @@ noexcept try assert(st); auto &pool { - st->pool.at(prio) + *st->pool.at(prio) }; - pool.add(num); + pool.p.add(num); } catch(const std::exception &e) { @@ -4577,10 +4539,10 @@ noexcept try assert(st); const auto &pool { - st->pool.at(prio) + *st->pool.at(prio) }; - return pool.size(); + return pool.p.size(); } catch(const std::exception &e) { @@ -7353,74 +7315,171 @@ noexcept // db/database/env/state.h // -decltype(ircd::db::database::env::state::pool_stack_size) -ircd::db::database::env::state::pool_stack_size -{ - { "name", "ircd.db.env.pool.stack_size" }, - { "default", long(128_KiB) }, -}; - // // env::state::state // ircd::db::database::env::state::state(database *const &d) :d{*d} -,pool -{{ - // name of pool stack size initial workers - { "rdb bott", size_t(pool_stack_size), 0 }, - { "rdb low", size_t(pool_stack_size), 0 }, - { "rdb high", size_t(pool_stack_size), 0 }, -}} { + for(size_t i(0); i < pool.size(); ++i) + pool.at(i) = std::make_unique(this->d, Priority(i)); } ircd::db::database::env::state::~state() noexcept { - for(size_t i(0); i < POOLS; ++i) try - { - auto &tasks(this->tasks.at(i)); - auto &pool(this->pool.at(i)); - if(tasks.size() || pool.pending()) - log::warning - { - log, "'%s': Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'", - d.name, - tasks.size(), - pool.queued(), - pool.active(), - pool.name - }; +} - pool.q.dock.wait([&tasks, &pool] +// +// state::pool +// + +decltype(ircd::db::database::env::state::pool::stack_size) +ircd::db::database::env::state::pool::stack_size +{ + { "name", "ircd.db.env.pool.stack_size" }, + { "default", long(128_KiB) }, +}; + +// +// state::pool::pool +// + +ircd::db::database::env::state::pool::pool(database &d, + const Priority &pri) +:d{d} +,pri{pri} +,p +{ + reflect(pri), // name of pool + size_t(stack_size), // stack size of worker + 0, // initial workers +} +{ +} + +ircd::db::database::env::state::pool::~pool() +noexcept +{ + join(); +} + +void +ircd::db::database::env::state::pool::join() +try +{ + if(!tasks.empty() || p.pending()) + log::warning { - return tasks.empty() && !pool.pending(); - }); + log, "'%s': Waiting for tasks:%zu queued:%zu active:%zu in pool '%s'", + d.name, + tasks.size(), + p.queued(), + p.active(), + ctx::name(p) + }; - pool.terminate(); - pool.join(); + this->wait(); + assert(!p.pending()); + assert(tasks.empty()); + p.join(); + + log::debug + { + log, "'%s': Terminated pool '%s'.", + d.name, + ctx::name(p) + }; +} +catch(const std::exception &e) +{ + log::critical + { + log, "'%s': Environment pool '%s' join :%s", + d.name, + ctx::name(p), + e.what() + }; + + throw; +} + +void +ircd::db::database::env::state::pool::wait() +{ + ctx::dock &dock(p); + dock.wait([this] + { + return tasks.empty() && !p.pending(); + }); +} + +void +ircd::db::database::env::state::pool::operator()(task &&task) +{ + tasks.emplace_back(std::move(task)); + p([this] + { + if(tasks.empty()) + return; + + const ctx::uninterruptible::nothrow ui; + const auto task{std::move(tasks.front())}; + tasks.pop_front(); - assert(tasks.empty()); - assert(!pool.queued()); - assert(!pool.pending()); log::debug { - log, "'%s': Terminated pool '%s'.", - d.name, - pool.name + log, "'%s': pool:%s tasks:%zu starting func:%p arg:%p", + this->d.name, + ctx::name(p), + tasks.size(), + task.func, + task.arg, }; - } - catch(const std::exception &e) - { - log::critical + + // Execute the task + task.func(task.arg); + + log::debug { - log, "'%s': Environment state shutdown :%s", - d.name, - e.what() + log, "'%s': pool:%s tasks:%zu task finished func:%p arg:%p", + this->d.name, + ctx::name(p), + tasks.size(), + task.func, + task.arg, }; + }); +} + +size_t +ircd::db::database::env::state::pool::cancel(void *const &tag) +{ + size_t i(0); + auto it(begin(tasks)); + while(it != end(tasks)) + { + auto &task(*it); + log::debug + { + log, "'%s': pool:%s tasks:%zu cancel %zu func:%p cancel:%p arg:%p tag:%p", + d.name, + ctx::name(p), + tasks.size(), + i, + task.func, + task.cancel, + task.arg, + tag + }; + + task.cancel(task.arg); + it = tasks.erase(it); + ++i; } + + return i; } ///////////////////////////////////////////////////////////////////////////////