// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.

#include <ircd/asio.h>

namespace ircd::server
{
	// Internal state
	ctx::dock dock;     // internal semaphore

	// Internal util
	template<class F> size_t accumulate_peers(F&&);
	template<class F> size_t accumulate_links(F&&);
	template<class F> size_t accumulate_tags(F&&);

	// Internal control
	std::unique_ptr<peer> create(const net::hostport &);
	void interrupt_all();
	void close_all();
	void wait_all();
}

decltype(ircd::server::log)
ircd::server::log
{
	"server", 'S'
};

ircd::conf::item<ircd::seconds>
close_all_timeout
{
	{ "name",     "ircd.server.close_all_timeout" },
	{ "default",  2L                              },
};

void
ircd::server::wait_all()
{
	while(peer_unfinished())
	{
		if(dock.wait_for(seconds(2)))
			continue;

		log.warning("Waiting for %zu tags on %zu links on %zu of %zu peers to close...",
		            tag_count(),
		            link_count(),
		            peer_unfinished(),
		            peer_count());
	}
}

void
ircd::server::close_all()
{
	log.debug("Closing all %zu peers",
	          peer_count());

	net::close_opts opts;
	opts.timeout = seconds(close_all_timeout);

	for(auto &peer : peers)
		peer.second->close(opts);
}

void
ircd::server::interrupt_all()
{
	log.debug("Interrupting %zu tags on %zu links on %zu peers",
	          tag_count(),
	          link_count(),
	          peer_count());

	for(auto &peer : peers)
		peer.second->interrupt();
}

ircd::server::peer &
ircd::server::get(const net::hostport &hostport)
{
	auto it(peers.lower_bound(host(hostport)));
	if(it == peers.end() || it->first != host(hostport))
	{
		auto peer{create(hostport)};
		log.debug("peer(%p) for %s created; adding...",
		          peer.get(),
		          string(hostport));

		const string_view key{peer->hostname};
		it = peers.emplace_hint(it, key, std::move(peer));
	}

	return *it->second;
}

std::unique_ptr<ircd::server::peer>
ircd::server::create(const net::hostport &hostport)
{
	auto peer(std::make_unique<peer>());
	peer->hostname = net::canonize(hostport);
	peer->open_opts = net::open_opts
	{
		peer->remote, net::hostport{peer->hostname}
	};

	peer->resolve(hostport);
	return peer;
}

ircd::server::peer &
ircd::server::find(const net::hostport &hostport)
{
	return *peers.at(host(hostport));
}

bool
ircd::server::exists(const net::hostport &hostport)
{
	return peers.find(host(hostport)) != end(peers);
}

bool
ircd::server::errclear(const net::hostport &hostport)
{
	const auto it
	{
		peers.find(host(hostport))
	};

	if(it == end(peers))
		return false;

	auto &peer(*it->second);
	return peer.err_clear();
}

ircd::string_view
ircd::server::errmsg(const net::hostport &hostport)
{
	const auto it
	{
		peers.find(host(hostport))
	};

	if(it == end(peers))
		return {};

	return it->second->err_msg();
}

size_t
ircd::server::peer_unfinished()
{
	return accumulate_peers([]
	(const auto &peer)
	{
		return !peer.finished();
	});
}

size_t
ircd::server::peer_count()
{
	return peers.size();
}

size_t
ircd::server::link_count()
{
	return accumulate_peers([]
	(const auto &peer)
	{
		return peer.link_count();
	});
}

size_t
ircd::server::tag_count()
{
	return accumulate_peers([]
	(const auto &peer)
	{
		return peer.tag_count();
	});
}

template<class F>
size_t
ircd::server::accumulate_tags(F&& closure)
{
	return accumulate_links([&closure]
	(const auto &link)
	{
		return link.accumulate_tags(std::forward<F>(closure));
	});
}

template<class F>
size_t
ircd::server::accumulate_links(F&& closure)
{
	return accumulate_peers([&closure]
	(const auto &peer)
	{
		return peer.accumulate_links(std::forward<F>(closure));
	});
}

template<class F>
size_t
ircd::server::accumulate_peers(F&& closure)
{
	return std::accumulate(begin(peers), end(peers), size_t(0), [&closure]
	(auto ret, const auto &pair)
	{
		const auto &peer{*pair.second};
		return ret += closure(peer);
	});
}

//
// init
//

ircd::server::init::init()
{
}

ircd::server::init::~init()
noexcept
{
	close();
	wait();
	peers.clear();
}

void
ircd::server::init::wait()
{
	wait_all();
}

void
ircd::server::init::close()
{
	close_all();
}

void
ircd::server::init::interrupt()
{
	interrupt_all();
}

///
// request
//

decltype(ircd::server::request::opts_default)
ircd::server::request::opts_default
{};

/// Canceling a request is tricky. This allows a robust way to let the user's
/// request go out of scope at virtually any time without disrupting the
/// pipeline and other requests.
bool
ircd::server::cancel(request &request)
{
	if(!request.tag)
		return false;

	if(request.tag->canceled())
		return false;

	if(request.tag->abandoned())
		return false;

	auto &tag
	{
		*request.tag
	};

/*
	log.debug("cancel request(%p) tag(%p) commit:%d w:%zu hr:%zu cr:%zu",
	          &request,
	          &tag,
	          tag.committed(),
	          tag.state.written,
	          tag.state.head_read,
	          tag.state.content_read);
*/

	tag.set_exception(canceled
	{
		"Request canceled"
	});

	// We got off easy... The link's write loop won't start an abandoned
	// request. All that has to be done is indicate a full cancellation
	// immediately and the user will know nothing was revealed to the remote.
	if(!tag.committed())
		return true;

	// Now things aren't so easy. More complicated logic happens inside...
	cancel(request, tag);
	return true;
}

void
ircd::server::submit(const hostport &hostport,
                     request &request)
{
	if(unlikely(ircd::runlevel != ircd::runlevel::RUN))
		throw unavailable
		{
			"Unable to fulfill requests at this time."
		};

	assert(request.tag == nullptr);
	auto &peer(server::get(hostport));
	peer.submit(request);
}

//
// peer
//

decltype(ircd::server::peers)
ircd::server::peers
{};

decltype(ircd::server::peer::link_min_default)
ircd::server::peer::link_min_default
{
	{ "name",     "ircd.server.peer.link_min" },
	{ "default",  1L                          }
};

decltype(ircd::server::peer::link_max_default)
ircd::server::peer::link_max_default
{
	{ "name",     "ircd.server.peer.link_max" },
	{ "default",  2L                          }
};

//
// peer::peer
//

ircd::server::peer::peer()
{
}

ircd::server::peer::~peer()
noexcept
{
	assert(links.empty());
}

void
ircd::server::peer::close(const net::close_opts &opts)
{
	op_fini = true;
	assert(!op_resolve);
	std::vector<link *> links(this->links.size());
	pointers(this->links, links);
	for(const auto &link : links)
		link->close(opts);

	if(finished())
		return handle_finished();
}

void
ircd::server::peer::interrupt()
{
	for(auto &link : this->links)
		link.cancel_all(std::make_exception_ptr(canceled
		{
			"Request was aborted due to interruption."
		}));
}

