diff --git a/include/ircd/ctx/continuation.h b/include/ircd/ctx/continuation.h index dbc5b78ad..a22af05a7 100644 --- a/include/ircd/ctx/continuation.h +++ b/include/ircd/ctx/continuation.h @@ -50,8 +50,10 @@ struct ircd::ctx::continuation operator const boost::asio::yield_context &() const; operator boost::asio::yield_context &(); + virtual void interrupted(ctx *const &) noexcept; + continuation(ctx *const &self = ircd::ctx::current); - ~continuation() noexcept; + virtual ~continuation() noexcept; }; /// This type of continuation should be used when yielding a context to a @@ -66,5 +68,17 @@ struct ircd::ctx::continuation struct ircd::ctx::to_asio :ircd::ctx::continuation { - using continuation::continuation; + using function = std::function; + + function handler; + + void interrupted(ctx *const &) noexcept final override; + + to_asio(const function &handler = {}); }; + +inline +ircd::ctx::to_asio::to_asio(const function &handler) +:continuation{ircd::ctx::current} +,handler{handler} +{} diff --git a/ircd/ctx.cc b/ircd/ctx.cc index 72cc542de..a02267908 100644 --- a/ircd/ctx.cc +++ b/ircd/ctx.cc @@ -28,6 +28,7 @@ struct ircd::ctx::ctx uintptr_t stack_base; // assigned when spawned size_t stack_max; // User given stack size int64_t notes; // norm: 0 = asleep; 1 = awake; inc by others; dec by self + continuation *cont; // valid when asleep; invalid when awake ctx *adjoindre; // context waiting for this to join() microseconds awake; // monotonic counter ctx *next; // next node in a ctx::list @@ -76,6 +77,7 @@ ircd::ctx::ctx::ctx(const char *const &name, ,stack_base{0} ,stack_max{stack_max} ,notes{1} +,cont{nullptr} ,adjoindre{nullptr} ,awake{0us} ,next{nullptr} @@ -183,8 +185,14 @@ ircd::ctx::ctx::wait() if(--notes > 0) return false; + const auto interruption{[this] + (ctx *const &interruptor) noexcept + { + wake(); + }}; + boost::system::error_code ec; - alarm.async_wait(boost::asio::yield_context{to_asio{this}}[ec]); + alarm.async_wait(boost::asio::yield_context{to_asio{interruption}}[ec]); assert(ec == errc::operation_canceled || ec == errc::success); assert(current == this); @@ -217,7 +225,10 @@ try } catch(const boost::system::system_error &e) { - ircd::log::error("ctx::wake(%p): %s", this, e.what()); + log::error + { + "ctx::wake(%p): %s", this, e.what() + }; } /// Throws if this context has been flagged for interruption and clears @@ -416,8 +427,15 @@ ircd::ctx::signal(ctx &ctx, void ircd::ctx::interrupt(ctx &ctx) { + if(finished(ctx)) + return; + + if(interruption(ctx)) + return; + ctx.flags |= context::INTERRUPTED; - ctx.wake(); + if(likely(&ctx != current && ctx.cont != nullptr)) + ctx.cont->interrupted(current); } /// Indicates if `ctx` was ever jumped to @@ -500,6 +518,7 @@ ircd::ctx::continuation::continuation(ctx *const &self) assert(!critical_asserted); assert(self != nullptr); assert(self->notes <= 1); + self->cont = this; ircd::ctx::current = nullptr; } @@ -509,6 +528,15 @@ noexcept ircd::ctx::current = self; self->notes = 1; mark(prof::event::CUR_CONTINUE); + + // self->continuation is not null'ed here; it remains an invalid + // pointer while the context is awake. +} + +void +ircd::ctx::continuation::interrupted(ctx *const &interruptor) +noexcept +{ } ircd::ctx::continuation::operator boost::asio::yield_context &() @@ -522,6 +550,18 @@ const return *self->yc; } +// +// to_asio +// + +void +ircd::ctx::to_asio::interrupted(ctx *const &interruptor) +noexcept +{ + if(handler) + handler(interruptor); +} + /////////////////////////////////////////////////////////////////////////////// // // ctx/context.h