0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-10-01 13:18:58 +02:00

ircd: Various improvements to client and socket related.

This commit is contained in:
Jason Volk 2017-08-23 14:47:15 -06:00
parent 2411cc5b9d
commit 207ac4ef53
4 changed files with 234 additions and 85 deletions

View file

@ -45,7 +45,6 @@ struct client
static list clients;
string_view type;
unique_const_iterator<list> clit;
std::shared_ptr<socket> sock;

View file

@ -82,8 +82,10 @@ struct socket
stat in, out;
bool timedout;
void handle_timeout(const std::weak_ptr<socket> wp, const error_code &ec) noexcept;
bool handle_ready(const error_code &ec) noexcept;
void call_user(const handler &, const error_code &) noexcept;
bool handle_error(const error_code &ec);
void handle_timeout(std::weak_ptr<socket> wp, const error_code &ec);
void handle(std::weak_ptr<socket>, handler, const error_code &, const size_t &) noexcept;
public:
operator const ip::tcp::socket &() const { return sd; }
@ -130,6 +132,8 @@ struct socket
socket(asio::ssl::context &ssl = sslv23_client,
boost::asio::io_service *const &ios = ircd::ios);
socket(socket &&) = delete;
socket(const socket &) = delete;
~socket() noexcept;
};

View file

