From 6f5121dc6a20a25062fe87c9961bba9457eb2086 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 25 Apr 2023 12:49:55 -0700 Subject: [PATCH] modules/client/keys/query: Refactor receive handling for properly composed response. --- modules/client/keys/query.cc | 451 ++++++++++++++++------------------- 1 file changed, 210 insertions(+), 241 deletions(-) diff --git a/modules/client/keys/query.cc b/modules/client/keys/query.cc index 0d923d178..c51b3ae52 100644 --- a/modules/client/keys/query.cc +++ b/modules/client/keys/query.cc @@ -19,8 +19,30 @@ namespace using buffer_list = std::vector>; } -static host_users_map -parse_user_request(const json::object &device_keys); +static void +handle_cross_keys(const m::resource::request &, + query_map &, + failure_map &, + json::stack::object &, + const string_view &, + const bool match_user = false); + +static void +handle_device_keys(const m::resource::request &, + query_map &, + failure_map &, + json::stack::object &); + +static void +handle_responses(const m::resource::request &, + query_map &, + failure_map &, + json::stack::object &); + +static void +handle_errors(const m::resource::request &, + query_map &, + failure_map &); static bool send_request(const string_view &, @@ -34,19 +56,8 @@ send_requests(const host_users_map &, buffer_list &, failure_map &); -static void -recv_response(const m::resource::request &, - const string_view &, - m::fed::user::keys::query &, - failure_map &, - json::stack::object &); - -static void -recv_responses(const m::resource::request &, - query_map &, - failure_map &, - json::stack::object &, - const milliseconds &); +static host_users_map +parse_user_request(const json::object &device_keys); static void handle_failures(const failure_map &, @@ -148,6 +159,20 @@ post__keys_query(client &client, send_requests(map, buffers, failures) }; + auto responses + { + ctx::when_all(begin(queries), end(queries), [] + (auto &it) -> m::fed::user::keys::query & + { + return it->second; + }) + }; + + const bool all_good + { + responses.wait_until(now() + timeout, std::nothrow) + }; + m::resource::response::chunked response { client, http::OK @@ -163,9 +188,9 @@ post__keys_query(client &client, out }; - recv_responses(request, queries, failures, top, timeout); + handle_responses(request, queries, failures, top); handle_failures(failures, top); - return {}; + return response; } void @@ -177,216 +202,39 @@ handle_failures(const failure_map &failures, out, "failures" }; - for(const auto &p : failures) - { - const string_view &hostname(p.first); - const std::exception_ptr &eptr(p.second); + for(const auto &[remote, eptr] : failures) json::stack::member { - response_failures, hostname, what(eptr) + response_failures, remote, what(eptr) }; - } } -void -recv_responses(const m::resource::request &client_request, - query_map &queries, - failure_map &failures, - json::stack::object &out, - const milliseconds &timeout) -try +host_users_map +parse_user_request(const json::object &device_keys) { - const system_point timedout + host_users_map ret; + for(const auto &member : device_keys) { - ircd::now() + timeout - }; + const m::user::id &user_id(member.first); + const json::array &device_ids(member.second); + const string_view &host(user_id.host()); - while(!queries.empty()) - { - static const auto dereferencer{[] - (auto &it) -> m::fed::user::keys::query & + auto it(ret.lower_bound(host)); + if(it == end(ret) || it->first != host) + it = ret.emplace_hint(it, host, user_devices_map{}); + + user_devices_map &users(it->second); { - return it->second; - }}; + auto it(users.lower_bound(user_id)); + if(it == end(users) || it->first != user_id) + it = users.emplace_hint(it, user_id, json::array{}); - auto next - { - ctx::when_any(begin(queries), end(queries), dereferencer) - }; - - next.wait_until(timedout); // throws on timeout - const auto it{next.get()}; - const unwind remove{[&queries, &it] - { - queries.erase(it); - }}; - - const auto &remote(it->first); - auto &request(it->second); - - assert(!failures.count(remote)); - if(failures.count(remote)) - continue; - - recv_response(client_request, remote, request, failures, out); - } -} -catch(const std::exception &) -{ - for(const auto &[remote, request] : queries) - failures.emplace(remote, std::current_exception()); -} - -void -recv_response(const m::resource::request &client_request, - const string_view &remote, - m::fed::user::keys::query &request, - failure_map &failures, - json::stack::object &out) -try -{ - const auto code - { - request.get() - }; - - const json::object response - { - request - }; - - // device_keys - { - json::stack::object object - { - out, "device_keys" - }; - - const json::object &device_keys - { - response["device_keys"] - }; - - for(const auto &[_user_id, device_keys] : device_keys) - { - const m::user::id &user_id - { - _user_id - }; - - json::stack::object user_object - { - object, user_id - }; - - for(const auto &[device_id, keys] : json::object(device_keys)) - json::stack::member - { - user_object, device_id, keys - }; + if(!empty(device_ids)) + it->second = device_ids; } } - // master_keys - { - json::stack::object object - { - out, "master_keys" - }; - - const json::object &master_keys - { - response["master_keys"] - }; - - for(const auto &[_user_id, master_key] : master_keys) - { - const m::user::id &user_id - { - _user_id - }; - - json::stack::member - { - object, user_id, json::object - { - master_key - } - }; - } - } - - // self_signing_keys - { - json::stack::object object - { - out, "self_signing_keys" - }; - - const json::object &self_signing_keys - { - response["self_signing_keys"] - }; - - for(const auto &[_user_id, self_signing_key] : self_signing_keys) - { - const m::user::id &user_id - { - _user_id - }; - - json::stack::member - { - object, user_id, json::object - { - self_signing_key - } - }; - } - } - - // user_signing_keys - { - json::stack::object object - { - out, "user_signing_keys" - }; - - const json::object &user_signing_keys - { - response["user_signing_keys"] - }; - - for(const auto &[_user_id, user_signing_key] : user_signing_keys) - { - const m::user::id &user_id - { - _user_id - }; - - if(client_request.user_id != _user_id) - continue; - - json::stack::member - { - object, user_id, json::object - { - user_signing_key - } - }; - } - } -} -catch(const std::exception &e) -{ - log::error - { - m::log, "user keys query from %s :%s", - remote, - e.what() - }; - - failures.emplace(remote, std::current_exception()); + return ret; } query_map @@ -395,12 +243,8 @@ send_requests(const host_users_map &hosts, failure_map &failures) { query_map ret; - for(const auto &pair : hosts) - { - const string_view &remote(pair.first); - const user_devices_map &user_devices(pair.second); + for(const auto &[remote, user_devices] : hosts) send_request(remote, user_devices, failures, buffers, ret); - } return ret; } @@ -441,43 +285,168 @@ try return true; } +catch(const ctx::interrupted &e) +{ + throw; +} catch(const std::exception &e) { - log::error + failures.emplace(remote, std::current_exception()); + log::derror { m::log, "user keys query to %s :%s", remote, e.what() }; - failures.emplace(remote, std::current_exception()); return false; } -host_users_map -parse_user_request(const json::object &device_keys) +void +handle_responses(const m::resource::request &request, + query_map &queries, + failure_map &failures, + json::stack::object &out) { - host_users_map ret; - for(const auto &member : device_keys) + handle_errors(request, queries, failures); + handle_device_keys(request, queries, failures, out); + handle_cross_keys(request, queries, failures, out, "master_keys"); + handle_cross_keys(request, queries, failures, out, "self_signing_keys"); + handle_cross_keys(request, queries, failures, out, "user_signing_keys", true); +} + +void +handle_errors(const m::resource::request &request, + query_map &queries, + failure_map &failures) +{ + auto it(begin(queries)); + while(it != end(queries)) { - const m::user::id &user_id(member.first); - const json::array &device_ids(member.second); - const string_view &host(user_id.host()); - - auto it(ret.lower_bound(host)); - if(it == end(ret) || it->first != host) - it = ret.emplace_hint(it, host, user_devices_map{}); - - user_devices_map &users(it->second); + const auto &[remote, query] {*it}; + if(query.eptr()) { - auto it(users.lower_bound(user_id)); - if(it == end(users) || it->first != user_id) - it = users.emplace_hint(it, user_id, json::array{}); + failures.emplace(remote, query.eptr()); + it = queries.erase(it); + } + else ++it; + } +} - if(!empty(device_ids)) - it->second = device_ids; +void +handle_device_keys(const m::resource::request &request, + query_map &queries, + failure_map &failures, + json::stack::object &out) +{ + json::stack::object object + { + out, "device_keys" + }; + + for(auto &[remote, query] : queries) try + { + const json::object response + { + query.in.content + }; + + const json::object &device_keys + { + response["device_keys"] + }; + + for(const auto &[user_id, device_keys] : device_keys) + { + if(m::user::id(user_id).host() != remote) + continue; + + json::stack::object user_object + { + object, user_id + }; + + for(const auto &[device_id, keys] : json::object(device_keys)) + json::stack::member + { + user_object, device_id, keys + }; } } - - return ret; + catch(const ctx::interrupted &) + { + throw; + } + catch(const std::exception &e) + { + failures.emplace(remote, std::current_exception()); + log::derror + { + m::log, "Processing device_keys response from '%s' :%s", + remote, + e.what(), + }; + } +} + +void +handle_cross_keys(const m::resource::request &request, + query_map &queries, + failure_map &failures, + json::stack::object &out_, + const string_view &name, + const bool match_user) +{ + json::stack::object out + { + out_, name + }; + + for(auto &[remote, query] : queries) try + { + if(match_user && request.user_id.host() != remote) + continue; + + const json::object response + { + query.in.content + }; + + const json::object &object + { + response[name] + }; + + for(const auto &[user_id, keys] : object) + { + if(m::user::id(user_id).host() != remote) + continue; + + if(match_user && request.user_id != user_id) + continue; + + json::stack::member + { + out, user_id, json::object + { + keys + } + }; + } + } + catch(const ctx::interrupted &) + { + throw; + } + catch(const std::exception &e) + { + failures.emplace(remote, std::current_exception()); + log::derror + { + m::log, "Processing %s response from '%s' :%s", + name, + remote, + e.what(), + }; + } }