0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-25 16:22:35 +01:00

ircd::ctx: Updates; add ctx::view; add preliminary ctx::fault; various reorg.

This commit is contained in:
Jason Volk 2017-09-19 19:01:37 -07:00
parent c65610b4ab
commit 1acd278632
11 changed files with 455 additions and 177 deletions

View file

@ -163,7 +163,7 @@ try
// so the output of the command (if log messages) can be seen.
{
const log::console_quiet quiet(false);
boost::asio::async_read_until(in, buf, '\n', yield(continuation()));
boost::asio::async_read_until(in, buf, '\n', yield_context{continuation{}});
}
std::getline(is, line);

View file

@ -44,18 +44,62 @@
///
namespace ircd::ctx
{
using std::chrono::steady_clock;
using time_point = steady_clock::time_point;
IRCD_EXCEPTION(ircd::error, error)
IRCD_EXCEPTION(error, interrupted)
IRCD_EXCEPTION(error, timeout)
using std::chrono::steady_clock;
using time_point = steady_clock::time_point;
struct ctx;
const uint64_t &id(const ctx &); // Unique ID for context
string_view name(const ctx &); // User's optional label for context
const int64_t &notes(const ctx &); // Peeks at internal semaphore count
bool interruption(const ctx &); // Context was marked for interruption
bool finished(const ctx &); // Context function returned (or exception).
bool started(const ctx &); // Context was ever entered.
IRCD_OVERLOAD(threadsafe)
void interrupt(ctx &); // Interrupt the context for termination.
void signal(ctx &, std::function<void ()>); // Post function to context strand
void notify(ctx &, threadsafe_t); // Notify context with threadsafety.
bool notify(ctx &); // Queue a context switch to arg
void yield(ctx &); // Direct context switch to arg
}
#include "ctx/ctx.h"
/// Interface to the currently running context
namespace ircd::ctx { inline namespace this_ctx
{
// Always set to the currently running context or null for main stack
extern __thread struct ctx *current;
ctx &cur(); // Assumptional reference to *current
void wait(); // Returns when context is woken up.
void yield(); // Allow other contexts to run before returning.
void interruption_point(); // throws interrupted if interruption_requested()
bool interruption_requested(); // interruption(cur())
// Return remaining time if notified; or <= 0 if not, and timeout thrown on throw overloads
microseconds wait(const microseconds &, const std::nothrow_t &);
template<class E, class duration> nothrow_overload<E, duration> wait(const duration &);
template<class E = timeout, class duration> throw_overload<E, duration> wait(const duration &);
// Returns false if notified; true if time point reached, timeout thrown on throw_overloads
bool wait_until(const time_point &tp, const std::nothrow_t &);
template<class E> nothrow_overload<E, bool> wait_until(const time_point &tp);
template<class E = timeout> throw_overload<E> wait_until(const time_point &tp);
// Ignores notes. Throws if interrupted.
void sleep_until(const time_point &tp);
template<class duration> void sleep(const duration &);
void sleep(const int &secs);
}}
#include "ctx/context.h"
#include "ctx/prof.h"
#include "ctx/dock.h"
#include "ctx/view.h"
#include "ctx/queue.h"
#include "ctx/mutex.h"
#include "ctx/shared_state.h"
@ -64,10 +108,67 @@ namespace ircd::ctx
#include "ctx/async.h"
#include "ctx/pool.h"
#include "ctx/ole.h"
#include "ctx/fault.h"
namespace ircd
{
//using yield = boost::asio::yield_context;
using ctx::timeout;
using ctx::context;
using ctx::sleep;
}
inline void
ircd::ctx::this_ctx::sleep(const int &secs)
{
sleep(seconds(secs));
}
template<class duration>
void
ircd::ctx::this_ctx::sleep(const duration &d)
{
sleep_until(steady_clock::now() + d);
}
template<class E>
ircd::throw_overload<E>
ircd::ctx::this_ctx::wait_until(const time_point &tp)
{
if(wait_until<std::nothrow_t>(tp))
throw E();
}
template<class E>
ircd::nothrow_overload<E, bool>
ircd::ctx::this_ctx::wait_until(const time_point &tp)
{
return wait_until(tp, std::nothrow);
}
template<class E,
class duration>
ircd::throw_overload<E, duration>
ircd::ctx::this_ctx::wait(const duration &d)
{
const auto ret(wait<std::nothrow_t>(d));
return ret <= duration(0)? throw E() : ret;
}
template<class E,
class duration>
ircd::nothrow_overload<E, duration>
ircd::ctx::this_ctx::wait(const duration &d)
{
using std::chrono::duration_cast;
const auto ret(wait(duration_cast<microseconds>(d), std::nothrow));
return duration_cast<duration>(ret);
}
inline ircd::ctx::ctx &
ircd::ctx::cur()
{
assert(current);
return *current;
}

