mirror of
https://github.com/matrix-construct/construct
synced 2024-11-25 16:22:35 +01:00
ircd::ctx: Support shared futures.
This commit is contained in:
parent
74679a3a5f
commit
1210523757
7 changed files with 538 additions and 287 deletions
|
@ -60,13 +60,8 @@ struct ircd::ctx::future
|
|||
template<class duration> T get(const duration &d);
|
||||
template<class time_point> T get_until(const time_point &);
|
||||
|
||||
future() = default;
|
||||
future(promise<T> &promise);
|
||||
future(future &&) noexcept;
|
||||
future(const future &) = delete;
|
||||
future &operator=(future &&) noexcept;
|
||||
future &operator=(const future &) = delete;
|
||||
~future() noexcept;
|
||||
using shared_state<T>::shared_state;
|
||||
using shared_state<T>::operator=;
|
||||
};
|
||||
|
||||
template<>
|
||||
|
@ -89,125 +84,26 @@ struct ircd::ctx::future<void>
|
|||
template<class duration> void wait(const duration &d) const;
|
||||
void wait() const;
|
||||
|
||||
IRCD_OVERLOAD(already)
|
||||
|
||||
future(promise<void> &promise);
|
||||
future(already_t); // construct in ready state
|
||||
future() = default;
|
||||
future(future &&) noexcept;
|
||||
future(const future &) = delete;
|
||||
future &operator=(future &&) noexcept;
|
||||
future &operator=(const future &) = delete;
|
||||
~future() noexcept;
|
||||
using shared_state<void>::shared_state;
|
||||
using shared_state<void>::operator=;
|
||||
};
|
||||
|
||||
namespace ircd::ctx
|
||||
{
|
||||
}
|
||||
|
||||
template<class... T>
|
||||
struct ircd::ctx::scoped_future
|
||||
:future<T...>
|
||||
{
|
||||
template<class... Args> scoped_future(Args&&... args);
|
||||
using future<T...>::future;
|
||||
~scoped_future() noexcept;
|
||||
};
|
||||
|
||||
template<class... T>
|
||||
template<class... Args>
|
||||
ircd::ctx::scoped_future<T...>::scoped_future(Args&&... args)
|
||||
:future<T...>{std::forward<Args>(args)...}
|
||||
{
|
||||
}
|
||||
|
||||
template<class... T>
|
||||
ircd::ctx::scoped_future<T...>::~scoped_future()
|
||||
noexcept
|
||||
{
|
||||
if(std::uncaught_exceptions())
|
||||
if(std::uncaught_exceptions() || !this->valid())
|
||||
return;
|
||||
|
||||
if(this->valid())
|
||||
this->wait();
|
||||
}
|
||||
|
||||
template<class T>
|
||||
ircd::ctx::future<T>::future(promise<T> &promise)
|
||||
:shared_state<T>{promise}
|
||||
{
|
||||
assert(!promise.valid());
|
||||
update(state());
|
||||
assert(promise.valid());
|
||||
}
|
||||
|
||||
inline
|
||||
ircd::ctx::future<void>::future(promise<void> &promise)
|
||||
:shared_state<void>{promise}
|
||||
{
|
||||
assert(!promise.valid());
|
||||
update(state());
|
||||
assert(promise.valid());
|
||||
}
|
||||
|
||||
inline
|
||||
ircd::ctx::future<void>::future(already_t)
|
||||
{
|
||||
set(state(), future_state::READY);
|
||||
}
|
||||
|
||||
template<class T>
|
||||
ircd::ctx::future<T>::future(future<T> &&o)
|
||||
noexcept
|
||||
:shared_state<T>{std::move(o)}
|
||||
{
|
||||
update(state());
|
||||
o.state().p = nullptr;
|
||||
}
|
||||
|
||||
inline
|
||||
ircd::ctx::future<void>::future(future<void> &&o)
|
||||
noexcept
|
||||
:shared_state<void>{std::move(o)}
|
||||
{
|
||||
update(state());
|
||||
o.state().p = nullptr;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
ircd::ctx::future<T> &
|
||||
ircd::ctx::future<T>::operator=(future<T> &&o)
|
||||
noexcept
|
||||
{
|
||||
this->~future();
|
||||
static_cast<shared_state<T> &>(*this) = std::move(o);
|
||||
update(state());
|
||||
o.state().p = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
inline ircd::ctx::future<void> &
|
||||
ircd::ctx::future<void>::operator=(future<void> &&o)
|
||||
noexcept
|
||||
{
|
||||
this->~future();
|
||||
static_cast<shared_state<void> &>(*this) = std::move(o);
|
||||
update(state());
|
||||
o.state().p = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
ircd::ctx::future<T>::~future()
|
||||
noexcept
|
||||
{
|
||||
invalidate(state());
|
||||
}
|
||||
|
||||
inline
|
||||
ircd::ctx::future<void>::~future()
|
||||
noexcept
|
||||
{
|
||||
invalidate(state());
|
||||
this->wait();
|
||||
}
|
||||
|
||||
template<class T>
|
||||
|
@ -331,11 +227,7 @@ const
|
|||
{
|
||||
if(_wait_until(*this, tp, std::nothrow))
|
||||
{
|
||||
auto &state
|
||||
{
|
||||
const_cast<future<void> *>(this)->state()
|
||||
};
|
||||
|
||||
auto &state(mutable_cast(this->state()));
|
||||
set(state, future_state::RETRIEVED);
|
||||
if(bool(state.eptr))
|
||||
std::rethrow_exception(state.eptr);
|
||||
|
@ -362,11 +254,7 @@ ircd::ctx::_wait_until(const future<T> &f,
|
|||
const time_point &tp,
|
||||
std::nothrow_t)
|
||||
{
|
||||
auto &state
|
||||
{
|
||||
const_cast<future<T> &>(f).state()
|
||||
};
|
||||
|
||||
auto &state(mutable_cast(f.state()));
|
||||
if(unlikely(is(state, future_state::INVALID)))
|
||||
throw no_state{};
|
||||
|
||||
|
|
|
@ -37,15 +37,22 @@ namespace ircd::ctx
|
|||
/// of these promises are making the same promise to the same shared_state;
|
||||
/// the list allows for copy semantics which are important for some callback
|
||||
/// systems (like boost::asio). This solution is far more optimal than
|
||||
/// allocating the promise in a shared_ptr and refcounting...
|
||||
/// allocating the promise in a shared_ptr and refcounting... Note that the
|
||||
/// same semantic exists on the future side to implement shared futures. Both
|
||||
/// parties maintain a pointer to the head of a singly linked list of the
|
||||
/// other party, and a pointer to the next instance of their own party.
|
||||
struct ircd::ctx::promise_base
|
||||
{
|
||||
static void remove(shared_state_base &, promise_base &);
|
||||
static void update(promise_base &new_, promise_base &old);
|
||||
static void append(promise_base &new_, promise_base &old);
|
||||
// Internal operations
|
||||
static const promise_base *head(const shared_state_base &);
|
||||
static const promise_base *head(const promise_base &);
|
||||
static size_t refcount(const shared_state_base &);
|
||||
|
||||
shared_state_base *st {nullptr};
|
||||
mutable promise_base *next {nullptr};
|
||||
static promise_base *head(promise_base &);
|
||||
static promise_base *head(shared_state_base &);
|
||||
|
||||
shared_state_base *st {nullptr}; // the head of all sharing futures
|
||||
mutable promise_base *next {nullptr}; // next sharing promise
|
||||
|
||||
template<class T> const shared_state<T> &state() const noexcept;
|
||||
template<class T> shared_state<T> &state() noexcept;
|
||||
|
@ -54,6 +61,7 @@ struct ircd::ctx::promise_base
|
|||
|
||||
void check_pending() const;
|
||||
void make_ready();
|
||||
void remove();
|
||||
|
||||
public:
|
||||
bool valid() const noexcept;
|
||||
|
@ -137,7 +145,23 @@ ircd::ctx::promise<T>::set_value(T&& val)
|
|||
return;
|
||||
|
||||
check_pending();
|
||||
state().val = std::move(val);
|
||||
auto *state
|
||||
{
|
||||
shared_state_base::head(*this)
|
||||
};
|
||||
|
||||
assert(state);
|
||||
if(shared_state_base::refcount(*state) > 1) do
|
||||
{
|
||||
assert(is(*state, future_state::PENDING));
|
||||
static_cast<shared_state<T> &>(*state).val = val;
|
||||
}
|
||||
while((state = state->next)); else
|
||||
{
|
||||
assert(is(this->state(), future_state::PENDING));
|
||||
this->state().val = std::move(val);
|
||||
}
|
||||
|
||||
make_ready();
|
||||
}
|
||||
|
||||
|
@ -149,7 +173,13 @@ ircd::ctx::promise<T>::set_value(const T &val)
|
|||
return;
|
||||
|
||||
check_pending();
|
||||
state().val = val;
|
||||
auto *state(shared_state_base::head(*this)); do
|
||||
{
|
||||
assert(state);
|
||||
assert(is(*state, future_state::PENDING));
|
||||
static_cast<shared_state<T> &>(*state).val = val;
|
||||
}
|
||||
while((state = state->next));
|
||||
make_ready();
|
||||
}
|
||||
|
||||
|
@ -172,6 +202,15 @@ const
|
|||
// promise_base
|
||||
//
|
||||
|
||||
inline void
|
||||
ircd::ctx::promise_base::check_pending()
|
||||
const
|
||||
{
|
||||
assert(valid());
|
||||
if(unlikely(!is(state(), future_state::PENDING)))
|
||||
throw promise_already_satisfied{};
|
||||
}
|
||||
|
||||
inline bool
|
||||
ircd::ctx::promise_base::operator!()
|
||||
const noexcept
|
||||
|
|
|
@ -21,14 +21,11 @@ namespace ircd::ctx
|
|||
template<> struct shared_state<void>;
|
||||
|
||||
IRCD_EXCEPTION(ircd::ctx::error, future_error)
|
||||
IRCD_OVERLOAD(already)
|
||||
|
||||
future_state state(const shared_state_base &);
|
||||
bool is(const shared_state_base &, const future_state &);
|
||||
void set(shared_state_base &, const future_state &);
|
||||
size_t refcount(const shared_state_base &);
|
||||
void invalidate(shared_state_base &);
|
||||
void update(shared_state_base &);
|
||||
void notify(shared_state_base &);
|
||||
}
|
||||
|
||||
/// Internal state enumeration for the promise / future / related. These can
|
||||
|
@ -47,23 +44,47 @@ enum class ircd::ctx::future_state
|
|||
/// Internal Non-template base of the state object shared by promise and
|
||||
/// future. It is extended by the appropriate template, and usually resides
|
||||
/// in the future's instance, where the promise finds it.
|
||||
///
|
||||
/// There can be multiple promises and multiple futures all associated with
|
||||
/// the same resolution event. All promises point to the first
|
||||
/// shared_state_base (future) of the associated shared_state_base list. When
|
||||
/// any promise in the list of associated promises sets a result, it copies
|
||||
/// the result to all futures in the list; if only one future, it std::move()'s
|
||||
/// the result; then the association of all promises and all futures and
|
||||
/// respective lists are invalidated.
|
||||
///
|
||||
/// Note that the only way to traverse the list of shared_state_bases's is to
|
||||
/// dereference the promise pointer (head promise) and follow the st->next
|
||||
/// list. The only way to traverse the list of promises is to dereference a
|
||||
/// shared_state_base with a valid *p in future_state::PENDING and chase the
|
||||
/// p->next list.
|
||||
struct ircd::ctx::shared_state_base
|
||||
{
|
||||
static const shared_state_base *head(const shared_state_base &);
|
||||
static const shared_state_base *head(const promise_base &);
|
||||
static size_t refcount(const shared_state_base &);
|
||||
|
||||
static shared_state_base *head(shared_state_base &);
|
||||
static shared_state_base *head(promise_base &);
|
||||
|
||||
mutable dock cond;
|
||||
std::exception_ptr eptr;
|
||||
std::function<void (shared_state_base &)> then;
|
||||
mutable shared_state_base *next{nullptr}; // next sharing future
|
||||
union
|
||||
{
|
||||
promise_base *p {nullptr};
|
||||
promise_base *p {nullptr}; // the head of all sharing promises
|
||||
future_state st;
|
||||
};
|
||||
|
||||
shared_state_base(promise_base &p);
|
||||
shared_state_base() = default;
|
||||
shared_state_base(shared_state_base &&) = default;
|
||||
shared_state_base(const shared_state_base &) = delete;
|
||||
shared_state_base &operator=(shared_state_base &&) = default;
|
||||
shared_state_base &operator=(const shared_state_base &) = delete;
|
||||
shared_state_base(already_t);
|
||||
shared_state_base(promise_base &);
|
||||
shared_state_base(shared_state_base &&) noexcept;
|
||||
shared_state_base(const shared_state_base &);
|
||||
shared_state_base &operator=(promise_base &);
|
||||
shared_state_base &operator=(shared_state_base &&) noexcept;
|
||||
shared_state_base &operator=(const shared_state_base &);
|
||||
~shared_state_base() noexcept;
|
||||
};
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ ircd::ctx::when_any(it first,
|
|||
if(is(state(closure(first)), future_state::PENDING))
|
||||
when::set_any_then(p, first, closure);
|
||||
|
||||
if(refcount(p.state()) <= 1)
|
||||
if(promise<it>::refcount(p.state()) <= 1)
|
||||
p.set_value(last);
|
||||
|
||||
return ret;
|
||||
|
@ -118,7 +118,7 @@ ircd::ctx::when_all(it first,
|
|||
if(is(state(closure(first)), future_state::PENDING))
|
||||
when::set_all_then(p, first, closure);
|
||||
|
||||
if(refcount(p.state()) <= 1)
|
||||
if(promise<void>::refcount(p.state()) <= 1)
|
||||
p.set_value();
|
||||
|
||||
return ret;
|
||||
|
@ -174,10 +174,10 @@ ircd::ctx::when::all_then(promise<void> &p)
|
|||
if(!p.valid())
|
||||
return;
|
||||
|
||||
if(refcount(p.state()) < 2)
|
||||
if(promise<void>::refcount(p.state()) < 2)
|
||||
return p.set_value();
|
||||
|
||||
return p.remove(p.state(), p);
|
||||
return p.remove();
|
||||
}
|
||||
|
||||
/// In order for this template to be reusable with std::set iterations we
|
||||
|
|
|
@ -101,7 +101,7 @@ struct ircd::m::fetch::opts
|
|||
struct ircd::m::fetch::result
|
||||
{
|
||||
/// Backing buffer for any data pointed to by this result.
|
||||
unique_buffer<mutable_buffer> buf;
|
||||
shared_buffer<mutable_buffer> buf;
|
||||
|
||||
/// The backing buffer may contain other data ahead of the response
|
||||
/// content; in any case this points to a view of the response content.
|
||||
|
|
|
@ -980,7 +980,7 @@ ircd::client::close(const net::close_opts &opts)
|
|||
{
|
||||
return likely(sock) && !sock->fini?
|
||||
net::close(*sock, opts):
|
||||
ctx::future<void>::already;
|
||||
ctx::already;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
587
ircd/ctx.cc
587
ircd/ctx.cc
|
@ -1913,6 +1913,22 @@ ircd::ctx::prof::reflect(const event &e)
|
|||
// ctx/promise.h
|
||||
//
|
||||
|
||||
namespace ircd::ctx
|
||||
{
|
||||
static void set_promises_state(shared_state_base &);
|
||||
static void invalidate_promises(shared_state_base &);
|
||||
static void append(shared_state_base &new_, shared_state_base &old);
|
||||
static void update(shared_state_base &new_, shared_state_base &old);
|
||||
static void remove(shared_state_base &);
|
||||
static void notify(shared_state_base &);
|
||||
|
||||
static void set_futures_promise(promise_base &);
|
||||
static void invalidate_futures(promise_base &);
|
||||
static void append(promise_base &new_, promise_base &old);
|
||||
static void update(promise_base &new_, promise_base &old);
|
||||
static void remove(promise_base &);
|
||||
}
|
||||
|
||||
//
|
||||
// promise<void>
|
||||
//
|
||||
|
@ -1949,11 +1965,7 @@ noexcept
|
|||
:st{std::move(o.st)}
|
||||
,next{std::move(o.next)}
|
||||
{
|
||||
if(st)
|
||||
{
|
||||
update(*this, o);
|
||||
o.st = nullptr;
|
||||
}
|
||||
update(*this, o);
|
||||
}
|
||||
|
||||
ircd::ctx::promise_base::promise_base(const promise_base &o)
|
||||
|
@ -1970,11 +1982,7 @@ noexcept
|
|||
this->~promise_base();
|
||||
st = std::move(o.st);
|
||||
next = std::move(o.next);
|
||||
if(!st)
|
||||
return *this;
|
||||
|
||||
update(*this, o);
|
||||
o.st = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -1984,10 +1992,10 @@ noexcept
|
|||
if(!valid())
|
||||
return;
|
||||
|
||||
if(refcount(state()) == 1)
|
||||
if(promise_base::refcount(state()) == 1)
|
||||
set_exception(make_exception_ptr<broken_promise>());
|
||||
else
|
||||
remove(state(), *this);
|
||||
|
||||
remove();
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -1997,53 +2005,169 @@ ircd::ctx::promise_base::set_exception(std::exception_ptr eptr)
|
|||
return;
|
||||
|
||||
check_pending();
|
||||
state().eptr = std::move(eptr);
|
||||
for(auto *st(shared_state_base::head(*this)); st; st = st->next)
|
||||
st->eptr = eptr;
|
||||
|
||||
make_ready();
|
||||
}
|
||||
|
||||
void
|
||||
ircd::ctx::promise_base::remove()
|
||||
{
|
||||
if(!valid())
|
||||
return;
|
||||
|
||||
ircd::ctx::remove(*this);
|
||||
assert(!valid());
|
||||
}
|
||||
|
||||
void
|
||||
ircd::ctx::promise_base::make_ready()
|
||||
{
|
||||
auto &st(state());
|
||||
assert(valid());
|
||||
shared_state_base *next
|
||||
{
|
||||
shared_state_base::head(*this)
|
||||
};
|
||||
|
||||
// First we have to chase the linked list of promises reachable
|
||||
// from this shared_state. invalidate() will null their pointer
|
||||
// to the shared_state indicating the promise was already satisfied.
|
||||
// This is done first because the set() to the READY writes to the
|
||||
// same union as the promise pointer (see shared_state.h).
|
||||
invalidate(st);
|
||||
// same union as the promise pointer (see shared_state.h). Then
|
||||
// chase the linked lists of futures and make_ready() each one.
|
||||
assert(next);
|
||||
invalidate_promises(*next); do
|
||||
{
|
||||
// Now set the shared_state to READY. We know the location of the
|
||||
// shared state by saving it in this frame earlier, otherwise
|
||||
// invalidate_promises() would have nulled it.
|
||||
set(*next, future_state::READY);
|
||||
|
||||
// Now set the shared_state to READY. We know the location of the
|
||||
// shared state by saving it in this frame earlier, otherwise invalidate()
|
||||
// would have nulled it.
|
||||
set(st, future_state::READY);
|
||||
|
||||
// Finally call the notify() routine which will tell the future the promise
|
||||
// was satisfied and the value/exception is ready for them. This call may
|
||||
// notify an ircd::ctx and/or post a function to the ircd::ios for a then()
|
||||
// callback etc.
|
||||
notify(st);
|
||||
// Finally call the notify() routine which will tell the future the promise
|
||||
// was satisfied and the value/exception is ready for them. This call may
|
||||
// notify an ircd::ctx and/or post a function to the ircd::ios for a then()
|
||||
// callback etc.
|
||||
notify(*next);
|
||||
}
|
||||
while((next = next->next));
|
||||
|
||||
// At this point the promise should no longer be considered valid; no longer
|
||||
// referring to the shared_state.
|
||||
assert(!valid());
|
||||
}
|
||||
|
||||
void
|
||||
ircd::ctx::promise_base::check_pending()
|
||||
const
|
||||
/// Internal use; returns the number of copies of the promise reachable from
|
||||
/// the linked list headed by the shared state. This is used to indicate when
|
||||
/// the last copy has destructed which may result in a broken_promise exception
|
||||
/// being sent to the future.
|
||||
size_t
|
||||
ircd::ctx::promise_base::refcount(const shared_state_base &st)
|
||||
{
|
||||
assert(valid());
|
||||
if(unlikely(!is(state(), future_state::PENDING)))
|
||||
throw promise_already_satisfied{};
|
||||
size_t ret{0};
|
||||
if(!is(st, future_state::PENDING))
|
||||
return ret;
|
||||
|
||||
for(const auto *next(head(mutable_cast(st))); next; next = next->next)
|
||||
++ret;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ircd::ctx::promise_base *
|
||||
ircd::ctx::promise_base::head(promise_base &p)
|
||||
{
|
||||
return p.st && head(*p.st)?
|
||||
head(*p.st):
|
||||
std::addressof(p);
|
||||
}
|
||||
|
||||
ircd::ctx::promise_base *
|
||||
ircd::ctx::promise_base::head(shared_state_base &st)
|
||||
{
|
||||
return is(st, future_state::PENDING)?
|
||||
st.p:
|
||||
nullptr;
|
||||
}
|
||||
|
||||
const ircd::ctx::promise_base *
|
||||
ircd::ctx::promise_base::head(const promise_base &p)
|
||||
{
|
||||
return p.st && head(*p.st)?
|
||||
head(*p.st):
|
||||
nullptr;
|
||||
}
|
||||
|
||||
const ircd::ctx::promise_base *
|
||||
ircd::ctx::promise_base::head(const shared_state_base &st)
|
||||
{
|
||||
return is(st, future_state::PENDING)?
|
||||
st.p:
|
||||
nullptr;
|
||||
}
|
||||
|
||||
//
|
||||
// internal
|
||||
//
|
||||
|
||||
/// Internal semantics; removes the promise from the linked list of promises.
|
||||
void
|
||||
ircd::ctx::remove(promise_base &p)
|
||||
{
|
||||
promise_base *last
|
||||
{
|
||||
promise_base::head(p)
|
||||
};
|
||||
|
||||
if(last == &p && p.next)
|
||||
set_futures_promise(*p.next);
|
||||
|
||||
if(last)
|
||||
for(auto *next{last->next}; next; last = next, next = last->next)
|
||||
if(next == &p)
|
||||
{
|
||||
last->next = p.next;
|
||||
break;
|
||||
}
|
||||
|
||||
p.st = nullptr;
|
||||
p.next = nullptr;
|
||||
}
|
||||
|
||||
/// Internal semantics; updates the location of a promise within the linked
|
||||
/// list of related promises (for move semantic).
|
||||
void
|
||||
ircd::ctx::update(promise_base &new_,
|
||||
promise_base &old)
|
||||
{
|
||||
new_.next = old.next;
|
||||
promise_base *last
|
||||
{
|
||||
promise_base::head(old)
|
||||
};
|
||||
|
||||
if(last == &old)
|
||||
set_futures_promise(new_);
|
||||
|
||||
if(last)
|
||||
for(auto *next{last->next}; next; last = next, next = last->next)
|
||||
if(next == &old)
|
||||
{
|
||||
last->next = &new_;
|
||||
break;
|
||||
}
|
||||
|
||||
old.st = nullptr;
|
||||
old.next = nullptr;
|
||||
}
|
||||
|
||||
/// Internal semantics; chases the linked list of promises and adds a reference
|
||||
/// to a new copy at the end (for copy semantic).
|
||||
void
|
||||
ircd::ctx::promise_base::append(promise_base &new_,
|
||||
promise_base &old)
|
||||
ircd::ctx::append(promise_base &new_,
|
||||
promise_base &old)
|
||||
{
|
||||
assert(new_.st);
|
||||
if(!old.next)
|
||||
{
|
||||
old.next = &new_;
|
||||
|
@ -2052,59 +2176,37 @@ ircd::ctx::promise_base::append(promise_base &new_,
|
|||
|
||||
promise_base *next{old.next};
|
||||
for(; next->next; next = next->next);
|
||||
|
||||
assert(!next->next);
|
||||
next->next = &new_;
|
||||
}
|
||||
|
||||
/// Internal semantics; updates the location of a promise within the linked
|
||||
/// list of related promises (for move semantic).
|
||||
void
|
||||
ircd::ctx::promise_base::update(promise_base &new_,
|
||||
promise_base &old)
|
||||
ircd::ctx::set_futures_promise(promise_base &p)
|
||||
{
|
||||
assert(old.st);
|
||||
auto &st{*old.st};
|
||||
if(!is(st, future_state::PENDING))
|
||||
return;
|
||||
|
||||
if(st.p == &old)
|
||||
auto *next
|
||||
{
|
||||
st.p = &new_;
|
||||
return;
|
||||
}
|
||||
shared_state_base::head(p)
|
||||
};
|
||||
|
||||
promise_base *last{st.p};
|
||||
for(promise_base *next{last->next}; next; last = next, next = last->next)
|
||||
if(next == &old)
|
||||
{
|
||||
last->next = &new_;
|
||||
break;
|
||||
}
|
||||
for(; next; next = next->next)
|
||||
{
|
||||
assert(is(*next, future_state::PENDING));
|
||||
next->p = std::addressof(p);
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal semantics; removes the promise from the linked list of promises.
|
||||
/// Because the linked list of promises is a forward singly-linked list this
|
||||
/// operation requires a reference to the list's head in shared_state_base
|
||||
/// (for dtor semantic).
|
||||
void
|
||||
ircd::ctx::promise_base::remove(shared_state_base &st,
|
||||
promise_base &p)
|
||||
ircd::ctx::invalidate_futures(promise_base &p)
|
||||
{
|
||||
if(!is(st, future_state::PENDING))
|
||||
return;
|
||||
|
||||
if(st.p == &p)
|
||||
auto *next
|
||||
{
|
||||
st.p = p.next;
|
||||
return;
|
||||
}
|
||||
shared_state_base::head(p)
|
||||
};
|
||||
|
||||
promise_base *last{st.p};
|
||||
for(promise_base *next{last->next}; next; last = next, next = last->next)
|
||||
if(next == &p)
|
||||
{
|
||||
last->next = p.next;
|
||||
break;
|
||||
}
|
||||
for(; next; next = next->next)
|
||||
if(is(*next, future_state::PENDING))
|
||||
next->p = nullptr;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2112,70 +2214,6 @@ ircd::ctx::promise_base::remove(shared_state_base &st,
|
|||
// ctx/shared_shared.h
|
||||
//
|
||||
|
||||
/// Internal use
|
||||
void
|
||||
ircd::ctx::notify(shared_state_base &st)
|
||||
{
|
||||
if(!st.then)
|
||||
{
|
||||
st.cond.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
if(!current)
|
||||
{
|
||||
st.cond.notify_all();
|
||||
assert(bool(st.then));
|
||||
st.then(st);
|
||||
return;
|
||||
}
|
||||
|
||||
const stack_usage_assertion sua;
|
||||
st.cond.notify_all();
|
||||
assert(bool(st.then));
|
||||
st.then(st);
|
||||
}
|
||||
|
||||
/// Internal use; chases the linked list of promises starting from the head
|
||||
/// in the shared_state and invalidates all of their references to the shared
|
||||
/// state. This will cause the promise to no longer be valid().
|
||||
///
|
||||
void
|
||||
ircd::ctx::invalidate(shared_state_base &st)
|
||||
{
|
||||
if(is(st, future_state::PENDING))
|
||||
for(promise_base *p{st.p}; p; p = p->next)
|
||||
p->st = nullptr;
|
||||
}
|
||||
|
||||
/// Internal use; chases the linked list of promises starting from the head in
|
||||
/// the shared_state and updates the location of the shared_state within each
|
||||
/// promise. This is used to tell the promises when the shared_state itself
|
||||
/// has relocated.
|
||||
///
|
||||
void
|
||||
ircd::ctx::update(shared_state_base &st)
|
||||
{
|
||||
if(is(st, future_state::PENDING))
|
||||
for(promise_base *p{st.p}; p; p = p->next)
|
||||
p->st = &st;
|
||||
}
|
||||
|
||||
/// Internal use; returns the number of copies of the promise reachable from
|
||||
/// the linked list headed by the shared state. This is used to indicate when
|
||||
/// the last copy has destructed which may result in a broken_promise exception
|
||||
/// being sent to the future.
|
||||
size_t
|
||||
ircd::ctx::refcount(const shared_state_base &st)
|
||||
{
|
||||
size_t ret{0};
|
||||
if(is(st, future_state::PENDING))
|
||||
for(const promise_base *p{st.p}; p; p = p->next)
|
||||
++ret;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Internal use; sets the state indicator within the shared_state object. Take
|
||||
/// special note that this data is unionized. Setting a state here will clobber
|
||||
/// the shared_state's reference to its promise.
|
||||
|
@ -2254,16 +2292,281 @@ ircd::ctx::state(const shared_state_base &st)
|
|||
// shared_state_base::shared_state_base
|
||||
//
|
||||
|
||||
ircd::ctx::shared_state_base::shared_state_base(promise_base &p)
|
||||
:p{&p}
|
||||
ircd::ctx::shared_state_base::shared_state_base(already_t)
|
||||
{
|
||||
set(*this, future_state::READY);
|
||||
}
|
||||
|
||||
ircd::ctx::shared_state_base::shared_state_base(promise_base &p)
|
||||
{
|
||||
// assign the promise pointer in our new shared_state contained
|
||||
// in the future. If the promise already has a shared_state, that
|
||||
// means this is a shared future.
|
||||
this->p = promise_base::head(p);
|
||||
assert(!this->next);
|
||||
|
||||
// Add this future (shared_state) to the end of the list of futures. Else
|
||||
// this is not a shared future, this is the head of the futures list told
|
||||
// to all shared promises.
|
||||
if(!p.st)
|
||||
{
|
||||
p.st = this;
|
||||
set_promises_state(*this);
|
||||
}
|
||||
else append(*this, *p.st);
|
||||
|
||||
assert(p.valid());
|
||||
assert(is(*this, future_state::PENDING));
|
||||
}
|
||||
|
||||
ircd::ctx::shared_state_base::shared_state_base(shared_state_base &&o)
|
||||
noexcept
|
||||
:cond{std::move(o.cond)}
|
||||
,eptr{std::move(o.eptr)}
|
||||
,then{std::move(o.then)}
|
||||
,next{std::move(o.next)}
|
||||
,p{std::move(o.p)}
|
||||
{
|
||||
update(*this, o);
|
||||
}
|
||||
|
||||
ircd::ctx::shared_state_base::shared_state_base(const shared_state_base &o)
|
||||
:p{o.p}
|
||||
{
|
||||
append(*this, mutable_cast(o));
|
||||
}
|
||||
|
||||
ircd::ctx::shared_state_base &
|
||||
ircd::ctx::shared_state_base::operator=(promise_base &p)
|
||||
{
|
||||
this->~shared_state_base();
|
||||
new (this) shared_state_base{p};
|
||||
return *this;
|
||||
}
|
||||
|
||||
ircd::ctx::shared_state_base &
|
||||
ircd::ctx::shared_state_base::operator=(shared_state_base &&o)
|
||||
noexcept
|
||||
{
|
||||
this->~shared_state_base();
|
||||
eptr = std::move(o.eptr);
|
||||
then = std::move(o.then);
|
||||
next = std::move(o.next);
|
||||
p = std::move(o.p);
|
||||
update(*this, o);
|
||||
return *this;
|
||||
}
|
||||
|
||||
ircd::ctx::shared_state_base &
|
||||
ircd::ctx::shared_state_base::operator=(const shared_state_base &o)
|
||||
{
|
||||
this->~shared_state_base();
|
||||
eptr = o.eptr;
|
||||
then = o.then;
|
||||
p = o.p;
|
||||
append(*this, mutable_cast(o));
|
||||
return *this;
|
||||
}
|
||||
|
||||
// Linkage
|
||||
ircd::ctx::shared_state_base::~shared_state_base()
|
||||
noexcept
|
||||
{
|
||||
then = {};
|
||||
const auto refcount
|
||||
{
|
||||
this->refcount(*this)
|
||||
};
|
||||
|
||||
assert(refcount >= 1);
|
||||
if(refcount <= 1)
|
||||
invalidate_promises(*this);
|
||||
|
||||
remove(*this);
|
||||
}
|
||||
|
||||
//
|
||||
// util
|
||||
//
|
||||
|
||||
/// Count the number of associated futures
|
||||
size_t
|
||||
ircd::ctx::shared_state_base::refcount(const shared_state_base &st)
|
||||
{
|
||||
size_t ret{0};
|
||||
for(const auto *next(head(st)); next; next = next->next)
|
||||
++ret;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Get the head of the futures from any associated promise
|
||||
ircd::ctx::shared_state_base *
|
||||
ircd::ctx::shared_state_base::head(promise_base &p)
|
||||
{
|
||||
return p.st;
|
||||
}
|
||||
|
||||
/// Get the head of the futures from any associated future
|
||||
ircd::ctx::shared_state_base *
|
||||
ircd::ctx::shared_state_base::head(shared_state_base &st)
|
||||
{
|
||||
return is(st, future_state::PENDING) && head(*st.p)?
|
||||
head(*st.p):
|
||||
std::addressof(st);
|
||||
}
|
||||
|
||||
/// Get the head of the futures from any associated promise
|
||||
const ircd::ctx::shared_state_base *
|
||||
ircd::ctx::shared_state_base::head(const promise_base &p)
|
||||
{
|
||||
return p.st;
|
||||
}
|
||||
|
||||
/// Get the head of the futures from any associated future
|
||||
const ircd::ctx::shared_state_base *
|
||||
ircd::ctx::shared_state_base::head(const shared_state_base &st)
|
||||
{
|
||||
return is(st, future_state::PENDING) && head(*st.p)?
|
||||
head(*st.p):
|
||||
std::addressof(st);
|
||||
}
|
||||
|
||||
//
|
||||
// internal
|
||||
//
|
||||
|
||||
void
|
||||
ircd::ctx::notify(shared_state_base &st)
|
||||
{
|
||||
if(!st.then)
|
||||
{
|
||||
st.cond.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
if(!current)
|
||||
{
|
||||
st.cond.notify_all();
|
||||
assert(bool(st.then));
|
||||
st.then(st);
|
||||
st.then = {};
|
||||
return;
|
||||
}
|
||||
|
||||
const stack_usage_assertion sua;
|
||||
st.cond.notify_all();
|
||||
assert(bool(st.then));
|
||||
st.then(st);
|
||||
st.then = {};
|
||||
}
|
||||
|
||||
/// Remove the future from the list of futures.
|
||||
void
|
||||
ircd::ctx::remove(shared_state_base &st)
|
||||
{
|
||||
shared_state_base *last
|
||||
{
|
||||
shared_state_base::head(st)
|
||||
};
|
||||
|
||||
if(last == &st && is(st, future_state::PENDING))
|
||||
{
|
||||
if(last->next)
|
||||
set_promises_state(*last->next);
|
||||
else
|
||||
invalidate_promises(st);
|
||||
}
|
||||
|
||||
assert(last);
|
||||
for(auto *next(last->next); next; last = next, next = last->next)
|
||||
if(next == &st)
|
||||
{
|
||||
last->next = st.next;
|
||||
break;
|
||||
}
|
||||
|
||||
st.next = nullptr;
|
||||
st.p = nullptr;
|
||||
}
|
||||
|
||||
/// Replace associated future old with new_. This is used to implement the
|
||||
/// object move semantics.
|
||||
void
|
||||
ircd::ctx::update(shared_state_base &new_,
|
||||
shared_state_base &old)
|
||||
{
|
||||
shared_state_base *last
|
||||
{
|
||||
shared_state_base::head(old)
|
||||
};
|
||||
|
||||
assert(last);
|
||||
if(last == &old && is(*last, future_state::PENDING))
|
||||
set_promises_state(new_);
|
||||
|
||||
new_.next = old.next;
|
||||
for(auto *next{last->next}; next; last = next, next = last->next)
|
||||
if(next == &old)
|
||||
{
|
||||
last->next = &new_;
|
||||
break;
|
||||
}
|
||||
|
||||
old.p = nullptr;
|
||||
old.next = nullptr;
|
||||
}
|
||||
|
||||
/// Add a new future sharing the list
|
||||
void
|
||||
ircd::ctx::append(shared_state_base &new_,
|
||||
shared_state_base &old)
|
||||
{
|
||||
assert(!new_.next);
|
||||
assert(is(new_, future_state::PENDING));
|
||||
if(!old.next)
|
||||
{
|
||||
old.next = &new_;
|
||||
return;
|
||||
}
|
||||
|
||||
shared_state_base *next{old.next};
|
||||
for(; next->next; next = next->next);
|
||||
|
||||
assert(!next->next);
|
||||
next->next = &new_;
|
||||
}
|
||||
|
||||
/// Internal use; chases the linked list of promises starting from the head in
|
||||
/// the shared_state and updates the location of the shared_state within each
|
||||
/// promise. This is used to tell the promises when the shared_state itself
|
||||
/// has relocated.
|
||||
void
|
||||
ircd::ctx::set_promises_state(shared_state_base &st)
|
||||
{
|
||||
assert(is(st, future_state::PENDING));
|
||||
promise_base *next
|
||||
{
|
||||
promise_base::head(st)
|
||||
};
|
||||
|
||||
assert(next);
|
||||
for(; next; next = next->next)
|
||||
next->st = std::addressof(st);
|
||||
}
|
||||
|
||||
/// Chases the linked list of promises starting from the head
|
||||
/// in the shared_state and invalidates all of their references to the shared
|
||||
/// state. This will cause the promise to no longer be valid().
|
||||
///
|
||||
void
|
||||
ircd::ctx::invalidate_promises(shared_state_base &st)
|
||||
{
|
||||
promise_base *next
|
||||
{
|
||||
promise_base::head(st)
|
||||
};
|
||||
|
||||
for(; next; next = next->next)
|
||||
next->st = nullptr;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in a new issue