modules/client/keys/query: Refactor receive handling for properly composed response.

This commit is contained in:
Jason Volk 2023-04-25 12:49:55 -07:00
parent 91a8fcbe43
commit 6f5121dc6a
1 changed files with 210 additions and 241 deletions

View File

@ -19,8 +19,30 @@ namespace
using buffer_list = std::vector<unique_buffer<mutable_buffer>>;
}
static host_users_map
parse_user_request(const json::object &device_keys);
static void
handle_cross_keys(const m::resource::request &,
query_map &,
failure_map &,
json::stack::object &,
const string_view &,
const bool match_user = false);
static void
handle_device_keys(const m::resource::request &,
query_map &,
failure_map &,
json::stack::object &);
static void
handle_responses(const m::resource::request &,
query_map &,
failure_map &,
json::stack::object &);
static void
handle_errors(const m::resource::request &,
query_map &,
failure_map &);
static bool
send_request(const string_view &,
@ -34,19 +56,8 @@ send_requests(const host_users_map &,
buffer_list &,
failure_map &);
static void
recv_response(const m::resource::request &,
const string_view &,
m::fed::user::keys::query &,
failure_map &,
json::stack::object &);
static void
recv_responses(const m::resource::request &,
query_map &,
failure_map &,
json::stack::object &,
const milliseconds &);
static host_users_map
parse_user_request(const json::object &device_keys);
static void
handle_failures(const failure_map &,
@ -148,6 +159,20 @@ post__keys_query(client &client,
send_requests(map, buffers, failures)
};
auto responses
{
ctx::when_all(begin(queries), end(queries), []
(auto &it) -> m::fed::user::keys::query &
{
return it->second;
})
};
const bool all_good
{
responses.wait_until(now<system_point>() + timeout, std::nothrow)
};
m::resource::response::chunked response
{
client, http::OK
@ -163,9 +188,9 @@ post__keys_query(client &client,
out
};
recv_responses(request, queries, failures, top, timeout);
handle_responses(request, queries, failures, top);
handle_failures(failures, top);
return {};
return response;
}
void
@ -177,216 +202,39 @@ handle_failures(const failure_map &failures,
out, "failures"
};
for(const auto &p : failures)
{
const string_view &hostname(p.first);
const std::exception_ptr &eptr(p.second);
for(const auto &[remote, eptr] : failures)
json::stack::member
{
response_failures, hostname, what(eptr)
response_failures, remote, what(eptr)
};
}
}
void
recv_responses(const m::resource::request &client_request,
query_map &queries,
failure_map &failures,
json::stack::object &out,
const milliseconds &timeout)
try
host_users_map
parse_user_request(const json::object &device_keys)
{
const system_point timedout
host_users_map ret;
for(const auto &member : device_keys)
{
ircd::now<system_point>() + timeout
};
const m::user::id &user_id(member.first);
const json::array &device_ids(member.second);
const string_view &host(user_id.host());
while(!queries.empty())
{
static const auto dereferencer{[]
(auto &it) -> m::fed::user::keys::query &
auto it(ret.lower_bound(host));
if(it == end(ret) || it->first != host)
it = ret.emplace_hint(it, host, user_devices_map{});
user_devices_map &users(it->second);
{
return it->second;
}};
auto it(users.lower_bound(user_id));
if(it == end(users) || it->first != user_id)
it = users.emplace_hint(it, user_id, json::array{});
auto next
{
ctx::when_any(begin(queries), end(queries), dereferencer)
};
next.wait_until(timedout); // throws on timeout
const auto it{next.get()};
const unwind remove{[&queries, &it]
{
queries.erase(it);
}};
const auto &remote(it->first);
auto &request(it->second);
assert(!failures.count(remote));
if(failures.count(remote))
continue;
recv_response(client_request, remote, request, failures, out);
}
}
catch(const std::exception &)
{
for(const auto &[remote, request] : queries)
failures.emplace(remote, std::current_exception());
}
void
recv_response(const m::resource::request &client_request,
const string_view &remote,
m::fed::user::keys::query &request,
failure_map &failures,
json::stack::object &out)
try
{
const auto code
{
request.get()
};
const json::object response
{
request
};
// device_keys
{
json::stack::object object
{
out, "device_keys"
};
const json::object &device_keys
{
response["device_keys"]
};
for(const auto &[_user_id, device_keys] : device_keys)
{
const m::user::id &user_id
{
_user_id
};
json::stack::object user_object
{
object, user_id
};
for(const auto &[device_id, keys] : json::object(device_keys))
json::stack::member
{
user_object, device_id, keys
};
if(!empty(device_ids))
it->second = device_ids;
}
}
// master_keys
{
json::stack::object object
{
out, "master_keys"
};
const json::object &master_keys
{
response["master_keys"]
};
for(const auto &[_user_id, master_key] : master_keys)
{
const m::user::id &user_id
{
_user_id
};
json::stack::member
{
object, user_id, json::object
{
master_key
}
};
}
}
// self_signing_keys
{
json::stack::object object
{
out, "self_signing_keys"
};
const json::object &self_signing_keys
{
response["self_signing_keys"]
};
for(const auto &[_user_id, self_signing_key] : self_signing_keys)
{
const m::user::id &user_id
{
_user_id
};
json::stack::member
{
object, user_id, json::object
{
self_signing_key
}
};
}
}
// user_signing_keys
{
json::stack::object object
{
out, "user_signing_keys"
};
const json::object &user_signing_keys
{
response["user_signing_keys"]
};
for(const auto &[_user_id, user_signing_key] : user_signing_keys)
{
const m::user::id &user_id
{
_user_id
};
if(client_request.user_id != _user_id)
continue;
json::stack::member
{
object, user_id, json::object
{
user_signing_key
}
};
}
}
}
catch(const std::exception &e)
{
log::error
{
m::log, "user keys query from %s :%s",
remote,
e.what()
};
failures.emplace(remote, std::current_exception());
return ret;
}
query_map
@ -395,12 +243,8 @@ send_requests(const host_users_map &hosts,
failure_map &failures)
{
query_map ret;
for(const auto &pair : hosts)
{
const string_view &remote(pair.first);
const user_devices_map &user_devices(pair.second);
for(const auto &[remote, user_devices] : hosts)
send_request(remote, user_devices, failures, buffers, ret);
}
return ret;
}
@ -441,43 +285,168 @@ try
return true;
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
log::error
failures.emplace(remote, std::current_exception());
log::derror
{
m::log, "user keys query to %s :%s",
remote,
e.what()
};
failures.emplace(remote, std::current_exception());
return false;
}
host_users_map
parse_user_request(const json::object &device_keys)
void
handle_responses(const m::resource::request &request,
query_map &queries,
failure_map &failures,
json::stack::object &out)
{
host_users_map ret;
for(const auto &member : device_keys)
handle_errors(request, queries, failures);
handle_device_keys(request, queries, failures, out);
handle_cross_keys(request, queries, failures, out, "master_keys");
handle_cross_keys(request, queries, failures, out, "self_signing_keys");
handle_cross_keys(request, queries, failures, out, "user_signing_keys", true);
}
void
handle_errors(const m::resource::request &request,
query_map &queries,
failure_map &failures)
{
auto it(begin(queries));
while(it != end(queries))
{
const m::user::id &user_id(member.first);
const json::array &device_ids(member.second);
const string_view &host(user_id.host());
auto it(ret.lower_bound(host));
if(it == end(ret) || it->first != host)
it = ret.emplace_hint(it, host, user_devices_map{});
user_devices_map &users(it->second);
const auto &[remote, query] {*it};
if(query.eptr())
{
auto it(users.lower_bound(user_id));
if(it == end(users) || it->first != user_id)
it = users.emplace_hint(it, user_id, json::array{});
failures.emplace(remote, query.eptr());
it = queries.erase(it);
}
else ++it;
}
}
if(!empty(device_ids))
it->second = device_ids;
void
handle_device_keys(const m::resource::request &request,
query_map &queries,
failure_map &failures,
json::stack::object &out)
{
json::stack::object object
{
out, "device_keys"
};
for(auto &[remote, query] : queries) try
{
const json::object response
{
query.in.content
};
const json::object &device_keys
{
response["device_keys"]
};
for(const auto &[user_id, device_keys] : device_keys)
{
if(m::user::id(user_id).host() != remote)
continue;
json::stack::object user_object
{
object, user_id
};
for(const auto &[device_id, keys] : json::object(device_keys))
json::stack::member
{
user_object, device_id, keys
};
}
}
return ret;
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
failures.emplace(remote, std::current_exception());
log::derror
{
m::log, "Processing device_keys response from '%s' :%s",
remote,
e.what(),
};
}
}
void
handle_cross_keys(const m::resource::request &request,
query_map &queries,
failure_map &failures,
json::stack::object &out_,
const string_view &name,
const bool match_user)
{
json::stack::object out
{
out_, name
};
for(auto &[remote, query] : queries) try
{
if(match_user && request.user_id.host() != remote)
continue;
const json::object response
{
query.in.content
};
const json::object &object
{
response[name]
};
for(const auto &[user_id, keys] : object)
{
if(m::user::id(user_id).host() != remote)
continue;
if(match_user && request.user_id != user_id)
continue;
json::stack::member
{
out, user_id, json::object
{
keys
}
};
}
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
failures.emplace(remote, std::current_exception());
log::derror
{
m::log, "Processing %s response from '%s' :%s",
name,
remote,
e.what(),
};
}
}