0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-09 19:41:22 +01:00
construct/modules/client/sync.cc

1163 lines
25 KiB
C++

// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
// Declarations for the client /sync endpoint implemented in this file: the
// request-scoped types, the handlers for each sync mode (linear, polylog,
// longpoll), and the conf items and resource objects defined further below.
namespace ircd::m::sync
{
struct args;
struct stats;
struct data;
struct response;
// Flush callback bound into the json::stack by handle_get(); forwards the
// buffer to the chunked response and accounts for it in the stats.
static const_buffer flush(data &, resource::response::chunked &, const const_buffer &);
// Writes a minimal /sync response carrying only a next_batch token.
static void empty_response(data &, const uint64_t &next_batch);
// The three sync modes; each returns true when it wrote a response.
static bool linear_handle(data &);
static bool polylog_handle(data &);
static bool longpoll_handle(data &);
// Entry point for GET requests on the /sync resource.
static resource::response handle_get(client &, const resource::request &);
// Configuration items; see their definitions below for names and defaults.
extern conf::item<size_t> flush_hiwat;
extern conf::item<size_t> buffer_size;
extern conf::item<size_t> linear_buffer_size;
extern conf::item<size_t> linear_delta_max;
extern conf::item<bool> longpoll_enable;
extern conf::item<bool> polylog_phased;
extern conf::item<bool> polylog_only;
// The resource, its GET method and its description string.
extern resource::method method_get;
extern const string_view description;
extern resource resource;
}
// Declared ahead of the module header below, whose lambda calls fini().
// The remaining longpoll declarations appear later in this file.
namespace ircd::m::sync::longpoll
{
static void fini() noexcept;
}
#include "sync/args.h"
// Module header: a description string, a null second argument, and a lambda
// which interrupts any clients still blocked in a longpoll.
// NOTE(review): the lambda is presumably the module's unload hook -- confirm
// against the mapi::header constructor.
ircd::mapi::header
IRCD_MODULE
{
"Client 6.2.1 :Sync", nullptr, []
{
ircd::m::sync::longpoll::fini();
}
};
// The resource for the /sync endpoint path. The GET handler is attached to
// it by method_get below.
decltype(ircd::m::sync::resource)
ircd::m::sync::resource
{
"/_matrix/client/r0/sync",
{
description
}
};
// Description text passed to the /sync resource above.
decltype(ircd::m::sync::description)
ircd::m::sync::description
{R"(6.2.1
Synchronise the client's state with the latest state on the server. Clients
use this API when they first log in to get an initial snapshot of the state
on the server, and then continue to call this API to get incremental deltas
to the state, and to receive new messages.
)"};
// Help text attached to the ircd.client.sync.linear.delta.max conf item
// (see linear_delta_max below).
const auto linear_delta_max_help
{R"(
Maximum number of events to scan sequentially for a /sync. This determines
whether linear-sync or polylog-sync mode is used to satisfy the request. If
the difference between the since token (lower-bound) and the upper-bound of
the sync is within this value, the linear-sync mode is used. If it is more
than this value a polylog-sync mode is used. The latter is used because at
some threshold it becomes too expensive to scan a huge number of events to
grab only those that the client requires; it is cheaper to conduct a series
of random-access queries with polylog-sync instead. Note the exclusive
upper-bound of a sync is determined either by a non-spec query parameter
'next_batch' or the vm::sequence::retired+1.
)"};
// Help text attached to the ircd.client.sync.linear.buffer_size conf item
// (see linear_buffer_size below).
const auto linear_buffer_size_help
{R"(
The size of the coalescing buffer used when conducting a linear-sync. During
the sequential scan of events, when an event is marked as required for the
client's sync it is stringified and appended to this buffer. The buffer has
the format of a json::vector of individual events. At the end of the linear
sync, the objects in this buffer are merged into a single spec /sync response.
When this buffer is full the linear sync must finish and respond to the client
with whatever it has. The event::idx of the last event that fit into the buffer
forms the basis for the next_batch so the client can continue with another linear
/sync to complete the range.
)"};
// High-water mark handed to the json::stack constructed in handle_get(),
// alongside the flush callback. Defaults to 64 KiB.
decltype(ircd::m::sync::flush_hiwat)
ircd::m::sync::flush_hiwat
{
{ "name", "ircd.client.sync.flush.hiwat" },
{ "default", long(64_KiB) },
};
// Buffer size given to the chunked response constructed in handle_get().
// Defaults to 512 KiB.
decltype(ircd::m::sync::buffer_size)
ircd::m::sync::buffer_size
{
{ "name", "ircd.client.sync.buffer_size" },
{ "default", long(512_KiB) },
{ "help", "Response chunk buffer size" },
};
// Size of the coalescing buffer allocated by linear_handle(), which raises
// any smaller configured value to 128 KiB. Defaults to 256 KiB.
decltype(ircd::m::sync::linear_buffer_size)
ircd::m::sync::linear_buffer_size
{
{ "name", "ircd.client.sync.linear.buffer_size" },
{ "default", long(256_KiB) },
{ "help", linear_buffer_size_help },
};
// Largest range width (range.second - range.first) for which handle_get()
// still selects linear mode. Defaults to 1024.
decltype(ircd::m::sync::linear_delta_max)
ircd::m::sync::linear_delta_max
{
{ "name", "ircd.client.sync.linear.delta.max" },
{ "default", 1024 },
{ "help", linear_delta_max_help },
};
// Enables phased polylog sync. When false, handle_get() rejects negative
// since tokens and never sets data.phased. Defaults to true.
decltype(ircd::m::sync::polylog_phased)
ircd::m::sync::polylog_phased
{
{ "name", "ircd.client.sync.polylog.phased" },
{ "default", true },
};
// When true, handle_get() never selects linear mode; it also changes which
// end of the range is used for the next_batch of an empty response.
// Defaults to false.
decltype(ircd::m::sync::polylog_only)
ircd::m::sync::polylog_only
{
{ "name", "ircd.client.sync.polylog.only" },
{ "default", false },
};
// One of the conditions handle_get() requires before pre-selecting longpoll
// mode. Defaults to true.
decltype(ircd::m::sync::longpoll_enable)
ircd::m::sync::longpoll_enable
{
{ "name", "ircd.client.sync.longpoll.enable" },
{ "default", true },
};
//
// GET sync
//
// Registers handle_get() as the GET method of the /sync resource and
// requires an authenticated request.
// NOTE(review): the -1s option presumably disables the default request
// timeout so a longpoll can block -- confirm against resource::method opts.
decltype(ircd::m::sync::method_get)
ircd::m::sync::method_get
{
resource, "GET", handle_get,
{
method_get.REQUIRES_AUTH,
-1s,
}
};
// Handler for GET on the /sync resource.
//
// Parses the query into `args`, derives the event range for this sync, then
// opens a chunked response with a json::stack writing into it. One of the
// linear or polylog handlers is run when selected; if that produced no
// output, longpoll_handle() is tried; if that also produced nothing an empty
// response carrying only a next_batch token is written.
//
// Throws m::NOT_FOUND for a since token that is negative while phased sync
// is disabled, or that lies beyond the upper bound of the range.
ircd::resource::response
ircd::m::sync::handle_get(client &client,
const resource::request &request)
{
// Parse the request options
const args args
{
request
};
// The range to `/sync`. We involve events starting at the range.first
// index in this sync. We will not involve events with an index equal
// or greater than the range.second. In this case the range.second does not
// exist yet because it is one past the server's sequence::retired counter.
const m::events::range range
{
args.since, std::min(args.next_batch, m::vm::sequence::retired + 1)
};
// The phased initial sync feature uses negative since tokens.
const bool phased_range
{
int64_t(range.first) < 0L
};
// Check if the admin disabled phased sync.
if(!polylog_phased && phased_range)
throw m::NOT_FOUND
{
"Since parameter '%ld' must be >= 0.",
range.first,
};
// When the range indexes are the same, the client is polling for the next
// event which doesn't exist yet. There is no reason for the since parameter
// to be greater than that, unless it's a negative integer and phased
// sync is enabled
if(!polylog_phased || !phased_range)
if(range.first > range.second)
throw m::NOT_FOUND
{
"Since parameter '%lu' is too far in the future."
" Cannot be greater than '%lu'.",
range.first,
range.second
};
// Query and cache the device ID for the access token of this request on
// the stack here for this sync.
const device::id::buf device_id
{
device::access_token_to_id(request.access_token)
};
// Keep state for statistics of this sync here on the stack.
stats stats;
// The ubiquitous /sync data object is constructed on the stack here.
// This is the main state structure for the sync::item iteration which
// composes the /sync response. The output stack is not known yet, so
// nullptr is passed and data.out is assigned once the stack exists below.
data data
{
request.user_id,
range,
&client,
nullptr,
&stats,
&args,
device_id,
};
// Determine if this is an initial-sync request.
const bool initial_sync
{
range.first == 0UL
};
// Conditions for phased sync for this client: the feature is enabled by
// conf, the client did not opt out, and the since token is either negative
// or this is an initial sync without a second since-token component.
data.phased =
{
polylog_phased && args.phased &&
(
phased_range ||
(initial_sync && empty(args.since_token.second))
)
};
// Start the chunked encoded response.
resource::response::chunked response
{
client, http::OK, buffer_size
};
// Start the JSON stream for this response. As the sync items are iterated
// the supplied response buffer will be flushed out to the supplied
// callback; in this case, both are provided by the chunked encoding
// response. Each flush will create and send a chunk containing in-progress
// JSON. This will yield the ircd::ctx as this chunk is copied to the
// kernel's TCP buffer, providing flow control for the sync composition.
json::stack out
{
response.buf,
std::bind(sync::flush, std::ref(data), std::ref(response), ph::_1),
size_t(flush_hiwat)
};
data.out = &out;
log::debug
{
log, "request %s", loghead(data)
};
// Pre-determine if longpoll sync mode should be used. This may
// indicate false now but after conducting a linear or even polylog
// sync if we don't find any events for the client then we might
// longpoll later.
const bool should_longpoll
{
// longpoll can be disabled by a conf item (for developers).
longpoll_enable
// polylog-phased sync and longpoll are totally exclusive.
&& !data.phased
// An initial sync must not hang on a longpoll.
&& !initial_sync
// When the since token is in advance of the vm sequence number
// there's no events to consider for a sync.
&& range.first > vm::sequence::retired
// Spec sez that when ?full_state=1 to return immediately, so
// that rules out longpoll
&& !args.full_state
};
// Determine if linear sync mode should be used. If this is not used, and
// longpoll mode is not used, then polylog mode must be used.
const bool should_linear
{
// There is a conf item (for developers) to force polylog mode.
!polylog_only
// polylog-phased sync and linear are totally exclusive.
&& !data.phased
// If longpoll was already determined there's no need for linear
&& !should_longpoll
// The primary condition for a linear sync is the number of events
// in the range being considered by the sync. That threshold is
// supplied by a conf item.
&& range.second - range.first <= size_t(linear_delta_max)
// When the semaphore query param is set we don't need linear mode.
&& !args.semaphore
// When full_state is requested we skip to polylog sync because those
// handlers are best suited for syncing a full room state.
&& !args.full_state
};
// Determine if polylog sync mode should be used.
const bool should_polylog
{
// Polylog mode is only used when neither of the other two modes
// are determined.
!should_longpoll && !should_linear
// When the semaphore query param is set we don't need polylog mode.
&& !args.semaphore
};
// Determine if an empty sync response should be returned to the user.
// This is done by actually performing the sync operation based on the
// mode decided. The return value from the operation will be false if
// no output was generated by the sync operation, indicating we should
// finally send an empty response.
const bool complete
{
should_linear?
linear_handle(data):
should_polylog?
polylog_handle(data):
false
};
if(complete)
return std::move(response);
// Nothing was output by the selected mode (or none was selected); block
// for new events. Note this is attempted regardless of should_longpoll.
if(longpoll_handle(data))
return std::move(response);
// Choose the token for the empty response: the lower bound of the range
// when polylog-only mode is configured, otherwise the upper bound (which
// longpoll_handle() may have advanced).
const auto &next_batch
{
polylog_only?
data.range.first:
data.range.second
};
// A user-timeout occurred. According to the spec we return a
// 200 with empty fields rather than a 408.
empty_response(data, next_batch);
return std::move(response);
}
/// Writes a /sync response which carries no events: an empty "rooms" object,
/// an empty "presence" object and the "next_batch" token generated from
/// `next_batch`. The response is written to the stack at data.out.
void
ircd::m::sync::empty_response(data &data,
                              const uint64_t &next_batch)
{
    json::stack::object root
    {
        *data.out
    };

    // Both members are emitted, though empty; Riot misbehaves without them.
    // Each temporary is opened and closed within its own statement.
    json::stack::object{root, "rooms"};
    json::stack::object{root, "presence"};

    // Backing storage for the since-token string.
    char since_buf[64];
    const auto since_token
    {
        make_since(since_buf, next_batch)
    };

    json::stack::member
    {
        root, "next_batch", json::value
        {
            since_token, json::STRING
        }
    };

    log::debug
    {
        log, "request %s timeout @%lu",
        loghead(data),
        next_batch
    };
}
/// Flush callback for the response's json::stack: hands `buffer` to the
/// chunked response and returns whatever view response.flush() reports.
/// When a stats object is attached to `data`, the flushed byte count and the
/// number of flushes are accumulated there.
ircd::const_buffer
ircd::m::sync::flush(data &data,
                     resource::response::chunked &response,
                     const const_buffer &buffer)
{
    assert(size(buffer) <= size(response.buf));
    const auto flushed
    {
        response.flush(buffer)
    };

    assert(size(flushed) <= size(buffer));

    // Statistics are optional; nothing more to do without them.
    if(!data.stats)
        return flushed;

    auto &counters(*data.stats);
    counters.flush_bytes += size(flushed);
    counters.flush_count++;
    return flushed;
}
// polylog
//
// Random access approach for large `since` ranges. The /sync schema itself is
// recursed. For every component in the schema, the handler seeks the events
// appropriate for the user and appends it to the output. Concretely, this
// involves a full iteration of the rooms a user is a member of, and a full
// iteration of the presence status for all users visible to a user, etc.
//
// This entire process occurs in a single pass. The schema is traced with
// json::stack and its buffer is flushed to the client periodically with
// chunked encoding.
/// Runs every sync item's polylog() handler, each inside its own JSON object
/// named after the item. Output of an item which reports nothing is
/// withdrawn via its checkpoint. When at least one item committed output the
/// "next_batch" member is appended; otherwise the whole response is
/// withdrawn.
///
/// @returns true if any item committed output to the response.
/// Exceptions are logged and rethrown.
bool
ircd::m::sync::polylog_handle(data &data)
try
{
    // Guards the entire response; decommitted below when nothing was output.
    json::stack::checkpoint response_checkpoint
    {
        *data.out
    };

    json::stack::object top
    {
        *data.out
    };

    bool committed{false};
    const auto each_item{[&data, &committed]
    (item &handler)
    {
        json::stack::checkpoint item_checkpoint
        {
            *data.out
        };

        json::stack::object section
        {
            *data.out, handler.member_name()
        };

        if(!handler.polylog(data))
            item_checkpoint.decommit();
        else
        {
            committed = true;
            data.out->invalidate_checkpoints();
        }

        // Always continue on to the next item.
        return true;
    }};

    m::sync::for_each(string_view{}, each_item);

    if(!committed)
        response_checkpoint.decommit();
    else
    {
        // Phased sync steps the (negative) since token down by one; the
        // normal case resumes from the upper bound of this range.
        const int64_t next_batch
        {
            data.phased?
                int64_t(data.range.first) - 1L:
                int64_t(data.range.second)
        };

        char buf[64];

        // NOTE(review): next_batch is signed and 0UL is unsigned, so this
        // comparison is carried out unsigned and cannot fail. Confirm the
        // intended invariant before changing it.
        assert(!data.phased || next_batch >= 0UL);

        // The polylog phased since token packs two numbers separated by a
        // '_' character which cannot be urlencoded atm. The first is the
        // usual since token integer, which is negative for phased initial
        // sync. The second part is the next_batch upper-bound integer which
        // is a snapshot of the server's sequence number when the phased sync
        // started. Otherwise the normal integer since token is generated.
        const string_view since_token
        {
            data.phased?
                make_since(buf, m::events::range{uint64_t(next_batch), data.range.second}):
                make_since(buf, next_batch)
        };

        json::stack::member
        {
            *data.out, "next_batch", json::value
            {
                since_token, json::STRING
            }
        };
    }

    // The phased arm of the conditional below is unreachable under this
    // guard; it is kept as-is.
    if(!data.phased && stats_info)
        log::info
        {
            log, "request %s polylog commit:%b complete @%ld",
            loghead(data),
            committed,
            data.phased?
                data.range.first:
                data.range.second
        };

    return committed;
}
catch(const std::exception &e)
{
    log::error
    {
        log, "polylog %s FAILED :%s",
        loghead(data),
        e.what()
    };

    throw;
}
//
// linear
//
// Approach for small `since` ranges. The range of events is iterated and
// the event itself is presented to each handler in the schema. This also
// involves a json::stack trace of the schema so that if the handler determines
// the event is appropriate for syncing to the user the output buffer will
// contain a residue of a /sync response with a single event.
//
// After the iteration of events is complete we are left with several buffers
// of properly formatted individual /sync responses which we rewrite into a
// single response to overcome the inefficiency of request ping-pong under
// heavy load.
// Helpers of linear_handle(), from the innermost outward: run all item
// handlers for the current event; do so on a private json::stack over a
// supplied buffer; iterate the range of events appending each result to a
// window_buffer.
namespace ircd::m::sync
{
static bool linear_proffer_event_one(data &);
static size_t linear_proffer_event(data &, const mutable_buffer &);
static std::pair<event::idx, bool> linear_proffer(data &, window_buffer &);
}
/// Conducts a linear sync: the events of data.range are proffered to the
/// item handlers, the per-event results accumulate in a scratch buffer, and
/// those results are then merged into the response together with the
/// "next_batch" member. If no event produced output the response is
/// withdrawn via the checkpoint.
///
/// @returns true if any event produced output.
/// Exceptions are logged and rethrown.
bool
ircd::m::sync::linear_handle(data &data)
try
{
    json::stack::checkpoint checkpoint
    {
        *data.out
    };

    json::stack::object top
    {
        *data.out
    };

    // Must be at least the worst-case size of an m::event plus some.
    const unique_buffer<mutable_buffer> scratch
    {
        std::max(size_t(linear_buffer_size), size_t(128_KiB))
    };

    window_buffer window{scratch};
    const auto &[last_idx, finished] = linear_proffer(data, window);

    const json::vector events
    {
        window.completed()
    };

    // No hit: zero. A finished iteration resumes from the range's upper
    // bound; an interrupted one resumes just past the last event that fit.
    const auto next_since
    {
        !last_idx?
            0UL:
        finished?
            data.range.second:
            std::min(last_idx + 1, data.range.second)
    };

    if(!last_idx)
        checkpoint.decommit();
    else
    {
        char since_buf[64];
        json::stack::member
        {
            top, "next_batch", json::value
            {
                make_since(since_buf, next_since), json::STRING
            }
        };

        json::merge(top, events);
    }

    log::debug
    {
        log, "request %s linear last:%lu %s@%lu events:%zu",
        loghead(data),
        last_idx,
        finished? "complete "_sv : string_view{},
        next_since,
        events.size()
    };

    return last_idx;
}
catch(const std::exception &e)
{
    log::error
    {
        log, "linear %s FAILED :%s",
        loghead(data),
        e.what()
    };

    throw;
}
/// Iterates the events in the data.range and creates a json::vector in
/// the supplied window_buffer.
///
/// @returns a pair of: the event_idx of the last event which produced output
/// into the buffer, or 0 if nothing was of interest to our client in the
/// event iteration; and the value returned by m::events::for_each() for the
/// iteration.
std::pair<ircd::m::event::idx, bool>
ircd::m::sync::linear_proffer(data &data,
                              window_buffer &wb)
{
    event::idx last_hit(0);

    const auto each_event{[&data, &wb, &last_hit]
    (const m::event::idx &event_idx, const m::event &event)
    {
        // Expose the current event to the handlers for this iteration only.
        const scope_restore their_event
        {
            data.event, &event
        };

        const scope_restore their_event_idx
        {
            data.event_idx, event_idx
        };

        // Compose this event's candidate output directly into the window.
        const auto proffer{[&data, &last_hit, &event_idx]
        (const mutable_buffer &buf)
        {
            const auto consumed
            {
                linear_proffer_event(data, buf)
            };

            if(consumed)
                last_hit = event_idx;

            return consumed;
        }};

        wb(proffer);

        // The buffer must have at least this much more space to continue
        // with the iteration. Otherwise if the next worst-case event does
        // not fit, bad things.
        return wb.remaining() >= 68_KiB;
    }};

    const auto completed
    {
        m::events::for_each(data.range, each_event)
    };

    return
    {
        last_hit, completed
    };
}
/// Sets up a json::stack over `buf` for the iteration of handlers for one
/// event. For the duration of the call data.out points at that stack.
///
/// @returns the size of the completed output when any handler produced
/// output, otherwise 0.
size_t
ircd::m::sync::linear_proffer_event(data &data,
                                    const mutable_buffer &buf)
{
    json::stack event_out{buf};
    const scope_restore their_out
    {
        data.out, &event_out
    };

    json::stack::object root
    {
        *data.out
    };

    const bool hit
    {
        linear_proffer_event_one(data)
    };

    // The top-level object is destroyed by hand before the completed output
    // is measured.
    root.~object();

    if(!hit)
        return 0UL;

    return size(event_out.completed());
}
/// Generates a candidate /sync response for a single event by iterating all
/// of the handlers. Output from a handler which declines the event is rolled
/// back to the checkpoint taken before it ran.
///
/// @returns true if at least one handler accepted the event.
bool
ircd::m::sync::linear_proffer_event_one(data &data)
{
    bool accepted{false};

    const auto each_item{[&data, &accepted]
    (item &handler)
    {
        json::stack::checkpoint checkpoint
        {
            *data.out
        };

        if(!handler.linear(data))
            checkpoint.rollback();
        else
            accepted = true;

        // Always continue on to the next handler.
        return true;
    }};

    m::sync::for_each(string_view{}, each_item);
    return accepted;
}
//
// longpoll
//
// Longpoll internals. fini() is redeclared here; it was first declared near
// the top of the file for use by the module header.
namespace ircd::m::sync::longpoll
{
// Proffers the event at data.range.second to the handlers; true on a hit.
static bool polled(data &, const args &);
// One wait-and-check step; returns true, false or -1 (see its definition).
static int poll(data &);
// Hook handler which wakes the contexts waiting on the dock.
static void handle_notify(const m::event &, m::vm::eval &);
static void fini() noexcept;
extern m::hookfn<m::vm::eval &> notified;
extern ctx::dock dock;
}
// The dock which longpolling requests wait on in poll(); handle_notify()
// notifies it and fini() interrupts it.
decltype(ircd::m::sync::longpoll::dock)
ircd::m::sync::longpoll::dock;
// Hook registration which routes the "vm.notify" site to handle_notify().
decltype(ircd::m::sync::longpoll::notified)
ircd::m::sync::longpoll::notified
{
handle_notify,
{
{ "_site", "vm.notify" },
}
};
/// Interrupts the dock so that any requests waiting in a longpoll are
/// released. A warning with the number of waiters is logged beforehand when
/// the dock is not empty. Called from the lambda in the module header.
void
ircd::m::sync::longpoll::fini()
noexcept
{
    if(!dock.empty())
    {
        log::warning
        {
            log, "Interrupting %zu longpolling clients...",
            dock.size()
        };
    }

    // The interrupt is issued whether or not anyone is waiting.
    interrupt(dock);
}
/// Handler for the "vm.notify" hook site. Wakes every context waiting on the
/// longpoll dock, unless the evaluation's options have notify_clients
/// switched off. A ctx::interrupted is rethrown; any other exception is
/// logged as critical and not propagated.
void
ircd::m::sync::longpoll::handle_notify(const m::event &event,
                                       m::vm::eval &eval)
