0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-16 15:00:51 +01:00

modules/client/sync: Various cleanup / simplify.

This commit is contained in:
Jason Volk 2019-01-10 13:19:07 -08:00
parent b15d3b929f
commit 4c495e1f44
13 changed files with 326 additions and 345 deletions

View file

@ -22,7 +22,6 @@ namespace ircd::m::sync
struct stats; struct stats;
struct data; struct data;
struct item; struct item;
struct response;
using item_closure = std::function<void (item &)>; using item_closure = std::function<void (item &)>;
using item_closure_bool = std::function<bool (item &)>; using item_closure_bool = std::function<bool (item &)>;
@ -42,7 +41,7 @@ namespace ircd::m::sync
struct ircd::m::sync::item struct ircd::m::sync::item
:instance_multimap<std::string, item, std::less<>> :instance_multimap<std::string, item, std::less<>>
{ {
using handle = std::function<bool (data &)>; using handle = std::function<void (data &)>;
handle _polylog; handle _polylog;
handle _linear; handle _linear;
@ -51,9 +50,9 @@ struct ircd::m::sync::item
string_view name() const; string_view name() const;
string_view member_name() const; string_view member_name() const;
bool poll(data &, const m::event &); void poll(data &, const m::event &);
bool linear(data &); void linear(data &);
bool polylog(data &); void polylog(data &);
item(std::string name, item(std::string name,
handle polylog = {}, handle polylog = {},
@ -66,27 +65,28 @@ struct ircd::m::sync::item
struct ircd::m::sync::data struct ircd::m::sync::data
{ {
sync::stats &stats; /// Range to synchronize. Starting index is inclusive, ending index is
ircd::client &client; /// exclusive. Generally the starting index is a since token, and ending
/// index is one beyond the vm::current_sequence and used for next_batch.
m::events::range range;
// Range related /// Statistics tracking. If null, stats won't be accumulated for the sync.
const uint64_t &since; sync::stats *stats {nullptr};
const uint64_t current;
/// The client. This may be null if sync is being called internally.
ircd::client *client {nullptr};
// User related // User related
const m::user user; const m::user user;
const m::user::room user_room; const m::user::room user_room;
const m::room::state user_state; const m::room::state user_state;
const m::user::rooms user_rooms; const m::user::rooms user_rooms;
// Filter to use
const std::string filter_buf; const std::string filter_buf;
const m::filter filter; const m::filter filter;
// response state /// The json::stack master object
const std::unique_ptr<response> resp;
json::stack out; json::stack out;
bool committed() const; bool committed {false};
bool commit(); bool commit();
// apropos contextual // apropos contextual
@ -94,11 +94,14 @@ struct ircd::m::sync::data
const m::room *room {nullptr}; const m::room *room {nullptr};
string_view membership; string_view membership;
data(sync::stats &stats, data(const m::user &user,
ircd::client &client, const m::events::range &range,
const m::user &user, const mutable_buffer &,
const std::pair<event::idx, event::idx> &range, json::stack::flush_callback,
const string_view &filter_id); const size_t &flush_hiwat = 64_KiB,
ircd::client *const &client = nullptr,
sync::stats *const &stats = nullptr,
const string_view &filter_id = {});
data(data &&) = delete; data(data &&) = delete;
data(const data &) = delete; data(const data &) = delete;

View file

@ -485,132 +485,83 @@ bool
ircd::m::sync::apropos(const data &d, ircd::m::sync::apropos(const data &d,
const event::idx &event_idx) const event::idx &event_idx)
{ {
return d.since <= event_idx && d.current >= event_idx; return event_idx >= d.range.first &&
event_idx < d.range.second;
} }
ircd::string_view ircd::string_view
ircd::m::sync::loghead(const data &data) ircd::m::sync::loghead(const data &data)
{ {
thread_local char headbuf[256], rembuf[128], iecbuf[2][64], tmbuf[32]; thread_local char headbuf[256], rembuf[128], iecbuf[2][64], tmbuf[32];
const auto remstr
{
data.client?
string(rembuf, ircd::remote(*data.client)):
string_view{}
};
const auto flush_bytes
{
data.stats?
data.stats->flush_bytes:
0U
};
const auto flush_count
{
data.stats?
data.stats->flush_count:
0U
};
const auto tmstr
{
data.stats?
ircd::pretty(tmbuf, data.stats->timer.at<milliseconds>(), true):
string_view{}
};
return fmt::sprintf return fmt::sprintf
{ {
headbuf, "%s %s %lu:%lu %s chunk:%zu %s in %s", headbuf, "%s %s %lu:%lu %s chunk:%zu %s in %s",
string(rembuf, ircd::remote(data.client)), remstr,
string_view{data.user.user_id}, string_view{data.user.user_id},
data.since, data.range.first,
data.current, data.range.second,
ircd::pretty(iecbuf[0], iec(data.stats.flush_bytes + size(data.out.completed()))), ircd::pretty(iecbuf[0], iec(flush_bytes + size(data.out.completed()))),
data.stats.flush_count, flush_count,
ircd::pretty(iecbuf[1], iec(data.stats.flush_bytes)), ircd::pretty(iecbuf[1], iec(flush_bytes)),
ircd::pretty(tmbuf, data.stats.timer.at<milliseconds>(), true), tmstr
}; };
} }
//
// response
//
struct ircd::m::sync::response
{
static conf::item<size_t> flush_hiwat;
sync::stats &stats;
ircd::client &client;
unique_buffer<mutable_buffer> buf;
std::unique_ptr<resource::response::chunked> resp;
bool committed;
const_buffer flush(const const_buffer &buf);
void commit();
response(sync::stats &stats, ircd::client &client);
~response() noexcept;
};
decltype(ircd::m::sync::response::flush_hiwat)
ircd::m::sync::response::flush_hiwat
{
{ "name", "ircd.m.sync.flush.hiwat" },
{ "default", long(32_KiB) },
};
//
// response::response
//
ircd::m::sync::response::response(sync::stats &stats,
ircd::client &client)
:stats
{
stats
}
,client
{
client
}
,buf
{
std::max(size_t(96_KiB), size_t(flush_hiwat))
}
,committed
{
false
}
{
}
ircd::m::sync::response::~response()
noexcept
{
}
ircd::const_buffer
ircd::m::sync::response::flush(const const_buffer &buf)
{
if(!committed)
return buf;
if(!resp)
commit();
stats.flush_bytes += resp->write(buf);
stats.flush_count++;
return buf;
}
void
ircd::m::sync::response::commit()
{
static const string_view content_type
{
"application/json; charset=utf-8"
};
assert(!resp);
resp = std::make_unique<resource::response::chunked>
(
client, http::OK, content_type
);
}
// //
// data // data
// //
ircd::m::sync::data::data(sync::stats &stats, ircd::m::sync::data::data
ircd::client &client, (
const m::user &user, const m::user &user,
const std::pair<event::idx, event::idx> &range, const m::events::range &range,
const string_view &filter_id) const mutable_buffer &buf,
:stats{stats} json::stack::flush_callback flusher,
,client{client} const size_t &flush_hiwat,
,since ircd::client *const &client,
sync::stats *const &stats,
const string_view &filter_id
)
:range
{ {
range.first range
} }
,current ,stats
{ {
range.second stats
}
,client
{
client
} }
,user ,user
{ {
@ -638,13 +589,9 @@ ircd::m::sync::data::data(sync::stats &stats,
{ {
json::object{filter_buf} json::object{filter_buf}
} }
,resp
{
std::make_unique<response>(stats, client)
}
,out ,out
{ {
resp->buf, std::bind(&response::flush, resp.get(), ph::_1), size_t(resp->flush_hiwat) buf, std::move(flusher), flush_hiwat
} }
{ {
} }
@ -657,20 +604,11 @@ noexcept
bool bool
ircd::m::sync::data::commit() ircd::m::sync::data::commit()
{ {
assert(resp); const auto ret{committed};
const auto ret{resp->committed}; committed = true;
resp->committed = true;
return ret; return ret;
} }
bool
ircd::m::sync::data::committed()
const
{
assert(resp);
return resp->committed;
}
// //
// item // item
// //
@ -719,21 +657,27 @@ noexcept
}; };
} }
bool void
ircd::m::sync::item::polylog(data &data) ircd::m::sync::item::polylog(data &data)
try try
{ {
#ifdef RB_DEBUG #ifdef RB_DEBUG
sync::stats stats{data.stats}; sync::stats stats
{
data.stats?
*data.stats:
sync::stats{}
};
if(data.stats)
stats.timer = {}; stats.timer = {};
#endif #endif
const bool ret _polylog(data);
{
_polylog(data)
};
#ifdef RB_DEBUG #ifdef RB_DEBUG
if(data.stats)
{
//data.out.flush(); //data.out.flush();
thread_local char tmbuf[32]; thread_local char tmbuf[32];
log::debug log::debug
@ -743,9 +687,8 @@ try
name(), name(),
ircd::pretty(tmbuf, stats.timer.at<microseconds>(), true) ircd::pretty(tmbuf, stats.timer.at<microseconds>(), true)
}; };
}
#endif #endif
return ret;
} }
catch(const std::bad_function_call &e) catch(const std::bad_function_call &e)
{ {
@ -756,8 +699,6 @@ catch(const std::bad_function_call &e)
name(), name(),
e.what() e.what()
}; };
return false;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
@ -772,33 +713,25 @@ catch(const std::exception &e)
throw; throw;
} }
bool void
ircd::m::sync::item::linear(data &data) ircd::m::sync::item::linear(data &data)
try try
{ {
const auto ret _linear(data);
{
_linear(data)
};
return ret;
} }
catch(const std::bad_function_call &e) catch(const std::bad_function_call &e)
{ {
thread_local char rembuf[128]; thread_local char rembuf[128];
log::dwarning log::dwarning
{ {
log, "linear %s %s '%s' missing handler :%s", log, "linear %s '%s' missing handler :%s",
string(rembuf, ircd::remote(data.client)), loghead(data),
string_view{data.user.user_id},
name(), name(),
e.what() e.what()
}; };
return false;
} }
bool void
ircd::m::sync::item::poll(data &data, ircd::m::sync::item::poll(data &data,
const m::event &event) const m::event &event)
try try
@ -808,26 +741,18 @@ try
data.event, &event data.event, &event
}; };
const auto ret _linear(data);
{
_linear(data)
};
return ret;
} }
catch(const std::bad_function_call &e) catch(const std::bad_function_call &e)
{ {
thread_local char rembuf[128]; thread_local char rembuf[128];
log::dwarning log::dwarning
{ {
log, "poll %s %s '%s' missing handler :%s", log, "poll %s '%s' missing handler :%s",
string(rembuf, ircd::remote(data.client)), loghead(data),
string_view{data.user.user_id},
name(), name(),
e.what() e.what()
}; };
return false;
} }
ircd::string_view ircd::string_view

