0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-15 14:31:11 +01:00

ircd::net: Various net/client/listener bugfixes/cleanup.

This commit is contained in:
Jason Volk 2017-11-01 15:51:24 -07:00
parent 20961bffa6
commit 2930b93dcb
8 changed files with 423 additions and 193 deletions

View file

@ -64,6 +64,7 @@ struct ircd::strand
/// ///
#include <ircd/ctx/continuation.h> #include <ircd/ctx/continuation.h>
#include <ircd/net/asio.h>
#include <ircd/net/socket.h> #include <ircd/net/socket.h>
inline ircd::strand::operator inline ircd::strand::operator

View file

@ -71,6 +71,8 @@ struct ircd::client
struct ircd::client::init struct ircd::client::init
{ {
void interrupt();
init(); init();
~init() noexcept; ~init() noexcept;
}; };

56
include/ircd/net/asio.h Normal file
View file

@ -0,0 +1,56 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#define HAVE_IRCD_NET_ASIO_H
// This file is not included with the IRCd standard include stack because
// it requires symbols we can't forward declare without boost headers. It
// is part of the <ircd/asio.h> stack which can be included in your
// definition file if you need low level access to this socket API. The
// <ircd/net/net.h> still offers higher level access to sockets without
// requiring boost headers; please check that for satisfaction before
// including this.
namespace ircd::net
{
using boost::system::error_code;
string_view string(const mutable_buffer &, const boost::system::error_code &);
string_view string(const mutable_buffer &, const boost::system::system_error &);
std::string string(const boost::system::error_code &);
std::string string(const boost::system::system_error &);
namespace ip = asio::ip;
uint16_t port(const ip::tcp::endpoint &);
ip::address addr(const ip::tcp::endpoint &);
std::string host(const ip::tcp::endpoint &);
std::string string(const ip::address &);
std::string string(const ip::tcp::endpoint &);
}
namespace ircd
{
using net::error_code;
using net::string;
using net::addr;
using net::host;
using net::port;
}

View file

@ -34,7 +34,7 @@ struct ircd::net::listener
IRCD_EXCEPTION(ircd::error, error) IRCD_EXCEPTION(ircd::error, error)
private: private:
std::unique_ptr<struct acceptor> acceptor; std::shared_ptr<struct acceptor> acceptor;
public: public:
listener(const json::object &options); listener(const json::object &options);

View file

