0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

ircd::ctx: Improve context interface.

This commit is contained in:
Jason Volk 2016-09-09 22:23:07 -07:00
parent 9d351686cd
commit 89c2e74f3b
6 changed files with 168 additions and 116 deletions

View file

@ -311,12 +311,8 @@ console_spawn()
return;
// The console function is executed asynchronously.
// This call may or may not return immediately.
ircd::context context(std::bind(&console));
// The console may be joined unless it is released here;
// now cleanup must occur by other means.
context.detach();
// The SELF_DESTRUCT indicates it will clean itself up.
ircd::context(std::bind(&console), ircd::ctx::SELF_DESTRUCT);
}
const char *const console_message
@ -337,7 +333,6 @@ try
{
console_active = false;
console_in = nullptr;
free(console_ctx);
});
console_active = true;

View file

@ -29,35 +29,60 @@
namespace ircd {
namespace ctx {
using std::chrono::microseconds;
using std::chrono::steady_clock;
using time_point = steady_clock::time_point;
IRCD_EXCEPTION(ircd::error, error)
IRCD_EXCEPTION(error, interrupted)
IRCD_EXCEPTION(error, timeout)
enum flags
{
DEFER_POST = 0x01, // Defers spawn with an ios.post()
DEFER_DISPATCH = 0x02, // (Defers) spawn with an ios.dispatch()
DEFER_STRAND = 0x04, // Defers spawn by posting to strand
SPAWN_STRAND = 0x08, // Spawn onto a strand, otherwise ios itself
DEFER_POST = 0x0001, // Defers spawn with an ios.post()
DEFER_DISPATCH = 0x0002, // (Defers) spawn with an ios.dispatch()
DEFER_STRAND = 0x0004, // Defers spawn by posting to strand
SPAWN_STRAND = 0x0008, // Spawn onto a strand, otherwise ios itself
SELF_DESTRUCT = 0x0010, // Context deletes itself; see struct context constructor notes
INTERRUPTED = 0x0020, // Marked
};
const auto DEFAULT_STACK_SIZE = 64_KiB;
// Context implementation
struct ctx; // Internal implementation to hide boost headers
const int64_t &notes(const ctx &); // Peeks at internal semaphore count (you don't need this)
const flags &flags(const ctx &); // Get the internal flags value.
bool finished(const ctx &); // Context function returned (or exception).
bool started(const ctx &); // Context was ever entered.
void interrupt(ctx &); // Interrupt the context for termination.
bool notify(ctx &); // Increment the semaphore (only library ppl need this)
// this_context
extern __thread struct ctx *current; // Always set to the currently running context or null
ctx &cur(); // Convenience for *current (try to use this instead)
void free(const ctx *); // Manual delete (for the incomplete type)
const int64_t &notes(const ctx &); // Peeks at internal semaphore count (you don't need this)
bool started(const ctx &); // Context was ever entered. (can't know if finished)
bool notify(ctx &); // Increment the semaphore (only library ppl need this)
void wait(); // Returns when context notified.
// Return remaining time if notified; or <= 0 if not, and timeout thrown on throw overloads
microseconds wait(const microseconds &, const std::nothrow_t &);
template<class E, class duration> nothrow_overload<E, duration> wait(const duration &);
template<class E = timeout, class duration> throw_overload<E, duration> wait(const duration &);
// Returns false if notified; true if time point reached, timeout thrown on throw_overloads
bool wait_until(const time_point &tp, const std::nothrow_t &);
template<class E> nothrow_overload<E, bool> wait_until(const time_point &tp);
template<class E = timeout> throw_overload<E> wait_until(const time_point &tp);
class context
{
std::unique_ptr<ctx> c;
public:
bool operator!() const { return !c; }
operator bool() const { return bool(c); }
operator const ctx &() const { return *c; }
operator ctx &() { return *c; }
@ -66,8 +91,9 @@ class context
void join(); // Blocks the current context until this one finishes
ctx *detach(); // other calls undefined after this call
context(const size_t &stack_size, std::function<void ()>, const flags &flags = (enum flags)0);
context(std::function<void ()>, const flags &flags = (enum flags)0);
// Note: Constructing with SELF_DESTRUCT flag makes any further use of this object undefined.
context(const size_t &stack_size, std::function<void ()>, const enum flags &flags = (enum flags)0);
context(std::function<void ()>, const enum flags &flags = (enum flags)0);
context(context &&) noexcept = default;
context(const context &) = delete;
~context() noexcept;
@ -75,51 +101,50 @@ class context
friend void swap(context &a, context &b) noexcept;
};
// "namespace this_context;" interface
std::chrono::microseconds wait(const std::chrono::microseconds &);
std::chrono::microseconds wait();
template<class duration>
duration wait(const duration &d)
{
using std::chrono::duration_cast;
const auto us(duration_cast<std::chrono::microseconds>(d));
return duration_cast<duration>(wait(us));
}
template<class E>
nothrow_overload<E, bool>
wait_until(const std::chrono::steady_clock::time_point &tp)
{
static const auto zero(tp - tp);
return wait(tp - std::chrono::steady_clock::now()) <= zero;
}
template<class E = timeout>
throw_overload<E>
wait_until(const std::chrono::steady_clock::time_point &tp)
{
if(wait_until<std::nothrow_t>(tp))
throw E();
}
template<class E = timeout>
throw_overload<E>
wait_until()
{
wait_until<E>(std::chrono::steady_clock::time_point::max());
}
inline
void swap(context &a, context &b)
inline void
swap(context &a, context &b)
noexcept
{
std::swap(a.c, b.c);
}
inline
ctx &cur()
template<class E>
throw_overload<E>
wait_until(const time_point &tp)
{
if(wait_until<std::nothrow_t>(tp))
throw E();
}
template<class E>
nothrow_overload<E, bool>
wait_until(const time_point &tp)
{
return wait_until(tp, std::nothrow);
}
template<class E,
class duration>
throw_overload<E, duration>
wait(const duration &d)
{
const auto ret(wait<std::nothrow_t>(d));
return ret <= duration(0)? throw E() : ret;
}
template<class E,
class duration>
nothrow_overload<E, duration>
wait(const duration &d)
{
using std::chrono::duration_cast;
const auto ret(wait(duration_cast<microseconds>(d), std::nothrow));
return duration_cast<duration>(ret);
}
inline ctx &
cur()
{
assert(current);
return *current;

View file

@ -35,23 +35,30 @@ struct ctx
boost::asio::yield_context *yc;
uintptr_t stack_base;
size_t stack_max;
int64_t notes; // norm: 0 = asleep; 1 = awake; inc by others; dec by self
int64_t notes; // norm: 0 = asleep; 1 = awake; inc by others; dec by self
ctx *adjoindre;
enum flags flags;
bool started() const;
bool finished() const { return yc == nullptr; }
bool started() const { return bool(yc); }
bool wait();
void wake();
bool note();
void operator()(boost::asio::yield_context, const std::function<void ()>) noexcept;
ctx(const size_t &stack_max = DEFAULT_STACK_SIZE,
boost::asio::io_service *const &ios = ircd::ios);
ctx(const size_t &stack_max = DEFAULT_STACK_SIZE,
const enum flags &flags = (enum flags)0,
boost::asio::io_service *const &ios = ircd::ios);
ctx(ctx &&) noexcept = delete;
ctx(const ctx &) = delete;
};
size_t stack_usage_here(const ctx &) __attribute__((noinline));
bool stack_warning(const ctx &, const double &pct = 0.80);
void stack_assertion(const ctx &, const double &pct = 0.80);
struct continuation
{
ctx *self;
@ -63,14 +70,13 @@ struct continuation
~continuation() noexcept;
};
size_t stack_usage_here(const ctx &) __attribute__((noinline));
inline
continuation::continuation(ctx *const &self)
:self{self}
{
assert(self != nullptr);
assert(self->notes <= 1);
stack_assertion(*self);
ircd::ctx::current = nullptr;
}
@ -95,6 +101,20 @@ const
return *self->yc;
}
inline void
stack_assertion(const ctx &ctx,
const double &pct)
{
assert(!stack_warning(ctx, pct));
}
inline bool
stack_warning(const ctx &ctx,
const double &pct)
{
return stack_usage_here(ctx) > double(ctx.stack_max) * pct;
}
inline bool
ctx::note()
{
@ -109,10 +129,7 @@ inline void
ctx::wake()
try
{
strand.dispatch([this]
{
alarm.cancel();
});
alarm.cancel();
}
catch(const boost::system::system_error &e)
{
@ -131,17 +148,15 @@ ctx::wait()
assert(ec == boost::system::errc::operation_canceled ||
ec == boost::system::errc::success);
// Interruption shouldn't be used for normal operation,
// so please eat this branch misprediction.
if(unlikely(flags & INTERRUPTED))
throw interrupted("ctx(%p)::wait()", (const void *)this);
// notes = 1; set by continuation dtor on wakeup
return true;
}
inline bool
ctx::started()
const
{
return bool(yc);
}
} // namespace ctx
using ctx::continuation;

View file

@ -66,10 +66,10 @@ inline void
dock::notify_one()
noexcept
{
const auto c(q.front());
q.pop_front();
if(q.empty())
return;
notify(*c);
notify(*q.front());
}
inline void
@ -77,8 +77,6 @@ dock::notify_all()
noexcept
{
const auto copy(q);
q.clear();
for(const auto &c : copy)
notify(*c);
}
@ -181,7 +179,9 @@ dock::wait_until(time_point&& tp,
inline void
dock::remove_self()
{
std::remove(begin(q), end(q), &cur());
const auto it(std::find(begin(q), end(q), &cur()));
assert(it != end(q));
q.erase(it);
}
} // namespace ctx

View file

@ -23,31 +23,35 @@
using namespace ircd;
/******************************************************************************
* ctx.h
*/
///////////////////////////////////////////////////////////////////////////////
//
// ctx.h
//
__thread ctx::ctx *ctx::current;
std::chrono::microseconds
ctx::wait()
bool
ctx::wait_until(const std::chrono::steady_clock::time_point &tp,
const std::nothrow_t &)
{
const auto rem(wait(std::chrono::microseconds::max()));
return std::chrono::microseconds::max() - rem;
auto &c(cur());
c.alarm.expires_at(tp);
c.wait(); // now you're yielding with portals
return std::chrono::steady_clock::now() >= tp;
}
std::chrono::microseconds
ctx::wait(const std::chrono::microseconds &duration)
ctx::wait(const std::chrono::microseconds &duration,
const std::nothrow_t &)
{
assert(current);
auto &c(cur());
c.alarm.expires_from_now(duration);
c.wait(); // now you're yielding with portals
const auto ret(c.alarm.expires_from_now());
// return remaining duration.
// this is > 0 if notified or interrupted
// this is unchanged if a note prevented any wait at all
const auto ret(c.alarm.expires_from_now());
return std::chrono::duration_cast<std::chrono::microseconds>(ret);
}
@ -61,8 +65,8 @@ ctx::wait()
ircd::ctx::context::context(const size_t &stack_sz,
std::function<void ()> func,
const flags &flags)
:c{std::make_unique<ctx>(stack_sz, ircd::ios)}
const enum flags &flags)
:c{std::make_unique<ctx>(stack_sz, flags, ircd::ios)}
{
auto spawn([stack_sz, c(c.get()), func(std::move(func))]
{
@ -88,10 +92,13 @@ ircd::ctx::context::context(const size_t &stack_sz,
ios->dispatch(std::move(spawn));
else
spawn();
if(flags & SELF_DESTRUCT)
c.release();
}
ircd::ctx::context::context(std::function<void ()> func,
const flags &flags)
const enum flags &flags)
:context
{
DEFAULT_STACK_SIZE,
@ -111,6 +118,7 @@ noexcept
if(!current)
return;
interrupt();
join();
}
@ -120,10 +128,10 @@ ircd::ctx::context::join()
if(joined())
return;
// Set the target context to notify this context when it finishes
assert(!c->adjoindre);
c->adjoindre = &cur();
wait();
c.reset(nullptr);
}
ircd::ctx::ctx *
@ -138,28 +146,44 @@ ircd::ctx::notify(ctx &ctx)
return ctx.note();
}
void
ircd::ctx::interrupt(ctx &ctx)
{
ctx.flags |= INTERRUPTED;
ctx.wake();
}
bool
ircd::ctx::started(const ctx &ctx)
{
return ctx.started();
}
bool
ircd::ctx::finished(const ctx &ctx)
{
return ctx.finished();
}
const enum ctx::flags &
ircd::ctx::flags(const ctx &ctx)
{
return ctx.flags;
}
const int64_t &
ircd::ctx::notes(const ctx &ctx)
{
return ctx.notes;
}
void
ircd::ctx::free(const ctx *const ptr)
{
delete ptr;
}
///////////////////////////////////////////////////////////////////////////////
//
// ctx_ctx.h
//
/******************************************************************************
* ctx_ctx.h
*/
ctx::ctx::ctx(const size_t &stack_max,
const enum flags &flags,
boost::asio::io_service *const &ios)
:alarm{*ios}
,yc{nullptr}
@ -167,6 +191,7 @@ ctx::ctx::ctx(const size_t &stack_max,
,stack_max{stack_max}
,notes{1}
,adjoindre{nullptr}
,flags{flags}
{
}
@ -180,9 +205,7 @@ noexcept
stack_base = uintptr_t(__builtin_frame_address(0));
ircd::ctx::current = this;
// If anything is done to `this' after func() or in atexit, func() cannot
// delete its own context, which is a worthy enough convenience to preserve.
const scope atexit([]
const scope atexit([this]
{
ircd::ctx::current = nullptr;
this->yc = nullptr;

View file

@ -31,7 +31,6 @@ namespace ircd
bool debugmode; // set by command line
boost::asio::io_service *ios; // user's io service
main_exit_cb main_exit_func; // Called when main context exits
ctx::ctx *mc; // IRCd's main context
void seed_random();
void init_system();
@ -63,16 +62,12 @@ ircd::init(boost::asio::io_service &io_service,
log::info("parsing your configuration");
conf::parse(configfile);
// The master of ceremonies runs the show after this function returns and ios.run()
// It cannot spawn when no ios is running so it is deferred just in case.
log::debug("spawning main context");
at_main_exit(std::move(main_exit_func));
context mc(8_MiB, ircd::main, ctx::DEFER_POST);
// The context will not be joined and block this function when no parent context
// is currently running, but it should still be detached here. It can then delete
// itself from within main().
ircd::mc = mc.detach();
// The master of ceremonies runs the show after this function returns and ios.run()
// The SELF_DESTRUCT flag indicates it will clean itself.
log::debug("spawning main context");
context(8_MiB, ircd::main, ctx::DEFER_POST | ctx::SELF_DESTRUCT);
log::debug("IRCd initialization completed.");
}
@ -82,7 +77,6 @@ ircd::main()
noexcept try
{
// Ownership is taken of the main context to delete it at function end
const custom_ptr<ctx::ctx> mc(ircd::mc, ctx::free);
const scope main_exit(&main_exiting);
log::debug("IRCd entered main context.");