bool
ircd::server::peer::err_clear()
{
	const auto ret{bool(e)};
	e.reset(nullptr);
	op_fini = false;
	return ret;
}

template<class... A>
void
ircd::server::peer::err_set(A&&... args)
{
	this->e = std::make_unique<err>(std::forward<A>(args)...);
}

ircd::string_view
ircd::server::peer::err_msg()
const
{
	return bool(e)? what(e->eptr) : string_view{};
}

bool
ircd::server::peer::err_has()
const
{
	return bool(e);
}

decltype(ircd::server::peer::error_clear_default)
ircd::server::peer::error_clear_default
{
	{ "name",     "ircd.server.peer.error.clear_default" },
	{ "default",  305L                                   }
};

bool
ircd::server::peer::err_check()
{
	if(op_fini)
		return false;

	if(!err_has())
		return true;

	//TODO: The specific error type should be switched and finer
	//TODO: timeouts should be used depending on the error: i.e
	//TODO: NXDOMAIN vs. temporary conn timeout, etc.
	if(e->etime + seconds(error_clear_default) > now<system_point>())
		return false;

	err_clear();
	return true;
}

void
ircd::server::peer::submit(request &request)
try
{
	if(!err_check() || unlikely(ircd::runlevel != ircd::runlevel::RUN))
		throw unavailable
		{
			"Peer is unable to take any requests: %s", err_msg()
		};

	link *const ret
	{
		link_get(request)
	};

	if(likely(ret))
	{
		ret->submit(request);
		return;
	}

	if(!request.tag)
		throw unavailable
		{
			"No link to peer %s available", hostname
		};
	else
		request.tag->set_exception(unavailable
		{
			"No link to peer %s available", hostname
		});
}
catch(const std::exception &e)
{
	if(!request.tag)
		throw;

	const auto eptr(std::current_exception());
	const ctx::exception_handler eh;
	request.tag->set_exception(eptr);
}

/// Dispatch algorithm here; finds the best link to place this request on,
/// or creates a new link entirely. There are a number of factors: foremost
/// if any special needs are indicated,
//
ircd::server::link *
ircd::server::peer::link_get(const request &request)
{
	if(links.empty())
		return &link_add(1);

	// Indicates that we can't add anymore links for this peer and the rest
	// of the algorithm should consider this.
	const bool links_maxed
	{
		links.size() >= link_max()
	};

	link *best{nullptr};
	for(auto &cand : links)
	{
		// Don't want a link that's shutting down or marked for exclusion
		if(cand.op_fini || cand.exclude)
			continue;

		if(!best)
		{
			best = &cand;
			continue;
		}

		// Indicates that the best candidate has its pipe saturated which can
		// be factored into the comparisons here.
		const bool best_maxed
		{
			best->tag_committed() >= best->tag_commit_max()
		};

		const bool cand_maxed
		{
			cand.tag_committed() >= cand.tag_commit_max()
		};

		if(best_maxed && !cand_maxed)
		{
			best = &cand;
			continue;
		}

		if(!best_maxed && cand_maxed)
			continue;

		// Candidates's queue has less or same backlog of unsent requests, but
		// now measure if candidate will take longer to process at least the
		// write-side of those requests.
		if(cand.write_remaining() > best->write_remaining())
			continue;

		// Candidate might be working through really large content; though
		// this is a very sketchy measurement right now since we only *might*
		// know about content-length for the *one* active tag occupying the
		// socket.
		if(cand.read_remaining() > best->read_remaining())
			continue;

		// Coarse distribution based on who has more work; this is weak, should
		// be replaced.
		if(cand.tag_count() > best->tag_count())
			continue;

		best = &cand;
	}

	if(links_maxed)
		return best;

	// best might not be good enough, we could try another connection. If best
	// has a backlog or is working on a large download or slow request.
	if(!best)
	{
		best = &link_add();
		return best;
	}

	if(best->tag_uncommitted() < best->tag_commit_max())
		return best;

	best = &link_add();
	return best;
}

ircd::server::link &
ircd::server::peer::link_add(const size_t &num)
{
	assert(!finished());

	if(e)
	{
		std::rethrow_exception(e->eptr);
		__builtin_unreachable();
	}

	assert(!op_fini);
	links.emplace_back(*this);
	auto &link{links.back()};

	if(remote)
		link.open(open_opts);

	return link;
}

void
ircd::server::peer::handle_open(link &link,
                                std::exception_ptr eptr)
{
	if(eptr)
	{
		if(links.size() == 1)
			err_set(eptr);

		log.derror("peer(%p) link(%p) [%s]: open: %s",
		           this,
		           &link,
		           string(remote),
		           what(eptr));

		if(op_fini)
		{
			if(link.finished())
				handle_finished(link);

			return;
		}

		link.close(net::dc::RST);
		return;
	}
}

void
ircd::server::peer::handle_close(link &link,
                                 std::exception_ptr eptr)
{
	if(eptr)
		log.derror("peer(%p) link(%p) [%s]: close: %s",
		           this,
		           &link,
		           string(remote),
		           what(eptr));

	if(link.finished())
		handle_finished(link);
}

void
ircd::server::peer::handle_error(link &link,
                                 std::exception_ptr eptr)
{
	assert(bool(eptr));
	link.cancel_committed(eptr);
	log.derror("peer(%p) link(%p): %s",
	           this,
	           &link,
	           what(eptr));

	link.close(net::dc::RST);
}

void
ircd::server::peer::handle_error(link &link,
                                 const boost::system::system_error &e)
{
	using namespace boost::system::errc;
	using boost::system::system_category;
	using boost::asio::error::get_misc_category;

	const auto &ec{e.code()};
	if(ec.category() == system_category()) switch(ec.value())
	{
		case success:
			assert(0);
			break;

		default:
			break;
	}
	else if(ec.category() == get_misc_category()) switch(ec.value())
	{
		case asio::error::eof:
			log.debug("peer(%p) link(%p) [%s]: %s",
			          this,
			          &link,
			          string(remote),
			          e.what());

			link.close(net::close_opts_default);
			return;

		default:
			break;
	}

	log.derror("peer(%p) link(%p) [%s]: error: %s",
	           this,
	           &link,
	           string(remote),
	           e.what());

	link.cancel_committed(std::make_exception_ptr(e));
	link.close(net::dc::RST);
}

void
ircd::server::peer::handle_finished(link &link)
{
	assert(link.finished());
	del(link);

	if(finished())
		handle_finished();
}

/// This is where we're notified a tag has been completed either to start the
/// next request when the link has too many requests in flight or perhaps to
/// reschedule the queues in various links to diffuse the pending requests.
/// This can't throw because the link still has to remove this tag from its
/// queue.
void
ircd::server::peer::handle_tag_done(link &link,
                                    tag &tag)
noexcept try
{
	log.debug("peer(%p) link(%p) tag(%p) done wt:%zu rt:%zu hr:%zu cr:%zu cl:%zu; %zu more in queue",
	          this,
	          &link,
	          &tag,
	          tag.write_size(),
	          tag.read_size(),
	          tag.state.head_read,
	          tag.state.content_read,
	          tag.state.content_length,
	          link.tag_count() - 1);

	if(link.tag_committed() >= link.tag_commit_max())
		link.wait_writable();
}
catch(const std::exception &e)
{
	log.critical("peer(%p) link(%p) tag(%p) done; error: %s",
	             this,
	             &link,
	             &tag,
	             e.what());
}

