0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-13 08:23:56 +01:00

ircd::ctx::ole: Move opts outside offload; reorg; minor cleanup.

This commit is contained in:
Jason Volk 2019-09-03 11:09:32 -07:00
parent 2579313bad
commit ebfaa508fd
2 changed files with 58 additions and 26 deletions

View file

@ -20,6 +20,7 @@
namespace ircd::ctx::ole
{
struct init;
struct opts;
struct offload;
}
@ -30,13 +31,13 @@ namespace ircd::ctx
struct ircd::ctx::offload
{
struct opts;
using closure = std::function<void ()>;
using function = std::function<void ()>;
offload(const opts &, const closure &);
offload(const opts &, const function &);
offload(const function &);
};
struct ircd::ctx::ole::offload::opts
struct ircd::ctx::ole::opts
{
/// Optionally give this offload task a name for any tasklist.
string_view name;

View file

@ -14,16 +14,15 @@
namespace ircd::ctx::ole
{
extern conf::item<size_t> thread_max;
std::mutex mutex;
std::condition_variable cond;
bool termination;
std::deque<offload::closure> queue;
std::deque<offload::function> queue;
std::vector<std::thread> threads;
bool termination;
offload::closure pop();
void push(offload::closure &&);
void worker() noexcept;
static offload::function pop();
static void push(offload::function &&);
static void worker() noexcept;
}
decltype(ircd::ctx::ole::thread_max)
@ -42,7 +41,11 @@ ircd::ctx::ole::init::init()
ircd::ctx::ole::init::~init()
noexcept
{
std::unique_lock lock(mutex);
std::unique_lock lock
{
mutex
};
termination = true;
cond.notify_all();
cond.wait(lock, []
@ -51,8 +54,13 @@ noexcept
});
}
ircd::ctx::ole::offload::offload(const function &func)
:offload{opts{}, func}
{
}
ircd::ctx::ole::offload::offload(const opts &opts,
const closure &func)
const function &func)
{
assert(current);
assert(opts.concurrency == 1); // not yet implemented
@ -62,7 +70,7 @@ ircd::ctx::ole::offload::offload(const opts &opts,
latch latch{1};
std::exception_ptr eptr;
auto *const context(current);
auto closure([&func, &latch, &eptr, &context]
auto closure{[&func, &latch, &eptr, &context]
() noexcept
{
try
@ -86,7 +94,7 @@ ircd::ctx::ole::offload::offload(const opts &opts,
assert(!latch.is_ready());
latch.count_down();
});
});
}};
// interrupt(ctx) is suppressed while this context has offloaded some work
// to another thread. This context must stay right here and not disappear
@ -105,12 +113,16 @@ ircd::ctx::ole::offload::offload(const opts &opts,
std::rethrow_exception(eptr);
}
void
ircd::ctx::ole::push(offload::closure &&func)
ircd::ctx::ole::push(offload::function &&func)
{
if(unlikely(threads.size() < size_t(thread_max)))
threads.emplace_back(&worker);
const std::lock_guard lock(mutex);
const std::lock_guard lock
{
mutex
};
queue.emplace_back(std::move(func));
cond.notify_all();
}
@ -121,18 +133,29 @@ noexcept try
{
while(1)
{
const auto func(pop());
const auto func
{
pop()
};
func();
}
}
catch(const interrupted &)
{
std::unique_lock lock(mutex);
const auto it(std::find_if(begin(threads), end(threads), []
(const auto &thread)
const std::lock_guard lock
{
return thread.get_id() == std::this_thread::get_id();
}));
mutex
};
const auto it
{
std::find_if(begin(threads), end(threads), []
(const auto &thread)
{
return thread.get_id() == std::this_thread::get_id();
})
};
assert(it != end(threads));
auto &this_thread(*it);
@ -141,10 +164,14 @@ catch(const interrupted &)
cond.notify_all();
}
ircd::ctx::ole::offload::closure
ircd::ctx::ole::offload::function
ircd::ctx::ole::pop()
{
std::unique_lock lock(mutex);
std::unique_lock lock
{
mutex
};
cond.wait(lock, []
{
if(!queue.empty())
@ -156,7 +183,11 @@ ircd::ctx::ole::pop()
return false;
});
auto c(std::move(queue.front()));
auto function
{
std::move(queue.front())
};
queue.pop_front();
return c;
return function;
}