View file

@ -56,6 +56,27 @@ ircd::m::sync::args::timeout_default
{ "default", 10 * 1000L }, { "default", 10 * 1000L },
}; };
decltype(ircd::m::sync::flush_hiwat)
ircd::m::sync::flush_hiwat
{
{ "name", "ircd.client.sync.flush.hiwat" },
{ "default", long(48_KiB) },
};
decltype(ircd::m::sync::buffer_size)
ircd::m::sync::buffer_size
{
{ "name", "ircd.client.sync.buffer_size" },
{ "default", long(128_KiB) },
};
decltype(ircd::m::sync::linear::delta_max)
ircd::m::sync::linear::delta_max
{
{ "name", "ircd.client.sync.linear.delta.max" },
{ "default", 1024 },
};
// //
// GET sync // GET sync
// //
@ -73,23 +94,57 @@ ircd::m::sync::method_get
ircd::resource::response ircd::resource::response
ircd::m::sync::handle_get(client &client, ircd::m::sync::handle_get(client &client,
const resource::request &request) const resource::request &request)
try
{ {
// Parse the request options
const args args const args args
{ {
request request
}; };
const std::pair<event::idx, event::idx> range // The range to `/sync`. We involve events starting at the range.first
// index in this sync. We will not involve events with an index equal
// or greater than the range.second. In this case the range.second does not
// exist yet because it is one past the server's current_sequence counter.
const m::events::range range
{ {
args.since, // start at since token args.since, // start at since token
m::vm::current_sequence // stop at present m::vm::current_sequence + 1 // stop before future
}; };
// When the range indexes are the same, the client is polling for the next
// event which doesn't exist yet. There is no reason for the since parameter
// to be greater than that.
if(range.first > range.second)
throw m::NOT_FOUND
{
"Since parameter is too far in the future..."
};
// Setup an output buffer to compose the response. This has to be at least
// the worst-case size of a matrix event (64_KiB) or bad things happen.
const unique_buffer<mutable_buffer> buffer
{
size_t(buffer_size)
};
// Setup a chunked encoded response.
resource::response::chunked response
{
client, http::OK
};
// Keep state for statistics of this sync here on the stack.
stats stats; stats stats;
data data data data
{ {
stats, client, request.user_id, range, args.filter_id request.user_id,
range,
buffer,
std::bind(&sync::flush, std::ref(data), std::ref(response), ph::_1),
size_t(flush_hiwat),
&client,
&stats,
args.filter_id
}; };
log::debug log::debug
@ -97,56 +152,76 @@ try
log, "request %s", loghead(data) log, "request %s", loghead(data)
}; };
if(data.since > data.current + 1) json::stack::object object
throw m::NOT_FOUND
{ {
"Since parameter is too far in the future..." data.out
};
const size_t linear_delta_max
{
linear::delta_max
}; };
const bool shortpolled const bool shortpolled
{ {
range.first > range.second? range.first > range.second?
false: false:
range.second - range.first <= linear_delta_max? range.second - range.first <= size_t(linear::delta_max)?
polylog::handle(data): polylog::handle(data):
polylog::handle(data) polylog::handle(data)
}; };
// When shortpoll was successful, do nothing else. // When shortpoll was successful, do nothing else.
if(shortpolled) if(shortpolled)
return {}; return response;
// When longpoll was successful, do nothing else. // When longpoll was successful, do nothing else.
if(longpoll::poll(data, args)) if(longpoll::poll(data, args))
return {}; return response;
// A user-timeout occurred. According to the spec we return a // A user-timeout occurred. According to the spec we return a
// 200 with empty fields rather than a 408. // 200 with empty fields rather than a 408.
const json::value next_batch empty_response(data);
{ return response;
lex_cast(m::vm::current_sequence + 0), json::STRING }
};
return resource::response void
ircd::m::sync::empty_response(data &data)
{
data.commit();
json::stack::member
{ {
client, json::members data.out, "next_batch", json::value
{ {
{ "next_batch", next_batch }, lex_cast(data.range.second), json::STRING
{ "rooms", json::object{} },
{ "presence", json::object{} },
} }
}; };
// Empty objects added to output otherwise Riot b0rks.
json::stack::object{data.out, "rooms"};
json::stack::object{data.out, "presence"};
} }
catch(const bad_lex_cast &e)
ircd::const_buffer
ircd::m::sync::flush(data &data,
resource::response::chunked &response,
const const_buffer &buffer)
{ {
throw m::BAD_REQUEST if(!data.committed)
return const_buffer
{ {
"Since parameter invalid :%s", e.what() buffer::data(buffer), 0UL
};
const size_t wrote
{
response.write(buffer)
};
if(data.stats)
{
data.stats->flush_bytes += wrote;
data.stats->flush_count++;
}
return const_buffer
{
buffer::data(buffer), wrote
}; };
} }
@ -158,11 +233,6 @@ bool
ircd::m::sync::polylog::handle(data &data) ircd::m::sync::polylog::handle(data &data)
try try
{ {
json::stack::object object
{
data.out
};
m::sync::for_each(string_view{}, [&data] m::sync::for_each(string_view{}, [&data]
(item &item) (item &item)
{ {
@ -175,24 +245,20 @@ try
return true; return true;
}); });
const json::value next_batch
{
lex_cast(data.current + 1), json::STRING
};
json::stack::member json::stack::member
{ {
object, "next_batch", next_batch data.out, "next_batch", json::value
{
lex_cast(data.range.second), json::STRING
}
}; };
log::info log::info
{ {
log, "polylog %s (next_batch:%s)", log, "polylog %s complete", loghead(data)
loghead(data),
string_view{next_batch}
}; };
return data.committed(); return data.committed;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
@ -210,28 +276,11 @@ catch(const std::exception &e)
// linear // linear
// //
decltype(ircd::m::sync::linear::delta_max)
ircd::m::sync::linear::delta_max
{
{ "name", "ircd.client.sync.linear.delta.max" },
{ "default", 1024 },
};
bool bool
ircd::m::sync::linear::handle(data &data) ircd::m::sync::linear::handle(data &data)
try try
{ {
json::stack::object object m::events::for_each(data.range, [&data]
{
data.out
};
const m::events::range range
{
data.since, data.current + 1
};
m::events::for_each(range, [&data]
(const m::event::idx &event_idx, const m::event &event) (const m::event::idx &event_idx, const m::event &event)
{ {
const scope_restore<decltype(data.event)> theirs const scope_restore<decltype(data.event)> theirs
@ -254,24 +303,20 @@ try
return true; return true;
}); });
const json::value next_batch
{
lex_cast(data.current + 1), json::STRING
};
json::stack::member json::stack::member
{ {
object, "next_batch", next_batch data.out, "next_batch", json::value
{
lex_cast(data.range.second), json::STRING
}
}; };
log::debug log::debug
{ {
log, "linear %s (next_batch:%s)", log, "linear %s complete", loghead(data)
loghead(data),
string_view{next_batch}
}; };
return data.committed(); return data.committed;
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {

View file

@ -19,8 +19,11 @@ namespace ircd::m::sync
extern resource resource; extern resource resource;
extern resource::method method_get; extern resource::method method_get;
static long notification_count(const room &, const event::idx &a, const event::idx &b); extern conf::item<size_t> flush_hiwat;
static long highlight_count(const room &, const user &, const event::idx &a, const event::idx &b); extern conf::item<size_t> buffer_size;
static const_buffer flush(data &, resource::response::chunked &, const const_buffer &);
static void empty_response(data &);
static resource::response handle_get(client &, const resource::request &); static resource::response handle_get(client &, const resource::request &);
} }
@ -77,14 +80,12 @@ namespace ircd::m::sync::polylog
/// Argument parser for the client's /sync request /// Argument parser for the client's /sync request
struct ircd::m::sync::args struct ircd::m::sync::args
{ {
args(const resource::request &request);
static conf::item<milliseconds> timeout_max; static conf::item<milliseconds> timeout_max;
static conf::item<milliseconds> timeout_min; static conf::item<milliseconds> timeout_min;
static conf::item<milliseconds> timeout_default; static conf::item<milliseconds> timeout_default;
args(const resource::request &request)
:request{request}
{}
const resource::request &request; const resource::request &request;
string_view filter_id string_view filter_id
@ -132,3 +133,20 @@ struct ircd::m::sync::args
request.query.get("set_presence", true) request.query.get("set_presence", true)
}; };
}; };
inline
ircd::m::sync::args::args(const resource::request &request)
try
:request
{
request
}
{
}
catch(const bad_lex_cast &e)
{
throw m::BAD_REQUEST
{
"Since parameter invalid :%s", e.what()
};
}

View file

@ -17,8 +17,8 @@ IRCD_MODULE
namespace ircd::m::sync namespace ircd::m::sync
{ {
static bool account_data_(data &, const m::event &, const m::event::idx &); static bool account_data_(data &, const m::event &, const m::event::idx &);
static bool account_data_polylog(data &); static void account_data_polylog(data &);
static bool account_data_linear(data &); static void account_data_linear(data &);
extern item account_data; extern item account_data;
} }
@ -31,13 +31,13 @@ ircd::m::sync::account_data
account_data_linear account_data_linear
}; };
bool void
ircd::m::sync::account_data_linear(data &data) ircd::m::sync::account_data_linear(data &data)
{ {
return true;
} }
bool void
ircd::m::sync::account_data_polylog(data &data) ircd::m::sync::account_data_polylog(data &data)
{ {
json::stack::object object json::stack::object object
@ -61,8 +61,6 @@ ircd::m::sync::account_data_polylog(data &data)
if(account_data_(data, event, index(event))) if(account_data_(data, event, index(event)))
data.commit(); data.commit();
}); });
return true;
} }
bool bool