/// This is where we're notified a link has processed its queue and has no
/// more work. We can choose whether to close the link or keep it open and
/// reinstate the read poll; reschedule other work to this link, etc.
void
ircd::server::peer::handle_link_done(link &link)
{
	assert(link.tag_count() == 0);

	if(link_ready() > link_min())
	{
		link.close();
		return;
	}

	link.wait_readable();
}

/// This is called when a tag on a link receives an HTTP response head.
/// We can use this to learn information from the tag's request and the
/// response head etc.
void
ircd::server::peer::handle_head_recv(const link &link,
                                     const tag &tag,
                                     const http::response::head &head)
{
	// Learn the software version of the remote peer so we can shape
	// requests more effectively.
	if(!server_name && head.server)
	{
		server_name = std::string{head.server};
		log.debug("peer(%p) learned %s is '%s'",
		          this,
		          string(remote),
		          server_name);
	}
}

void
ircd::server::peer::disperse(link &link)
{
	disperse_uncommitted(link);
	link.cancel_committed(std::make_exception_ptr(canceled
	{
		"Request was aborted; though it was partially completed"
	}));

	assert(link.queue.empty());
}

void
ircd::server::peer::disperse_uncommitted(link &link)
{
	auto &queue(link.queue);
	auto it(begin(queue));
	while(it != end(queue)) try
	{
		auto &tag{*it};
		if(!tag.request || tag.committed())
		{
			++it;
			continue;
		}

		submit(*tag.request);
		it = queue.erase(it);
	}
	catch(const std::exception &e)
	{
		const auto &tag{*it};
		log.warning("peer(%p) failed to resubmit tag(%p): %s",
		            this,
		            &tag,
		            e.what());

		it = queue.erase(it);
	}
}

/// This *cannot* be called unless a link's socket is closed and its queue
/// is empty. It is usually only called by a disconnect handler because
/// the proper way to remove a link is asynchronously through link.close();
void
ircd::server::peer::del(link &link)
{
	assert(!link.tag_count());
	assert(!link.opened());
	const auto it(std::find_if(begin(links), end(links), [&link]
	(const auto &link_)
	{
		return &link_ == &link;
	}));

	assert(it != end(links));
	log.debug("peer(%p) removing link(%p) %zu of %zu to %s",
	          this,
	          &link,
	          std::distance(begin(links), it),
	          links.size(),
	          string(remote));

	links.erase(it);
}

void
ircd::server::peer::resolve(const hostport &hostport)
{
	if(op_resolve || op_fini)
		return;

	auto handler
	{
		std::bind(&peer::handle_resolve, this, ph::_1, ph::_2)
	};

	op_resolve = true;
	net::dns(hostport, std::move(handler));
}

void
ircd::server::peer::handle_resolve(std::exception_ptr eptr,
                                   const ipport &ipport)
try
{
	assert(op_resolve);
	op_resolve = false;

	if(eptr)
	{
		err_set(eptr);
		std::rethrow_exception(eptr);
		__builtin_unreachable();
	}

	this->remote = ipport;
	open_opts.ipport = this->remote;
	host(open_opts.hostport) = this->hostname;
	port(open_opts.hostport) = port(ipport);
	open_opts.common_name = this->hostname;

	if(unlikely(finished()))
		return handle_finished();

	if(op_fini)
		return;

	for(auto &link : links)
		link.open(open_opts);
}
catch(const std::exception &e)
{
	log.derror("peer(%p): error: %s",
	           this,
	           e.what());
	close();
}

void
ircd::server::peer::handle_finished()
{
	assert(finished());

	// Right now this is what the server:: ~init sequence needs
	// to wait for all links to close on IRCd shutdown.
	server::dock.notify_all();
}

size_t
ircd::server::peer::read_total()
const
{
	return read_bytes;
}

size_t
ircd::server::peer::write_total()
const
{
	return write_bytes;
}

size_t
ircd::server::peer::read_remaining()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.read_remaining();
	});
}

size_t
ircd::server::peer::read_completed()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.read_completed();
	});
}

size_t
ircd::server::peer::read_size()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.read_size();
	});
}

size_t
ircd::server::peer::write_remaining()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.write_remaining();
	});
}

size_t
ircd::server::peer::write_completed()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.write_completed();
	});
}

size_t
ircd::server::peer::write_size()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.write_size();
	});
}

size_t
ircd::server::peer::tag_uncommitted()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.tag_uncommitted();
	});
}

size_t
ircd::server::peer::tag_committed()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.tag_committed();
	});
}

size_t
ircd::server::peer::tag_count()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.tag_count();
	});
}

size_t
ircd::server::peer::link_ready()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.ready();
	});
}

size_t
ircd::server::peer::link_busy()
const
{
	return accumulate_links([](const auto &link)
	{
		return link.busy();
	});
}

size_t
ircd::server::peer::link_count()
const
{
	return links.size();
}

size_t
ircd::server::peer::link_min()
const
{
	return link_min_default;
}

size_t
ircd::server::peer::link_max()
const
{
	return link_max_default;
}

bool
ircd::server::peer::finished()
const
{
	return links.empty() && !op_resolve && op_fini;
}

template<class F>
size_t
ircd::server::peer::accumulate_tags(F&& closure)
const
{
	return accumulate_links([&closure](const auto &link)
	{
		return link.accumulate([&closure](const auto &tag)
		{
			return closure(tag);
		});
	});
}

template<class F>
size_t
ircd::server::peer::accumulate_links(F&& closure)
const
{
	return std::accumulate(begin(links), end(links), size_t(0), [&closure]
	(auto ret, const auto &tag)
	{
		return ret += closure(tag);
	});
}

//
// link
//

decltype(ircd::server::link::tag_max_default)
ircd::server::link::tag_max_default
{
	{ "name",     "ircd.server.link.tag_max" },
	{ "default",  -1L                        }
};

decltype(ircd::server::link::tag_commit_max_default)
ircd::server::link::tag_commit_max_default
{
	{ "name",     "ircd.server.link.tag_commit_max" },
	{ "default",  3L                                }
};

//
// link::link
//

ircd::server::link::link(server::peer &peer)
:peer{&peer}
{
}

ircd::server::link::~link()
noexcept
{
	assert(!busy());
	assert(!opened());
}

void
ircd::server::link::submit(request &request)
{
	assert(!request.tag || !request.tag->committed());

	const auto it
	{
		request.tag? queue.emplace(end(queue), std::move(*request.tag)):
		             queue.emplace(end(queue), request)
	};
/*
	log.debug("tag(%p) submitted to link(%p) queue: %zu",
	          &(*it),
	          this,
	          tag_count());
*/
	if(ready())
		wait_writable();
}

void
ircd::server::link::cancel_all(std::exception_ptr eptr)
{
	for(auto it(begin(queue)); it != end(queue); it = queue.erase(it))
	{
		auto &tag{*it};
		if(!tag.request)
			continue;

		tag.set_exception(eptr);
	}
}