@ -31,31 +31,15 @@
namespace ircd::net namespace ircd::net
{ {
namespace ip = asio::ip;
using boost::system::error_code;
using asio::steady_timer; using asio::steady_timer;
struct socket; struct socket;
extern asio::ssl::context sslv23_client; extern asio::ssl::context sslv23_client;
uint16_t port(const ip::tcp::endpoint &);
ip::address addr(const ip::tcp::endpoint &);
std::string host(const ip::tcp::endpoint &);
std::string string(const ip::address &);
std::string string(const ip::tcp::endpoint &);
std::shared_ptr<socket> connect(const ip::tcp::endpoint &remote, const milliseconds &timeout); std::shared_ptr<socket> connect(const ip::tcp::endpoint &remote, const milliseconds &timeout);
} }
namespace ircd
{
using net::error_code;
using net::string;
using net::addr;
using net::host;
using net::port;
}
struct ircd::net::socket struct ircd::net::socket
:std::enable_shared_from_this<ircd::net::socket> :std::enable_shared_from_this<ircd::net::socket>
{ {
@ -78,7 +62,7 @@ struct ircd::net::socket
asio::ssl::stream<ip::tcp::socket &> ssl; asio::ssl::stream<ip::tcp::socket &> ssl;
steady_timer timer; steady_timer timer;
stat in, out; stat in, out;
bool timedout; bool timedout {false};
void call_user(const handler &, const error_code &) noexcept; void call_user(const handler &, const error_code &) noexcept;
bool handle_error(const error_code &ec); bool handle_error(const error_code &ec);

View file

@ -207,11 +207,11 @@ ircd::client::client(std::shared_ptr<socket> sock)
ircd::client::~client() ircd::client::~client()
noexcept try noexcept try
{ {
disconnect(*this, net::dc::SSL_NOTIFY); //assert(!sock || !connected(*sock));
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log::error("~client(%p): %s", this, e.what()); log::critical("~client(%p): %s", this, e.what());
return; return;
} }
@ -259,12 +259,20 @@ catch(const boost::system::system_error &e)
if(ec.category() == get_system_category()) switch(ec.value()) if(ec.category() == get_system_category()) switch(ec.value())
{ {
case success: case success:
assert(0);
return true; return true;
case broken_pipe: case broken_pipe:
case connection_reset: case connection_reset:
case not_connected: case not_connected:
disconnect(*this, net::dc::RST);
return false;
case operation_canceled: case operation_canceled:
disconnect(*this, net::dc::SSL_NOTIFY);
return false;
case bad_file_descriptor:
return false; return false;
default: default:
@ -273,6 +281,7 @@ catch(const boost::system::system_error &e)
else if(ec.category() == get_misc_category()) switch(ec.value()) else if(ec.category() == get_misc_category()) switch(ec.value())
{ {
case boost::asio::error::eof: case boost::asio::error::eof:
disconnect(*this, net::dc::RST);
return false; return false;
default: default:
@ -281,19 +290,20 @@ catch(const boost::system::system_error &e)
else if(ec.category() == get_ssl_category()) switch(ec.value()) else if(ec.category() == get_ssl_category()) switch(ec.value())
{ {
case SSL_R_SHORT_READ: case SSL_R_SHORT_READ:
disconnect(*this, net::dc::RST);
return false; return false;
default: default:
break; break;
} }
log::critical("client(%p): (unexpected) system_error: %s", log::error("client(%p): (unexpected) %s: (%d) %s",
(const void *)this, (const void *)this,
e.what()); ec.category().name(),
int{ec.value()},
if(ircd::debugmode) ec.message());
throw;
disconnect(*this, net::dc::RST);
return false; return false;
} }
catch(const std::exception &e) catch(const std::exception &e)
@ -302,10 +312,11 @@ catch(const std::exception &e)
string(remote(*this)), string(remote(*this)),
e.what()); e.what());
if(ircd::debugmode) #ifdef RB_DEBUG
throw; throw;
#else
return false; return false;
#endif
} }
bool bool
@ -314,12 +325,15 @@ ircd::handle_request(client &client,
try try
{ {
client.request_timer = ircd::timer{}; client.request_timer = ircd::timer{};
client.sock->set_timeout(request_timeout, [client(shared_from(client))] const socket::scope_timeout timeout
(const net::error_code &ec)
{ {
if(!ec) *client.sock, request_timeout, [client(shared_from(client))]
disconnect(*client, net::dc::SSL_NOTIFY_YIELD); (const net::error_code &ec)
}); {
if(!ec)
disconnect(*client, net::dc::SSL_NOTIFY_YIELD);
}
};
bool ret{true}; bool ret{true};
http::request http::request
@ -327,7 +341,6 @@ try
pc, nullptr, write_closure(client), [&client, &pc, &ret] pc, nullptr, write_closure(client), [&client, &pc, &ret]
(const auto &head) (const auto &head)
{ {
client.sock->timer.cancel();
handle_request(client, pc, head); handle_request(client, pc, head);
ret = !iequals(head.connection, "close"s); ret = !iequals(head.connection, "close"s);
} }
@ -400,13 +413,18 @@ ircd::make_client(args&&... a)
void void
ircd::disconnect_all() ircd::disconnect_all()
{ {
for(auto &client : client::clients) try auto it(begin(client::clients));
while(it != end(client::clients))
{ {
disconnect(*client, net::dc::RST); auto *const client(*it);
} ++it; try
catch(const std::exception &e) {
{ disconnect(*client, net::dc::SSL_NOTIFY);
log::warning("Error disconnecting client @%p: %s", &client, e.what()); }
catch(const std::exception &e)
{
log::warning("Error disconnecting client @%p: %s", client, e.what());
}
} }
} }
@ -443,10 +461,13 @@ void
ircd::async_recv_next(std::shared_ptr<client> client, ircd::async_recv_next(std::shared_ptr<client> client,
const milliseconds &timeout) const milliseconds &timeout)
{ {
auto &sock(*client->sock); assert(bool(client));
assert(bool(client->sock));
// This call returns immediately so we no longer block the current context and // This call returns immediately so we no longer block the current context and
// its stack while waiting for activity on idle connections between requests. // its stack while waiting for activity on idle connections between requests.
auto &sock(*client->sock);
sock(timeout, [client(std::move(client)), timeout](const net::error_code &ec) sock(timeout, [client(std::move(client)), timeout](const net::error_code &ec)
noexcept noexcept
{ {
@ -459,14 +480,16 @@ ircd::async_recv_next(std::shared_ptr<client> client,
// of the ircd::context system. The context the closure ends up getting is the next // of the ircd::context system. The context the closure ends up getting is the next
// available from the request pool, which may not be available immediately so this // available from the request pool, which may not be available immediately so this
// handler might be queued for some time after this call returns. // handler might be queued for some time after this call returns.
request([ec, client, timeout] request([ec, client(std::move(client)), timeout]
{ {
// Right here this handler is executing on an ircd::context with its own // Right here this handler is executing on an ircd::context with its own
// stack dedicated to the lifetime of this request. If client::main() // stack dedicated to the lifetime of this request. If client::main()
// returns true, we bring the client back into async mode to wait for // returns true, we bring the client back into async mode to wait for
// the next request. // the next request.
if(client->main()) if(client->main())
async_recv_next(client, timeout); async_recv_next(std::move(client), timeout);
else
disconnect(*client, net::dc::SSL_NOTIFY_YIELD);
}); });
}); });
} }
@ -525,17 +548,17 @@ bool
ircd::handle_ec_short_read(client &client) ircd::handle_ec_short_read(client &client)
try try
{ {
log::debug("client[%s]: short_read", log::warning("client[%s]: short_read",
string(remote(client))); string(remote(client)));
disconnect(client, net::dc::RST); disconnect(client, net::dc::RST);
return false; return false;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log::warning("client(%p): short_read: %s", log::error("client(%p): short_read: %s",
&client, &client,
e.what()); e.what());
return false; return false;
} }
@ -552,9 +575,9 @@ try
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log::warning("client(%p): EOF: %s", log::error("client(%p): EOF: %s",
&client, &client,
e.what()); e.what());
return false; return false;
} }
@ -564,17 +587,17 @@ ircd::handle_ec_timeout(client &client)
try try
{ {
assert(bool(client.sock)); assert(bool(client.sock));
log::debug("client[%s]: disconnecting after inactivity timeout", log::warning("client[%s]: disconnecting after inactivity timeout",
string(remote(client))); string(remote(client)));
disconnect(client, net::dc::SSL_NOTIFY); disconnect(client, net::dc::SSL_NOTIFY);
return false; return false;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log::warning("client(%p): timeout: %s", log::error("client(%p): timeout: %s",
&client, &client,
e.what()); e.what());
return false; return false;
} }

View file

@ -204,6 +204,12 @@ try
js::init _js_; // SpiderMonkey js::init _js_; // SpiderMonkey
m::init _matrix_; // Matrix m::init _matrix_; // Matrix
// Any deinits which have to be done with all subsystems intact
const unwind shutdown{[&]
{
_client_.interrupt();
}};
// IRCd will now transition to the RUN state indicating full functionality. // IRCd will now transition to the RUN state indicating full functionality.
ircd::set_runlevel(runlevel::RUN); ircd::set_runlevel(runlevel::RUN);
@ -257,10 +263,14 @@ try
if(ircd::runlevel_changed) if(ircd::runlevel_changed)
ios->post([new_runlevel] ios->post([new_runlevel]
{ {
if(new_runlevel == runlevel::STOPPED)
log::notice("IRCd %s", reflect(new_runlevel));
ircd::runlevel_changed(new_runlevel); ircd::runlevel_changed(new_runlevel);
}); });
log::notice("IRCd %s", reflect(ircd::runlevel)); if(new_runlevel != runlevel::STOPPED)
log::notice("IRCd %s", reflect(new_runlevel));
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {

View file

@ -125,30 +125,8 @@ ircd::net::write(socket &socket,
// net/listener.h // net/listener.h
// //
//
// ircd::net::listener
//
ircd::net::listener::listener(const std::string &opts)
:listener{json::object{opts}}
{
}
ircd::net::listener::listener(const json::object &opts)
:acceptor{std::make_unique<struct acceptor>(opts)}
{
}
ircd::net::listener::~listener()
noexcept
{
}
//
// ircd::net::listener::acceptor
//
struct ircd::net::listener::acceptor struct ircd::net::listener::acceptor
:std::enable_shared_from_this<struct ircd::net::listener::acceptor>
{ {
using error_code = boost::system::error_code; using error_code = boost::system::error_code;
@ -159,25 +137,94 @@ struct ircd::net::listener::acceptor
asio::ssl::context ssl; asio::ssl::context ssl;
ip::tcp::endpoint ep; ip::tcp::endpoint ep;
ip::tcp::acceptor a; ip::tcp::acceptor a;
size_t accepting {0};
size_t handshaking {0};
bool interrupting {false};
ctx::dock joining;
explicit operator std::string() const; explicit operator std::string() const;
void configure(const json::object &opts); void configure(const json::object &opts);
// Handshake stack // Handshake stack
bool handshake_error(const error_code &ec); bool handshake_error(const error_code &ec, socket &);
void handshake(const error_code &ec, std::shared_ptr<socket>) noexcept; void handshake(const error_code &ec, std::shared_ptr<socket>, std::weak_ptr<acceptor>) noexcept;
// Acceptance stack // Acceptance stack
bool accept_error(const error_code &ec); bool accept_error(const error_code &ec, socket &);
void accept(const error_code &ec, std::shared_ptr<socket>) noexcept; void accept(const error_code &ec, std::shared_ptr<socket>, std::weak_ptr<acceptor>) noexcept;
// Accept next // Accept next
void next(); void next();
// Acceptor shutdown
bool interrupt() noexcept;
void join() noexcept;
acceptor(const json::object &opts); acceptor(const json::object &opts);
~acceptor() noexcept; ~acceptor() noexcept;
}; };
//
// ircd::net::listener
//
ircd::net::listener::listener(const std::string &opts)
:listener{json::object{opts}}
{
}
ircd::net::listener::listener(const json::object &opts)
:acceptor{std::make_shared<struct acceptor>(opts)}
{
// Starts the first asynchronous accept. This has to be done out here after
// the acceptor's shared object is constructed.
acceptor->next();
}
/// Cancels all pending accepts and handshakes and waits (yields ircd::ctx)
/// until report.
///
ircd::net::listener::~listener()
noexcept
{
if(acceptor)
acceptor->join();
}
void
ircd::net::listener::acceptor::join()
noexcept try
{
interrupt();
joining.wait([this]
{
return !accepting && !handshaking;
});
}
catch(const std::exception &e)
{
log.error("acceptor(%p): join: %s",
this,
e.what());
}
bool
ircd::net::listener::acceptor::interrupt()
noexcept try
{
a.cancel();
interrupting = true;
return true;
}
catch(const boost::system::system_error &e)
{
log.error("acceptor(%p): interrupt: %s",
this,
string(e));
return false;
}
// //
// ircd::net::listener::acceptor // ircd::net::listener::acceptor
// //
@ -243,8 +290,6 @@ try
std::string(*this), std::string(*this),
backlog, backlog,
max_connections); max_connections);
next();
} }
catch(const boost::system::system_error &e) catch(const boost::system::system_error &e)
{ {
@ -254,9 +299,13 @@ catch(const boost::system::system_error &e)
ircd::net::listener::acceptor::~acceptor() ircd::net::listener::acceptor::~acceptor()
noexcept noexcept
{ {
a.cancel();
} }
/// Sets the next asynchronous handler to start the next accept sequence.
/// Each call to next() sets one handler which handles the connect for one
/// socket. After the connect, an asynchronous SSL handshake handler is set
/// for the socket, and next() is called again to setup for the next socket
/// too.
void void
ircd::net::listener::acceptor::next() ircd::net::listener::acceptor::next()
try try
@ -266,9 +315,9 @@ try
std::string(*this), std::string(*this),
sock.get()); sock.get());
// The context blocks here until the next client is connected.
ip::tcp::socket &sd(*sock); ip::tcp::socket &sd(*sock);
a.async_accept(sd, std::bind(&acceptor::accept, this, ph::_1, sock)); a.async_accept(sd, std::bind(&acceptor::accept, this, ph::_1, sock, weak_from(*this)));
++accepting;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
@ -280,19 +329,37 @@ catch(const std::exception &e)
throw; throw;
} }
/// Callback for a socket connected. This handler then invokes the
/// asynchronous SSL handshake sequence.
///
void void
ircd::net::listener::acceptor::accept(const error_code &ec, ircd::net::listener::acceptor::accept(const error_code &ec,
const std::shared_ptr<socket> sock) const std::shared_ptr<socket> sock,
const std::weak_ptr<acceptor> a)
noexcept try noexcept try
{ {
if(accept_error(ec)) if(unlikely(a.expired()))
return; return;
const unwind next{[this] --accepting;
const unwind::nominal next{[this]
{ {
this->next(); this->next();
}}; }};
const unwind::exceptional drop{[&sock]
{
assert(bool(sock));
disconnect(*sock, dc::RST);
}};
assert(bool(sock));
if(unlikely(accept_error(ec, *sock)))
{
disconnect(*sock, dc::RST);
return;
}
log.debug("%s: socket(%p) accepted %s", log.debug("%s: socket(%p) accepted %s",
std::string(*this), std::string(*this),
sock.get(), sock.get(),
@ -315,24 +382,43 @@ noexcept try
auto handshake auto handshake
{ {
std::bind(&acceptor::handshake, this, ph::_1, sock) std::bind(&acceptor::handshake, this, ph::_1, sock, a)
}; };
sock->ssl.async_handshake(handshake_type, std::move(handshake)); sock->ssl.async_handshake(handshake_type, std::move(handshake));
++handshaking;
}
catch(const ctx::interrupted &e)
{
log.debug("%s: acceptor interrupted socket(%p): %s",
std::string(*this),
sock.get(),
string(ec));
joining.notify_all();
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log.error("%s: socket(%p) in accept(): %s", log.error("%s: socket(%p): in accept(): [%s]: %s",
std::string(*this), std::string(*this),
sock.get(), sock.get(),
sock->connected()? string(sock->remote()) : "<gone>",
e.what()); e.what());
} }
/// Error handler for the accept socket callback. This handler determines
/// whether or not the handler should return or continue processing the
/// result.
///
bool bool
ircd::net::listener::acceptor::accept_error(const error_code &ec) ircd::net::listener::acceptor::accept_error(const error_code &ec,
socket &sock)
{ {
using boost::system::get_system_category; using boost::system::get_system_category;
if(unlikely(interrupting))
throw ctx::interrupted();
if(ec.category() == get_system_category()) switch(ec.value()) if(ec.category() == get_system_category()) switch(ec.value())
{ {
using namespace boost::system::errc; using namespace boost::system::errc;
@ -341,7 +427,7 @@ ircd::net::listener::acceptor::accept_error(const error_code &ec)
return false; return false;
case operation_canceled: case operation_canceled:
return true; return false;
default: default:
break; break;
@ -352,33 +438,64 @@ ircd::net::listener::acceptor::accept_error(const error_code &ec)
void void
ircd::net::listener::acceptor::handshake(const error_code &ec, ircd::net::listener::acceptor::handshake(const error_code &ec,
const std::shared_ptr<socket> sock) const std::shared_ptr<socket> sock,
const std::weak_ptr<acceptor> a)
noexcept try noexcept try
{ {
if(handshake_error(ec)) if(unlikely(a.expired()))
return; return;
log.debug("%s socket(%p) SSL handshook %s", --handshaking;
assert(bool(sock));
const unwind::exceptional drop{[&sock]
{
disconnect(*sock, dc::RST);
}};
if(unlikely(handshake_error(ec, *sock)))
{
disconnect(*sock, dc::RST);
return;
}
log.debug("%s socket(%p): SSL handshook %s",
std::string(*this), std::string(*this),
sock.get(), sock.get(),
string(sock->remote())); string(sock->remote()));
add_client(sock); add_client(sock);
} }
catch(const ctx::interrupted &e)
{
log.debug("%s: SSL handshake interrupted socket(%p): %s",
std::string(*this),
sock.get(),
string(ec));
joining.notify_all();
}
catch(const std::exception &e) catch(const std::exception &e)
{ {
log.error("%s: socket(%p) in handshake(): [%s]: %s", log.error("%s: socket(%p): in handshake(): [%s]: %s",
std::string(*this), std::string(*this),
sock.get(), sock.get(),
sock->connected()? string(sock->remote()) : "<gone>", sock->connected()? string(sock->remote()) : "<gone>",
e.what()); e.what());
} }
/// Error handler for the SSL handshake callback. This handler determines
/// whether or not the handler should return or continue processing the
/// result.
///
bool bool
ircd::net::listener::acceptor::handshake_error(const error_code &ec) ircd::net::listener::acceptor::handshake_error(const error_code &ec,
socket &sock)
{ {
using boost::system::get_system_category; using boost::system::get_system_category;
if(unlikely(interrupting))
throw ctx::interrupted();
if(ec.category() == get_system_category()) switch(ec.value()) if(ec.category() == get_system_category()) switch(ec.value())
{ {
using namespace boost::system::errc; using namespace boost::system::errc;
@ -387,7 +504,7 @@ ircd::net::listener::acceptor::handshake_error(const error_code &ec)
return false; return false;
case operation_canceled: case operation_canceled:
return true; return false;
default: default:
break; break;
@ -483,17 +600,10 @@ ircd::net::listener::acceptor::configure(const json::object &opts)
ircd::net::listener::acceptor::operator std::string() ircd::net::listener::acceptor::operator std::string()
const const
{ {
std::string ret(256, char{}); return fmt::snstringf
const auto length
{ {
fmt::sprintf(mutable_buffer{ret}, "'%s' @ [%s]:%u", 256, "'%s' @ [%s]:%u", name, string(ep.address()), ep.port()
name,
string(ep.address()),
ep.port())
}; };
ret.resize(length);
return ret;
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -591,40 +701,6 @@ noexcept
return s.connected(); return s.connected();
} }
uint16_t
ircd::net::port(const ip::tcp::endpoint &ep)
{
return ep.port();
}
std::string
ircd::net::host(const ip::tcp::endpoint &ep)
{
return string(addr(ep));
}
std::string
ircd::net::string(const ip::address &addr)
{
return addr.to_string();
}
std::string
ircd::net::string(const ip::tcp::endpoint &ep)
{
std::string ret(256, char{});
const auto addr{string(net::addr(ep))};
const auto data{const_cast<char *>(ret.data())};
ret.resize(snprintf(data, ret.size(), "%s:%u", addr.c_str(), port(ep)));
return ret;
}
boost::asio::ip::address
ircd::net::addr(const ip::tcp::endpoint &ep)
{
return ep.address();
}
// //
// socket::io // socket::io
// //
@ -691,17 +767,9 @@ noexcept
} }
ircd::net::socket::scope_timeout::~scope_timeout() ircd::net::socket::scope_timeout::~scope_timeout()
noexcept try noexcept
{ {
if(s) cancel();
s->timer.cancel();
}
catch(const std::exception &e)
{
log.error("socket(%p) ~scope_timeout: %s",
(const void *)s,
e.what());
return;
} }
bool bool
@ -713,7 +781,7 @@ noexcept try
auto *const s{this->s}; auto *const s{this->s};
this->s = nullptr; this->s = nullptr;
s->timer.cancel(); s->cancel_timeout();
return true; return true;
} }
catch(const std::exception &e) catch(const std::exception &e)
@ -798,10 +866,6 @@ ircd::net::socket::socket(asio::ssl::context &ssl,
{ {
*ios *ios
} }
,timedout
{
false
}
{ {
} }
@ -820,7 +884,7 @@ noexcept try
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log.error("socket(%p): close: %s", this, e.what()); log.critical("socket(%p): close: %s", this, e.what());
return; return;
} }
@ -905,9 +969,9 @@ ircd::net::socket::connect(const ip::tcp::endpoint &ep,
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log.error("socket(%p): connect: unhandled exception from user callback: %s", log.critical("socket(%p): connect: unhandled exception from user callback: %s",
(const void *)this, (const void *)this,
e.what()); e.what());
} }
}}; }};
@ -934,8 +998,8 @@ ircd::net::socket::connect(const ip::tcp::endpoint &ep,
ssl.async_handshake(handshake, std::move(handshake_handler)); ssl.async_handshake(handshake, std::move(handshake_handler));
}}; }};
sd.async_connect(ep, std::move(connect_handler));
set_timeout(timeout); set_timeout(timeout);
sd.async_connect(ep, std::move(connect_handler));
} }
bool bool
@ -970,11 +1034,17 @@ try
sd.shutdown(ip::tcp::socket::shutdown_receive); sd.shutdown(ip::tcp::socket::shutdown_receive);
return true; return true;
case dc::SSL_NOTIFY_YIELD: case dc::SSL_NOTIFY_YIELD: if(likely(ctx::current))
{ {
const life_guard<socket> lg{*this}; const life_guard<socket> lg{*this};
const scope_timeout ts{*this, 8s}; const scope_timeout ts{*this, 8s};
ssl.async_shutdown(yield_context{to_asio{}}); ssl.async_shutdown(yield_context{to_asio{}});
error_code ec;
sd.close(ec);
if(ec)
log.error("socket(%p): close: %s: %s",
this,
string(ec));
return true; return true;
} }
@ -982,7 +1052,8 @@ try
{ {
set_timeout(8s); set_timeout(8s);
ssl.async_shutdown([s(shared_from_this())] ssl.async_shutdown([s(shared_from_this())]
(boost::system::error_code ec) noexcept (error_code ec)
noexcept
{ {
if(!s->timedout) if(!s->timedout)
s->cancel_timeout(); s->cancel_timeout();
@ -990,8 +1061,7 @@ try
if(ec) if(ec)
log.warning("socket(%p): SSL_NOTIFY: %s: %s", log.warning("socket(%p): SSL_NOTIFY: %s: %s",
s.get(), s.get(),
ec.category().name(), string(ec));
ec.message());
if(!s->sd.is_open()) if(!s->sd.is_open())
return; return;
@ -1001,8 +1071,7 @@ try
if(ec) if(ec)
log.warning("socket(%p): after SSL_NOTIFY: %s: %s", log.warning("socket(%p): after SSL_NOTIFY: %s: %s",
s.get(), s.get(),
ec.category().name(), string(ec));
ec.message());
}); });
return true; return true;
} }
@ -1025,8 +1094,7 @@ catch(const boost::system::system_error &e)
if(ec) if(ec)
log.warning("socket(%p): after disconnect: %s: %s", log.warning("socket(%p): after disconnect: %s: %s",
this, this,
ec.category().name(), string(ec));
ec.message());
throw; throw;
} }
@ -1063,12 +1131,6 @@ ircd::net::socket::operator()(handler h)
/// blocking any context and using any stack space whatsoever, i.e full /// blocking any context and using any stack space whatsoever, i.e full
/// asynchronous mode. /// asynchronous mode.
/// ///
/// boost::asio has no direct way to accomplish this because the buffer size
/// must be positive so we use a little trick to read a single byte with
/// MSG_PEEK as our indication. This is done directly on the socket and
/// not through the SSL cipher, but we don't want this byte anyway. This
/// isn't such a great trick.
///
void void
ircd::net::socket::operator()(const milliseconds &timeout, ircd::net::socket::operator()(const milliseconds &timeout,
handler callback) handler callback)
@ -1078,7 +1140,7 @@ ircd::net::socket::operator()(const milliseconds &timeout,
ip::tcp::socket::message_peek ip::tcp::socket::message_peek
}; };
static char buffer[1]; static char buffer[0];
static const asio::mutable_buffers_1 buffers static const asio::mutable_buffers_1 buffers
{ {
buffer, sizeof(buffer) buffer, sizeof(buffer)
@ -1089,6 +1151,7 @@ ircd::net::socket::operator()(const milliseconds &timeout,
std::bind(&socket::handle, this, weak_from(*this), std::move(callback), ph::_1, ph::_2) std::bind(&socket::handle, this, weak_from(*this), std::move(callback), ph::_1, ph::_2)
}; };
assert(connected());
set_timeout(timeout); set_timeout(timeout);
sd.async_receive(buffers, flags, std::move(handler)); sd.async_receive(buffers, flags, std::move(handler));
} }
@ -1101,13 +1164,17 @@ ircd::net::socket::handle(const std::weak_ptr<socket> wp,
noexcept try noexcept try
{ {
const life_guard<socket> s{wp}; const life_guard<socket> s{wp};
log.debug("socket(%p): %zu bytes: %s: %s",
this,
bytes,
string(ec));
// This handler and the timeout handler are responsible for canceling each other // This handler and the timeout handler are responsible for canceling each other
// when one or the other is entered. If the timeout handler has already fired for // when one or the other is entered. If the timeout handler has already fired for
// a timeout on the socket, `timedout` will be `true` and this handler will be // a timeout on the socket, `timedout` will be `true` and this handler will be
// entered with an `operation_canceled` error. // entered with an `operation_canceled` error.
if(!timedout) if(!timedout)
timer.cancel(); cancel_timeout();
else else
assert(ec == boost::system::errc::operation_canceled); assert(ec == boost::system::errc::operation_canceled);
@ -1115,10 +1182,9 @@ noexcept try
// user's callback. Otherwise they are passed up. // user's callback. Otherwise they are passed up.
if(!handle_error(ec)) if(!handle_error(ec))
{ {
log.warning("socket(%p): %s", log.error("socket(%p): %s",
this, this,
ec.category().name(), string(ec));
ec.message());
return; return;
} }
@ -1134,6 +1200,13 @@ catch(const std::bad_weak_ptr &e)
e.what()); e.what());
assert(0); assert(0);
} }
catch(const boost::system::system_error &e)
{
log.error("socket(%p): handle: %s %s",
this,
string(ec));
assert(0);
}
catch(const std::exception &e) catch(const std::exception &e)
{ {
log.error("socket(%p): handle: %s", log.error("socket(%p): handle: %s",
@ -1151,9 +1224,9 @@ noexcept try
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
log.error("socket(%p): async handler: unhandled exception: %s", log.critical("socket(%p): async handler: unhandled exception: %s",
this, this,
e.what()); e.what());
} }
bool bool
@ -1165,10 +1238,9 @@ ircd::net::socket::handle_error(const error_code &ec)
using boost::asio::error::get_misc_category; using boost::asio::error::get_misc_category;
if(ec != success) if(ec != success)
log.error("socket(%p): handle error: %s: %s", log.warning("socket(%p): handle error: %s: %s",
this, this,
ec.category().name(), string(ec));
ec.message());
if(ec.category() == get_system_category()) switch(ec.value()) if(ec.category() == get_system_category()) switch(ec.value())
{ {
@ -1227,15 +1299,19 @@ noexcept try
case success: case success:
{ {
sd.cancel(); sd.cancel();
assert(timedout == false);
timedout = true; timedout = true;
break; break;
} }
// A cancelation means there was no timeout. // A cancelation means there was no timeout.
case operation_canceled: case operation_canceled:
{
assert(ec.category() == boost::system::get_system_category()); assert(ec.category() == boost::system::get_system_category());
assert(timedout == false);
timedout = false; timedout = false;
break; break;
}
// All other errors are unexpected, logged and ignored here. // All other errors are unexpected, logged and ignored here.
default: default:
@ -1278,6 +1354,7 @@ ircd::net::socket::cancel_timeout()
noexcept noexcept
{ {
boost::system::error_code ec; boost::system::error_code ec;
timedout = false;
timer.cancel(ec); timer.cancel(ec);
return ec; return ec;
} }
@ -1299,6 +1376,7 @@ const
void void
ircd::net::socket::set_timeout(const milliseconds &t) ircd::net::socket::set_timeout(const milliseconds &t)
{ {
cancel_timeout();
if(t < milliseconds(0)) if(t < milliseconds(0))
return; return;
@ -1310,6 +1388,7 @@ void
ircd::net::socket::set_timeout(const milliseconds &t, ircd::net::socket::set_timeout(const milliseconds &t,
handler h) handler h)
{ {
cancel_timeout();
if(t < milliseconds(0)) if(t < milliseconds(0))
return; return;
@ -1317,6 +1396,81 @@ ircd::net::socket::set_timeout(const milliseconds &t,
timer.async_wait(std::move(h)); timer.async_wait(std::move(h));
} }
///////////////////////////////////////////////////////////////////////////////
//
// net/asio.h
//
std::string
ircd::net::string(const ip::address &addr)
{
return addr.to_string();
}
std::string
ircd::net::string(const ip::tcp::endpoint &ep)
{
std::string ret(256, char{});
const auto addr{string(net::addr(ep))};
const auto data{const_cast<char *>(ret.data())};
ret.resize(snprintf(data, ret.size(), "%s:%u", addr.c_str(), port(ep)));
return ret;
}
std::string
ircd::net::host(const ip::tcp::endpoint &ep)
{
return string(addr(ep));
}
boost::asio::ip::address
ircd::net::addr(const ip::tcp::endpoint &ep)
{
return ep.address();
}
uint16_t
ircd::net::port(const ip::tcp::endpoint &ep)
{
return ep.port();
}
std::string
ircd::net::string(const boost::system::system_error &e)
{
return string(e.code());
}
std::string
ircd::net::string(const boost::system::error_code &ec)
{
std::string ret(128, char{});
ret.resize(string(mutable_buffer{ret}, ec).size());
return ret;
}
ircd::string_view
ircd::net::string(const mutable_buffer &buf,
const boost::system::system_error &e)
{
return string(buf, e.code());
}
ircd::string_view
ircd::net::string(const mutable_buffer &buf,
const boost::system::error_code &ec)
{
const auto len
{
fmt::sprintf
{
buf, "%s: %s", ec.category().name(), ec.message()
}
};
return { data(buf), size_t(len) };
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// //
// net/remote.h // net/remote.h