0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-10-01 05:08:59 +02:00

ircd::ctx: Fix ctx::parallel argument allocation assumptions.

ircd::ctx: Rename tool.
This commit is contained in:
Jason Volk 2019-07-12 15:18:26 -07:00
parent 63a238fa76
commit c7a68a8cb3
4 changed files with 66 additions and 66 deletions

View file

@ -1,7 +1,7 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
@ -9,20 +9,21 @@
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_CTX_PARALLEL_H
#define HAVE_IRCD_CTX_CONCURRENT_H
namespace ircd::ctx
{
template<class arg> class parallel;
template<class arg> class concurrent;
}
template<class arg>
struct ircd::ctx::parallel
struct ircd::ctx::concurrent
{
using closure = std::function<void (arg &)>;
pool *p {nullptr};
vector_view<arg> a;
std::vector<bool> b;
closure c;
dock d;
std::exception_ptr eptr;
@ -35,34 +36,34 @@ struct ircd::ctx::parallel
void wait_done();
void wait_avail();
void rethrow_any_exception();
void receiver() noexcept;
void sender() noexcept;
void receiver(const size_t pos) noexcept;
void sender(const size_t pos) noexcept;
public:
size_t nextpos() const;
void operator()();
void operator()(const arg &a);
parallel(pool &, const vector_view<arg> &, closure);
parallel(parallel &&) = delete;
parallel(const parallel &) = delete;
~parallel() noexcept;
concurrent(pool &, const vector_view<arg> &, closure);
concurrent(concurrent &&) = delete;
concurrent(const concurrent &) = delete;
~concurrent() noexcept;
};
template<class arg>
ircd::ctx::parallel<arg>::parallel(pool &p,
const vector_view<arg> &a,
closure c)
ircd::ctx::concurrent<arg>::concurrent(pool &p,
const vector_view<arg> &a,
closure c)
:p{&p}
,a{a}
,b(this->a.size(), false)
,c{std::move(c)}
{
p.min(this->a.size());
}
template<class arg>
ircd::ctx::parallel<arg>::~parallel()
ircd::ctx::concurrent<arg>::~concurrent()
noexcept
{
const uninterruptible::nothrow ui;
@ -71,44 +72,43 @@ noexcept
template<class arg>
void
ircd::ctx::parallel<arg>::operator()(const arg &a)
ircd::ctx::concurrent<arg>::operator()(const arg &a)
{
const uninterruptible ui;
rethrow_any_exception();
assert(avail());
this->a.at(nextpos()) = a;
sender();
wait_avail();
}
template<class arg>
void
ircd::ctx::parallel<arg>::operator()()
{
const uninterruptible ui;
rethrow_any_exception();
assert(avail());
sender();
const auto nextpos(this->nextpos());
assert(nextpos < b.size());
this->a.at(nextpos) = a;
assert(this->b.at(nextpos) == false);
this->b.at(nextpos) = true;
sender(nextpos);
wait_avail();
}
template<class arg>
size_t
ircd::ctx::parallel<arg>::nextpos()
ircd::ctx::concurrent<arg>::nextpos()
const
{
return snd % a.size();
const auto it
{
std::find(begin(b), end(b), false)
};
return std::distance(begin(b), it);
}
template<class arg>
void
ircd::ctx::parallel<arg>::sender()
ircd::ctx::concurrent<arg>::sender(const size_t pos)
noexcept
{
assert(pos < b.size());
auto &p(*this->p);
auto func
{
std::bind(&parallel::receiver, this)
std::bind(&concurrent::receiver, this, pos) //TODO: alloc
};
++snd;
@ -121,15 +121,11 @@ noexcept
template<class arg>
void
ircd::ctx::parallel<arg>::receiver()
ircd::ctx::concurrent<arg>::receiver(const size_t pos)
noexcept
{
assert(snd > rcv);
const auto pos
{
rcv++ % this->a.size()
};
++rcv;
assert(snd >= rcv);
if(!this->eptr) try
{
c(this->a.at(pos));
@ -139,14 +135,17 @@ noexcept
this->eptr = std::current_exception();
}
assert(pos < b.size());
assert(this->b.at(pos) == true);
this->b.at(pos) = false;
assert(rcv > fin);
fin++;
++fin;
d.notify_one();
}
template<class arg>
void
ircd::ctx::parallel<arg>::rethrow_any_exception()
ircd::ctx::concurrent<arg>::rethrow_any_exception()
{
if(likely(!this->eptr))
return;
@ -159,7 +158,7 @@ ircd::ctx::parallel<arg>::rethrow_any_exception()
template<class arg>
void
ircd::ctx::parallel<arg>::wait_avail()
ircd::ctx::concurrent<arg>::wait_avail()
{
d.wait([this]
{
@ -169,7 +168,7 @@ ircd::ctx::parallel<arg>::wait_avail()
template<class arg>
void
ircd::ctx::parallel<arg>::wait_done()
ircd::ctx::concurrent<arg>::wait_done()
{
d.wait([this]
{
@ -179,23 +178,23 @@ ircd::ctx::parallel<arg>::wait_done()
template<class arg>
bool
ircd::ctx::parallel<arg>::avail()
ircd::ctx::concurrent<arg>::avail()
const
{
assert(snd >= rcv);
assert(rcv >= fin);
assert(snd - rcv <= a.size());
assert(snd - fin <= a.size());
return snd - fin < a.size();
return snd - fin < a.size() && nextpos() < a.size();
}
template<class arg>
bool
ircd::ctx::parallel<arg>::done()
ircd::ctx::concurrent<arg>::done()
const
{
assert(snd >= rcv);
assert(rcv >= fin);
assert(snd - rcv <= a.size());
return snd - fin == 0;
return snd - fin == 0 && nextpos() == 0;
}

View file

@ -101,7 +101,7 @@ namespace ircd::ctx
#include "pool.h"
#include "ole.h"
#include "fault.h"
#include "parallel.h"
#include "concurrent.h"
// Exports to ircd::
namespace ircd

View file

@ -123,12 +123,12 @@ ircd::m::sync::presence_polylog(data &data)
};
}};
// Setup for parallelization.
// Setup for concurrentization.
static const size_t fibers(64); //TODO: conf
using buffer = std::array<char[m::id::MAX_SIZE+1], fibers>;
const auto buf(std::make_unique<buffer>());
std::array<string_view, fibers> q;
ctx::parallel<string_view> parallel
ctx::concurrent<string_view> concurrent
{
m::sync::pool, q, [&data, &append_event]
(const m::user::id user_id)
@ -145,19 +145,18 @@ ircd::m::sync::presence_polylog(data &data)
// Iterate all of the users visible to our user in joined rooms.
const m::user::mitsein mitsein{data.user};
mitsein.for_each("join", [&parallel, &q, &buf]
mitsein.for_each("join", [&concurrent, &q, &buf]
(const m::user &user)
{
// Manual copy of the user_id string to the buffer and assignment
// of q at the next position. parallel.snd is the position in q
// which ctx::parallel wants us to store the next data at. The
// parallel() call doesn't return (blocks this context) until there's
// of q at the next position. concurrent.snd is the position in q
// which ctx::concurrent wants us to store the next data at. The
// concurrent() call doesn't return (blocks this context) until there's
// a next position available; propagating flow-control for the iter.
const auto pos(parallel.nextpos());
q[pos] = strlcpy(buf->at(pos), user.user_id);
parallel();
const auto pos(concurrent.nextpos());
concurrent(strlcpy(buf->at(pos), user.user_id));
});
parallel.wait_done();
concurrent.wait_done();
return ret;
}

View file

@ -208,8 +208,8 @@ ircd::m::sync::room_state_polylog_events(data &data)
};
ctx::mutex mutex;
std::array<event::idx, 64> md; //TODO: conf
std::vector<event::fetch> event(md.size() * 2);
std::array<event::idx, 64> md;
std::vector<event::fetch> event(md.size() * 3);
for(auto &fetch : event)
fetch = event::fetch{_default_fopts};
@ -237,19 +237,21 @@ ircd::m::sync::room_state_polylog_events(data &data)
ret = true;
}};
ctx::parallel<event::idx> parallel
ctx::concurrent<event::idx> concurrent
{
m::sync::pool, md, each_idx
};
state.for_each([&data, &parallel]
state.for_each([&data, &concurrent, &each_idx]
(const m::event::idx &event_idx)
{
if(apropos(data, event_idx))
parallel(event_idx);
if(!apropos(data, event_idx))
return;
concurrent(event_idx);
});
parallel.wait_done();
concurrent.wait_done();
return ret;
}