void
ircd::server::link::cancel_committed(std::exception_ptr eptr)
{
	for(auto it(begin(queue)); it != end(queue); it = queue.erase(it))
	{
		auto &tag{*it};
		if(!tag.request)
			continue;

		if(!tag.committed())
			break;

		tag.set_exception(eptr);
	}
}

void
ircd::server::link::cancel_uncommitted(std::exception_ptr eptr)
{
	auto it(begin(queue));
	while(it != end(queue))
	{
		auto &tag{*it};
		if(!tag.request || tag.committed())
		{
			++it;
			continue;
		}

		tag.set_exception(eptr);
		it = queue.erase(it);
	}
}

bool
ircd::server::link::open(const net::open_opts &open_opts)
{
	assert(ircd::runlevel == ircd::runlevel::RUN);

	if(op_init)
		return false;

	auto handler
	{
		std::bind(&link::handle_open, this, ph::_1)
	};

	op_init = true;
	const unwind::exceptional unhandled{[this]
	{
		op_init = false;
	}};

	socket = net::open(open_opts, std::move(handler));
	return true;
}

void
ircd::server::link::handle_open(std::exception_ptr eptr)
{
	assert(op_init);
	op_init = false;

	if(!eptr && !op_fini)
		wait_writable();

	if(peer)
		peer->handle_open(*this, std::move(eptr));
}

bool
ircd::server::link::close(const net::close_opts &close_opts)
{
	if(op_fini)
		return false;

	op_fini = true;

	// Tell the peer to ditch everything in the queue; op_fini has been set so
	// the tags won't get assigned back to this link.
	if(tag_count() && peer)
		peer->disperse(*this);

	auto handler
	{
		std::bind(&link::handle_close, this, ph::_1)
	};

	if(!socket)
	{
		handler(std::exception_ptr{});
		return true;
	}

	net::close(*socket, close_opts, std::move(handler));
	return true;
}

void
ircd::server::link::handle_close(std::exception_ptr eptr)
{
	assert(op_fini);

	if(op_init)
	{
		assert(bool(eptr));
	}

	if(peer)
		peer->handle_close(*this, std::move(eptr));
}

void
ircd::server::link::wait_writable()
{
	if(op_write || unlikely(op_fini))
		return;

	auto handler
	{
		std::bind(&link::handle_writable, this, ph::_1)
	};

	assert(ready());
	op_write = true;
	const unwind::exceptional unhandled{[this]
	{
		op_write = false;
	}};

	net::wait(*socket, net::ready::WRITE, std::move(handler));
}

void
ircd::server::link::handle_writable(const error_code &ec)
try
{
	using namespace boost::system::errc;
	using boost::system::system_category;

	op_write = false;

	if(unlikely(finished()))
	{
		assert(peer);
		return peer->handle_finished(*this);
	}

	if(ec.category() == system_category()) switch(ec.value())
	{
		case success:
			handle_writable_success();
			return;

		case operation_canceled:
			return;

		default:
			break;
	}

	throw boost::system::system_error{ec};
}
catch(const boost::system::system_error &e)
{
	assert(peer);
	peer->handle_error(*this, e);
}
catch(const std::exception &e)
{
	assert(peer);
	peer->handle_error(*this, std::current_exception());
}

void
ircd::server::link::handle_writable_success()
{
	auto it(begin(queue));
	while(it != end(queue))
	{
		auto &tag{*it};
		if((tag.abandoned() || tag.canceled()) && !tag.committed())
		{
			log.debug("link(%p) discarding canceled:%d abandoned:%d uncommitted tag %zu of %zu",
			          this,
			          tag.canceled(),
			          tag.abandoned(),
			          tag_committed(),
			          tag_count());

			it = queue.erase(it);
			continue;
		}

		if(tag_committed() == 0)
			wait_readable();

		if(!process_write(tag))
		{
			wait_writable();
			break;
		}

		// Limits the amount of requests in the pipe.
		if(tag_committed() >= tag_commit_max())
			break;

		++it;
	}
}

bool
ircd::server::link::process_write(tag &tag)
{
	if(!tag.committed())
		log.debug("peer(%p) link(%p) starting on tag(%p) %zu of %zu: wt:%zu",
		          peer,
		          this,
		          &tag,
		          tag_committed(),
		          tag_count(),
		          tag.write_size());

	while(tag.write_remaining())
	{
		const const_buffer buffer
		{
			tag.make_write_buffer()
		};

		assert(!empty(buffer));
		const const_buffer written
		{
			process_write_next(buffer)
		};

		tag.wrote_buffer(written);
		assert(tag_committed() <= tag_commit_max());
		if(size(written) < size(buffer))
			return false;
	}

	return true;
}

ircd::const_buffer
ircd::server::link::process_write_next(const const_buffer &buffer)
{
	const size_t bytes
	{
		write_any(*socket, buffer)
	};

	const const_buffer written
	{
		data(buffer), bytes
	};

	assert(peer);
	peer->write_bytes += bytes;
	return written;
}

void
ircd::server::link::wait_readable()
{
	if(op_read || unlikely(op_fini))
		return;

	auto handler
	{
		std::bind(&link::handle_readable, this, ph::_1)
	};

	assert(ready());
	op_read = true;
	const unwind::exceptional unhandled{[this]
	{
		op_read = false;
	}};

	net::wait(*socket, net::ready::READ, std::move(handler));
}

void
ircd::server::link::handle_readable(const error_code &ec)
try
{
	using namespace boost::system::errc;
	using boost::system::system_category;

	op_read = false;

	if(unlikely(finished()))
	{
		assert(peer);
		return peer->handle_finished(*this);
	}

	if(ec.category() == system_category()) switch(ec.value())
	{
		case success:
			handle_readable_success();
			return;

		case operation_canceled:
			return;

		default:
			break;
	}

	throw boost::system::system_error{ec};
}
catch(const boost::system::system_error &e)
{
	assert(peer);
	peer->handle_error(*this, e);
}
catch(const std::exception &e)
{
	assert(peer);
	peer->handle_error(*this, std::current_exception());
}

/// Process as many read operations from as many tags as possible
void
ircd::server::link::handle_readable_success()
{
	if(queue.empty())
	{
		discard_read();
		wait_readable();
		return;
	}

	// Data pointed to by overrun will remain intact between iterations
	// because this loop isn't executing in any ircd::ctx.
	const_buffer overrun; do
	{
		if(!process_read(overrun))
		{
			wait_readable();
			return;
		}
	}
	while(!queue.empty());

	assert(peer);
	peer->handle_link_done(*this);
}

/// Process as many read operations for one tag as possible
bool
ircd::server::link::process_read(const_buffer &overrun)
try
{
	auto &tag
	{
		queue.front()
	};

	if(!tag.committed())
	{
		// Tag hasn't sent its data yet, we shouldn't have anything for it
		assert(empty(overrun));
		discard_read(); // Should stumble on a socket error.
		return false;
	}

	bool done{false}; do
	{
		overrun = process_read_next(overrun, tag, done);
	}
	while(!done);

	assert(peer);
	peer->handle_tag_done(*this, queue.front());
	queue.pop_front();
	return true;
}
catch(const buffer_overrun &e)
{
	queue.pop_front();
	throw;
}
catch(const boost::system::system_error &e)
{
	using namespace boost::system::errc;

	switch(e.code().value())
	{
		case resource_unavailable_try_again:
			return false;

		case success:
			assert(0);
			return true;

		default:
			throw;
	}
}

