0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-07-01 08:18:20 +02:00

ircd::server: Improve write stack; add close_all(); various.

This commit is contained in:
Jason Volk 2018-01-15 00:11:32 -08:00
parent 745a00b332
commit 59621eb266
6 changed files with 125 additions and 49 deletions

View file

@ -27,9 +27,8 @@ struct ircd::server::link
bool init {false}; ///< link is connecting
bool fini {false}; ///< link is disconnecting
std::shared_ptr<server::node> node; ///< backreference to node
std::exception_ptr eptr; ///< error from socket
std::shared_ptr<net::socket> socket; ///< link's socket
std::deque<struct request::tag> queue; ///< link's work queue
std::deque<tag> queue; ///< link's work queue
bool connected() const noexcept;
bool ready() const;
@ -37,12 +36,15 @@ struct ircd::server::link
protected:
void discard_read();
const_buffer process_read_next(const const_buffer &, struct request::tag &, bool &done);
const_buffer process_read_next(const const_buffer &, tag &, bool &done);
bool process_read(const_buffer &);
void handle_readable_success();
void handle_readable(const error_code &);
void wait_readable();
const_buffer process_write_next(const const_buffer &);
bool process_write(tag &);
void handle_writable_success();
void handle_writable(const error_code &);
void wait_writable();

View file

@ -50,6 +50,9 @@ struct ircd::server::node
void cancel(request &);
void submit(request &);
void interrupt();
void close();
node();
~node() noexcept;
};

View file

@ -62,9 +62,7 @@ struct ircd::server::in
struct ircd::server::request
:ctx::future<http::code>
{
struct tag;
struct tag *tag {nullptr};
server::tag *tag {nullptr};
public:
server::out out;

View file

@ -27,6 +27,8 @@ namespace ircd::server
struct init;
struct link;
struct node;
struct request;
struct tag;
IRCD_EXCEPTION(ircd::error, error);
@ -38,8 +40,8 @@ namespace ircd::server
node &get(const net::hostport &);
}
#include "request.h"
#include "tag.h"
#include "request.h"
#include "link.h"
#include "node.h"

View file

