0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-27 07:54:05 +01:00

ircd::server: Various fixes; error handling; minor cleanup.

This commit is contained in:
Jason Volk 2018-01-17 05:58:44 -08:00
parent 557b521d62
commit 9ba31f77c2
3 changed files with 41 additions and 71 deletions

View file

@ -26,6 +26,7 @@ struct ircd::server::node
:std::enable_shared_from_this<ircd::server::node>
{
net::remote remote;
std::exception_ptr eptr;
std::list<link> links;
template<class F> size_t accumulate_links(F&&) const;

View file

@ -79,7 +79,7 @@ struct ircd::server::request
/// Options
const struct opts *opts { &opts_default };
request(const net::hostport &, server::out out, server::in in);
request(const net::hostport &, server::out, server::in);
request() = default;
request(request &&) noexcept;
request(const request &) = delete;

View file

@ -308,6 +308,7 @@ ircd::server::node::interrupt()
void
ircd::server::node::submit(request &request)
try
{
link *const ret
{
@ -331,6 +332,13 @@ ircd::server::node::submit(request &request)
"No link to node %s available", remote.hostname
});
}
catch(const std::exception &e)
{
if(request.tag)
request.tag->set_exception(std::current_exception());
else
throw;
}
/// Dispatch algorithm here; finds the best link to place this request on,
/// or creates a new link entirely. There are a number of factors: foremost
@ -425,6 +433,9 @@ ircd::server::node::link_get(const request &request)
ircd::server::link &
ircd::server::node::link_add(const size_t &num)
{
if(eptr)
std::rethrow_exception(eptr);
links.emplace_back(*this);
auto &link{links.back()};
@ -439,6 +450,9 @@ ircd::server::node::handle_open(link &link,
std::exception_ptr eptr)
try
{
if(eptr && links.size() == 1)
this->eptr = eptr;
if(eptr)
std::rethrow_exception(eptr);
}
@ -591,84 +605,36 @@ ircd::server::node::cancel_committed(link &link,
std::exception_ptr eptr)
{
auto &queue(link.queue);
// Find the first tag in the queue which hasn't revealed any data to
// the remote; this is uncommitted.
const auto it
{
std::find_if(begin(queue), end(queue), [](const auto &tag)
{
return !tag.committed();
})
};
std::for_each(begin(queue), it, [this, &eptr](auto &tag)
for(auto it(begin(queue)); it != end(queue); it = queue.erase(it))
{
auto &tag{*it};
if(!tag.request)
return;
continue;
if(!tag.committed())
break;
assert(tag.write_completed());
tag.set_exception(eptr);
disassociate(*tag.request, tag);
});
log.debug("node(%p) link(%p) errored %zu of %zu of its tags",
this,
&link,
std::distance(begin(queue), it),
queue.size());
queue.erase(begin(queue), it);
}
}
void
ircd::server::node::disperse_uncommitted(link &link)
{
auto &queue(link.queue);
// Find the first tag in the queue which hasn't revealed any data to
// the remote; this is uncommitted.
const auto it
auto it(begin(queue));
while(it != end(queue))
{
std::find_if(begin(queue), end(queue), [](const auto &tag)
auto &tag{*it};
if(!tag.request || tag.committed())
{
return !tag.committed();
})
};
std::for_each(it, end(queue), [this](auto &tag)
{
if(!tag.request)
return;
assert(!tag.write_completed());
auto &request{*tag.request};
auto *const link
{
this->link_get(request)
};
if(likely(link))
{
disassociate(request, tag);
link->submit(request);
return;
++it;
continue;
}
tag.set_exception(std::make_exception_ptr(unavailable
{
"No link to node %s available", remote.hostname
}));
});
log.debug("node(%p) link(%p) dispersed %zu of %zu of its tags",
this,
&link,
std::distance(it, end(queue)),
queue.size());
queue.erase(it, end(queue));
submit(*tag.request);
it = queue.erase(it);
}
}
/// This *cannot* be called unless a link's socket is closed and its queue
@ -689,7 +655,7 @@ ircd::server::node::del(link &link)
log.debug("node(%p) removing link(%p) %zu of %zu to %s",
this,
&link,
std::distance(it, end(links)),
std::distance(begin(links), it),
links.size(),
string(remote));
@ -920,9 +886,12 @@ noexcept
void
ircd::server::link::submit(request &request)
{
assert(!request.tag || !request.tag->committed());
const auto it
{
queue.emplace(end(queue), request)
request.tag? queue.emplace(end(queue), std::move(*request.tag)):
queue.emplace(end(queue), request)
};
if(ready())
@ -1486,22 +1455,22 @@ noexcept
char *ptr{tag.cancellation.get()};
const mutable_buffer out_head{ptr, size(request.out.head)};
tag.request->out.head = out_head;
ptr += size(out_head);
const mutable_buffer out_content{ptr, size(request.out.content)};
tag.request->out.content = out_content;
ptr += size(out_content);
const mutable_buffer in_head{ptr, size(request.in.head)};
tag.request->in.head = in_head;
ptr += size(in_head);
const mutable_buffer in_content{ptr, size(request.in.content)};
tag.request->in.content = in_content;
ptr += size(in_content);
assert(size_t(std::distance(tag.cancellation.get(), ptr)) == cancellation_size);
tag.request->out.head = out_head;
tag.request->out.content = out_content;
tag.request->in.head = in_head;
tag.request->in.content = in_content;
// If the head is not completely written we have to copy the remainder from where
// the socket left off.