diff --git a/modules/client/sync.cc b/modules/client/sync.cc index cc921b4e1..319588b04 100644 --- a/modules/client/sync.cc +++ b/modules/client/sync.cc @@ -160,53 +160,148 @@ since_sync(client &client, request.user_id }; - const int64_t sequence - { - get_sequence(request, args, user_room) - }; - // Can dump pending? - //return shortpoll_sync(client, request, args); + if(shortpoll_sync(client, request, args)) + return {}; // Can't dump pending return longpoll_sync(client, request, args); } -int64_t -get_sequence(const resource::request &request, - const syncargs &args, - const m::room &user_room) -try -{ - - int64_t sequence{0}; - user_room.get("ircd.tape.head", request.access_token, [&sequence] - (const m::event &event) - { - const json::object &content - { - at<"content"_>(event) - }; - - sequence = content.at("sequence"); - }); - - return sequence; -} -catch(const std::exception &e) //TODO: narrow -{ - throw m::NOT_FOUND - { - "since parameter invalid :%s", e.what() - }; -} - -resource::response +bool shortpoll_sync(client &client, const resource::request &request, const syncargs &args) + +try { - return {}; + const uint64_t _since + { + lex_cast(args.since) + }; + + uint64_t since + { + _since + }; + + std::map, std::less<>> r; + + m::vm::events::for_each(since, [&] + (const uint64_t &sequence, const m::event &event) + { + if(!r.empty() && (since - _since > 128)) + return false; + + since = sequence; + if(!json::get<"room_id"_>(event)) + return true; + + const m::room room + { + json::get<"room_id"_>(event) + }; + + if(!room.membership(request.user_id)) + return true; + + auto it + { + r.lower_bound(room.room_id) + }; + + if(it == end(r) || it->first != room.room_id) + it = r.emplace_hint(it, std::string{room.room_id}, std::vector{}); + + it->second.emplace_back(json::strung{event}); + return true; + }); + + if(r.empty()) + return false; + + std::vector joins; + + for(auto &p : r) + { + const auto &room_id{p.first}; + auto &vec{p.second}; + + std::vector timeline; + std::vector state; + std::vector ephemeral; + + for(std::string &event : vec) + if(json::object{event}.has("state_key")) + state.emplace_back(std::move(event)); + else if(!json::object{event}.has("prev_events")) + ephemeral.emplace_back(std::move(event)); + else + timeline.emplace_back(std::move(event)); + + const json::strung timeline_serial{timeline.data(), timeline.data() + timeline.size()}; + const json::strung state_serial{state.data(), state.data() + state.size()}; + const json::strung ephemeral_serial{ephemeral.data(), ephemeral.data() + ephemeral.size()}; + + const string_view prev_batch + { + !timeline.empty()? + unquote(json::object{timeline.front()}.at("event_id")): + string_view{} + }; + + const json::members body + { + { "ephemeral", + { + { "events", ephemeral_serial }, + }}, + { "state", + { + { "events", state_serial } + }}, + { "timeline", + { + { "events", timeline_serial }, + { "prev_batch", prev_batch }, + { "limited", false }, + }}, + }; + + joins.emplace_back(room_id, body); + }; + + const json::value join + { + joins.data(), joins.size() + }; + + const json::members rooms + { + { "join", join }, + { "leave", json::object{} }, + { "invite", json::object{} }, + }; + + resource::response + { + client, json::members + { + { "next_batch", int64_t(since) }, + { "rooms", rooms }, + { "presence", json::object{} }, + } + }; + + return true; +} +catch(const bad_lex_cast &e) +{ + throw m::BAD_REQUEST + { + "Since parameter invalid :%s", e.what() + }; + } resource::response @@ -348,9 +443,19 @@ try const life_guard client{wp}; client->longpoll = false; + const auto &next_batch + { + int64_t(m::vm::current_sequence) + }; + resource::response { - *client, http::REQUEST_TIMEOUT + *client, json::members + { + { "next_batch", next_batch }, + { "rooms", json::object{} }, + { "presence", json::object{} }, + } }; return client->async(); diff --git a/modules/client/sync.int.h b/modules/client/sync.int.h index 935bf8252..ee91cf3ef 100644 --- a/modules/client/sync.int.h +++ b/modules/client/sync.int.h @@ -57,8 +57,7 @@ static void worker(); extern ircd::context synchronizer; static resource::response longpoll_sync(client &, const resource::request &, const syncargs &); -static resource::response shortpoll_sync(client &, const resource::request &, const syncargs &); -static int64_t get_sequence(const resource::request &, const syncargs &, const m::room &user_room); +static bool shortpoll_sync(client &, const resource::request &, const syncargs &); static resource::response since_sync(client &, const resource::request &, const syncargs &); static resource::response initial_sync(client &, const resource::request &, const syncargs &); static resource::response sync(client &, const resource::request &);