diff --git a/include/ircd/ctx/concurrent.h b/include/ircd/ctx/concurrent.h index dce244430..55d1cb6a1 100644 --- a/include/ircd/ctx/concurrent.h +++ b/include/ircd/ctx/concurrent.h @@ -9,192 +9,80 @@ // full license for this software is available in the LICENSE file. #pragma once -#define HAVE_IRCD_CTX_CONCURRENT_H +#define HAVE_IRCD_CTX_CONCURRENT namespace ircd::ctx { - template class concurrent; + template struct concurrent; } -template +template struct ircd::ctx::concurrent { - using closure = std::function; + using closure = std::function; - pool *p {nullptr}; - vector_view a; - std::vector b; + pool &p; closure c; dock d; + size_t snd {0}; + size_t rcv {0}; + size_t fin {0}; std::exception_ptr eptr; - uint64_t snd {0}; // sends to pool - uint64_t rcv {0}; // receives by worker - uint64_t fin {0}; // finished by worker - bool done() const; - bool avail() const; - void wait_done(); - void wait_avail(); - void rethrow_any_exception(); - void receiver(const size_t pos) noexcept; - void sender(const size_t pos) noexcept; + template void operator()(V&&); + void wait(); - public: - size_t nextpos() const; - - void operator()(const arg &a); - - concurrent(pool &, const vector_view &, closure); - concurrent(concurrent &&) = delete; - concurrent(const concurrent &) = delete; + concurrent(pool &, closure); ~concurrent() noexcept; }; -template -ircd::ctx::concurrent::concurrent(pool &p, - const vector_view &a, - closure c) -:p{&p} -,a{a} -,b(this->a.size(), false) -,c{std::move(c)} -{ - p.min(this->a.size()); -} +template +ircd::ctx::concurrent::concurrent(pool &p, + closure c) -template -ircd::ctx::concurrent::~concurrent() +:p{p} +,c{std::move(c)} +{} + +template +ircd::ctx::concurrent::~concurrent() noexcept { const uninterruptible::nothrow ui; - wait_done(); + this->wait(); } -template +template void -ircd::ctx::concurrent::operator()(const arg &a) +ircd::ctx::concurrent::wait() { - const uninterruptible ui; - rethrow_any_exception(); - assert(avail()); - const auto nextpos(this->nextpos()); - assert(nextpos < b.size()); - this->a.at(nextpos) = a; - assert(this->b.at(nextpos) == false); - this->b.at(nextpos) = true; - sender(nextpos); - wait_avail(); -} - -template -size_t -ircd::ctx::concurrent::nextpos() -const -{ - const auto it + d.wait([this] { - std::find(begin(b), end(b), false) - }; - - return std::distance(begin(b), it); + return snd == fin; + }); } -template +template +template void -ircd::ctx::concurrent::sender(const size_t pos) -noexcept +ircd::ctx::concurrent::operator()(V&& v) { - assert(pos < b.size()); - auto &p(*this->p); - auto func - { - std::bind(&concurrent::receiver, this, pos) //TODO: alloc - }; - ++snd; - assert(snd > rcv); - if(likely(p.size())) - p(std::move(func)); - else - func(); -} - -template -void -ircd::ctx::concurrent::receiver(const size_t pos) -noexcept -{ - ++rcv; - assert(snd >= rcv); - if(!this->eptr) try + p([this, v(std::move(v))] { - c(this->a.at(pos)); - } - catch(...) - { - this->eptr = std::current_exception(); - } + ++rcv; try + { + c(std::move(v)); + } + catch(...) + { + eptr = std::current_exception(); + } - assert(pos < b.size()); - assert(this->b.at(pos) == true); - this->b.at(pos) = false; - assert(rcv > fin); - ++fin; - d.notify_one(); -} - -template -void -ircd::ctx::concurrent::rethrow_any_exception() -{ - if(likely(!this->eptr)) - return; - - wait_done(); - const auto eptr(this->eptr); - this->eptr = {}; - std::rethrow_exception(eptr); -} - -template -void -ircd::ctx::concurrent::wait_avail() -{ - d.wait([this] - { - return avail(); + ++fin; + d.notify_all(); }); -} -template -void -ircd::ctx::concurrent::wait_done() -{ - d.wait([this] - { - return done(); - }); -} - -template -bool -ircd::ctx::concurrent::avail() -const -{ - assert(snd >= rcv); - assert(rcv >= fin); - assert(snd - rcv <= a.size()); - assert(snd - fin <= a.size()); - return snd - fin < a.size() && nextpos() < a.size(); -} - -template -bool -ircd::ctx::concurrent::done() -const -{ - assert(snd >= rcv); - assert(rcv >= fin); - assert(snd - rcv <= a.size()); - return snd - fin == 0 && nextpos() == 0; + if(eptr) + std::rethrow_exception(eptr); } diff --git a/modules/client/sync/presence.cc b/modules/client/sync/presence.cc index 3f5a64c3e..137ed8464 100644 --- a/modules/client/sync/presence.cc +++ b/modules/client/sync/presence.cc @@ -124,39 +124,32 @@ ircd::m::sync::presence_polylog(data &data) }}; // Setup for concurrentization. - static const size_t fibers(64); //TODO: conf - using buffer = std::array; - const auto buf(std::make_unique()); - std::array q; - ctx::concurrent concurrent + static const size_t fibers(64); + sync::pool.min(fibers); + ctx::concurrent concurrent { - m::sync::pool, q, [&data, &append_event] - (const m::user::id user_id) + sync::pool, [&data, &append_event](std::string user_id) { const event::idx event_idx { - m::presence::get(std::nothrow, user_id) + m::presence::get(std::nothrow, m::user::id{user_id}) }; - if(apropos(data, event_idx)) - m::get(std::nothrow, event_idx, "content", append_event); + if(!apropos(data, event_idx)) + return; + + m::get(std::nothrow, event_idx, "content", append_event); } }; // Iterate all of the users visible to our user in joined rooms. const m::user::mitsein mitsein{data.user}; - mitsein.for_each("join", [&concurrent, &q, &buf] + mitsein.for_each("join", [&concurrent] (const m::user &user) { - // Manual copy of the user_id string to the buffer and assignment - // of q at the next position. concurrent.snd is the position in q - // which ctx::concurrent wants us to store the next data at. The - // concurrent() call doesn't return (blocks this context) until there's - // a next position available; propagating flow-control for the iter. - const auto pos(concurrent.nextpos()); - concurrent(strlcpy(buf->at(pos), user.user_id)); + concurrent(std::string(user.user_id)); }); - concurrent.wait_done(); + concurrent.wait(); return ret; } diff --git a/modules/client/sync/rooms/state.cc b/modules/client/sync/rooms/state.cc index fafeae9c4..5be8b9a45 100644 --- a/modules/client/sync/rooms/state.cc +++ b/modules/client/sync/rooms/state.cc @@ -197,50 +197,44 @@ ircd::m::sync::room_state_polylog_events(data &data) if(data.phased && data.range.first == 0) return room_state_phased_events(data); - const m::room &room{*data.room}; - const m::room::state state{room}; + bool ret{false}; + ctx::mutex mutex; json::stack::array array { *data.out, "events" }; - ctx::mutex mutex; - std::array md; - std::vector event(md.size() * 3); - for(auto &fetch : event) - fetch = event::fetch{_default_fopts}; - - size_t _i(0); - bool ret{false}; - const event::closure_idx each_idx{[&data, &array, &mutex, &ret, &event, &_i] - (const m::event::idx event_idx) - { - const size_t i{_i++ % event.size()}; - if(unlikely(!seek(event.at(i), event_idx, std::nothrow))) - { - assert(data.room); - log::error - { - log, "Failed to fetch event idx:%lu in room %s state.", - event_idx, - string_view{data.room->room_id} - }; - - return; - } - - const std::lock_guard lock{mutex}; - room_state_append(data, array, event.at(i), event_idx); - ret = true; - }}; - + sync::pool.min(64); //TODO: XXX ctx::concurrent concurrent { - m::sync::pool, md, each_idx + sync::pool, [&](const event::idx &event_idx) + { + const m::event::fetch event + { + event_idx, std::nothrow, _default_fopts + }; + + if(unlikely(!event.valid)) + { + log::error + { + log, "Failed to fetch event idx:%lu in room %s state.", + event_idx, + string_view{data.room->room_id}, + }; + + return; + } + + const std::lock_guard lock{mutex}; + room_state_append(data, array, event, event_idx); + ret |= true; + } }; - state.for_each([&data, &concurrent, &each_idx] - (const m::event::idx &event_idx) + const room::state state{*data.room}; + state.for_each([&data, &concurrent] + (const event::idx &event_idx) { if(!apropos(data, event_idx)) return; @@ -248,7 +242,7 @@ ircd::m::sync::room_state_polylog_events(data &data) concurrent(event_idx); }); - concurrent.wait_done(); + concurrent.wait(); return ret; }