construct/ircd/ctx_ole.cc

243 lines
4.8 KiB
C++

// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include "ctx.h"
namespace ircd::ctx::ole
{
static const opts default_opts;
extern conf::item<size_t> thread_max;
static std::mutex mutex;
static std::condition_variable cond;
extern std::deque<offload::function> queue;
static ssize_t working;
extern std::vector<std::thread> threads;
static bool termination alignas(64);
static offload::function pop();
static void push(offload::function &&);
static void worker_remove();
static void worker() noexcept;
}
decltype(ircd::ctx::ole::thread_max)
ircd::ctx::ole::thread_max
{
{ "name", "ircd.ctx.ole.thread.max" },
{ "default", int64_t(1) },
};
[[gnu::visibility("internal"), clang::always_destroy]]
decltype(ircd::ctx::ole::queue)
ircd::ctx::ole::queue;
[[gnu::visibility("internal"), clang::always_destroy]]
decltype(ircd::ctx::ole::threads)
ircd::ctx::ole::threads;
ircd::ctx::ole::init::init()
{
assert(threads.empty());
termination = false;
}
[[gnu::cold]]
ircd::ctx::ole::init::~init()
noexcept
{
std::unique_lock lock
{
mutex
};
termination = true;
cond.notify_all();
cond.wait(lock, []
{
return threads.empty();
});
}
//
// ole::offload
//
ircd::ctx::ole::offload::offload(const function &func)
:offload{default_opts, func}
{
}
ircd::ctx::ole::offload::offload(const opts &opts,
const function &func)
{
assert(current);
assert(opts.concurrency == 1); // not yet implemented
// Prepare the offload package on our stack here. These objects will
// remain here for the duration of the offload.
latch latch{1};
std::exception_ptr eptr;
auto *const context(current);
auto closure{[&func, &latch, &eptr, &context]
() noexcept
{
try
{
func();
}
catch(...)
{
// Note that the write to eptr is taking place on a different
// thread from where we created the eptr.
eptr = std::current_exception();
}
// The ctx::signal() is a special device which executes the closure
// as soon as the target context is not currently running on any
// thread. This has the ability to provide the cross-thread
// synchronization we need to hit the latch from this thread.
assert(context);
signal(*context, [&latch]
() noexcept
{
assert(!latch.is_ready());
latch.count_down();
});
}};
// interrupt(ctx) is suppressed while this context has offloaded some work
// to another thread. This context must stay right here and not disappear
// until the other thread signals back. Note that the destructor is
// capable of throwing an interrupt that was received during this scope.
const uninterruptible uninterruptible;
ole::push(std::move(closure)); // scope address required for clang-7
latch.wait();
// Don't throw any exception if there is a pending interrupt for this ctx.
// Two exceptions will be thrown in that case and if there's an interrupt
// we don't care about eptr anyway.
if(likely(!interruption_requested()))
if(unlikely(eptr))
std::rethrow_exception(eptr);
}
void
ircd::ctx::ole::push(offload::function &&func)
{
const std::lock_guard lock
{
mutex
};
assert(working >= 0);
const bool need_thread
{
threads.empty()
|| threads.size() == size_t(working)
};
const bool add_thread
{
need_thread
&& threads.size() < size_t(thread_max)
};
if(unlikely(add_thread))
{
++working; // pre-increment under lock here
const posix::enable_pthread enable_pthread;
threads.emplace_back(&worker);
}
queue.emplace_back(std::move(func));
cond.notify_all();
}
void
ircd::ctx::ole::worker()
noexcept
{
while(!termination) try
{
const auto func
{
pop()
};
func();
}
catch(const interrupted &)
{
break;
}
catch(const std::exception &e)
{
assert(false);
continue;
}
worker_remove();
}
void
ircd::ctx::ole::worker_remove()
{
const std::lock_guard lock
{
mutex
};
const auto it
{
std::find_if(begin(threads), end(threads), []
(const auto &thread)
{
return thread.get_id() == std::this_thread::get_id();
})
};
assert(it != end(threads));
auto &this_thread(*it);
this_thread.detach();
threads.erase(it);
cond.notify_all();
}
ircd::ctx::ole::offload::function
ircd::ctx::ole::pop()
{
std::unique_lock lock
{
mutex
};
--working;
assert(working >= 0);
cond.wait(lock, []
{
return !queue.empty() || termination;
});
if(unlikely(termination))
throw interrupted{};
auto function
{
std::move(queue.front())
};
queue.pop_front();
++working;
assert(working > 0);
return function;
}