mirror of
https://github.com/matrix-construct/construct
synced 2024-11-29 02:02:38 +01:00
modules/federation/get_missing_events: Update get_missing_events per spec; and chunked stream.
This commit is contained in:
parent
d7ca00c4a1
commit
6c71f43d1b
1 changed files with 81 additions and 48 deletions
|
@ -46,7 +46,7 @@ method_post
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
conf::item<size_t>
|
conf::item<ssize_t>
|
||||||
max_limit
|
max_limit
|
||||||
{
|
{
|
||||||
{ "name", "ircd.federation.missing_events.max_limit" },
|
{ "name", "ircd.federation.missing_events.max_limit" },
|
||||||
|
@ -54,10 +54,10 @@ max_limit
|
||||||
};
|
};
|
||||||
|
|
||||||
conf::item<size_t>
|
conf::item<size_t>
|
||||||
max_goose
|
flush_hiwat
|
||||||
{
|
{
|
||||||
{ "name", "ircd.federation.missing_events.max_goose" },
|
{ "name", "ircd.federation.missing_events.flush.hiwat" },
|
||||||
{ "default", 512L }
|
{ "default", long(16_KiB) },
|
||||||
};
|
};
|
||||||
|
|
||||||
resource::response
|
resource::response
|
||||||
|
@ -75,11 +75,11 @@ get__missing_events(client &client,
|
||||||
url::decode(request.parv[0], room_id)
|
url::decode(request.parv[0], room_id)
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto limit
|
ssize_t limit
|
||||||
{
|
{
|
||||||
request["limit"]?
|
request["limit"]?
|
||||||
std::min(lex_cast<size_t>(request["limit"]), size_t(max_limit)):
|
std::min(lex_cast<ssize_t>(request["limit"]), ssize_t(max_limit)):
|
||||||
size_t(max_limit)
|
ssize_t(10) // default limit (protocol spec)
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto min_depth
|
const auto min_depth
|
||||||
|
@ -99,55 +99,88 @@ get__missing_events(client &client,
|
||||||
request["latest_events"]
|
request["latest_events"]
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto in_latest
|
const auto in_earliest{[&earliest](const auto &event_id)
|
||||||
{
|
{
|
||||||
[&latest](const auto &event_id)
|
return end(earliest) != std::find_if(begin(earliest), end(earliest), [&event_id]
|
||||||
|
(const auto &event_id_)
|
||||||
{
|
{
|
||||||
return end(latest) != std::find_if(begin(latest), end(latest), [&event_id]
|
return event_id == unquote(event_id_);
|
||||||
(const auto &latest)
|
});
|
||||||
{
|
}};
|
||||||
return event_id == unquote(latest);
|
|
||||||
});
|
const unique_buffer<mutable_buffer> buf
|
||||||
}
|
{
|
||||||
|
96_KiB
|
||||||
};
|
};
|
||||||
|
|
||||||
std::vector<std::string> ret;
|
resource::response::chunked response
|
||||||
ret.reserve(limit);
|
|
||||||
size_t goose{0};
|
|
||||||
for(const auto &event_id : earliest) try
|
|
||||||
{
|
{
|
||||||
m::room::messages it
|
client, http::OK
|
||||||
{
|
};
|
||||||
room_id, unquote(event_id)
|
|
||||||
};
|
|
||||||
|
|
||||||
for(; it && ret.size() < limit && goose < size_t(max_goose); ++it, ++goose)
|
const auto flush{[&response]
|
||||||
{
|
(const const_buffer &buf)
|
||||||
const m::event &event{*it};
|
{
|
||||||
if(!visible(event, request.node_id))
|
response.write(buf);
|
||||||
continue;
|
return buf;
|
||||||
|
}};
|
||||||
|
|
||||||
ret.emplace_back(json::strung{event});
|
json::stack out
|
||||||
if(in_latest(at<"event_id"_>(event)))
|
{
|
||||||
|
buf, flush, size_t(flush_hiwat)
|
||||||
|
};
|
||||||
|
|
||||||
|
json::stack::object top{out};
|
||||||
|
json::stack::member events_m
|
||||||
|
{
|
||||||
|
top, "events"
|
||||||
|
};
|
||||||
|
|
||||||
|
json::stack::array events
|
||||||
|
{
|
||||||
|
events_m
|
||||||
|
};
|
||||||
|
|
||||||
|
std::deque<std::string> queue;
|
||||||
|
const auto add_queue{[&limit, &queue, &in_earliest]
|
||||||
|
(const m::event::id &event_id) -> bool
|
||||||
|
{
|
||||||
|
if(in_earliest(event_id))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if(end(queue) != std::find(begin(queue), end(queue), event_id))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if(--limit < 0)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
queue.emplace_back(std::string{event_id});
|
||||||
|
return true;
|
||||||
|
}};
|
||||||
|
|
||||||
|
for(const auto &event_id : latest)
|
||||||
|
add_queue(unquote(event_id));
|
||||||
|
|
||||||
|
m::event::fetch event;
|
||||||
|
while(!queue.empty())
|
||||||
|
{
|
||||||
|
const auto &event_id{queue.front()};
|
||||||
|
const unwind pop{[&queue]
|
||||||
|
{
|
||||||
|
queue.pop_front();
|
||||||
|
}};
|
||||||
|
|
||||||
|
if(!seek(event, event_id, std::nothrow))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(!visible(event, request.node_id))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
events.append(event);
|
||||||
|
for(const json::array &prev : json::get<"prev_events"_>(event))
|
||||||
|
if(!add_queue(unquote(prev.at(0))))
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
}
|
|
||||||
catch(const std::exception &e)
|
|
||||||
{
|
|
||||||
log::derror
|
|
||||||
{
|
|
||||||
"Request from %s for earliest missing %s :%s",
|
|
||||||
string(remote(client)),
|
|
||||||
unquote(event_id),
|
|
||||||
e.what()
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return resource::response
|
return response;
|
||||||
{
|
|
||||||
client, json::members
|
|
||||||
{
|
|
||||||
{ "events", json::strung { ret.data(), ret.data() + ret.size() } }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue