mirror of
https://github.com/matrix-construct/construct
synced 2024-11-15 14:31:11 +01:00
ircd::ctx: Proper condition variable exclusion semantics.
This commit is contained in:
parent
5326f434d9
commit
a33b4b05de
1 changed files with 156 additions and 43 deletions
|
@ -16,11 +16,16 @@ namespace ircd::ctx
|
||||||
struct condition_variable;
|
struct condition_variable;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ircd::ctx::condition_variable
|
class ircd::ctx::condition_variable
|
||||||
{
|
{
|
||||||
dock d;
|
list q;
|
||||||
|
|
||||||
|
void notify(ctx &) noexcept;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
bool empty() const;
|
||||||
|
size_t size() const;
|
||||||
|
|
||||||
template<class lock, class time_point, class predicate> bool wait_until(lock &, time_point&& tp, predicate&& pred);
|
template<class lock, class time_point, class predicate> bool wait_until(lock &, time_point&& tp, predicate&& pred);
|
||||||
template<class lock, class time_point> std::cv_status wait_until(lock &, time_point&& tp);
|
template<class lock, class time_point> std::cv_status wait_until(lock &, time_point&& tp);
|
||||||
|
|
||||||
|
@ -35,33 +40,59 @@ struct ircd::ctx::condition_variable
|
||||||
void notify() noexcept;
|
void notify() noexcept;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Wake up the next context waiting on the condition_variable
|
||||||
|
///
|
||||||
|
/// Unlike notify_one(), the next context in the queue is repositioned in the
|
||||||
|
/// back before being woken up for fairness.
|
||||||
inline void
|
inline void
|
||||||
ircd::ctx::condition_variable::notify()
|
ircd::ctx::condition_variable::notify()
|
||||||
noexcept
|
noexcept
|
||||||
{
|
{
|
||||||
d.notify();
|
ctx *c;
|
||||||
|
if(!(c = q.pop_front()))
|
||||||
|
return;
|
||||||
|
|
||||||
|
q.push_back(c);
|
||||||
|
notify(*c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wake up the next context waiting on the condition_variable
|
||||||
inline void
|
inline void
|
||||||
ircd::ctx::condition_variable::notify_one()
|
ircd::ctx::condition_variable::notify_one()
|
||||||
noexcept
|
noexcept
|
||||||
{
|
{
|
||||||
d.notify_one();
|
if(!q.empty())
|
||||||
|
notify(*q.front());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wake up all contexts waiting on the condition_variable.
|
||||||
|
///
|
||||||
|
/// We post all notifications without requesting direct context
|
||||||
|
/// switches. This ensures everyone gets notified in a single
|
||||||
|
/// transaction without any interleaving during this process.
|
||||||
inline void
|
inline void
|
||||||
ircd::ctx::condition_variable::notify_all()
|
ircd::ctx::condition_variable::notify_all()
|
||||||
noexcept
|
noexcept
|
||||||
{
|
{
|
||||||
d.notify_all();
|
q.for_each([](ctx &c)
|
||||||
|
{
|
||||||
|
ircd::ctx::notify(c);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class lock>
|
template<class lock>
|
||||||
void
|
void
|
||||||
ircd::ctx::condition_variable::wait(lock &l)
|
ircd::ctx::condition_variable::wait(lock &l)
|
||||||
{
|
{
|
||||||
const unlock_guard<lock> ul(l);
|
assert(current);
|
||||||
d.wait();
|
const unwind remove{[this]
|
||||||
|
{
|
||||||
|
q.remove(current);
|
||||||
|
}};
|
||||||
|
|
||||||
|
q.push_back(current);
|
||||||
|
const unlock_guard<lock> ul{l};
|
||||||
|
ircd::ctx::wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class lock,
|
template<class lock,
|
||||||
|
@ -70,31 +101,46 @@ void
|
||||||
ircd::ctx::condition_variable::wait(lock &l,
|
ircd::ctx::condition_variable::wait(lock &l,
|
||||||
predicate&& pred)
|
predicate&& pred)
|
||||||
{
|
{
|
||||||
const unlock_guard<lock> ul(l);
|
if(pred())
|
||||||
return d.wait([&l, &pred]
|
return;
|
||||||
{
|
|
||||||
l.lock();
|
|
||||||
const unwind ul{[&l]
|
|
||||||
{
|
|
||||||
l.unlock();
|
|
||||||
}};
|
|
||||||
|
|
||||||
return pred();
|
assert(current);
|
||||||
});
|
const unwind remove{[this]
|
||||||
|
{
|
||||||
|
q.remove(current);
|
||||||
|
}};
|
||||||
|
|
||||||
|
q.push_back(current); do
|
||||||
|
{
|
||||||
|
const unlock_guard<lock> ul{l};
|
||||||
|
ircd::ctx::wait();
|
||||||
|
}
|
||||||
|
while(!pred());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if notified; false if timed out
|
||||||
template<class lock,
|
template<class lock,
|
||||||
class duration>
|
class duration>
|
||||||
std::cv_status
|
std::cv_status
|
||||||
ircd::ctx::condition_variable::wait_for(lock &l,
|
ircd::ctx::condition_variable::wait_for(lock &l,
|
||||||
const duration &dur)
|
const duration &dur)
|
||||||
{
|
{
|
||||||
const unlock_guard<lock> ul(l);
|
static const duration zero(0);
|
||||||
return d.wait_for(dur)?
|
|
||||||
|
assert(current);
|
||||||
|
const unwind remove{[this]
|
||||||
|
{
|
||||||
|
q.remove(current);
|
||||||
|
}};
|
||||||
|
|
||||||
|
q.push_back(current);
|
||||||
|
const unlock_guard<lock> ul{l};
|
||||||
|
return ircd::ctx::wait<std::nothrow_t>(dur) > zero?
|
||||||
std::cv_status::no_timeout:
|
std::cv_status::no_timeout:
|
||||||
std::cv_status::timeout;
|
std::cv_status::timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if predicate passed; false if timed out
|
||||||
template<class lock,
|
template<class lock,
|
||||||
class duration,
|
class duration,
|
||||||
class predicate>
|
class predicate>
|
||||||
|
@ -103,31 +149,55 @@ ircd::ctx::condition_variable::wait_for(lock &l,
|
||||||
const duration &dur,
|
const duration &dur,
|
||||||
predicate&& pred)
|
predicate&& pred)
|
||||||
{
|
{
|
||||||
const unlock_guard<lock> ul(l);
|
static const duration zero(0);
|
||||||
return d.wait_for(dur, [&l, &pred]
|
|
||||||
{
|
|
||||||
l.lock();
|
|
||||||
const unwind ul{[&l]
|
|
||||||
{
|
|
||||||
l.unlock();
|
|
||||||
}};
|
|
||||||
|
|
||||||
return pred();
|
if(pred())
|
||||||
});
|
return true;
|
||||||
|
|
||||||
|
assert(current);
|
||||||
|
const unwind remove{[this]
|
||||||
|
{
|
||||||
|
q.remove(current);
|
||||||
|
}};
|
||||||
|
|
||||||
|
q.push_back(current); do
|
||||||
|
{
|
||||||
|
bool expired;
|
||||||
|
{
|
||||||
|
const unlock_guard<lock> ul{l};
|
||||||
|
expired = ircd::ctx::wait<std::nothrow_t>(dur) <= zero;
|
||||||
|
};
|
||||||
|
|
||||||
|
if(pred())
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if(expired)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
while(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if notified; false if timed out
|
||||||
template<class lock,
|
template<class lock,
|
||||||
class time_point>
|
class time_point>
|
||||||
std::cv_status
|
std::cv_status
|
||||||
ircd::ctx::condition_variable::wait_until(lock &l,
|
ircd::ctx::condition_variable::wait_until(lock &l,
|
||||||
time_point&& tp)
|
time_point&& tp)
|
||||||
{
|
{
|
||||||
const unlock_guard<lock> ul(l);
|
assert(current);
|
||||||
return d.wait_until(std::forward<time_point>(tp))?
|
const unwind remove{[this]
|
||||||
std::cv_status::no_timeout:
|
{
|
||||||
std::cv_status::timeout;
|
q.remove(current);
|
||||||
|
}};
|
||||||
|
|
||||||
|
q.push_back(current);
|
||||||
|
const unlock_guard<lock> ul{l};
|
||||||
|
return ircd::ctx::wait_until<std::nothrow_t>(tp)?
|
||||||
|
std::cv_status::timeout:
|
||||||
|
std::cv_status::no_timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if predicate passed; false if timed out
|
||||||
template<class lock,
|
template<class lock,
|
||||||
class time_point,
|
class time_point,
|
||||||
class predicate>
|
class predicate>
|
||||||
|
@ -136,15 +206,58 @@ ircd::ctx::condition_variable::wait_until(lock &l,
|
||||||
time_point&& tp,
|
time_point&& tp,
|
||||||
predicate&& pred)
|
predicate&& pred)
|
||||||
{
|
{
|
||||||
const unlock_guard<lock> ul(l);
|
if(pred())
|
||||||
return d.wait_until(std::forward<time_point>(tp), [&l, &pred]
|
return true;
|
||||||
{
|
|
||||||
l.lock();
|
|
||||||
const unwind ul{[&l]
|
|
||||||
{
|
|
||||||
l.unlock();
|
|
||||||
}};
|
|
||||||
|
|
||||||
return pred();
|
assert(current);
|
||||||
});
|
const unwind remove{[this]
|
||||||
|
{
|
||||||
|
q.remove(current);
|
||||||
|
}};
|
||||||
|
|
||||||
|
q.push_back(current); do
|
||||||
|
{
|
||||||
|
bool expired;
|
||||||
|
{
|
||||||
|
const unlock_guard<lock> ul{l};
|
||||||
|
expired = ircd::ctx::wait_until<std::nothrow_t>(tp);
|
||||||
|
};
|
||||||
|
|
||||||
|
if(pred())
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if(expired)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
while(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void
|
||||||
|
ircd::ctx::condition_variable::notify(ctx &ctx)
|
||||||
|
noexcept
|
||||||
|
{
|
||||||
|
// This branch handles condition_variable.notify() being called from outside
|
||||||
|
// the context system. If a context is currently running we can make a direct
|
||||||
|
// context-switch with yield(ctx), otherwise notify(ctx) enqueues the context.
|
||||||
|
|
||||||
|
if(current)
|
||||||
|
ircd::ctx::yield(ctx);
|
||||||
|
else
|
||||||
|
ircd::ctx::notify(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The number of contexts waiting in the queue.
|
||||||
|
inline size_t
|
||||||
|
ircd::ctx::condition_variable::size()
|
||||||
|
const
|
||||||
|
{
|
||||||
|
return q.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The number of contexts waiting in the queue.
|
||||||
|
inline bool
|
||||||
|
ircd::ctx::condition_variable::empty()
|
||||||
|
const
|
||||||
|
{
|
||||||
|
return q.empty();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue