From 207ac4ef53d00fbcdf76a325700511591e298fe1 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 23 Aug 2017 14:47:15 -0600 Subject: [PATCH] ircd: Various improvements to client and socket related. --- include/ircd/client.h | 1 - include/ircd/socket.h | 8 +- ircd/client.cc | 78 ++++++++++++-- ircd/socket.cc | 232 ++++++++++++++++++++++++++++-------------- 4 files changed, 234 insertions(+), 85 deletions(-) diff --git a/include/ircd/client.h b/include/ircd/client.h index a28bb09c0..277a96f25 100644 --- a/include/ircd/client.h +++ b/include/ircd/client.h @@ -45,7 +45,6 @@ struct client static list clients; - string_view type; unique_const_iterator clit; std::shared_ptr sock; diff --git a/include/ircd/socket.h b/include/ircd/socket.h index f9ab33eba..a2ebbc7af 100644 --- a/include/ircd/socket.h +++ b/include/ircd/socket.h @@ -82,8 +82,10 @@ struct socket stat in, out; bool timedout; - void handle_timeout(const std::weak_ptr wp, const error_code &ec) noexcept; - bool handle_ready(const error_code &ec) noexcept; + void call_user(const handler &, const error_code &) noexcept; + bool handle_error(const error_code &ec); + void handle_timeout(std::weak_ptr wp, const error_code &ec); + void handle(std::weak_ptr, handler, const error_code &, const size_t &) noexcept; public: operator const ip::tcp::socket &() const { return sd; } @@ -130,6 +132,8 @@ struct socket socket(asio::ssl::context &ssl = sslv23_client, boost::asio::io_service *const &ios = ircd::ios); + socket(socket &&) = delete; + socket(const socket &) = delete; ~socket() noexcept; }; diff --git a/ircd/client.cc b/ircd/client.cc index d156fa15a..486069dd0 100644 --- a/ircd/client.cc +++ b/ircd/client.cc @@ -85,6 +85,7 @@ ircd::client::init::~init() noexcept { request.interrupt(); + ctx::yield(); disconnect_all(); socket_init.reset(nullptr); } @@ -232,8 +233,7 @@ ircd::client::client(const host_port &host_port, } ircd::client::client(std::shared_ptr sock) -:type{type} -,clit{clients, clients.emplace(end(clients), this)} +:clit{clients, clients.emplace(end(clients), this)} ,sock{std::move(sock)} { } @@ -252,6 +252,8 @@ noexcept try catch(const boost::system::system_error &e) { using boost::asio::error::eof; + using boost::asio::error::broken_pipe; + using boost::asio::error::connection_reset; using namespace boost::system::errc; switch(e.code().value()) @@ -261,6 +263,8 @@ catch(const boost::system::system_error &e) return true; case eof: + case broken_pipe: + case connection_reset: case not_connected: case operation_canceled: return false; @@ -378,7 +382,11 @@ ircd::handle_request(client &client, std::shared_ptr ircd::add_client(std::shared_ptr s) { - const auto client(std::make_shared(std::move(s))); + const auto client + { + make_client(std::move(s)) + }; + log::debug("client[%s] CONNECTED local[%s]", string(remote_addr(*client)), string(local_addr(*client))); @@ -421,20 +429,52 @@ ircd::async_recv_next(std::shared_ptr client) async_recv_next(std::move(client), milliseconds(-1)); } +// +// This function is the basis for the client's request loop. We still use +// an asynchronous pattern until there is activity on the socket (a request) +// in which case we switch to synchronous mode by jumping into an ircd::context +// drawn from the request pool. When the request is finished, we exit back +// into asynchronous mode until the next request is received and rinse and repeat. +// +// This sequence exists to avoid any possible c10k-style limitation imposed by +// dedicating a context and its stack space to the lifetime of a connection. +// This is similar to the thread-per-request pattern before async was in vogue. +// Except now with userspace threads, a context switch has a cost on the order +// of a function call, not nearly that of a system thread. So after enduring +// several years of non-blocking stackless callback asynchronous web-scale hell, +// we have now made it out alive on the other side. Enjoy. +// +// Pay close attention to the comments to know exactly where you are and what +// you can do at any given point in this sequence. +// void ircd::async_recv_next(std::shared_ptr client, const milliseconds &timeout) { auto &sock(*client->sock); - sock(timeout, [client, timeout] - (const error_code &ec) + + // This call returns immediately so we no longer block the current context and + // its stack while waiting for activity on idle connections between requests. + sock(timeout, [client(std::move(client)), timeout](const error_code &ec) noexcept { + // Right here this handler is executing on the main stack (not in any + // ircd::context). We handle any socket errors now, and if this function + // returns here the client's shared_ptr may expire and that will be the + // end of this client, socket, and connection... if(!handle_ec(*client, ec)) return; - request([client, timeout] + // This call returns immediately because we can never block the main stack outside + // of the ircd::context system. The context the closure ends up getting is the next + // available from the request pool, which may not be available immediately so this + // handler might be queued for some time after this call returns. + request([client(std::move(client)), timeout] { + // Right here this handler is executing on an ircd::context with its own + // stack dedicated to the lifetime of this request. If client::main() + // returns true, we bring the client back into async mode to wait for + // the next request. if(client->main()) async_recv_next(client, timeout); }); @@ -465,20 +505,42 @@ ircd::handle_ec_success(client &client) bool ircd::handle_ec_eof(client &client) +try { - log::debug("client[%s]: EOF", string(remote_addr(client))); + log::debug("client[%s]: EOF", + string(remote_addr(client))); + client.sock->disconnect(socket::FIN_RECV); return false; } +catch(const std::exception &e) +{ + log::warning("client(%p): EOF: %s", + &client, + e.what()); + + return false; +} bool ircd::handle_ec_timeout(client &client) +try { auto &sock(*client.sock); - log::debug("client[%s]: disconnecting after inactivity timeout", string(remote_addr(client))); + log::debug("client[%s]: disconnecting after inactivity timeout", + string(remote_addr(client))); + sock.disconnect(); return false; } +catch(const std::exception &e) +{ + log::warning("client(%p): timeout: %s", + &client, + e.what()); + + return false; +} std::string ircd::string(const client::host_port &pair) diff --git a/ircd/socket.cc b/ircd/socket.cc index 2e09738ae..bea58fb40 100644 --- a/ircd/socket.cc +++ b/ircd/socket.cc @@ -141,10 +141,22 @@ ircd::socket::socket(const ip::tcp::endpoint &remote, ircd::socket::socket(asio::ssl::context &ssl, boost::asio::io_service *const &ios) -:ssl{*ios, ssl} -,sd{this->ssl.next_layer()} -,timer{*ios} -,timedout{false} +:ssl +{ + *ios, ssl +} +,sd +{ + this->ssl.next_layer() +} +,timer +{ + *ios +} +,timedout +{ + false +} { } @@ -208,40 +220,164 @@ ircd::socket::cancel() sd.cancel(); } +// +// Overload for operator() without a timeout. see: operator() +// void ircd::socket::operator()(handler h) { operator()(milliseconds(-1), std::move(h)); } +// +// This function calls back the handler when the socket has received +// something and is ready to be read from. +// +// The purpose here is to allow waiting for data from the socket without +// blocking any context and using any stack space whatsoever, i.e full +// asynchronous mode. +// +// boost::asio has no direct way to accomplish this, so we use a little +// trick to read a single byte with MSG_PEEK as our indication. This is +// done directly on the socket and not through the SSL cipher, but we +// don't want this byte anyway. This isn't such a great trick, because +// it may result in an extra syscall; so there's room for improvement here. +// void ircd::socket::operator()(const milliseconds &timeout, - handler h) + handler callback) { + static const auto flags + { + ip::tcp::socket::message_peek + }; + static char buffer[1]; - static const mutable_buffers buffers{{buffer, sizeof(buffer)}}; - static const auto flags(ip::tcp::socket::message_peek); + static const asio::mutable_buffers_1 buffers + { + buffer, sizeof(buffer) + }; + + auto handler + { + std::bind(&socket::handle, this, weak_from(*this), std::move(callback), ph::_1, ph::_2) + }; set_timeout(timeout); - sd.async_receive(buffers, flags, [this, handler(h), wp(weak_from(*this))] - (error_code ec, const size_t &bytes) + sd.async_receive(buffers, flags, std::move(handler)); +} + +void +ircd::socket::handle(const std::weak_ptr wp, + const handler callback, + const error_code &ec, + const size_t &bytes) +noexcept +{ + // This handler may still be registered with asio after the socket destructs, so + // the weak_ptr will indicate that fact. However, this is never intended and is + // a debug assertion which should be corrected. + if(unlikely(wp.expired())) { - assert(bytes <= sizeof(buffer)); + log::warning("socket(%p): belated callback to handler...", this); + assert(0); + return; + } - if(unlikely(wp.expired())) - { - log::warning("socket(%p): belated callback to handler...", this); - return; - } + // This handler and the timeout handler are responsible for canceling each other + // when one or the other is entered. If the timeout handler has already fired for + // a timeout on the socket, `timedout` will be `true` and this handler will be + // entered with an `operation_canceled` error. + if(!timedout) + timer.cancel(); + else + assert(ec == boost::system::errc::operation_canceled); - if(!handle_ready(ec)) - { - log::debug("socket(%p): %s", this, ec.message()); - return; - } + // We can handle a few errors at this level which don't ever need to invoke the + // user's callback. Otherwise they are passed up. + if(!handle_error(ec)) + { + log::debug("socket(%p): %s", this, ec.message()); + return; + } - handler(ec); - }); + call_user(callback, ec); +} + +void +ircd::socket::call_user(const handler &callback, + const error_code &ec) +noexcept try +{ + callback(ec); +} +catch(const std::exception &e) +{ + log::error("socket(%p): async handler: unhandled user exception: %s", + this, + e.what()); + + if(ircd::debugmode) + throw; +} + +bool +ircd::socket::handle_error(const error_code &ec) +{ + using namespace boost::system::errc; + + switch(ec.value()) + { + // A success is not an error; can call the user handler + case success: + return true; + + // A cancel is triggered either by the timeout handler or by + // a request to shutdown/close the socket. We only call the user's + // handler for a timeout, otherwise this is hidden from the user. + case operation_canceled: + return timedout; + + // This indicates the remote closed the socket, we still + // pass this up to the user so they can handle it. + case boost::asio::error::eof: + return true; + + // This is a condition which we hide from the user. + case bad_file_descriptor: + return false; + + // Everything else is passed up to the user. + default: + return true; + } +} + +void +ircd::socket::handle_timeout(const std::weak_ptr wp, + const error_code &ec) +{ + using namespace boost::system::errc; + + if(!wp.expired()) switch(ec.value()) + { + // A 'success' for this handler means there was a timeout on the socket + case success: + timedout = true; + cancel(); + break; + + // A cancelation means there was no timeout. + case operation_canceled: + timedout = false; + break; + + // All other errors are unexpected, logged and ignored here. + default: + log::error("socket::handle_timeout(): unexpected: %s\n", + ec.message()); + break; + } } bool @@ -254,55 +390,3 @@ catch(const boost::system::system_error &e) { return false; } - -bool -ircd::socket::handle_ready(const error_code &ec) -noexcept -{ - using namespace boost::system::errc; - - if(!timedout) - timer.cancel(); - - switch(ec.value()) - { - case success: - return true; - - case boost::asio::error::eof: - return true; - - case operation_canceled: - return timedout; - - case bad_file_descriptor: - return false; - - default: - return true; - } -} - -void -ircd::socket::handle_timeout(const std::weak_ptr wp, - const error_code &ec) -noexcept -{ - using namespace boost::system::errc; - - if(!wp.expired()) switch(ec.value()) - { - case success: - timedout = true; - cancel(); - break; - - case operation_canceled: - timedout = false; - break; - - default: - log::error("socket::handle_timeout(): unexpected: %s\n", ec.message().c_str()); - break; - } -}