mirror of
https://github.com/matrix-construct/construct
synced 2025-02-18 09:40:12 +01:00
ircd::server: Preliminary client request pipeline framework.
This commit is contained in:
parent
0f0c50017a
commit
fa3afc7ad7
2 changed files with 719 additions and 291 deletions
|
@ -1,120 +1,175 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Charybdis Development Team
|
||||
* Copyright (C) 2017 Jason Volk <jason@zemos.net>
|
||||
*
|
||||
* Permission to use, copy, modify, and/or distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice is present in all copies.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
|
||||
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
// Copyright (C) Matrix Construct Developers, Authors & Contributors
|
||||
// Copyright (C) 2016-2018 Jason Volk
|
||||
//
|
||||
// Permission to use, copy, modify, and/or distribute this software for any
|
||||
// purpose with or without fee is hereby granted, provided that the above
|
||||
// copyright notice and this permission notice is present in all copies.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
|
||||
// INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||
// IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
// POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
#define HAVE_IRCD_SERVER_H
|
||||
|
||||
namespace ircd
|
||||
{
|
||||
struct server;
|
||||
|
||||
size_t write(server &, const ilist<const_buffer> &iov);
|
||||
char *read(server &, char *&start, char *const &stop);
|
||||
|
||||
http::request::write_closure write_closure(server &);
|
||||
parse::read_closure read_closure(server &);
|
||||
}
|
||||
|
||||
/// The interface for when IRCd plays the role of client to other servers
|
||||
///
|
||||
/// This is a handle for being a client to another server. This class is
|
||||
/// named ircd::server and not ircd::client because this project is IRCd
|
||||
/// and not IRCc. This handle will attempt to find an existing connection
|
||||
/// pool for the remote server and then multiplex your requests and demultiplex
|
||||
/// your responses to achieve concurrency and sharing and limitations and
|
||||
/// shape the pipeline as best as the pool allows along with other instances
|
||||
/// of ircd::server for the same remote.
|
||||
///
|
||||
/// Note that this means ircd::server is only appropriate for stateless
|
||||
/// protocols like HTTP and chunked-encoding should be avoided if we are
|
||||
/// to get the best out of nagle'ing the pipe. Individual net::socket should
|
||||
/// be used otherwise.
|
||||
///
|
||||
/// This handle is a "tag" which services a single request and response per
|
||||
/// instance to an ircd::server::node over one ircd::server::link available
|
||||
/// to that node. Those interfaces are internal and don't have to be messed
|
||||
/// with.
|
||||
///
|
||||
/// Instances of this class can be used in arrays to make bulk requests.
|
||||
///
|
||||
struct ircd::server
|
||||
namespace ircd::server
|
||||
{
|
||||
struct init;
|
||||
struct node;
|
||||
struct link;
|
||||
struct request;
|
||||
struct out;
|
||||
struct in;
|
||||
|
||||
static std::map<net::ipport, std::shared_ptr<node>> nodes;
|
||||
IRCD_EXCEPTION(ircd::error, error);
|
||||
|
||||
std::shared_ptr<node> n;
|
||||
unique_iterator<std::list<server *>> it;
|
||||
extern ircd::log::log log;
|
||||
extern std::map<string_view, std::shared_ptr<node>> nodes;
|
||||
|
||||
bool exists(const net::hostport &);
|
||||
node &find(const net::hostport &);
|
||||
node &get(const net::hostport &);
|
||||
}
|
||||
|
||||
struct ircd::server::out
|
||||
{
|
||||
const_buffer head;
|
||||
const_buffer content;
|
||||
};
|
||||
|
||||
struct ircd::server::in
|
||||
{
|
||||
mutable_buffer head;
|
||||
mutable_buffer content;
|
||||
};
|
||||
|
||||
/// This is a handle for being a client to another server. This handle will
|
||||
/// attempt to find an existing connection pool for the remote server otherwise
|
||||
/// one will be created. Then it will multiplex your requests and demultiplex
|
||||
/// your responses.
|
||||
///
|
||||
struct ircd::server::request
|
||||
:ctx::future<http::code>
|
||||
{
|
||||
struct tag;
|
||||
|
||||
struct tag *tag {nullptr};
|
||||
http::response::head head;
|
||||
|
||||
public:
|
||||
operator const net::remote &() const;
|
||||
server::out out;
|
||||
server::in in;
|
||||
|
||||
server() = default;
|
||||
server(server &&) noexcept = default;
|
||||
server &operator=(server &&) noexcept = default;
|
||||
server(net::remote);
|
||||
~server() noexcept;
|
||||
request(const net::hostport &, server::out out, server::in in);
|
||||
request() = default;
|
||||
request(request &&) noexcept;
|
||||
request(const request &) = delete;
|
||||
request &operator=(request &&) noexcept;
|
||||
request &operator=(const request &) = delete;
|
||||
~request() noexcept;
|
||||
};
|
||||
|
||||
struct ircd::server::link
|
||||
/// Internal portion of the request
|
||||
//
|
||||
struct ircd::server::request::tag
|
||||
{
|
||||
enum state :int;
|
||||
server::request *request;
|
||||
ctx::promise<http::code> p;
|
||||
size_t head_read {0};
|
||||
size_t content_read {0};
|
||||
|
||||
std::shared_ptr<net::socket> s;
|
||||
std::deque<server *> q;
|
||||
enum state state;
|
||||
mutable_buffer make_content_buffer() const;
|
||||
mutable_buffer make_head_buffer() const;
|
||||
|
||||
bool connected() const noexcept;
|
||||
bool connect(const net::remote &);
|
||||
bool disconnect();
|
||||
bool read_content(const const_buffer &, const_buffer &overrun);
|
||||
bool read_head(const const_buffer &, const_buffer &overrun);
|
||||
|
||||
link(const net::remote &remote);
|
||||
};
|
||||
|
||||
enum ircd::server::link::state
|
||||
:int
|
||||
{
|
||||
DEAD,
|
||||
IDLE,
|
||||
BUSY,
|
||||
public:
|
||||
mutable_buffer make_read_buffer() const;
|
||||
bool read_buffer(const const_buffer &, const_buffer &overrun);
|
||||
|
||||
tag(server::request &);
|
||||
tag(tag &&) noexcept;
|
||||
tag(const tag &) = delete;
|
||||
tag &operator=(tag &&) noexcept;
|
||||
tag &operator=(const tag &) = delete;
|
||||
~tag() noexcept;
|
||||
};
|
||||
|
||||
/// Internal representation of a remote server.
|
||||
///
|
||||
struct ircd::server::node
|
||||
:std::enable_shared_from_this<ircd::server::node>
|
||||
{
|
||||
enum state :int;
|
||||
|
||||
std::exception_ptr eptr;
|
||||
net::remote remote;
|
||||
std::list<server *> tags;
|
||||
std::list<link> links;
|
||||
ctx::dock dock;
|
||||
|
||||
void add(const size_t &num = 1);
|
||||
void del(const size_t &num = 1);
|
||||
void handle_resolve(std::weak_ptr<node>, std::exception_ptr, const ipport &);
|
||||
void resolve(const hostport &);
|
||||
|
||||
node(net::remote remote);
|
||||
public:
|
||||
link &link_add(const size_t &num = 1);
|
||||
void link_del(const size_t &num = 1);
|
||||
link &link_get();
|
||||
|
||||
void cancel(request &);
|
||||
void submit(request &);
|
||||
|
||||
node();
|
||||
~node() noexcept;
|
||||
};
|
||||
|
||||
/// Internal representation of a single connection to a remote.
|
||||
///
|
||||
struct ircd::server::link
|
||||
{
|
||||
bool init {false}; ///< link is connecting
|
||||
bool fini {false}; ///< link is disconnecting
|
||||
std::shared_ptr<server::node> node; ///< backreference to node
|
||||
std::exception_ptr eptr; ///< error from socket
|
||||
std::shared_ptr<net::socket> socket; ///< link's socket
|
||||
std::deque<struct request::tag> queue; ///< link's work queue
|
||||
|
||||
bool connected() const noexcept;
|
||||
bool ready() const;
|
||||
bool busy() const;
|
||||
|
||||
protected:
|
||||
void handle_writable(const error_code &);
|
||||
void wait_writable();
|
||||
|
||||
void handle_readable_success();
|
||||
void handle_readable(const error_code &);
|
||||
void wait_readable();
|
||||
|
||||
void handle_close(std::exception_ptr);
|
||||
void handle_open(std::exception_ptr);
|
||||
|
||||
public:
|
||||
bool close(const net::close_opts &);
|
||||
bool open(const net::open_opts &);
|
||||
|
||||
void cancel(request &);
|
||||
void submit(request &);
|
||||
|
||||
link(server::node &);
|
||||
~link() noexcept;
|
||||
};
|
||||
|
||||
/// Subsystem initialization / destruction from ircd::main
|
||||
///
|
||||
struct ircd::server::init
|
||||
{
|
||||
void interrupt();
|
||||
|
|
787
ircd/server.cc
787
ircd/server.cc
|
@ -1,26 +1,75 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Charybdis Development Team
|
||||
* Copyright (C) 2017 Jason Volk <jason@zemos.net>
|
||||
*
|
||||
* Permission to use, copy, modify, and/or distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice is present in all copies.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
|
||||
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
// Copyright (C) Matrix Construct Developers, Authors & Contributors
|
||||
// Copyright (C) 2016-2018 Jason Volk
|
||||
//
|
||||
// Permission to use, copy, modify, and/or distribute this software for any
|
||||
// purpose with or without fee is hereby granted, provided that the above
|
||||
// copyright notice and this permission notice is present in all copies.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
||||
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
|
||||
// INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||
// IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
// POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#include <ircd/server.h>
|
||||
#include <ircd/asio.h>
|
||||
|
||||
namespace ircd::server
|
||||
{
|
||||
std::shared_ptr<node> create(const net::hostport &);
|
||||
}
|
||||
|
||||
decltype(ircd::server::log)
|
||||
ircd::server::log
|
||||
{
|
||||
"server", 'S'
|
||||
};
|
||||
|
||||
decltype(ircd::server::nodes)
|
||||
ircd::server::nodes
|
||||
{};
|
||||
|
||||
ircd::server::node &
|
||||
ircd::server::get(const net::hostport &hostport)
|
||||
{
|
||||
auto it(nodes.lower_bound(host(hostport)));
|
||||
if(it == nodes.end() || it->first != host(hostport))
|
||||
{
|
||||
auto node{create(hostport)};
|
||||
const string_view key{node->remote.hostname};
|
||||
it = nodes.emplace_hint(it, key, std::move(node));
|
||||
}
|
||||
|
||||
return *it->second;
|
||||
}
|
||||
|
||||
std::shared_ptr<ircd::server::node>
|
||||
ircd::server::create(const net::hostport &hostport)
|
||||
{
|
||||
auto node(std::make_shared<node>());
|
||||
node->remote.hostname = std::string{host(hostport)};
|
||||
node->resolve(hostport);
|
||||
return node;
|
||||
}
|
||||
|
||||
ircd::server::node &
|
||||
ircd::server::find(const net::hostport &hostport)
|
||||
{
|
||||
return *nodes.at(host(hostport));
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::exists(const net::hostport &hostport)
|
||||
{
|
||||
return nodes.find(host(hostport)) != end(nodes);
|
||||
}
|
||||
|
||||
//
|
||||
// init
|
||||
//
|
||||
|
@ -32,204 +81,303 @@ ircd::server::init::init()
|
|||
ircd::server::init::~init()
|
||||
noexcept
|
||||
{
|
||||
ircd::server::nodes.clear();
|
||||
nodes.clear();
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::init::interrupt()
|
||||
{
|
||||
ircd::server::nodes.clear();
|
||||
nodes.clear();
|
||||
}
|
||||
|
||||
//
|
||||
// server
|
||||
///
|
||||
// request
|
||||
//
|
||||
|
||||
ircd::http::request::write_closure
|
||||
ircd::write_closure(server &server)
|
||||
ircd::server::request::request(const net::hostport &hostport,
|
||||
server::out out,
|
||||
server::in in)
|
||||
:tag{nullptr}
|
||||
,out{std::move(out)}
|
||||
,in{std::move(in)}
|
||||
{
|
||||
// returns a function that can be called to send an iovector of data to a server
|
||||
return [&server](const ilist<const_buffer> &iov)
|
||||
{
|
||||
//std::cout << "<<<< " << size(iov) << std::endl;
|
||||
//std::cout << iov << std::endl;
|
||||
//std::cout << "----" << std::endl;
|
||||
write(server, iov);
|
||||
};
|
||||
auto &node(server::get(hostport));
|
||||
node.submit(*this);
|
||||
}
|
||||
|
||||
ircd::parse::read_closure
|
||||
ircd::read_closure(server &server)
|
||||
{
|
||||
// Returns a function the parser can call when it wants more data
|
||||
return [&server](char *&start, char *const &stop)
|
||||
{
|
||||
char *const s(start);
|
||||
read(server, start, stop);
|
||||
//std::cout << ">>>> " << std::distance(s, start) << std::endl;
|
||||
//std::cout << string_view{s, start} << std::endl;
|
||||
//std::cout << "----" << std::endl;
|
||||
};
|
||||
}
|
||||
|
||||
char *
|
||||
ircd::read(server &server,
|
||||
char *&start,
|
||||
char *const &stop)
|
||||
try
|
||||
{
|
||||
auto &link
|
||||
{
|
||||
*begin(server.n->links)
|
||||
};
|
||||
|
||||
auto &sock
|
||||
{
|
||||
*link.s
|
||||
};
|
||||
|
||||
const std::array<mutable_buffer, 1> bufs
|
||||
{{
|
||||
{ start, stop }
|
||||
}};
|
||||
|
||||
char *const base(start);
|
||||
start += sock.read_some(bufs);
|
||||
return base;
|
||||
}
|
||||
catch(const boost::system::system_error &e)
|
||||
{
|
||||
using namespace boost::system::errc;
|
||||
|
||||
log::error("read error: %s: %s",
|
||||
string(server.n->remote),
|
||||
string(e));
|
||||
|
||||
if(ircd::runlevel == ircd::runlevel::QUIT)
|
||||
throw;
|
||||
|
||||
switch(e.code().value())
|
||||
{
|
||||
case SSL_R_SHORT_READ:
|
||||
case boost::asio::error::eof:
|
||||
{
|
||||
auto &link{*begin(server.n->links)};
|
||||
link.disconnect();
|
||||
if(!link.connect(server.n->remote))
|
||||
throw;
|
||||
|
||||
return read(server, start, stop);
|
||||
}
|
||||
|
||||
case operation_canceled:
|
||||
throw http::error(http::REQUEST_TIMEOUT);
|
||||
|
||||
default:
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
size_t
|
||||
ircd::write(server &server,
|
||||
const ilist<const_buffer> &iov)
|
||||
try
|
||||
{
|
||||
auto &sock
|
||||
{
|
||||
*(*begin(server.n->links)).s
|
||||
};
|
||||
|
||||
return sock.write(iov);
|
||||
}
|
||||
catch(const boost::system::system_error &e)
|
||||
{
|
||||
using namespace boost::system::errc;
|
||||
|
||||
log::error("write error: %s %s",
|
||||
string(server.n->remote),
|
||||
string(e));
|
||||
|
||||
switch(e.code().value())
|
||||
{
|
||||
case SSL_R_SHORT_READ:
|
||||
case boost::asio::error::eof:
|
||||
case protocol_error:
|
||||
case broken_pipe:
|
||||
{
|
||||
auto &link{*begin(server.n->links)};
|
||||
link.disconnect();
|
||||
if(!link.connect(server.n->remote))
|
||||
throw;
|
||||
|
||||
return write(server, iov);
|
||||
}
|
||||
|
||||
case operation_canceled:
|
||||
throw http::error(http::REQUEST_TIMEOUT);
|
||||
|
||||
default:
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// server
|
||||
//
|
||||
|
||||
ircd::server::server(net::remote remote)
|
||||
:n{[&remote]
|
||||
{
|
||||
const auto &ipp
|
||||
{
|
||||
static_cast<const net::ipport &>(remote)
|
||||
};
|
||||
|
||||
const auto it(nodes.lower_bound(ipp));
|
||||
if(it == nodes.end() || it->first != ipp)
|
||||
{
|
||||
const auto ipp{static_cast<net::ipport>(remote)};
|
||||
const auto n{std::make_shared<node>(std::move(remote))};
|
||||
nodes.emplace_hint(it, ipp, n);
|
||||
return n;
|
||||
}
|
||||
|
||||
return it->second;
|
||||
}()}
|
||||
,it
|
||||
{
|
||||
n->tags, n->tags.emplace(n->tags.end(), this)
|
||||
}
|
||||
ircd::server::request::request(request &&o)
|
||||
noexcept
|
||||
:ctx::future<http::code>{std::move(o)}
|
||||
,tag{std::move(o.tag)}
|
||||
,out{std::move(o.out)}
|
||||
,in{std::move(o.in)}
|
||||
{
|
||||
}
|
||||
|
||||
ircd::server::~server()
|
||||
ircd::server::request &
|
||||
ircd::server::request::operator=(request &&o)
|
||||
noexcept
|
||||
{
|
||||
ctx::future<http::code>::operator=(std::move(o));
|
||||
tag = std::move(o.tag);
|
||||
out = std::move(o.out);
|
||||
in = std::move(o.in);
|
||||
|
||||
assert(tag->request == &o);
|
||||
tag->request = this;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
ircd::server::request::~request()
|
||||
noexcept
|
||||
{
|
||||
}
|
||||
|
||||
ircd::server::operator
|
||||
const ircd::net::remote &()
|
||||
const
|
||||
{
|
||||
static const ircd::net::remote null_remote {};
|
||||
if(unlikely(!n))
|
||||
return null_remote;
|
||||
//
|
||||
// request::tag
|
||||
//
|
||||
|
||||
return n->remote;
|
||||
ircd::server::request::tag::tag(server::request &request)
|
||||
:request{&request}
|
||||
{
|
||||
static_cast<ctx::future<http::code> &>(request) = p;
|
||||
}
|
||||
|
||||
decltype(ircd::server::nodes)
|
||||
ircd::server::nodes
|
||||
{};
|
||||
ircd::server::request::tag::tag(tag &&o)
|
||||
noexcept
|
||||
:request{std::move(o.request)}
|
||||
{
|
||||
assert(o.request->tag == &o);
|
||||
o.request = nullptr;
|
||||
request->tag = this;
|
||||
}
|
||||
|
||||
struct ircd::server::request::tag &
|
||||
ircd::server::request::tag::operator=(tag &&o)
|
||||
noexcept
|
||||
{
|
||||
request = std::move(o.request);
|
||||
|
||||
assert(o.request->tag == &o);
|
||||
o.request = nullptr;
|
||||
request->tag = this;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
ircd::server::request::tag::~tag()
|
||||
noexcept
|
||||
{
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::request::tag::read_buffer(const const_buffer &buffer,
|
||||
const_buffer &overrun)
|
||||
{
|
||||
assert(request);
|
||||
return !request? true:
|
||||
!request->head.status? read_head(buffer, overrun):
|
||||
read_content(buffer, overrun);
|
||||
}
|
||||
|
||||
ircd::mutable_buffer
|
||||
ircd::server::request::tag::make_read_buffer()
|
||||
const
|
||||
{
|
||||
assert(request);
|
||||
return !request? mutable_buffer{}:
|
||||
!request->head.status? make_head_buffer():
|
||||
make_content_buffer();
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::request::tag::read_head(const const_buffer &buffer,
|
||||
const_buffer &overrun)
|
||||
{
|
||||
auto &req{*request};
|
||||
|
||||
// informal search for head terminator
|
||||
static const string_view terminator{"\r\n\r\n"};
|
||||
const auto pos
|
||||
{
|
||||
string_view{data(buffer), size(buffer)}.find(terminator)
|
||||
};
|
||||
|
||||
// No terminator found; account for what was received in this buffer
|
||||
// for the next call to make_head_buffer() preparing for the subsequent
|
||||
// invocation of this function with more data.
|
||||
if(pos == string_view::npos)
|
||||
{
|
||||
head_read += size(buffer);
|
||||
return false;
|
||||
}
|
||||
|
||||
// The given buffer may go past the end of the head and we may already
|
||||
// have part of the head from a previous invocation. This indicates
|
||||
// how much dome was just received from this buffer only, including the
|
||||
// terminator which is considered part of the head.
|
||||
const size_t addl_head_bytes
|
||||
{
|
||||
pos + size(terminator)
|
||||
};
|
||||
assert(addl_head_bytes <= size(buffer));
|
||||
|
||||
// The head bytes accounting can be updated and this will be the final
|
||||
// value of what is legitimate head in the req.in.head buffer.
|
||||
head_read += addl_head_bytes;
|
||||
|
||||
// Setup the capstan and mark the end of the tape
|
||||
parse::buffer pb{mutable_buffer{data(req.in.head), head_read}};
|
||||
parse::capstan pc{pb};
|
||||
pc.read += head_read;
|
||||
|
||||
// The HTTP head is parsed here and saved in the user's object but they
|
||||
// do not know about it yet and shouldn't be touching it.
|
||||
req.head = http::response::head{pc};
|
||||
assert(pb.completed() == head_read);
|
||||
|
||||
// As stated, the buffer may contain data past the head, which includes
|
||||
// our content or the next response which doesn't even belong to us.
|
||||
const size_t overrun_length
|
||||
{
|
||||
size(buffer) - addl_head_bytes
|
||||
};
|
||||
|
||||
// Calculate the amount of overrun which belongs to our content.
|
||||
const size_t &content_read
|
||||
{
|
||||
std::min(req.head.content_length, overrun_length)
|
||||
};
|
||||
|
||||
// Where the partial content would be written to
|
||||
const const_buffer partial_content
|
||||
{
|
||||
data(req.in.head) + head_read, content_read
|
||||
};
|
||||
|
||||
// Any partial content was written to the head buffer by accident,
|
||||
// that has to be copied over to the content buffer.
|
||||
this->content_read += copy(req.in.content, partial_content);
|
||||
assert(this->content_read == size(partial_content));
|
||||
|
||||
// Anything remaining is not our response and must be given back
|
||||
assert(overrun_length >= content_read);
|
||||
overrun = const_buffer
|
||||
{
|
||||
data(req.in.head) + head_read + content_read, overrun_length - content_read
|
||||
};
|
||||
|
||||
// When lucky, the content was recieved already (or there is no content) and
|
||||
// we can notify the user in one shot.
|
||||
if(this->content_read == req.head.content_length)
|
||||
{
|
||||
p.set_value(http::status(req.head.status));
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::request::tag::read_content(const const_buffer &buffer,
|
||||
const_buffer &overrun)
|
||||
{
|
||||
auto &req{*request};
|
||||
|
||||
// The amount of remaining content for the response sequence
|
||||
assert(req.head.content_length >= content_read);
|
||||
const size_t remaining
|
||||
{
|
||||
req.head.content_length - content_read
|
||||
};
|
||||
|
||||
// The amount of content read in this buffer only.
|
||||
const size_t addl_content_read
|
||||
{
|
||||
std::min(size(buffer), remaining)
|
||||
};
|
||||
|
||||
const size_t overrun_length
|
||||
{
|
||||
size(buffer) - addl_content_read
|
||||
};
|
||||
|
||||
content_read += addl_content_read;
|
||||
|
||||
// Report anything that doesn't belong to us.
|
||||
overrun = const_buffer
|
||||
{
|
||||
data(req.in.content) + content_read, overrun_length
|
||||
};
|
||||
|
||||
assert(content_read <= req.head.content_length);
|
||||
if(content_read == req.head.content_length)
|
||||
{
|
||||
p.set_value(http::status(req.head.status));
|
||||
return true;
|
||||
}
|
||||
else return false;
|
||||
}
|
||||
|
||||
ircd::mutable_buffer
|
||||
ircd::server::request::tag::make_head_buffer()
|
||||
const
|
||||
{
|
||||
const auto &req{*request};
|
||||
if(head_read >= size(req.in.head))
|
||||
return {};
|
||||
|
||||
const size_t remaining
|
||||
{
|
||||
size(req.in.head) - head_read
|
||||
};
|
||||
|
||||
assert(remaining <= size(req.in.head));
|
||||
const mutable_buffer buffer
|
||||
{
|
||||
data(req.in.head) + head_read, remaining
|
||||
};
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
ircd::mutable_buffer
|
||||
ircd::server::request::tag::make_content_buffer()
|
||||
const
|
||||
{
|
||||
auto &req{*request};
|
||||
|
||||
// The amount of bytes we still have to read to for the response
|
||||
assert(req.head.content_length >= content_read);
|
||||
const size_t remaining
|
||||
{
|
||||
req.head.content_length - content_read
|
||||
};
|
||||
|
||||
// The amount of bytes available in the user's buffer.
|
||||
assert(size(req.in.content) >= content_read);
|
||||
const size_t available
|
||||
{
|
||||
size(req.in.content) - content_read
|
||||
};
|
||||
|
||||
// For now, this has to trip right here.
|
||||
assert(available >= remaining);
|
||||
const mutable_buffer buffer
|
||||
{
|
||||
data(req.in.content) + content_read, std::min(available, remaining)
|
||||
};
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
//
|
||||
// node
|
||||
//
|
||||
|
||||
ircd::server::node::node(net::remote remote)
|
||||
:remote{std::move(remote)}
|
||||
ircd::server::node::node()
|
||||
{
|
||||
add(1);
|
||||
}
|
||||
|
||||
ircd::server::node::~node()
|
||||
|
@ -238,46 +386,271 @@ noexcept
|
|||
}
|
||||
|
||||
void
|
||||
ircd::server::node::add(const size_t &num)
|
||||
ircd::server::node::submit(request &request)
|
||||
{
|
||||
links.emplace_back(remote);
|
||||
link &ret(link_get());
|
||||
ret.submit(request);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::node::del(const size_t &num)
|
||||
ircd::server::node::cancel(request &request)
|
||||
{
|
||||
}
|
||||
|
||||
ircd::server::link &
|
||||
ircd::server::node::link_get()
|
||||
{
|
||||
while(!remote.resolved())
|
||||
dock.wait();
|
||||
|
||||
if(links.empty())
|
||||
{
|
||||
auto &ret(link_add(1));
|
||||
while(!ret.ready())
|
||||
dock.wait();
|
||||
|
||||
return ret;
|
||||
}
|
||||
else
|
||||
return links.back();
|
||||
}
|
||||
|
||||
ircd::server::link &
|
||||
ircd::server::node::link_add(const size_t &num)
|
||||
{
|
||||
links.emplace_back(*this);
|
||||
auto &link{links.back()};
|
||||
link.open(remote);
|
||||
return link;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::node::link_del(const size_t &num)
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::node::resolve(const hostport &hostport)
|
||||
{
|
||||
auto handler
|
||||
{
|
||||
std::bind(&node::handle_resolve, this, weak_from(*this), ph::_1, ph::_2)
|
||||
};
|
||||
|
||||
net::resolve(hostport, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::node::handle_resolve(std::weak_ptr<node> wp,
|
||||
std::exception_ptr eptr,
|
||||
const ipport &ipport)
|
||||
try
|
||||
{
|
||||
const life_guard<node> lg(wp);
|
||||
|
||||
this->eptr = std::move(eptr);
|
||||
static_cast<net::ipport &>(this->remote) = ipport;
|
||||
|
||||
dock.notify_all();
|
||||
}
|
||||
catch(const std::bad_weak_ptr &)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
//
|
||||
// link
|
||||
//
|
||||
|
||||
ircd::server::link::link(const net::remote &remote)
|
||||
:s{std::make_shared<net::socket>()}
|
||||
,state{state::DEAD}
|
||||
ircd::server::link::link(server::node &node)
|
||||
:node{shared_from(node)}
|
||||
{
|
||||
connect(remote);
|
||||
}
|
||||
|
||||
ircd::server::link::~link()
|
||||
noexcept
|
||||
{
|
||||
assert(!busy());
|
||||
assert(!connected());
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::submit(request &request)
|
||||
{
|
||||
const auto it
|
||||
{
|
||||
queue.emplace(end(queue), request)
|
||||
};
|
||||
|
||||
write(*socket, request.out.head);
|
||||
write(*socket, request.out.content);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::cancel(request &request)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::link::disconnect()
|
||||
ircd::server::link::open(const net::open_opts &open_opts)
|
||||
{
|
||||
return net::disconnect(*s);
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::link::connect(const net::remote &remote)
|
||||
{
|
||||
if(connected())
|
||||
if(init)
|
||||
return false;
|
||||
|
||||
s->connect(remote, 30000ms);
|
||||
auto handler
|
||||
{
|
||||
std::bind(&link::handle_open, this, ph::_1)
|
||||
};
|
||||
|
||||
init = true;
|
||||
fini = false;
|
||||
socket = net::open(open_opts, std::move(handler));
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::handle_open(std::exception_ptr eptr)
|
||||
{
|
||||
eptr = std::move(eptr);
|
||||
init = false;
|
||||
node->dock.notify_all();
|
||||
|
||||
if(!eptr)
|
||||
wait_readable();
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::link::close(const net::close_opts &close_opts)
|
||||
{
|
||||
if(!socket)
|
||||
return false;
|
||||
|
||||
if(fini)
|
||||
return false;
|
||||
|
||||
auto handler
|
||||
{
|
||||
std::bind(&link::handle_close, this, ph::_1)
|
||||
};
|
||||
|
||||
init = false;
|
||||
fini = true;
|
||||
net::close(*socket, close_opts, std::move(handler));
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::handle_close(std::exception_ptr eptr)
|
||||
{
|
||||
eptr = std::move(eptr);
|
||||
fini = false;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::wait_readable()
|
||||
{
|
||||
auto handler
|
||||
{
|
||||
std::bind(&link::handle_readable, this, ph::_1)
|
||||
};
|
||||
|
||||
assert(ready());
|
||||
net::wait(*socket, net::ready::READ, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::wait_writable()
|
||||
{
|
||||
auto handler
|
||||
{
|
||||
std::bind(&link::handle_writable, this, ph::_1)
|
||||
};
|
||||
|
||||
assert(ready());
|
||||
net::wait(*socket, net::ready::WRITE, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::handle_readable(const error_code &ec)
|
||||
{
|
||||
using namespace boost::system::errc;
|
||||
using boost::system::system_category;
|
||||
|
||||
switch(ec.value())
|
||||
{
|
||||
case success:
|
||||
handle_readable_success();
|
||||
wait_readable();
|
||||
break;
|
||||
|
||||
default:
|
||||
throw boost::system::system_error{ec};
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::handle_readable_success()
|
||||
{
|
||||
if(queue.empty())
|
||||
throw std::out_of_range("queue empty");
|
||||
|
||||
auto &tag
|
||||
{
|
||||
queue.front()
|
||||
};
|
||||
|
||||
const mutable_buffer buffer
|
||||
{
|
||||
tag.make_read_buffer()
|
||||
};
|
||||
|
||||
const size_t bytes
|
||||
{
|
||||
read_one(*socket, buffer)
|
||||
};
|
||||
|
||||
const const_buffer received
|
||||
{
|
||||
data(buffer), bytes
|
||||
};
|
||||
|
||||
const_buffer overrun;
|
||||
const bool done
|
||||
{
|
||||
tag.read_buffer(received, overrun)
|
||||
};
|
||||
|
||||
if(!empty(overrun))
|
||||
std::cout << "got overrun: " << size(overrun) << std::endl;
|
||||
|
||||
if(done)
|
||||
queue.pop_front();
|
||||
}
|
||||
|
||||
void
|
||||
ircd::server::link::handle_writable(const error_code &ec)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::link::busy()
|
||||
const
|
||||
{
|
||||
return !queue.empty();
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::link::ready()
|
||||
const
|
||||
{
|
||||
return connected() && !init && !fini && !eptr;
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::server::link::connected()
|
||||
const noexcept
|
||||
{
|
||||
return net::connected(*s);
|
||||
return bool(socket) && net::connected(*socket);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue