0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-10-01 05:08:59 +02:00

ircd::server: Improve tag cancellation/dispersal on closing/shutdown; minor cleanup.

This commit is contained in:
Jason Volk 2018-03-03 21:55:59 -08:00
parent 4df13f5d4f
commit 92bef88b6c
3 changed files with 112 additions and 50 deletions

View file

@ -77,6 +77,9 @@ struct ircd::server::link
size_t tag_uncommitted() const;
// request panel
void cancel_uncommitted(std::exception_ptr);
void cancel_committed(std::exception_ptr);
void cancel_all(std::exception_ptr);
void submit(request &);
// control panel

View file

@ -33,7 +33,6 @@ struct ircd::server::node
void resolve(const hostport &);
void disperse_uncommitted(link &);
void cancel_committed(link &, std::exception_ptr);
void disperse(link &);
void del(link &);

View file

@ -12,12 +12,16 @@
namespace ircd::server
{
ctx::dock dock;
// Internal state
bool ready; // like an /etc/nologin to prevent actions when false.
ctx::dock dock; // internal semaphore
// Internal util
template<class F> size_t accumulate_nodes(F&&);
template<class F> size_t accumulate_links(F&&);
template<class F> size_t accumulate_tags(F&&);
// Internal control
std::shared_ptr<node> create(const net::hostport &);
void interrupt_all();
void close_all();
@ -146,27 +150,6 @@ ircd::server::accumulate_nodes(F&& closure)
});
}
//
// init
//
ircd::server::init::init()
{
}
ircd::server::init::~init()
noexcept
{
close_all();
nodes.clear();
}
void
ircd::server::init::interrupt()
{
interrupt_all();
}
void
ircd::server::close_all()
{
@ -197,6 +180,29 @@ ircd::server::interrupt_all()
node.second->interrupt();
}
//
// init
//
ircd::server::init::init()
{
ready = true;
}
ircd::server::init::~init()
noexcept
{
ready = false;
close_all();
nodes.clear();
}
void
ircd::server::init::interrupt()
{
interrupt_all();
}
///
// request
//
@ -257,6 +263,12 @@ void
ircd::server::submit(const hostport &hostport,
request &request)
{
if(unlikely(!server::ready))
throw unavailable
{
"Unable to fulfill requests at this time."
};
assert(request.tag == nullptr);
auto &node(server::get(hostport));
node.submit(request);
@ -297,17 +309,26 @@ noexcept
void
ircd::server::node::close()
{
for(auto &link : links)
link.close(net::close_opts_default);
for(auto &link : this->links)
link.exclude = true;
std::vector<link *> links(this->links.size());
pointers(this->links, links);
for(const auto &link : links)
link->close(net::close_opts_default);
}
void
ircd::server::node::interrupt()
{
//TODO: not a close
//TODO: interrupt = killing requests but still setting tag promises
for(auto &link : links)
link.close(net::close_opts_default);
link.exclude = true;
for(auto &link : this->links)
link.cancel_all(std::make_exception_ptr(canceled
{
"Request was aborted due to interruption."
}));
}
void
@ -515,7 +536,7 @@ ircd::server::node::handle_error(link &link,
std::exception_ptr eptr)
try
{
cancel_committed(link, eptr);
link.cancel_committed(eptr);
link.close(net::dc::RST);
std::rethrow_exception(eptr);
}
@ -567,7 +588,7 @@ ircd::server::node::handle_error(link &link,
string(remote),
e.what());
cancel_committed(link, std::make_exception_ptr(e));
link.cancel_committed(std::make_exception_ptr(e));
link.close(net::dc::RST);
}
@ -642,7 +663,7 @@ void
ircd::server::node::disperse(link &link)
{
disperse_uncommitted(link);
cancel_committed(link, std::make_exception_ptr(canceled
link.cancel_committed(std::make_exception_ptr(canceled
{
"Request was aborted; though it was partially completed"
}));
@ -650,30 +671,12 @@ ircd::server::node::disperse(link &link)
assert(link.queue.empty());
}
void
ircd::server::node::cancel_committed(link &link,
std::exception_ptr eptr)
{
auto &queue(link.queue);
for(auto it(begin(queue)); it != end(queue); it = queue.erase(it))
{
auto &tag{*it};
if(!tag.request)
continue;
if(!tag.committed())
break;
tag.set_exception(eptr);
}
}
void
ircd::server::node::disperse_uncommitted(link &link)
{
auto &queue(link.queue);
auto it(begin(queue));
while(it != end(queue))
while(it != end(queue)) try
{
auto &tag{*it};
if(!tag.request || tag.committed())
@ -685,6 +688,16 @@ ircd::server::node::disperse_uncommitted(link &link)
submit(*tag.request);
it = queue.erase(it);
}
catch(const std::exception &e)
{
const auto &tag{*it};
log.warning("node(%p) failed to resubmit tag(%p): %s",
this,
&tag,
e.what());
it = queue.erase(it);
}
}
/// This *cannot* be called unless a link's socket is closed and its queue
@ -969,6 +982,53 @@ ircd::server::link::submit(request &request)
wait_writable();
}
void
ircd::server::link::cancel_all(std::exception_ptr eptr)
{
for(auto it(begin(queue)); it != end(queue); it = queue.erase(it))
{
auto &tag{*it};
if(!tag.request)
continue;
tag.set_exception(eptr);
}
}
void
ircd::server::link::cancel_committed(std::exception_ptr eptr)
{
for(auto it(begin(queue)); it != end(queue); it = queue.erase(it))
{
auto &tag{*it};
if(!tag.request)
continue;
if(!tag.committed())
break;
tag.set_exception(eptr);
}
}
void
ircd::server::link::cancel_uncommitted(std::exception_ptr eptr)
{
auto it(begin(queue));
while(it != end(queue))
{
auto &tag{*it};
if(!tag.request || tag.committed())
{
++it;
continue;
}
tag.set_exception(eptr);
it = queue.erase(it);
}
}
bool
ircd::server::link::open(const net::open_opts &open_opts)
{