mirror of
https://github.com/matrix-construct/construct
synced 2024-11-29 10:12:39 +01:00
ircd: Fix/Address client current request flow for longpolls.
This commit is contained in:
parent
67fba3cd7b
commit
2be10ef206
5 changed files with 68 additions and 85 deletions
|
@ -35,19 +35,29 @@ struct ircd::client
|
|||
struct settings;
|
||||
struct request;
|
||||
|
||||
static constexpr const size_t &HEAD_MAX
|
||||
{
|
||||
4_KiB
|
||||
};
|
||||
|
||||
static struct settings settings;
|
||||
static struct conf default_conf;
|
||||
static ctx::pool context;
|
||||
|
||||
std::shared_ptr<socket> sock;
|
||||
struct conf *conf {&default_conf};
|
||||
struct request *request {nullptr};
|
||||
unique_buffer<mutable_buffer> headbuf;
|
||||
std::shared_ptr<socket> sock;
|
||||
ircd::timer timer;
|
||||
http::request::head head;
|
||||
size_t head_length {0};
|
||||
size_t content_consumed {0};
|
||||
bool longpoll {false};
|
||||
|
||||
void close(const net::close_opts &, net::close_callback);
|
||||
ctx::future<void> close(const net::close_opts & = {});
|
||||
|
||||
void discard_unconsumed(struct request &);
|
||||
bool resource_request(struct request &);
|
||||
void discard_unconsumed();
|
||||
bool resource_request();
|
||||
bool handle_request(parse::capstan &pc);
|
||||
bool main() noexcept;
|
||||
void async();
|
||||
|
@ -65,24 +75,6 @@ struct ircd::client
|
|||
friend ipport local(const client &);
|
||||
};
|
||||
|
||||
/// Organizes components of an individual request. A pointer to this structure
|
||||
/// is placed as a member of client when a request is being made; this allows
|
||||
/// for access to it without a separate argument wherever client goes.
|
||||
struct ircd::client::request
|
||||
{
|
||||
static constexpr size_t HEAD_MAX
|
||||
{
|
||||
4_KiB
|
||||
};
|
||||
|
||||
ircd::timer timer;
|
||||
http::request::head head;
|
||||
size_t content_consumed {0};
|
||||
string_view content_partial;
|
||||
|
||||
request(parse::capstan &pc);
|
||||
};
|
||||
|
||||
/// Confs can be attached to individual clients to change their behavior
|
||||
struct ircd::client::conf
|
||||
{
|
||||
|
|
|
@ -40,7 +40,7 @@ struct ircd::resource
|
|||
public:
|
||||
method &operator[](const string_view &path);
|
||||
|
||||
void operator()(client &, struct client::request &, const http::request::head &);
|
||||
void operator()(client &, const http::request::head &, const string_view &content_partial);
|
||||
|
||||
resource(const string_view &path, const opts &);
|
||||
resource(const string_view &path);
|
||||
|
|
|
@ -270,6 +270,9 @@ ircd::handle_client_ready(std::shared_ptr<client> client,
|
|||
if(!handle_ec(*client, ec))
|
||||
return;
|
||||
|
||||
if(client->longpoll)
|
||||
return;
|
||||
|
||||
auto handler
|
||||
{
|
||||
std::bind(ircd::handle_client_request, std::move(client))
|
||||
|
@ -431,7 +434,8 @@ ircd::client::client()
|
|||
}
|
||||
|
||||
ircd::client::client(std::shared_ptr<socket> sock)
|
||||
:sock{std::move(sock)}
|
||||
:headbuf{HEAD_MAX}
|
||||
,sock{std::move(sock)}
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -475,13 +479,15 @@ bool
|
|||
ircd::client::main()
|
||||
noexcept try
|
||||
{
|
||||
char buffer[client::request::HEAD_MAX];
|
||||
parse::buffer pb{mutable_buffer{buffer}};
|
||||
parse::buffer pb{headbuf};
|
||||
parse::capstan pc{pb, read_closure(*this)}; do
|
||||
{
|
||||
if(!handle_request(pc))
|
||||
return false;
|
||||
|
||||
if(longpoll)
|
||||
return true;
|
||||
|
||||
// After the request, the head and content has been read off the socket
|
||||
// and the capstan has advanced to the end of the content. The catch is
|
||||
// that reading off the socket could have read too much, bleeding into
|
||||
|
@ -574,35 +580,6 @@ catch(const std::exception &e)
|
|||
#endif
|
||||
}
|
||||
|
||||
/// The constructor for request state is only made in
|
||||
/// client::handle_request(). It is defined here to be adjacent to that
|
||||
/// callsite
|
||||
///
|
||||
ircd::client::request::request(parse::capstan &pc)
|
||||
:head
|
||||
{
|
||||
// This is the first read off the wire. The headers are entirely read and
|
||||
// the tape is advanced.
|
||||
pc
|
||||
}
|
||||
,content_consumed
|
||||
{
|
||||
// The size of HTTP headers are never initially known, which means
|
||||
// the above head parse could have read too much off the socket bleeding
|
||||
// into the content or even the next request entirely. That's ok because
|
||||
// the state of `pc` will reflect that back to the main() loop for the
|
||||
// next request, but for this request we have to figure out how much of
|
||||
// the content was accidentally read so far.
|
||||
std::min(pc.unparsed(), head.content_length)
|
||||
}
|
||||
,content_partial
|
||||
{
|
||||
pc.parsed, content_consumed
|
||||
}
|
||||
{
|
||||
pc.parsed += content_consumed;
|
||||
}
|
||||
|
||||
/// Handle a single request within the client main() loop.
|
||||
///
|
||||
/// This function returns false if the main() loop should exit
|
||||
|
@ -623,24 +600,30 @@ try
|
|||
*sock, conf->request_timeout
|
||||
};
|
||||
|
||||
struct request request{pc};
|
||||
// This is the first read off the wire. The headers are entirely read and
|
||||
// the tape is advanced.
|
||||
timer = ircd::timer{};
|
||||
head = http::request::head{pc};
|
||||
head_length = pc.parsed - data(headbuf);
|
||||
content_consumed = std::min(pc.unparsed(), head.content_length);
|
||||
pc.parsed += content_consumed;
|
||||
assert(pc.parsed <= pc.read);
|
||||
this->request = &request;
|
||||
log::debug("socket(%p) local[%s] remote[%s] HTTP %s `%s' content-length:%zu part:%zu",
|
||||
|
||||
log::debug("socket(%p) local[%s] remote[%s] HTTP %s `%s' content-length:%zu have:%zu",
|
||||
sock.get(),
|
||||
string(local(*this)),
|
||||
string(remote(*this)),
|
||||
request.head.method,
|
||||
request.head.path,
|
||||
request.head.content_length,
|
||||
request.content_consumed);
|
||||
head.method,
|
||||
head.path,
|
||||
head.content_length,
|
||||
content_consumed);
|
||||
|
||||
bool ret
|
||||
{
|
||||
resource_request(request)
|
||||
resource_request()
|
||||
};
|
||||
|
||||
if(ret && iequals(request.head.connection, "close"_sv))
|
||||
if(ret && iequals(head.connection, "close"_sv))
|
||||
ret = false;
|
||||
|
||||
return ret;
|
||||
|
@ -674,15 +657,20 @@ catch(const ircd::error &e)
|
|||
}
|
||||
|
||||
bool
|
||||
ircd::client::resource_request(struct request &request)
|
||||
ircd::client::resource_request()
|
||||
try
|
||||
{
|
||||
auto &resource
|
||||
const string_view content_partial
|
||||
{
|
||||
ircd::resource::find(request.head.path)
|
||||
data(headbuf) + head_length, content_consumed
|
||||
};
|
||||
|
||||
resource(*this, request, request.head);
|
||||
auto &resource
|
||||
{
|
||||
ircd::resource::find(head.path)
|
||||
};
|
||||
|
||||
resource(*this, head, content_partial);
|
||||
return true;
|
||||
}
|
||||
catch(const http::error &e)
|
||||
|
@ -705,20 +693,20 @@ catch(const http::error &e)
|
|||
// These codes are "recoverable" and allow the next HTTP request in
|
||||
// a pipeline to take place.
|
||||
default:
|
||||
discard_unconsumed(request);
|
||||
discard_unconsumed();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ircd::client::discard_unconsumed(struct request &request)
|
||||
ircd::client::discard_unconsumed()
|
||||
{
|
||||
if(unlikely(!sock))
|
||||
return;
|
||||
|
||||
const size_t unconsumed
|
||||
{
|
||||
request.head.content_length - request.content_consumed
|
||||
head.content_length - content_consumed
|
||||
};
|
||||
|
||||
if(!unconsumed)
|
||||
|
@ -729,10 +717,10 @@ ircd::client::discard_unconsumed(struct request &request)
|
|||
string(local(*this)),
|
||||
string(remote(*this)),
|
||||
unconsumed,
|
||||
request.head.content_length);
|
||||
head.content_length);
|
||||
|
||||
request.content_consumed += net::discard_all(*sock, unconsumed);
|
||||
assert(request.content_consumed == request.head.content_length);
|
||||
content_consumed += net::discard_all(*sock, unconsumed);
|
||||
assert(content_consumed == head.content_length);
|
||||
}
|
||||
|
||||
ircd::ctx::future<void>
|
||||
|
|
|
@ -227,8 +227,8 @@ catch(const std::exception &e)
|
|||
|
||||
void
|
||||
ircd::resource::operator()(client &client,
|
||||
struct client::request &request,
|
||||
const http::request::head &head)
|
||||
const http::request::head &head,
|
||||
const string_view &content_partial)
|
||||
{
|
||||
// Find the method or METHOD_NOT_ALLOWED
|
||||
auto &method
|
||||
|
@ -245,27 +245,27 @@ ircd::resource::operator()(client &client,
|
|||
|
||||
const size_t content_remain
|
||||
{
|
||||
head.content_length - request.content_consumed
|
||||
head.content_length - client.content_consumed
|
||||
};
|
||||
|
||||
unique_buffer<mutable_buffer> content_buffer;
|
||||
string_view content{request.content_partial};
|
||||
string_view content{content_partial};
|
||||
if(content_remain)
|
||||
{
|
||||
// Copy any partial content to the final contiguous allocated buffer;
|
||||
content_buffer = unique_buffer<mutable_buffer>{head.content_length};
|
||||
memcpy(data(content_buffer), data(request.content_partial), size(request.content_partial));
|
||||
memcpy(data(content_buffer), data(content_partial), size(content_partial));
|
||||
|
||||
// Setup a window inside the buffer for the remaining socket read.
|
||||
const mutable_buffer content_remain_buffer
|
||||
{
|
||||
data(content_buffer) + size(request.content_partial), content_remain
|
||||
data(content_buffer) + size(content_partial), content_remain
|
||||
};
|
||||
|
||||
//TODO: more discretion from the method.
|
||||
// Read the remaining content off the socket.
|
||||
request.content_consumed += read_all(*client.sock, content_remain_buffer);
|
||||
assert(request.content_consumed == head.content_length);
|
||||
client.content_consumed += read_all(*client.sock, content_remain_buffer);
|
||||
assert(client.content_consumed == head.content_length);
|
||||
content = string_view
|
||||
{
|
||||
data(content_buffer), head.content_length
|
||||
|
@ -622,10 +622,9 @@ ircd::resource::response::response(client &client,
|
|||
const http::code &code,
|
||||
const string_view &headers)
|
||||
{
|
||||
assert(client.request);
|
||||
const auto request_time
|
||||
{
|
||||
client.request->timer.at<microseconds>().count()
|
||||
client.timer.at<microseconds>().count()
|
||||
};
|
||||
|
||||
const fmt::bsprintf<64> rtime
|
||||
|
@ -684,7 +683,7 @@ ircd::resource::response::response(client &client,
|
|||
int(code),
|
||||
http::status(code),
|
||||
request_time,
|
||||
(client.request->timer.at<microseconds>().count() - request_time),
|
||||
(client.timer.at<microseconds>().count() - request_time),
|
||||
content_type,
|
||||
content.size()
|
||||
};
|
||||
|
|
|
@ -204,6 +204,8 @@ longpoll(client &client,
|
|||
|
||||
if(pollout.size() == 1)
|
||||
notify(synchronizer_timeout_context);
|
||||
|
||||
client.longpoll = true;
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -452,6 +454,8 @@ try
|
|||
}
|
||||
};
|
||||
|
||||
client->longpoll = false;
|
||||
client->async();
|
||||
return true;
|
||||
}
|
||||
catch(const std::bad_weak_ptr &e)
|
||||
|
|
Loading…
Reference in a new issue