mirror of
https://github.com/matrix-construct/construct
synced 2024-12-30 17:34:04 +01:00
ircd::ctx: Use closure for continuation.
This commit is contained in:
parent
37ba013413
commit
29e7fa3515
5 changed files with 84 additions and 71 deletions
|
@ -21,6 +21,7 @@ namespace ircd::ctx
|
|||
struct continuation;
|
||||
|
||||
using yield_context = boost::asio::yield_context;
|
||||
using yield_closure = std::function<void (yield_context &)>;
|
||||
using interruptor = std::function<void (ctx *const &)>;
|
||||
using predicate = std::function<bool ()>;
|
||||
}
|
||||
|
@ -33,16 +34,6 @@ namespace ircd
|
|||
|
||||
/// This object must be placed on the stack when the context is yielding (INTERNAL)
|
||||
///
|
||||
/// The continuation constructor is the last thing executed before a context
|
||||
/// yields. The continuation destructor is the first thing executed when a
|
||||
/// context continues. This is not placed by a normal user wishing to context
|
||||
/// switch, only a low-level library creator actually implementing the context
|
||||
/// switch itself. The placement of this object must be correct. Generally,
|
||||
/// we construct the `continuation` as an argument to `yield_context` as such
|
||||
/// `yield_context{continuation{}}`. This ensures the continuation destructor
|
||||
/// executes before control continues beyond the yield_context call itself and
|
||||
/// ties this sequence together neatly.
|
||||
///
|
||||
/// The instance contains references to some callables which must remain valid.
|
||||
///
|
||||
/// - predicate (NOT YET USED)
|
||||
|
@ -72,11 +63,12 @@ struct ircd::ctx::continuation
|
|||
operator boost::asio::yield_context &() noexcept;
|
||||
|
||||
continuation(const predicate &,
|
||||
const interruptor & = noop_interruptor) noexcept;
|
||||
const interruptor &,
|
||||
const yield_closure &);
|
||||
|
||||
continuation(continuation &&) = delete;
|
||||
continuation(const continuation &) = delete;
|
||||
continuation &operator=(continuation &&) = delete;
|
||||
continuation &operator=(const continuation &) = delete;
|
||||
~continuation() noexcept(false);
|
||||
~continuation() noexcept;
|
||||
};
|
||||
|
|
|
@ -139,12 +139,13 @@ try
|
|||
this->cancel();
|
||||
}};
|
||||
|
||||
const size_t ret
|
||||
size_t ret; continuation
|
||||
{
|
||||
asio::async_read(ssl, std::forward<iov>(bufs), completion, yield_context{continuation
|
||||
continuation::asio_predicate, interruption, [this, &ret, &bufs]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}})
|
||||
ret = asio::async_read(ssl, std::forward<iov>(bufs), completion, yield);
|
||||
}
|
||||
};
|
||||
|
||||
if(!ret)
|
||||
|
@ -174,12 +175,13 @@ try
|
|||
this->cancel();
|
||||
}};
|
||||
|
||||
const size_t ret
|
||||
size_t ret; continuation
|
||||
{
|
||||
ssl.async_read_some(std::forward<iov>(bufs), yield_context{continuation
|
||||
continuation::asio_predicate, interruption, [this, &ret, &bufs]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}})
|
||||
ret = ssl.async_read_some(std::forward<iov>(bufs), yield);
|
||||
}
|
||||
};
|
||||
|
||||
if(!ret)
|
||||
|
@ -261,12 +263,13 @@ try
|
|||
this->cancel();
|
||||
}};
|
||||
|
||||
const size_t ret
|
||||
size_t ret; continuation
|
||||
{
|
||||
asio::async_write(ssl, std::forward<iov>(bufs), completion, yield_context{continuation
|
||||
continuation::asio_predicate, interruption, [this, &ret, &bufs]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}})
|
||||
ret = asio::async_write(ssl, std::forward<iov>(bufs), completion, yield);
|
||||
}
|
||||
};
|
||||
|
||||
out.bytes += ret;
|
||||
|
@ -290,12 +293,13 @@ try
|
|||
this->cancel();
|
||||
}};
|
||||
|
||||
const size_t ret
|
||||
size_t ret; continuation
|
||||
{
|
||||
ssl.async_write_some(std::forward<iov>(bufs), yield_context{continuation
|
||||
continuation::asio_predicate, interruption, [this, &ret, &bufs]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}})
|
||||
ret = ssl.async_write_some(std::forward<iov>(bufs), yield);
|
||||
}
|
||||
};
|
||||
|
||||
out.bytes += ret;
|
||||
|
|
46
ircd/ctx.cc
46
ircd/ctx.cc
|
@ -145,15 +145,15 @@ ircd::ctx::ctx::jump()
|
|||
|
||||
// Jump from the currently running context (source) to *this (target)
|
||||
// with continuation of source after target
|
||||
current->notes = 0; // Unconditionally cleared here
|
||||
continuation
|
||||
{
|
||||
current->notes = 0; // Unconditionally cleared here
|
||||
const continuation continuation
|
||||
continuation::false_predicate, continuation::noop_interruptor, [&target]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::false_predicate
|
||||
};
|
||||
|
||||
target();
|
||||
}
|
||||
target();
|
||||
}
|
||||
};
|
||||
|
||||
assert(current != this);
|
||||
assert(current->notes == 1); // notes = 1; set by continuation dtor on wakeup
|
||||
|
@ -196,15 +196,14 @@ ircd::ctx::ctx::wait()
|
|||
// The construction of the arguments to the call on this stack comprise
|
||||
// our final control before the context switch. The destruction of the
|
||||
// arguments comprise the initial control after the context switch.
|
||||
boost::system::error_code ec;
|
||||
alarm.async_wait(yield_context
|
||||
boost::system::error_code ec; continuation
|
||||
{
|
||||
continuation
|
||||
predicate, interruptor, [this, &ec]
|
||||
(auto &yield)
|
||||
{
|
||||
predicate, interruptor
|
||||
alarm.async_wait(yield[ec]);
|
||||
}
|
||||
}
|
||||
[ec]);
|
||||
};
|
||||
|
||||
assert(ec == errc::operation_canceled || ec == errc::success);
|
||||
assert(current == this);
|
||||
|
@ -937,8 +936,9 @@ ircd::ctx::continuation::noop_interruptor{[]
|
|||
//
|
||||
|
||||
ircd::ctx::continuation::continuation(const predicate &pred,
|
||||
const interruptor &intr)
|
||||
noexcept
|
||||
const interruptor &intr,
|
||||
const yield_closure &closure)
|
||||
try
|
||||
:self
|
||||
{
|
||||
ircd::ctx::current
|
||||
|
@ -987,10 +987,20 @@ noexcept
|
|||
// restore this register; otherwise it remains null for executions on
|
||||
// the program's main stack.
|
||||
ircd::ctx::current = nullptr;
|
||||
|
||||
assert(self->yc);
|
||||
closure(*self->yc);
|
||||
|
||||
// Check here if the context was interrupted while it was sleeping.
|
||||
self->interruption_point();
|
||||
}
|
||||
catch(...)
|
||||
{
|
||||
this->~continuation();
|
||||
}
|
||||
|
||||
ircd::ctx::continuation::~continuation()
|
||||
noexcept(false)
|
||||
noexcept
|
||||
{
|
||||
// Set the fundamental current context register as the first operation
|
||||
// upon resuming execution.
|
||||
|
@ -1005,10 +1015,6 @@ noexcept(false)
|
|||
|
||||
// self->continuation is not null'ed here; it remains an invalid
|
||||
// pointer while the context is awake.
|
||||
|
||||
// Check here if the context was interrupted while it was sleeping. This
|
||||
// will throw out of this destructor if that is the case. Cuidado.
|
||||
self->interruption_point();
|
||||
}
|
||||
|
||||
ircd::ctx::continuation::operator
|
||||
|
|
|
@ -414,12 +414,13 @@ try
|
|||
fd.cancel();
|
||||
}};
|
||||
|
||||
const size_t len
|
||||
size_t len; continuation
|
||||
{
|
||||
boost::asio::async_read_until(fd, sb, '\n', yield_context{continuation
|
||||
continuation::asio_predicate, interruption, [&len, &fd, &sb]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}})
|
||||
len = boost::asio::async_read_until(fd, sb, '\n', yield);
|
||||
}
|
||||
};
|
||||
|
||||
std::istream is{&sb};
|
||||
|
|
48
ircd/net.cc
48
ircd/net.cc
|
@ -1754,12 +1754,13 @@ ircd::net::listener_udp::acceptor::operator()(datagram &datagram)
|
|||
}};
|
||||
|
||||
ip::udp::endpoint ep;
|
||||
const size_t rlen
|
||||
size_t rlen; continuation
|
||||
{
|
||||
a.async_receive_from(datagram.mbufs, ep, flags, yield_context{continuation
|
||||
continuation::asio_predicate, interruption, [this, &rlen, &datagram, &ep, &flags]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}})
|
||||
rlen = a.async_receive_from(datagram.mbufs, ep, flags, yield);
|
||||
}
|
||||
};
|
||||
|
||||
datagram.remote = make_ipport(ep);
|
||||
|
@ -2146,26 +2147,35 @@ try
|
|||
|
||||
switch(opts.type)
|
||||
{
|
||||
case ready::ERROR:
|
||||
sd.async_wait(wait_type::wait_error, yield_context{continuation
|
||||
case ready::READ: continuation
|
||||
{
|
||||
continuation::asio_predicate, interruption, [this]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}});
|
||||
break;
|
||||
sd.async_wait(wait_type::wait_read, yield);
|
||||
}
|
||||
};
|
||||
break;
|
||||
|
||||
case ready::WRITE:
|
||||
sd.async_wait(wait_type::wait_write, yield_context{continuation
|
||||
case ready::WRITE: continuation
|
||||
{
|
||||
continuation::asio_predicate, interruption, [this]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}});
|
||||
break;
|
||||
sd.async_wait(wait_type::wait_write, yield);
|
||||
}
|
||||
};
|
||||
break;
|
||||
|
||||
case ready::READ:
|
||||
sd.async_wait(wait_type::wait_read, yield_context{continuation
|
||||
case ready::ERROR: continuation
|
||||
{
|
||||
continuation::asio_predicate, interruption, [this]
|
||||
(auto &yield)
|
||||
{
|
||||
continuation::asio_predicate, interruption
|
||||
}});
|
||||
break;
|
||||
sd.async_wait(wait_type::wait_error, yield);
|
||||
}
|
||||
};
|
||||
break;
|
||||
|
||||
default:
|
||||
throw ircd::not_implemented{};
|
||||
|
|
Loading…
Reference in a new issue