/// Process one read operation for one tag
ircd::const_buffer
ircd::server::link::process_read_next(const const_buffer &underrun,
                                      tag &tag,
                                      bool &done)
try
{
	const mutable_buffer buffer
	{
		tag.make_read_buffer()
	};

	const size_t copied
	{
		copy(buffer, underrun)
	};

	const mutable_buffer remaining
	{
		data(buffer) + copied, size(buffer) - copied
	};

	const const_buffer view
	{
		read(remaining)
	};

	const const_buffer overrun
	{
		tag.read_buffer(view, done, *this)
	};

	assert(done || empty(overrun));
	return overrun;
}
catch(const buffer_overrun &e)
{
	tag.set_exception(e);
	throw;
}

/// Read directly off the link's socket into buf
ircd::const_buffer
ircd::server::link::read(const mutable_buffer &buf)
{
	const size_t received
	{
		read_one(*socket, buf)
	};

	assert(peer);
	peer->read_bytes += received;

	return const_buffer
	{
		data(buf), received
	};
}

void
ircd::server::link::discard_read()
{
	ssize_t discard
	{
		SSL_pending(socket->ssl.native_handle())
	};

	if(discard <= 0 && queue.empty())
		discard = available(*socket);

	if(discard <= 0 && !queue.empty())
		discard = 1;

	const size_t discarded
	{
		discard_any(*socket, size_t(discard))
	};

	assert(peer);
	peer->read_bytes += discarded;

	// Shouldn't ever be hit because the read() within discard() throws
	// the pending error like an eof.
	log.warning("link(%p) socket(%p) to %s discarded %zu of %zd unexpected bytes",
	            this,
	            socket.get(),
	            likely(peer)? string(peer->remote) : string(remote_ipport(*socket)),
	            discarded,
	            discard);

	// just in case so this doesn't get loopy with discarding zero with
	// an empty queue...
	if(unlikely(!discard && !discarded))
		throw assertive
		{
			"peer(%p) link(%p) socket(%p) queue is empty and nothing to discard.",
			peer,
			this,
			socket.get()
		};
}

size_t
ircd::server::link::tag_uncommitted()
const
{
	return tag_count() - tag_committed();
}

size_t
ircd::server::link::tag_committed()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.committed();
	});
}

size_t
ircd::server::link::tag_count()
const
{
	return queue.size();
}

size_t
ircd::server::link::read_total()
const
{
	return socket? socket->in.bytes : 0;
}

size_t
ircd::server::link::write_total()
const
{
	return socket? socket->out.bytes : 0;
}

size_t
ircd::server::link::read_remaining()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.read_remaining();
	});
}

size_t
ircd::server::link::read_completed()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.read_completed();
	});
}

size_t
ircd::server::link::read_size()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.read_size();
	});
}

size_t
ircd::server::link::write_remaining()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.write_remaining();
	});
}

size_t
ircd::server::link::write_completed()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.write_completed();
	});
}

size_t
ircd::server::link::write_size()
const
{
	return accumulate_tags([](const auto &tag)
	{
		return tag.write_size();
	});
}

bool
ircd::server::link::busy()
const
{
	return !queue.empty();
}

bool
ircd::server::link::ready()
const
{
	return opened() && !op_init && !op_fini;
}

bool
ircd::server::link::opened()
const noexcept
{
	return bool(socket) && net::opened(*socket);
}

bool
ircd::server::link::finished()
const
{
	if(!bool(socket))
		return true;

	return !opened() && op_fini && !op_init && !op_write && !op_read;
}

size_t
ircd::server::link::tag_commit_max()
const
{
	return tag_commit_max_default;
}

size_t
ircd::server::link::tag_max()
const
{
	return tag_max_default;
}

template<class F>
size_t
ircd::server::link::accumulate_tags(F&& closure)
const
{
	return std::accumulate(begin(queue), end(queue), size_t(0), [&closure]
	(auto ret, const auto &tag)
	{
		return ret += closure(tag);
	});
}

//
// tag
//

/// This is tricky. When a user cancels a request which has committed some
/// writes to the remote we have to continue to service it through to
/// completion without disrupting the linearity of the link's pipeline
/// and causing trouble with other requests. This all depends on what phase
/// the request is currently in.
///
/// In any case, the goal here is to swap out the user's request buffers
/// and replace them with cancellation buffers which will be transparent
/// to the link as it completes the request.
void
ircd::server::cancel(request &request,
                     tag &tag)
noexcept
{
	// Must have a fully associated request/tag which has committed some
	// data to the wire to enter this routine.
	assert(tag.committed());
	assert(request.tag == &tag);
	assert(tag.request == &request);

	// Disassociate the user's request and add our dummy request in its place.
	disassociate(request, tag);

	assert(tag.request == nullptr);
	tag.request = new server::request{};
	tag.request->tag = &tag;

	// Setup the cancellation buffers by mirroring the current state of the
	// user's buffers.

	const size_t cancellation_size
	{
		size(request.out) + size(request.in)
	};

	tag.cancellation = std::make_unique<char[]>(cancellation_size);
	char *ptr{tag.cancellation.get()};

	const mutable_buffer out_head{ptr, size(request.out.head)};
	tag.request->out.head = out_head;
	ptr += size(out_head);

	const mutable_buffer out_content{ptr, size(request.out.content)};
	tag.request->out.content = out_content;
	ptr += size(out_content);

	const mutable_buffer in_head{ptr, size(request.in.head)};
	tag.request->in.head = in_head;
	ptr += size(in_head);

	const mutable_buffer in_content{ptr, size(request.in.content)};
	tag.request->in.content = in_content;
	ptr += size(in_content);

	assert(size_t(std::distance(tag.cancellation.get(), ptr)) == cancellation_size);

	// If the head is not completely written we have to copy the remainder from where
	// the socket left off.
	if(tag.state.written < size(request.out.head))
	{
		const const_buffer src
		{
			data(request.out.head) + tag.state.written, size(request.out.head) - tag.state.written
		};

		const mutable_buffer dst
		{
			out_head + tag.state.written
		};

		copy(dst, src);
	}

	// If the content is not completely written we have to copy the remainder from where
	// the socket left off.
	const size_t content_written
	{
		tag.state.written > size(request.out.head)? tag.state.written - size(request.out.head) : 0
	};

	if(content_written < size(request.out.content))
	{
		const const_buffer src
		{
			data(request.out.content) + content_written, size(request.out.content) - content_written
		};

		const mutable_buffer dst
		{
			out_content + content_written
		};

		copy(dst, src);
	}

	// If the head is not completely read we have to copy what's been received so far so
	// we can parse a coherent head.
	if(tag.state.head_read > 0 && tag.state.head_read < size(request.in.head))
	{
		const const_buffer src
		{
			data(request.in.head), tag.state.head_read
		};

		const mutable_buffer dst
		{
			data(in_head), size(in_head)
		};

		copy(dst, src);
	}

	// Normally we have no reason to copy content, but there is one exception:
	// If the content is chunked encoding and the tag is in the phase of
	// receiving the chunk head we have to copy what's been received of that
	// head so far so the grammar can parse a coherent head to continue.
	if(tag.state.chunk_length == size_t(-1))
	{
		const const_buffer src
		{
			data(request.in.content) + tag.state.content_length,
			tag.state.content_read - tag.state.content_length
		};

		const mutable_buffer dst
		{
			in_content + tag.state.content_length
		};

		copy(dst, src);
	}
}

