// Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors // Copyright (C) 2016-2019 Jason Volk // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. #include "ctx.h" namespace ircd::ctx::ole { static const opts default_opts; extern conf::item thread_max; static std::mutex mutex; static std::condition_variable cond; extern std::deque queue; static ssize_t working; extern std::vector threads; static bool termination alignas(64); static offload::function pop(); static void push(offload::function &&); static void worker_remove(); static void worker() noexcept; } decltype(ircd::ctx::ole::thread_max) ircd::ctx::ole::thread_max { { "name", "ircd.ctx.ole.thread.max" }, { "default", int64_t(1) }, }; [[gnu::visibility("internal"), clang::always_destroy]] decltype(ircd::ctx::ole::queue) ircd::ctx::ole::queue; [[gnu::visibility("internal"), clang::always_destroy]] decltype(ircd::ctx::ole::threads) ircd::ctx::ole::threads; ircd::ctx::ole::init::init() { assert(threads.empty()); termination = false; } [[gnu::cold]] ircd::ctx::ole::init::~init() noexcept { std::unique_lock lock { mutex }; termination = true; cond.notify_all(); cond.wait(lock, [] { return threads.empty(); }); } // // ole::offload // ircd::ctx::ole::offload::offload(const function &func) :offload{default_opts, func} { } ircd::ctx::ole::offload::offload(const opts &opts, const function &func) { assert(current); assert(opts.concurrency == 1); // not yet implemented // Prepare the offload package on our stack here. These objects will // remain here for the duration of the offload. latch latch{1}; std::exception_ptr eptr; auto *const context(current); auto closure{[&func, &latch, &eptr, &context] () noexcept { try { func(); } catch(...) { // Note that the write to eptr is taking place on a different // thread from where we created the eptr. eptr = std::current_exception(); } // The ctx::signal() is a special device which executes the closure // as soon as the target context is not currently running on any // thread. This has the ability to provide the cross-thread // synchronization we need to hit the latch from this thread. assert(context); signal(*context, [&latch] () noexcept { assert(!latch.is_ready()); latch.count_down(); }); }}; // interrupt(ctx) is suppressed while this context has offloaded some work // to another thread. This context must stay right here and not disappear // until the other thread signals back. Note that the destructor is // capable of throwing an interrupt that was received during this scope. const uninterruptible uninterruptible; ole::push(std::move(closure)); // scope address required for clang-7 latch.wait(); // Don't throw any exception if there is a pending interrupt for this ctx. // Two exceptions will be thrown in that case and if there's an interrupt // we don't care about eptr anyway. if(likely(!interruption_requested())) if(unlikely(eptr)) std::rethrow_exception(eptr); } void ircd::ctx::ole::push(offload::function &&func) { const std::lock_guard lock { mutex }; assert(working >= 0); const bool need_thread { threads.empty() || threads.size() == size_t(working) }; const bool add_thread { need_thread && threads.size() < size_t(thread_max) }; if(unlikely(add_thread)) { ++working; // pre-increment under lock here const posix::enable_pthread enable_pthread; threads.emplace_back(&worker); } queue.emplace_back(std::move(func)); cond.notify_all(); } void ircd::ctx::ole::worker() noexcept { while(!termination) try { const auto func { pop() }; func(); } catch(const interrupted &) { break; } catch(const std::exception &e) { assert(false); continue; } worker_remove(); } void ircd::ctx::ole::worker_remove() { const std::lock_guard lock { mutex }; const auto it { std::find_if(begin(threads), end(threads), [] (const auto &thread) { return thread.get_id() == std::this_thread::get_id(); }) }; assert(it != end(threads)); auto &this_thread(*it); this_thread.detach(); threads.erase(it); cond.notify_all(); } ircd::ctx::ole::offload::function ircd::ctx::ole::pop() { std::unique_lock lock { mutex }; --working; assert(working >= 0); cond.wait(lock, [] { return !queue.empty() || termination; }); if(unlikely(termination)) throw interrupted{}; auto function { std::move(queue.front()) }; queue.pop_front(); ++working; assert(working > 0); return function; }