From fa3afc7ad746309790499fd0cc85058ac1d1cda6 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 12 Jan 2018 18:57:58 -0800 Subject: [PATCH] ircd::server: Preliminary client request pipeline framework. --- include/ircd/server.h | 223 +++++++----- ircd/server.cc | 787 +++++++++++++++++++++++++++++++----------- 2 files changed, 719 insertions(+), 291 deletions(-) diff --git a/include/ircd/server.h b/include/ircd/server.h index 02c7d6815..7c63f376f 100644 --- a/include/ircd/server.h +++ b/include/ircd/server.h @@ -1,120 +1,175 @@ -/* - * Copyright (C) 2017 Charybdis Development Team - * Copyright (C) 2017 Jason Volk - * - * Permission to use, copy, modify, and/or distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice is present in all copies. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR - * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING - * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. +// +// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR +// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, +// INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +// IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. #pragma once #define HAVE_IRCD_SERVER_H -namespace ircd -{ - struct server; - - size_t write(server &, const ilist &iov); - char *read(server &, char *&start, char *const &stop); - - http::request::write_closure write_closure(server &); - parse::read_closure read_closure(server &); -} - /// The interface for when IRCd plays the role of client to other servers /// -/// This is a handle for being a client to another server. This class is -/// named ircd::server and not ircd::client because this project is IRCd -/// and not IRCc. This handle will attempt to find an existing connection -/// pool for the remote server and then multiplex your requests and demultiplex -/// your responses to achieve concurrency and sharing and limitations and -/// shape the pipeline as best as the pool allows along with other instances -/// of ircd::server for the same remote. -/// -/// Note that this means ircd::server is only appropriate for stateless -/// protocols like HTTP and chunked-encoding should be avoided if we are -/// to get the best out of nagle'ing the pipe. Individual net::socket should -/// be used otherwise. -/// -/// This handle is a "tag" which services a single request and response per -/// instance to an ircd::server::node over one ircd::server::link available -/// to that node. Those interfaces are internal and don't have to be messed -/// with. -/// -/// Instances of this class can be used in arrays to make bulk requests. -/// -struct ircd::server +namespace ircd::server { struct init; struct node; struct link; + struct request; + struct out; + struct in; - static std::map> nodes; + IRCD_EXCEPTION(ircd::error, error); - std::shared_ptr n; - unique_iterator> it; + extern ircd::log::log log; + extern std::map> nodes; + + bool exists(const net::hostport &); + node &find(const net::hostport &); + node &get(const net::hostport &); +} + +struct ircd::server::out +{ + const_buffer head; + const_buffer content; +}; + +struct ircd::server::in +{ + mutable_buffer head; + mutable_buffer content; +}; + +/// This is a handle for being a client to another server. This handle will +/// attempt to find an existing connection pool for the remote server otherwise +/// one will be created. Then it will multiplex your requests and demultiplex +/// your responses. +/// +struct ircd::server::request +:ctx::future +{ + struct tag; + + struct tag *tag {nullptr}; + http::response::head head; public: - operator const net::remote &() const; + server::out out; + server::in in; - server() = default; - server(server &&) noexcept = default; - server &operator=(server &&) noexcept = default; - server(net::remote); - ~server() noexcept; + request(const net::hostport &, server::out out, server::in in); + request() = default; + request(request &&) noexcept; + request(const request &) = delete; + request &operator=(request &&) noexcept; + request &operator=(const request &) = delete; + ~request() noexcept; }; -struct ircd::server::link +/// Internal portion of the request +// +struct ircd::server::request::tag { - enum state :int; + server::request *request; + ctx::promise p; + size_t head_read {0}; + size_t content_read {0}; - std::shared_ptr s; - std::deque q; - enum state state; + mutable_buffer make_content_buffer() const; + mutable_buffer make_head_buffer() const; - bool connected() const noexcept; - bool connect(const net::remote &); - bool disconnect(); + bool read_content(const const_buffer &, const_buffer &overrun); + bool read_head(const const_buffer &, const_buffer &overrun); - link(const net::remote &remote); -}; - -enum ircd::server::link::state -:int -{ - DEAD, - IDLE, - BUSY, + public: + mutable_buffer make_read_buffer() const; + bool read_buffer(const const_buffer &, const_buffer &overrun); + + tag(server::request &); + tag(tag &&) noexcept; + tag(const tag &) = delete; + tag &operator=(tag &&) noexcept; + tag &operator=(const tag &) = delete; + ~tag() noexcept; }; +/// Internal representation of a remote server. +/// struct ircd::server::node :std::enable_shared_from_this { - enum state :int; - + std::exception_ptr eptr; net::remote remote; - std::list tags; std::list links; + ctx::dock dock; - void add(const size_t &num = 1); - void del(const size_t &num = 1); + void handle_resolve(std::weak_ptr, std::exception_ptr, const ipport &); + void resolve(const hostport &); - node(net::remote remote); + public: + link &link_add(const size_t &num = 1); + void link_del(const size_t &num = 1); + link &link_get(); + + void cancel(request &); + void submit(request &); + + node(); ~node() noexcept; }; +/// Internal representation of a single connection to a remote. +/// +struct ircd::server::link +{ + bool init {false}; ///< link is connecting + bool fini {false}; ///< link is disconnecting + std::shared_ptr node; ///< backreference to node + std::exception_ptr eptr; ///< error from socket + std::shared_ptr socket; ///< link's socket + std::deque queue; ///< link's work queue + + bool connected() const noexcept; + bool ready() const; + bool busy() const; + + protected: + void handle_writable(const error_code &); + void wait_writable(); + + void handle_readable_success(); + void handle_readable(const error_code &); + void wait_readable(); + + void handle_close(std::exception_ptr); + void handle_open(std::exception_ptr); + + public: + bool close(const net::close_opts &); + bool open(const net::open_opts &); + + void cancel(request &); + void submit(request &); + + link(server::node &); + ~link() noexcept; +}; + +/// Subsystem initialization / destruction from ircd::main +/// struct ircd::server::init { void interrupt(); diff --git a/ircd/server.cc b/ircd/server.cc index dd868031f..b7ce4faa2 100644 --- a/ircd/server.cc +++ b/ircd/server.cc @@ -1,26 +1,75 @@ -/* - * Copyright (C) 2017 Charybdis Development Team - * Copyright (C) 2017 Jason Volk - * - * Permission to use, copy, modify, and/or distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice is present in all copies. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR - * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING - * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. +// +// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR +// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, +// INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +// IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +#include #include +namespace ircd::server +{ + std::shared_ptr create(const net::hostport &); +} + +decltype(ircd::server::log) +ircd::server::log +{ + "server", 'S' +}; + +decltype(ircd::server::nodes) +ircd::server::nodes +{}; + +ircd::server::node & +ircd::server::get(const net::hostport &hostport) +{ + auto it(nodes.lower_bound(host(hostport))); + if(it == nodes.end() || it->first != host(hostport)) + { + auto node{create(hostport)}; + const string_view key{node->remote.hostname}; + it = nodes.emplace_hint(it, key, std::move(node)); + } + + return *it->second; +} + +std::shared_ptr +ircd::server::create(const net::hostport &hostport) +{ + auto node(std::make_shared()); + node->remote.hostname = std::string{host(hostport)}; + node->resolve(hostport); + return node; +} + +ircd::server::node & +ircd::server::find(const net::hostport &hostport) +{ + return *nodes.at(host(hostport)); +} + +bool +ircd::server::exists(const net::hostport &hostport) +{ + return nodes.find(host(hostport)) != end(nodes); +} + // // init // @@ -32,204 +81,303 @@ ircd::server::init::init() ircd::server::init::~init() noexcept { - ircd::server::nodes.clear(); + nodes.clear(); } void ircd::server::init::interrupt() { - ircd::server::nodes.clear(); + nodes.clear(); } -// -// server +/// +// request // -ircd::http::request::write_closure -ircd::write_closure(server &server) +ircd::server::request::request(const net::hostport &hostport, + server::out out, + server::in in) +:tag{nullptr} +,out{std::move(out)} +,in{std::move(in)} { - // returns a function that can be called to send an iovector of data to a server - return [&server](const ilist &iov) - { - //std::cout << "<<<< " << size(iov) << std::endl; - //std::cout << iov << std::endl; - //std::cout << "----" << std::endl; - write(server, iov); - }; + auto &node(server::get(hostport)); + node.submit(*this); } -ircd::parse::read_closure -ircd::read_closure(server &server) -{ - // Returns a function the parser can call when it wants more data - return [&server](char *&start, char *const &stop) - { - char *const s(start); - read(server, start, stop); - //std::cout << ">>>> " << std::distance(s, start) << std::endl; - //std::cout << string_view{s, start} << std::endl; - //std::cout << "----" << std::endl; - }; -} - -char * -ircd::read(server &server, - char *&start, - char *const &stop) -try -{ - auto &link - { - *begin(server.n->links) - }; - - auto &sock - { - *link.s - }; - - const std::array bufs - {{ - { start, stop } - }}; - - char *const base(start); - start += sock.read_some(bufs); - return base; -} -catch(const boost::system::system_error &e) -{ - using namespace boost::system::errc; - - log::error("read error: %s: %s", - string(server.n->remote), - string(e)); - - if(ircd::runlevel == ircd::runlevel::QUIT) - throw; - - switch(e.code().value()) - { - case SSL_R_SHORT_READ: - case boost::asio::error::eof: - { - auto &link{*begin(server.n->links)}; - link.disconnect(); - if(!link.connect(server.n->remote)) - throw; - - return read(server, start, stop); - } - - case operation_canceled: - throw http::error(http::REQUEST_TIMEOUT); - - default: - throw; - } -} - -size_t -ircd::write(server &server, - const ilist &iov) -try -{ - auto &sock - { - *(*begin(server.n->links)).s - }; - - return sock.write(iov); -} -catch(const boost::system::system_error &e) -{ - using namespace boost::system::errc; - - log::error("write error: %s %s", - string(server.n->remote), - string(e)); - - switch(e.code().value()) - { - case SSL_R_SHORT_READ: - case boost::asio::error::eof: - case protocol_error: - case broken_pipe: - { - auto &link{*begin(server.n->links)}; - link.disconnect(); - if(!link.connect(server.n->remote)) - throw; - - return write(server, iov); - } - - case operation_canceled: - throw http::error(http::REQUEST_TIMEOUT); - - default: - throw; - } -} - -// -// server -// - -ircd::server::server(net::remote remote) -:n{[&remote] -{ - const auto &ipp - { - static_cast(remote) - }; - - const auto it(nodes.lower_bound(ipp)); - if(it == nodes.end() || it->first != ipp) - { - const auto ipp{static_cast(remote)}; - const auto n{std::make_shared(std::move(remote))}; - nodes.emplace_hint(it, ipp, n); - return n; - } - - return it->second; -}()} -,it -{ - n->tags, n->tags.emplace(n->tags.end(), this) -} +ircd::server::request::request(request &&o) +noexcept +:ctx::future{std::move(o)} +,tag{std::move(o.tag)} +,out{std::move(o.out)} +,in{std::move(o.in)} { } -ircd::server::~server() +ircd::server::request & +ircd::server::request::operator=(request &&o) +noexcept +{ + ctx::future::operator=(std::move(o)); + tag = std::move(o.tag); + out = std::move(o.out); + in = std::move(o.in); + + assert(tag->request == &o); + tag->request = this; + + return *this; +} + +ircd::server::request::~request() noexcept { } -ircd::server::operator -const ircd::net::remote &() -const -{ - static const ircd::net::remote null_remote {}; - if(unlikely(!n)) - return null_remote; +// +// request::tag +// - return n->remote; +ircd::server::request::tag::tag(server::request &request) +:request{&request} +{ + static_cast &>(request) = p; } -decltype(ircd::server::nodes) -ircd::server::nodes -{}; +ircd::server::request::tag::tag(tag &&o) +noexcept +:request{std::move(o.request)} +{ + assert(o.request->tag == &o); + o.request = nullptr; + request->tag = this; +} + +struct ircd::server::request::tag & +ircd::server::request::tag::operator=(tag &&o) +noexcept +{ + request = std::move(o.request); + + assert(o.request->tag == &o); + o.request = nullptr; + request->tag = this; + + return *this; +} + +ircd::server::request::tag::~tag() +noexcept +{ +} + +bool +ircd::server::request::tag::read_buffer(const const_buffer &buffer, + const_buffer &overrun) +{ + assert(request); + return !request? true: + !request->head.status? read_head(buffer, overrun): + read_content(buffer, overrun); +} + +ircd::mutable_buffer +ircd::server::request::tag::make_read_buffer() +const +{ + assert(request); + return !request? mutable_buffer{}: + !request->head.status? make_head_buffer(): + make_content_buffer(); +} + +bool +ircd::server::request::tag::read_head(const const_buffer &buffer, + const_buffer &overrun) +{ + auto &req{*request}; + + // informal search for head terminator + static const string_view terminator{"\r\n\r\n"}; + const auto pos + { + string_view{data(buffer), size(buffer)}.find(terminator) + }; + + // No terminator found; account for what was received in this buffer + // for the next call to make_head_buffer() preparing for the subsequent + // invocation of this function with more data. + if(pos == string_view::npos) + { + head_read += size(buffer); + return false; + } + + // The given buffer may go past the end of the head and we may already + // have part of the head from a previous invocation. This indicates + // how much dome was just received from this buffer only, including the + // terminator which is considered part of the head. + const size_t addl_head_bytes + { + pos + size(terminator) + }; + assert(addl_head_bytes <= size(buffer)); + + // The head bytes accounting can be updated and this will be the final + // value of what is legitimate head in the req.in.head buffer. + head_read += addl_head_bytes; + + // Setup the capstan and mark the end of the tape + parse::buffer pb{mutable_buffer{data(req.in.head), head_read}}; + parse::capstan pc{pb}; + pc.read += head_read; + + // The HTTP head is parsed here and saved in the user's object but they + // do not know about it yet and shouldn't be touching it. + req.head = http::response::head{pc}; + assert(pb.completed() == head_read); + + // As stated, the buffer may contain data past the head, which includes + // our content or the next response which doesn't even belong to us. + const size_t overrun_length + { + size(buffer) - addl_head_bytes + }; + + // Calculate the amount of overrun which belongs to our content. + const size_t &content_read + { + std::min(req.head.content_length, overrun_length) + }; + + // Where the partial content would be written to + const const_buffer partial_content + { + data(req.in.head) + head_read, content_read + }; + + // Any partial content was written to the head buffer by accident, + // that has to be copied over to the content buffer. + this->content_read += copy(req.in.content, partial_content); + assert(this->content_read == size(partial_content)); + + // Anything remaining is not our response and must be given back + assert(overrun_length >= content_read); + overrun = const_buffer + { + data(req.in.head) + head_read + content_read, overrun_length - content_read + }; + + // When lucky, the content was recieved already (or there is no content) and + // we can notify the user in one shot. + if(this->content_read == req.head.content_length) + { + p.set_value(http::status(req.head.status)); + return true; + } + + return false; +} + +bool +ircd::server::request::tag::read_content(const const_buffer &buffer, + const_buffer &overrun) +{ + auto &req{*request}; + + // The amount of remaining content for the response sequence + assert(req.head.content_length >= content_read); + const size_t remaining + { + req.head.content_length - content_read + }; + + // The amount of content read in this buffer only. + const size_t addl_content_read + { + std::min(size(buffer), remaining) + }; + + const size_t overrun_length + { + size(buffer) - addl_content_read + }; + + content_read += addl_content_read; + + // Report anything that doesn't belong to us. + overrun = const_buffer + { + data(req.in.content) + content_read, overrun_length + }; + + assert(content_read <= req.head.content_length); + if(content_read == req.head.content_length) + { + p.set_value(http::status(req.head.status)); + return true; + } + else return false; +} + +ircd::mutable_buffer +ircd::server::request::tag::make_head_buffer() +const +{ + const auto &req{*request}; + if(head_read >= size(req.in.head)) + return {}; + + const size_t remaining + { + size(req.in.head) - head_read + }; + + assert(remaining <= size(req.in.head)); + const mutable_buffer buffer + { + data(req.in.head) + head_read, remaining + }; + + return buffer; +} + +ircd::mutable_buffer +ircd::server::request::tag::make_content_buffer() +const +{ + auto &req{*request}; + + // The amount of bytes we still have to read to for the response + assert(req.head.content_length >= content_read); + const size_t remaining + { + req.head.content_length - content_read + }; + + // The amount of bytes available in the user's buffer. + assert(size(req.in.content) >= content_read); + const size_t available + { + size(req.in.content) - content_read + }; + + // For now, this has to trip right here. + assert(available >= remaining); + const mutable_buffer buffer + { + data(req.in.content) + content_read, std::min(available, remaining) + }; + + return buffer; +} // // node // -ircd::server::node::node(net::remote remote) -:remote{std::move(remote)} +ircd::server::node::node() { - add(1); } ircd::server::node::~node() @@ -238,46 +386,271 @@ noexcept } void -ircd::server::node::add(const size_t &num) +ircd::server::node::submit(request &request) { - links.emplace_back(remote); + link &ret(link_get()); + ret.submit(request); } void -ircd::server::node::del(const size_t &num) +ircd::server::node::cancel(request &request) { } +ircd::server::link & +ircd::server::node::link_get() +{ + while(!remote.resolved()) + dock.wait(); + + if(links.empty()) + { + auto &ret(link_add(1)); + while(!ret.ready()) + dock.wait(); + + return ret; + } + else + return links.back(); +} + +ircd::server::link & +ircd::server::node::link_add(const size_t &num) +{ + links.emplace_back(*this); + auto &link{links.back()}; + link.open(remote); + return link; +} + +void +ircd::server::node::link_del(const size_t &num) +{ +} + +void +ircd::server::node::resolve(const hostport &hostport) +{ + auto handler + { + std::bind(&node::handle_resolve, this, weak_from(*this), ph::_1, ph::_2) + }; + + net::resolve(hostport, std::move(handler)); +} + +void +ircd::server::node::handle_resolve(std::weak_ptr wp, + std::exception_ptr eptr, + const ipport &ipport) +try +{ + const life_guard lg(wp); + + this->eptr = std::move(eptr); + static_cast(this->remote) = ipport; + + dock.notify_all(); +} +catch(const std::bad_weak_ptr &) +{ + return; +} + // // link // -ircd::server::link::link(const net::remote &remote) -:s{std::make_shared()} -,state{state::DEAD} +ircd::server::link::link(server::node &node) +:node{shared_from(node)} { - connect(remote); +} + +ircd::server::link::~link() +noexcept +{ + assert(!busy()); + assert(!connected()); +} + +void +ircd::server::link::submit(request &request) +{ + const auto it + { + queue.emplace(end(queue), request) + }; + + write(*socket, request.out.head); + write(*socket, request.out.content); +} + +void +ircd::server::link::cancel(request &request) +{ + } bool -ircd::server::link::disconnect() +ircd::server::link::open(const net::open_opts &open_opts) { - return net::disconnect(*s); -} - -bool -ircd::server::link::connect(const net::remote &remote) -{ - if(connected()) + if(init) return false; - s->connect(remote, 30000ms); + auto handler + { + std::bind(&link::handle_open, this, ph::_1) + }; + + init = true; + fini = false; + socket = net::open(open_opts, std::move(handler)); return true; } +void +ircd::server::link::handle_open(std::exception_ptr eptr) +{ + eptr = std::move(eptr); + init = false; + node->dock.notify_all(); + + if(!eptr) + wait_readable(); +} + +bool +ircd::server::link::close(const net::close_opts &close_opts) +{ + if(!socket) + return false; + + if(fini) + return false; + + auto handler + { + std::bind(&link::handle_close, this, ph::_1) + }; + + init = false; + fini = true; + net::close(*socket, close_opts, std::move(handler)); + return true; +} + +void +ircd::server::link::handle_close(std::exception_ptr eptr) +{ + eptr = std::move(eptr); + fini = false; +} + +void +ircd::server::link::wait_readable() +{ + auto handler + { + std::bind(&link::handle_readable, this, ph::_1) + }; + + assert(ready()); + net::wait(*socket, net::ready::READ, std::move(handler)); +} + +void +ircd::server::link::wait_writable() +{ + auto handler + { + std::bind(&link::handle_writable, this, ph::_1) + }; + + assert(ready()); + net::wait(*socket, net::ready::WRITE, std::move(handler)); +} + +void +ircd::server::link::handle_readable(const error_code &ec) +{ + using namespace boost::system::errc; + using boost::system::system_category; + + switch(ec.value()) + { + case success: + handle_readable_success(); + wait_readable(); + break; + + default: + throw boost::system::system_error{ec}; + } +} + +void +ircd::server::link::handle_readable_success() +{ + if(queue.empty()) + throw std::out_of_range("queue empty"); + + auto &tag + { + queue.front() + }; + + const mutable_buffer buffer + { + tag.make_read_buffer() + }; + + const size_t bytes + { + read_one(*socket, buffer) + }; + + const const_buffer received + { + data(buffer), bytes + }; + + const_buffer overrun; + const bool done + { + tag.read_buffer(received, overrun) + }; + + if(!empty(overrun)) + std::cout << "got overrun: " << size(overrun) << std::endl; + + if(done) + queue.pop_front(); +} + +void +ircd::server::link::handle_writable(const error_code &ec) +{ + +} + +bool +ircd::server::link::busy() +const +{ + return !queue.empty(); +} + +bool +ircd::server::link::ready() +const +{ + return connected() && !init && !fini && !eptr; +} + bool ircd::server::link::connected() const noexcept { - return net::connected(*s); + return bool(socket) && net::connected(*socket); }