diff --git a/modules/federation/get_missing_events.cc b/modules/federation/get_missing_events.cc index cadbac5d4..d17a53a21 100644 --- a/modules/federation/get_missing_events.cc +++ b/modules/federation/get_missing_events.cc @@ -46,7 +46,7 @@ method_post } }; -conf::item +conf::item max_limit { { "name", "ircd.federation.missing_events.max_limit" }, @@ -54,10 +54,10 @@ max_limit }; conf::item -max_goose +flush_hiwat { - { "name", "ircd.federation.missing_events.max_goose" }, - { "default", 512L } + { "name", "ircd.federation.missing_events.flush.hiwat" }, + { "default", long(16_KiB) }, }; resource::response @@ -75,11 +75,11 @@ get__missing_events(client &client, url::decode(request.parv[0], room_id) }; - const auto limit + ssize_t limit { request["limit"]? - std::min(lex_cast(request["limit"]), size_t(max_limit)): - size_t(max_limit) + std::min(lex_cast(request["limit"]), ssize_t(max_limit)): + ssize_t(10) // default limit (protocol spec) }; const auto min_depth @@ -99,55 +99,88 @@ get__missing_events(client &client, request["latest_events"] }; - const auto in_latest + const auto in_earliest{[&earliest](const auto &event_id) { - [&latest](const auto &event_id) + return end(earliest) != std::find_if(begin(earliest), end(earliest), [&event_id] + (const auto &event_id_) { - return end(latest) != std::find_if(begin(latest), end(latest), [&event_id] - (const auto &latest) - { - return event_id == unquote(latest); - }); - } + return event_id == unquote(event_id_); + }); + }}; + + const unique_buffer buf + { + 96_KiB }; - std::vector ret; - ret.reserve(limit); - size_t goose{0}; - for(const auto &event_id : earliest) try + resource::response::chunked response { - m::room::messages it - { - room_id, unquote(event_id) - }; + client, http::OK + }; - for(; it && ret.size() < limit && goose < size_t(max_goose); ++it, ++goose) - { - const m::event &event{*it}; - if(!visible(event, request.node_id)) - continue; + const auto flush{[&response] + (const const_buffer &buf) + { + response.write(buf); + return buf; + }}; - ret.emplace_back(json::strung{event}); - if(in_latest(at<"event_id"_>(event))) + json::stack out + { + buf, flush, size_t(flush_hiwat) + }; + + json::stack::object top{out}; + json::stack::member events_m + { + top, "events" + }; + + json::stack::array events + { + events_m + }; + + std::deque queue; + const auto add_queue{[&limit, &queue, &in_earliest] + (const m::event::id &event_id) -> bool + { + if(in_earliest(event_id)) + return true; + + if(end(queue) != std::find(begin(queue), end(queue), event_id)) + return true; + + if(--limit < 0) + return false; + + queue.emplace_back(std::string{event_id}); + return true; + }}; + + for(const auto &event_id : latest) + add_queue(unquote(event_id)); + + m::event::fetch event; + while(!queue.empty()) + { + const auto &event_id{queue.front()}; + const unwind pop{[&queue] + { + queue.pop_front(); + }}; + + if(!seek(event, event_id, std::nothrow)) + continue; + + if(!visible(event, request.node_id)) + continue; + + events.append(event); + for(const json::array &prev : json::get<"prev_events"_>(event)) + if(!add_queue(unquote(prev.at(0)))) break; - } - } - catch(const std::exception &e) - { - log::derror - { - "Request from %s for earliest missing %s :%s", - string(remote(client)), - unquote(event_id), - e.what() - }; } - return resource::response - { - client, json::members - { - { "events", json::strung { ret.data(), ret.data() + ret.size() } } - } - }; + return response; }