mirror of
https://github.com/matrix-construct/construct
synced 2024-11-26 00:32:35 +01:00
modules/client/sync: Checkpoint preliminary stateless incremental sync exhibit.
This commit is contained in:
parent
bcd704826c
commit
00b46a8885
2 changed files with 144 additions and 40 deletions
|
@ -160,53 +160,148 @@ since_sync(client &client,
|
|||
request.user_id
|
||||
};
|
||||
|
||||
const int64_t sequence
|
||||
{
|
||||
get_sequence(request, args, user_room)
|
||||
};
|
||||
|
||||
// Can dump pending?
|
||||
//return shortpoll_sync(client, request, args);
|
||||
if(shortpoll_sync(client, request, args))
|
||||
return {};
|
||||
|
||||
// Can't dump pending
|
||||
return longpoll_sync(client, request, args);
|
||||
}
|
||||
|
||||
int64_t
|
||||
get_sequence(const resource::request &request,
|
||||
const syncargs &args,
|
||||
const m::room &user_room)
|
||||
try
|
||||
{
|
||||
|
||||
int64_t sequence{0};
|
||||
user_room.get("ircd.tape.head", request.access_token, [&sequence]
|
||||
(const m::event &event)
|
||||
{
|
||||
const json::object &content
|
||||
{
|
||||
at<"content"_>(event)
|
||||
};
|
||||
|
||||
sequence = content.at<int64_t>("sequence");
|
||||
});
|
||||
|
||||
return sequence;
|
||||
}
|
||||
catch(const std::exception &e) //TODO: narrow
|
||||
{
|
||||
throw m::NOT_FOUND
|
||||
{
|
||||
"since parameter invalid :%s", e.what()
|
||||
};
|
||||
}
|
||||
|
||||
resource::response
|
||||
bool
|
||||
shortpoll_sync(client &client,
|
||||
const resource::request &request,
|
||||
const syncargs &args)
|
||||
|
||||
try
|
||||
{
|
||||
return {};
|
||||
const uint64_t _since
|
||||
{
|
||||
lex_cast<uint64_t>(args.since)
|
||||
};
|
||||
|
||||
uint64_t since
|
||||
{
|
||||
_since
|
||||
};
|
||||
|
||||
std::map<std::string, std::vector<std::string>, std::less<>> r;
|
||||
|
||||
m::vm::events::for_each(since, [&]
|
||||
(const uint64_t &sequence, const m::event &event)
|
||||
{
|
||||
if(!r.empty() && (since - _since > 128))
|
||||
return false;
|
||||
|
||||
since = sequence;
|
||||
if(!json::get<"room_id"_>(event))
|
||||
return true;
|
||||
|
||||
const m::room room
|
||||
{
|
||||
json::get<"room_id"_>(event)
|
||||
};
|
||||
|
||||
if(!room.membership(request.user_id))
|
||||
return true;
|
||||
|
||||
auto it
|
||||
{
|
||||
r.lower_bound(room.room_id)
|
||||
};
|
||||
|
||||
if(it == end(r) || it->first != room.room_id)
|
||||
it = r.emplace_hint(it, std::string{room.room_id}, std::vector<std::string>{});
|
||||
|
||||
it->second.emplace_back(json::strung{event});
|
||||
return true;
|
||||
});
|
||||
|
||||
if(r.empty())
|
||||
return false;
|
||||
|
||||
std::vector<json::member> joins;
|
||||
|
||||
for(auto &p : r)
|
||||
{
|
||||
const auto &room_id{p.first};
|
||||
auto &vec{p.second};
|
||||
|
||||
std::vector<std::string> timeline;
|
||||
std::vector<std::string> state;
|
||||
std::vector<std::string> ephemeral;
|
||||
|
||||
for(std::string &event : vec)
|
||||
if(json::object{event}.has("state_key"))
|
||||
state.emplace_back(std::move(event));
|
||||
else if(!json::object{event}.has("prev_events"))
|
||||
ephemeral.emplace_back(std::move(event));
|
||||
else
|
||||
timeline.emplace_back(std::move(event));
|
||||
|
||||
const json::strung timeline_serial{timeline.data(), timeline.data() + timeline.size()};
|
||||
const json::strung state_serial{state.data(), state.data() + state.size()};
|
||||
const json::strung ephemeral_serial{ephemeral.data(), ephemeral.data() + ephemeral.size()};
|
||||
|
||||
const string_view prev_batch
|
||||
{
|
||||
!timeline.empty()?
|
||||
unquote(json::object{timeline.front()}.at("event_id")):
|
||||
string_view{}
|
||||
};
|
||||
|
||||
const json::members body
|
||||
{
|
||||
{ "ephemeral",
|
||||
{
|
||||
{ "events", ephemeral_serial },
|
||||
}},
|
||||
{ "state",
|
||||
{
|
||||
{ "events", state_serial }
|
||||
}},
|
||||
{ "timeline",
|
||||
{
|
||||
{ "events", timeline_serial },
|
||||
{ "prev_batch", prev_batch },
|
||||
{ "limited", false },
|
||||
}},
|
||||
};
|
||||
|
||||
joins.emplace_back(room_id, body);
|
||||
};
|
||||
|
||||
const json::value join
|
||||
{
|
||||
joins.data(), joins.size()
|
||||
};
|
||||
|
||||
const json::members rooms
|
||||
{
|
||||
{ "join", join },
|
||||
{ "leave", json::object{} },
|
||||
{ "invite", json::object{} },
|
||||
};
|
||||
|
||||
resource::response
|
||||
{
|
||||
client, json::members
|
||||
{
|
||||
{ "next_batch", int64_t(since) },
|
||||
{ "rooms", rooms },
|
||||
{ "presence", json::object{} },
|
||||
}
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
catch(const bad_lex_cast &e)
|
||||
{
|
||||
throw m::BAD_REQUEST
|
||||
{
|
||||
"Since parameter invalid :%s", e.what()
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
resource::response
|
||||
|
@ -348,9 +443,19 @@ try
|
|||
const life_guard<client> client{wp};
|
||||
client->longpoll = false;
|
||||
|
||||
const auto &next_batch
|
||||
{
|
||||
int64_t(m::vm::current_sequence)
|
||||
};
|
||||
|
||||
resource::response
|
||||
{
|
||||
*client, http::REQUEST_TIMEOUT
|
||||
*client, json::members
|
||||
{
|
||||
{ "next_batch", next_batch },
|
||||
{ "rooms", json::object{} },
|
||||
{ "presence", json::object{} },
|
||||
}
|
||||
};
|
||||
|
||||
return client->async();
|
||||
|
|
|
@ -57,8 +57,7 @@ static void worker();
|
|||
extern ircd::context synchronizer;
|
||||
|
||||
static resource::response longpoll_sync(client &, const resource::request &, const syncargs &);
|
||||
static resource::response shortpoll_sync(client &, const resource::request &, const syncargs &);
|
||||
static int64_t get_sequence(const resource::request &, const syncargs &, const m::room &user_room);
|
||||
static bool shortpoll_sync(client &, const resource::request &, const syncargs &);
|
||||
static resource::response since_sync(client &, const resource::request &, const syncargs &);
|
||||
static resource::response initial_sync(client &, const resource::request &, const syncargs &);
|
||||
static resource::response sync(client &, const resource::request &);
|
||||
|
|
Loading…
Reference in a new issue