View file

@ -26,13 +26,15 @@
namespace ircd::ctx
{
using yield_context = boost::asio::yield_context;
struct continuation;
}
namespace ircd
{
using ctx::yield_context;
using ctx::continuation;
using yield = boost::asio::yield_context;
}
/// This object is placed on the top of the stack when the context is yielding (INTERNAL USE).

View file

@ -1,132 +0,0 @@
/*
* charybdis: oh just a little chat server
* ctx.h: userland context switching (stackful coroutines)
*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#define HAVE_IRCD_CTX_CTX_H
///////////////////////////////////////////////////////////////////////////////
//
// low-level ctx interface exposure
//
namespace ircd::ctx
{
struct ctx;
IRCD_OVERLOAD(threadsafe)
const uint64_t &id(const ctx &); // Unique ID for context
string_view name(const ctx &); // User's optional label for context
const int64_t &notes(const ctx &); // Peeks at internal semaphore count
bool finished(const ctx &); // Context function returned (or exception).
bool started(const ctx &); // Context was ever entered.
void interrupt(ctx &); // Interrupt the context for termination.
void strand(ctx &, std::function<void ()>); // Post function to context strand
void notify(ctx &, threadsafe_t); // Notify context with threadsafety.
bool notify(ctx &); // Queue a context switch
void yield(ctx &); // Direct context switch
}
///////////////////////////////////////////////////////////////////////////////
//
// "this_context" interface relevant to the currently running context
//
namespace ircd::ctx
{
extern __thread struct ctx *current; // Always set to the currently running context or null
ctx &cur(); // Convenience for *current (try to use this instead)
void yield(); // Allow other contexts to run before returning.
void wait(); // Returns when context is woken up.
// Return remaining time if notified; or <= 0 if not, and timeout thrown on throw overloads
microseconds wait(const microseconds &, const std::nothrow_t &);
template<class E, class duration> nothrow_overload<E, duration> wait(const duration &);
template<class E = timeout, class duration> throw_overload<E, duration> wait(const duration &);
// Returns false if notified; true if time point reached, timeout thrown on throw_overloads
bool wait_until(const time_point &tp, const std::nothrow_t &);
template<class E> nothrow_overload<E, bool> wait_until(const time_point &tp);
template<class E = timeout> throw_overload<E> wait_until(const time_point &tp);
// Ignores notes. Throws if interrupted.
void sleep_until(const time_point &tp);
template<class duration> void sleep(const duration &);
void sleep(const int &secs);
}
inline void
ircd::ctx::sleep(const int &secs)
{
sleep(seconds(secs));
}
template<class duration>
void
ircd::ctx::sleep(const duration &d)
{
sleep_until(steady_clock::now() + d);
}
template<class E>
ircd::throw_overload<E>
ircd::ctx::wait_until(const time_point &tp)
{
if(wait_until<std::nothrow_t>(tp))
throw E();
}
template<class E>
ircd::nothrow_overload<E, bool>
ircd::ctx::wait_until(const time_point &tp)
{
return wait_until(tp, std::nothrow);
}
template<class E,
class duration>
ircd::throw_overload<E, duration>
ircd::ctx::wait(const duration &d)
{
const auto ret(wait<std::nothrow_t>(d));
return ret <= duration(0)? throw E() : ret;
}
template<class E,
class duration>
ircd::nothrow_overload<E, duration>
ircd::ctx::wait(const duration &d)
{
using std::chrono::duration_cast;
const auto ret(wait(duration_cast<microseconds>(d), std::nothrow));
return duration_cast<duration>(ret);
}
inline ircd::ctx::ctx &
ircd::ctx::cur()
{
assert(current);
return *current;
}

View file

@ -41,6 +41,7 @@ class ircd::ctx::dock
void notify(ctx &) noexcept;
public:
bool empty() const;
size_t size() const;
template<class time_point, class predicate> bool wait_until(time_point&& tp, predicate&& pred);
@ -117,7 +118,11 @@ noexcept
inline void
ircd::ctx::dock::wait()
{
const scope remove(std::bind(&dock::remove_self, this));
const scope remove
{
std::bind(&dock::remove_self, this)
};
q.emplace_back(&cur());
ircd::ctx::wait();
}
@ -129,7 +134,11 @@ ircd::ctx::dock::wait(predicate&& pred)
if(pred())
return;
const scope remove(std::bind(&dock::remove_self, this));
const scope remove
{
std::bind(&dock::remove_self, this)
};
q.emplace_back(&cur()); do
{
ircd::ctx::wait();
@ -143,9 +152,12 @@ ircd::ctx::dock::wait_for(const duration &dur)
{
static const duration zero(0);
const scope remove(std::bind(&dock::remove_self, this));
q.emplace_back(&cur());
const scope remove
{
std::bind(&dock::remove_self, this)
};
q.emplace_back(&cur());
return ircd::ctx::wait<std::nothrow_t>(dur) > zero? cv_status::no_timeout:
cv_status::timeout;
}
@ -161,7 +173,11 @@ ircd::ctx::dock::wait_for(const duration &dur,
if(pred())
return true;
const scope remove(std::bind(&dock::remove_self, this));
const scope remove
{
std::bind(&dock::remove_self, this)
};
q.emplace_back(&cur()); do
{
const bool expired(ircd::ctx::wait<std::nothrow_t>(dur) <= zero);
@ -219,14 +235,6 @@ ircd::ctx::dock::wait_until(time_point&& tp,
while(1);
}
/// The number of contexts waiting in the queue.
inline size_t
ircd::ctx::dock::size()
const
{
return q.size();
}
inline void
ircd::ctx::dock::notify(ctx &ctx)
noexcept
@ -248,3 +256,19 @@ ircd::ctx::dock::remove_self()
assert(it != end(q));
q.erase(it);
}
/// The number of contexts waiting in the queue.
inline size_t
ircd::ctx::dock::size()
const
{
return q.size();
}
/// The number of contexts waiting in the queue.
inline bool
ircd::ctx::dock::empty()
const
{
return q.empty();
}

79
include/ircd/ctx/fault.h Normal file
View file

@ -0,0 +1,79 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#define HAVE_IRCD_CTX_FAULT_H
namespace ircd::ctx
{
IRCD_EXCEPTION(error, aborted)
IRCD_EXCEPTION(aborted, unhandled_fault)
template<class... args> struct fault;
}
// Faults add the notion of recoverable exceptions. C++ exceptions are not, as
// they destruct the stack and then clobber everything with the catch branch.
// A fault is an error handling device alternative to throwing an exception;
// Hitting a fault may stop the context until the fault is serviced to continue
// or a real exception is thrown to abort the context.
//
// A compelling example is std::bad_alloc, or an out of memory condition. A
// fault allows other contexts to free up their resources after which the
// faulty context can continue without having to unwind the work it's already
// made progress on to try again.
//
// Faults begin with the cost of a function call to a handler at the point of
// the fault. The handler's template specifies the argument list so the fault
// can safely observe or modify your data. The call to fault has no return
// value. If it returns the fault has been successfully serviced.
//
// Fault handlers must return true to default the faulty context. Handlers are
// also responsible for detecting if they are executing with an active
// exception which makes returning false considered a DOUBLE FAULT. This may
// lead to program termination because it's basically throwing an exception from
// a destructor (and can be useful proper behavior).
//
template<class... args>
struct ircd::ctx::fault
{
using handler = std::function<bool (args&&...)>;
handler h;
virtual bool handle(args&&... a)
{
if(unlikely(!h))
throw unhandled_fault{};
return h(std::forward<args>(a)...);
}
void operator()(args&&... a)
{
if(!handle(std::forward<args>(a)...))
throw aborted{};
}
fault(handler h)
:h{std::move(h)}
{}
};

View file

@ -47,11 +47,14 @@ struct ircd::ctx::pool
auto size() const { return ctxs.size(); }
auto avail() const { return available; }
auto queued() const { return queue.size(); }
auto active() const { return size() - avail(); }
auto pending() const { return active() + queued(); }
// control panel
void add(const size_t & = 1);
void del(const size_t & = 1);
void interrupt();
void join();
// dispatch function to pool
void operator()(closure);
@ -79,8 +82,16 @@ ircd::ctx::pool::async(F&& f,
{
using R = typename std::result_of<F (A...)>::type;
auto func(std::bind(std::forward<F>(f), std::forward<A>(a)...));
auto p(std::make_shared<promise<R>>());
auto func
{
std::bind(std::forward<F>(f), std::forward<A>(a)...)
};
auto p
{
std::make_shared<promise<R>>()
};
(*this)([p, func(std::move(func))]
() -> void
{
@ -98,8 +109,16 @@ ircd::ctx::pool::async(F&& f,
{
using R = typename std::result_of<F (A...)>::type;
auto func(std::bind(std::forward<F>(f), std::forward<A>(a)...));
auto p(std::make_shared<promise<R>>());
auto func
{
std::bind(std::forward<F>(f), std::forward<A>(a)...)
};
auto p
{
std::make_shared<promise<R>>()
};
(*this)([p, func(std::move(func))]
() -> void
{

161
include/ircd/ctx/view.h Normal file
View file

@ -0,0 +1,161 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#define HAVE_IRCD_CTX_VIEW_H
namespace ircd::ctx
{
template<class T> class view;
}
/// Device for a context to share data on its stack with others while yielding
///
/// The view yields a context while other contexts examine the object pointed
/// to in the view. This allows a producing context to construct something
/// on its stack and then wait for the consuming contexts to do something with
/// that data before the producer resumes and potentially destroys the data.
/// This creates a very simple and lightweight single-producer/multi-consumer
/// queue mechanism using only context switching.
///
/// Consumers get one chance to safely view the data when a call to wait()
/// returns. Once the consumer context yields again for any reason the data is
/// potentially invalid. The data can only be viewed once by the consumer
/// because the second call to wait() will yield until the next data is
/// made available by the producer, not the same data.
///
/// Producers will share an object during the call to notify(). Once the call
/// to notify() returns all consumers have viewed the data and the producer is
/// free to destroy it.
///
template<class T>
class ircd::ctx::view
{
T *t {nullptr};
dock a, b;
bool ready() const;
public:
size_t waiting() const;
// Consumer interface;
template<class time_point> T &wait_until(time_point&&);
template<class duration> T &wait_for(const duration &);
T &wait();
// Producer interface;
void notify(T &);
view() = default;
~view() noexcept;
};
template<class T>
ircd::ctx::view<T>::~view()
noexcept
{
assert(!waiting());
}
template<class T>
void
ircd::ctx::view<T>::notify(T &t)
{
const scope afterward{[this]
{
assert(a.empty());
this->t = nullptr;
if(!b.empty())
{
b.notify_all();
yield();
}
}};
assert(b.empty());
this->t = &t;
a.notify_all();
yield();
}
template<class T>
T &
ircd::ctx::view<T>::wait()
{
b.wait([this]
{
return !ready();
});
a.wait([this]
{
return ready();
});
assert(t != nullptr);
return *t;
}
template<class T>
template<class duration>
T &
ircd::ctx::view<T>::wait_for(const duration &dur)
{
return wait_until(now<steady_point>() + dur);
}
template<class T>
template<class time_point>
T &
ircd::ctx::view<T>::wait_until(time_point&& tp)
{
if(!b.wait_until(tp, [this]
{
return !ready();
}))
throw timeout();
if(!a.wait_until(tp, [this]
{
return ready();
}))
throw timeout();
assert(t != nullptr);
return *t;
}
template<class T>
size_t
ircd::ctx::view<T>::waiting()
const
{
return a.size() + b.size();
}
template<class T>
bool
ircd::ctx::view<T>::ready()
const
{
return t != nullptr;
}

View file

@ -185,7 +185,7 @@ ircd::socket::write(const iov &bufs)
{
return io(*this, out, [&]
{
return async_write(ssl, bufs, asio::transfer_all(), yield(continuation()));
return async_write(ssl, bufs, asio::transfer_all(), yield_context{continuation{}});
});
}
@ -196,7 +196,7 @@ ircd::socket::write(const iov &bufs,
{
return io(*this, out, [&]
{
return async_write(ssl, bufs, asio::transfer_all(), yield(continuation())[ec]);
return async_write(ssl, bufs, asio::transfer_all(), yield_context{continuation{}}[ec]);
});
}
@ -206,7 +206,7 @@ ircd::socket::write_some(const iov &bufs)
{
return io(*this, out, [&]
{
return ssl.async_write_some(bufs, yield(continuation()));
return ssl.async_write_some(bufs, yield_context{continuation{}});
});
}
@ -217,7 +217,7 @@ ircd::socket::write_some(const iov &bufs,
{
return io(*this, out, [&]
{
return ssl.async_write_some(bufs, yield(continuation())[ec]);
return ssl.async_write_some(bufs, yield_context{continuation{}}[ec]);
});
}
@ -227,7 +227,7 @@ ircd::socket::read(const iov &bufs)
{
return io(*this, in, [&]
{
const auto ret(async_read(ssl, bufs, yield(continuation())));
const size_t ret(async_read(ssl, bufs, yield_context{continuation{}}));
if(unlikely(!ret))
throw boost::system::system_error(boost::asio::error::eof);
@ -243,7 +243,7 @@ ircd::socket::read(const iov &bufs,
{
return io(*this, in, [&]
{
return async_read(ssl, bufs, yield(continuation())[ec]);
return async_read(ssl, bufs, yield_context{continuation{}}[ec]);
});
}
@ -253,7 +253,7 @@ ircd::socket::read_some(const iov &bufs)
{
return io(*this, in, [&]
{
const auto ret(ssl.async_read_some(bufs, yield(continuation())));
const size_t ret(ssl.async_read_some(bufs, yield_context{continuation{}}));
if(unlikely(!ret))
throw boost::system::system_error(boost::asio::error::eof);
@ -269,6 +269,6 @@ ircd::socket::read_some(const iov &bufs,
{
return io(*this, in, [&]
{
return ssl.async_read_some(bufs, yield(continuation())[ec]);
return ssl.async_read_some(bufs, yield_context{continuation{}}[ec]);
});
}

View file

@ -218,14 +218,14 @@ ircd::ctx::ctx::interruption_point(std::nothrow_t)
__thread ircd::ctx::ctx *ircd::ctx::current;
void
ircd::ctx::sleep_until(const std::chrono::steady_clock::time_point &tp)
ircd::ctx::this_ctx::sleep_until(const std::chrono::steady_clock::time_point &tp)
{
while(!wait_until(tp, std::nothrow));
}
bool
ircd::ctx::wait_until(const std::chrono::steady_clock::time_point &tp,
const std::nothrow_t &)
ircd::ctx::this_ctx::wait_until(const std::chrono::steady_clock::time_point &tp,
const std::nothrow_t &)
{
auto &c(cur());
c.alarm.expires_at(tp);
@ -235,8 +235,8 @@ ircd::ctx::wait_until(const std::chrono::steady_clock::time_point &tp,
}
std::chrono::microseconds
ircd::ctx::wait(const std::chrono::microseconds &duration,
const std::nothrow_t &)
ircd::ctx::this_ctx::wait(const std::chrono::microseconds &duration,
const std::nothrow_t &)
{
auto &c(cur());
c.alarm.expires_from_now(duration);
@ -250,7 +250,7 @@ ircd::ctx::wait(const std::chrono::microseconds &duration,
}
void
ircd::ctx::wait()
ircd::ctx::this_ctx::wait()
{
auto &c(cur());
c.alarm.expires_at(std::chrono::steady_clock::time_point::max());
@ -258,7 +258,7 @@ ircd::ctx::wait()
}
void
ircd::ctx::yield()
ircd::ctx::this_ctx::yield()
{
bool done(false);
const auto restore([&done, &me(cur())]
@ -275,6 +275,18 @@ ircd::ctx::yield()
while(!done);
}
void
ircd::ctx::this_ctx::interruption_point()
{
return cur().interruption_point();
}
bool
ircd::ctx::this_ctx::interruption_requested()
{
return interruption(cur());
}
void
ircd::ctx::yield(ctx &ctx)
{
@ -298,7 +310,7 @@ void
ircd::ctx::notify(ctx &ctx,
threadsafe_t)
{
strand(ctx, [&ctx]
signal(ctx, [&ctx]
{
notify(ctx);
});
@ -311,7 +323,7 @@ ircd::ctx::notify(ctx &ctx)
}
void
ircd::ctx::strand(ctx &ctx,
ircd::ctx::signal(ctx &ctx,
std::function<void ()> func)
{
ctx.strand.post(std::move(func));
@ -336,6 +348,12 @@ ircd::ctx::finished(const ctx &ctx)
return ctx.finished();
}
bool
ircd::ctx::interruption(const ctx &c)
{
return c.flags & context::INTERRUPTED;
}
const int64_t &
ircd::ctx::notes(const ctx &ctx)
{
@ -549,6 +567,12 @@ ircd::ctx::pool::add(const size_t &num)
ctxs.emplace_back(name, stack_size, context::POST, std::bind(&pool::main, this));
}
void
ircd::ctx::pool::join()
{
del(size());
}
void
ircd::ctx::pool::interrupt()
{
@ -804,7 +828,7 @@ ircd::ctx::ole::offload(const std::function<void ()> &func)
}
// To wake the context on the IRCd thread we give it the kick
strand(*context, kick);
signal(*context, kick);
});
push(std::move(closure)); do

View file

@ -89,7 +89,7 @@ ircd::socket::socket(const std::string &host,
{
assert(resolver);
const ip::tcp::resolver::query query(host, string(lex_cast(port)));
auto epit(resolver->async_resolve(query, yield(continuation())));
auto epit(resolver->async_resolve(query, yield_context{continuation{}}));
static const ip::tcp::resolver::iterator end;
if(epit == end)
throw nxdomain("host '%s' not found", host.data());
@ -143,8 +143,8 @@ ircd::socket::connect(const ip::tcp::endpoint &ep,
const milliseconds &timeout)
{
const scope_timeout ts(*this, timeout);
sd.async_connect(ep, yield(continuation()));
ssl.async_handshake(socket::handshake_type::client, yield(continuation()));
sd.async_connect(ep, yield_context{continuation{}});
ssl.async_handshake(socket::handshake_type::client, yield_context{continuation{}});
}
void
@ -179,7 +179,7 @@ ircd::socket::disconnect(const dc &type)
case dc::SSL_NOTIFY_YIELD:
{
ssl.async_shutdown(yield(continuation()));
ssl.async_shutdown(yield_context{continuation{}});
sd.close();
break;
}