void
ircd::server::associate(request &request,
                        tag &tag)
{
	assert(request.tag == nullptr);
	assert(tag.request == nullptr);

	auto &future
	{
		static_cast<ctx::future<http::code> &>(request)
	};

	future = tag.p;
	request.tag = &tag;
	tag.request = &request;
}

void
ircd::server::associate(request &request,
                        tag &cur,
                        tag &&old)
noexcept
{
	assert(request.tag == &old);         // ctor moved
	assert(cur.request == &request);     // ctor moved
	assert(old.request == &request);     // ctor didn't trash old

	cur.request = &request;
	old.request = nullptr;
	request.tag = &cur;
}

void
ircd::server::associate(request &cur,
                        tag &tag,
                        request &&old)
noexcept
{
	assert(tag.request == &old);   // ctor already moved
	assert(cur.tag == &tag);       // ctor already moved
	assert(old.tag == &tag);       // ctor didn't trash old

	cur.tag = &tag;
	tag.request = &cur;
	old.tag = nullptr;
}

void
ircd::server::disassociate(request &request,
                           tag &tag)
{
	assert(request.tag == &tag);
	assert(tag.request == &request);
	assert(tag.abandoned());

	request.tag = nullptr;
	tag.request = nullptr;

	// If the original request was canceled a new request was attached in its
	// place in addition to an cancellation buffer. The existence of this
	// cancellation buffer indicates that we must delete the request here.
	// This is a little hacky but it gets the job done.
	if(bool(tag.cancellation))
		delete &request;
}

void
ircd::server::tag::wrote_buffer(const const_buffer &buffer)
{
	assert(request);
	const auto &req{*request};
	state.written += size(buffer);

	if(state.written <= size(req.out.head))
	{
		assert(data(buffer) >= begin(req.out.head));
		assert(data(buffer) < end(req.out.head));
	}
	else if(state.written <= size(req.out.head) + size(req.out.content))
	{
		assert(data(buffer) >= begin(req.out.content));
		assert(data(buffer) < end(req.out.content));
		assert(state.written <= write_size());

		// Invoke the user's optional progress callback; this function
		// should be marked noexcept and has no reason to throw yet.
		if(req.out.progress)
			req.out.progress(buffer, const_buffer{data(req.out.content), state.written});
	}
	else
	{
		assert(0);
	}
}

ircd::const_buffer
ircd::server::tag::make_write_buffer()
const
{
	assert(request);
	const auto &req{*request};

	return
		state.written < size(req.out.head)?
			make_write_head_buffer():

		state.written < size(req.out.head) + size(req.out.content)?
			make_write_content_buffer():

		const_buffer{};
}

ircd::const_buffer
ircd::server::tag::make_write_head_buffer()
const
{
	assert(request);
	const auto &req{*request};

	const size_t remain
	{
		size(req.out.head) - state.written
	};

	const const_buffer window
	{
		data(req.out.head) + state.written, remain
	};

	return window;
}

ircd::const_buffer
ircd::server::tag::make_write_content_buffer()
const
{
	assert(request);
	const auto &req{*request};
	assert(state.written >= size(req.out.head));

	const size_t content_offset
	{
		state.written - size(req.out.head)
	};

	const size_t remain
	{
		size(req.out.head) + size(req.out.content) - state.written
	};

	const const_buffer window
	{
		data(req.out.content) + content_offset, remain
	};

	return window;
}

/// Called by the controller of the socket with a view of the data received by
/// the socket. The location and size of `buffer` is the same or smaller than
/// the buffer previously supplied by make_read_buffer().
///
/// Sometimes make_read_buffer() supplies a buffer that is too large, and some
/// data read off the socket does not belong to this tag. In that case, This
/// function returns a const_buffer viewing the portion of `buffer` which is
/// considered the "overrun," and the socket controller will copy that over to
/// the next tag.
///
/// The tag indicates it is entirely finished with receiving its data by
/// setting the value of `done` to true. Otherwise it is assumed false.
///
/// The link argument is not to be used to control/modify the link from the
/// tag; it's only a backreference to flash information to the link/peer
/// through specific callbacks so the peer can learn information.
///
ircd::const_buffer
ircd::server::tag::read_buffer(const const_buffer &buffer,
                               bool &done,
                               link &link)
{
	assert(request);

	if(state.status == (http::code)0)
		return read_head(buffer, done, link);

	if(state.chunk_length == size_t(-1))
		return read_chunk_head(buffer, done);

	if(state.chunk_length)
		return read_chunk_content(buffer, done);

	return read_content(buffer, done);
}