try
{
    assert(eval.opts);
    if(eval.opts->notify_clients)
        dock.notify_all();
}
catch(const ctx::interrupted &)
{
    throw;
}
catch(const std::exception &e)
{
    log::critical
    {
        log, "request %s longpoll notify :%s",
        loghead(eval),
        e.what()
    };
}
/// Longpolling blocks the client's request until a relevant event is processed
/// by the m::vm. If no event is processed by a timeout this returns false.
///
/// Each pass of the loop calls longpoll::poll(); a result other than -1 ends
/// the loop and is returned. On -1 the upper bound of the range is advanced
/// by one and polling continues, unless the client supplied a next_batch
/// token whose bound has been reached, in which case false is returned.
/// Exceptions are logged and rethrown.
bool
ircd::m::sync::longpoll_handle(data &data)
try
{
    for(;;)
    {
        const int result
        {
            longpoll::poll(data)
        };

        if(result != -1)
            return result;

        // When the client explicitly gives a next_batch token we have to
        // adhere to it and return an empty response before going past their
        // desired upper-bound for this /sync.
        if(data.args->next_batch_token
        && (data.range.first >= data.range.second || data.range.second >= vm::sequence::retired))
            return false;

        ++data.range.second;
        assert(data.range.first <= data.range.second);
    }
}
catch(const std::system_error &e)
{
    log::derror
    {
        log, "longpoll %s failed :%s",
        loghead(data),
        e.what()
    };

    throw;
}
catch(const std::exception &e)
{
    log::error
    {
        log, "longpoll %s FAILED :%s",
        loghead(data),
        e.what()
    };

    throw;
}
/// When the vm's sequence number is incremented our dock is notified and the
/// event at that next sequence number is fetched. That event gets proffered
/// around the linear sync handlers for whether it's relevant to the user
/// making the request on this stack.
///
/// If relevant, we respond immediately with that one event and finish the
/// request right there, providing them the next since token of one-past the
/// event_idx that was just synchronized.
///
/// If not relevant, we send nothing and continue checking events that come
/// through until the timeout. This will be an empty response providing the
/// client with the next since token of one past where we left off (vm's
/// current sequence number) to start the next /sync.
///
/// @returns
/// - true if a relevant event was hit and output to the client. If so, this
/// request is finished and nothing else can be sent to the client.
/// - false if a timeout occurred. Nothing was sent to the client so the
/// request must be finished upstack, or an exception may be thrown, etc.
/// - -1 to continue the polling loop when no relevant event was hit. Nothing
/// has been sent to the client yet here either.
///
int
ircd::m::sync::longpoll::poll(data &data)
{
    // Satisfied once the vm has retired the sequence number at the
    // upper-bound of our range.
    const auto sequence_reached{[&data]
    {
        assert(data.range.second <= m::vm::sequence::retired + 1);
        return data.range.second <= m::vm::sequence::retired;
    }};

    assert(data.args);
    const bool woken
    {
        dock.wait_until(data.args->timesout, sequence_reached)
    };

    // Timed out.
    if(!woken)
        return false;

    // Check if client went away while we were sleeping,
    // if so, just returning true is the easiest way out w/o throwing
    assert(data.client && data.client->sock);
    if(unlikely(!data.client || !data.client->sock))
        return true;

    // slightly more involved check of the socket before
    // we waste resources on the operation; throws.
    net::check(*data.client->sock);

    // A hit by the handlers ends the longpoll and the request cleanly;
    // otherwise -1 tells the caller to keep polling.
    return polled(data, *data.args)?
        true:
        -1;
}
/// Evaluate the event indexed by data.range.second (the upper-bound). The
/// sync system sees a data.range window of [since, U] where U is a counter
/// that starts at the `vm::sequence::retired` event_idx
bool
ircd::m::sync::longpoll::polled(data &data,
                                const args &args)
{
    // Fetch the event at the upper-bound; a miss is not an error here.
    const m::event::fetch event
    {
        data.range.second, std::nothrow
    };

    if(!event.valid)
        return false;

    // Expose the event to the handlers for the remainder of this call.
    const scope_restore their_event
    {
        data.event, &event
    };

    const scope_restore their_event_idx
    {
        data.event_idx, event.event_idx
    };

    const unique_buffer<mutable_buffer> scratch
    {
        128_KiB
    };

    const size_t written
    {
        linear_proffer_event(data, scratch)
    };

    // No handler was interested in the event; or we are in semaphore-mode,
    // where we only ride the longpoll's blocking behavior and want the
    // client to get an empty response.
    if(!written || args.semaphore)
        return false;

    const json::vector events
    {
        string_view
        {
            buffer::data(scratch), written
        }
    };

    json::stack::object top
    {
        *data.out
    };

    json::merge(top, events);

    // One past the synchronized event (or past the upper-bound when no
    // event_idx is set), never exceeding one past the retired sequence.
    const auto basis
    {
        data.event_idx?
            data.event_idx:
            data.range.second
    };

    const auto next_since
    {
        std::min(basis + 1, vm::sequence::retired + 1)
    };

    char since_buf[64];
    json::stack::member
    {
        top, "next_batch", json::value
        {
            make_since(since_buf, next_since), json::STRING
        }
    };

    log::debug
    {
        log, "request %s longpoll hit:%lu consumed:%zu complete @%lu",
        loghead(data),
        event.event_idx,
        written,
        next_since
    };

    return true;
}
//
// data
//
// Constructs the per-request sync state from the requesting user, the event
// range, and non-owning pointers to the client, output stack, stats and
// parsed args (handle_get() passes a null `out` and assigns it later).
// NOTE(review): members are initialized in the order they are declared in
// the class, which is not visible here; user_state and filter_buf depend on
// earlier members (user_room, args) -- confirm the declaration order.
ircd::m::sync::data::data(const m::user &user,
const m::events::range &range,
ircd::client *const &client,
json::stack *const &out,
sync::stats *const &stats,
const sync::args *const &args,
const device::id &device_id)
:range
{
range
}
,stats
{
stats
}
,client
{
client
}
,args
{
args
}
,user
{
user
}
// Built from the `user` parameter.
,user_room
{
user
}
// Built from the user_room member initialized above.
,user_state
{
user_room
}
,user_rooms
{
user
}
// The filter named by args->filter_id is fetched for the user; without args
// the buffer is an empty string.
,filter_buf
{
this->args?
m::filter::get(this->args->filter_id, user):
std::string{}
}
// A JSON object view over filter_buf.
,filter
{
json::object{filter_buf}
}
,device_id
{
device_id
}
,out
{
out
}
{
}
// Out-of-line destructor with an empty body.
ircd::m::sync::data::~data()
noexcept
{
}
//
// sync/args.h
//
// Upper clamp applied to the client's "timeout" query parameter in
// args::args. Default value is 180 * 1000.
ircd::conf::item<ircd::milliseconds>
ircd::m::sync::args::timeout_max
{
{ "name", "ircd.client.sync.timeout.max" },
{ "default", 180 * 1000L },
};
// Lower clamp applied to the client's "timeout" query parameter in
// args::args. Default value is 15 * 1000.
ircd::conf::item<ircd::milliseconds>
ircd::m::sync::args::timeout_min
{
{ "name", "ircd.client.sync.timeout.min" },
{ "default", 15 * 1000L },
};
// Timeout used by args::args when the request has no "timeout" query
// parameter. Default value is 90 * 1000.
ircd::conf::item<ircd::milliseconds>
ircd::m::sync::args::timeout_default
{
{ "name", "ircd.client.sync.timeout.default" },
{ "default", 90 * 1000L },
};
//
// args::args
//
/// Parses the query string of a /sync request.
///
/// @param request The client's request; only its query string is read.
/// @throws m::BAD_REQUEST when the since token or the next_batch value
/// cannot be converted to an integer.
ircd::m::sync::args::args(const resource::request &request)
try
:filter_id
{
    request.query["filter"]
}
// The since token has a leading run of the characters in "ctor_" stripped
// and is then split on '_' into the since integer (first) and an optional
// upper-bound snapshot (second). Defaults to "0".
,since_token
{
    split(lstrip(request.query.get("since", "0"_sv), "ctor_"), '_')
}
,since
{
    lex_cast<uint64_t>(since_token.first)
}
// An explicit next_batch query parameter takes precedence over the second
// component of the since token.
,next_batch_token
{
    request.query.get("next_batch", since_token.second)
}
// Without any token "-1" is converted instead.
,next_batch
{
    uint64_t(lex_cast<uint64_t>(next_batch_token?: "-1"_sv))
}
// Absolute deadline: now plus the requested timeout clamped to the
// configured [timeout_min, timeout_max].
// NOTE(review): std::clamp is undefined when min > max; this relies on the
// conf items being configured consistently.
,timesout
{
    ircd::now<system_point>() + std::clamp
    (
        request.query.get("timeout", milliseconds(timeout_default)),
        milliseconds(timeout_min),
        milliseconds(timeout_max)
    )
}
,full_state
{
    request.query.get("full_state", false)
}
,set_presence
{
    request.query.get("set_presence", true)
}
,phased
{
    request.query.get("phased", true)
}
,semaphore
{
    request.query.get("semaphore", false)
}
{
}
catch(const bad_lex_cast &e)
{
    // Both the since token and the next_batch value are converted above, so
    // the message must not blame the since parameter alone.
    throw m::BAD_REQUEST
    {
        "Since or next_batch parameter invalid :%s", e.what()
    };
}