// Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors // Copyright (C) 2016-2018 Jason Volk <jason@zemos.net> // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. #include <ircd/asio.h> namespace ircd::server { // Internal state ctx::dock dock; // internal semaphore // Internal util template<class F> size_t accumulate_peers(F&&); template<class F> size_t accumulate_links(F&&); template<class F> size_t accumulate_tags(F&&); // Internal control std::unique_ptr<peer> create(const net::hostport &); } decltype(ircd::server::log) ircd::server::log { "server", 'S' }; ircd::conf::item<ircd::seconds> close_all_timeout { { "name", "ircd.server.close_all_timeout" }, { "default", 2L }, }; // // init // ircd::server::init::init() { } ircd::server::init::~init() noexcept { interrupt_all(); close_all(); wait_all(); peers.clear(); log::debug { log, "All server peers, connections, and requests are clear." }; } // // server // void ircd::server::wait_all() { while(peer_unfinished()) { if(dock.wait_for(seconds(2))) continue; log::warning { log, "Waiting for %zu tags on %zu links on %zu of %zu peers to close...", tag_count(), link_count(), peer_unfinished(), peer_count() }; } } void ircd::server::close_all() { log::debug { log, "Closing all %zu peers", peer_count() }; net::close_opts opts; opts.timeout = seconds(close_all_timeout); for(auto &peer : peers) peer.second->close(opts); } void ircd::server::interrupt_all() { log::debug { log, "Interrupting %zu tags on %zu links on %zu peers", tag_count(), link_count(), peer_count() }; for(auto &peer : peers) peer.second->cancel(); } ircd::server::peer & ircd::server::get(const net::hostport &hostport) { thread_local char canonbuf[512]; const auto canonized { net::canonize(canonbuf, hostport) }; auto it(peers.lower_bound(canonized)); if(it == peers.end() || it->first != canonized) { auto peer { create(hostport) }; log::debug { log, "peer(%p) for %s created; adding...", peer.get(), canonized }; const string_view key{peer->hostname}; it = peers.emplace_hint(it, key, std::move(peer)); assert(it->second->hostname.data() == it->first.data()); assert(key == canonized); } return *it->second; } std::unique_ptr<ircd::server::peer> ircd::server::create(const net::hostport &hostport) { auto peer { std::make_unique<server::peer>(net::canonize(hostport)) }; peer->open_opts = net::open_opts { peer->remote, net::hostport { peer->hostname, net::canon_service, port(hostport) } }; // Async DNS resolve. The links for the new peer will be connected // once the resolver calls back into peer::handle_resolve(). peer->resolve(peer->open_opts.hostport); return peer; } ircd::server::peer & ircd::server::find(const net::hostport &hostport) { return *peers.at(host(hostport)); } bool ircd::server::exists(const net::hostport &hostport) { return peers.find(host(hostport)) != end(peers); } bool ircd::server::errclear(const net::hostport &hostport) { const auto it { peers.find(host(hostport)) }; if(it == end(peers)) return false; auto &peer(*it->second); return peer.err_clear(); } ircd::string_view ircd::server::errmsg(const net::hostport &hostport) { const auto it { peers.find(host(hostport)) }; if(it == end(peers)) return {}; return it->second->err_msg(); } size_t ircd::server::peer_unfinished() { return accumulate_peers([] (const auto &peer) { return !peer.finished(); }); } size_t ircd::server::peer_count() { return peers.size(); } size_t ircd::server::link_count() { return accumulate_peers([] (const auto &peer) { return peer.link_count(); }); } size_t ircd::server::tag_count() { return accumulate_peers([] (const auto &peer) { return peer.tag_count(); }); } template<class F> size_t ircd::server::accumulate_tags(F&& closure) { return accumulate_links([&closure] (const auto &link) { return link.accumulate_tags(std::forward<F>(closure)); }); } template<class F> size_t ircd::server::accumulate_links(F&& closure) { return accumulate_peers([&closure] (const auto &peer) { return peer.accumulate_links(std::forward<F>(closure)); }); } template<class F> size_t ircd::server::accumulate_peers(F&& closure) { return std::accumulate(begin(peers), end(peers), size_t(0), [&closure] (auto ret, const auto &pair) { const auto &peer{*pair.second}; return ret += closure(peer); }); } /// // request // decltype(ircd::server::request::opts_default) ircd::server::request::opts_default {}; /// Canceling a request is tricky. This allows a robust way to let the user's /// request go out of scope at virtually any time without disrupting the /// pipeline and other requests. bool ircd::server::cancel(request &request) { if(!request.tag) return false; if(request.tag->canceled()) return false; if(request.tag->abandoned()) return false; auto &tag { *request.tag }; /* log::debug { log, "cancel request(%p) tag(%p) commit:%d w:%zu hr:%zu cr:%zu", &request, &tag, tag.committed(), tag.state.written, tag.state.head_read, tag.state.content_read }; */ tag.set_exception(canceled { "Request canceled" }); // We got off easy... The link's write loop won't start an abandoned // request. All that has to be done is indicate a full cancellation // immediately and the user will know nothing was revealed to the remote. if(!tag.committed()) return true; // Now things aren't so easy. More complicated logic happens inside... cancel(request, tag); return true; } void ircd::server::submit(const hostport &hostport, request &request) { if(unlikely(ircd::runlevel != ircd::runlevel::RUN)) throw unavailable { "Unable to fulfill requests at this time." }; assert(request.tag == nullptr); auto &peer(server::get(hostport)); peer.submit(request); } /////////////////////////////////////////////////////////////////////////////// // // server/peer.h // decltype(ircd::server::peers) ircd::server::peers {}; decltype(ircd::server::peer::link_min_default) ircd::server::peer::link_min_default { { "name", "ircd.server.peer.link_min" }, { "default", 1L } }; decltype(ircd::server::peer::link_max_default) ircd::server::peer::link_max_default { { "name", "ircd.server.peer.link_max" }, { "default", 2L } }; // // peer::peer // ircd::server::peer::peer(std::string hostname) :hostname{std::move(hostname)} { } ircd::server::peer::~peer() noexcept { assert(links.empty()); } void ircd::server::peer::close(const net::close_opts &opts) { op_fini = true; link *links[LINK_MAX]; const auto end(pointers(this->links, links)); for(link **link(links); link != end; ++link) (*link)->close(opts); if(finished()) return handle_finished(); } void ircd::server::peer::cancel() { for(auto &link : this->links) link.cancel_all(std::make_exception_ptr(canceled { "Request was aborted due to interruption." })); } bool ircd::server::peer::err_clear() { const auto ret{bool(e)}; e.reset(nullptr); op_fini = false; return ret; } template<class... A> void ircd::server::peer::err_set(A&&... args) { this->e = std::make_unique<err>(std::forward<A>(args)...); } ircd::string_view ircd::server::peer::err_msg() const { return bool(e)? what(e->eptr) : string_view{}; } bool ircd::server::peer::err_has() const { return bool(e); } decltype(ircd::server::peer::error_clear_default) ircd::server::peer::error_clear_default { { "name", "ircd.server.peer.error.clear_default" }, { "default", 305L } }; bool ircd::server::peer::err_check() { if(op_fini) return false; if(!err_has()) return true; //TODO: The specific error type should be switched and finer //TODO: timeouts should be used depending on the error: i.e //TODO: NXDOMAIN vs. temporary conn timeout, etc. if(e->etime + seconds(error_clear_default) > now<system_point>()) return false; err_clear(); return true; } void ircd::server::peer::submit(request &request) try { if(!err_check() || unlikely(ircd::runlevel != ircd::runlevel::RUN)) throw unavailable { "Peer is unable to take any requests: %s", err_msg() }; link *const ret { link_get(request) }; if(likely(ret)) { ret->submit(request); return; } if(!request.tag) throw unavailable { "No link to peer %s available", hostname }; else request.tag->set_exception(unavailable { "No link to peer %s available", hostname }); } catch(const std::exception &e) { if(!request.tag) throw; const auto eptr(std::current_exception()); const ctx::exception_handler eh; request.tag->set_exception(eptr); } /// Dispatch algorithm here; finds the best link to place this request on, /// or creates a new link entirely. There are a number of factors: foremost /// if any special needs are indicated, // ircd::server::link * ircd::server::peer::link_get(const request &request) { assert(request.opt); const auto &prio(request.opt->priority); if(links.empty()) return &link_add(1); // Indicates that we can't add anymore links for this peer and the rest // of the algorithm should consider this. const bool links_maxed { links.size() >= link_max() }; link *best{nullptr}; for(auto &cand : links) { // Don't want a link that's shutting down or marked for exclusion if(cand.op_fini || cand.exclude) continue; if(!best) { best = &cand; continue; } // Indicates that the best candidate has its pipe saturated which can // be factored into the comparisons here. const bool best_maxed { best->tag_committed() >= best->tag_commit_max() }; const bool cand_maxed { cand.tag_committed() >= cand.tag_commit_max() }; if(best_maxed && !cand_maxed) { best = &cand; continue; } if(!best_maxed && cand_maxed) continue; // Candidates's queue has less or same backlog of unsent requests, but // now measure if candidate will take longer to process at least the // write-side of those requests. if(cand.write_remaining() > best->write_remaining()) continue; // Candidate might be working through really large content; though // this is a very sketchy measurement right now since we only *might* // know about content-length for the *one* active tag occupying the // socket. if(cand.read_remaining() > best->read_remaining()) continue; // Coarse distribution based on who has more work; this is weak, should // be replaced. if(cand.tag_count() > best->tag_count()) continue; best = &cand; } // Even though the prio is set to the super special value we allow the // normal loop to first come up with a best link which already is open // rather than unconditionally opening a new connection. if(prio == std::numeric_limits<std::remove_reference<decltype(prio)>::type>::min()) { if(!best) return &link_add(1); if(best->tag_committed()) return &link_add(1); return best; } if(links_maxed) return best; // best might not be good enough, we could try another connection. If best // has a backlog or is working on a large download or slow request. if(!best) { best = &link_add(); return best; } if(best->tag_uncommitted() < best->tag_commit_max()) return best; best = &link_add(); return best; } ircd::server::link & ircd::server::peer::link_add(const size_t &num) { assert(!finished()); if(e) { std::rethrow_exception(e->eptr); __builtin_unreachable(); } assert(!op_fini); links.emplace_back(*this); auto &link{links.back()}; if(remote) link.open(open_opts); return link; } void ircd::server::peer::handle_open(link &link, std::exception_ptr eptr) { if(eptr) { if(links.size() == 1) err_set(eptr); log::derror { log, "peer(%p) link(%p) [%s]: open: %s", this, &link, string(remote), what(eptr) }; if(op_fini) { if(link.finished()) handle_finished(link); return; } link.close(net::dc::RST); return; } } void ircd::server::peer::handle_close(link &link, std::exception_ptr eptr) { if(eptr) log::derror { log, "peer(%p) link(%p) [%s]: close: %s", this, &link, string(remote), what(eptr) }; if(link.finished()) handle_finished(link); } void ircd::server::peer::handle_error(link &link, std::exception_ptr eptr) { assert(bool(eptr)); link.cancel_committed(eptr); log::derror { log, "peer(%p) link(%p): %s", this, &link, what(eptr) }; link.close(net::dc::RST); } void ircd::server::peer::handle_error(link &link, const boost::system::system_error &e) { using namespace boost::system::errc; using boost::system::system_category; using boost::asio::error::get_misc_category; const auto &ec{e.code()}; if(ec.category() == system_category()) switch(ec.value()) { case success: assert(0); break; default: break; } else if(ec.category() == get_misc_category()) switch(ec.value()) { case asio::error::eof: log::debug { log, "peer(%p) link(%p) [%s]: %s", this, &link, string(remote), e.what() }; link.close(net::close_opts_default); return; default: break; } log::derror { log, "peer(%p) link(%p) [%s]: error: %s", this, &link, string(remote), e.what() }; link.cancel_committed(std::make_exception_ptr(e)); link.close(net::dc::RST); } void ircd::server::peer::handle_finished(link &link) { assert(link.finished()); del(link); if(finished()) handle_finished(); } /// This is where we're notified a tag has been completed either to start the /// next request when the link has too many requests in flight or perhaps to /// reschedule the queues in various links to diffuse the pending requests. /// This can't throw because the link still has to remove this tag from its /// queue. void ircd::server::peer::handle_tag_done(link &link, tag &tag) noexcept try { log::debug { log, "peer(%p) link(%p) tag(%p) done wt:%zu rt:%zu hr:%zu cr:%zu cl:%zu; %zu more in queue", this, &link, &tag, tag.write_size(), tag.read_size(), tag.state.head_read, tag.state.content_read, tag.state.content_length, link.tag_count() - 1 }; if(link.tag_committed() >= link.tag_commit_max()) link.wait_writable(); } catch(const std::exception &e) { log::critical { log, "peer(%p) link(%p) tag(%p) done; error: %s", this, &link, &tag, e.what() }; } /// This is where we're notified a link has processed its queue and has no /// more work. We can choose whether to close the link or keep it open and /// reinstate the read poll; reschedule other work to this link, etc. void ircd::server::peer::handle_link_done(link &link) { assert(link.tag_count() == 0); if(link_ready() > link_min()) { link.close(); return; } link.wait_readable(); } /// This is called when a tag on a link receives an HTTP response head. /// We can use this to learn information from the tag's request and the /// response head etc. void ircd::server::peer::handle_head_recv(const link &link, const tag &tag, const http::response::head &head) { // Learn the software version of the remote peer so we can shape // requests more effectively. if(!server_name && head.server) { server_name = std::string{head.server}; log::debug { log, "peer(%p) learned %s is '%s'", this, string(remote), server_name }; } } void ircd::server::peer::disperse(link &link) { disperse_uncommitted(link); link.cancel_committed(std::make_exception_ptr(canceled { "Request was aborted; though it was partially completed" })); assert(link.queue.empty()); } void ircd::server::peer::disperse_uncommitted(link &link) { auto &queue(link.queue); auto it(begin(queue)); while(it != end(queue)) try { auto &tag{*it}; if(!tag.request || tag.committed()) { ++it; continue; } submit(*tag.request); it = queue.erase(it); } catch(const std::exception &e) { const auto &tag{*it}; log::warning { log, "peer(%p) failed to resubmit tag(%p): %s", this, &tag, e.what() }; it = queue.erase(it); } } /// This *cannot* be called unless a link's socket is closed and its queue /// is empty. It is usually only called by a disconnect handler because /// the proper way to remove a link is asynchronously through link.close(); void ircd::server::peer::del(link &link) { assert(!link.tag_count()); assert(!link.opened()); const auto it(std::find_if(begin(links), end(links), [&link] (const auto &link_) { return &link_ == &link; })); assert(it != end(links)); log::debug { log, "peer(%p) removing link(%p) %zu of %zu to %s", this, &link, std::distance(begin(links), it), links.size(), string(remote) }; links.erase(it); } void ircd::server::peer::resolve(const hostport &hostport) { if(op_resolve || op_fini) return; auto handler { std::bind(&peer::handle_resolve, this, ph::_1, ph::_2, ph::_3) }; op_resolve = true; net::dns(hostport, std::move(handler)); } void ircd::server::peer::handle_resolve(std::exception_ptr eptr, const hostport &, const ipport &ipport) try { const ctx::critical_assertion ca; assert(op_resolve); op_resolve = false; if(eptr) { err_set(eptr); std::rethrow_exception(eptr); __builtin_unreachable(); } // Save the results of the query to this object instance. this->remote = ipport; open_opts.ipport = this->remote; port(open_opts.hostport) = port(ipport); // The hostname in open_opts should still reference this object's string. assert(host(open_opts.hostport).data() == this->hostname.data()); if(unlikely(ircd::runlevel != ircd::runlevel::RUN)) op_fini = true; if(unlikely(finished())) return handle_finished(); if(op_fini) return; link *links[LINK_MAX]; const auto end(pointers(this->links, links)); for(link **link(links); link != end; ++link) (*link)->open(open_opts); } catch(const std::exception &e) { log::derror { log, "peer(%p): error: %s", this, e.what() }; close(); } void ircd::server::peer::handle_finished() { assert(finished()); // Right now this is what the server:: ~init sequence needs // to wait for all links to close on IRCd shutdown. server::dock.notify_all(); } size_t ircd::server::peer::read_total() const { return read_bytes; } size_t ircd::server::peer::write_total() const { return write_bytes; } size_t ircd::server::peer::read_remaining() const { return accumulate_links([](const auto &link) { return link.read_remaining(); }); } size_t ircd::server::peer::read_completed() const { return accumulate_links([](const auto &link) { return link.read_completed(); }); } size_t ircd::server::peer::read_size() const { return accumulate_links([](const auto &link) { return link.read_size(); }); } size_t ircd::server::peer::write_remaining() const { return accumulate_links([](const auto &link) { return link.write_remaining(); }); } size_t ircd::server::peer::write_completed() const { return accumulate_links([](const auto &link) { return link.write_completed(); }); } size_t ircd::server::peer::write_size() const { return accumulate_links([](const auto &link) { return link.write_size(); }); } size_t ircd::server::peer::tag_uncommitted() const { return accumulate_links([](const auto &link) { return link.tag_uncommitted(); }); } size_t ircd::server::peer::tag_committed() const { return accumulate_links([](const auto &link) { return link.tag_committed(); }); } size_t ircd::server::peer::tag_count() const { return accumulate_links([](const auto &link) { return link.tag_count(); }); } size_t ircd::server::peer::link_ready() const { return accumulate_links([](const auto &link) { return link.ready(); }); } size_t ircd::server::peer::link_busy() const { return accumulate_links([](const auto &link) { return link.busy(); }); } size_t ircd::server::peer::link_count() const { return links.size(); } size_t ircd::server::peer::link_min() const { return link_min_default; } size_t ircd::server::peer::link_max() const { return link_max_default; } bool ircd::server::peer::finished() const { return links.empty() && !op_resolve && op_fini; } template<class F> size_t ircd::server::peer::accumulate_tags(F&& closure) const { return accumulate_links([&closure](const auto &link) { return link.accumulate([&closure](const auto &tag) { return closure(tag); }); }); } template<class F> size_t ircd::server::peer::accumulate_links(F&& closure) const { return std::accumulate(begin(links), end(links), size_t(0), [&closure] (auto ret, const auto &tag) { return ret += closure(tag); }); } // // link // decltype(ircd::server::link::tag_max_default) ircd::server::link::tag_max_default { { "name", "ircd.server.link.tag_max" }, { "default", -1L } }; decltype(ircd::server::link::tag_commit_max_default) ircd::server::link::tag_commit_max_default { { "name", "ircd.server.link.tag_commit_max" }, { "default", 3L } }; // // link::link // ircd::server::link::link(server::peer &peer) :peer{&peer} { } ircd::server::link::~link() noexcept { assert(!busy()); assert(!opened()); } void ircd::server::link::submit(request &request) { assert(!request.tag || !request.tag->committed()); const auto it { request.tag? queue.emplace(end(queue), std::move(*request.tag)): queue.emplace(end(queue), request) }; /* log::debug { log, "tag(%p) submitted to link(%p) queue: %zu", &(*it), this, tag_count() }; */ if(ready()) wait_writable(); } void ircd::server::link::cancel_all(std::exception_ptr eptr) { for(auto it(begin(queue)); it != end(queue); it = queue.erase(it)) { auto &tag{*it}; if(!tag.request) continue; tag.set_exception(eptr); } } void ircd::server::link::cancel_committed(std::exception_ptr eptr) { for(auto it(begin(queue)); it != end(queue); it = queue.erase(it)) { auto &tag{*it}; if(!tag.request) continue; if(!tag.committed()) break; tag.set_exception(eptr); } } void ircd::server::link::cancel_uncommitted(std::exception_ptr eptr) { auto it(begin(queue)); while(it != end(queue)) { auto &tag{*it}; if(!tag.request || tag.committed()) { ++it; continue; } tag.set_exception(eptr); it = queue.erase(it); } } bool ircd::server::link::open(const net::open_opts &open_opts) { assert(ircd::runlevel == ircd::runlevel::RUN); if(op_init) return false; auto handler { std::bind(&link::handle_open, this, ph::_1) }; op_init = true; const unwind::exceptional unhandled{[this] { op_init = false; }}; socket = net::open(open_opts, std::move(handler)); return true; } void ircd::server::link::handle_open(std::exception_ptr eptr) { assert(op_init); op_init = false; if(!eptr && !op_fini) wait_writable(); if(peer) peer->handle_open(*this, std::move(eptr)); } bool ircd::server::link::close(const net::close_opts &close_opts) { if(op_fini) return false; op_fini = true; // Tell the peer to ditch everything in the queue; op_fini has been set so // the tags won't get assigned back to this link. if(tag_count() && peer) peer->disperse(*this); auto handler { std::bind(&link::handle_close, this, ph::_1) }; if(!socket) { handler(std::exception_ptr{}); return true; } net::close(*socket, close_opts, std::move(handler)); return true; } void ircd::server::link::handle_close(std::exception_ptr eptr) { assert(op_fini); if(op_init) { assert(bool(eptr)); } if(peer) peer->handle_close(*this, std::move(eptr)); } void ircd::server::link::wait_writable() { if(op_write || unlikely(op_fini)) return; auto handler { std::bind(&link::handle_writable, this, ph::_1) }; assert(ready()); op_write = true; const unwind::exceptional unhandled{[this] { op_write = false; }}; net::wait(*socket, net::ready::WRITE, std::move(handler)); } void ircd::server::link::handle_writable(const error_code &ec) try { using namespace boost::system::errc; using boost::system::system_category; op_write = false; if(unlikely(finished())) { assert(peer); return peer->handle_finished(*this); } if(ec.category() == system_category()) switch(ec.value()) { case success: handle_writable_success(); return; case operation_canceled: return; default: break; } throw boost::system::system_error{ec}; } catch(const boost::system::system_error &e) { assert(peer); peer->handle_error(*this, e); } catch(...) { assert(peer); peer->handle_error(*this, std::current_exception()); } void ircd::server::link::handle_writable_success() { auto it(begin(queue)); while(it != end(queue)) { auto &tag{*it}; if((tag.abandoned() || tag.canceled()) && !tag.committed()) { log::debug { log, "link(%p) discarding canceled:%d abandoned:%d uncommitted tag %zu of %zu", this, tag.canceled(), tag.abandoned(), tag_committed(), tag_count() }; it = queue.erase(it); continue; } if(tag.canceled() && tag.committed() && tag_committed() <= 1) { log::debug { log, "link(%p) closing to interrupt canceled committed tag(%p) of %zu", this, &tag, tag_count() }; close(); break; } if(tag_committed() == 0) wait_readable(); if(!process_write(tag)) { wait_writable(); break; } // Limits the amount of requests in the pipe. if(tag_committed() >= tag_commit_max()) break; ++it; } } bool ircd::server::link::process_write(tag &tag) { if(!tag.committed()) log::debug { log, "peer(%p) link(%p) starting on tag(%p) %zu of %zu: wt:%zu", peer, this, &tag, tag_committed(), tag_count(), tag.write_size() }; while(tag.write_remaining()) { const const_buffer buffer { tag.make_write_buffer() }; assert(!empty(buffer)); const const_buffer written { process_write_next(buffer) }; tag.wrote_buffer(written); assert(tag_committed() <= tag_commit_max()); if(size(written) < size(buffer)) return false; } return true; } ircd::const_buffer ircd::server::link::process_write_next(const const_buffer &buffer) { const size_t bytes { write_any(*socket, buffer) }; const const_buffer written { data(buffer), bytes }; assert(peer); peer->write_bytes += bytes; return written; } void ircd::server::link::wait_readable() { if(op_read || op_fini) return; assert(ready()); op_read = true; const unwind::exceptional unhandled{[this] { op_read = false; }}; auto handler { std::bind(&link::handle_readable, this, ph::_1) }; net::wait(*socket, net::ready::READ, std::move(handler)); } void ircd::server::link::handle_readable(const error_code &ec) try { using namespace boost::system::errc; using boost::system::system_category; op_read = false; if(unlikely(finished())) { assert(peer); return peer->handle_finished(*this); } if(ec.category() == system_category()) switch(ec.value()) { case success: handle_readable_success(); return; case operation_canceled: return; default: break; } throw boost::system::system_error{ec}; } catch(const boost::system::system_error &e) { assert(peer); peer->handle_error(*this, e); } catch(...) { assert(peer); peer->handle_error(*this, std::current_exception()); } /// Process as many read operations from as many tags as possible void ircd::server::link::handle_readable_success() { if(queue.empty()) { discard_read(); wait_readable(); return; } // Data pointed to by overrun will remain intact between iterations // because this loop isn't executing in any ircd::ctx. const_buffer overrun; do { if(!process_read(overrun)) { wait_readable(); return; } } while(!queue.empty()); assert(peer); peer->handle_link_done(*this); } /// Process as many read operations for one tag as possible bool ircd::server::link::process_read(const_buffer &overrun) try { auto &tag { queue.front() }; if(!tag.committed()) { // Tag hasn't sent its data yet, we shouldn't have anything for it assert(empty(overrun)); discard_read(); // Should stumble on a socket error. return false; } if(tag.canceled() && tag_committed() <= 1) { log::debug { log, "link(%p) closing to interrupt canceled committed tag(%p) of %zu", this, &tag, tag_count() }; close(); return false; } bool done{false}; do { overrun = process_read_next(overrun, tag, done); } while(!done); assert(peer); peer->handle_tag_done(*this, tag); assert(!queue.empty()); queue.pop_front(); return true; } catch(const buffer_overrun &e) { queue.pop_front(); throw; } catch(const boost::system::system_error &e) { using namespace boost::system::errc; switch(e.code().value()) { case resource_unavailable_try_again: return false; case success: assert(0); return true; default: throw; } } /// Process one read operation for one tag ircd::const_buffer ircd::server::link::process_read_next(const const_buffer &underrun, tag &tag, bool &done) try { const mutable_buffer buffer { tag.make_read_buffer() }; const size_t copied { copy(buffer, underrun) }; const mutable_buffer remaining { data(buffer) + copied, size(buffer) - copied }; const const_buffer view { read(remaining) }; const const_buffer overrun { tag.read_buffer(view, done, *this) }; assert(done || empty(overrun)); return overrun; } catch(const buffer_overrun &e) { tag.set_exception(e); throw; } /// Read directly off the link's socket into buf ircd::const_buffer ircd::server::link::read(const mutable_buffer &buf) { assert(!empty(buf)); const size_t received { read_one(*socket, buf) }; assert(peer); peer->read_bytes += received; assert(received); return const_buffer { data(buf), received }; } void ircd::server::link::discard_read() { ssize_t discard { SSL_pending(socket->ssl.native_handle()) }; if(discard <= 0 && queue.empty()) discard = available(*socket); if(discard <= 0 && !queue.empty()) discard = 1; const size_t discarded { discard_any(*socket, size_t(discard)) }; assert(peer); peer->read_bytes += discarded; // Shouldn't ever be hit because the read() within discard() throws // the pending error like an eof. log::warning { log, "link(%p) socket(%p) to %s discarded %zu of %zd unexpected bytes", this, socket.get(), likely(peer)? string(peer->remote) : string(remote_ipport(*socket)), discarded, discard }; // just in case so this doesn't get loopy with discarding zero with // an empty queue... if(unlikely(!discard && !discarded)) throw assertive { "peer(%p) link(%p) socket(%p) queue is empty and nothing to discard.", peer, this, socket.get() }; } size_t ircd::server::link::tag_uncommitted() const { return tag_count() - tag_committed(); } size_t ircd::server::link::tag_committed() const { return accumulate_tags([](const auto &tag) { return tag.committed(); }); } size_t ircd::server::link::tag_count() const { return queue.size(); } size_t ircd::server::link::read_total() const { return socket? socket->in.bytes : 0; } size_t ircd::server::link::write_total() const { return socket? socket->out.bytes : 0; } size_t ircd::server::link::read_remaining() const { return accumulate_tags([](const auto &tag) { return tag.read_remaining(); }); } size_t ircd::server::link::read_completed() const { return accumulate_tags([](const auto &tag) { return tag.read_completed(); }); } size_t ircd::server::link::read_size() const { return accumulate_tags([](const auto &tag) { return tag.read_size(); }); } size_t ircd::server::link::write_remaining() const { return accumulate_tags([](const auto &tag) { return tag.write_remaining(); }); } size_t ircd::server::link::write_completed() const { return accumulate_tags([](const auto &tag) { return tag.write_completed(); }); } size_t ircd::server::link::write_size() const { return accumulate_tags([](const auto &tag) { return tag.write_size(); }); } bool ircd::server::link::busy() const { return !queue.empty(); } bool ircd::server::link::ready() const { return opened() && !op_init && !op_fini; } bool ircd::server::link::opened() const noexcept { return bool(socket) && net::opened(*socket); } bool ircd::server::link::finished() const { if(!bool(socket)) return true; return !opened() && op_fini && !op_init && !op_write && !op_read; } size_t ircd::server::link::tag_commit_max() const { return tag_commit_max_default; } size_t ircd::server::link::tag_max() const { return tag_max_default; } template<class F> size_t ircd::server::link::accumulate_tags(F&& closure) const { return std::accumulate(begin(queue), end(queue), size_t(0), [&closure] (auto ret, const auto &tag) { return ret += closure(tag); }); } // // tag // /// This is tricky. When a user cancels a request which has committed some /// writes to the remote we have to continue to service it through to /// completion without disrupting the linearity of the link's pipeline /// and causing trouble with other requests. This all depends on what phase /// the request is currently in. /// /// In any case, the goal here is to swap out the user's request buffers /// and replace them with cancellation buffers which will be transparent /// to the link as it completes the request. void ircd::server::cancel(request &request, tag &tag) noexcept { // Must have a fully associated request/tag which has committed some // data to the wire to enter this routine. assert(tag.committed()); assert(!tag.canceled()); assert(request.tag == &tag); assert(tag.request == &request); // Disassociate the user's request and add our dummy request in its place. disassociate(request, tag); assert(tag.request == nullptr); tag.request = new server::request{}; tag.request->tag = &tag; // Setup the cancellation buffers by mirroring the current state of the // user's buffers. const size_t cancellation_size { size(request.out) + size(request.in) }; tag.cancellation = std::make_unique<char[]>(cancellation_size); char *ptr{tag.cancellation.get()}; const mutable_buffer out_head{ptr, size(request.out.head)}; tag.request->out.head = out_head; ptr += size(out_head); const mutable_buffer out_content{ptr, size(request.out.content)}; tag.request->out.content = out_content; ptr += size(out_content); const mutable_buffer in_head{ptr, size(request.in.head)}; tag.request->in.head = in_head; ptr += size(in_head); const mutable_buffer in_content{ptr, size(request.in.content)}; // The nullity (btw that's a real word) of in.content has to be preserved // between the user's tag and the cancellation tag. This is important for // a dynamic chunked encoded response which has null in.content until done. if(!null(request.in.content)) { tag.request->in.content = in_content; ptr += size(in_content); } else tag.request->in.content = request.in.content; assert(size_t(std::distance(tag.cancellation.get(), ptr)) == cancellation_size); // If the head is not completely written we have to copy the remainder from where // the socket left off. if(tag.state.written < size(request.out.head)) { const const_buffer src { data(request.out.head) + tag.state.written, size(request.out.head) - tag.state.written }; const mutable_buffer dst { out_head + tag.state.written }; copy(dst, src); } // If the content is not completely written we have to copy the remainder from where // the socket left off. const size_t content_written { tag.state.written > size(request.out.head)? tag.state.written - size(request.out.head) : 0 }; if(content_written < size(request.out.content)) { const const_buffer src { data(request.out.content) + content_written, size(request.out.content) - content_written }; const mutable_buffer dst { out_content + content_written }; copy(dst, src); } // If the head is not completely read we have to copy what's been received so far so // we can parse a coherent head. if(tag.state.head_read > 0 && tag.state.head_read < size(request.in.head)) { const const_buffer src { data(request.in.head), tag.state.head_read }; const mutable_buffer dst { data(in_head), size(in_head) }; copy(dst, src); } // Normally we have no reason to copy content, but there is one exception: // If the content is chunked encoding and the tag is in the phase of // receiving the chunk head we have to copy what's been received of that // head so far so the grammar can parse a coherent head to continue. if(tag.state.chunk_length == size_t(-1) && !null(request.in.content)) { const const_buffer src { data(request.in.content) + tag.state.content_length, tag.state.content_read - tag.state.content_length }; const mutable_buffer dst { in_content + tag.state.content_length }; copy(dst, src); } // Moving the dynamic buffer should have no real effect because the // cancellation buffer already took over for it. We could do it anyway // to prevent regressions but at the cost of maintaining twice the memory // allocated. For now it's commented to let it die with the user's req. //tag.request->in.dynamic = std::move(request.in.dynamic); // Moving the chunk vector is important to maintain the state of dynamic // chunk transfers through this cancel. There is no condition here for if // this is not a dynamic chunk transfer because it's trivial. tag.request->in.chunks = std::move(request.in.chunks); } void ircd::server::associate(request &request, tag &tag) { assert(request.tag == nullptr); assert(tag.request == nullptr); auto &future { static_cast<ctx::future<http::code> &>(request) }; future = tag.p; request.tag = &tag; tag.request = &request; } void ircd::server::associate(request &request, tag &cur, tag &&old) noexcept { assert(request.tag == &old); // ctor moved assert(cur.request == &request); // ctor moved assert(old.request == &request); // ctor didn't trash old cur.request = &request; old.request = nullptr; request.tag = &cur; } void ircd::server::associate(request &cur, tag &tag, request &&old) noexcept { assert(tag.request == &old); // ctor already moved assert(cur.tag == &tag); // ctor already moved assert(old.tag == &tag); // ctor didn't trash old cur.tag = &tag; tag.request = &cur; old.tag = nullptr; } void ircd::server::disassociate(request &request, tag &tag) { assert(request.tag == &tag); assert(tag.request == &request); assert(tag.abandoned()); request.tag = nullptr; tag.request = nullptr; // If the original request was canceled a new request was attached in its // place in addition to an cancellation buffer. The existence of this // cancellation buffer indicates that we must delete the request here. // This is a little hacky but it gets the job done. if(bool(tag.cancellation)) delete &request; } void ircd::server::tag::wrote_buffer(const const_buffer &buffer) { assert(request); const auto &req{*request}; state.written += size(buffer); if(state.written <= size(req.out.head)) { assert(data(buffer) >= begin(req.out.head)); assert(data(buffer) < end(req.out.head)); } else if(state.written <= size(req.out.head) + size(req.out.content)) { assert(data(buffer) >= begin(req.out.content)); assert(data(buffer) < end(req.out.content)); assert(state.written <= write_size()); // Invoke the user's optional progress callback; this function // should be marked noexcept and has no reason to throw yet. if(req.out.progress) req.out.progress(buffer, const_buffer{data(req.out.content), state.written}); } else { assert(0); } } ircd::const_buffer ircd::server::tag::make_write_buffer() const { assert(request); const auto &req{*request}; return state.written < size(req.out.head)? make_write_head_buffer(): state.written < size(req.out.head) + size(req.out.content)? make_write_content_buffer(): const_buffer{}; } ircd::const_buffer ircd::server::tag::make_write_head_buffer() const { assert(request); const auto &req{*request}; const size_t remain { size(req.out.head) - state.written }; const const_buffer window { data(req.out.head) + state.written, remain }; return window; } ircd::const_buffer ircd::server::tag::make_write_content_buffer() const { assert(request); const auto &req{*request}; assert(state.written >= size(req.out.head)); const size_t content_offset { state.written - size(req.out.head) }; const size_t remain { size(req.out.head) + size(req.out.content) - state.written }; const const_buffer window { data(req.out.content) + content_offset, remain }; return window; } /// Called by the controller of the socket with a view of the data received by /// the socket. The location and size of `buffer` is the same or smaller than /// the buffer previously supplied by make_read_buffer(). /// /// Sometimes make_read_buffer() supplies a buffer that is too large, and some /// data read off the socket does not belong to this tag. In that case, This /// function returns a const_buffer viewing the portion of `buffer` which is /// considered the "overrun," and the socket controller will copy that over to /// the next tag. /// /// The tag indicates it is entirely finished with receiving its data by /// setting the value of `done` to true. Otherwise it is assumed false. /// /// The link argument is not to be used to control/modify the link from the /// tag; it's only a backreference to flash information to the link/peer /// through specific callbacks so the peer can learn information. /// ircd::const_buffer ircd::server::tag::read_buffer(const const_buffer &buffer, bool &done, link &link) { assert(request); if(state.status == (http::code)0) return read_head(buffer, done, link); if(state.chunk_length == size_t(-1) && null(request->in.content)) return read_chunk_dynamic_head(buffer, done); if(state.chunk_length == size_t(-1)) return read_chunk_head(buffer, done); if(state.chunk_length && null(request->in.content)) return read_chunk_dynamic_content(buffer, done); if(state.chunk_length) return read_chunk_content(buffer, done); return read_content(buffer, done); } ircd::const_buffer ircd::server::tag::read_head(const const_buffer &buffer, bool &done, link &link) { assert(request); auto &req{*request}; // informal search for head terminator static const string_view terminator{"\r\n\r\n"}; const auto pos { string_view{buffer}.find(terminator) }; // No terminator found; account for what was received in this buffer // for the next call to make_head_buffer() preparing for the subsequent // invocation of this function with more data. if(pos == string_view::npos) { state.head_read += size(buffer); return {}; } // This indicates how much head was just received from this buffer only, // including the terminator which is considered part of the dome. const size_t addl_head_bytes { pos + size(terminator) }; // The received buffer may go past the end of the head. assert(addl_head_bytes <= size(buffer)); const size_t beyond_head_len { size(buffer) - addl_head_bytes }; // The final update for the confirmed length of the head. state.head_read += addl_head_bytes; const size_t &head_read{state.head_read}; assert(head_read + beyond_head_len <= size(req.in.head)); // Window on any data in the buffer after the head. const const_buffer beyond_head { data(req.in.head) + head_read, beyond_head_len }; // Before changing the user's head buffer, we branch for a feature that // allows the user to receive head and content into a single contiguous // buffer by assigning in.content = in.head. const bool contiguous { data(req.in.content) == data(req.in.head) }; // Alternatively branch for a feature that allows dynamic allocation of // the content buffer if the user did not specify any buffer. const bool dynamic { !contiguous && empty(req.in.content) }; // Resize the user's head buffer tight to the head; this is how we convey // the size of the dome back to the user. state.head_rem = size(req.in.head) - head_read; req.in.head = mutable_buffer { data(req.in.head), head_read }; // Setup the capstan and mark the end of the tape parse::buffer pb{req.in.head}; parse::capstan pc{pb}; pc.read += size(req.in.head); // Play the tape through the formal grammar. const http::response::head head{pc}; assert(pb.completed() == head_read); state.status = http::status(head.status); state.content_length = head.content_length; // Proffer the HTTP head to the peer instance which owns the link working // this tag so it can learn from any header data. assert(link.peer); link.peer->handle_head_recv(link, *this, head); if(contiguous) { const auto content_max { std::max(ssize_t(size(req.in.content) - head_read), ssize_t(0)) }; req.in.content = mutable_buffer { data(req.in.head) + head_read, size_t(content_max) }; } // Branch for starting chunked encoding. We feed it whatever we have from // beyond the head as whole or part (or none) of the first chunk. Similar // to the non-chunked routine below, beyond_head may include all of the // chunk content and then part of the next message too: read_chunk_head // will return anything beyond this message as overrun and indicate done. if(head.transfer_encoding == "chunked") { if(dynamic) { assert(req.opt); req.in.chunks.reserve(req.opt->chunks_reserve); } const const_buffer chunk { !dynamic? const_buffer{data(req.in.content), move(req.in.content, beyond_head)}: beyond_head }; state.chunk_length = -1; const const_buffer overrun { !dynamic? read_chunk_head(chunk, done): read_chunk_dynamic_head(chunk, done) }; assert(empty(overrun) || done == true); return overrun; } // If no branch taken the rest of this function expects a content length // to be known from the received head. if(head.transfer_encoding) throw error { "Unsupported transfer-encoding '%s'", head.transfer_encoding }; if(dynamic) { assert(req.opt); const size_t alloc_size { std::min(state.content_length, req.opt->content_length_maxalloc) }; req.in.dynamic = unique_buffer<mutable_buffer>{alloc_size}; req.in.content = req.in.dynamic; } // Now we check how much content was received beyond the head const size_t &content_read { std::min(state.content_length, beyond_head_len) }; // Now we know how much bleed into the next message was also received assert(beyond_head_len >= content_read); const size_t beyond_content_len { beyond_head_len - content_read }; const const_buffer partial_content { data(req.in.head) + head_read, content_read }; // Anything remaining is not our response and must be given back. const const_buffer overrun { data(beyond_head) + size(partial_content), beyond_content_len }; // Reduce the user's content buffer to the content-length. This is sort of // how we convey the content-length back to the user. The buffer size will // eventually reflect how much content was actually received; the user can // find the given content-length by parsing the header. req.in.content = mutable_buffer { data(req.in.content), std::min(state.content_length, size(req.in.content)) }; // Any partial content was written to the head buffer by accident, // that may have to be copied over to the content buffer. if(!empty(partial_content) && !contiguous) copy(req.in.content, partial_content); // Invoke the read_content() routine which will increment this->content_read read_content(partial_content, done); assert(state.content_read == size(partial_content)); assert(state.content_read == state.content_length || !done); return overrun; } ircd::const_buffer ircd::server::tag::read_content(const const_buffer &buffer, bool &done) { assert(request); auto &req{*request}; const auto &content{req.in.content}; // The amount of remaining content for the response sequence assert(size(content) + content_overflow() >= state.content_read); assert(size(content) + content_overflow() == state.content_length); const size_t remaining { size(content) + content_overflow() - state.content_read }; // The amount of content read in this buffer only. const size_t addl_content_read { std::min(size(buffer), remaining) }; state.content_read += addl_content_read; assert(size(buffer) - addl_content_read == 0); assert(state.content_read <= size(content) + content_overflow()); assert(state.content_read <= state.content_length); // Invoke the user's optional progress callback; this function // should be marked noexcept for the time being. if(req.in.progress) req.in.progress(buffer, const_buffer{data(content), state.content_read}); if(state.content_read == size(content) + content_overflow()) { assert(state.content_read == state.content_length); assert(!done); done = true; set_value(state.status); } return {}; } ircd::const_buffer ircd::server::tag::read_chunk_head(const const_buffer &buffer, bool &done, const uint8_t recursion_level) { assert(request); auto &req{*request}; const auto &content{req.in.content}; // informal search for head terminator static const string_view terminator{"\r\n"}; const auto pos { string_view{buffer}.find(terminator) }; if(pos == string_view::npos) { state.content_read += size(buffer); return {}; } // This indicates how much head was just received from this buffer only. const size_t addl_head_bytes { pos + size(terminator) }; // The received buffer may go past the end of the head. assert(addl_head_bytes <= size(buffer)); const size_t beyond_head_length { size(buffer) - addl_head_bytes }; // The total head length is found from the end of the last chunk content state.content_read += addl_head_bytes; assert(state.content_read > state.content_length); const size_t head_length { state.content_read - state.content_length }; // Window on any data in the buffer after the head. const const_buffer beyond_head { data(content) + state.content_length + head_length, beyond_head_length }; // Setup the capstan and mark the end of the tape parse::buffer pb { mutable_buffer { data(content) + state.content_length, head_length } }; parse::capstan pc{pb}; pc.read += head_length; // Play the tape through the formal grammar. const http::response::chunk chunk{pc}; state.chunk_length = chunk.size + size(terminator); // Now we check how much chunk was received beyond the head const auto &chunk_read { std::min(state.chunk_length, beyond_head_length) }; // Now we know how much bleed into the next message was also received assert(beyond_head_length >= chunk_read); const size_t beyond_chunk_length { beyond_head_length - chunk_read }; // Finally we erase the chunk head by replacing it with everything received // after it. const mutable_buffer target { data(content) + state.content_length, beyond_head_length }; move(target, beyond_head); // Increment the content_length to now include this chunk state.content_length += state.chunk_length; // Adjust the content_read to erase the chunk head. state.content_read -= head_length; const const_buffer partial_chunk { data(target), chunk_read }; const const_buffer overrun { data(target) + chunk_read, beyond_chunk_length }; assert(state.chunk_length >= 2); read_chunk_content(partial_chunk, done); if(done) return overrun; // Prevent stack overflow from lots of tiny chunks nagled together. if(unlikely(recursion_level >= 32)) throw error { "Chunking recursion limit exceeded" }; return read_chunk_head(overrun, done, recursion_level + 1); } ircd::const_buffer ircd::server::tag::read_chunk_content(const const_buffer &buffer, bool &done) { assert(request); auto &req{*request}; const auto &content{req.in.content}; // The amount of remaining content for the response sequence const size_t remaining { content_remaining() }; // The amount of content read in this buffer only. const size_t addl_content_read { std::min(size(buffer), remaining) }; // Increment the read counters for this chunk and all chunks. state.chunk_read += addl_content_read; state.content_read += addl_content_read; assert(state.chunk_read <= state.content_read); if(state.content_read == state.content_length) { // This branch is taken at the completion of a chunk. The size // all the buffers is rolled back to hide the terminator so it's // either ignored or overwritten so it doesn't leak to the user. static const string_view terminator{"\r\n"}; assert(state.content_length >= size(terminator)); state.content_length -= size(terminator); state.content_read -= size(terminator); assert(state.chunk_length >= 2); assert(state.chunk_read == state.chunk_length); state.chunk_length -= size(terminator); state.chunk_read -= size(terminator); if(state.chunk_length == 0) { assert(state.chunk_read == 0); assert(!done); done = true; req.in.content = mutable_buffer{data(req.in.content), state.content_length}; set_value(state.status); } } // Invoke the user's optional progress callback; this function // should be marked noexcept for the time being. if(req.in.progress && !done) req.in.progress(buffer, const_buffer{data(content), state.content_read}); if(state.content_read == state.content_length) { assert(state.chunk_read == state.chunk_length); assert(state.chunk_read <= state.content_read); state.chunk_length = size_t(-1); state.chunk_read = 0; } return {}; } ircd::const_buffer ircd::server::tag::read_chunk_dynamic_head(const const_buffer &buffer, bool &done, const uint8_t recursion_level) { assert(request); auto &req{*request}; // informal search for head terminator static const string_view terminator{"\r\n"}; const auto pos { string_view{buffer}.find(terminator) }; if(pos == string_view::npos) { state.chunk_read += size(buffer); state.content_read += size(buffer); return {}; } // This indicates how much head was just received from this buffer only. const size_t addl_head_bytes { pos + size(terminator) }; // The received buffer may go past the end of the head. assert(addl_head_bytes <= size(buffer)); const size_t beyond_head_length { size(buffer) - addl_head_bytes }; state.chunk_read += addl_head_bytes; const auto head_length{state.chunk_read}; state.chunk_read = 0; // Window on any data in the buffer after the head. const const_buffer beyond_head { data(buffer) + addl_head_bytes, beyond_head_length }; // Setup the capstan and mark the end of the tape parse::buffer pb { mutable_buffer { data(req.in.head) + state.head_read, head_length } }; parse::capstan pc{pb}; pc.read += head_length; // Play the tape through the formal grammar. const http::response::chunk chunk{pc}; assert(state.chunk_length == size_t(-1)); state.chunk_length = chunk.size + size(terminator); // Increment the content_length to now include this chunk state.content_length += state.chunk_length; // Allocate the chunk content on the vector. //TODO: maxalloc req.in.chunks.emplace_back(state.chunk_length); // Now we check how much chunk was received beyond the head // state.chunk_head is still 0 here because that's only incremented // in the content read function. const auto &chunk_read { std::min(state.chunk_length, beyond_head_length) }; // Now we know how much bleed into the next message was also received assert(beyond_head_length >= chunk_read); const size_t beyond_chunk_length { beyond_head_length - chunk_read }; const const_buffer partial_chunk { data(beyond_head), chunk_read }; const size_t copied { copy(req.in.chunks.back(), partial_chunk) }; const const_buffer overrun { data(beyond_head) + chunk_read, beyond_chunk_length }; assert(state.chunk_length >= 2); read_chunk_dynamic_content(partial_chunk, done); if(done) return overrun; // Prevent stack overflow from lots of tiny chunks nagled together. if(unlikely(recursion_level >= 32)) throw error { "Chunking recursion limit exceeded" }; return read_chunk_dynamic_head(overrun, done, recursion_level + 1); } ircd::const_buffer ircd::server::tag::read_chunk_dynamic_content(const const_buffer &buffer, bool &done) { assert(request); auto &req{*request}; assert(state.chunk_length != size_t(-1)); assert(null(req.in.content)); assert(!req.in.chunks.empty()); const auto &chunk { req.in.chunks.back() }; // The amount of remaining content for the response sequence assert(state.chunk_read <= size(chunk)); const size_t remaining { size(chunk) - state.chunk_read }; // The amount of content read in this buffer only. const size_t addl_content_read { std::min(size(buffer), remaining) }; // Increment the read counters for this chunk and all chunks. state.chunk_read += addl_content_read; state.content_read += addl_content_read; assert(state.chunk_read <= state.content_read); if(state.chunk_read == state.chunk_length) { static const string_view terminator{"\r\n"}; state.content_length -= size(terminator); state.content_read -= size(terminator); assert(state.chunk_length >= 2); assert(state.chunk_read == state.chunk_length); state.chunk_length -= size(terminator); state.chunk_read -= size(terminator); auto &chunk{req.in.chunks.back()}; std::get<1>(chunk) -= size(terminator); assert(size(chunk) == state.chunk_length); assert(std::get<0>(chunk) <= std::get<1>(chunk)); if(state.chunk_length == 0) { assert(state.chunk_read == 0); assert(!done); done = true; assert(req.opt); if(req.opt->contiguous_content) { assert(state.content_length == size_chunks(req.in)); assert(req.in.chunks.size() >= 1); assert(empty(req.in.chunks.back())); req.in.chunks.pop_back(); if(req.in.chunks.size() > 1) { req.in.dynamic = size_chunks(req.in); req.in.content = req.in.dynamic; size_t copied{0}; for(const auto &buffer : req.in.chunks) copied += copy(req.in.content + copied, buffer); assert(copied == size(req.in.content)); assert(copied == state.content_length); } else if(req.in.chunks.size() == 1) { req.in.dynamic = std::move(req.in.chunks.front()); req.in.content = req.in.dynamic; assert(size(req.in.content) == state.content_length); } req.in.chunks.clear(); } set_value(state.status); } } // Invoke the user's optional progress callback; this function // should be marked noexcept for the time being. if(req.in.progress && !done) req.in.progress(buffer, const_buffer{data(chunk), state.chunk_read}); if(state.chunk_read == state.chunk_length) { assert(state.chunk_read == state.chunk_length); assert(state.chunk_read <= state.content_read); state.chunk_length = size_t(-1); state.chunk_read = 0; } return {}; } /// An idempotent operation that provides the location of where the socket /// should place the next received data. The tag figures this out based on /// whether it receiving HTTP head data or whether it is in content mode. /// ircd::mutable_buffer ircd::server::tag::make_read_buffer() const { assert(request); assert(state.head_read <= size(request->in.head)); assert(state.content_read <= state.content_length); if(state.status == (http::code)0) return make_read_head_buffer(); if(state.chunk_length == size_t(-1) && null(request->in.content)) return make_read_chunk_dynamic_head_buffer(); if(state.chunk_length == size_t(-1)) return make_read_chunk_head_buffer(); if(state.chunk_length && null(request->in.content)) return make_read_chunk_dynamic_content_buffer(); if(state.chunk_length) return make_read_chunk_content_buffer(); if(state.content_read >= size(request->in.content)) return make_read_discard_buffer(); return make_read_content_buffer(); } ircd::mutable_buffer ircd::server::tag::make_read_head_buffer() const { assert(request); const auto &req{*request}; const auto &head{req.in.head}; if(unlikely(size(req.in.head) <= state.head_read)) throw buffer_overrun { "Supplied buffer of %zu too small for HTTP head", size(req.in.head) }; const size_t remaining { size(head) - state.head_read }; const mutable_buffer buffer { data(head) + state.head_read, remaining }; assert(size(buffer) <= size(head)); assert(size(buffer) > 0); return buffer; } ircd::mutable_buffer ircd::server::tag::make_read_content_buffer() const { assert(request); const auto &req{*request}; const auto &content{req.in.content}; if(unlikely(size(content) <= state.content_read)) throw buffer_overrun { "Content buffer of %zu bytes too small to read %zu bytes of content", size(content), state.content_length }; // The amount of bytes we still have to read to for the response const size_t remaining { size(content) - state.content_read }; assert(remaining > 0); return { data(content) + state.content_read, remaining }; } /// The chunk head buffer starts after the last chunk ended and has a size of /// the rest of the available content buffer (hopefully much less will be /// needed). If only part of the chunk head was received previously this /// function accounts for that by returning a buffer which starts at the /// content_read offset (which is at the end of that previous read). /// ircd::mutable_buffer ircd::server::tag::make_read_chunk_head_buffer() const { assert(request); assert(state.chunk_length == size_t(-1)); assert(state.content_read >= state.content_length); const auto &req{*request}; const auto &content{req.in.content}; if(unlikely(size(content) <= state.content_read)) throw buffer_overrun { "Content buffer of %zu bytes too small to read next chunk header", size(content) }; const size_t remaining { size(content) - state.content_read }; const mutable_buffer buffer { data(content) + state.content_read, remaining }; assert(size(buffer) > 0); return buffer; } ircd::mutable_buffer ircd::server::tag::make_read_chunk_content_buffer() const { assert(request); assert(state.chunk_length > 0); assert(state.content_read <= state.content_length); const auto &req{*request}; const auto &content{req.in.content}; assert(size(content) >= state.content_read); const size_t buffer_remaining { size(content) - state.content_read }; const size_t chunk_remaining { content_remaining() }; assert(chunk_remaining <= state.chunk_length); assert(chunk_remaining == state.content_length - state.content_read); const size_t buffer_size { std::min(buffer_remaining, chunk_remaining) }; if(unlikely(buffer_size < chunk_remaining)) throw buffer_overrun { "Content buffer of %zu bytes too small to read remaining %zu of chunk", size(content), chunk_remaining }; const mutable_buffer buffer { data(content) + state.content_read, buffer_size }; assert(size(buffer) > 0); return buffer; } /// The dynamic chunk head buffer starts after the main head and has a size /// of the remaining main head buffer. This area is overwritten for each /// chunk head. /// ircd::mutable_buffer ircd::server::tag::make_read_chunk_dynamic_head_buffer() const { assert(request); const auto &req{*request}; assert(state.chunk_length == size_t(-1)); assert(null(req.in.content)); assert(size(req.in.head) >= state.head_read); const size_t head_max { size(req.in.head) + state.head_rem }; // The total offset in the head buffer is the message head plus the // amount of chunk head received so far, which is kept in chunk_read. const size_t head_offset { state.head_read + state.chunk_read }; assert(head_max >= head_offset); if(unlikely(head_max - head_offset <= 16)) throw buffer_overrun { "Remaining head buffer of %zu bytes too small to read next chunk header", head_max - state.head_read }; const size_t remaining { head_max - head_offset }; const mutable_buffer buffer { data(req.in.head) + state.head_read + state.chunk_read, remaining }; assert(size(buffer) > 0); return buffer; } ircd::mutable_buffer ircd::server::tag::make_read_chunk_dynamic_content_buffer() const { assert(request); const auto &req{*request}; assert(state.chunk_length > 0); assert(state.content_read <= state.content_length); assert(null(req.in.content)); assert(!req.in.chunks.empty()); const auto &buffer { req.in.chunks.back() }; assert(size(buffer) == state.chunk_length); assert(state.chunk_read <= size(buffer)); const size_t buffer_remaining { size(buffer) - state.chunk_read }; const mutable_buffer ret { data(buffer) + state.chunk_read, buffer_remaining }; assert(size(ret) > 0); return ret; } ircd::mutable_buffer ircd::server::tag::make_read_discard_buffer() const { assert(request); assert(content_overflow() > 0); assert(content_overflow() <= state.content_read); assert(state.content_read >= size(request->in.content)); const size_t remaining { content_overflow() - state.content_read }; static char buffer[512]; const size_t buffer_max { std::min(remaining, sizeof(buffer)) }; return { buffer, buffer_max }; } size_t ircd::server::tag::content_remaining() const { assert(state.content_length >= state.content_read); return state.content_length - state.content_read; } size_t ircd::server::tag::content_overflow() const { assert(request); const auto &req{*request}; const ssize_t diff(state.content_length - size(req.in.content)); return std::max(diff, ssize_t(0)); } template<class... args> void ircd::server::tag::set_value(args&&... a) { if(abandoned()) return; const http::code &code { std::forward<args>(a)... }; assert(request->opt); if(request->opt->http_exceptions && code >= http::code(300)) { const string_view content { data(request->in.content), size(request->in.content) }; set_exception(http::error{code, std::string{content}}); return; } p.set_value(code); } template<class... args> void ircd::server::tag::set_exception(args&&... a) { if(abandoned()) return; set_exception(std::make_exception_ptr(std::forward<args>(a)...)); } void ircd::server::tag::set_exception(std::exception_ptr eptr) { if(abandoned()) return; p.set_exception(std::move(eptr)); } bool ircd::server::tag::abandoned() const { return !p.valid(); } bool ircd::server::tag::canceled() const { return bool(cancellation); } bool ircd::server::tag::committed() const { return write_completed() > 0; } size_t ircd::server::tag::read_remaining() const { return read_size() - read_completed(); } size_t ircd::server::tag::read_completed() const { return state.head_read + state.content_read; } size_t ircd::server::tag::read_size() const { return state.head_read + state.content_length; } size_t ircd::server::tag::write_remaining() const { return write_size() - write_completed(); } size_t ircd::server::tag::write_completed() const { return state.written; } size_t ircd::server::tag::write_size() const { return request? size(request->out) : 0; }