View file

@ -17,10 +17,10 @@ IRCD_MODULE
namespace ircd::m::sync namespace ircd::m::sync
{ {
static void presence_polylog_events(data &); static void presence_polylog_events(data &);
static bool presence_polylog(data &); static void presence_polylog(data &);
static void presence_linear_events(data &); static void presence_linear_events(data &);
static bool presence_linear(data &); static void presence_linear(data &);
extern item presence; extern item presence;
} }
@ -33,10 +33,10 @@ ircd::m::sync::presence
presence_linear, presence_linear,
}; };
bool void
ircd::m::sync::presence_linear(data &data) ircd::m::sync::presence_linear(data &data)
{ {
return true; return;
assert(data.event); assert(data.event);
const m::event &event const m::event &event
@ -45,10 +45,10 @@ ircd::m::sync::presence_linear(data &data)
}; };
if(json::get<"type"_>(event) != "ircd.presence") if(json::get<"type"_>(event) != "ircd.presence")
return true; return;
if(json::get<"sender"_>(event) != m::me.user_id) if(json::get<"sender"_>(event) != m::me.user_id)
return true; return;
// sender // sender
json::stack::member json::stack::member
@ -68,10 +68,10 @@ ircd::m::sync::presence_linear(data &data)
data.out, "content", at<"content"_>(event) data.out, "content", at<"content"_>(event)
}; };
return true; return;
} }
bool void
ircd::m::sync::presence_polylog(data &data) ircd::m::sync::presence_polylog(data &data)
{ {
json::stack::object object json::stack::object object
@ -80,7 +80,6 @@ ircd::m::sync::presence_polylog(data &data)
}; };
presence_polylog_events(data); presence_polylog_events(data);
return true;
} }
void void
@ -126,9 +125,9 @@ ircd::m::sync::presence_polylog_events(data &data)
}}; }};
//TODO: conf //TODO: conf
static const size_t fibers(16); static const size_t fibers(32);
std::array<string_view, fibers> q; std::array<string_view, fibers> q;
std::array<char[256], fibers> buf; std::array<char[128], fibers> buf; //TODO: X
ctx::parallel<string_view> parallel ctx::parallel<string_view> parallel
{ {
m::sync::pool, q, [&data, &closure](const auto &user_id) m::sync::pool, q, [&data, &closure](const auto &user_id)
@ -136,7 +135,7 @@ ircd::m::sync::presence_polylog_events(data &data)
const m::user user{user_id}; const m::user user{user_id};
const m::user::room user_room{user}; const m::user::room user_room{user};
//TODO: can't check event_idx cuz only closed presence content //TODO: can't check event_idx cuz only closed presence content
if(head_idx(std::nothrow, user_room) > data.since) if(head_idx(std::nothrow, user_room) >= data.range.first)
m::presence::get(std::nothrow, user, closure); m::presence::get(std::nothrow, user, closure);
} }
}; };

View file

@ -18,10 +18,10 @@ namespace ircd::m::sync
{ {
static void _rooms_polylog_room(data &, const m::room &); static void _rooms_polylog_room(data &, const m::room &);
static void _rooms_polylog(data &, const string_view &membership); static void _rooms_polylog(data &, const string_view &membership);
static bool rooms_polylog(data &); static void rooms_polylog(data &);
static void _rooms_linear(data &, const string_view &membership); static void _rooms_linear(data &, const string_view &membership);
static bool rooms_linear(data &); static void rooms_linear(data &);
extern item rooms; extern item rooms;
} }
@ -34,7 +34,7 @@ ircd::m::sync::rooms
rooms_linear rooms_linear
}; };
bool void
ircd::m::sync::rooms_linear(data &data) ircd::m::sync::rooms_linear(data &data)
{ {
json::stack::object object{data.out}; json::stack::object object{data.out};
@ -42,7 +42,6 @@ ircd::m::sync::rooms_linear(data &data)
_rooms_linear(data, "join"); _rooms_linear(data, "join");
_rooms_linear(data, "leave"); _rooms_linear(data, "leave");
_rooms_linear(data, "ban"); _rooms_linear(data, "ban");
return true;
} }
void void
@ -66,7 +65,7 @@ ircd::m::sync::_rooms_linear(data &data,
}); });
} }
bool void
ircd::m::sync::rooms_polylog(data &data) ircd::m::sync::rooms_polylog(data &data)
{ {
json::stack::object object{data.out}; json::stack::object object{data.out};
@ -74,7 +73,7 @@ ircd::m::sync::rooms_polylog(data &data)
_rooms_polylog(data, "join"); _rooms_polylog(data, "join");
_rooms_polylog(data, "leave"); _rooms_polylog(data, "leave");
_rooms_polylog(data, "ban"); _rooms_polylog(data, "ban");
return true; return;
} }
void void
@ -100,12 +99,19 @@ ircd::m::sync::_rooms_polylog(data &data,
}; };
assert(head_idx); // room should exist assert(head_idx); // room should exist
if(!head_idx || head_idx < data.since) if(!head_idx || head_idx < data.range.first)
return; return;
// Generate individual stats for this room's sync // Generate individual stats for this room's sync
#ifdef RB_DEBUG #ifdef RB_DEBUG
sync::stats stats{data.stats}; sync::stats stats
{
data.stats?
*data.stats:
sync::stats{}
};
if(data.stats)
stats.timer = timer{}; stats.timer = timer{};
#endif #endif
@ -118,7 +124,9 @@ ircd::m::sync::_rooms_polylog(data &data,
log, "polylog %s %s in %s", log, "polylog %s %s in %s",
loghead(data), loghead(data),
string_view{room.room_id}, string_view{room.room_id},
ircd::pretty(tmbuf, stats.timer.at<milliseconds>(), true) data.stats?
ircd::pretty(tmbuf, stats.timer.at<milliseconds>(), true):
string_view{"<no stats>"}
}; };
#endif #endif
}); });

