0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-26 05:48:20 +02:00

ircd:Ⓜ️:sync: Additional abstraction; checkpoint polylog /sync modules.

This commit is contained in:
Jason Volk 2019-01-08 15:10:06 -08:00
parent 32aa9b4833
commit 443905859f
13 changed files with 548 additions and 431 deletions

View file

@ -23,8 +23,20 @@ namespace ircd::m::sync
struct data;
struct item;
struct response;
using item_closure = std::function<void (item &)>;
using item_closure_bool = std::function<bool (item &)>;
string_view loghead(const data &);
bool apropos(const data &, const event::idx &);
bool apropos(const data &, const event::id &);
bool apropos(const data &, const event &);
bool for_each(const string_view &prefix, const item_closure_bool &);
bool for_each(const item_closure_bool &);
extern log::log log;
extern ctx::pool pool;
}
struct ircd::m::sync::item
@ -37,6 +49,7 @@ struct ircd::m::sync::item
public:
string_view name() const;
string_view member_name() const;
bool linear(data &, const m::event &);
bool polylog(data &);
@ -58,11 +71,11 @@ struct ircd::m::sync::data
// Range related
const uint64_t &since;
const uint64_t current;
const uint64_t delta;
// User related
const m::user user;
const m::user::room user_room;
const m::room::state user_state;
const m::user::rooms user_rooms;
// Filter to use
@ -76,18 +89,9 @@ struct ircd::m::sync::data
bool commit();
// apropos contextual
ctx::mutex write_mutex;
json::stack::member *member {nullptr};
json::stack::object *object {nullptr};
json::stack::array *array {nullptr};
const m::event *event {nullptr};
const m::room *room {nullptr};
string_view membership;
window_buffer ret;
std::array<string_view, 16> path;
// unsorted / misc
uint64_t state_at {0};
data(sync::stats &stats,
ircd::client &client,

View file

@ -72,6 +72,8 @@ catch(const std::exception &e)
ircd::m::init::~init()
noexcept try
{
m::sync::pool.join();
if(!std::current_exception())
presence::set(me, "offline", me_offline_status_msg);
}
@ -400,6 +402,105 @@ ircd::m::sync::log
"sync", 's'
};
namespace ircd::m::sync
{
const ctx::pool::opts pool_opts
{
ctx::DEFAULT_STACK_SIZE,
0,
-1,
0
};
}
decltype(ircd::m::sync::pool)
ircd::m::sync::pool
{
"sync", pool_opts
};
bool
ircd::m::sync::for_each(const item_closure_bool &closure)
{
return for_each(string_view{}, closure);
}
bool
ircd::m::sync::for_each(const string_view &prefix,
const item_closure_bool &closure)
{
const auto depth
{
token_count(prefix, '.')
};
auto it
{
item::map.lower_bound(prefix)
};
for(; it != end(item::map); ++it)
{
const auto item_depth
{
token_count(it->first, '.')
};
if(item_depth > depth + 1)
continue;
if(it->first == prefix)
continue;
if(item_depth < depth + 1)
break;
if(!closure(*it->second))
return false;
}
return true;
}
bool
ircd::m::sync::apropos(const data &d,
const event &event)
{
return apropos(d, index(event, std::nothrow));
}
bool
ircd::m::sync::apropos(const data &d,
const event::id &event_id)
{
return apropos(d, index(event_id, std::nothrow));
}
bool
ircd::m::sync::apropos(const data &d,
const event::idx &event_idx)
{
return d.since <= event_idx && d.current >= event_idx;
}
ircd::string_view
ircd::m::sync::loghead(const data &data)
{
thread_local char headbuf[256], rembuf[128], iecbuf[2][64], tmbuf[32];
return fmt::sprintf
{
headbuf, "%s %s [%lu -> %lu] %s chunk:%zu %s in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
data.since,
data.current,
ircd::pretty(iecbuf[0], iec(data.stats.flush_bytes + size(data.out.completed()))),
data.stats.flush_count,
ircd::pretty(iecbuf[1], iec(data.stats.flush_bytes)),
ircd::pretty(tmbuf, data.stats.timer.at<milliseconds>(), true),
};
}
//
// response
//
@ -506,10 +607,6 @@ ircd::m::sync::data::data(sync::stats &stats,
{
range.second
}
,delta
{
current - since
}
,user
{
user
@ -518,6 +615,10 @@ ircd::m::sync::data::data(sync::stats &stats,
{
user
}
,user_state
{
user_room
}
,user_rooms
{
user
@ -654,21 +755,19 @@ try
stats.timer = {};
#endif
const auto ret
const bool ret
{
_polylog(data)
};
#ifdef RB_DEBUG
thread_local char rembuf[128], iecbuf[64], tmbuf[32];
//data.out.flush();
thread_local char tmbuf[32];
log::debug
{
log, "polylog %s %s '%s' %s wc:%zu in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
log, "polylog %s '%s' %s",
loghead(data),
name(),
ircd::pretty(iecbuf, iec(data.stats.flush_bytes - stats.flush_bytes)),
data.stats.flush_count - stats.flush_count,
ircd::pretty(tmbuf, stats.timer.at<microseconds>(), true)
};
#endif
@ -677,12 +776,10 @@ try
}
catch(const std::bad_function_call &e)
{
thread_local char rembuf[128];
log::dwarning
{
log, "polylog %s %s '%s' missing handler :%s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
log, "polylog %s '%s' missing handler :%s",
loghead(data),
name(),
e.what()
};
@ -691,22 +788,24 @@ catch(const std::bad_function_call &e)
}
catch(const std::exception &e)
{
thread_local char rembuf[128], iecbuf[64], tmbuf[32];
log::derror
{
log, "polylog %s %s '%s' %s wc:%zu in %s :%s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
log, "polylog %s '%s' :%s",
loghead(data),
name(),
ircd::pretty(iecbuf, iec(data.stats.flush_bytes)),
data.stats.flush_count,
ircd::pretty(tmbuf, data.stats.timer.at<milliseconds>(), true),
e.what()
};
throw;
}
ircd::string_view
ircd::m::sync::item::member_name()
const
{
return token_last(name(), '.');
}
ircd::string_view
ircd::m::sync::item::name()
const

View file

@ -92,6 +92,11 @@ try
stats, client, request.user_id, range, args.filter_id
};
log::debug
{
log, "request %s", loghead(data)
};
if(data.since > data.current + 1)
throw m::NOT_FOUND
{
@ -105,11 +110,11 @@ try
const bool shortpolled
{
data.delta == 0?
range.first > range.second?
false:
data.delta > linear_delta_max?
polylog::handle(data):
linear::handle(data)
range.second - range.first <= linear_delta_max?
linear::handle(data):
polylog::handle(data)
};
// When shortpoll was successful, do nothing else.
@ -117,7 +122,7 @@ try
return {};
// When longpoll was successful, do nothing else.
if(longpoll::poll(client, args))
if(longpoll::poll(client, data, args))
return {};
// A user-timeout occurred. According to the spec we return a
@ -145,6 +150,65 @@ catch(const bad_lex_cast &e)
};
}
//
// polylog
//
bool
ircd::m::sync::polylog::handle(data &data)
try
{
data.commit();
// Top level sync object.
json::stack::object object
{
data.out
};
m::sync::for_each([&data]
(item &item)
{
json::stack::member member
{
data.out, item.member_name()
};
item.polylog(data);
return true;
});
const json::value next_batch
{
lex_cast(data.current + 1), json::STRING
};
json::stack::member
{
object, "next_batch", next_batch
};
log::info
{
log, "polylog %s (next_batch:%s)",
loghead(data),
string_view{next_batch}
};
return true;
}
catch(const std::exception &e)
{
log::error
{
log, "polylog %s FAILED :%s",
loghead(data),
e.what()
};
throw;
}
//
// longpoll
//
@ -178,6 +242,7 @@ ircd::m::sync::longpoll::handle_notify(const m::event &event,
bool
ircd::m::sync::longpoll::poll(client &client,
data &data,
const args &args)
{
++polling;
@ -201,13 +266,14 @@ ircd::m::sync::longpoll::poll(client &client,
queue.pop_front();
}};
if(handle(client, args, a))
if(handle(client, data, args, a))
return true;
}
}
bool
ircd::m::sync::longpoll::handle(client &client,
data &data,
const args &args,
const accepted &event)
{
@ -219,7 +285,7 @@ ircd::m::sync::longpoll::handle(client &client,
if(room_id)
{
const m::room room{room_id};
return handle(client, args, event, room);
return handle(client, data, args, event, room);
}
return false;
@ -227,6 +293,7 @@ ircd::m::sync::longpoll::handle(client &client,
bool
ircd::m::sync::longpoll::handle(client &client,
data &data,
const args &args,
const accepted &event,
const m::room &room)
@ -271,7 +338,7 @@ ircd::m::sync::longpoll::handle(client &client,
const auto &next_batch
{
int64_t(m::vm::current_sequence + 1)
int64_t(data.current + 1)
};
resource::response
@ -615,7 +682,7 @@ ircd::m::sync::linear::handle(data &data)
{
client, json::members
{
{ "next_batch", json::value { lex_cast(int64_t(since + 1)), json::STRING } },
{ "next_batch", json::value { lex_cast(int64_t(data.current + 1)), json::STRING } },
{ "rooms", rooms },
{ "presence", json::object{} },
}
@ -648,96 +715,3 @@ ircd::m::sync::highlight_count(const m::room &r,
return count(u, r, a, a < b? b : a);
}
//
// polylog
//
bool
ircd::m::sync::polylog::handle(data &data)
try
{
json::stack::object object
{
data.out
};
// Generate individual stats for sections
thread_local char iecbuf[64], rembuf[128], tmbuf[32];
sync::stats stats{data.stats};
stats.timer = timer{};
{
json::stack::member member{object, "account_data"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
auto it(m::sync::item::map.find("account_data"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
item->polylog(data);
}
{
json::stack::member member{object, "rooms"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
auto it(m::sync::item::map.find("rooms"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
item->polylog(data);
}
{
json::stack::member member{object, "presence"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
auto it(m::sync::item::map.find("presence"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
item->polylog(data);
}
{
json::stack::member member
{
object, "next_batch", json::value(lex_cast(int64_t(data.current + 1)), json::STRING)
};
}
log::info
{
log, "polylog %s %s %s wc:%zu in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
pretty(iecbuf, iec(data.stats.flush_bytes)),
data.stats.flush_count,
ircd::pretty(tmbuf, data.stats.timer.at<milliseconds>(), true)
};
return data.committed();
}
catch(const std::exception &e)
{
log::error
{
log, "polylog sync FAILED %lu to %lu (vm @ %zu) :%s"
,data.since
,data.current
,m::vm::current_sequence
,e.what()
};
throw;
}

View file

@ -58,9 +58,9 @@ namespace ircd::m::sync::longpoll
static std::string sync_room(client &, const m::room &, const args &, const accepted &);
static std::string sync_rooms(client &, const m::user::id &, const m::room &, const args &, const accepted &);
static bool handle(client &, const args &, const accepted &, const m::room &);
static bool handle(client &, const args &, const accepted &);
static bool poll(client &, const args &);
static bool handle(client &, data &, const args &, const accepted &, const m::room &);
static bool handle(client &, data &, const args &, const accepted &);
static bool poll(client &, data &, const args &);
static void handle_notify(const m::event &, m::vm::eval &);
extern m::hookfn<m::vm::eval &> notified;

View file

@ -17,6 +17,7 @@ IRCD_MODULE
namespace ircd::m::sync
{
static bool account_data_polylog(data &);
extern item account_data;
}
@ -30,34 +31,42 @@ ircd::m::sync::account_data
bool
ircd::m::sync::account_data_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::room::state state{data.user_room};
// Open an object
json::stack::object object
{
data.out
};
json::stack::array array
{
data.out, "events"
};
const m::room::state &state{data.user_state};
state.for_each("ircd.account_data", [&data, &array]
(const m::event &event)
{
const auto &event_idx(index(event, std::nothrow));
if(event_idx < data.since || event_idx > data.current)
// Ignore events outside this sync window
if(!apropos(data, event))
return;
json::stack::object object{array};
// Each account_data event is an object in the events array
json::stack::object object
{
array
};
// type
json::stack::member
{
json::stack::member member
{
object, "type", at<"state_key"_>(event)
};
}
object, "type", at<"state_key"_>(event)
};
// content
json::stack::member
{
json::stack::member member
{
object, "content", at<"content"_>(event)
};
}
object, "content", at<"content"_>(event)
};
});
return true;

View file

@ -16,6 +16,7 @@ IRCD_MODULE
namespace ircd::m::sync
{
static void presence_events_polylog(data &);
static bool presence_polylog(data &);
static bool presence_linear(data &);
extern item presence;
@ -40,7 +41,7 @@ ircd::m::sync::presence_linear(data &data)
json::stack::object object
{
*data.array
data.out
};
// sender
@ -67,12 +68,21 @@ ircd::m::sync::presence_linear(data &data)
bool
ircd::m::sync::presence_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::user::mitsein mitsein
json::stack::object object
{
data.user
data.out
};
presence_events_polylog(data);
return true;
}
void
ircd::m::sync::presence_events_polylog(data &data)
{
json::stack::array array
{
data.out, "events"
};
ctx::mutex mutex;
@ -83,7 +93,10 @@ ircd::m::sync::presence_polylog(data &data)
// contended during a json::stack flush to the client; not during database
// queries leading to this.
const std::lock_guard<decltype(mutex)> l{mutex};
json::stack::object object{array};
json::stack::object object
{
array
};
// sender
json::stack::member
@ -104,36 +117,27 @@ ircd::m::sync::presence_polylog(data &data)
};
}};
const auto each_user{[&data, &closure]
(const m::user::id &user_id)
{
const m::user user{user_id};
const m::user::room user_room{user};
//TODO: can't check event_idx cuz only closed presence content
if(head_idx(std::nothrow, user_room) > data.since)
m::presence::get(std::nothrow, user, closure);
}};
mitsein.for_each("join", each_user);
/*
//TODO: conf
static const size_t fibers(24);
string_view q[fibers];
char buf[fibers][256];
static const size_t fibers(16);
std::array<string_view, fibers> q;
std::array<char[256], fibers> buf;
ctx::parallel<string_view> parallel
{
meepool, q, each_user
m::sync::pool, q, [&data, &closure](const auto &user_id)
{
const m::user user{user_id};
const m::user::room user_room{user};
//TODO: can't check event_idx cuz only closed presence content
if(head_idx(std::nothrow, user_room) > data.since)
m::presence::get(std::nothrow, user, closure);
}
};
const auto paraclosure{[&parallel, &q, &buf]
(const m::user &u)
const m::user::mitsein mitsein{data.user};
mitsein.for_each("join", [&parallel, &q, &buf]
(const m::user &user)
{
assert(parallel.snd < fibers);
strlcpy(buf[parallel.snd], string_view{u.user_id});
q[parallel.snd] = buf[parallel.snd];
q[parallel.snd] = strlcpy(buf[parallel.snd], user.user_id);
parallel();
}};
*/
// mitsein.for_each("join", paraclosure);
return true;
});
}

View file

@ -16,8 +16,8 @@ IRCD_MODULE
namespace ircd::m::sync
{
static void _rooms_polylog_room(data &, json::stack::object &out, const m::room &);
static void _rooms_polylog(data &, json::stack::object &out, const string_view &membership);
static void _rooms_polylog_room(data &, const m::room &);
static void _rooms_polylog(data &, const string_view &membership);
static bool rooms_polylog(data &);
static bool rooms_linear(data &);
extern item rooms;
@ -40,35 +40,24 @@ ircd::m::sync::rooms_linear(data &data)
bool
ircd::m::sync::rooms_polylog(data &data)
{
json::stack::object object{*data.member};
_rooms_polylog(data, object, "invite");
_rooms_polylog(data, object, "join");
_rooms_polylog(data, object, "leave");
_rooms_polylog(data, object, "ban");
json::stack::object object{data.out};
_rooms_polylog(data, "invite");
_rooms_polylog(data, "join");
_rooms_polylog(data, "leave");
_rooms_polylog(data, "ban");
return true;
}
void
ircd::m::sync::_rooms_polylog(data &data,
json::stack::object &out,
const string_view &membership)
{
const scope_restore<decltype(data.membership)> theirs
json::stack::object object
{
data.membership, membership
data.out, membership
};
json::stack::member rooms_member
{
out, membership
};
json::stack::object rooms_object
{
rooms_member
};
data.user_rooms.for_each(membership, [&data, &rooms_object]
data.user_rooms.for_each(membership, [&data]
(const m::room &room, const string_view &membership)
{
if(head_idx(std::nothrow, room) <= data.since)
@ -83,21 +72,21 @@ ircd::m::sync::_rooms_polylog(data &data,
// This scope ensures the object destructs and flushes before
// the log message tallying the stats for this room below.
{
json::stack::member member{rooms_object, room.room_id};
json::stack::object object{member};
_rooms_polylog_room(data, object, room);
json::stack::object object
{
data.out, room.room_id
};
_rooms_polylog_room(data, room);
}
#ifdef RB_DEBUG
thread_local char iecbuf[64], rembuf[128], tmbuf[32];
thread_local char tmbuf[32];
log::debug
{
log, "polylog %s %s %s %s wc:%zu in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
log, "polylog %s %s in %s",
loghead(data),
string_view{room.room_id},
pretty(iecbuf, iec(data.stats.flush_bytes - stats.flush_bytes)),
data.stats.flush_count - stats.flush_count,
ircd::pretty(tmbuf, stats.timer.at<milliseconds>(), true)
};
#endif
@ -106,7 +95,6 @@ ircd::m::sync::_rooms_polylog(data &data,
void
ircd::m::sync::_rooms_polylog_room(data &data,
json::stack::object &out,
const m::room &room)
try
{
@ -115,113 +103,24 @@ try
data.room, &room
};
// state
m::sync::for_each("rooms", [&](item &item)
{
auto it(m::sync::item::map.find("rooms.$membership.$room_id.state"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member
{
out, data.membership != "invite"?
"state":
"invite_state"
data.out, item.member_name()
};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
// timeline
{
auto it(m::sync::item::map.find("rooms.$membership.$room_id.timeline"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member{out, "timeline"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
// ephemeral
{
auto pit
{
m::sync::item::map.equal_range("rooms...ephemeral")
};
assert(pit.first != pit.second);
json::stack::member member{out, "ephemeral"};
json::stack::object object{member};
const scope_restore<decltype(data.object)> theirs
{
data.object, &object
};
{
json::stack::member member{object, "events"};
json::stack::array array{member};
const scope_restore<decltype(data.array)> theirs
{
data.array, &array
};
for(; pit.first != pit.second; ++pit.first)
{
const auto &item(pit.first->second);
assert(item);
item->polylog(data);
}
}
}
// account_data
{
auto it(m::sync::item::map.find("rooms...account_data"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member{out, "account_data"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
// unread_notifications
{
auto it(m::sync::item::map.find("rooms...unread_notifications"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member{out, "unread_notifications"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
item.polylog(data);
return true;
});
}
catch(const json::not_found &e)
{
log::critical
{
log, "polylog sync room %s error %lu to %lu (vm @ %zu) :%s"
log, "polylog %s room %s error :%s"
,loghead(data)
,string_view{room.room_id}
,data.since
,data.current
,m::vm::current_sequence
,e.what()
};
}

View file

@ -16,6 +16,7 @@ IRCD_MODULE
namespace ircd::m::sync
{
static bool room_account_data_events_polylog(data &);
static bool room_account_data_polylog(data &);
extern item room_account_data;
}
@ -23,16 +24,30 @@ namespace ircd::m::sync
decltype(ircd::m::sync::room_account_data)
ircd::m::sync::room_account_data
{
"rooms...account_data",
"rooms.account_data",
room_account_data_polylog
};
bool
ircd::m::sync::room_account_data_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::member member{out, "events"};
json::stack::array array{member};
json::stack::object object
{
data.out
};
room_account_data_events_polylog(data);
return true;
}
bool
ircd::m::sync::room_account_data_events_polylog(data &data)
{
json::stack::array array
{
data.out, "events"
};
const m::room::state state
{
data.user_room
@ -47,11 +62,13 @@ ircd::m::sync::room_account_data_polylog(data &data)
state.for_each(type, [&data, &array]
(const m::event &event)
{
const auto &event_idx(index(event, std::nothrow));
if(event_idx < data.since || event_idx >= data.current)
if(!apropos(data, event))
return;
json::stack::object object{array};
json::stack::object object
{
data.out
};
json::stack::member
{

View file

@ -0,0 +1,67 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Rooms Ephemeral"
};
namespace ircd::m::sync
{
static bool rooms_ephemeral_events_polylog(data &);
static bool rooms_ephemeral_polylog(data &);
static bool rooms_ephemeral_linear(data &);
extern item rooms_ephemeral;
}
decltype(ircd::m::sync::rooms_ephemeral)
ircd::m::sync::rooms_ephemeral
{
"rooms.ephemeral",
rooms_ephemeral_polylog,
rooms_ephemeral_linear
};
bool
ircd::m::sync::rooms_ephemeral_linear(data &data)
{
return true;
}
bool
ircd::m::sync::rooms_ephemeral_polylog(data &data)
{
json::stack::object object
{
data.out
};
rooms_ephemeral_events(data);
return true;
}
bool
ircd::m::sync::rooms_ephemeral_events_polylog(data &data)
{
json::stack::array array
{
data.out, "events"
};
m::sync::for_each("rooms.ephemeral", [&]
(item &item)
{
item.polylog(data);
return true;
});
return true;
}

View file

@ -16,6 +16,9 @@ IRCD_MODULE
namespace ircd::m::sync
{
static void _reformat_receipt(json::stack::object &, const m::event &);
static void _handle_receipt(data &, const m::event &);
static void _handle_user(data &, const m::user &);
static bool room_ephemeral_m_receipt_m_read_polylog(data &);
extern item room_ephemeral_m_receipt_m_read;
}
@ -23,7 +26,7 @@ namespace ircd::m::sync
decltype(ircd::m::sync::room_ephemeral_m_receipt_m_read)
ircd::m::sync::room_ephemeral_m_receipt_m_read
{
"rooms...ephemeral",
"rooms.ephemeral.m_receipt",
room_ephemeral_m_receipt_m_read_polylog
};
@ -31,69 +34,89 @@ bool
ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
{
const m::room &room{*data.room};
const m::room::members members{room};
const m::room::members::closure closure{[&]
const m::room::members members
{
room
};
members.for_each(data.membership, m::room::members::closure{[&data]
(const m::user::id &user_id)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"event_id",
"content",
"sender",
},
};
const m::user user{user_id};
m::user::room user_room{user};
user_room.fopts = &fopts;
if(head_idx(std::nothrow, user_room) <= data.since)
return;
user_room.get(std::nothrow, "ircd.read", room.room_id, [&]
(const m::event &event)
{
const auto &event_idx(index(event, std::nothrow));
if(event_idx < data.since || event_idx >= data.current)
return;
data.commit();
json::stack::object object{*data.array};
// type
json::stack::member
{
object, "type", "m.receipt"
};
// content
const json::object data
{
at<"content"_>(event)
};
thread_local char buf[1024];
const json::members reformat
{
{ unquote(data.at("event_id")),
{
{ "m.read",
{
{ at<"sender"_>(event),
{
{ "ts", data.at("ts") }
}}
}}
}}
};
json::stack::member
{
object, "content", json::stringify(mutable_buffer{buf}, reformat)
};
});
}};
_handle_user(data, user_id);
}});
return true;
}
void
ircd::m::sync::_handle_user(data &data,
const m::user &user)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"event_id", "content", "sender",
},
};
m::user::room user_room{user};
user_room.fopts = &fopts;
if(head_idx(std::nothrow, user_room) < data.since)
return;
const m::room::id &room_id{*data.room};
user_room.get(std::nothrow, "ircd.read", room_id, [&data]
(const m::event &event)
{
if(apropos(data, event))
_handle_receipt(data, event);
});
}
void
ircd::m::sync::_handle_receipt(data &data,
const m::event &event)
{
const json::object content
{
at<"content"_>(event)
};
json::stack::object object
{
data.out
};
// type
json::stack::member
{
object, "type", "m.receipt"
};
// content
json::stack::object content_
{
object, "content"
};
json::stack::object event_id_
{
content_, unquote(content.at("event_id"))
};
json::stack::object m_read_
{
event_id_, "m.read"
};
json::stack::object sender_
{
m_read_, at<"sender"_>(event)
};
json::stack::member
{
sender_, "ts", json::value(content.at("ts"))
};
}

View file

@ -16,19 +16,36 @@ IRCD_MODULE
namespace ircd::m::sync
{
static bool _room_state_polylog_events(data &);
static bool room_state_polylog(data &);
static bool room_state_linear(data &);
extern const event::keys::include _default_keys;
extern item room_state;
}
decltype(ircd::m::sync::room_state)
ircd::m::sync::room_state
{
"rooms.$membership.$room_id.state",
"rooms.state",
room_state_polylog,
room_state_linear
};
decltype(ircd::m::sync::_default_keys)
ircd::m::sync::_default_keys
{
"content",
"depth",
"event_id",
"origin_server_ts",
"redacts",
"room_id",
"sender",
"state_key",
"type",
};
bool
ircd::m::sync::room_state_linear(data &data)
{
@ -42,70 +59,65 @@ ircd::m::sync::room_state_linear(data &data)
if(!data.room->membership(data.user, data.membership))
return false;
data.array->append(*data.event);
//data.array->append(*data.event);
return true;
}
bool
ircd::m::sync::room_state_polylog(data &data)
{
static const m::event::keys::include default_keys
json::stack::object object
{
"content",
"depth",
"event_id",
"origin_server_ts",
"redacts",
"room_id",
"sender",
"state_key",
"type",
data.out
};
static const m::event::fetch::opts fopts
{
default_keys
};
json::stack::object out{*data.member};
json::stack::member member
{
out, "events"
};
return _room_state_polylog_events(data);
}
bool
ircd::m::sync::_room_state_polylog_events(data &data)
{
json::stack::array array
{
member
data.out, "events"
};
ctx::mutex mutex;
const event::closure_idx each_idx{[&data, &array, &mutex]
(const m::event::idx &event_idx)
{
assert(event_idx);
static const m::event::fetch::opts fopts
{
_default_keys
};
const event::fetch event
{
event_idx, std::nothrow, &fopts
};
if(!event.valid || at<"depth"_>(event) >= int64_t(data.state_at))
if(!event.valid)
return;
const std::lock_guard<decltype(mutex)> lock{mutex};
array.append(event);
data.commit();
}};
const event::closure_idx _each_idx{[&data, &each_idx]
(const m::event::idx &event_idx)
//TODO: conf
std::array<event::idx, 8> md;
ctx::parallel<event::idx> parallel
{
assert(event_idx);
if(event_idx >= data.since && event_idx <= data.current)
each_idx(event_idx);
}};
m::sync::pool, md, each_idx
};
const m::room &room{*data.room};
const m::room::state state{room};
state.for_each(_each_idx);
state.for_each([&data, &parallel]
(const m::event::idx &event_idx)
{
if(apropos(data, event_idx))
parallel(event_idx);
});
return true;
}

View file

@ -16,7 +16,7 @@ IRCD_MODULE
namespace ircd::m::sync
{
static event::id::buf _room_timeline_events(data &, json::stack::array &, const m::room &, bool &);
static event::id::buf _room_timeline_events(data &, const m::room &, bool &);
static bool room_timeline_polylog(data &);
extern item room_timeline;
}
@ -24,34 +24,35 @@ namespace ircd::m::sync
decltype(ircd::m::sync::room_timeline)
ircd::m::sync::room_timeline
{
"rooms.$membership.$room_id.timeline",
"rooms.timeline",
room_timeline_polylog
};
bool
ircd::m::sync::room_timeline_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::object object
{
data.out
};
// events
bool limited{false};
m::event::id::buf prev;
m::event::id::buf prev
{
json::stack::member member{out, "events"};
json::stack::array array{member};
prev = _room_timeline_events(data, array, *data.room, limited);
}
_room_timeline_events(data, *data.room, limited)
};
// prev_batch
json::stack::member
{
out, "prev_batch", string_view{prev}
object, "prev_batch", string_view{prev}
};
// limited
json::stack::member
{
out, "limited", json::value{limited}
object, "limited", json::value{limited}
};
return true;
@ -59,10 +60,14 @@ ircd::m::sync::room_timeline_polylog(data &data)
ircd::m::event::id::buf
ircd::m::sync::_room_timeline_events(data &data,
json::stack::array &out,
const m::room &room,
bool &limited)
{
json::stack::array array
{
data.out, "events"
};
static const m::event::fetch::opts fopts
{
m::event::keys::include
@ -112,13 +117,13 @@ ircd::m::sync::_room_timeline_events(data &data,
if(i > 0 && it)
{
const m::event &event{*it};
data.state_at = at<"depth"_>(event);
//const m::event &event{*it};
//data.state_at = at<"depth"_>(event);
}
if(i > 0)
for(; it && i > -1; ++it, --i)
out.append(*it);
array.append(*it);
return event_id;
}

View file

@ -26,7 +26,7 @@ namespace ircd::m::sync
decltype(ircd::m::sync::room_unread_notifications)
ircd::m::sync::room_unread_notifications
{
"rooms...unread_notifications",
"rooms.unread_notifications",
room_unread_notifications_polylog,
room_unread_notifications_linear
};
@ -45,7 +45,11 @@ ircd::m::sync::room_unread_notifications_polylog(data &data)
if(!m::receipt::read(last_read, room.room_id, data.user))
return false;
json::stack::object out{*data.member};
json::stack::object out
{
data.out
};
const auto last_read_idx
{
index(last_read)