0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-17 01:51:53 +01:00
construct/modules/client/sync.cc

621 lines
14 KiB
C++

/*
* Copyright (C) 2017 Charybdis Development Team
* Copyright (C) 2017 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
using namespace ircd;
const auto sync_description
{R"(
6.2.
Synchronise the client's state with the latest state on the server. Clients
use this API when they first log in to get an initial snapshot of the state
on the server, and then continue to call this API to get incremental deltas
to the state, and to receive new messages.
)"};
resource sync_resource
{
"/_matrix/client/r0/sync",
{
sync_description
}
};
void longpoll(client &client, const resource::request &request, const steady_point &timeout);
void synchronizer_worker();
ircd::context synchronizer_context
{
"synchronizer",
256_KiB,
&synchronizer_worker,
ircd::context::POST,
};
void synchronizer_timeout_worker();
ircd::context synchronizer_timeout_context
{
"synchronizer",
256_KiB,
&synchronizer_timeout_worker,
ircd::context::POST,
};
const auto on_unload{[]
{
synchronizer_context.interrupt();
synchronizer_timeout_context.interrupt();
synchronizer_context.join();
synchronizer_timeout_context.join();
}};
mapi::header IRCD_MODULE
{
"registers the resource 'client/sync' to handle requests.",
nullptr,
on_unload
};
resource::response
initial_sync(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state,
const string_view &set_presence);
resource::response
sync(client &client, const resource::request &request)
{
// 6.2.1 The ID of a filter created using the filter API or a filter JSON object
// encoded as a string. The server will detect whether it is an ID or a JSON object
// by whether the first character is a "{" open brace. Passing the JSON inline is best
// suited to one off requests. Creating a filter using the filter API is recommended
// for clients that reuse the same filter multiple times, for example in long poll requests.
const auto filter_id
{
request.query["filter"]
};
// 6.2.1 A point in time to continue a sync from.
const auto since
{
request.query["since"]
};
// 6.2.1 Controls whether to include the full state for all rooms the user is a member of.
// If this is set to true, then all state events will be returned, even if since is non-empty.
// The timeline will still be limited by the since parameter. In this case, the timeout
// parameter will be ignored and the query will return immediately, possibly with an
// empty timeline. If false, and since is non-empty, only state which has changed since
// the point indicated by since will be returned. By default, this is false.
const bool full_state
{
request.query["full_state"] == "true"
};
// 6.2.1 Controls whether the client is automatically marked as online by polling this API.
// If this parameter is omitted then the client is automatically marked as online when it
// uses this API. Otherwise if the parameter is set to "offline" then the client is not
// marked as being online when it uses this API. One of: ["offline"]
const string_view set_presence
{
request.query["set_presence"]
};
// Start a new spool for client
if(!since)
return initial_sync(client, request, filter_id, full_state, set_presence);
// The ircd.tape.head
const m::vm::query<m::vm::where::equal> query
{
{ "room_id", m::user::sessions.room_id },
{ "type", "ircd.tape.head" },
{ "state_key", request.query.at("access_token") },
};
int64_t sequence{0};
if(!m::vm::test(query, [&sequence](const auto &event)
{
const json::object &content
{
at<"content"_>(event)
};
sequence = content.at<int64_t>("sequence");
return true;
}))
throw m::NOT_FOUND{"since parameter invalid"};
// 6.2.1 The maximum time to poll in milliseconds before returning this request.
const auto timeout
{
request.query["timeout"]
};
const auto timeout_at
{
now<steady_point>() + milliseconds(std::max(timeout? lex_cast<int64_t>(timeout) : 30000L, 1000L))
};
longpoll(client, request, timeout_at);
// This handler returns no response. As long as this handler doesn't throw
// an exception IRCd will keep the client alive.
return {};
}
resource::method get_sync
{
sync_resource, "GET", sync,
{
get_sync.REQUIRES_AUTH
}
};
/// Input
///
///
struct syncpoll
{
static std::list<syncpoll> polling;
static std::multimap<steady_point, decltype(polling)::iterator> pollout;
std::string user_id;
std::string since;
std::string access_token; // can get rid of this and use some session id
std::weak_ptr<ircd::client> client;
decltype(pollout)::iterator it { std::end(pollout) };
};
decltype(syncpoll::polling) syncpoll::polling {};
decltype(syncpoll::pollout) syncpoll::pollout {};
void
longpoll(client &client,
const resource::request &request,
const steady_point &timeout)
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
const auto it
{
polling.emplace(polling.end(), syncpoll
{
std::string{request.user_id},
std::string{request.query.at("since")},
std::string{request.query.at("access_token")},
weak_from(client)
})
};
syncpoll &data
{
*it
};
data.it = pollout.emplace(timeout, it);
if(pollout.size() == 1)
notify(synchronizer_timeout_context);
}
//
// Timeout worker stack
//
void synchronizer_timeout(const syncpoll &sp);
/// This function is the base of an ircd::context which yields until a client
/// is due to timeout. This worker reaps timed out clients from the lists.
void
synchronizer_timeout_worker()
try
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
while(1)
{
while(!pollout.empty())
{
const auto &timeout{std::begin(pollout)->first};
const auto &iterator{std::begin(pollout)->second};
if(timeout > now<steady_point>())
{
ctx::wait_until<std::nothrow_t>(timeout);
continue;
}
const auto &data{*iterator};
synchronizer_timeout(data);
polling.erase(iterator);
pollout.erase(std::begin(pollout));
}
while(pollout.empty())
ctx::wait();
}
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug("Synchronizer timeout worker interrupted");
}
///
/// TODO: The http error response should not yield this context. If the sendq
/// TODO: is backed up the client should be dc'ed.
void
synchronizer_timeout(const syncpoll &sp)
try
{
const life_guard<client> client
{
sp.client
};
resource::response
{
*client, http::REQUEST_TIMEOUT
};
}
catch(const std::exception &e)
{
log::error("synchronizer_timeout(): %s", e.what());
}
//
// Main worker stack
//
bool update_sync(const syncpoll &data, const m::event &event, const m::room &);
void synchronize(const m::event &, const m::room::id &);
void synchronize(const m::event &);
void
synchronizer_worker()
{
while(1) try
{
std::unique_lock<decltype(m::vm::inserted)> lock
{
m::vm::inserted
};
// reference to the event on the inserter's stack
const auto &event
{
m::vm::inserted.wait(lock)
};
if(!syncpoll::polling.empty())
synchronize(event);
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug("Synchronizer worker interrupted");
return;
}
catch(const timeout &e)
{
ircd::log::debug("Synchronizer worker: %s", e.what());
}
catch(const std::exception &e)
{
ircd::log::error("Synchronizer worker: %s", e.what());
}
}
void
synchronize(const m::event &event)
{
const auto &room_id
{
json::get<"room_id"_>(event)
};
if(room_id)
{
synchronize(event, room_id);
return;
}
assert(0);
}
void
synchronize(const m::event &event,
const m::room::id &room_id)
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
const m::room room
{
room_id
};
for(auto it(std::begin(polling)); it != std::end(polling);)
{
const auto &data{*it};
if(!room.membership(data.user_id))
{
++it;
continue;
}
if(update_sync(data, event, room))
{
pollout.erase(data.it);
polling.erase(it++);
}
else ++it;
}
}
std::string
update_sync_room(client &client,
const m::room &room,
const string_view &since,
const m::event &event)
{
std::vector<std::string> state;
if(defined(json::get<"state_key"_>(event)))
state.emplace_back(json::strung(event));
const auto state_serial
{
json::strung(state.data(), state.data() + state.size())
};
std::vector<std::string> timeline;
if(!defined(json::get<"state_key"_>(event)))
timeline.emplace_back(json::strung(event));
const auto timeline_serial
{
json::strung(timeline.data(), timeline.data() + timeline.size())
};
const json::members body
{
{ "state", json::member { "events", state_serial } },
{ "timeline", json::member { "events", timeline_serial } }
};
return json::strung(body);
}
std::string
update_sync_rooms(client &client,
const m::user::id &user_id,
const m::room &room,
const string_view &since,
const m::event &event)
{
std::vector<std::string> r[3];
std::vector<json::member> m[3];
r[0].emplace_back(update_sync_room(client, room, since, event));
m[0].emplace_back(room.room_id, r[0].back());
const std::string join{json::strung(m[0].data(), m[0].data() + m[0].size())};
const std::string leave{json::strung(m[1].data(), m[1].data() + m[1].size())};
const std::string invite{json::strung(m[2].data(), m[2].data() + m[2].size())};
return json::strung(json::members
{
{ "join", join },
{ "leave", leave },
{ "invite", invite },
});
}
bool
update_sync(const syncpoll &data,
const m::event &event,
const m::room &room)
try
{
const life_guard<client> client
{
data.client
};
const auto rooms
{
update_sync_rooms(*client, data.user_id, room, data.since, event)
};
const auto presence
{
"{}"
};
const string_view next_batch
{
at<"event_id"_>(event)
};
resource::response
{
*client, json::members
{
{ "next_batch", next_batch },
{ "rooms", rooms },
{ "presence", presence }
}
};
return true;
}
catch(const std::bad_weak_ptr &e)
{
return true;
}
std::string
initial_sync_room(client &client,
const resource::request &request,
const m::room &room,
const bool &full_state)
{
std::vector<std::string> state;
{
const m::vm::query<m::vm::where::equal> state_query
{
{ "room_id", room.room_id },
};
m::vm::for_each(state_query, [&state](const auto &event)
{
if(!null(json::get<"state_key"_>(event)))
state.emplace_back(json::strung(event));
});
}
const auto state_serial
{
json::strung(state.data(), state.data() + state.size())
};
std::vector<std::string> timeline;
{
const m::vm::query<m::vm::where::equal> query
{
{ "room_id", room.room_id },
};
m::vm::test(query, [&timeline](const auto &event)
{
if(timeline.size() > 10)
return true;
if(null(json::get<"state_key"_>(event)))
timeline.emplace_back(json::strung(event));
return false;
});
}
const auto timeline_serial
{
json::strung(timeline.data(), timeline.data() + timeline.size())
};
const json::members body
{
{ "state", json::member { "events", state_serial } },
{ "timeline", json::member { "events", timeline_serial } }
};
return json::strung(body);
}
std::string
initial_sync_rooms(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state)
{
const m::vm::query<m::vm::where::equal> query
{
{ "type", "m.room.member" },
{ "state_key", request.user_id },
};
std::array<std::vector<std::string>, 3> r;
std::array<std::vector<json::member>, 3> m;
m::vm::for_each(query, [&r, &m, &client, &request, &full_state](const auto &event)
{
const auto &content{json::get<"content"_>(event)};
const auto &membership{unquote(content["membership"])};
const m::room::id &room_id{json::get<"room_id"_>(event)};
const auto i
{
membership == "join"? 0:
membership == "leave"? 1:
membership == "invite"? 2:
-1
};
r.at(i).emplace_back(initial_sync_room(client, request, room_id, full_state));
m.at(i).emplace_back(room_id, r.at(i).back());
});
const std::string join{json::strung(m[0].data(), m[0].data() + m[0].size())};
const std::string leave{json::strung(m[1].data(), m[1].data() + m[1].size())};
const std::string invite{json::strung(m[2].data(), m[2].data() + m[2].size())};
return json::strung(json::members
{
{ "join", join },
{ "leave", leave },
{ "invite", invite },
});
}
resource::response
initial_sync(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state,
const string_view &set_presence)
{
const std::string rooms
{
initial_sync_rooms(client, request, filter_id, full_state)
};
const auto presence
{
"{}"
};
const int64_t &next_batch
{
m::vm::current_sequence
};
const auto state_key
{
request.query.at("access_token")
};
m::send(m::user::sessions, request.user_id, "ircd.tape.head", state_key,
{
{ "sequence", next_batch }
});
return resource::response
{
client, json::members
{
{ "next_batch", next_batch },
{ "rooms", rooms },
{ "presence", presence }
}
};
}