0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-29 20:28:52 +02:00

ircd::ctx: Simplify concurrent template.

modules/client/sync: Improve concurrent instantiation sites.
This commit is contained in:
Jason Volk 2019-07-15 12:46:19 -07:00
parent 14040f4917
commit 598585a431
3 changed files with 84 additions and 209 deletions

View file

@ -9,192 +9,80 @@
// full license for this software is available in the LICENSE file. // full license for this software is available in the LICENSE file.
#pragma once #pragma once
#define HAVE_IRCD_CTX_CONCURRENT_H #define HAVE_IRCD_CTX_CONCURRENT
namespace ircd::ctx namespace ircd::ctx
{ {
template<class arg> class concurrent; template<class value> struct concurrent;
} }
template<class arg> template<class value>
struct ircd::ctx::concurrent struct ircd::ctx::concurrent
{ {
using closure = std::function<void (arg &)>; using closure = std::function<void (value)>;
pool *p {nullptr}; pool &p;
vector_view<arg> a;
std::vector<bool> b;
closure c; closure c;
dock d; dock d;
size_t snd {0};
size_t rcv {0};
size_t fin {0};
std::exception_ptr eptr; std::exception_ptr eptr;
uint64_t snd {0}; // sends to pool
uint64_t rcv {0}; // receives by worker
uint64_t fin {0}; // finished by worker
bool done() const; template<class V> void operator()(V&&);
bool avail() const; void wait();
void wait_done();
void wait_avail();
void rethrow_any_exception();
void receiver(const size_t pos) noexcept;
void sender(const size_t pos) noexcept;
public: concurrent(pool &, closure);
size_t nextpos() const;
void operator()(const arg &a);
concurrent(pool &, const vector_view<arg> &, closure);
concurrent(concurrent &&) = delete;
concurrent(const concurrent &) = delete;
~concurrent() noexcept; ~concurrent() noexcept;
}; };
template<class arg> template<class value>
ircd::ctx::concurrent<arg>::concurrent(pool &p, ircd::ctx::concurrent<value>::concurrent(pool &p,
const vector_view<arg> &a, closure c)
closure c)
:p{&p}
,a{a}
,b(this->a.size(), false)
,c{std::move(c)}
{
p.min(this->a.size());
}
template<class arg> :p{p}
ircd::ctx::concurrent<arg>::~concurrent() ,c{std::move(c)}
{}
template<class value>
ircd::ctx::concurrent<value>::~concurrent()
noexcept noexcept
{ {
const uninterruptible::nothrow ui; const uninterruptible::nothrow ui;
wait_done(); this->wait();
} }
template<class arg> template<class value>
void void
ircd::ctx::concurrent<arg>::operator()(const arg &a) ircd::ctx::concurrent<value>::wait()
{ {
const uninterruptible ui; d.wait([this]
rethrow_any_exception();
assert(avail());
const auto nextpos(this->nextpos());
assert(nextpos < b.size());
this->a.at(nextpos) = a;
assert(this->b.at(nextpos) == false);
this->b.at(nextpos) = true;
sender(nextpos);
wait_avail();
}
template<class arg>
size_t
ircd::ctx::concurrent<arg>::nextpos()
const
{
const auto it
{ {
std::find(begin(b), end(b), false) return snd == fin;
}; });
return std::distance(begin(b), it);
} }
template<class arg> template<class value>
template<class V>
void void
ircd::ctx::concurrent<arg>::sender(const size_t pos) ircd::ctx::concurrent<value>::operator()(V&& v)
noexcept
{ {
assert(pos < b.size());
auto &p(*this->p);
auto func
{
std::bind(&concurrent::receiver, this, pos) //TODO: alloc
};
++snd; ++snd;
assert(snd > rcv); p([this, v(std::move(v))]
if(likely(p.size()))
p(std::move(func));
else
func();
}
template<class arg>
void
ircd::ctx::concurrent<arg>::receiver(const size_t pos)
noexcept
{
++rcv;
assert(snd >= rcv);
if(!this->eptr) try
{ {
c(this->a.at(pos)); ++rcv; try
} {
catch(...) c(std::move(v));
{ }
this->eptr = std::current_exception(); catch(...)
} {
eptr = std::current_exception();
}
assert(pos < b.size()); ++fin;
assert(this->b.at(pos) == true); d.notify_all();
this->b.at(pos) = false;
assert(rcv > fin);
++fin;
d.notify_one();
}
template<class arg>
void
ircd::ctx::concurrent<arg>::rethrow_any_exception()
{
if(likely(!this->eptr))
return;
wait_done();
const auto eptr(this->eptr);
this->eptr = {};
std::rethrow_exception(eptr);
}
template<class arg>
void
ircd::ctx::concurrent<arg>::wait_avail()
{
d.wait([this]
{
return avail();
}); });
}
template<class arg> if(eptr)
void std::rethrow_exception(eptr);
ircd::ctx::concurrent<arg>::wait_done()
{
d.wait([this]
{
return done();
});
}
template<class arg>
bool
ircd::ctx::concurrent<arg>::avail()
const
{
assert(snd >= rcv);
assert(rcv >= fin);
assert(snd - rcv <= a.size());
assert(snd - fin <= a.size());
return snd - fin < a.size() && nextpos() < a.size();
}
template<class arg>
bool
ircd::ctx::concurrent<arg>::done()
const
{
assert(snd >= rcv);
assert(rcv >= fin);
assert(snd - rcv <= a.size());
return snd - fin == 0 && nextpos() == 0;
} }

