From fcd410d65602160683d011a306659e60f0e91e88 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 12 Sep 2016 17:10:04 -0700 Subject: [PATCH] ircd::client: Add timer logic to client socket/io. Note: Not sure if I like two reference counters for each client, ideally, but we'll see where it goes after abstractions. --- include/ircd/client.h | 6 +++-- include/ircd/sock.h | 50 ++++++++++++++++++++++++++++++++++-- ircd/client.cc | 57 +++++++++++++++++++++++++++++++----------- modules/conf/listen.cc | 2 +- 4 files changed, 96 insertions(+), 19 deletions(-) diff --git a/include/ircd/client.h b/include/ircd/client.h index 049c26940..eecf9c318 100644 --- a/include/ircd/client.h +++ b/include/ircd/client.h @@ -59,7 +59,9 @@ enum class dc bool connected(const client &) noexcept; bool disconnect(std::nothrow_t, client &, const dc & = dc::FIN) noexcept; void disconnect(client &, const dc & = dc::FIN); -void set_recv(client &); +void recv_cancel(client &); +void recv_next(client &, const std::chrono::milliseconds &timeout); +void recv_next(client &); // Destroys a client. This only removes the client from the clients list, // and may result in a destruction and disconnect, or it may not. @@ -67,7 +69,7 @@ void finished(client &); // Creates a client. std::shared_ptr add_client(); -std::shared_ptr add_client(std::unique_ptr); +std::shared_ptr add_client(std::shared_ptr); using clist = std::list>; const clist &clients(); diff --git a/include/ircd/sock.h b/include/ircd/sock.h index 92d72c19a..ef9f60fcf 100644 --- a/include/ircd/sock.h +++ b/include/ircd/sock.h @@ -35,17 +35,22 @@ using boost::system::error_code; using boost::asio::steady_timer; struct sock +:std::enable_shared_from_this { using message_flags = boost::asio::socket_base::message_flags; ip::tcp::socket sd; steady_timer timer; + bool timedout; operator const ip::tcp::socket &() const { return sd; } operator ip::tcp::socket &() { return sd; } ip::tcp::endpoint remote() const { return sd.remote_endpoint(); } ip::tcp::endpoint local() const { return sd.local_endpoint(); } + void handle_timeout(const std::weak_ptr, const error_code &); + template void set_timeout(const duration &); + template auto recv_some(const mutable_buffers &, const message_flags & = 0); template auto recv(const mutable_buffers &); @@ -74,7 +79,9 @@ template auto sock::send(const const_buffers &bufs) { - return async_write(sd, bufs, yield(continuation())); + const auto ret(async_write(sd, bufs, yield(continuation()))); + timer.cancel(); + return ret; } // Block until something transmitted, returns amount @@ -91,7 +98,9 @@ template auto sock::recv(const mutable_buffers &bufs) { - return async_read(sd, bufs, yield(continuation())); + const auto ret(async_read(sd, bufs, yield(continuation()))); + timer.cancel(); + return ret; } // Block until something in buffers, returns size @@ -103,6 +112,43 @@ sock::recv_some(const mutable_buffers &bufs, return sd.async_receive(bufs, flags, yield(continuation())); } +template +void +sock::set_timeout(const duration &t) +{ + if(t < duration(0)) + return; + + timer.expires_from_now(t); + timer.async_wait(std::bind(&sock::handle_timeout, this, shared_from_this(), ph::_1)); +} + +inline void +sock::handle_timeout(const std::weak_ptr wp, + const error_code &ec) +{ + using namespace boost::system::errc; + + if(!wp.expired()) switch(ec.value()) + { + case success: + { + timedout = true; + error_code sd_ec; + sd.cancel(sd_ec); + return; + } + + case operation_canceled: + timedout = false; + return; + + default: + log::error("sock::handle_timeout(): unexpected: %s\n", ec.message().c_str()); + return; + } +} + inline uint16_t local_port(const sock &sock) { diff --git a/ircd/client.cc b/ircd/client.cc index 847808437..29e557e70 100644 --- a/ircd/client.cc +++ b/ircd/client.cc @@ -114,7 +114,7 @@ namespace ircd { struct rbuf rbuf; clist::const_iterator clit; - std::unique_ptr sock; + std::shared_ptr sock; client(); client(const client &) = delete; @@ -149,7 +149,7 @@ ircd::clients() } std::shared_ptr -ircd::add_client(std::unique_ptr sock) +ircd::add_client(std::shared_ptr sock) { auto client(add_client()); client->sock = std::move(sock); @@ -157,7 +157,7 @@ ircd::add_client(std::unique_ptr sock) string(remote_address(*client)).c_str(), string(local_address(*client)).c_str()); - set_recv(*client); + recv_next(*client); return client; } @@ -221,7 +221,14 @@ catch(...) } void -ircd::set_recv(client &client) +ircd::recv_next(client &client) +{ + recv_next(client, std::chrono::milliseconds(-1)); +} + +void +ircd::recv_next(client &client, + const std::chrono::milliseconds &timeout) { using boost::asio::async_read; @@ -229,36 +236,45 @@ ircd::set_recv(client &client) rbuf.reset(); auto &sock(*client.sock); + sock.set_timeout(timeout); async_read(sock.sd, mutable_buffers_1(rbuf.buf.data(), rbuf.buf.size()), std::bind(&rbuf::handle_pck, &rbuf, ph::_1, ph::_2), std::bind(&ircd::handle_recv, std::ref(client), ph::_1, ph::_2)); } +void +ircd::recv_cancel(client &client) +{ + auto &sock(socket(client)); + sock.sd.cancel(); +} + void ircd::handle_recv(client &client, const error_code &ec, const size_t bytes) try { - if(!handle_error(client, ec)) + if(!handle_ec(client, ec)) return; auto &rbuf(client.rbuf); auto &reel(rbuf.reel); execute(client, reel); -} -catch(const rfc1459::syntax_error &e) -{ - std::cerr << e.what() << std::endl; + recv_next(client); } catch(const std::exception &e) { - std::cerr << "errored: " << e.what() << std::endl; + log::error("client[%s]: error: %s", + string(remote_address(client)).c_str(), + e.what()); + + finished(client); } bool -ircd::handle_error(client &client, - const error_code &ec) +ircd::handle_ec(client &client, + const error_code &ec) { using namespace boost::system::errc; using boost::asio::error::eof; @@ -268,8 +284,21 @@ ircd::handle_error(client &client, switch(ec.value()) { - case success: return true; - default: throw boost::system::system_error(ec); + case success: return handle_ec_success(client); + case operation_canceled: return handle_ec_cancel(client); + case eof: return handle_ec_eof(client); + default: throw boost::system::system_error(ec); + } +} + +bool +ircd::handle_ec_success(client &client) +{ + auto &sock(*client.sock); + { + error_code ec; + sock.timer.cancel(ec); + assert(ec == boost::system::errc::success); } return true; diff --git a/modules/conf/listen.cc b/modules/conf/listen.cc index 8880152ef..97af202fd 100644 --- a/modules/conf/listen.cc +++ b/modules/conf/listen.cc @@ -120,7 +120,7 @@ bool listener::accept() try { - auto sock(std::make_unique()); + auto sock(std::make_shared()); acceptor.async_accept(sock->sd, yield(continuation())); add_client(std::move(sock)); return true;