View file

@ -18,7 +18,7 @@ namespace ircd::m::sync
{ {
static void room_account_data_polylog_events_event(data &, const m::event &); static void room_account_data_polylog_events_event(data &, const m::event &);
static void room_account_data_polylog_events(data &); static void room_account_data_polylog_events(data &);
static bool room_account_data_polylog(data &); static void room_account_data_polylog(data &);
extern item room_account_data; extern item room_account_data;
} }
@ -30,7 +30,7 @@ ircd::m::sync::room_account_data
room_account_data_polylog room_account_data_polylog
}; };
bool void
ircd::m::sync::room_account_data_polylog(data &data) ircd::m::sync::room_account_data_polylog(data &data)
{ {
json::stack::object object json::stack::object object
@ -39,7 +39,6 @@ ircd::m::sync::room_account_data_polylog(data &data)
}; };
room_account_data_polylog_events(data); room_account_data_polylog_events(data);
return true;
} }
void void

View file

@ -16,9 +16,9 @@ IRCD_MODULE
namespace ircd::m::sync namespace ircd::m::sync
{ {
static bool rooms_ephemeral_events_polylog(data &); static void rooms_ephemeral_events_polylog(data &);
static bool rooms_ephemeral_polylog(data &); static void rooms_ephemeral_polylog(data &);
static bool rooms_ephemeral_linear(data &); static void rooms_ephemeral_linear(data &);
extern item rooms_ephemeral; extern item rooms_ephemeral;
} }
@ -30,13 +30,13 @@ ircd::m::sync::rooms_ephemeral
rooms_ephemeral_linear rooms_ephemeral_linear
}; };
bool void
ircd::m::sync::rooms_ephemeral_linear(data &data) ircd::m::sync::rooms_ephemeral_linear(data &data)
{ {
return true;
} }
bool void
ircd::m::sync::rooms_ephemeral_polylog(data &data) ircd::m::sync::rooms_ephemeral_polylog(data &data)
{ {
json::stack::object object json::stack::object object
@ -45,10 +45,9 @@ ircd::m::sync::rooms_ephemeral_polylog(data &data)
}; };
rooms_ephemeral_events(data); rooms_ephemeral_events(data);
return true;
} }
bool void
ircd::m::sync::rooms_ephemeral_events_polylog(data &data) ircd::m::sync::rooms_ephemeral_events_polylog(data &data)
{ {
json::stack::array array json::stack::array array
@ -62,6 +61,4 @@ ircd::m::sync::rooms_ephemeral_events_polylog(data &data)
item.polylog(data); item.polylog(data);
return true; return true;
}); });
return true;
} }

