0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-25 16:22:35 +01:00

ircd::ctx: Add primary features to context.

This commit is contained in:
Jason Volk 2016-09-07 14:39:41 -07:00
parent 249ba02faa
commit d2bb8cd8a7
6 changed files with 495 additions and 63 deletions

View file

@ -31,32 +31,92 @@ namespace ctx {
IRCD_EXCEPTION(ircd::error, error)
IRCD_EXCEPTION(error, interrupted)
IRCD_EXCEPTION(error, timeout)
enum flags
{
DEFER_POST = 0x01, // Defers spawn with an ios.post()
DEFER_DISPATCH = 0x02, // (Defers) spawn with an ios.dispatch()
DEFER_STRAND = 0x04, // Defers spawn by posting to strand
SPAWN_STRAND = 0x08, // Spawn onto a strand, otherwise ios itself
};
const auto DEFAULT_STACK_SIZE = 64_KiB;
struct ctx;
struct ctx; // Internal implementation to hide boost headers
extern __thread struct ctx *current; // Always set to the currently running context or null
ctx &cur(); // Convenience for *current (try to use this instead)
void free(const ctx *); // Manual delete (for the incomplete type)
const int64_t &notes(const ctx &); // Peeks at internal semaphore count (you don't need this)
bool started(const ctx &); // Context was ever entered. (can't know if finished)
bool notify(ctx &); // Increment the semaphore (only library ppl need this)
class context
{
using ptr = std::unique_ptr<ctx>;
using function = std::function<void ()>;
ptr c;
std::unique_ptr<ctx> c;
public:
void join();
ctx *detach();
context &swap(context &) noexcept;
operator const ctx &() const { return *c; }
operator ctx &() { return *c; }
context(const size_t &stack_size, const function &);
context(const function &);
bool joined() const { return !c || ircd::ctx::finished(*c); }
void interrupt() { ircd::ctx::interrupt(*c); }
void join(); // Blocks the current context until this one finishes
ctx *detach(); // other calls undefined after this call
context(const size_t &stack_size, std::function<void ()>, const flags &flags = (enum flags)0);
context(std::function<void ()>, const flags &flags = (enum flags)0);
context(context &&) noexcept = default;
context(const context &) = delete;
~context() noexcept;
friend void swap(context &a, context &b) noexcept;
};
extern __thread struct ctx *current;
// "namespace this_context;" interface
std::chrono::microseconds wait(const std::chrono::microseconds &);
std::chrono::microseconds wait();
void swap(context &a, context &b) noexcept;
template<class duration>
duration wait(const duration &d)
{
using std::chrono::duration_cast;
const auto us(duration_cast<std::chrono::microseconds>(d));
return duration_cast<duration>(wait(us));
}
template<class E>
nothrow_overload<E, bool>
wait_until(const std::chrono::steady_clock::time_point &tp)
{
static const auto zero(tp - tp);
return wait(tp - std::chrono::steady_clock::now()) <= zero;
}
template<class E = timeout>
throw_overload<E>
wait_until(const std::chrono::steady_clock::time_point &tp)
{
if(wait_until<std::nothrow_t>(tp))
throw E();
}
template<class E = timeout>
throw_overload<E>
wait_until()
{
wait_until<E>(std::chrono::steady_clock::time_point::max());
}
inline
void swap(context &a, context &b)
noexcept
{
std::swap(a.c, b.c);
}
inline
ctx &cur()
@ -67,8 +127,8 @@ ctx &cur()
} // namespace ctx
using ctx::cur;
using ctx::context;
using ctx::timeout;
} // namespace ircd
#endif // __cplusplus

View file

@ -28,28 +28,120 @@ namespace ctx {
struct ctx
{
boost::asio::io_service::strand strand;
boost::asio::steady_timer alarm;
size_t stack_size;
const uint8_t *stack_base;
boost::asio::steady_timer alarm; // 64B
boost::asio::yield_context *yc;
uintptr_t stack_base;
size_t stack_max;
int64_t notes; // norm: 0 = asleep; 1 = awake; inc by others; dec by self
ctx *adjoindre;
operator boost::asio::yield_context();
bool started() const;
bool wait();
void wake();
bool note();
void operator()(boost::asio::yield_context, const std::function<void ()>);
void operator()(boost::asio::yield_context, const std::function<void ()>) noexcept;
ctx(const size_t &stack_size = DEFAULT_STACK_SIZE,
ctx(const size_t &stack_max = DEFAULT_STACK_SIZE,
boost::asio::io_service *const &ios = ircd::ios);
ctx(ctx &&) noexcept = delete;
ctx(const ctx &) = delete;
~ctx() noexcept;
};
inline
ctx::operator boost::asio::yield_context()
struct continuation
{
return *yc;
ctx *self;
operator const boost::asio::yield_context &() const;
operator boost::asio::yield_context &();
continuation(ctx *const &self = ircd::ctx::current);
~continuation() noexcept;
};
size_t stack_usage_here(const ctx &) __attribute__((noinline));
inline
continuation::continuation(ctx *const &self)
:self{self}
{
assert(self != nullptr);
assert(self->notes <= 1);
ircd::ctx::current = nullptr;
}
inline
continuation::~continuation()
noexcept
{
ircd::ctx::current = self;
self->notes = 1;
}
inline
continuation::operator boost::asio::yield_context &()
{
return *self->yc;
}
inline
continuation::operator const boost::asio::yield_context &()
const
{
return *self->yc;
}
inline bool
ctx::note()
{
if(notes++ > 0)
return false;
wake();
return true;
}
inline void
ctx::wake()
try
{
strand.dispatch([this]
{
alarm.cancel();
});
}
catch(const boost::system::system_error &e)
{
ircd::log::error("ctx::wake(%p): %s", this, e.what());
}
inline bool
ctx::wait()
{
if(--notes > 0)
return false;
boost::system::error_code ec;
alarm.async_wait(boost::asio::yield_context(continuation(this))[ec]);
assert(ec == boost::system::errc::operation_canceled ||
ec == boost::system::errc::success);
// notes = 1; set by continuation dtor on wakeup
return true;
}
inline bool
ctx::started()
const
{
return bool(yc);
}
} // namespace ctx
using ctx::continuation;
using yield = boost::asio::yield_context;
} // namespace ircd

