0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 02:02:38 +01:00

ircd::net: Ensure ordering of various operations if immediately dispatched.

This commit is contained in:
Jason Volk 2018-01-11 18:50:34 -08:00
parent 4d310ac22b
commit 3d8d6f47b4

View file

@ -742,7 +742,7 @@ struct ircd::net::listener::acceptor
void handshake(const error_code &ec, std::shared_ptr<socket>, std::weak_ptr<acceptor>) noexcept;
// Acceptance stack
void check_accept_error(const error_code &ec, socket &);
bool check_accept_error(const error_code &ec, socket &);
void accept(const error_code &ec, std::shared_ptr<socket>, std::weak_ptr<acceptor>) noexcept;
// Accept next
@ -908,9 +908,9 @@ try
std::string(*this),
sock.get());
*/
++accepting;
ip::tcp::socket &sd(*sock);
a.async_accept(sd, std::bind(&acceptor::accept, this, ph::_1, sock, weak_from(*this)));
++accepting;
}
catch(const std::exception &e)
{
@ -935,24 +935,16 @@ noexcept try
return;
--accepting;
const unwind::nominal next{[this]
{
this->next();
}};
const unwind::exceptional drop{[&sock]
{
assert(bool(sock));
close(*sock, dc::RST, close_ignore);
}};
assert(bool(sock));
check_accept_error(ec, *sock);
log.debug("%s: socket(%p) accepted(%zu) %s",
log.debug("%s: socket(%p) accepted(%zu) %s %s",
std::string(*this),
sock.get(),
accepting,
string(sock->remote()));
string(remote_ipport(*sock)),
string(ec));
if(!check_accept_error(ec, *sock))
return;
// Toggles the behavior of non-async functions; see func comment
blocking(*sock, false);
@ -967,12 +959,13 @@ noexcept try
std::bind(&acceptor::handshake, this, ph::_1, sock, a)
};
sock->ssl.async_handshake(handshake_type, std::move(handshake));
++handshaking;
sock->set_timeout(5000ms); //TODO: config
sock->ssl.async_handshake(handshake_type, std::move(handshake));
}
catch(const ctx::interrupted &e)
{
log.debug("%s: acceptor interrupted socket(%p): %s",
log.debug("%s: acceptor interrupted socket(%p) %s",
std::string(*this),
sock.get(),
string(ec));
@ -981,17 +974,18 @@ catch(const ctx::interrupted &e)
}
catch(const std::exception &e)
{
log.error("%s: socket(%p): in accept(): %s",
log.error("%s: socket(%p) in accept(): %s",
std::string(*this),
sock.get(),
e.what());
throw;
}
/// Error handler for the accept socket callback. This handler determines
/// whether or not the handler should return or continue processing the
/// result.
///
void
bool
ircd::net::listener::acceptor::check_accept_error(const error_code &ec,
socket &sock)
{
@ -1002,12 +996,15 @@ ircd::net::listener::acceptor::check_accept_error(const error_code &ec,
throw ctx::interrupted();
if(likely(ec == success))
return;
{
this->next();
return true;
}
if(ec.category() == system_category()) switch(ec.value())
{
case operation_canceled:
return;
return false;
default:
break;
@ -1033,18 +1030,20 @@ noexcept try
}};
assert(bool(sock));
check_handshake_error(ec, *sock);
log.debug("%s socket(%p): SSL handshook(%zu) %s",
std::string(*this),
log.debug("socket(%p) local[%s] remote[%s] handshook(%zu) %s",
sock.get(),
string(local_ipport(*sock)),
string(remote_ipport(*sock)),
handshaking,
string(sock->remote()));
string(ec));
check_handshake_error(ec, *sock);
sock->cancel_timeout();
add_client(sock);
}
catch(const ctx::interrupted &e)
{
log.debug("%s: SSL handshake interrupted socket(%p): %s",
log.debug("%s: SSL handshake interrupted socket(%p) %s",
std::string(*this),
sock.get(),
string(ec));
@ -1079,7 +1078,7 @@ ircd::net::listener::acceptor::check_handshake_error(const error_code &ec,
if(ec.category() == system_category()) switch(ec.value())
{
case operation_canceled:
return;
break;
default:
break;
@ -1292,9 +1291,9 @@ ircd::net::socket::handshake(const open_opts &opts,
std::bind(&socket::handle_verify, this, ph::_1, ph::_2, opts)
};
set_timeout(opts.handshake_timeout);
ssl.set_verify_callback(std::move(verify_handler));
ssl.async_handshake(handshake_type::client, std::move(handshake_handler));
set_timeout(opts.handshake_timeout);
}
void
@ -1308,14 +1307,13 @@ try
return;
}
const bool cancelation{cancel()};
log.debug("socket(%p): disconnect: %s type:%d user: in:%zu out:%zu cancel:%d",
log.debug("socket(%p) local[%s] remote[%s] disconnect type:%d user: in:%zu out:%zu",
(const void *)this,
ircd::string(remote_ipport(*this)),
string(local_ipport(*this)),
string(remote_ipport(*this)),
uint(opts.type),
in.bytes,
out.bytes,
cancelation);
out.bytes);
if(opts.sopts)
set(*this, *opts.sopts);
@ -1345,8 +1343,9 @@ try
std::bind(&socket::handle_disconnect, this, shared_from(*this), std::move(callback), ph::_1)
};
ssl.async_shutdown(std::move(disconnect_handler));
cancel();
set_timeout(opts.timeout);
ssl.async_shutdown(std::move(disconnect_handler));
return;
}
}
@ -1407,34 +1406,23 @@ ircd::net::socket::wait(const wait_opts &opts,
switch(opts.type)
{
case ready::ERROR:
set_timeout(opts.timeout);
sd.async_wait(wait_type::wait_error, std::move(handle));
break;
case ready::WRITE:
set_timeout(opts.timeout);
sd.async_wait(wait_type::wait_write, std::move(handle));
break;
// The problem here is that the wait operation gives ec=success on both a
// socket error and when data is actually available. This should be giving
// the error in ec; asio should fix this. On linux, all epoll()'s
// unconditionally report errors and that error gets hidden by the ec=success;
// as a consequence net::ready::ERROR is also useless and never invoked
// except when it's canceled. Something here smells off.
case ready::READ:
{
static char buf[1] alignas(16);
static const asio::mutable_buffers_1 bufs{buf, sizeof(buf)};
sd.async_receive(bufs, sd.message_peek, std::move(handle));
//sd.async_wait(wait_type::wait_read, std::move(handle));
set_timeout(opts.timeout);
sd.async_wait(wait_type::wait_read, std::move(handle));
break;
}
default:
throw ircd::not_implemented{};
}
// Commit to timeout here in case exception was thrown earlier.
set_timeout(opts.timeout);
}
/// Asynchronous callback when the socket is ready
@ -1479,8 +1467,20 @@ noexcept try
// After life_guard is constructed it is safe to use *this in this frame.
const life_guard<socket> s{wp};
log.debug("socket(%p)[%s]: ready to %s: %s (available: %zu)",
// The problem here is that the wait operation gives ec=success on both a
// socket error and when data is actually available. This could be giving
// the error in ec.
if(ec == success && type == ready::READ)
{
static char buf[16] alignas(16);
static const std::array<mutable_buffer, 1> bufs{{buf}};
sd.receive(bufs, sd.message_peek, ec);
}
log.debug("socket(%p) local[%s] remote[%s] ready %s %s available:%zu",
this,
string(local_ipport(*this)),
string(remote_ipport(*this)),
reflect(type),
string(ec),