// Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors // Copyright (C) 2016-2018 Jason Volk // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. #pragma once #define HAVE_IRCD_CTX_PARALLEL_H namespace ircd::ctx { template class parallel; } template struct ircd::ctx::parallel { using closure = std::function; pool *p {nullptr}; vector_view a; closure c; dock d; std::exception_ptr eptr; size_t snd {0}; size_t rcv {0}; size_t out {0}; public: void wait_avail(); void wait_done(); void operator()(const arg &a); parallel(pool &, vector_view, closure); ~parallel() noexcept; }; template ircd::ctx::parallel::parallel(pool &p, vector_view a, closure c) :p{&p} ,a{std::move(a)} ,c{std::move(c)} { p.min(this->a.size()); } template ircd::ctx::parallel::~parallel() noexcept { const uninterruptible::nothrow ui; wait_done(); } template void ircd::ctx::parallel::operator()(const arg &a) { wait_avail(); if(this->eptr) std::rethrow_exception(this->eptr); auto &p(*this->p); this->a.at(snd++ % this->a.size()) = a; out++; p([this]() mutable { auto &a { this->a.at(rcv++ % this->a.size()) }; if(!this->eptr) try { c(a); } catch(...) { this->eptr = std::current_exception(); } out--; d.notify_one(); }); } template void ircd::ctx::parallel::wait_avail() { d.wait([this] { assert(snd >= rcv); return out < a.size(); }); } template void ircd::ctx::parallel::wait_done() { d.wait([this] { return !out; }); }