188
include/ircd/ctx_dock.h Normal file
View file

@ -0,0 +1,188 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#define HAVE_IRCD_CTX_DOCK_H
namespace ircd {
namespace ctx {
enum class cv_status
{
no_timeout, timeout
};
class dock
{
std::deque<ctx *> q;
void remove_self();
public:
auto size() const { return q.size(); }
template<class time_point, class predicate> bool wait_until(time_point&& tp, predicate&& pred);
template<class time_point> cv_status wait_until(time_point&& tp);
template<class duration, class predicate> bool wait_for(const duration &dur, predicate&& pred);
template<class duration> cv_status wait_for(const duration &dur);
template<class predicate> void wait(predicate&& pred);
void wait();
void notify_all() noexcept;
void notify_one() noexcept;
dock() = default;
~dock() noexcept;
};
inline
dock::~dock()
noexcept
{
assert(q.empty());
}
inline void
dock::notify_one()
noexcept
{
const auto c(q.front());
q.pop_front();
notify(*c);
}
inline void
dock::notify_all()
noexcept
{
const auto copy(q);
q.clear();
for(const auto &c : copy)
notify(*c);
}
inline void
dock::wait()
{
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur());
ircd::ctx::wait();
}
template<class predicate>
void
dock::wait(predicate&& pred)
{
if(pred())
return;
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur()); do
{
ircd::ctx::wait();
}
while(!pred());
}
template<class duration>
cv_status
dock::wait_for(const duration &dur)
{
static const duration zero(0);
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur());
return ircd::ctx::wait<std::nothrow_t>(dur) > zero? cv_status::no_timeout:
cv_status::timeout;
}
template<class duration,
class predicate>
bool
dock::wait_for(const duration &dur,
predicate&& pred)
{
static const duration zero(0);
if(pred())
return true;
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur()); do
{
const bool expired(ircd::ctx::wait<std::nothrow_t>(dur) <= zero);
if(pred())
return true;
if(expired)
return false;
}
while(1);
}
template<class time_point>
cv_status
dock::wait_until(time_point&& tp)
{
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur());
return ircd::ctx::wait_until<std::nothrow_t>(tp)? cv_status::timeout:
cv_status::no_timeout;
}
template<class time_point,
class predicate>
bool
dock::wait_until(time_point&& tp,
predicate&& pred)
{
if(pred())
return true;
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur()); do
{
const bool expired(ircd::ctx::wait_until<std::nothrow_t>(tp));
if(pred())
return true;
if(expired)
return false;
}
while(1);
}
inline void
dock::remove_self()
{
std::remove(begin(q), end(q), &cur());
}
} // namespace ctx
} // namespace ircd

View file

@ -35,6 +35,7 @@ namespace ircd
using ostream = std::ostream;
namespace ph = std::placeholders;
using namespace std::string_literals;
using namespace std::literals::chrono_literals;
}
// Temp fwd decl scaffold
@ -79,6 +80,7 @@ namespace ircd
#include "mode_lease.h"
#include "snomask.h"
#include "ctx.h"
#include "ctx_dock.h"
#include "cache.h"
#include "whowas.h"

View file

