0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-27 07:54:05 +01:00

modules/client/keys/query: Eliminate head-of-line timeout; stream results to client as they arrive.

This commit is contained in:
Jason Volk 2019-04-18 19:47:33 -07:00
parent 0176dba9a8
commit 5ad6d2153e

View file

@ -35,8 +35,7 @@ static void
recv_response(const string_view &,
m::v1::user::keys::query &,
failure_map &,
json::stack::object &,
const steady_point &);
json::stack::object &);
static void
recv_responses(query_map &,
@ -195,6 +194,7 @@ recv_responses(query_map &queries,
failure_map &failures,
json::stack::object &out,
const milliseconds &timeout)
try
{
const steady_point timedout
{
@ -206,28 +206,49 @@ recv_responses(query_map &queries,
out, "device_keys"
};
for(auto &pair : queries)
while(!queries.empty())
{
const auto &remote(pair.first);
auto &request(pair.second);
static const auto dereferencer{[]
(auto &it) -> m::v1::user::keys::query &
{
return it->second;
}};
auto next
{
ctx::when_any(begin(queries), end(queries), dereferencer)
};
next.wait_until(timedout); // throws on timeout
const auto it{next.get()};
const unwind remove{[&queries, &it]
{
queries.erase(it);
}};
const auto &remote(it->first);
auto &request(it->second);
assert(!failures.count(remote));
if(failures.count(remote))
continue;
recv_response(remote, request, failures, response_keys, timedout);
recv_response(remote, request, failures, response_keys);
}
}
catch(const std::exception &)
{
for(const auto &[remote, request] : queries)
failures.emplace(remote, std::current_exception());
}
void
recv_response(const string_view &remote,
m::v1::user::keys::query &request,
failure_map &failures,
json::stack::object &object,
const steady_point &timeout)
json::stack::object &object)
try
{
request.wait_until(timeout);
const auto code
{
request.get()