View file

@ -19,7 +19,7 @@ namespace ircd::m::sync
static void _reformat_receipt(json::stack::object &, const m::event &); static void _reformat_receipt(json::stack::object &, const m::event &);
static void _handle_receipt(data &, const m::event &); static void _handle_receipt(data &, const m::event &);
static void _handle_user(data &, const m::user &); static void _handle_user(data &, const m::user &);
static bool room_ephemeral_m_receipt_m_read_polylog(data &); static void room_ephemeral_m_receipt_m_read_polylog(data &);
extern item room_ephemeral_m_receipt_m_read; extern item room_ephemeral_m_receipt_m_read;
} }
@ -30,7 +30,7 @@ ircd::m::sync::room_ephemeral_m_receipt_m_read
room_ephemeral_m_receipt_m_read_polylog room_ephemeral_m_receipt_m_read_polylog
}; };
bool void
ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data) ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
{ {
const m::room &room{*data.room}; const m::room &room{*data.room};
@ -44,8 +44,6 @@ ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
{ {
_handle_user(data, user_id); _handle_user(data, user_id);
}}); }});
return true;
} }
void void
@ -62,7 +60,7 @@ ircd::m::sync::_handle_user(data &data,
m::user::room user_room{user}; m::user::room user_room{user};
user_room.fopts = &fopts; user_room.fopts = &fopts;
if(head_idx(std::nothrow, user_room) < data.since) if(head_idx(std::nothrow, user_room) < data.range.first)
return; return;
const m::room::id &room_id{*data.room}; const m::room::id &room_id{*data.room};

View file

@ -16,11 +16,11 @@ IRCD_MODULE
namespace ircd::m::sync namespace ircd::m::sync
{ {
static bool room_state_polylog_events(data &); static void room_state_polylog_events(data &);
static bool room_state_polylog(data &); static void room_state_polylog(data &);
static bool room_state_linear_events(data &); static void room_state_linear_events(data &);
static bool room_state_linear(data &); static void room_state_linear(data &);
extern const event::keys::include _default_keys; extern const event::keys::include _default_keys;
extern item room_state; extern item room_state;
@ -48,7 +48,7 @@ ircd::m::sync::_default_keys
"type", "type",
}; };
bool void
ircd::m::sync::room_state_linear(data &data) ircd::m::sync::room_state_linear(data &data)
{ {
assert(data.event); assert(data.event);
@ -56,16 +56,15 @@ ircd::m::sync::room_state_linear(data &data)
assert(json::get<"room_id"_>(*data.event)); assert(json::get<"room_id"_>(*data.event));
if(!json::get<"state_key"_>(*data.event)) if(!json::get<"state_key"_>(*data.event))
return false; return;
if(!data.room->membership(data.user, data.membership)) if(!data.room->membership(data.user, data.membership))
return false; return;
//data.array->append(*data.event); //data.array->append(*data.event);
return true;
} }
bool void
ircd::m::sync::room_state_polylog(data &data) ircd::m::sync::room_state_polylog(data &data)
{ {
json::stack::object object json::stack::object object
@ -73,10 +72,10 @@ ircd::m::sync::room_state_polylog(data &data)
data.out data.out
}; };
return room_state_polylog_events(data); room_state_polylog_events(data);
} }
bool void
ircd::m::sync::room_state_polylog_events(data &data) ircd::m::sync::room_state_polylog_events(data &data)
{ {
json::stack::array array json::stack::array array
@ -107,7 +106,7 @@ ircd::m::sync::room_state_polylog_events(data &data)
}}; }};
//TODO: conf //TODO: conf
std::array<event::idx, 8> md; std::array<event::idx, 16> md;
ctx::parallel<event::idx> parallel ctx::parallel<event::idx> parallel
{ {
m::sync::pool, md, each_idx m::sync::pool, md, each_idx
@ -121,6 +120,4 @@ ircd::m::sync::room_state_polylog_events(data &data)
if(apropos(data, event_idx)) if(apropos(data, event_idx))
parallel(event_idx); parallel(event_idx);
}); });
return true;
} }