@ -20,9 +20,14 @@
#pragma once
#define HAVE_IRCD_SERVER_TAG_H
namespace ircd::server
{
struct tag;
}
/// Internal portion of the request
//
struct ircd::server::request::tag
///
struct ircd::server::tag
{
server::request *request;
ctx::promise<http::code> p;

View file

@ -23,6 +23,9 @@
namespace ircd::server
{
std::shared_ptr<node> create(const net::hostport &);
void interrupt_all();
void close_all();
}
decltype(ircd::server::log)
@ -81,13 +84,28 @@ ircd::server::init::init()
ircd::server::init::~init()
noexcept
{
close_all();
nodes.clear();
}
void
ircd::server::init::interrupt()
{
nodes.clear();
interrupt_all();
}
void
ircd::server::close_all()
{
for(auto &node : nodes)
node.second->close();
}
void
ircd::server::interrupt_all()
{
for(auto &node : nodes)
node.second->interrupt();
}
///
@ -135,16 +153,16 @@ noexcept
}
//
// request::tag
// tag
//
ircd::server::request::tag::tag(server::request &request)
ircd::server::tag::tag(server::request &request)
:request{&request}
{
static_cast<ctx::future<http::code> &>(request) = p;
}
ircd::server::request::tag::tag(tag &&o)
ircd::server::tag::tag(tag &&o)
noexcept
:request{std::move(o.request)}
{
@ -153,8 +171,8 @@ noexcept
request->tag = this;
}
struct ircd::server::request::tag &
ircd::server::request::tag::operator=(tag &&o)
struct ircd::server::tag &
ircd::server::tag::operator=(tag &&o)
noexcept
{
request = std::move(o.request);
@ -166,7 +184,7 @@ noexcept
return *this;
}
ircd::server::request::tag::~tag()
ircd::server::tag::~tag()
noexcept
{
}
@ -185,8 +203,8 @@ noexcept
/// setting the value of `done` to true. Otherwise it is assumed false.
///
ircd::const_buffer
ircd::server::request::tag::read_buffer(const const_buffer &buffer,
bool &done)
ircd::server::tag::read_buffer(const const_buffer &buffer,
bool &done)
{
return !request? const_buffer{}:
!request->in.head.status? read_head(buffer, done):
@ -209,7 +227,7 @@ ircd::server::request::tag::read_buffer(const const_buffer &buffer,
/// are returned.
///
ircd::mutable_buffer
ircd::server::request::tag::make_read_buffer()
ircd::server::tag::make_read_buffer()
const
{
assert(request);
@ -219,24 +237,26 @@ const
}
void
ircd::server::request::tag::wrote_buffer(const const_buffer &buffer)
ircd::server::tag::wrote_buffer(const const_buffer &buffer)
{
assert(request);
const auto &req{*request};
if(head_written < size(req.out.head))
{
assert(data(buffer) == data(req.out.head));
head_written += size(buffer);
assert(data(buffer) == data(req.out.head));
assert(head_written <= size(req.out.head));
}
else if(content_written < size(req.out.content))
{
assert(data(buffer) == data(req.out.content));
content_written += size(buffer);
assert(data(buffer) == data(req.out.content));
assert(content_written <= size(req.out.content));
}
}
ircd::const_buffer
ircd::server::request::tag::make_write_buffer()
ircd::server::tag::make_write_buffer()
const
{
assert(request);
@ -247,7 +267,8 @@ const
const const_buffer window{data(req.out.head) + head_written, remain};
return window;
}
else if(content_written < size(req.out.content))
if(content_written < size(req.out.content))
{
const size_t remain{size(req.out.content) - content_written};
const const_buffer window{data(req.out.content) + content_written, remain};
@ -258,8 +279,8 @@ const
}
ircd::const_buffer
ircd::server::request::tag::read_head(const const_buffer &buffer,
bool &done)
ircd::server::tag::read_head(const const_buffer &buffer,
bool &done)
{
assert(request);
auto &req{*request};
@ -340,8 +361,8 @@ ircd::server::request::tag::read_head(const const_buffer &buffer,
}
ircd::const_buffer
ircd::server::request::tag::read_content(const const_buffer &buffer,
bool &done)
ircd::server::tag::read_content(const const_buffer &buffer,
bool &done)
{
assert(request);
auto &req{*request};
@ -385,7 +406,7 @@ ircd::server::request::tag::read_content(const const_buffer &buffer,
}
ircd::mutable_buffer
ircd::server::request::tag::make_read_head_buffer()
ircd::server::tag::make_read_head_buffer()
const
{
assert(request);
@ -411,7 +432,7 @@ const
}
ircd::mutable_buffer
ircd::server::request::tag::make_read_content_buffer()
ircd::server::tag::make_read_content_buffer()
const
{
assert(request);
@ -454,6 +475,26 @@ ircd::server::node::node()
ircd::server::node::~node()
noexcept
{
assert(links.empty());
}
void
ircd::server::node::close()
{
for(auto &link : links)
link.close(net::close_opts_default);
while(!links.empty())
dock.wait();
}
void
ircd::server::node::interrupt()
{
//TODO: not a close
//TODO: interrupt = killing requests but still setting tag promises
for(auto &link : links)
link.close(net::close_opts_default);
}
void
@ -568,6 +609,7 @@ ircd::server::node::del(link &link)
assert(it != end(links));
links.erase(it);
dock.notify_all();
}
void
@ -591,7 +633,6 @@ try
this->eptr = std::move(eptr);
static_cast<net::ipport &>(this->remote) = ipport;
dock.notify_all();
}
catch(const std::bad_weak_ptr &)
@ -694,7 +735,7 @@ ircd::server::link::handle_open(std::exception_ptr eptr)
{
init = false;
if(!this->eptr)
if(!eptr)
wait_readable();
if(node)
@ -751,16 +792,34 @@ ircd::server::link::handle_writable(const error_code &ec)
switch(ec.value())
{
case success:
handle_writable_success();
break;
case operation_canceled:
break;
return;
default:
throw boost::system::system_error{ec};
}
}
void
ircd::server::link::handle_writable_success()
{
for(auto &tag : queue)
{
if(!process_write(tag))
{
wait_writable();
break;
}
}
}
bool
ircd::server::link::process_write(tag &tag)
{
while(1)
{
const const_buffer buffer
{
@ -768,28 +827,35 @@ ircd::server::link::handle_writable(const error_code &ec)
};
if(empty(buffer))
continue;
const size_t bytes
{
write_any(*socket, buffer)
};
return true;
const const_buffer written
{
data(buffer), bytes
process_write_next(buffer)
};
tag.wrote_buffer(written);
if(bytes < size(buffer))
{
wait_writable();
break;
}
if(size(written) < size(buffer))
return false;
}
}
ircd::const_buffer
ircd::server::link::process_write_next(const const_buffer &buffer)
{
const size_t bytes
{
write_any(*socket, buffer)
};
const const_buffer written
{
data(buffer), bytes
};
return written;
}
void
ircd::server::link::wait_readable()
{
@ -817,7 +883,7 @@ try
break;
case operation_canceled:
break;
return;
default:
throw boost::system::system_error{ec};
@ -890,7 +956,7 @@ catch(const boost::system::system_error &e)
/// Process one read operation for one tag
ircd::const_buffer
ircd::server::link::process_read_next(const const_buffer &underrun,
struct request::tag &tag,
tag &tag,
bool &done)
{
const mutable_buffer buffer
@ -964,7 +1030,7 @@ bool
ircd::server::link::ready()
const
{
return connected() && !init && !fini && !eptr;
return connected() && !init && !fini;
}
bool