From 7c4659a99fd8319e2659bbfe0718b887d1fd4df8 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 13 Jan 2018 21:16:49 -0800 Subject: [PATCH] ircd::server: Develop basic infrastructure. --- include/ircd/server/link.h | 9 +- include/ircd/server/node.h | 11 +- include/ircd/server/request.h | 36 ++- ircd/server.cc | 404 +++++++++++++++++++++++++++------- 4 files changed, 367 insertions(+), 93 deletions(-) diff --git a/include/ircd/server/link.h b/include/ircd/server/link.h index 41a70fa3d..42d3c7c32 100644 --- a/include/ircd/server/link.h +++ b/include/ircd/server/link.h @@ -36,13 +36,16 @@ struct ircd::server::link bool busy() const; protected: - void handle_writable(const error_code &); - void wait_writable(); - + const_buffer process_read_next(const const_buffer &, struct request::tag &, bool &done); + bool process_read(const_buffer &); + void discard_read(); void handle_readable_success(); void handle_readable(const error_code &); void wait_readable(); + void handle_writable(const error_code &); + void wait_writable(); + void handle_close(std::exception_ptr); void handle_open(std::exception_ptr); diff --git a/include/ircd/server/node.h b/include/ircd/server/node.h index 4f6f4acb9..ec3efa703 100644 --- a/include/ircd/server/node.h +++ b/include/ircd/server/node.h @@ -33,9 +33,18 @@ struct ircd::server::node void handle_resolve(std::weak_ptr, std::exception_ptr, const ipport &); void resolve(const hostport &); + void del(link &); + void handle_error(link &, const boost::system::system_error &); + void handle_close(link &, std::exception_ptr); + void handle_open(link &, std::exception_ptr); + public: + size_t num_tags() const; + size_t num_links() const; + size_t num_links_busy() const; + size_t num_links_ready() const; + link &link_add(const size_t &num = 1); - void link_del(const size_t &num = 1); link &link_get(); void cancel(request &); diff --git a/include/ircd/server/request.h b/include/ircd/server/request.h index 00e1e5007..ec378d942 100644 --- a/include/ircd/server/request.h +++ b/include/ircd/server/request.h @@ -29,22 +29,35 @@ namespace ircd::server struct request; } -struct ircd::server::in -{ - mutable_buffer head; - mutable_buffer content; -}; - +/// Request data and options related to transmitting the request. This +/// is where buffers must be supplied to send data to the server. +/// struct ircd::server::out { + // supplied by user const_buffer head; const_buffer content; }; +/// Request data and options related to the receive side of the request. +/// This is where buffers are supplied to receive data from the remote +/// server. +/// +struct ircd::server::in +{ + // supplied by user + mutable_buffer head_buffer; + mutable_buffer content_buffer; + + // supplied by system + http::response::head head; +}; + /// This is a handle for being a client to another server. This handle will /// attempt to find an existing connection pool for the remote server otherwise -/// one will be created. Then it will multiplex your requests and demultiplex -/// your responses. +/// one will be created. Then it will multiplex your request and demultiplex +/// your response with all the other requests pending in the pipelines to +/// the remote. /// struct ircd::server::request :ctx::future @@ -52,7 +65,6 @@ struct ircd::server::request struct tag; struct tag *tag {nullptr}; - http::response::head head; public: server::out out; @@ -79,12 +91,12 @@ struct ircd::server::request::tag mutable_buffer make_content_buffer() const; mutable_buffer make_head_buffer() const; - bool read_content(const const_buffer &, const_buffer &overrun); - bool read_head(const const_buffer &, const_buffer &overrun); + const_buffer read_content(const const_buffer &, bool &done); + const_buffer read_head(const const_buffer &, bool &done); public: mutable_buffer make_read_buffer() const; - bool read_buffer(const const_buffer &, const_buffer &overrun); + const_buffer read_buffer(const const_buffer &, bool &done); tag(server::request &); tag(tag &&) noexcept; diff --git a/ircd/server.cc b/ircd/server.cc index 525dcb949..7b5dc35b3 100644 --- a/ircd/server.cc +++ b/ircd/server.cc @@ -171,31 +171,62 @@ noexcept { } -bool +/// Called by the controller of the socket with a view of the data received by +/// the socket. The location and size of `buffer` is the same or smaller than +/// the buffer previously supplied by make_read_buffer(). +/// +/// Sometimes make_read_buffer() supplies a buffer that is too large, and some +/// data read off the socket does not belong to this tag. In that case, This +/// function returns a const_buffer viewing the portion of `buffer` which is +/// considered the "overrun," and the socket controller will copy that over to +/// the next tag. +/// +/// The tag indicates it is entirely finished with receiving its data by +/// setting the value of `done` to true. Otherwise it is assumed false. +/// +ircd::const_buffer ircd::server::request::tag::read_buffer(const const_buffer &buffer, - const_buffer &overrun) + bool &done) { - assert(request); - return !request? true: - !request->head.status? read_head(buffer, overrun): - read_content(buffer, overrun); + return !request? const_buffer{}: + !request->in.head.status? read_head(buffer, done): + read_content(buffer, done); } +/// An idempotent operation that provides the location of where the socket +/// should place the next received data. The tag figures this out based on +/// whether it receiving HTTP head data or whether it is in content mode. +/// +/// In head mode, portions of the user's buffer are returned starting with +/// the entire buffer. After a call read_buffer() supplying more data from +/// the socket, the subsequent invocation of make_read_buffer() will return +/// smaller windows for the remaining free space. Head mode is completed when +/// an \r\n\r\n terminator is received. +/// +/// In content mode, portions of the user's buffer are returned starting with +/// one that is at most content_length. After a call to read_buffer() +/// supplying more data, the remaining space of the remaining content_length +/// are returned. +/// ircd::mutable_buffer ircd::server::request::tag::make_read_buffer() const { assert(request); - return !request? mutable_buffer{}: - !request->head.status? make_head_buffer(): - make_content_buffer(); + return !request? mutable_buffer{}: + !request->in.head.status? make_head_buffer(): + make_content_buffer(); } -bool +ircd::const_buffer ircd::server::request::tag::read_head(const const_buffer &buffer, - const_buffer &overrun) + bool &done) { + assert(request); auto &req{*request}; + auto &head{req.in.head}; + const auto &head_buffer{req.in.head_buffer}; + const auto &content_buffer{req.in.content_buffer}; // informal search for head terminator static const string_view terminator{"\r\n\r\n"}; @@ -210,7 +241,7 @@ ircd::server::request::tag::read_head(const const_buffer &buffer, if(pos == string_view::npos) { head_read += size(buffer); - return false; + return {}; } // The given buffer may go past the end of the head and we may already @@ -226,11 +257,11 @@ ircd::server::request::tag::read_head(const const_buffer &buffer, head_read += addl_head_bytes; // Setup the capstan and mark the end of the tape - parse::buffer pb{mutable_buffer{data(req.in.head), head_read}}; + parse::buffer pb{mutable_buffer{data(head_buffer), head_read}}; parse::capstan pc{pb}; pc.read += head_read; - req.head = http::response::head{pc}; + head = http::response::head{pc}; assert(pb.completed() == head_read); const size_t overrun_length @@ -240,46 +271,49 @@ ircd::server::request::tag::read_head(const const_buffer &buffer, const size_t &content_read { - std::min(req.head.content_length, overrun_length) + std::min(head.content_length, overrun_length) }; const const_buffer partial_content { - data(req.in.head) + head_read, content_read + data(head_buffer) + head_read, content_read }; // Any partial content was written to the head buffer by accident, // that has to be copied over to the content buffer. - this->content_read += copy(req.in.content, partial_content); + this->content_read += copy(content_buffer, partial_content); assert(this->content_read == size(partial_content)); // Anything remaining is not our response and must be given back assert(overrun_length >= content_read); - overrun = const_buffer + const const_buffer overrun { - data(req.in.head) + head_read + content_read, overrun_length - content_read + data(head_buffer) + head_read + content_read, overrun_length - content_read }; - if(this->content_read == req.head.content_length) + if(this->content_read == head.content_length) { - p.set_value(http::status(req.head.status)); - return true; + done = true; + p.set_value(http::status(head.status)); } - return false; + return overrun; } -bool +ircd::const_buffer ircd::server::request::tag::read_content(const const_buffer &buffer, - const_buffer &overrun) + bool &done) { + assert(request); auto &req{*request}; + const auto &head{req.in.head}; + const auto &content_buffer{req.in.content_buffer}; // The amount of remaining content for the response sequence - assert(req.head.content_length >= content_read); + assert(head.content_length >= content_read); const size_t remaining { - req.head.content_length - content_read + head.content_length - content_read }; // The amount of content read in this buffer only. @@ -296,37 +330,42 @@ ircd::server::request::tag::read_content(const const_buffer &buffer, content_read += addl_content_read; // Report anything that doesn't belong to us. - overrun = const_buffer + const const_buffer overrun { - data(req.in.content) + content_read, overrun_length + data(content_buffer) + content_read, overrun_length }; - assert(content_read <= req.head.content_length); - if(content_read == req.head.content_length) + assert(content_read <= head.content_length); + if(content_read == head.content_length) { - p.set_value(http::status(req.head.status)); - return true; + done = true; + p.set_value(http::status(head.status)); } - else return false; + + return overrun; } ircd::mutable_buffer ircd::server::request::tag::make_head_buffer() const { + assert(request); const auto &req{*request}; - if(head_read >= size(req.in.head)) + const auto &head_buffer{req.in.head_buffer}; + const auto &content_buffer{req.in.content_buffer}; + + if(head_read >= size(head_buffer)) return {}; const size_t remaining { - size(req.in.head) - head_read + size(head_buffer) - head_read }; - assert(remaining <= size(req.in.head)); + assert(remaining <= size(head_buffer)); const mutable_buffer buffer { - data(req.in.head) + head_read, remaining + data(head_buffer) + head_read, remaining }; return buffer; @@ -336,27 +375,30 @@ ircd::mutable_buffer ircd::server::request::tag::make_content_buffer() const { - auto &req{*request}; + assert(request); + const auto &req{*request}; + const auto &head{req.in.head}; + const auto &content_buffer{req.in.content_buffer}; // The amount of bytes we still have to read to for the response - assert(req.head.content_length >= content_read); + assert(head.content_length >= content_read); const size_t remaining { - req.head.content_length - content_read + head.content_length - content_read }; // The amount of bytes available in the user's buffer. - assert(size(req.in.content) >= content_read); + assert(size(content_buffer) >= content_read); const size_t available { - size(req.in.content) - content_read + size(content_buffer) - content_read }; // For now, this has to trip right here. assert(available >= remaining); const mutable_buffer buffer { - data(req.in.content) + content_read, std::min(available, remaining) + data(content_buffer) + content_read, std::min(available, remaining) }; return buffer; @@ -414,8 +456,79 @@ ircd::server::node::link_add(const size_t &num) } void -ircd::server::node::link_del(const size_t &num) +ircd::server::node::handle_open(link &link, + std::exception_ptr eptr) +try { + if(eptr) + std::rethrow_exception(eptr); + + dock.notify_all(); +} +catch(const std::exception &e) +{ + log::error("node(%p) link(%p) [%s]: open: %s", + this, + &link, + string(remote), + e.what()); +} + +void +ircd::server::node::handle_close(link &link, + std::exception_ptr eptr) +try +{ + const unwind remove{[this, &link] + { + this->del(link); + }}; + + if(eptr) + std::rethrow_exception(eptr); +} +catch(const std::exception &e) +{ + log::error("node(%p) link(%p) [%s]: close: %s", + this, + &link, + string(remote), + e.what()); +} + +void +ircd::server::node::handle_error(link &link, + const boost::system::system_error &e) +{ + log::error("node(%p) link(%p) [%s]: error: %s", + this, + &link, + string(remote), + e.what()); + + if(!link.busy()) + { + link.close(net::close_opts_default); + return; + } +} + +void +ircd::server::node::del(link &link) +{ + log::debug("node(%p) [%s]: removing link %p", + this, + string(remote), + &link); + + const auto it(std::find_if(begin(links), end(links), [&link] + (const auto &link_) + { + return &link_ == &link; + })); + + assert(it != end(links)); + links.erase(it); } void @@ -447,6 +560,46 @@ catch(const std::bad_weak_ptr &) return; } +size_t +ircd::server::node::num_links_ready() +const +{ + return std::accumulate(begin(links), end(links), size_t(0), [] + (auto ret, const auto &link) + { + return ret += link.ready(); + }); +} + +size_t +ircd::server::node::num_links_busy() +const +{ + return std::accumulate(begin(links), end(links), size_t(0), [] + (auto ret, const auto &link) + { + return ret += link.busy(); + }); +} + +size_t +ircd::server::node::num_links() +const +{ + return links.size(); +} + +size_t +ircd::server::node::num_tags() +const +{ + return std::accumulate(begin(links), end(links), size_t(0), [] + (auto ret, const auto &link) + { + return ret += link.queue.size(); + }); +} + // // link // @@ -501,12 +654,13 @@ ircd::server::link::open(const net::open_opts &open_opts) void ircd::server::link::handle_open(std::exception_ptr eptr) { - eptr = std::move(eptr); init = false; - node->dock.notify_all(); - if(!eptr) + if(!this->eptr) wait_readable(); + + if(node) + node->handle_open(*this, std::move(eptr)); } bool @@ -532,20 +686,10 @@ ircd::server::link::close(const net::close_opts &close_opts) void ircd::server::link::handle_close(std::exception_ptr eptr) { - eptr = std::move(eptr); fini = false; -} -void -ircd::server::link::wait_readable() -{ - auto handler - { - std::bind(&link::handle_readable, this, ph::_1) - }; - - assert(ready()); - net::wait(*socket, net::ready::READ, std::move(handler)); + if(node) + node->handle_close(*this, std::move(eptr)); } void @@ -560,8 +704,27 @@ ircd::server::link::wait_writable() net::wait(*socket, net::ready::WRITE, std::move(handler)); } +void +ircd::server::link::handle_writable(const error_code &ec) +{ + std::cout << this << " writable: " << string(ec) << std::endl; +} + +void +ircd::server::link::wait_readable() +{ + auto handler + { + std::bind(&link::handle_readable, this, ph::_1) + }; + + assert(ready()); + net::wait(*socket, net::ready::READ, std::move(handler)); +} + void ircd::server::link::handle_readable(const error_code &ec) +try { using namespace boost::system::errc; using boost::system::system_category; @@ -573,54 +736,141 @@ ircd::server::link::handle_readable(const error_code &ec) wait_readable(); break; + case operation_canceled: + break; + default: throw boost::system::system_error{ec}; } } +catch(const boost::system::system_error &e) +{ + if(node) + node->handle_error(*this, e); +} +catch(const std::exception &e) +{ + log::critical("link::handle_readable(): %s", e.what()); + assert(0); + throw; +} +/// Process as many read operations from as many tags as possible void ircd::server::link::handle_readable_success() { if(queue.empty()) - throw std::out_of_range("queue empty"); + return discard_read(); + // Data pointed to by overrun will remain intact between iterations + // because this loop isn't executing in any ircd::ctx. + const_buffer overrun; do + { + if(!process_read(overrun)) + break; + } + while(!queue.empty()); +} + +void +ircd::server::link::discard_read() +{ + const size_t discard + { + available(*socket) + }; + + const size_t discarded + { + discard_any(*socket, discard) + }; + + // Shouldn't ever be hit because the read() within discard() throws + // the pending error like an eof. + log::warning("Link discarded %zu of %zu unexpected bytes", + discard, + discarded); + assert(0); + + // for non-assert builds just in case; so this doesn't get loopy with + // discarding zero with an empty queue... + if(unlikely(!discard || !discarded)) + throw assertive("Queue is empty and nothing to discard."); +} + +/// Process as many read operations for one tag as possible +bool +ircd::server::link::process_read(const_buffer &overrun) +try +{ auto &tag { queue.front() }; + bool done{false}; do + { + overrun = process_read_next(overrun, tag, done); + } + while(!done); + + queue.pop_front(); + return true; +} +catch(const boost::system::system_error &e) +{ + using namespace boost::system::errc; + switch(e.code().value()) + { + case resource_unavailable_try_again: + return false; + + case success: + assert(0); + + default: + throw; + } +} + +/// Process one read operation for one tag +ircd::const_buffer +ircd::server::link::process_read_next(const const_buffer &underrun, + struct request::tag &tag, + bool &done) +{ const mutable_buffer buffer { tag.make_read_buffer() }; - const size_t bytes + const size_t copied { - read_one(*socket, buffer) + copy(buffer, underrun) }; - const const_buffer received + const mutable_buffer remaining { - data(buffer), bytes + data(buffer) + copied, size(buffer) - copied }; - const_buffer overrun; - const bool done + const size_t received { - tag.read_buffer(received, overrun) + read_one(*socket, remaining) }; - if(!empty(overrun)) - std::cout << "got overrun: " << size(overrun) << std::endl; + const const_buffer view + { + data(buffer), copied + received + }; - if(done) - queue.pop_front(); -} - -void -ircd::server::link::handle_writable(const error_code &ec) -{ + const const_buffer overrun + { + tag.read_buffer(view, done) + }; + assert(done || empty(overrun)); + return overrun; } bool