@ -85,6 +85,7 @@ ircd::client::init::~init()
noexcept
{
request.interrupt();
ctx::yield();
disconnect_all();
socket_init.reset(nullptr);
}
@ -232,8 +233,7 @@ ircd::client::client(const host_port &host_port,
}
ircd::client::client(std::shared_ptr<socket> sock)
:type{type}
,clit{clients, clients.emplace(end(clients), this)}
:clit{clients, clients.emplace(end(clients), this)}
,sock{std::move(sock)}
{
}
@ -252,6 +252,8 @@ noexcept try
catch(const boost::system::system_error &e)
{
using boost::asio::error::eof;
using boost::asio::error::broken_pipe;
using boost::asio::error::connection_reset;
using namespace boost::system::errc;
switch(e.code().value())
@ -261,6 +263,8 @@ catch(const boost::system::system_error &e)
return true;
case eof:
case broken_pipe:
case connection_reset:
case not_connected:
case operation_canceled:
return false;
@ -378,7 +382,11 @@ ircd::handle_request(client &client,
std::shared_ptr<ircd::client>
ircd::add_client(std::shared_ptr<socket> s)
{
const auto client(std::make_shared<ircd::client>(std::move(s)));
const auto client
{
make_client(std::move(s))
};
log::debug("client[%s] CONNECTED local[%s]",
string(remote_addr(*client)),
string(local_addr(*client)));
@ -421,20 +429,52 @@ ircd::async_recv_next(std::shared_ptr<client> client)
async_recv_next(std::move(client), milliseconds(-1));
}
//
// This function is the basis for the client's request loop. We still use
// an asynchronous pattern until there is activity on the socket (a request)
// in which case we switch to synchronous mode by jumping into an ircd::context
// drawn from the request pool. When the request is finished, we exit back
// into asynchronous mode until the next request is received and rinse and repeat.
//
// This sequence exists to avoid any possible c10k-style limitation imposed by
// dedicating a context and its stack space to the lifetime of a connection.
// This is similar to the thread-per-request pattern before async was in vogue.
// Except now with userspace threads, a context switch has a cost on the order
// of a function call, not nearly that of a system thread. So after enduring
// several years of non-blocking stackless callback asynchronous web-scale hell,
// we have now made it out alive on the other side. Enjoy.
//
// Pay close attention to the comments to know exactly where you are and what
// you can do at any given point in this sequence.
//
void
ircd::async_recv_next(std::shared_ptr<client> client,
const milliseconds &timeout)
{
auto &sock(*client->sock);
sock(timeout, [client, timeout]
(const error_code &ec)
// This call returns immediately so we no longer block the current context and
// its stack while waiting for activity on idle connections between requests.
sock(timeout, [client(std::move(client)), timeout](const error_code &ec)
noexcept
{
// Right here this handler is executing on the main stack (not in any
// ircd::context). We handle any socket errors now, and if this function
// returns here the client's shared_ptr may expire and that will be the
// end of this client, socket, and connection...
if(!handle_ec(*client, ec))
return;
request([client, timeout]
// This call returns immediately because we can never block the main stack outside
// of the ircd::context system. The context the closure ends up getting is the next
// available from the request pool, which may not be available immediately so this
// handler might be queued for some time after this call returns.
request([client(std::move(client)), timeout]
{
// Right here this handler is executing on an ircd::context with its own
// stack dedicated to the lifetime of this request. If client::main()
// returns true, we bring the client back into async mode to wait for
// the next request.
if(client->main())
async_recv_next(client, timeout);
});
@ -465,20 +505,42 @@ ircd::handle_ec_success(client &client)
bool
ircd::handle_ec_eof(client &client)
try
{
log::debug("client[%s]: EOF", string(remote_addr(client)));
log::debug("client[%s]: EOF",
string(remote_addr(client)));
client.sock->disconnect(socket::FIN_RECV);
return false;
}
catch(const std::exception &e)
{
log::warning("client(%p): EOF: %s",
&client,
e.what());
return false;
}
bool
ircd::handle_ec_timeout(client &client)
try
{
auto &sock(*client.sock);
log::debug("client[%s]: disconnecting after inactivity timeout", string(remote_addr(client)));
log::debug("client[%s]: disconnecting after inactivity timeout",
string(remote_addr(client)));
sock.disconnect();
return false;
}
catch(const std::exception &e)
{
log::warning("client(%p): timeout: %s",
&client,
e.what());
return false;
}
std::string
ircd::string(const client::host_port &pair)

View file

@ -141,10 +141,22 @@ ircd::socket::socket(const ip::tcp::endpoint &remote,
ircd::socket::socket(asio::ssl::context &ssl,
boost::asio::io_service *const &ios)
:ssl{*ios, ssl}
,sd{this->ssl.next_layer()}
,timer{*ios}
,timedout{false}
:ssl
{
*ios, ssl
}
,sd
{
this->ssl.next_layer()
}
,timer
{
*ios
}
,timedout
{
false
}
{
}
@ -208,40 +220,164 @@ ircd::socket::cancel()
sd.cancel();
}
//
// Overload for operator() without a timeout. see: operator()
//
void
ircd::socket::operator()(handler h)
{
operator()(milliseconds(-1), std::move(h));
}
//
// This function calls back the handler when the socket has received
// something and is ready to be read from.
//
// The purpose here is to allow waiting for data from the socket without
// blocking any context and using any stack space whatsoever, i.e full
// asynchronous mode.
//
// boost::asio has no direct way to accomplish this, so we use a little
// trick to read a single byte with MSG_PEEK as our indication. This is
// done directly on the socket and not through the SSL cipher, but we
// don't want this byte anyway. This isn't such a great trick, because
// it may result in an extra syscall; so there's room for improvement here.
//
void
ircd::socket::operator()(const milliseconds &timeout,
handler h)
handler callback)
{
static const auto flags
{
ip::tcp::socket::message_peek
};
static char buffer[1];
static const mutable_buffers buffers{{buffer, sizeof(buffer)}};
static const auto flags(ip::tcp::socket::message_peek);
static const asio::mutable_buffers_1 buffers
{
buffer, sizeof(buffer)
};
auto handler
{
std::bind(&socket::handle, this, weak_from(*this), std::move(callback), ph::_1, ph::_2)
};
set_timeout(timeout);
sd.async_receive(buffers, flags, [this, handler(h), wp(weak_from(*this))]
(error_code ec, const size_t &bytes)
{
assert(bytes <= sizeof(buffer));
sd.async_receive(buffers, flags, std::move(handler));
}
void
ircd::socket::handle(const std::weak_ptr<socket> wp,
const handler callback,
const error_code &ec,
const size_t &bytes)
noexcept
{
// This handler may still be registered with asio after the socket destructs, so
// the weak_ptr will indicate that fact. However, this is never intended and is
// a debug assertion which should be corrected.
if(unlikely(wp.expired()))
{
log::warning("socket(%p): belated callback to handler...", this);
assert(0);
return;
}
if(!handle_ready(ec))
// This handler and the timeout handler are responsible for canceling each other
// when one or the other is entered. If the timeout handler has already fired for
// a timeout on the socket, `timedout` will be `true` and this handler will be
// entered with an `operation_canceled` error.
if(!timedout)
timer.cancel();
else
assert(ec == boost::system::errc::operation_canceled);
// We can handle a few errors at this level which don't ever need to invoke the
// user's callback. Otherwise they are passed up.
if(!handle_error(ec))
{
log::debug("socket(%p): %s", this, ec.message());
return;
}
handler(ec);
});
call_user(callback, ec);
}
void
ircd::socket::call_user(const handler &callback,
const error_code &ec)
noexcept try
{
callback(ec);
}
catch(const std::exception &e)
{
log::error("socket(%p): async handler: unhandled user exception: %s",
this,
e.what());
if(ircd::debugmode)
throw;
}
bool
ircd::socket::handle_error(const error_code &ec)
{
using namespace boost::system::errc;
switch(ec.value())
{
// A success is not an error; can call the user handler
case success:
return true;
// A cancel is triggered either by the timeout handler or by
// a request to shutdown/close the socket. We only call the user's
// handler for a timeout, otherwise this is hidden from the user.
case operation_canceled:
return timedout;
// This indicates the remote closed the socket, we still
// pass this up to the user so they can handle it.
case boost::asio::error::eof:
return true;
// This is a condition which we hide from the user.
case bad_file_descriptor:
return false;
// Everything else is passed up to the user.
default:
return true;
}
}
void
ircd::socket::handle_timeout(const std::weak_ptr<socket> wp,
const error_code &ec)
{
using namespace boost::system::errc;
if(!wp.expired()) switch(ec.value())
{
// A 'success' for this handler means there was a timeout on the socket
case success:
timedout = true;
cancel();
break;
// A cancelation means there was no timeout.
case operation_canceled:
timedout = false;
break;
// All other errors are unexpected, logged and ignored here.
default:
log::error("socket::handle_timeout(): unexpected: %s\n",
ec.message());
break;
}
}
bool
@ -254,55 +390,3 @@ catch(const boost::system::system_error &e)
{
return false;
}
bool
ircd::socket::handle_ready(const error_code &ec)
noexcept
{
using namespace boost::system::errc;
if(!timedout)
timer.cancel();
switch(ec.value())
{
case success:
return true;
case boost::asio::error::eof:
return true;
case operation_canceled:
return timedout;
case bad_file_descriptor:
return false;
default:
return true;
}
}
void
ircd::socket::handle_timeout(const std::weak_ptr<socket> wp,
const error_code &ec)
noexcept
{
using namespace boost::system::errc;
if(!wp.expired()) switch(ec.value())
{
case success:
timedout = true;
cancel();
break;
case operation_canceled:
timedout = false;
break;
default:
log::error("socket::handle_timeout(): unexpected: %s\n", ec.message().c_str());
break;
}
}