From 59621eb266793a4bcabaaa069e5f00aca6020d88 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 15 Jan 2018 00:11:32 -0800 Subject: [PATCH] ircd::server: Improve write stack; add close_all(); various. --- include/ircd/server/link.h | 8 +- include/ircd/server/node.h | 3 + include/ircd/server/request.h | 4 +- include/ircd/server/server.h | 4 +- include/ircd/server/tag.h | 9 ++- ircd/server.cc | 146 ++++++++++++++++++++++++---------- 6 files changed, 125 insertions(+), 49 deletions(-) diff --git a/include/ircd/server/link.h b/include/ircd/server/link.h index 2cb8674c2..a3e47ae11 100644 --- a/include/ircd/server/link.h +++ b/include/ircd/server/link.h @@ -27,9 +27,8 @@ struct ircd::server::link bool init {false}; ///< link is connecting bool fini {false}; ///< link is disconnecting std::shared_ptr node; ///< backreference to node - std::exception_ptr eptr; ///< error from socket std::shared_ptr socket; ///< link's socket - std::deque queue; ///< link's work queue + std::deque queue; ///< link's work queue bool connected() const noexcept; bool ready() const; @@ -37,12 +36,15 @@ struct ircd::server::link protected: void discard_read(); - const_buffer process_read_next(const const_buffer &, struct request::tag &, bool &done); + const_buffer process_read_next(const const_buffer &, tag &, bool &done); bool process_read(const_buffer &); void handle_readable_success(); void handle_readable(const error_code &); void wait_readable(); + const_buffer process_write_next(const const_buffer &); + bool process_write(tag &); + void handle_writable_success(); void handle_writable(const error_code &); void wait_writable(); diff --git a/include/ircd/server/node.h b/include/ircd/server/node.h index ec3efa703..a6875ce52 100644 --- a/include/ircd/server/node.h +++ b/include/ircd/server/node.h @@ -50,6 +50,9 @@ struct ircd::server::node void cancel(request &); void submit(request &); + void interrupt(); + void close(); + node(); ~node() noexcept; }; diff --git a/include/ircd/server/request.h b/include/ircd/server/request.h index b3aee97eb..2afc1dfd5 100644 --- a/include/ircd/server/request.h +++ b/include/ircd/server/request.h @@ -62,9 +62,7 @@ struct ircd::server::in struct ircd::server::request :ctx::future { - struct tag; - - struct tag *tag {nullptr}; + server::tag *tag {nullptr}; public: server::out out; diff --git a/include/ircd/server/server.h b/include/ircd/server/server.h index 553f62bce..3a0003462 100644 --- a/include/ircd/server/server.h +++ b/include/ircd/server/server.h @@ -27,6 +27,8 @@ namespace ircd::server struct init; struct link; struct node; + struct request; + struct tag; IRCD_EXCEPTION(ircd::error, error); @@ -38,8 +40,8 @@ namespace ircd::server node &get(const net::hostport &); } -#include "request.h" #include "tag.h" +#include "request.h" #include "link.h" #include "node.h" diff --git a/include/ircd/server/tag.h b/include/ircd/server/tag.h index 9e00b3619..6a20b7478 100644 --- a/include/ircd/server/tag.h +++ b/include/ircd/server/tag.h @@ -20,9 +20,14 @@ #pragma once #define HAVE_IRCD_SERVER_TAG_H +namespace ircd::server +{ + struct tag; +} + /// Internal portion of the request -// -struct ircd::server::request::tag +/// +struct ircd::server::tag { server::request *request; ctx::promise p; diff --git a/ircd/server.cc b/ircd/server.cc index 7416b20a5..5150da27d 100644 --- a/ircd/server.cc +++ b/ircd/server.cc @@ -23,6 +23,9 @@ namespace ircd::server { std::shared_ptr create(const net::hostport &); + + void interrupt_all(); + void close_all(); } decltype(ircd::server::log) @@ -81,13 +84,28 @@ ircd::server::init::init() ircd::server::init::~init() noexcept { + close_all(); nodes.clear(); } void ircd::server::init::interrupt() { - nodes.clear(); + interrupt_all(); +} + +void +ircd::server::close_all() +{ + for(auto &node : nodes) + node.second->close(); +} + +void +ircd::server::interrupt_all() +{ + for(auto &node : nodes) + node.second->interrupt(); } /// @@ -135,16 +153,16 @@ noexcept } // -// request::tag +// tag // -ircd::server::request::tag::tag(server::request &request) +ircd::server::tag::tag(server::request &request) :request{&request} { static_cast &>(request) = p; } -ircd::server::request::tag::tag(tag &&o) +ircd::server::tag::tag(tag &&o) noexcept :request{std::move(o.request)} { @@ -153,8 +171,8 @@ noexcept request->tag = this; } -struct ircd::server::request::tag & -ircd::server::request::tag::operator=(tag &&o) +struct ircd::server::tag & +ircd::server::tag::operator=(tag &&o) noexcept { request = std::move(o.request); @@ -166,7 +184,7 @@ noexcept return *this; } -ircd::server::request::tag::~tag() +ircd::server::tag::~tag() noexcept { } @@ -185,8 +203,8 @@ noexcept /// setting the value of `done` to true. Otherwise it is assumed false. /// ircd::const_buffer -ircd::server::request::tag::read_buffer(const const_buffer &buffer, - bool &done) +ircd::server::tag::read_buffer(const const_buffer &buffer, + bool &done) { return !request? const_buffer{}: !request->in.head.status? read_head(buffer, done): @@ -209,7 +227,7 @@ ircd::server::request::tag::read_buffer(const const_buffer &buffer, /// are returned. /// ircd::mutable_buffer -ircd::server::request::tag::make_read_buffer() +ircd::server::tag::make_read_buffer() const { assert(request); @@ -219,24 +237,26 @@ const } void -ircd::server::request::tag::wrote_buffer(const const_buffer &buffer) +ircd::server::tag::wrote_buffer(const const_buffer &buffer) { assert(request); const auto &req{*request}; if(head_written < size(req.out.head)) { - assert(data(buffer) == data(req.out.head)); head_written += size(buffer); + assert(data(buffer) == data(req.out.head)); + assert(head_written <= size(req.out.head)); } else if(content_written < size(req.out.content)) { - assert(data(buffer) == data(req.out.content)); content_written += size(buffer); + assert(data(buffer) == data(req.out.content)); + assert(content_written <= size(req.out.content)); } } ircd::const_buffer -ircd::server::request::tag::make_write_buffer() +ircd::server::tag::make_write_buffer() const { assert(request); @@ -247,7 +267,8 @@ const const const_buffer window{data(req.out.head) + head_written, remain}; return window; } - else if(content_written < size(req.out.content)) + + if(content_written < size(req.out.content)) { const size_t remain{size(req.out.content) - content_written}; const const_buffer window{data(req.out.content) + content_written, remain}; @@ -258,8 +279,8 @@ const } ircd::const_buffer -ircd::server::request::tag::read_head(const const_buffer &buffer, - bool &done) +ircd::server::tag::read_head(const const_buffer &buffer, + bool &done) { assert(request); auto &req{*request}; @@ -340,8 +361,8 @@ ircd::server::request::tag::read_head(const const_buffer &buffer, } ircd::const_buffer -ircd::server::request::tag::read_content(const const_buffer &buffer, - bool &done) +ircd::server::tag::read_content(const const_buffer &buffer, + bool &done) { assert(request); auto &req{*request}; @@ -385,7 +406,7 @@ ircd::server::request::tag::read_content(const const_buffer &buffer, } ircd::mutable_buffer -ircd::server::request::tag::make_read_head_buffer() +ircd::server::tag::make_read_head_buffer() const { assert(request); @@ -411,7 +432,7 @@ const } ircd::mutable_buffer -ircd::server::request::tag::make_read_content_buffer() +ircd::server::tag::make_read_content_buffer() const { assert(request); @@ -454,6 +475,26 @@ ircd::server::node::node() ircd::server::node::~node() noexcept { + assert(links.empty()); +} + +void +ircd::server::node::close() +{ + for(auto &link : links) + link.close(net::close_opts_default); + + while(!links.empty()) + dock.wait(); +} + +void +ircd::server::node::interrupt() +{ + //TODO: not a close + //TODO: interrupt = killing requests but still setting tag promises + for(auto &link : links) + link.close(net::close_opts_default); } void @@ -568,6 +609,7 @@ ircd::server::node::del(link &link) assert(it != end(links)); links.erase(it); + dock.notify_all(); } void @@ -591,7 +633,6 @@ try this->eptr = std::move(eptr); static_cast(this->remote) = ipport; - dock.notify_all(); } catch(const std::bad_weak_ptr &) @@ -694,7 +735,7 @@ ircd::server::link::handle_open(std::exception_ptr eptr) { init = false; - if(!this->eptr) + if(!eptr) wait_readable(); if(node) @@ -751,16 +792,34 @@ ircd::server::link::handle_writable(const error_code &ec) switch(ec.value()) { case success: + handle_writable_success(); break; case operation_canceled: - break; + return; default: throw boost::system::system_error{ec}; } +} +void +ircd::server::link::handle_writable_success() +{ for(auto &tag : queue) + { + if(!process_write(tag)) + { + wait_writable(); + break; + } + } +} + +bool +ircd::server::link::process_write(tag &tag) +{ + while(1) { const const_buffer buffer { @@ -768,28 +827,35 @@ ircd::server::link::handle_writable(const error_code &ec) }; if(empty(buffer)) - continue; - - const size_t bytes - { - write_any(*socket, buffer) - }; + return true; const const_buffer written { - data(buffer), bytes + process_write_next(buffer) }; tag.wrote_buffer(written); - - if(bytes < size(buffer)) - { - wait_writable(); - break; - } + if(size(written) < size(buffer)) + return false; } } +ircd::const_buffer +ircd::server::link::process_write_next(const const_buffer &buffer) +{ + const size_t bytes + { + write_any(*socket, buffer) + }; + + const const_buffer written + { + data(buffer), bytes + }; + + return written; +} + void ircd::server::link::wait_readable() { @@ -817,7 +883,7 @@ try break; case operation_canceled: - break; + return; default: throw boost::system::system_error{ec}; @@ -890,7 +956,7 @@ catch(const boost::system::system_error &e) /// Process one read operation for one tag ircd::const_buffer ircd::server::link::process_read_next(const const_buffer &underrun, - struct request::tag &tag, + tag &tag, bool &done) { const mutable_buffer buffer @@ -964,7 +1030,7 @@ bool ircd::server::link::ready() const { - return connected() && !init && !fini && !eptr; + return connected() && !init && !fini; } bool