@ -74,14 +74,14 @@ extern "C" {
#include <RB_INC_SET
#include <RB_INC_LIST
#include <RB_INC_FORWARD_LIST
//#include <RB_INC_UNORDERED_MAP
//#include <RB_INC_DEQUE
#include <RB_INC_UNORDERED_MAP
#include <RB_INC_DEQUE
#include <RB_INC_SSTREAM
#include <RB_INC_FSTREAM
#include <RB_INC_IOSTREAM
#include <RB_INC_IOMANIP
#include <RB_INC_CSTDIO
//#include <RB_INC_CHRONO
#include <RB_INC_CHRONO
#include <RB_INC_CTIME
//#include <RB_INC_BOOST_LEXICAL_CAST_HPP

View file

@ -28,25 +28,75 @@ using namespace ircd;
*/
__thread ctx::ctx *ctx::current;
ircd::ctx::context::context(const size_t &stack_size,
const function &func)
:c{std::make_unique<ctx>(stack_size)}
std::chrono::microseconds
ctx::wait()
{
static const boost::coroutines::attributes attrs
{
stack_size,
boost::coroutines::stack_unwind
};
const auto bound(std::bind(&ctx::operator(), c.get(), ph::_1, func));
boost::asio::spawn(c->strand, bound, attrs);
const auto rem(wait(std::chrono::microseconds::max()));
return std::chrono::microseconds::max() - rem;
}
ircd::ctx::context::context(const function &func)
std::chrono::microseconds
ctx::wait(const std::chrono::microseconds &duration)
{
assert(current);
auto &c(cur());
c.alarm.expires_from_now(duration);
c.wait(); // now you're yielding with portals
// return remaining duration.
// this is > 0 if notified or interrupted
// this is unchanged if a note prevented any wait at all
const auto ret(c.alarm.expires_from_now());
return std::chrono::duration_cast<std::chrono::microseconds>(ret);
}
void
ctx::wait()
{
auto &c(cur());
c.alarm.expires_at(std::chrono::steady_clock::time_point::max());
c.wait(); // now you're yielding with portals
}
ircd::ctx::context::context(const size_t &stack_sz,
std::function<void ()> func,
const flags &flags)
:c{std::make_unique<ctx>(stack_sz, ircd::ios)}
{
auto spawn([stack_sz, c(c.get()), func(std::move(func))]
{
auto bound(std::bind(&ctx::operator(), c, ph::_1, std::move(func)));
const boost::coroutines::attributes attrs
{
stack_sz,
boost::coroutines::stack_unwind
};
boost::asio::spawn(*ios, std::move(bound), attrs);
});
// The current context must be reasserted if spawn returns here
const scope recurrent([current(ircd::ctx::current)]
{
ircd::ctx::current = current;
});
if(flags & DEFER_POST)
ios->post(std::move(spawn));
else if(flags & DEFER_DISPATCH)
ios->dispatch(std::move(spawn));
else
spawn();
}
ircd::ctx::context::context(std::function<void ()> func,
const flags &flags)
:context
{
DEFAULT_STACK_SIZE,
func
std::move(func),
flags
}
{
}
@ -54,21 +104,26 @@ ircd::ctx::context::context(const function &func)
ircd::ctx::context::~context()
noexcept
{
if(!c)
return;
// Can't join to bare metal, only from within another context.
if(!current)
return;
join();
}
void
ircd::ctx::context::join()
{
if(joined())
return;
}
ircd::ctx::context &
ircd::ctx::context::swap(context &other)
noexcept
{
std::swap(c, other.c);
return *this;
assert(!c->adjoindre);
c->adjoindre = &cur();
wait();
c.reset(nullptr);
}
ircd::ctx::ctx *
@ -77,39 +132,74 @@ ircd::ctx::context::detach()
return c.release();
}
void
ircd::ctx::swap(context &a, context &b)
noexcept
bool
ircd::ctx::notify(ctx &ctx)
{
a.swap(b);
return ctx.note();
}
bool
ircd::ctx::started(const ctx &ctx)
{
return ctx.started();
}
const int64_t &
ircd::ctx::notes(const ctx &ctx)
{
return ctx.notes;
}
void
ircd::ctx::free(const ctx *const ptr)
{
delete ptr;
}
/******************************************************************************
* ctx_ctx.h
*/
ctx::ctx::ctx(const size_t &stack_size,
ctx::ctx::ctx(const size_t &stack_max,
boost::asio::io_service *const &ios)
:strand{*ios}
,alarm{*ios}
,stack_size{stack_size}
,stack_base{nullptr}
:alarm{*ios}
,yc{nullptr}
{
}
ctx::ctx::~ctx()
noexcept
,stack_base{0}
,stack_max{stack_max}
,notes{1}
,adjoindre{nullptr}
{
}
void
ctx::ctx::operator()(boost::asio::yield_context yc,
const std::function<void ()> func)
noexcept
{
this->stack_base = reinterpret_cast<const uint8_t *>(__builtin_frame_address(0));
this->yc = &yc;
notes = 1;
stack_base = uintptr_t(__builtin_frame_address(0));
ircd::ctx::current = this;
// If anything is done to `this' after func() or in atexit, func() cannot
// delete its own context, which is a worthy enough convenience to preserve.
const scope atexit([]
{
ircd::ctx::current = nullptr;
this->yc = nullptr;
if(adjoindre)
notify(*adjoindre);
if(flags & SELF_DESTRUCT)
delete this;
});
if(likely(bool(func)))
func();
}
size_t
ctx::stack_usage_here(const ctx &ctx)
{
return ctx.stack_base - uintptr_t(__builtin_frame_address(0));
}