ircd::const_buffer
ircd::server::tag::read_head(const const_buffer &buffer,
                             bool &done,
                             link &link)
{
	assert(request);
	auto &req{*request};

	// informal search for head terminator
	static const string_view terminator{"\r\n\r\n"};
	const auto pos
	{
		string_view{buffer}.find(terminator)
	};

	// No terminator found; account for what was received in this buffer
	// for the next call to make_head_buffer() preparing for the subsequent
	// invocation of this function with more data.
	if(pos == string_view::npos)
	{
		state.head_read += size(buffer);
		return {};
	}

	// This indicates how much head was just received from this buffer only,
	// including the terminator which is considered part of the dome.
	const size_t addl_head_bytes
	{
		pos + size(terminator)
	};

	// The received buffer may go past the end of the head.
	assert(addl_head_bytes <= size(buffer));
	const size_t beyond_head_len
	{
		size(buffer) - addl_head_bytes
	};

	// The final update for the confirmed length of the head.
	state.head_read += addl_head_bytes;
	const size_t &head_read{state.head_read};
	assert(head_read + beyond_head_len <= size(req.in.head));

	// Window on any data in the buffer after the head.
	const const_buffer beyond_head
	{
		data(req.in.head) + head_read, beyond_head_len
	};

	// Before changing the user's head buffer, we branch for a feature that
	// allows the user to receive head and content into a single contiguous
	// buffer by assigning in.content = in.head.
	const bool contiguous
	{
		data(req.in.content) == data(req.in.head)
	};

	// Alternatively branch for a feature that allows dynamic allocation of
	// the content buffer if the user did not specify any buffer.
	const bool dynamic
	{
		!contiguous && empty(req.in.content)
	};

	// Resize the user's head buffer tight to the head; this is how we convey
	// the size of the dome back to the user.
	req.in.head = mutable_buffer
	{
		data(req.in.head), head_read
	};

	// Setup the capstan and mark the end of the tape
	parse::buffer pb{req.in.head};
	parse::capstan pc{pb};
	pc.read += size(req.in.head);

	// Play the tape through the formal grammar.
	const http::response::head head{pc};
	assert(pb.completed() == head_read);
	state.status = http::status(head.status);
	state.content_length = head.content_length;

	// Proffer the HTTP head to the peer instance which owns the link working
	// this tag so it can learn from any header data.
	assert(link.peer);
	link.peer->handle_head_recv(link, *this, head);

	if(contiguous)
	{
		const auto content_max
		{
			std::max(ssize_t(size(req.in.content) - head_read), ssize_t(0))
		};

		req.in.content = mutable_buffer
		{
			data(req.in.head) + head_read, size_t(content_max)
		};
	}

	// Branch for starting chunked encoding. We feed it whatever we have from
	// beyond the head as whole or part (or none) of the first chunk. Similar
	// to the non-chunked routine below, beyond_head may include all of the
	// chunk content and then part of the next message too: read_chunk_head
	// will return anything beyond this message as overrun and indicate done.
	if(head.transfer_encoding == "chunked")
	{
		assert(!dynamic);

		const const_buffer chunk
		{
			data(req.in.content), move(req.in.content, beyond_head)
		};

		state.chunk_length = -1;
		const const_buffer overrun
		{
			read_chunk_head(chunk, done)
		};

		assert(empty(overrun) || done == true);
		return overrun;
	}

	// If no branch taken the rest of this function expects a content length
	// to be known from the received head.
	if(head.transfer_encoding)
		throw error
		{
			"Unsupported transfer-encoding '%s'", head.transfer_encoding
		};

	if(dynamic)
	{
		assert(req.opt);
		const size_t alloc_size
		{
			std::min(state.content_length, req.opt->content_length_maxalloc)
		};

		req.in.dynamic = unique_buffer<mutable_buffer>{alloc_size};
		req.in.content = req.in.dynamic;
	}

	// Now we check how much content was received beyond the head
	const size_t &content_read
	{
		std::min(state.content_length, beyond_head_len)
	};

	// Now we know how much bleed into the next message was also received
	assert(beyond_head_len >= content_read);
	const size_t beyond_content_len
	{
		beyond_head_len - content_read
	};

	const const_buffer partial_content
	{
		data(req.in.head) + head_read, content_read
	};

	// Anything remaining is not our response and must be given back.
	const const_buffer overrun
	{
		data(beyond_head) + size(partial_content), beyond_content_len
	};

	// Reduce the user's content buffer to the content-length. This is sort of
	// how we convey the content-length back to the user. The buffer size will
	// eventually reflect how much content was actually received; the user can
	// find the given content-length by parsing the header.
	req.in.content = mutable_buffer
	{
		data(req.in.content), std::min(state.content_length, size(req.in.content))
	};

	// Any partial content was written to the head buffer by accident,
	// that may have to be copied over to the content buffer.
	if(!empty(partial_content) && !contiguous)
		copy(req.in.content, partial_content);

	// Invoke the read_content() routine which will increment this->content_read
	read_content(partial_content, done);
	assert(state.content_read == size(partial_content));
	assert(state.content_read == state.content_length || !done);

	return overrun;
}

ircd::const_buffer
ircd::server::tag::read_content(const const_buffer &buffer,
                                bool &done)
{
	assert(request);
	auto &req{*request};
	const auto &content{req.in.content};

	// The amount of remaining content for the response sequence
	assert(size(content) + content_overflow() >= state.content_read);
	assert(size(content) + content_overflow() == state.content_length);
	const size_t remaining
	{
		size(content) + content_overflow() - state.content_read
	};

	// The amount of content read in this buffer only.
	const size_t addl_content_read
	{
		std::min(size(buffer), remaining)
	};

	state.content_read += addl_content_read;
	assert(size(buffer) - addl_content_read == 0);
	assert(state.content_read <= size(content) + content_overflow());
	assert(state.content_read <= state.content_length);

	// Invoke the user's optional progress callback; this function
	// should be marked noexcept for the time being.
	if(req.in.progress)
		req.in.progress(buffer, const_buffer{data(content), state.content_read});

	if(state.content_read == size(content) + content_overflow())
	{
		assert(state.content_read == state.content_length);
		assert(!done);
		done = true;
		set_value(state.status);
	}

	return {};
}

ircd::const_buffer
ircd::server::tag::read_chunk_head(const const_buffer &buffer,
                                   bool &done)
{
	assert(request);
	auto &req{*request};
	const auto &content{req.in.content};

	// informal search for head terminator
	static const string_view terminator{"\r\n"};
	const auto pos
	{
		string_view{buffer}.find(terminator)
	};

	if(pos == string_view::npos)
	{
		state.content_read += size(buffer);
		return {};
	}

	// This indicates how much head was just received from this buffer only.
	const size_t addl_head_bytes
	{
		pos + size(terminator)
	};

	// The received buffer may go past the end of the head.
	assert(addl_head_bytes <= size(buffer));
	const size_t beyond_head_length
	{
		size(buffer) - addl_head_bytes
	};

	// The total head length is found from the end of the last chunk content
	state.content_read += addl_head_bytes;
	assert(state.content_read > state.content_length);
	const size_t head_length
	{
		state.content_read - state.content_length
	};

	// Window on any data in the buffer after the head.
	const const_buffer beyond_head
	{
		data(content) + state.content_length + head_length, beyond_head_length
	};

	// Setup the capstan and mark the end of the tape
	parse::buffer pb
	{
		mutable_buffer
		{
			data(content) + state.content_length, head_length
		}
	};
	parse::capstan pc{pb};
	pc.read += head_length;

	// Play the tape through the formal grammar.
	const http::response::chunk chunk{pc};
	state.chunk_length = chunk.size + size(terminator);

	// Now we check how much chunk was received beyond the head
	const size_t &chunk_read
	{
		std::min(state.chunk_length, beyond_head_length)
	};

	// Now we know how much bleed into the next message was also received
	assert(beyond_head_length >= chunk_read);
	const size_t beyond_chunk_length
	{
		beyond_head_length - chunk_read
	};

	// Finally we erase the chunk head by replacing it with everything received
	// after it.
	const mutable_buffer target
	{
		data(content) + state.content_length, beyond_head_length
	};

	move(target, beyond_head);

	// Increment the content_length to now include this chunk
	state.content_length += state.chunk_length;

	// Adjust the content_read to erase the chunk head.
	state.content_read -= head_length;

	const const_buffer partial_chunk
	{
		data(target), chunk_read
	};

	const const_buffer overrun
	{
		data(target) + chunk_read, beyond_chunk_length
	};

	assert(state.chunk_length >= 2);
	read_chunk_content(partial_chunk, done);

	if(done)
		return overrun;

	return read_chunk_head(overrun, done);        // gobble gobbles
}

