From d19e960c586168a3f964c61fe01a6846263629bc Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 11 Jul 2019 14:07:12 -0700 Subject: [PATCH] ircd: Split ctx::ole to definition file. --- ircd/Makefile.am | 1 + ircd/ctx.cc | 151 -------------------------------------------- ircd/ctx_ole.cc | 158 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 151 deletions(-) create mode 100644 ircd/ctx_ole.cc diff --git a/ircd/Makefile.am b/ircd/Makefile.am index 516486c7b..7c2dea320 100644 --- a/ircd/Makefile.am +++ b/ircd/Makefile.am @@ -148,6 +148,7 @@ libircd_la_SOURCES += prof.cc libircd_la_SOURCES += fs.cc libircd_la_SOURCES += ios.cc libircd_la_SOURCES += ctx.cc +libircd_la_SOURCES += ctx_ole.cc if AIO libircd_la_SOURCES += fs_aio.cc endif diff --git a/ircd/ctx.cc b/ircd/ctx.cc index 58591ab7a..2ba3bf39f 100644 --- a/ircd/ctx.cc +++ b/ircd/ctx.cc @@ -1912,157 +1912,6 @@ ircd::ctx::prof::reflect(const event &e) return "?????"; } -/////////////////////////////////////////////////////////////////////////////// -// -// ctx_ole.h -// - -namespace ircd::ctx::ole -{ - using closure = std::function; - - extern conf::item thread_max; - - std::mutex mutex; - std::condition_variable cond; - bool termination; - std::deque queue; - std::vector threads; - - closure pop(); - void push(closure &&); - void worker() noexcept; -} - -decltype(ircd::ctx::ole::thread_max) -ircd::ctx::ole::thread_max -{ - { "name", "ircd.ctx.ole.thread.max" }, - { "default", int64_t(1) }, -}; - -ircd::ctx::ole::init::init() -{ - assert(threads.empty()); - termination = false; -} - -ircd::ctx::ole::init::~init() -noexcept -{ - std::unique_lock lock(mutex); - termination = true; - cond.notify_all(); - cond.wait(lock, [] - { - return threads.empty(); - }); -} - -void -ircd::ctx::ole::offload(const std::function &func) -{ - bool done(false); - auto *const context(current); - const auto kick([&context, &done] - { - done = true; - notify(*context); - }); - - std::exception_ptr eptr; - auto closure([&func, &eptr, &context, &kick] - () noexcept - { - try - { - func(); - } - catch(...) - { - eptr = std::current_exception(); - } - - // To wake the context on the IRCd thread we give it the kick - signal(*context, kick); - }); - - // interrupt(ctx) is suppressed while this context has offloaded some work - // to another thread. This context must stay right here and not disappear - // until the other thread signals back. Note that the destructor is - // capable of throwing an interrupt that was received during this scope. - const uninterruptible uninterruptible; - - push(std::move(closure)); do - { - wait(); - } - while(!done); - - // Don't throw any exception if there is a pending interrupt for this ctx. - // Two exceptions will be thrown in that case and if there's an interrupt - // we don't care about eptr anyway. - if(eptr && likely(!interruption_requested())) - std::rethrow_exception(eptr); -} - -void -ircd::ctx::ole::push(closure &&func) -{ - if(unlikely(threads.size() < size_t(thread_max))) - threads.emplace_back(&worker); - - const std::lock_guard lock(mutex); - queue.emplace_back(std::move(func)); - cond.notify_all(); -} - -void -ircd::ctx::ole::worker() -noexcept try -{ - while(1) - { - const auto func(pop()); - func(); - } -} -catch(const interrupted &) -{ - std::unique_lock lock(mutex); - const auto it(std::find_if(begin(threads), end(threads), [] - (const auto &thread) - { - return thread.get_id() == std::this_thread::get_id(); - })); - - assert(it != end(threads)); - auto &this_thread(*it); - this_thread.detach(); - threads.erase(it); - cond.notify_all(); -} - -ircd::ctx::ole::closure -ircd::ctx::ole::pop() -{ - std::unique_lock lock(mutex); - cond.wait(lock, [] - { - if(!queue.empty()) - return true; - - if(unlikely(termination)) - throw interrupted{}; - - return false; - }); - - auto c(std::move(queue.front())); - queue.pop_front(); - return c; -} - /////////////////////////////////////////////////////////////////////////////// // // ctx/promise.h diff --git a/ircd/ctx_ole.cc b/ircd/ctx_ole.cc new file mode 100644 index 000000000..11ecda832 --- /dev/null +++ b/ircd/ctx_ole.cc @@ -0,0 +1,158 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2019 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#include +#include "ctx.h" + +namespace ircd::ctx::ole +{ + using closure = std::function; + + extern conf::item thread_max; + + std::mutex mutex; + std::condition_variable cond; + bool termination; + std::deque queue; + std::vector threads; + + closure pop(); + void push(closure &&); + void worker() noexcept; +} + +decltype(ircd::ctx::ole::thread_max) +ircd::ctx::ole::thread_max +{ + { "name", "ircd.ctx.ole.thread.max" }, + { "default", int64_t(1) }, +}; + +ircd::ctx::ole::init::init() +{ + assert(threads.empty()); + termination = false; +} + +ircd::ctx::ole::init::~init() +noexcept +{ + std::unique_lock lock(mutex); + termination = true; + cond.notify_all(); + cond.wait(lock, [] + { + return threads.empty(); + }); +} + +void +ircd::ctx::ole::offload(const std::function &func) +{ + bool done(false); + auto *const context(current); + const auto kick([&context, &done] + { + done = true; + notify(*context); + }); + + std::exception_ptr eptr; + auto closure([&func, &eptr, &context, &kick] + () noexcept + { + try + { + func(); + } + catch(...) + { + eptr = std::current_exception(); + } + + // To wake the context on the IRCd thread we give it the kick + signal(*context, kick); + }); + + // interrupt(ctx) is suppressed while this context has offloaded some work + // to another thread. This context must stay right here and not disappear + // until the other thread signals back. Note that the destructor is + // capable of throwing an interrupt that was received during this scope. + const uninterruptible uninterruptible; + + push(std::move(closure)); do + { + wait(); + } + while(!done); + + // Don't throw any exception if there is a pending interrupt for this ctx. + // Two exceptions will be thrown in that case and if there's an interrupt + // we don't care about eptr anyway. + if(eptr && likely(!interruption_requested())) + std::rethrow_exception(eptr); +} + +void +ircd::ctx::ole::push(closure &&func) +{ + if(unlikely(threads.size() < size_t(thread_max))) + threads.emplace_back(&worker); + + const std::lock_guard lock(mutex); + queue.emplace_back(std::move(func)); + cond.notify_all(); +} + +void +ircd::ctx::ole::worker() +noexcept try +{ + while(1) + { + const auto func(pop()); + func(); + } +} +catch(const interrupted &) +{ + std::unique_lock lock(mutex); + const auto it(std::find_if(begin(threads), end(threads), [] + (const auto &thread) + { + return thread.get_id() == std::this_thread::get_id(); + })); + + assert(it != end(threads)); + auto &this_thread(*it); + this_thread.detach(); + threads.erase(it); + cond.notify_all(); +} + +ircd::ctx::ole::closure +ircd::ctx::ole::pop() +{ + std::unique_lock lock(mutex); + cond.wait(lock, [] + { + if(!queue.empty()) + return true; + + if(unlikely(termination)) + throw interrupted{}; + + return false; + }); + + auto c(std::move(queue.front())); + queue.pop_front(); + return c; +}