View file

@ -124,39 +124,32 @@ ircd::m::sync::presence_polylog(data &data)
}}; }};
// Setup for concurrentization. // Setup for concurrentization.
static const size_t fibers(64); //TODO: conf static const size_t fibers(64);
using buffer = std::array<char[m::id::MAX_SIZE+1], fibers>; sync::pool.min(fibers);
const auto buf(std::make_unique<buffer>()); ctx::concurrent<std::string> concurrent
std::array<string_view, fibers> q;
ctx::concurrent<string_view> concurrent
{ {
m::sync::pool, q, [&data, &append_event] sync::pool, [&data, &append_event](std::string user_id)
(const m::user::id user_id)
{ {
const event::idx event_idx const event::idx event_idx
{ {
m::presence::get(std::nothrow, user_id) m::presence::get(std::nothrow, m::user::id{user_id})
}; };
if(apropos(data, event_idx)) if(!apropos(data, event_idx))
m::get(std::nothrow, event_idx, "content", append_event); return;
m::get(std::nothrow, event_idx, "content", append_event);
} }
}; };
// Iterate all of the users visible to our user in joined rooms. // Iterate all of the users visible to our user in joined rooms.
const m::user::mitsein mitsein{data.user}; const m::user::mitsein mitsein{data.user};
mitsein.for_each("join", [&concurrent, &q, &buf] mitsein.for_each("join", [&concurrent]
(const m::user &user) (const m::user &user)
{ {
// Manual copy of the user_id string to the buffer and assignment concurrent(std::string(user.user_id));
// of q at the next position. concurrent.snd is the position in q
// which ctx::concurrent wants us to store the next data at. The
// concurrent() call doesn't return (blocks this context) until there's
// a next position available; propagating flow-control for the iter.
const auto pos(concurrent.nextpos());
concurrent(strlcpy(buf->at(pos), user.user_id));
}); });
concurrent.wait_done(); concurrent.wait();
return ret; return ret;
} }

View file

@ -197,50 +197,44 @@ ircd::m::sync::room_state_polylog_events(data &data)
if(data.phased && data.range.first == 0) if(data.phased && data.range.first == 0)
return room_state_phased_events(data); return room_state_phased_events(data);
const m::room &room{*data.room}; bool ret{false};
const m::room::state state{room}; ctx::mutex mutex;
json::stack::array array json::stack::array array
{ {
*data.out, "events" *data.out, "events"
}; };
ctx::mutex mutex; sync::pool.min(64); //TODO: XXX
std::array<event::idx, 64> md;
std::vector<event::fetch> event(md.size() * 3);
for(auto &fetch : event)
fetch = event::fetch{_default_fopts};
size_t _i(0);
bool ret{false};
const event::closure_idx each_idx{[&data, &array, &mutex, &ret, &event, &_i]
(const m::event::idx event_idx)
{
const size_t i{_i++ % event.size()};
if(unlikely(!seek(event.at(i), event_idx, std::nothrow)))
{
assert(data.room);
log::error
{
log, "Failed to fetch event idx:%lu in room %s state.",
event_idx,
string_view{data.room->room_id}
};
return;
}
const std::lock_guard lock{mutex};
room_state_append(data, array, event.at(i), event_idx);
ret = true;
}};
ctx::concurrent<event::idx> concurrent ctx::concurrent<event::idx> concurrent
{ {
m::sync::pool, md, each_idx sync::pool, [&](const event::idx &event_idx)
{
const m::event::fetch event
{
event_idx, std::nothrow, _default_fopts
};
if(unlikely(!event.valid))
{
log::error
{
log, "Failed to fetch event idx:%lu in room %s state.",
event_idx,
string_view{data.room->room_id},
};
return;
}
const std::lock_guard lock{mutex};
room_state_append(data, array, event, event_idx);
ret |= true;
}
}; };
state.for_each([&data, &concurrent, &each_idx] const room::state state{*data.room};
(const m::event::idx &event_idx) state.for_each([&data, &concurrent]
(const event::idx &event_idx)
{ {
if(!apropos(data, event_idx)) if(!apropos(data, event_idx))
return; return;
@ -248,7 +242,7 @@ ircd::m::sync::room_state_polylog_events(data &data)
concurrent(event_idx); concurrent(event_idx);
}); });
concurrent.wait_done(); concurrent.wait();
return ret; return ret;
} }