0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-02 10:54:16 +01:00

modules/client/sync: Checkpoint scaffold non-initial sync w/ initial linkage.

This commit is contained in:
Jason Volk 2018-04-10 15:20:47 -07:00
parent b244959c35
commit acf8cf5ae6
2 changed files with 438 additions and 391 deletions

View file

@ -8,9 +8,38 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
using namespace ircd;
#include "sync.int.h"
const auto sync_description
mapi::header
IRCD_MODULE
{
"Client 6.2.1 :Sync",
nullptr,
on_unload
};
void
on_unload()
{
synchronizer.interrupt();
synchronizer.join();
}
//
// sync resource
//
resource
sync_resource
{
"/_matrix/client/r0/sync",
{
sync_description
}
};
const string_view
sync_description
{R"(
6.2.1
@ -22,106 +51,135 @@ to the state, and to receive new messages.
)"};
resource sync_resource
//
// GET sync
//
resource::method
get_sync
{
"/_matrix/client/r0/sync",
sync_resource, "GET", sync,
{
sync_description
get_sync.REQUIRES_AUTH
}
};
void longpoll(client &client, const resource::request &request, const steady_point &timeout);
void synchronizer_worker();
ircd::context synchronizer_context
m::import<resource::response (client &, const resource::request &)>
initialsync
{
"synchronizer",
256_KiB,
&synchronizer_worker,
ircd::context::POST,
"client_initialsync", "initialsync"
};
void synchronizer_timeout_worker();
ircd::context synchronizer_timeout_context
{
"synchronizer",
256_KiB,
&synchronizer_timeout_worker,
ircd::context::POST,
};
const auto on_unload{[]
{
synchronizer_context.interrupt();
synchronizer_timeout_context.interrupt();
synchronizer_context.join();
synchronizer_timeout_context.join();
}};
mapi::header
IRCD_MODULE
{
"Client 6.2.1 :Sync",
nullptr,
on_unload
};
resource::response
initial_sync(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state,
const string_view &set_presence);
resource::response
sync(client &client,
const resource::request &request)
{
const syncargs args
{
request
};
return args.since?
since_sync(client, request, args):
initialsync(client, request);
}
conf::item<milliseconds>
sync_timeout_max
{
{ "name", "ircd.client.sync.timeout.max" },
{ "default", 305 * 1000L },
};
conf::item<milliseconds>
sync_timeout_min
{
{ "name", "ircd.client.sync.timeout.min" },
{ "default", 15 * 1000L },
};
conf::item<milliseconds>
sync_timeout_default
{
{ "name", "ircd.client.sync.timeout.default" },
{ "default", 180 * 1000L },
};
syncargs::syncargs(const resource::request &request)
:filter_id
{
// 6.2.1 The ID of a filter created using the filter API or a filter JSON object
// encoded as a string. The server will detect whether it is an ID or a JSON object
// by whether the first character is a "{" open brace. Passing the JSON inline is best
// suited to one off requests. Creating a filter using the filter API is recommended
// for clients that reuse the same filter multiple times, for example in long poll requests.
const auto filter_id
{
request.query["filter"]
};
request.query["filter"]
}
,since
{
// 6.2.1 A point in time to continue a sync from.
const auto since
{
request.query["since"]
};
request.query["since"]
}
,timesout{[&request]
{
// 6.2.1 The maximum time to poll in milliseconds before returning this request.
auto ret(request.query.get("timeout", milliseconds(sync_timeout_default)));
ret = std::min(ret, milliseconds(sync_timeout_max));
ret = std::max(ret, milliseconds(sync_timeout_min));
return now<steady_point>() + ret;
}()}
,full_state
{
// 6.2.1 Controls whether to include the full state for all rooms the user is a member of.
// If this is set to true, then all state events will be returned, even if since is non-empty.
// The timeline will still be limited by the since parameter. In this case, the timeout
// parameter will be ignored and the query will return immediately, possibly with an
// empty timeline. If false, and since is non-empty, only state which has changed since
// the point indicated by since will be returned. By default, this is false.
const bool full_state
{
request.query["full_state"] == "true"
};
request.query.get("full_state", false)
}
,set_presence
{
// 6.2.1 Controls whether the client is automatically marked as online by polling this API.
// If this parameter is omitted then the client is automatically marked as online when it
// uses this API. Otherwise if the parameter is set to "offline" then the client is not
// marked as being online when it uses this API. One of: ["offline"]
const string_view set_presence
request.query.get("set_presence", true)
}
{
}
resource::response
since_sync(client &client,
const resource::request &request,
const syncargs &args)
{
const m::user::room user_room
{
request.query["set_presence"]
request.user_id
};
// Start a new spool for client
if(!since)
return initial_sync(client, request, filter_id, full_state, set_presence);
const int64_t sequence
{
get_sequence(request, args, user_room)
};
// Can dump pending?
//return shortpoll_sync(client, request, args);
// Can't dump pending
return longpoll_sync(client, request, args);
}
int64_t
get_sequence(const resource::request &request,
const syncargs &args,
const m::room &user_room)
try
{
// The ircd.tape.head
int64_t sequence{0};
//TODO: user's room
if(!m::user::tokens.get(std::nothrow, "ircd.tape.head"_sv, request.access_token, [&sequence]
user_room.get("ircd.tape.head", request.access_token, [&sequence]
(const m::event &event)
{
const json::object &content
@ -130,194 +188,200 @@ sync(client &client,
};
sequence = content.at<int64_t>("sequence");
}))
throw m::NOT_FOUND{"since parameter invalid"};
});
// 6.2.1 The maximum time to poll in milliseconds before returning this request.
const int64_t timeout_requested
return sequence;
}
catch(const std::exception &e) //TODO: narrow
{
throw m::NOT_FOUND
{
request.query["timeout"]? lex_cast<int64_t>(request.query.at("timeout")) : 0L
"since parameter invalid :%s", e.what()
};
}
const auto timeout_at
{
now<steady_point>() +
milliseconds(timeout_requested? std::max(timeout_requested, 3000L) : 30000L)
};
longpoll(client, request, timeout_at);
// This handler returns no response. As long as this handler doesn't throw
// an exception IRCd will keep the client alive.
resource::response
shortpoll_sync(client &client,
const resource::request &request,
const syncargs &args)
{
return {};
}
resource::method get_sync
resource::response
longpoll_sync(client &client,
const resource::request &request,
const syncargs &args)
{
sync_resource, "GET", sync,
{
get_sync.REQUIRES_AUTH
}
};
/// Input
///
///
struct syncpoll
{
static std::list<syncpoll> polling;
static std::multimap<steady_point, decltype(polling)::iterator> pollout;
std::string user_id;
std::string since;
std::string access_token; // can get rid of this and use some session id
std::weak_ptr<ircd::client> client;
decltype(pollout)::iterator it { std::end(pollout) };
};
decltype(syncpoll::polling) syncpoll::polling {};
decltype(syncpoll::pollout) syncpoll::pollout {};
void
longpoll(client &client,
const resource::request &request,
const steady_point &timeout)
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
const auto it
{
polling.emplace(polling.end(), syncpoll
{
std::string{request.user_id},
std::string{request.query.at("since")},
std::string{request.access_token}, //TODO: nope.
weak_from(client)
})
};
syncpoll &data
{
*it
};
data.it = pollout.emplace(timeout, it);
if(pollout.size() == 1)
notify(synchronizer_timeout_context);
auto it(begin(polling));
for(; it != end(polling); ++it)
if(it->timesout > args.timesout)
break;
it = polling.emplace(it, client, request, args);
add(*it);
client.longpoll = true;
synchronizer_dock.notify_one();
return {};
}
//
// Timeout worker stack
longpoll::longpoll(ircd::client &client,
const resource::request &request,
const syncargs &args)
:client{weak_from(client)}
,timesout{args.timesout}
,user_id{request.user_id}
,since{args.since}
,access_token{request.access_token}
{
}
//
// Synchronizer worker
//
void synchronizer_timeout(const syncpoll &sp);
ircd::context
synchronizer
{
"synchronizer", 1_MiB, worker, context::POST
};
/// This function is the base of an ircd::context which yields until a client
/// is due to timeout. This worker reaps timed out clients from the lists.
void
synchronizer_timeout_worker()
worker()
try
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
while(1)
std::unique_lock<decltype(m::vm::accept)> lock
{
while(!pollout.empty())
m::vm::accept
};
while(1) try
{
auto &accepted
{
const auto &timeout{std::begin(pollout)->first};
const auto &iterator{std::begin(pollout)->second};
if(timeout > now<steady_point>())
{
ctx::wait_until<std::nothrow_t>(timeout);
continue;
}
//m::vm::accept.wait_for(lock, milliseconds(sync_timeout_min))
m::vm::accept.wait(lock)
};
const auto &data{*iterator};
synchronizer_timeout(data);
polling.erase(iterator);
pollout.erase(std::begin(pollout));
}
synchronize(accepted);
while(pollout.empty())
ctx::wait();
// Afterward we reap clients with errors or timed out.
errored_check();
timeout_check();
}
catch(const timeout &)
{
const ctx::exception_handler eh;
// We land here after sitting in synchronize() for too long without an
// event being seen. Because clients are polling we have to run a reap.
timeout_check();
}
catch(const ctx::interrupted &)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
"Synchronizer worker: %s", e.what()
};
}
}
catch(const ircd::ctx::interrupted &e)
catch(const ctx::interrupted &e)
{
ircd::log::debug("Synchronizer timeout worker interrupted");
log::debug
{
"Synchronizer worker interrupted"
};
return;
}
void
errored_check()
{
auto it(begin(polling));
while(it != end(polling))
{
auto &sp(*it);
if(sp.client.expired())
{
del(sp);
it = polling.erase(it);
}
else ++it;
}
}
void
timeout_check()
{
auto it(begin(polling));
if(it == end(polling))
return;
const auto now{ircd::now<steady_point>()}; do
{
auto &sp(*it);
if(sp.timesout > now)
return;
del(sp);
if(!sp.client.expired())
timedout(sp.client);
it = polling.erase(it);
}
while(it != end(polling));
}
///
/// TODO: The http error response should not yield this context. If the sendq
/// TODO: is backed up the client should be dc'ed.
void
synchronizer_timeout(const syncpoll &sp)
bool
timedout(const std::weak_ptr<client> &wp)
try
{
const life_guard<client> client
{
sp.client
};
const life_guard<client> client{wp};
client->longpoll = false;
resource::response
{
*client, http::REQUEST_TIMEOUT
};
client->longpoll = false;
client->async();
return client->async();
}
catch(const std::bad_weak_ptr &)
{
return false;
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
log::error("synchronizer_timeout(): %s", e.what());
log::error
{
"synchronizer_timeout(): %s", e.what()
};
return false;
}
//
// Main worker stack
//
void
add(longpoll &sp)
{
bool update_sync(const syncpoll &data, const m::event &event, const m::room &);
void synchronize(const m::event &, const m::room::id &);
void synchronize(const m::event &);
}
void
synchronizer_worker()
del(longpoll &sp)
{
while(1) try
{
std::unique_lock<decltype(m::vm::accept)> lock
{
m::vm::accept
};
// reference to the event on the inserter's stack
const auto &event
{
m::vm::accept.wait(lock)
};
if(!syncpoll::polling.empty())
synchronize(event);
}
catch(const ircd::ctx::interrupted &e)
{
ircd::log::debug("Synchronizer worker interrupted");
return;
}
catch(const timeout &e)
{
ircd::log::debug("Synchronizer worker: %s", e.what());
}
catch(const std::exception &e)
{
ircd::log::error("Synchronizer worker: %s", e.what());
}
}
void
@ -333,37 +397,34 @@ synchronize(const m::event &event)
synchronize(event, room_id);
return;
}
assert(0);
}
void
synchronize(const m::event &event,
const m::room::id &room_id)
{
static auto &polling{syncpoll::polling};
static auto &pollout{syncpoll::pollout};
const m::room room
{
room_id
};
for(auto it(std::begin(polling)); it != std::end(polling);)
auto it(std::begin(polling));
while(it != std::end(polling))
{
const auto &data{*it};
auto &data{*it};
if(!room.membership(data.user_id))
{
++it;
continue;
}
if(update_sync(data, event, room))
if(!update_sync(data, event, room))
{
pollout.erase(data.it);
polling.erase(it++);
++it;
continue;
}
else ++it;
it = polling.erase(it);
}
}
@ -374,7 +435,7 @@ update_sync_room(client &client,
const m::event &event)
{
std::vector<std::string> state;
if(defined(json::get<"state_key"_>(event)))
if(defined(json::get<"event_id"_>(event)) && defined(json::get<"state_key"_>(event)))
state.emplace_back(json::strung(event));
const json::strung state_serial
@ -383,7 +444,7 @@ update_sync_room(client &client,
};
std::vector<std::string> timeline;
if(!defined(json::get<"state_key"_>(event)))
if(defined(json::get<"event_id"_>(event)) && !defined(json::get<"state_key"_>(event)))
timeline.emplace_back(json::strung(event));
const json::strung timeline_serial
@ -397,12 +458,25 @@ update_sync_room(client &client,
ephemeral.data(), ephemeral.data() + ephemeral.size()
};
const auto &prev_batch
{
!timeline.empty()?
unquote(json::object{timeline.front()}.get("event_id")):
string_view{}
};
//TODO: XXX
const bool limited
{
false
};
const json::members body
{
{ "account_data", json::members{} },
{ "unread_notifications",
{
{ "highlight_count", int64_t(0) },
{ "highlight_count", int64_t(0) },
{ "notification_count", int64_t(0) },
}},
{ "ephemeral",
@ -415,9 +489,9 @@ update_sync_room(client &client,
}},
{ "timeline",
{
{ "events", timeline_serial },
{ "prev_batch", int64_t(m::vm::current_sequence) }, //TODO: XXX
{ "limited", false }, //TODO: XXX
{ "events", timeline_serial },
{ "prev_batch", prev_batch },
{ "limited", limited },
}},
};
@ -431,7 +505,6 @@ update_sync_rooms(client &client,
const string_view &since,
const m::event &event)
{
std::vector<std::string> r[3];
std::vector<json::member> m[3];
r[0].emplace_back(update_sync_room(client, room, since, event));
@ -449,7 +522,7 @@ update_sync_rooms(client &client,
}
bool
update_sync(const syncpoll &data,
update_sync(const longpoll &data,
const m::event &event,
const m::room &room)
try
@ -459,19 +532,39 @@ try
data.client
};
if(!client->longpoll)
return true;
const auto rooms
{
update_sync_rooms(*client, data.user_id, room, data.since, event)
};
const auto presence
const m::user::room ur
{
"{}"
m::user::id{data.user_id}
};
const int64_t &next_batch
std::vector<json::value> presents;
ur.get(std::nothrow, "m.presence", [&]
(const m::event &event)
{
m::vm::current_sequence
const auto &content
{
at<"content"_>(event)
};
presents.emplace_back(event);
});
const json::members presence
{
{ "events", json::value { presents.data(), presents.size() } },
};
const auto &next_batch
{
int64_t(m::vm::current_sequence)
};
resource::response
@ -493,165 +586,48 @@ catch(const std::bad_weak_ptr &e)
return true;
}
std::string
initial_sync_room(client &client,
const resource::request &request,
const m::room &room,
const bool &full_state)
{
std::vector<std::string> state;
/*
if(!defined(json::get<"event_id"_>(event)))
{
const m::room::state state_
thread_local char foo[64_KiB];
mutable_buffer buf{foo};
m::event event_(event);
if(at<"type"_>(event) == "m.receipt")
{
room
};
state_.for_each([&state](const m::event &event)
{
state.emplace_back(json::strung(event));
});
}
const auto state_serial
{
json::strung(state.data(), state.data() + state.size())
};
std::vector<std::string> timeline;
{
const m::room::messages timeline_
{
room
};
timeline_.test([&timeline](const m::event &event)
{
timeline.emplace_back(json::strung(event));
return timeline.size() >= 10;
});
}
const auto timeline_serial
{
json::strung(timeline.data(), timeline.data() + timeline.size())
};
std::vector<std::string> ephemeral;
const json::strung ephemeral_serial
{
ephemeral.data(), ephemeral.data() + ephemeral.size()
};
const json::members body
{
{ "account_data", json::members{} },
{ "unread_notifications",
{
{ "highlight_count", int64_t(0) },
{ "notification_count", int64_t(0) },
}},
{ "ephemeral",
{
{ "events", ephemeral_serial },
}},
{ "state",
{
{ "events", state_serial }
}},
{ "timeline",
{
{ "events", timeline_serial },
{ "prev_batch", int64_t(m::vm::current_sequence) }, //TODO: XXX
{ "limited", false }, //TODO: XXX
}},
};
return json::strung(body);
}
std::string
initial_sync_rooms(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state)
{
m::user user{request.user_id};
const auto user_room_id{user.room_id()};
m::room::state user_state{user_room_id};
std::array<std::vector<std::string>, 3> r;
std::array<std::vector<json::member>, 3> m;
// Get the rooms the user is a joined member in by iterating the state
// events in the user's room.
user_state.for_each("join", [&r, &m, &client, &request, &full_state]
(const m::event &event)
{
const m::room::id &room_id{unquote(at<"state_key"_>(event))};
const auto i
{
//membership == "join"? 0:
//membership == "leave"? 1:
//membership == "invite"? 2:
//-1
0
};
r.at(i).emplace_back(initial_sync_room(client, request, room_id, full_state));
m.at(i).emplace_back(room_id, r.at(i).back());
});
const std::string join{json::strung(m[0].data(), m[0].data() + m[0].size())};
const std::string leave{json::strung(m[1].data(), m[1].data() + m[1].size())};
const std::string invite{json::strung(m[2].data(), m[2].data() + m[2].size())};
return json::strung(json::members
{
{ "join", join },
{ "leave", leave },
{ "invite", invite },
});
}
resource::response
initial_sync(client &client,
const resource::request &request,
const string_view &filter_id,
const bool &full_state,
const string_view &set_presence)
{
const std::string rooms
{
initial_sync_rooms(client, request, filter_id, full_state)
};
const auto presence
{
"{}"
};
const int64_t &next_batch
{
m::vm::current_sequence
};
const auto &state_key
{
request.access_token
};
//TODO: user's room
m::send(m::user::tokens, request.user_id, "ircd.tape.head", state_key,
{
{ "sequence", next_batch }
});
return resource::response
{
client, json::members
{
{ "next_batch", next_batch },
{ "rooms", rooms },
{ "presence", presence },
const auto &content{at<"content"_>(event)};
for(const auto &room : content)
{
const auto &room_id{unquote(room.first)};
const json::object &object{room.second};
for(const auto &ev : object)
{
const auto &type{unquote(ev.first)};
const json::object &users{ev.second};
for(const auto &user : users)
{
const auto &user_id{unquote(user.first)};
const json::object datas{user.second};
const json::object data{datas.get("data")};
const auto ts{data.get("ts")};
const auto event_id{unquote(data.get("event_id"))};
json::get<"content"_>(event_) = json::stringify(buf, json::members
{{
event_id, json::members
{{
type, json::members
{{
user_id, json::members
{{
"ts", ts
}}
}}
}}
}});
}
}
}
}
};
}
ephemeral.emplace_back(json::strung(event_));
}
*/

71
modules/client/sync.int.h Normal file
View file

@ -0,0 +1,71 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
using namespace ircd;
/// Argument parser for the client's /sync request
struct syncargs
{
string_view filter_id;
string_view since;
steady_point timesout;
bool full_state;
bool set_presence;
syncargs(const resource::request &);
};
/// State for a client conducting a longpoll /sync. This is used after a
/// session has caught up with initial-sync and shortpoll and is waiting
/// for the next event.
struct longpoll
{
std::weak_ptr<ircd::client> client;
steady_point timesout;
std::string user_id;
std::string since;
std::string access_token;
longpoll(ircd::client &, const resource::request &, const syncargs &);
};
struct shortpoll
{
};
static bool update_sync(const longpoll &data, const m::event &event, const m::room &);
static void synchronize(const m::event &, const m::room::id &);
static void synchronize(const m::event &);
static void synchronize();
std::list<longpoll> polling;
ctx::dock synchronizer_dock;
static void del(longpoll &);
static void add(longpoll &);
static bool timedout(const std::weak_ptr<client> &);
static void timeout_check();
static void errored_check();
static void worker();
extern ircd::context synchronizer;
static resource::response longpoll_sync(client &, const resource::request &, const syncargs &);
static resource::response shortpoll_sync(client &, const resource::request &, const syncargs &);
static int64_t get_sequence(const resource::request &, const syncargs &, const m::room &user_room);
static resource::response since_sync(client &, const resource::request &, const syncargs &);
static resource::response initial_sync(client &, const resource::request &, const syncargs &);
static resource::response sync(client &, const resource::request &);
extern resource::method get_sync;
extern const string_view sync_description;
extern resource sync_resource;
void on_unload();
extern mapi::header IRCD_MODULE;