From c7a68a8cb3d51d2f33583c518fe592bc8bd4e1e4 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 12 Jul 2019 15:18:26 -0700 Subject: [PATCH] ircd::ctx: Fix ctx::parallel argument allocation assumptions. ircd::ctx: Rename tool. --- include/ircd/ctx/{parallel.h => concurrent.h} | 95 +++++++++---------- include/ircd/ctx/ctx.h | 2 +- modules/client/sync/presence.cc | 19 ++-- modules/client/sync/rooms/state.cc | 16 ++-- 4 files changed, 66 insertions(+), 66 deletions(-) rename include/ircd/ctx/{parallel.h => concurrent.h} (56%) diff --git a/include/ircd/ctx/parallel.h b/include/ircd/ctx/concurrent.h similarity index 56% rename from include/ircd/ctx/parallel.h rename to include/ircd/ctx/concurrent.h index f18cd6999..dce244430 100644 --- a/include/ircd/ctx/parallel.h +++ b/include/ircd/ctx/concurrent.h @@ -1,7 +1,7 @@ // Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors -// Copyright (C) 2016-2018 Jason Volk +// Copyright (C) 2016-2019 Jason Volk // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above @@ -9,20 +9,21 @@ // full license for this software is available in the LICENSE file. #pragma once -#define HAVE_IRCD_CTX_PARALLEL_H +#define HAVE_IRCD_CTX_CONCURRENT_H namespace ircd::ctx { - template class parallel; + template class concurrent; } template -struct ircd::ctx::parallel +struct ircd::ctx::concurrent { using closure = std::function; pool *p {nullptr}; vector_view a; + std::vector b; closure c; dock d; std::exception_ptr eptr; @@ -35,34 +36,34 @@ struct ircd::ctx::parallel void wait_done(); void wait_avail(); void rethrow_any_exception(); - void receiver() noexcept; - void sender() noexcept; + void receiver(const size_t pos) noexcept; + void sender(const size_t pos) noexcept; public: size_t nextpos() const; - void operator()(); void operator()(const arg &a); - parallel(pool &, const vector_view &, closure); - parallel(parallel &&) = delete; - parallel(const parallel &) = delete; - ~parallel() noexcept; + concurrent(pool &, const vector_view &, closure); + concurrent(concurrent &&) = delete; + concurrent(const concurrent &) = delete; + ~concurrent() noexcept; }; template -ircd::ctx::parallel::parallel(pool &p, - const vector_view &a, - closure c) +ircd::ctx::concurrent::concurrent(pool &p, + const vector_view &a, + closure c) :p{&p} ,a{a} +,b(this->a.size(), false) ,c{std::move(c)} { p.min(this->a.size()); } template -ircd::ctx::parallel::~parallel() +ircd::ctx::concurrent::~concurrent() noexcept { const uninterruptible::nothrow ui; @@ -71,44 +72,43 @@ noexcept template void -ircd::ctx::parallel::operator()(const arg &a) +ircd::ctx::concurrent::operator()(const arg &a) { const uninterruptible ui; rethrow_any_exception(); assert(avail()); - this->a.at(nextpos()) = a; - sender(); - wait_avail(); -} - -template -void -ircd::ctx::parallel::operator()() -{ - const uninterruptible ui; - rethrow_any_exception(); - assert(avail()); - sender(); + const auto nextpos(this->nextpos()); + assert(nextpos < b.size()); + this->a.at(nextpos) = a; + assert(this->b.at(nextpos) == false); + this->b.at(nextpos) = true; + sender(nextpos); wait_avail(); } template size_t -ircd::ctx::parallel::nextpos() +ircd::ctx::concurrent::nextpos() const { - return snd % a.size(); + const auto it + { + std::find(begin(b), end(b), false) + }; + + return std::distance(begin(b), it); } template void -ircd::ctx::parallel::sender() +ircd::ctx::concurrent::sender(const size_t pos) noexcept { + assert(pos < b.size()); auto &p(*this->p); auto func { - std::bind(¶llel::receiver, this) + std::bind(&concurrent::receiver, this, pos) //TODO: alloc }; ++snd; @@ -121,15 +121,11 @@ noexcept template void -ircd::ctx::parallel::receiver() +ircd::ctx::concurrent::receiver(const size_t pos) noexcept { - assert(snd > rcv); - const auto pos - { - rcv++ % this->a.size() - }; - + ++rcv; + assert(snd >= rcv); if(!this->eptr) try { c(this->a.at(pos)); @@ -139,14 +135,17 @@ noexcept this->eptr = std::current_exception(); } + assert(pos < b.size()); + assert(this->b.at(pos) == true); + this->b.at(pos) = false; assert(rcv > fin); - fin++; + ++fin; d.notify_one(); } template void -ircd::ctx::parallel::rethrow_any_exception() +ircd::ctx::concurrent::rethrow_any_exception() { if(likely(!this->eptr)) return; @@ -159,7 +158,7 @@ ircd::ctx::parallel::rethrow_any_exception() template void -ircd::ctx::parallel::wait_avail() +ircd::ctx::concurrent::wait_avail() { d.wait([this] { @@ -169,7 +168,7 @@ ircd::ctx::parallel::wait_avail() template void -ircd::ctx::parallel::wait_done() +ircd::ctx::concurrent::wait_done() { d.wait([this] { @@ -179,23 +178,23 @@ ircd::ctx::parallel::wait_done() template bool -ircd::ctx::parallel::avail() +ircd::ctx::concurrent::avail() const { assert(snd >= rcv); assert(rcv >= fin); assert(snd - rcv <= a.size()); assert(snd - fin <= a.size()); - return snd - fin < a.size(); + return snd - fin < a.size() && nextpos() < a.size(); } template bool -ircd::ctx::parallel::done() +ircd::ctx::concurrent::done() const { assert(snd >= rcv); assert(rcv >= fin); assert(snd - rcv <= a.size()); - return snd - fin == 0; + return snd - fin == 0 && nextpos() == 0; } diff --git a/include/ircd/ctx/ctx.h b/include/ircd/ctx/ctx.h index cd2f84a7e..85d927857 100644 --- a/include/ircd/ctx/ctx.h +++ b/include/ircd/ctx/ctx.h @@ -101,7 +101,7 @@ namespace ircd::ctx #include "pool.h" #include "ole.h" #include "fault.h" -#include "parallel.h" +#include "concurrent.h" // Exports to ircd:: namespace ircd diff --git a/modules/client/sync/presence.cc b/modules/client/sync/presence.cc index c9b72a549..3f5a64c3e 100644 --- a/modules/client/sync/presence.cc +++ b/modules/client/sync/presence.cc @@ -123,12 +123,12 @@ ircd::m::sync::presence_polylog(data &data) }; }}; - // Setup for parallelization. + // Setup for concurrentization. static const size_t fibers(64); //TODO: conf using buffer = std::array; const auto buf(std::make_unique()); std::array q; - ctx::parallel parallel + ctx::concurrent concurrent { m::sync::pool, q, [&data, &append_event] (const m::user::id user_id) @@ -145,19 +145,18 @@ ircd::m::sync::presence_polylog(data &data) // Iterate all of the users visible to our user in joined rooms. const m::user::mitsein mitsein{data.user}; - mitsein.for_each("join", [¶llel, &q, &buf] + mitsein.for_each("join", [&concurrent, &q, &buf] (const m::user &user) { // Manual copy of the user_id string to the buffer and assignment - // of q at the next position. parallel.snd is the position in q - // which ctx::parallel wants us to store the next data at. The - // parallel() call doesn't return (blocks this context) until there's + // of q at the next position. concurrent.snd is the position in q + // which ctx::concurrent wants us to store the next data at. The + // concurrent() call doesn't return (blocks this context) until there's // a next position available; propagating flow-control for the iter. - const auto pos(parallel.nextpos()); - q[pos] = strlcpy(buf->at(pos), user.user_id); - parallel(); + const auto pos(concurrent.nextpos()); + concurrent(strlcpy(buf->at(pos), user.user_id)); }); - parallel.wait_done(); + concurrent.wait_done(); return ret; } diff --git a/modules/client/sync/rooms/state.cc b/modules/client/sync/rooms/state.cc index 373b56e67..d63da30b3 100644 --- a/modules/client/sync/rooms/state.cc +++ b/modules/client/sync/rooms/state.cc @@ -208,8 +208,8 @@ ircd::m::sync::room_state_polylog_events(data &data) }; ctx::mutex mutex; - std::array md; //TODO: conf - std::vector event(md.size() * 2); + std::array md; + std::vector event(md.size() * 3); for(auto &fetch : event) fetch = event::fetch{_default_fopts}; @@ -237,19 +237,21 @@ ircd::m::sync::room_state_polylog_events(data &data) ret = true; }}; - ctx::parallel parallel + ctx::concurrent concurrent { m::sync::pool, md, each_idx }; - state.for_each([&data, ¶llel] + state.for_each([&data, &concurrent, &each_idx] (const m::event::idx &event_idx) { - if(apropos(data, event_idx)) - parallel(event_idx); + if(!apropos(data, event_idx)) + return; + + concurrent(event_idx); }); - parallel.wait_done(); + concurrent.wait_done(); return ret; }