mirror of
https://github.com/matrix-construct/construct
synced 2024-11-02 11:58:53 +01:00
193 lines
4 KiB
C++
193 lines
4 KiB
C++
// Matrix Construct
|
|
//
|
|
// Copyright (C) Matrix Construct Developers, Authors & Contributors
|
|
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
|
|
//
|
|
// Permission to use, copy, modify, and/or distribute this software for any
|
|
// purpose with or without fee is hereby granted, provided that the above
|
|
// copyright notice and this permission notice is present in all copies. The
|
|
// full license for this software is available in the LICENSE file.
|
|
|
|
#include <ircd/asio.h>
|
|
#include "ctx.h"
|
|
|
|
namespace ircd::ctx::ole
|
|
{
|
|
extern conf::item<size_t> thread_max;
|
|
std::mutex mutex;
|
|
std::condition_variable cond;
|
|
std::deque<offload::function> queue;
|
|
std::vector<std::thread> threads;
|
|
bool termination;
|
|
|
|
static offload::function pop();
|
|
static void push(offload::function &&);
|
|
static void worker() noexcept;
|
|
}
|
|
|
|
decltype(ircd::ctx::ole::thread_max)
|
|
ircd::ctx::ole::thread_max
|
|
{
|
|
{ "name", "ircd.ctx.ole.thread.max" },
|
|
{ "default", int64_t(1) },
|
|
};
|
|
|
|
/// Arms the offload engine by clearing the termination flag. No worker is
/// started here; push() creates them on demand.
ircd::ctx::ole::init::init()
{
	// Every worker of a previous instance must already be gone.
	assert(threads.size() == 0);

	termination = false;
}
|
|
|
|
/// Shuts the offload engine down: raises the termination flag, wakes all
/// workers and blocks until every one of them has removed itself from
/// `threads`.
ircd::ctx::ole::init::~init()
noexcept
{
	std::unique_lock lock{mutex};

	// An idle worker sitting in pop() observes the flag once it wakes up and
	// leaves by throwing `interrupted`.
	termination = true;
	cond.notify_all();

	// worker() erases its own entry and notifies `cond` on the way out, so
	// keep sleeping until the list has drained completely.
	while(!threads.empty())
		cond.wait(lock);
}
|
|
|
|
ircd::ctx::ole::offload::offload(const function &func)
|
|
:offload{opts{}, func}
|
|
{
|
|
}
|
|
|
|
/// Executes `func` on an offload worker thread and suspends the calling
/// context until the worker has signalled completion.
///
/// An exception escaping `func` is captured on the worker and rethrown here,
/// unless an interruption of this context is pending, in which case the
/// captured exception is dropped.
ircd::ctx::ole::offload::offload(const opts &opts,
                                 const function &func)
{
	assert(current);
	assert(opts.concurrency == 1); // not yet implemented

	// The whole offload package lives in this frame. The worker only holds
	// references into it, which is sound because this frame does not unwind
	// before the completion latch has been hit.
	latch done{1};
	std::exception_ptr error;
	auto *const self(current);

	auto task{[&func, &done, &error, &self]
	() noexcept
	{
		try
		{
			func();
		}
		catch(...)
		{
			// This store is made by the worker thread, not by the thread
			// which created `error`.
			error = std::current_exception();
		}

		// ctx::signal() runs the given closure as soon as the target context
		// is not running on any thread; that provides the cross-thread
		// synchronization needed to count the latch down from over here.
		assert(self);
		signal(*self, [&done]
		{
			assert(!done.is_ready());
			done.count_down();
		});
	}};

	// While work is outstanding on another thread this context has to stay
	// put, so interrupt(ctx) is held back for the rest of the scope. The
	// destructor of this guard may throw an interrupt received meanwhile.
	const uninterruptible no_interrupt;

	ole::push(std::move(task)); // scope address required for clang-7
	done.wait();

	// With an interrupt pending the guard above is going to throw; raising
	// the worker's exception as well would put two exceptions in flight, and
	// in that situation `error` is of no interest anyway.
	if(unlikely(interruption_requested()))
		return;

	if(unlikely(error))
		std::rethrow_exception(error);
}
|
|
void
|
|
ircd::ctx::ole::push(offload::function &&func)
|
|
{
|
|
if(unlikely(threads.size() < size_t(thread_max)))
|
|
threads.emplace_back(&worker);
|
|
|
|
const std::lock_guard lock
|
|
{
|
|
mutex
|
|
};
|
|
|
|
queue.emplace_back(std::move(func));
|
|
cond.notify_all();
|
|
}
|
|
|
|
void
|
|
ircd::ctx::ole::worker()
|
|
noexcept try
|
|
{
|
|
while(1)
|
|
{
|
|
const auto func
|
|
{
|
|
pop()
|
|
};
|
|
|
|
func();
|
|
}
|
|
}
|
|
catch(const interrupted &)
|
|
{
|
|
const std::lock_guard lock
|
|
{
|
|
mutex
|
|
};
|
|
|
|
const auto it
|
|
{
|
|
std::find_if(begin(threads), end(threads), []
|
|
(const auto &thread)
|
|
{
|
|
return thread.get_id() == std::this_thread::get_id();
|
|
})
|
|
};
|
|
|
|
assert(it != end(threads));
|
|
auto &this_thread(*it);
|
|
this_thread.detach();
|
|
threads.erase(it);
|
|
cond.notify_all();
|
|
}
|
|
|
|
/// Blocks until a closure is available and returns it, removing it from the
/// front of the queue.
///
/// Throws `interrupted` when the queue is empty and termination has been
/// requested. Closures still queued at that point are handed out first,
/// because the queue is checked before the flag.
ircd::ctx::ole::offload::function
ircd::ctx::ole::pop()
{
	std::unique_lock lock{mutex};

	while(queue.empty())
	{
		if(unlikely(termination))
			throw interrupted{};

		cond.wait(lock);
	}

	auto next{std::move(queue.front())};
	queue.pop_front();
	return next;
}
|