View file

@ -16,13 +16,11 @@ IRCD_MODULE
namespace ircd::m::sync namespace ircd::m::sync
{ {
struct room_timeline;
static event::id::buf _room_timeline_polylog_events(data &, const m::room &, bool &); static event::id::buf _room_timeline_polylog_events(data &, const m::room &, bool &);
static bool room_timeline_polylog(data &); static void room_timeline_polylog(data &);
static event::id::buf _room_timeline_linear_events(data &, const m::room &, bool &); static event::id::buf _room_timeline_linear_events(data &, const m::room &, bool &);
static bool room_timeline_linear(data &); static void room_timeline_linear(data &);
extern const event::keys::include default_keys; extern const event::keys::include default_keys;
extern item room_timeline; extern item room_timeline;
@ -51,10 +49,10 @@ ircd::m::sync::default_keys
"type", "type",
}; };
bool void
ircd::m::sync::room_timeline_linear(data &data) ircd::m::sync::room_timeline_linear(data &data)
{ {
return true; return;
json::stack::object object json::stack::object object
{ {
@ -82,7 +80,7 @@ ircd::m::sync::room_timeline_linear(data &data)
object, "limited", json::value{limited} object, "limited", json::value{limited}
}; };
return true; return;
} }
ircd::m::event::id::buf ircd::m::event::id::buf
@ -98,7 +96,7 @@ ircd::m::sync::_room_timeline_linear_events(data &data,
return {}; return {};
} }
bool void
ircd::m::sync::room_timeline_polylog(data &data) ircd::m::sync::room_timeline_polylog(data &data)
{ {
json::stack::object object json::stack::object object
@ -124,8 +122,6 @@ ircd::m::sync::room_timeline_polylog(data &data)
{ {
object, "limited", json::value{limited} object, "limited", json::value{limited}
}; };
return true;
} }
ircd::m::event::id::buf ircd::m::event::id::buf
@ -159,10 +155,10 @@ ircd::m::sync::_room_timeline_polylog_events(data &data,
for(; it && i < 10; --it, ++i) for(; it && i < 10; --it, ++i)
{ {
event_id = it.event_id(); event_id = it.event_id();
if(it.event_idx() < data.since) if(it.event_idx() < data.range.first)
break; break;
if(it.event_idx() > data.current) if(it.event_idx() >= data.range.second)
break; break;
} }

View file

@ -18,8 +18,8 @@ namespace ircd::m::sync
{ {
static long _notification_count(const room &, const event::idx &a, const event::idx &b); static long _notification_count(const room &, const event::idx &a, const event::idx &b);
static long _highlight_count(const room &, const user &u, const event::idx &a, const event::idx &b); static long _highlight_count(const room &, const user &u, const event::idx &a, const event::idx &b);
static bool room_unread_notifications_polylog(data &); static void room_unread_notifications_polylog(data &);
static bool room_unread_notifications_linear(data &); static void room_unread_notifications_linear(data &);
extern item room_unread_notifications; extern item room_unread_notifications;
} }
@ -31,19 +31,19 @@ ircd::m::sync::room_unread_notifications
room_unread_notifications_linear room_unread_notifications_linear
}; };
bool void
ircd::m::sync::room_unread_notifications_linear(data &data) ircd::m::sync::room_unread_notifications_linear(data &data)
{ {
return true;
} }
bool void
ircd::m::sync::room_unread_notifications_polylog(data &data) ircd::m::sync::room_unread_notifications_polylog(data &data)
{ {
auto &room{*data.room}; auto &room{*data.room};
m::event::id::buf last_read; m::event::id::buf last_read;
if(!m::receipt::read(last_read, room.room_id, data.user)) if(!m::receipt::read(last_read, room.room_id, data.user))
return false; return;
data.commit(); data.commit();
json::stack::object out json::stack::object out
@ -51,7 +51,7 @@ ircd::m::sync::room_unread_notifications_polylog(data &data)
data.out data.out
}; };
const auto last_read_idx const auto start_idx
{ {
index(last_read) index(last_read)
}; };
@ -61,7 +61,7 @@ ircd::m::sync::room_unread_notifications_polylog(data &data)
{ {
out, "highlight_count", json::value out, "highlight_count", json::value
{ {
_highlight_count(room, data.user, last_read_idx, data.current) _highlight_count(room, data.user, start_idx, data.range.second)
} }
}; };
@ -70,11 +70,9 @@ ircd::m::sync::room_unread_notifications_polylog(data &data)
{ {
out, "notification_count", json::value out, "notification_count", json::value
{ {
_notification_count(room, last_read_idx, data.current) _notification_count(room, start_idx, data.range.second)
} }
}; };
return true;
} }
long long