mirror of
https://github.com/matrix-construct/construct
synced 2025-01-13 16:33:53 +01:00
ircd: Add descriptor participation for all asynchronous operations.
This commit is contained in:
parent
d5397c599f
commit
d2546120ee
7 changed files with 125 additions and 22 deletions
26
ircd/ctx.cc
26
ircd/ctx.cc
|
@ -621,6 +621,11 @@ ircd::ctx::this_ctx::wait()
|
|||
void
|
||||
ircd::ctx::this_ctx::yield()
|
||||
{
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::ctx courtesy yield"
|
||||
};
|
||||
|
||||
bool done(false);
|
||||
const auto restore([&done, &me(cur())]
|
||||
{
|
||||
|
@ -629,7 +634,7 @@ ircd::ctx::this_ctx::yield()
|
|||
});
|
||||
|
||||
// All spurious notifications are ignored until `done`
|
||||
ircd::post(restore); do
|
||||
ircd::post(descriptor, restore); do
|
||||
{
|
||||
wait();
|
||||
}
|
||||
|
@ -1142,7 +1147,12 @@ ircd::ctx::context::context(const string_view &name,
|
|||
|
||||
if(flags & POST)
|
||||
{
|
||||
ios::post(std::move(spawn));
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::ctx::spawn post"
|
||||
};
|
||||
|
||||
ios::post(descriptor, std::move(spawn));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1154,9 +1164,15 @@ ircd::ctx::context::context(const string_view &name,
|
|||
});
|
||||
|
||||
if(flags & DISPATCH)
|
||||
ios::dispatch(std::move(spawn));
|
||||
else
|
||||
spawn();
|
||||
{
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::ctx::spawn dispatch"
|
||||
};
|
||||
|
||||
ios::dispatch(descriptor, std::move(spawn));
|
||||
}
|
||||
else spawn();
|
||||
}
|
||||
|
||||
ircd::ctx::context::context(const string_view &name,
|
||||
|
|
|
@ -616,7 +616,15 @@ ircd::fs::aio::system::submit(request &request)
|
|||
// items the chaser was already posted after the first item and will
|
||||
// flush the whole queue down to 0.
|
||||
if(qcount == 1)
|
||||
ircd::post(std::bind(&system::chase, this));
|
||||
{
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::fs::aio chase"
|
||||
};
|
||||
|
||||
auto handler(std::bind(&system::chase, this));
|
||||
ircd::post(descriptor, std::move(handler));
|
||||
}
|
||||
}
|
||||
|
||||
/// The chaser is posted to the IRCd event loop after the first request is
|
||||
|
@ -773,7 +781,12 @@ try
|
|||
std::bind(&system::handle, this, ph::_1, ph::_2)
|
||||
};
|
||||
|
||||
resfd.async_read_some(bufs, std::move(handler));
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::fs::aio sigfd"
|
||||
};
|
||||
|
||||
resfd.async_read_some(bufs, ios::handle(descriptor, std::move(handler)));
|
||||
}
|
||||
catch(...)
|
||||
{
|
||||
|
|
14
ircd/ios.cc
14
ircd/ios.cc
|
@ -62,13 +62,23 @@ ircd::ios::init(asio::io_context &user)
|
|||
void
|
||||
ircd::ios::post(std::function<void ()> function)
|
||||
{
|
||||
boost::asio::post(get(), std::move(function));
|
||||
static descriptor descriptor
|
||||
{
|
||||
"ircd::ios post"
|
||||
};
|
||||
|
||||
post(descriptor, std::move(function));
|
||||
}
|
||||
|
||||
void
|
||||
ircd::ios::dispatch(std::function<void ()> function)
|
||||
{
|
||||
boost::asio::dispatch(get(), std::move(function));
|
||||
static descriptor descriptor
|
||||
{
|
||||
"ircd::ios dispatch"
|
||||
};
|
||||
|
||||
dispatch(descriptor, std::move(function));
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -373,8 +373,13 @@ try
|
|||
latch.count_down();
|
||||
}};
|
||||
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::run::set"
|
||||
};
|
||||
|
||||
if(changed::list.size())
|
||||
ircd::post(call_users);
|
||||
ircd::post(descriptor, call_users);
|
||||
else
|
||||
call_users();
|
||||
|
||||
|
|
|
@ -391,6 +391,11 @@ ircd::log::vlog_threadsafe(const log &log,
|
|||
const string_view &fmt,
|
||||
const va_rtti &ap)
|
||||
{
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::log::vlog_threadsafe"
|
||||
};
|
||||
|
||||
// Generate the formatted message on this thread first
|
||||
std::string str
|
||||
{
|
||||
|
@ -399,7 +404,7 @@ ircd::log::vlog_threadsafe(const log &log,
|
|||
|
||||
// The pointer to the logger is copied to the main thread.
|
||||
auto *const logp{&log};
|
||||
ircd::post([lev, str(std::move(str)), logp]
|
||||
ircd::post(descriptor, [lev, str(std::move(str)), logp]
|
||||
{
|
||||
// If that named logger was destroyed while this closure was
|
||||
// travelling to the main thread then we just discard this message.
|
||||
|
|
69
ircd/net.cc
69
ircd/net.cc
|
@ -1556,6 +1556,11 @@ bool
|
|||
ircd::net::acceptor::set_handle()
|
||||
try
|
||||
{
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket timer"
|
||||
};
|
||||
|
||||
assert(!handle_set);
|
||||
handle_set = true;
|
||||
auto sock
|
||||
|
@ -1565,7 +1570,12 @@ try
|
|||
|
||||
++accepting;
|
||||
ip::tcp::socket &sd(*sock);
|
||||
a.async_accept(sd, std::bind(&acceptor::accept, this, ph::_1, sock, weak_from(*this)));
|
||||
auto handler
|
||||
{
|
||||
std::bind(&acceptor::accept, this, ph::_1, sock, weak_from(*this))
|
||||
};
|
||||
|
||||
a.async_accept(sd, ios::handle(desc, std::move(handler)));
|
||||
return true;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
|
@ -1622,6 +1632,11 @@ noexcept try
|
|||
socket::handshake_type::server
|
||||
};
|
||||
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::acceptor async_handshake"
|
||||
};
|
||||
|
||||
auto handshake
|
||||
{
|
||||
std::bind(&acceptor::handshake, this, ph::_1, sock, a)
|
||||
|
@ -1629,7 +1644,7 @@ noexcept try
|
|||
|
||||
++handshaking;
|
||||
sock->set_timeout(milliseconds(timeout));
|
||||
sock->ssl.async_handshake(handshake_type, std::move(handshake));
|
||||
sock->ssl.async_handshake(handshake_type, ios::handle(desc, std::move(handshake)));
|
||||
}
|
||||
catch(const ctx::interrupted &e)
|
||||
{
|
||||
|
@ -2460,13 +2475,18 @@ ircd::net::socket::connect(const endpoint &ep,
|
|||
opts.connect_timeout.count()
|
||||
};
|
||||
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket connect"
|
||||
};
|
||||
|
||||
auto connect_handler
|
||||
{
|
||||
std::bind(&socket::handle_connect, this, weak_from(*this), opts, std::move(callback), ph::_1)
|
||||
};
|
||||
|
||||
set_timeout(opts.connect_timeout);
|
||||
sd.async_connect(ep, std::move(connect_handler));
|
||||
sd.async_connect(ep, ios::handle(desc, std::move(connect_handler)));
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -2484,6 +2504,11 @@ ircd::net::socket::handshake(const open_opts &opts,
|
|||
opts.handshake_timeout.count()
|
||||
};
|
||||
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket handshake"
|
||||
};
|
||||
|
||||
auto handshake_handler
|
||||
{
|
||||
std::bind(&socket::handle_handshake, this, weak_from(*this), std::move(callback), ph::_1)
|
||||
|
@ -2500,7 +2525,7 @@ ircd::net::socket::handshake(const open_opts &opts,
|
|||
openssl::server_name(*this, server_name(opts));
|
||||
|
||||
ssl.set_verify_callback(std::move(verify_handler));
|
||||
ssl.async_handshake(handshake_type::client, std::move(handshake_handler));
|
||||
ssl.async_handshake(handshake_type::client, ios::handle(desc, std::move(handshake_handler)));
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -2550,13 +2575,18 @@ try
|
|||
|
||||
case dc::SSL_NOTIFY:
|
||||
{
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket shutdown"
|
||||
};
|
||||
|
||||
auto disconnect_handler
|
||||
{
|
||||
std::bind(&socket::handle_disconnect, this, shared_from(*this), std::move(callback), ph::_1)
|
||||
};
|
||||
|
||||
set_timeout(opts.timeout);
|
||||
ssl.async_shutdown(std::move(disconnect_handler));
|
||||
ssl.async_shutdown(ios::handle(desc, std::move(disconnect_handler)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -2702,23 +2732,33 @@ try
|
|||
{
|
||||
case ready::ERROR:
|
||||
{
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket::wait ready::ERROR"
|
||||
};
|
||||
|
||||
auto handle
|
||||
{
|
||||
std::bind(&socket::handle_ready, this, weak_from(*this), opts.type, std::move(callback), ph::_1, 0UL)
|
||||
};
|
||||
|
||||
sd.async_wait(wait_type::wait_error, std::move(handle));
|
||||
sd.async_wait(wait_type::wait_error, ios::handle(desc, std::move(handle)));
|
||||
break;
|
||||
}
|
||||
|
||||
case ready::WRITE:
|
||||
{
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket::wait ready::WRITE"
|
||||
};
|
||||
|
||||
auto handle
|
||||
{
|
||||
std::bind(&socket::handle_ready, this, weak_from(*this), opts.type, std::move(callback), ph::_1, 0UL)
|
||||
};
|
||||
|
||||
sd.async_wait(wait_type::wait_write, std::move(handle));
|
||||
sd.async_wait(wait_type::wait_write, ios::handle(desc, std::move(handle)));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2727,6 +2767,10 @@ try
|
|||
static char buf[1] alignas(16);
|
||||
static const ilist<mutable_buffer> bufs{buf};
|
||||
__builtin_prefetch(buf, 1, 0); // 1 = write, 0 = no cache
|
||||
static ios::descriptor desc
|
||||
{
|
||||
"ircd::net::socket::wait ready::READ"
|
||||
};
|
||||
|
||||
auto handle
|
||||
{
|
||||
|
@ -2741,7 +2785,7 @@ try
|
|||
// real socket wait.
|
||||
if(SSL_peek(ssl.native_handle(), buf, sizeof(buf)) >= ssize_t(sizeof(buf)))
|
||||
{
|
||||
ircd::post([handle(std::move(handle))]
|
||||
ircd::post(desc, [handle(std::move(handle))]
|
||||
{
|
||||
handle(error_code{}, 1UL);
|
||||
});
|
||||
|
@ -2753,7 +2797,7 @@ try
|
|||
// socket error and when data is actually available. We then have to check
|
||||
// using a non-blocking peek in the handler. By doing it this way here we
|
||||
// just get the error in the handler's ec.
|
||||
sd.async_receive(bufs, sd.message_peek, std::move(handle));
|
||||
sd.async_receive(bufs, sd.message_peek, ios::handle(desc, std::move(handle)));
|
||||
//sd.async_wait(wait_type::wait_read, std::move(handle));
|
||||
break;
|
||||
}
|
||||
|
@ -3399,6 +3443,11 @@ ircd::net::socket::set_timeout(const milliseconds &t,
|
|||
if(t < milliseconds(0))
|
||||
return;
|
||||
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"ircd::net::socket timer"
|
||||
};
|
||||
|
||||
auto handler
|
||||
{
|
||||
std::bind(&socket::handle_timeout, this, weak_from(*this), std::move(callback), ph::_1)
|
||||
|
@ -3413,7 +3462,7 @@ ircd::net::socket::set_timeout(const milliseconds &t,
|
|||
++timer_sem[1];
|
||||
timer_set = true;
|
||||
timer.expires_from_now(t);
|
||||
timer.async_wait(std::move(handler));
|
||||
timer.async_wait(ios::handle(descriptor, std::move(handler)));
|
||||
}
|
||||
|
||||
boost::asio::ip::tcp::endpoint
|
||||
|
|
|
@ -32,7 +32,12 @@ static void
|
|||
_cmd__die(const m::event &event,
|
||||
const string_view &line)
|
||||
{
|
||||
ircd::post([]
|
||||
static ios::descriptor descriptor
|
||||
{
|
||||
"s_control die"
|
||||
};
|
||||
|
||||
ircd::post(descriptor, []
|
||||
{
|
||||
ircd::quit();
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue