0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 18:22:50 +01:00

ircd::ctx::parallel: Use monotonic counters only; use copy for closure argument.

This commit is contained in:
Jason Volk 2019-01-21 11:50:46 -08:00
parent 963d207bf6
commit defed2c008

View file

@ -19,7 +19,7 @@ namespace ircd::ctx
template<class arg>
struct ircd::ctx::parallel
{
using closure = std::function<void (arg &)>;
using closure = std::function<void (arg)>;
pool *p {nullptr};
vector_view<arg> a;
@ -28,15 +28,17 @@ struct ircd::ctx::parallel
std::exception_ptr eptr;
ushort snd {0};
ushort rcv {0};
ushort out {0};
bool done() const;
bool avail() const;
void wait_done();
void wait_avail();
void rethrow_any_exception();
void receiver() noexcept;
void sender() noexcept;
public:
void wait_avail();
void wait_done();
size_t nextpos() const;
void operator()();
void operator()(const arg &a);
@ -71,11 +73,10 @@ void
ircd::ctx::parallel<arg>::operator()(const arg &a)
{
rethrow_any_exception();
assert(snd < this->a.size());
this->a.at(snd) = a;
assert(avail());
this->a.at(nextpos()) = a;
sender();
wait_avail();
assert(out < this->a.size());
}
template<class arg>
@ -83,9 +84,17 @@ void
ircd::ctx::parallel<arg>::operator()()
{
rethrow_any_exception();
assert(avail());
sender();
wait_avail();
assert(out < this->a.size());
}
template<class arg>
size_t
ircd::ctx::parallel<arg>::nextpos()
const
{
return snd % a.size();
}
template<class arg>
@ -93,18 +102,14 @@ void
ircd::ctx::parallel<arg>::sender()
noexcept
{
++snd;
snd %= this->a.size();
++out;
assert(out <= this->a.size());
auto &p(*this->p);
auto func
{
std::bind(&parallel::receiver, this)
};
++snd;
assert(snd > rcv);
if(likely(p.size()))
p(std::move(func));
else
@ -116,22 +121,24 @@ void
ircd::ctx::parallel<arg>::receiver()
noexcept
{
auto &a{this->a.at(rcv % this->a.size())};
++rcv;
assert(snd > rcv);
const auto pos
{
rcv % this->a.size()
};
if(!this->eptr) try
{
c(a);
c(this->a.at(pos));
}
catch(...)
{
this->eptr = std::current_exception();
}
assert(out <= this->a.size());
assert(out > 0);
--out;
++rcv;
d.notify_one();
assert(snd >= rcv);
}
template<class arg>
@ -153,7 +160,7 @@ ircd::ctx::parallel<arg>::wait_avail()
{
d.wait([this]
{
return out < a.size();
return avail();
});
}
@ -163,6 +170,26 @@ ircd::ctx::parallel<arg>::wait_done()
{
d.wait([this]
{
return !out;
return done();
});
}
template<class arg>
bool
ircd::ctx::parallel<arg>::avail()
const
{
assert(snd >= rcv);
assert(snd - rcv <= ssize_t(a.size()));
return snd - rcv < ssize_t(a.size());
}
template<class arg>
bool
ircd::ctx::parallel<arg>::done()
const
{
assert(snd >= rcv);
assert(snd - rcv <= ssize_t(a.size()));
return snd - rcv == 0;
}