ircd::const_buffer
ircd::server::tag::read_chunk_content(const const_buffer &buffer,
                                      bool &done)
{
	assert(request);
	auto &req{*request};
	const auto &content{req.in.content};

	// The amount of remaining content for the response sequence
	const size_t remaining
	{
		content_remaining()
	};

	// The amount of content read in this buffer only.
	const size_t addl_content_read
	{
		std::min(size(buffer), remaining)
	};

	state.content_read += addl_content_read;
	if(state.content_read == state.content_length)
	{
		static const string_view terminator{"\r\n"};
		assert(state.content_length >= size(terminator));
		state.content_length -= size(terminator);
		state.content_read -= size(terminator);

		if(state.chunk_length == 2)
		{
			assert(!done);
			done = true;
			req.in.content = mutable_buffer{data(req.in.content), state.content_length};
			set_value(state.status);
		}
	}

	// Invoke the user's optional progress callback; this function
	// should be marked noexcept for the time being.
	if(req.in.progress && !done)
		req.in.progress(buffer, const_buffer{data(content), state.content_read});

	if(state.content_read == state.content_length)
		state.chunk_length = size_t(-1);

	return {};
}

/// An idempotent operation that provides the location of where the socket
/// should place the next received data. The tag figures this out based on
/// whether it receiving HTTP head data or whether it is in content mode.
///
ircd::mutable_buffer
ircd::server::tag::make_read_buffer()
const
{
	assert(request);
	assert(state.head_read <= size(request->in.head));
	assert(state.content_read <= state.content_length);

	if(state.status == (http::code)0)
		return make_read_head_buffer();

	if(state.content_read >= size(request->in.content))
		return make_read_discard_buffer();

	if(state.chunk_length == size_t(-1))
		return make_read_chunk_head_buffer();

	if(state.chunk_length)
		return make_read_chunk_content_buffer();

	return make_read_content_buffer();
}

ircd::mutable_buffer
ircd::server::tag::make_read_head_buffer()
const
{
	assert(request);
	const auto &req{*request};
	const auto &head{req.in.head};
	if(unlikely(size(req.in.head) <= state.head_read))
		throw buffer_overrun
		{
			"Supplied buffer of %zu too small for HTTP head", size(req.in.head)
		};

	const size_t remaining
	{
		size(head) - state.head_read
	};

	const mutable_buffer buffer
	{
		data(head) + state.head_read, remaining
	};

	assert(size(buffer) <= size(head));
	assert(size(buffer) > 0);
	return buffer;
}

ircd::mutable_buffer
ircd::server::tag::make_read_content_buffer()
const
{
	assert(request);
	const auto &req{*request};
	const auto &content{req.in.content};
	if(unlikely(size(content) <= state.content_read))
		throw buffer_overrun
		{
			"Content buffer of %zu bytes too small to read %zu bytes of content",
			size(content),
			state.content_length
		};

	// The amount of bytes we still have to read to for the response
	const size_t remaining
	{
		size(content) - state.content_read
	};

	assert(remaining > 0);
	return
	{
		data(content) + state.content_read, remaining
	};
}

/// The chunk head buffer starts after the last chunk ended and has a size of
/// the rest of the available content buffer (hopefully much less will be
/// needed). If only part of the chunk head was received previously this
/// function accounts for that by returning a buffer which starts at the
/// content_read offset (which is at the end of that previous read).
///
ircd::mutable_buffer
ircd::server::tag::make_read_chunk_head_buffer()
const
{
	assert(request);
	assert(state.chunk_length == size_t(-1));
	assert(state.content_read >= state.content_length);

	const auto &req{*request};
	const auto &content{req.in.content};
	if(unlikely(size(content) <= state.content_read))
		throw buffer_overrun
		{
			"Content buffer of %zu bytes too small to read next chunk header",
			size(content)
		};

	const size_t remaining
	{
		size(content) - state.content_read
	};

	const mutable_buffer buffer
	{
		data(content) + state.content_read, remaining
	};

	assert(size(buffer) > 0);
	return buffer;
}

ircd::mutable_buffer
ircd::server::tag::make_read_chunk_content_buffer()
const
{
	assert(request);
	assert(state.chunk_length > 0);
	assert(state.content_read <= state.content_length);

	const auto &req{*request};
	const auto &content{req.in.content};

	assert(size(content) >= state.content_read);
	const size_t buffer_remaining
	{
		size(content) - state.content_read
	};

	const size_t chunk_remaining
	{
		content_remaining()
	};

	assert(chunk_remaining <= state.chunk_length);
	assert(chunk_remaining == state.content_length - state.content_read);
	const size_t buffer_size
	{
		std::min(buffer_remaining, chunk_remaining)
	};

	if(unlikely(buffer_size < chunk_remaining))
		throw buffer_overrun
		{
			"Content buffer of %zu bytes too small to read remaining %zu of chunk",
			size(content),
			chunk_remaining
		};

	const mutable_buffer buffer
	{
		data(content) + state.content_read, buffer_size
	};

	assert(size(buffer) > 0);
	return buffer;
}

ircd::mutable_buffer
ircd::server::tag::make_read_discard_buffer()
const
{
	assert(request);
	assert(content_overflow() > 0);
	assert(content_overflow() <= state.content_read);
	assert(state.content_read >= size(request->in.content));
	const size_t remaining
	{
		content_overflow() - state.content_read
	};

	static char buffer[512];
	const size_t buffer_max
	{
		std::min(remaining, sizeof(buffer))
	};

	return
	{
		buffer, buffer_max
	};
}

size_t
ircd::server::tag::content_remaining()
const
{
	assert(state.content_length >= state.content_read);
	return state.content_length - state.content_read;
}

size_t
ircd::server::tag::content_overflow()
const
{
	assert(request);
	const auto &req{*request};
	const ssize_t diff(state.content_length - size(req.in.content));
	return std::max(diff, ssize_t(0));
}

template<class... args>
void
ircd::server::tag::set_value(args&&... a)
{
	if(abandoned())
		return;

	const http::code &code
	{
		std::forward<args>(a)...
	};

	assert(request->opt);
	if(request->opt->http_exceptions && code >= http::code(300))
	{
		const string_view content
		{
			data(request->in.content), size(request->in.content)
		};

		set_exception(http::error{code, std::string{content}});
		return;
	}

	p.set_value(code);
}

template<class... args>
void
ircd::server::tag::set_exception(args&&... a)
{
	if(abandoned())
		return;

	set_exception(std::make_exception_ptr(std::forward<args>(a)...));
}

void
ircd::server::tag::set_exception(std::exception_ptr eptr)
{
	if(abandoned())
		return;

	p.set_exception(std::move(eptr));
}

bool
ircd::server::tag::abandoned()
const
{
	return !p.valid();
}

bool
ircd::server::tag::canceled()
const
{
	return bool(cancellation);
}

bool
ircd::server::tag::committed()
const
{
	return write_completed() > 0;
}

size_t
ircd::server::tag::read_remaining()
const
{
	return read_size() - read_completed();
}

size_t
ircd::server::tag::read_completed()
const
{
	return state.head_read + state.content_read;
}

size_t
ircd::server::tag::read_size()
const
{
	return state.head_read + state.content_length;
}

size_t
ircd::server::tag::write_remaining()
const
{
	return write_size() - write_completed();
}

size_t
ircd::server::tag::write_completed()
const
{
	return state.written;
}

size_t
ircd::server::tag::write_size()
const
{
	return request? size(request->out) : 0;
}