mirror of
https://github.com/matrix-construct/construct
synced 2024-11-29 10:12:39 +01:00
ircd::ctx::parallel: Simplify internal interface.
This commit is contained in:
parent
4d05fe0353
commit
52e6c28383
1 changed files with 18 additions and 30 deletions
|
@ -33,7 +33,6 @@ struct ircd::ctx::parallel
|
|||
void rethrow_any_exception();
|
||||
void receiver() noexcept;
|
||||
void sender() noexcept;
|
||||
void sender(const arg &a) noexcept;
|
||||
|
||||
public:
|
||||
void wait_avail();
|
||||
|
@ -43,6 +42,8 @@ struct ircd::ctx::parallel
|
|||
void operator()(const arg &a);
|
||||
|
||||
parallel(pool &, const vector_view<arg> &, closure);
|
||||
parallel(parallel &&) = delete;
|
||||
parallel(const parallel &) = delete;
|
||||
~parallel() noexcept;
|
||||
};
|
||||
|
||||
|
@ -70,8 +71,11 @@ void
|
|||
ircd::ctx::parallel<arg>::operator()(const arg &a)
|
||||
{
|
||||
rethrow_any_exception();
|
||||
sender(a);
|
||||
assert(snd < this->a.size());
|
||||
this->a.at(snd) = a;
|
||||
sender();
|
||||
wait_avail();
|
||||
assert(out < this->a.size());
|
||||
}
|
||||
|
||||
template<class arg>
|
||||
|
@ -81,27 +85,7 @@ ircd::ctx::parallel<arg>::operator()()
|
|||
rethrow_any_exception();
|
||||
sender();
|
||||
wait_avail();
|
||||
}
|
||||
|
||||
template<class arg>
|
||||
void
|
||||
ircd::ctx::parallel<arg>::sender(const arg &a)
|
||||
noexcept
|
||||
{
|
||||
auto &p(*this->p);
|
||||
auto func
|
||||
{
|
||||
std::bind(¶llel::receiver, this)
|
||||
};
|
||||
|
||||
this->a.at(snd++) = a;
|
||||
snd %= this->a.size();
|
||||
out++;
|
||||
|
||||
if(likely(p.size()))
|
||||
p(std::move(func));
|
||||
else
|
||||
func();
|
||||
assert(out < this->a.size());
|
||||
}
|
||||
|
||||
template<class arg>
|
||||
|
@ -109,16 +93,18 @@ void
|
|||
ircd::ctx::parallel<arg>::sender()
|
||||
noexcept
|
||||
{
|
||||
++snd;
|
||||
snd %= this->a.size();
|
||||
|
||||
++out;
|
||||
assert(out <= this->a.size());
|
||||
|
||||
auto &p(*this->p);
|
||||
auto func
|
||||
{
|
||||
std::bind(¶llel::receiver, this)
|
||||
};
|
||||
|
||||
snd++;
|
||||
snd %= this->a.size();
|
||||
out++;
|
||||
|
||||
if(likely(p.size()))
|
||||
p(std::move(func));
|
||||
else
|
||||
|
@ -130,8 +116,8 @@ void
|
|||
ircd::ctx::parallel<arg>::receiver()
|
||||
noexcept
|
||||
{
|
||||
auto &a(this->a.at(rcv % this->a.size()));
|
||||
rcv++;
|
||||
auto &a{this->a.at(rcv % this->a.size())};
|
||||
++rcv;
|
||||
|
||||
if(!this->eptr) try
|
||||
{
|
||||
|
@ -142,7 +128,9 @@ noexcept
|
|||
this->eptr = std::current_exception();
|
||||
}
|
||||
|
||||
out--;
|
||||
assert(out <= this->a.size());
|
||||
assert(out > 0);
|
||||
--out;
|
||||
d.notify_one();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue