From 3d79c94bb24ada0bbc167c99e07d71ed7320a875 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 24 Dec 2018 17:58:57 -0800 Subject: [PATCH] ircd::ctx: Add ctx::parallel device. --- include/ircd/ctx/ctx.h | 1 + include/ircd/ctx/parallel.h | 114 ++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 include/ircd/ctx/parallel.h diff --git a/include/ircd/ctx/ctx.h b/include/ircd/ctx/ctx.h index 79746ea6d..b56fefbbe 100644 --- a/include/ircd/ctx/ctx.h +++ b/include/ircd/ctx/ctx.h @@ -96,6 +96,7 @@ namespace ircd::ctx #include "pool.h" #include "ole.h" #include "fault.h" +#include "parallel.h" // Exports to ircd:: namespace ircd diff --git a/include/ircd/ctx/parallel.h b/include/ircd/ctx/parallel.h new file mode 100644 index 000000000..4d3704e9a --- /dev/null +++ b/include/ircd/ctx/parallel.h @@ -0,0 +1,114 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_CTX_PARALLEL_H + +namespace ircd::ctx +{ + template class parallel; +} + +template +struct ircd::ctx::parallel +{ + using closure = std::function; + + pool *p {nullptr}; + vector_view a; + closure c; + dock d; + std::exception_ptr eptr; + size_t snd {0}; + size_t rcv {0}; + size_t out {0}; + + public: + void wait_avail(); + void wait_done(); + + void operator()(const arg &a); + + parallel(pool &, vector_view, closure); + ~parallel() noexcept; +}; + +template +ircd::ctx::parallel::parallel(pool &p, + vector_view a, + closure c) +:p{&p} +,a{std::move(a)} +,c{std::move(c)} +{ + p.min(this->a.size()); +} + +template +ircd::ctx::parallel::~parallel() +noexcept +{ + const uninterruptible::nothrow ui; + wait_done(); +} + +template +void +ircd::ctx::parallel::operator()(const arg &a) +{ + wait_avail(); + if(this->eptr) + std::rethrow_exception(this->eptr); + + auto &p(*this->p); + this->a.at(snd++ % this->a.size()) = a; + out++; + p([this]() + mutable + { + auto &a + { + this->a.at(rcv++ % this->a.size()) + }; + + if(!this->eptr) try + { + c(a); + } + catch(...) + { + this->eptr = std::current_exception(); + } + + out--; + d.notify_one(); + }); +} + +template +void +ircd::ctx::parallel::wait_avail() +{ + d.wait([this] + { + assert(snd >= rcv); + return out < a.size(); + }); +} + +template +void +ircd::ctx::parallel::wait_done() +{ + d.wait([this] + { + return !out; + }); +}