mirror of
https://github.com/matrix-construct/construct
synced 2024-11-17 15:30:52 +01:00
ircd::ctx: Candidate fixes for view sequencing.
This commit is contained in:
parent
b49cdb84e8
commit
543a1988b9
1 changed files with 50 additions and 40 deletions
|
@ -36,8 +36,9 @@ template<class T,
|
|||
class ircd::ctx::view
|
||||
:public mutex
|
||||
{
|
||||
T *t {nullptr};
|
||||
dock q;
|
||||
T *t {nullptr};
|
||||
size_t wanting {0};
|
||||
size_t waiting {0};
|
||||
|
||||
bool ready() const;
|
||||
|
@ -61,6 +62,7 @@ ircd::ctx::view<T, mutex>::~view()
|
|||
noexcept
|
||||
{
|
||||
assert(!waiting);
|
||||
assert(!wanting);
|
||||
}
|
||||
|
||||
template<class T,
|
||||
|
@ -68,16 +70,21 @@ template<class T,
|
|||
void
|
||||
ircd::ctx::view<T, mutex>::operator()(T &t)
|
||||
{
|
||||
if(!waiting)
|
||||
return;
|
||||
const auto produce{[this]
|
||||
(T *const &t, size_t &semaphore)
|
||||
{
|
||||
q.wait([this, &semaphore]
|
||||
{
|
||||
return semaphore == 0;
|
||||
});
|
||||
|
||||
this->t = &t;
|
||||
q.notify_all();
|
||||
q.wait([this] { return !waiting; });
|
||||
const std::lock_guard<view> lock{*this};
|
||||
this->t = nullptr;
|
||||
assert(!waiting);
|
||||
q.notify_all();
|
||||
const std::lock_guard<view> lock{*this};
|
||||
this->t = t;
|
||||
q.notify_all();
|
||||
}};
|
||||
|
||||
produce(&t, wanting);
|
||||
produce(nullptr, waiting);
|
||||
}
|
||||
|
||||
template<class T,
|
||||
|
@ -86,24 +93,26 @@ template<class lock>
|
|||
T &
|
||||
ircd::ctx::view<T, mutex>::wait(lock &l)
|
||||
{
|
||||
for(assert(l.owns_lock()); ready(); l.lock())
|
||||
const auto consume{[this, &l]
|
||||
(const bool &r, size_t &semaphore)
|
||||
{
|
||||
l.unlock();
|
||||
q.wait();
|
||||
}
|
||||
const unwind uw{[this, &l, &semaphore]
|
||||
{
|
||||
--semaphore;
|
||||
q.notify_all();
|
||||
}};
|
||||
|
||||
const unwind ul{[this]
|
||||
{
|
||||
--waiting;
|
||||
q.notify_all();
|
||||
++semaphore;
|
||||
assert(l.owns_lock());
|
||||
const unlock_guard<lock> ul{l};
|
||||
q.wait([this, &r]
|
||||
{
|
||||
return ready() == r;
|
||||
});
|
||||
}};
|
||||
|
||||
for(++waiting; !ready(); l.lock())
|
||||
{
|
||||
l.unlock();
|
||||
q.wait();
|
||||
}
|
||||
|
||||
consume(false, wanting);
|
||||
consume(true, waiting);
|
||||
assert(t != nullptr);
|
||||
return *t;
|
||||
}
|
||||
|
@ -127,26 +136,27 @@ T &
|
|||
ircd::ctx::view<T, mutex>::wait_until(lock &l,
|
||||
time_point&& tp)
|
||||
{
|
||||
for(assert(l.owns_lock()); ready(); l.lock())
|
||||
const auto consume{[this, &l, &tp]
|
||||
(const bool &r, size_t &semaphore)
|
||||
{
|
||||
l.unlock();
|
||||
if(!q.wait_until(tp))
|
||||
throw timeout{};
|
||||
}
|
||||
const unwind uw{[this, &l, &semaphore]
|
||||
{
|
||||
--semaphore;
|
||||
q.notify_all();
|
||||
}};
|
||||
|
||||
const unwind ul{[this]
|
||||
{
|
||||
--waiting;
|
||||
q.notify_all();
|
||||
++semaphore;
|
||||
assert(l.owns_lock());
|
||||
const unlock_guard<lock> ul{l};
|
||||
if(!q.wait_until(tp, [this, &r]
|
||||
{
|
||||
return ready() == r;
|
||||
}))
|
||||
throw timeout{};
|
||||
}};
|
||||
|
||||
for(++waiting; !ready(); l.lock())
|
||||
{
|
||||
l.unlock();
|
||||
if(!q.wait_until(tp))
|
||||
throw timeout{};
|
||||
}
|
||||
|
||||
consume(false, wanting);
|
||||
consume(true, waiting);
|
||||
assert(t != nullptr);
|
||||
return *t;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue