0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

ircd::server: Support robust request cancellation without disrupting pipeline.

This commit is contained in:
Jason Volk 2018-01-16 21:23:15 -08:00
parent 8366c735b4
commit 862fb1ebcd
5 changed files with 276 additions and 87 deletions

View file

@ -29,7 +29,7 @@ struct ircd::server::link
bool exclude {false}; ///< link is excluded bool exclude {false}; ///< link is excluded
std::shared_ptr<server::node> node; ///< backreference to node std::shared_ptr<server::node> node; ///< backreference to node
std::shared_ptr<net::socket> socket; ///< link's socket std::shared_ptr<net::socket> socket; ///< link's socket
std::deque<tag> queue; ///< link's work queue std::list<tag> queue; ///< link's work queue
template<class F> size_t accumulate_tags(F&&) const; template<class F> size_t accumulate_tags(F&&) const;
@ -77,7 +77,6 @@ struct ircd::server::link
size_t tag_uncommitted() const; size_t tag_uncommitted() const;
// request panel // request panel
tag cancel(request &);
void submit(request &); void submit(request &);
// control panel // control panel

View file

@ -77,7 +77,6 @@ struct ircd::server::node
link *link_get(const request &); link *link_get(const request &);
// request panel // request panel
void cancel(request &);
void submit(request &); void submit(request &);
// control panel // control panel

View file

@ -32,6 +32,7 @@ namespace ircd::server
size_t size(const out &); size_t size(const out &);
void submit(const hostport &, request &); void submit(const hostport &, request &);
bool cancel(request &);
} }
/// Request data and options related to transmitting the request. This /// Request data and options related to transmitting the request. This
@ -104,6 +105,8 @@ inline ircd::server::request &
ircd::server::request::operator=(request &&o) ircd::server::request::operator=(request &&o)
noexcept noexcept
{ {
this->~request();
ctx::future<http::code>::operator=(std::move(o)); ctx::future<http::code>::operator=(std::move(o));
out = std::move(o.out); out = std::move(o.out);
in = std::move(o.in); in = std::move(o.in);
@ -119,6 +122,9 @@ inline
ircd::server::request::~request() ircd::server::request::~request()
noexcept noexcept
{ {
if(tag && !tag->committed())
cancel(*this);
if(tag) if(tag)
disassociate(*this, *tag); disassociate(*this, *tag);
} }

View file

@ -28,22 +28,25 @@ namespace ircd::server
void associate(request &, tag &, tag &&) noexcept; void associate(request &, tag &, tag &&) noexcept;
void associate(request &, tag &, request &&) noexcept; void associate(request &, tag &, request &&) noexcept;
void disassociate(request &, tag &); void disassociate(request &, tag &);
void cancel(request &, tag &) noexcept;
} }
/// Internal portion of the request /// Internal portion of the request
/// ///
struct ircd::server::tag struct ircd::server::tag
{ {
server::request *request {nullptr};
ctx::promise<http::code> p; ctx::promise<http::code> p;
server::request *request {nullptr};
size_t written {0}; size_t written {0};
size_t head_read {0}; size_t head_read {0};
size_t content_read {0}; size_t content_read {0};
size_t content_over {0}; size_t content_over {0};
http::code status {(http::code)0}; http::code status {(http::code)0};
std::unique_ptr<char[]> cancellation;
void set_exception(const error_code &);
void set_exception(std::exception_ptr); void set_exception(std::exception_ptr);
template<class... args> void set_exception(args&&...);
template<class... args> void set_value(args&&...);
const_buffer make_write_content_buffer() const; const_buffer make_write_content_buffer() const;
const_buffer make_write_head_buffer() const; const_buffer make_write_head_buffer() const;
@ -60,9 +63,13 @@ struct ircd::server::tag
size_t write_completed() const; size_t write_completed() const;
size_t write_remaining() const; size_t write_remaining() const;
size_t read_total() const; // not accurate until content-length known size_t read_total() const; // not accurate until content-length
size_t read_completed() const; // reports all received so far size_t read_completed() const; // reports all received so far
size_t read_remaining() const; // not accurate until content-length known size_t read_remaining() const; // not accurate until content-length
bool committed() const; // Tag has revealed data to remote
bool abandoned() const; // User has abandoned their future
bool canceled() const; // User has abandoned their *request
const_buffer make_write_buffer() const; const_buffer make_write_buffer() const;
void wrote_buffer(const const_buffer &); void wrote_buffer(const const_buffer &);
@ -88,13 +95,14 @@ ircd::server::tag::tag(server::request &request)
inline inline
ircd::server::tag::tag(tag &&o) ircd::server::tag::tag(tag &&o)
noexcept noexcept
:request{std::move(o.request)} :p{std::move(o.p)}
,p{std::move(o.p)} ,request{std::move(o.request)}
,written{std::move(o.written)} ,written{std::move(o.written)}
,head_read{std::move(o.head_read)} ,head_read{std::move(o.head_read)}
,content_read{std::move(o.content_read)} ,content_read{std::move(o.content_read)}
,content_over{std::move(o.content_over)} ,content_over{std::move(o.content_over)}
,status{std::move(o.status)} ,status{std::move(o.status)}
,cancellation{std::move(o.cancellation)}
{ {
if(request) if(request)
associate(*request, *this, std::move(o)); associate(*request, *this, std::move(o));
@ -104,13 +112,16 @@ inline ircd::server::tag &
ircd::server::tag::operator=(tag &&o) ircd::server::tag::operator=(tag &&o)
noexcept noexcept
{ {
request = std::move(o.request); this->~tag();
p = std::move(o.p); p = std::move(o.p);
request = std::move(o.request);
written = std::move(o.written); written = std::move(o.written);
head_read = std::move(o.head_read); head_read = std::move(o.head_read);
content_read = std::move(o.content_read); content_read = std::move(o.content_read);
content_over = std::move(o.content_over); content_over = std::move(o.content_over);
status = std::move(o.status); status = std::move(o.status);
cancellation = std::move(o.cancellation);
if(request) if(request)
associate(*request, *this, std::move(o)); associate(*request, *this, std::move(o));

View file

@ -215,6 +215,54 @@ ircd::server::interrupt_all()
// request // request
// //
/// Canceling a request is tricky. This allows a robust way to let the user's
/// request go out of scope at virtually any time without disrupting the
/// pipeline and other requests.
bool
ircd::server::cancel(request &request)
{
if(!request.tag)
return false;
if(request.tag->canceled())
return false;
if(request.tag->abandoned())
return false;
auto &tag
{
*request.tag
};
/*
log.debug("cancel request(%p) tag(%p) commit:%d w:%zu hr:%zu cr:%zu",
&request,
&tag,
tag.committed(),
tag.written,
tag.head_read,
tag.content_read);
*/
// We got off easy... The link's write loop won't start an abandoned
// request. All that has to be done is indicate a full cancellation
// immediately and the user will know nothing was revealed to the remote.
if(!tag.committed())
{
tag.set_exception(canceled
{
"Request canceled"
});
return true;
}
// Now things aren't so easy. More complicated logic happens inside...
cancel(request, tag);
return true;
}
void void
ircd::server::submit(const hostport &hostport, ircd::server::submit(const hostport &hostport,
request &request) request &request)
@ -274,15 +322,10 @@ ircd::server::node::submit(request &request)
"No link to node %s available", remote.hostname "No link to node %s available", remote.hostname
}; };
else else
request.tag->set_exception(std::make_exception_ptr(unavailable request.tag->set_exception(unavailable
{ {
"No link to node %s available", remote.hostname "No link to node %s available", remote.hostname
})); });
}
void
ircd::server::node::cancel(request &request)
{
} }
/// Dispatch algorithm here; finds the best link to place this request on, /// Dispatch algorithm here; finds the best link to place this request on,
@ -519,7 +562,6 @@ ircd::server::node::handle_link_done(link &link)
void void
ircd::server::node::disperse(link &link) ircd::server::node::disperse(link &link)
{ {
// the easy part
disperse_uncommitted(link); disperse_uncommitted(link);
cancel_committed(link, canceled cancel_committed(link, canceled
{ {
@ -541,7 +583,7 @@ ircd::server::node::cancel_committed(link &link,
{ {
std::find_if(begin(queue), end(queue), [](const auto &tag) std::find_if(begin(queue), end(queue), [](const auto &tag)
{ {
return !tag.write_completed(); return !tag.committed();
}) })
}; };
@ -551,9 +593,8 @@ ircd::server::node::cancel_committed(link &link,
return; return;
assert(tag.write_completed()); assert(tag.write_completed());
auto &request{*tag.request}; tag.set_exception(e);
disassociate(request, tag); disassociate(*tag.request, tag);
tag.set_exception(std::make_exception_ptr(e));
}); });
log.debug("node(%p) link(%p) errored %zu of %zu of its tags", log.debug("node(%p) link(%p) errored %zu of %zu of its tags",
@ -562,12 +603,7 @@ ircd::server::node::cancel_committed(link &link,
std::distance(begin(queue), it), std::distance(begin(queue), it),
queue.size()); queue.size());
const auto remaining queue.erase(begin(queue), it);
{
std::distance(begin(queue), queue.erase(begin(queue), it))
};
queue.resize(remaining);
} }
void void
@ -581,7 +617,7 @@ ircd::server::node::disperse_uncommitted(link &link)
{ {
std::find_if(begin(queue), end(queue), [](const auto &tag) std::find_if(begin(queue), end(queue), [](const auto &tag)
{ {
return !tag.write_completed(); return !tag.committed();
}) })
}; };
@ -592,7 +628,6 @@ ircd::server::node::disperse_uncommitted(link &link)
assert(!tag.write_completed()); assert(!tag.write_completed());
auto &request{*tag.request}; auto &request{*tag.request};
disassociate(request, tag);
auto *const link auto *const link
{ {
@ -601,6 +636,7 @@ ircd::server::node::disperse_uncommitted(link &link)
if(likely(link)) if(likely(link))
{ {
disassociate(request, tag);
link->submit(request); link->submit(request);
return; return;
} }
@ -617,12 +653,7 @@ ircd::server::node::disperse_uncommitted(link &link)
std::distance(it, end(queue)), std::distance(it, end(queue)),
queue.size()); queue.size());
const auto remaining queue.erase(it, end(queue));
{
std::distance(begin(queue), queue.erase(it, end(queue)))
};
queue.resize(remaining);
} }
/// This *cannot* be called unless a link's socket is closed and its queue /// This *cannot* be called unless a link's socket is closed and its queue
@ -691,7 +722,7 @@ catch(const std::exception &e)
for(auto &link : links) for(auto &link : links)
for(auto &tag : link.queue) for(auto &tag : link.queue)
tag.set_exception(std::make_exception_ptr(e)); tag.set_exception(e);
nodes.erase(remote.hostname); nodes.erase(remote.hostname);
} }
@ -883,31 +914,6 @@ ircd::server::link::submit(request &request)
wait_writable(); wait_writable();
} }
/// Cancel a request queued in the link. This is an expensive operation, use
/// with care.
ircd::server::tag
ircd::server::link::cancel(request &request)
{
const auto it
{
std::find_if(begin(queue), end(queue), [&request]
(const auto &tag)
{
return &tag == request.tag;
})
};
if(it == end(queue))
throw std::out_of_range
{
"Request has no tag queued in this link"
};
tag ret{std::move(*it)};
queue.erase(it);
return ret;
}
bool bool
ircd::server::link::open(const net::open_opts &open_opts) ircd::server::link::open(const net::open_opts &open_opts)
{ {
@ -932,8 +938,8 @@ ircd::server::link::handle_open(std::exception_ptr eptr)
if(!eptr) if(!eptr)
{ {
wait_readable();
wait_writable(); wait_writable();
wait_readable();
} }
if(node) if(node)
@ -1012,8 +1018,23 @@ ircd::server::link::handle_writable(const error_code &ec)
void void
ircd::server::link::handle_writable_success() ircd::server::link::handle_writable_success()
{ {
for(auto &tag : queue) auto it(begin(queue));
while(it != end(queue))
{ {
auto &tag{*it};
if((tag.abandoned() || tag.canceled()) && !tag.committed())
{
log.debug("link(%p) discarding canceled:%d abandoned:%d uncommitted tag %zu of %zu",
this,
tag.canceled(),
tag.abandoned(),
tag_committed(),
tag_count());
it = queue.erase(it);
continue;
}
if(!process_write(tag)) if(!process_write(tag))
{ {
wait_writable(); wait_writable();
@ -1023,20 +1044,20 @@ ircd::server::link::handle_writable_success()
// Limits the amount of requests in the pipe. // Limits the amount of requests in the pipe.
if(tag_committed() >= tag_commit_max()) if(tag_committed() >= tag_commit_max())
break; break;
++it;
} }
} }
bool bool
ircd::server::link::process_write(tag &tag) ircd::server::link::process_write(tag &tag)
{ {
if(tag.write_remaining() >= tag.write_total()) if(!tag.committed())
{
log.debug("link(%p) starting on tag %zu of %zu: wt:%zu", log.debug("link(%p) starting on tag %zu of %zu: wt:%zu",
this, this,
tag_committed(), tag_committed(),
tag_count(), tag_count(),
tag.write_total()); tag.write_total());
}
while(tag.write_remaining()) while(tag.write_remaining())
{ {
@ -1161,6 +1182,13 @@ try
queue.front() queue.front()
}; };
if(!tag.committed())
{
// Tag hasn't sent its data yet, we shouldn't have anything for it
assert(empty(overrun));
return false;
}
bool done{false}; do bool done{false}; do
{ {
overrun = process_read_next(overrun, tag, done); overrun = process_read_next(overrun, tag, done);
@ -1236,7 +1264,7 @@ try
} }
catch(const buffer_overrun &e) catch(const buffer_overrun &e)
{ {
tag.p.set_exception(std::make_exception_ptr(e)); tag.set_exception(e);
throw; throw;
} }
@ -1281,7 +1309,7 @@ const
{ {
return accumulate_tags([](const auto &tag) return accumulate_tags([](const auto &tag)
{ {
return tag.write_completed() > 0; return tag.committed();
}); });
} }
@ -1405,6 +1433,120 @@ const
// tag // tag
// //
/// This is tricky. When a user cancels a request which has committed some
/// writes to the remote we have to continue to service it through to
/// completion without disrupting the linearity of the link's pipeline
/// and causing trouble with other requests. This all depends on what phase
/// the request is currently in.
///
/// In any case, the goal here is to swap out the user's request buffers
/// and replace them with cancellation buffers which will be transparent
/// to the link as it completes the request.
void
ircd::server::cancel(request &request,
tag &tag)
noexcept
{
// Must have a fully associated request/tag which has committed some
// data to the wire to enter this routine.
assert(tag.committed());
assert(request.tag == &tag);
assert(tag.request == &request);
// Disassociate the user's request and add our dummy request in its place.
request.tag = nullptr;
tag.request = new server::request{};
tag.request->tag = &tag;
// Setup the cancellation buffers by mirroring the current state of the
// user's buffers.
const size_t cancellation_size
{
size(request.out) + size(request.in)
};
tag.cancellation = std::make_unique<char[]>(cancellation_size);
char *ptr{tag.cancellation.get()};
const mutable_buffer out_head{ptr, size(request.out.head)};
ptr += size(out_head);
const mutable_buffer out_content{ptr, size(request.out.content)};
ptr += size(out_content);
const mutable_buffer in_head{ptr, size(request.in.head)};
ptr += size(in_head);
const mutable_buffer in_content{ptr, size(request.in.content)};
ptr += size(in_content);
assert(size_t(std::distance(tag.cancellation.get(), ptr)) == cancellation_size);
tag.request->out.head = out_head;
tag.request->out.content = out_content;
tag.request->in.head = in_head;
tag.request->in.content = in_content;
// If the head is not completely written we have to copy the remainder from where
// the socket left off.
if(tag.written < size(request.out.head))
{
const const_buffer src
{
data(request.out.head) + tag.written, size(request.out.head) - tag.written
};
const mutable_buffer dst
{
data(out_head) + tag.written, size(src)
};
copy(dst, src);
}
// If the content is not completely written we have to copy the remainder from where
// the socket left off.
const size_t content_written
{
tag.written > size(request.out.head)? tag.written - size(request.out.head) : 0
};
if(content_written < size(request.out.content))
{
const const_buffer src
{
data(request.out.content) + content_written, size(request.out.content) - content_written
};
const mutable_buffer dst
{
data(out_content) + content_written, size(src)
};
copy(dst, src);
}
// If the head is not completely read we have to copy what's been received so far so
// we can parse a coherent head.
if(tag.head_read > 0 && tag.head_read < size(request.in.head))
{
const const_buffer src
{
data(request.in.head), tag.head_read
};
const mutable_buffer dst
{
data(in_head), size(src)
};
copy(dst, src);
}
// No received content is copied.
}
void void
ircd::server::associate(request &request, ircd::server::associate(request &request,
tag &tag) tag &tag)
@ -1461,6 +1603,13 @@ ircd::server::disassociate(request &request,
request.tag = nullptr; request.tag = nullptr;
tag.request = nullptr; tag.request = nullptr;
// If the original request was canceled a new request was attached in its
// place in addition to an cancellation buffer. The existence of this
// cancellation buffer indicates that we must delete the request here.
// This is a little hacky but it gets the job done.
if(bool(tag.cancellation))
delete &request;
} }
/// Called by the controller of the socket with a view of the data received by /// Called by the controller of the socket with a view of the data received by
@ -1480,10 +1629,9 @@ ircd::const_buffer
ircd::server::tag::read_buffer(const const_buffer &buffer, ircd::server::tag::read_buffer(const const_buffer &buffer,
bool &done) bool &done)
{ {
return assert(request);
!request?
const_buffer{}:
return
head_read < size(request->in.head)? head_read < size(request->in.head)?
read_head(buffer, done): read_head(buffer, done):
@ -1498,10 +1646,9 @@ ircd::mutable_buffer
ircd::server::tag::make_read_buffer() ircd::server::tag::make_read_buffer()
const const
{ {
return assert(request);
!request?
mutable_buffer{}:
return
head_read < size(request->in.head)? head_read < size(request->in.head)?
make_read_head_buffer(): make_read_head_buffer():
@ -1712,7 +1859,7 @@ ircd::server::tag::read_head(const const_buffer &buffer,
if(this->content_read + content_over == head.content_length) if(this->content_read + content_over == head.content_length)
{ {
done = true; done = true;
p.set_value(status); set_value(status);
} }
return overrun; return overrun;
@ -1745,7 +1892,7 @@ ircd::server::tag::read_content(const const_buffer &buffer,
if(content_read == size(content) + content_over) if(content_read == size(content) + content_over)
{ {
done = true; done = true;
p.set_value(status); set_value(status);
} }
return {}; return {};
@ -1822,28 +1969,55 @@ const
}; };
} }
template<class... args>
void void
ircd::server::tag::set_exception(const error_code &ec) ircd::server::tag::set_value(args&&... a)
{ {
boost::system::system_error error if(abandoned())
{ return;
ec
};
auto eptr assert(p.valid());
{ p.set_value(std::forward<args>(a)...);
std::make_exception_ptr(std::move(error)) }
};
set_exception(std::move(eptr)); template<class... args>
void
ircd::server::tag::set_exception(args&&... a)
{
set_exception(std::make_exception_ptr(std::forward<args>(a)...));
} }
void void
ircd::server::tag::set_exception(std::exception_ptr eptr) ircd::server::tag::set_exception(std::exception_ptr eptr)
{ {
if(abandoned())
return;
assert(p.valid());
p.set_exception(std::move(eptr)); p.set_exception(std::move(eptr));
} }
bool
ircd::server::tag::abandoned()
const
{
return p.finished();
}
bool
ircd::server::tag::canceled()
const
{
return bool(cancellation);
}
bool
ircd::server::tag::committed()
const
{
return write_completed() > 0;
}
size_t size_t
ircd::server::tag::read_remaining() ircd::server::tag::read_remaining()
const const