0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-25 23:14:13 +01:00

ircd:Ⓜ️ Preliminary modular client sync system.

This commit is contained in:
Jason Volk 2019-01-03 17:21:02 -08:00
parent ab121835af
commit 86911226ed
13 changed files with 1311 additions and 723 deletions

View file

@ -59,6 +59,7 @@ namespace ircd::m::vm
#include "visible.h"
#include "feds.h"
#include "app.h"
#include "sync.h"
struct ircd::m::init
{

109
include/ircd/m/sync.h Normal file
View file

@ -0,0 +1,109 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_M_SYNC_H
namespace ircd
{
struct client;
}
namespace ircd::m::sync
{
struct args;
struct stats;
struct data;
struct item;
struct response;
extern log::log log;
}
struct ircd::m::sync::item
:instance_multimap<std::string, item, std::less<>>
{
using handle = std::function<bool (data &)>;
handle _polylog;
handle _linear;
handle _longpoll;
public:
string_view name() const;
bool polylog(data &);
bool linear(data &, const m::event &);
bool longpoll(data &, const m::event &);
item(std::string name,
handle polylog = {},
handle linear = {},
handle longpoll = {});
item(item &&) = delete;
item(const item &) = delete;
~item() noexcept;
};
struct ircd::m::sync::data
{
sync::stats &stats;
ircd::client &client;
// Range related
const uint64_t &since;
const uint64_t current;
const uint64_t delta;
// User related
const m::user user;
const m::user::room user_room;
const m::user::rooms user_rooms;
// Filter to use
const std::string filter_buf;
const m::filter filter;
// response state
const std::unique_ptr<response> resp;
json::stack out;
bool committed() const;
bool commit();
// apropos contextual
ctx::mutex write_mutex;
json::stack::member *member {nullptr};
json::stack::object *object {nullptr};
json::stack::array *array {nullptr};
const m::event *event {nullptr};
const m::room *room {nullptr};
string_view membership;
// unsorted / misc
uint64_t state_at {0};
data(sync::stats &stats,
ircd::client &client,
const m::user &user,
const std::pair<event::idx, event::idx> &range,
const string_view &filter_id);
data(data &&) = delete;
data(const data &) = delete;
~data() noexcept;
};
struct ircd::m::sync::stats
{
ircd::timer timer;
size_t flush_bytes {0};
size_t flush_count {0};
};

View file

@ -389,6 +389,333 @@ ircd::m::self::init::init(const string_view &origin)
};
}
///////////////////////////////////////////////////////////////////////////////
//
// m/sync.h
//
decltype(ircd::m::sync::log)
ircd::m::sync::log
{
"sync", 's'
};
//
// response
//
struct ircd::m::sync::response
{
static conf::item<size_t> flush_hiwat;
sync::stats &stats;
ircd::client &client;
unique_buffer<mutable_buffer> buf;
std::unique_ptr<resource::response::chunked> resp;
bool committed;
const_buffer flush(const const_buffer &buf);
void commit();
response(sync::stats &stats, ircd::client &client);
~response() noexcept;
};
decltype(ircd::m::sync::response::flush_hiwat)
ircd::m::sync::response::flush_hiwat
{
{ "name", "ircd.m.sync.flush.hiwat" },
{ "default", long(32_KiB) },
};
//
// response::response
//
ircd::m::sync::response::response(sync::stats &stats,
ircd::client &client)
:stats
{
stats
}
,client
{
client
}
,buf
{
std::max(size_t(96_KiB), size_t(flush_hiwat))
}
,committed
{
false
}
{
}
ircd::m::sync::response::~response()
noexcept
{
}
ircd::const_buffer
ircd::m::sync::response::flush(const const_buffer &buf)
{
if(!committed)
return buf;
if(!resp)
commit();
stats.flush_bytes += resp->write(buf);
stats.flush_count++;
return buf;
}
void
ircd::m::sync::response::commit()
{
static const string_view content_type
{
"application/json; charset=utf-8"
};
assert(!resp);
resp = std::make_unique<resource::response::chunked>
(
client, http::OK, content_type
);
}
//
// data
//
ircd::m::sync::data::data(sync::stats &stats,
ircd::client &client,
const m::user &user,
const std::pair<event::idx, event::idx> &range,
const string_view &filter_id)
:stats{stats}
,client{client}
,since
{
range.first
}
,current
{
range.second
}
,delta
{
current - since
}
,user
{
user
}
,user_room
{
user
}
,user_rooms
{
user
}
,filter_buf
{
filter_id?
user.filter(std::nothrow, filter_id):
std::string{}
}
,filter
{
json::object{filter_buf}
}
,resp
{
std::make_unique<response>(stats, client)
}
,out
{
resp->buf, std::bind(&response::flush, resp.get(), ph::_1), size_t(resp->flush_hiwat)
}
{
}
ircd::m::sync::data::~data()
noexcept
{
}
bool
ircd::m::sync::data::commit()
{
assert(resp);
const auto ret{resp->committed};
resp->committed = true;
return ret;
}
bool
ircd::m::sync::data::committed()
const
{
assert(resp);
return resp->committed;
}
//
// item
//
template<>
decltype(ircd::m::sync::item::instance_multimap::map)
ircd::m::sync::item::instance_multimap::map
{};
//
// item::item
//
ircd::m::sync::item::item(std::string name,
handle polylog,
handle linear,
handle longpoll)
:instance_multimap
{
std::move(name)
}
,_polylog
{
std::move(polylog)
}
,_linear
{
std::move(linear)
}
,_longpoll
{
std::move(longpoll)
}
{
log::debug
{
log, "Registered sync item(%p) '%s'",
this,
this->name()
};
}
ircd::m::sync::item::~item()
noexcept
{
log::debug
{
log, "Unregistered sync item(%p) '%s'",
this,
this->name()
};
}
bool
ircd::m::sync::item::longpoll(data &data,
const m::event &event)
try
{
const auto ret
{
_longpoll(data)
};
return ret;
}
catch(const std::bad_function_call &)
{
return false;
}
bool
ircd::m::sync::item::linear(data &data,
const m::event &event)
try
{
const scope_restore<decltype(data.event)> theirs
{
data.event, &event
};
const auto ret
{
_linear(data)
};
return ret;
}
catch(const std::bad_function_call &)
{
return false;
}
bool
ircd::m::sync::item::polylog(data &data)
try
{
#ifdef RB_DEBUG
sync::stats stats{data.stats};
stats.timer = {};
#endif
const auto ret
{
_polylog(data)
};
#ifdef RB_DEBUG
thread_local char rembuf[128], iecbuf[64], tmbuf[32];
log::debug
{
log, "polylog %s %s '%s' %s wc:%zu in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
name(),
ircd::pretty(iecbuf, iec(data.stats.flush_bytes - stats.flush_bytes)),
data.stats.flush_count - stats.flush_count,
ircd::pretty(tmbuf, stats.timer.at<microseconds>(), true)
};
#endif
return ret;
}
catch(const std::bad_function_call &)
{
return false;
}
catch(const std::exception &e)
{
thread_local char rembuf[128], iecbuf[64], tmbuf[32];
log::derror
{
log, "polylog %s %s '%s' %s wc:%zu in %s :%s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
name(),
ircd::pretty(iecbuf, iec(data.stats.flush_bytes)),
data.stats.flush_count,
ircd::pretty(tmbuf, data.stats.timer.at<milliseconds>(), true),
e.what()
};
throw;
}
ircd::string_view
ircd::m::sync::item::name()
const
{
return this->instance_multimap::it->first;
}
///////////////////////////////////////////////////////////////////////////////
//
// m/feds.h

View file

@ -258,6 +258,28 @@ client_module_LTLIBRARIES += \
client/client_thirdparty_protocols.la \
###
#
# client/sync/
#
client_client_sync_account_data_la_SOURCES = client/sync/account_data.cc
client_client_sync_presence_la_SOURCES = client/sync/presence.cc
client_client_sync_rooms_account_data_la_SOURCES = client/sync/rooms/account_data.cc
client_client_sync_rooms_receipt_la_SOURCES = client/sync/rooms/receipt.cc
client_client_sync_rooms_state_la_SOURCES = client/sync/rooms/state.cc
client_client_sync_rooms_timeline_la_SOURCES = client/sync/rooms/timeline.cc
client_client_sync_rooms_unread_notifications_la_SOURCES = client/sync/rooms/unread_notifications.cc
client_module_LTLIBRARIES += \
client/client_sync_account_data.la \
client/client_sync_presence.la \
client/client_sync_rooms_account_data.la \
client/client_sync_rooms_receipt.la \
client/client_sync_rooms_state.la \
client/client_sync_rooms_timeline.la \
client/client_sync_rooms_unread_notifications.la \
###
###############################################################################
#
# /_matrix/key/

View file

@ -16,12 +16,6 @@ IRCD_MODULE
"Client 6.2.1 :Sync"
};
decltype(ircd::m::sync::log)
ircd::m::sync::log
{
"sync", 's'
};
decltype(ircd::m::sync::resource)
ircd::m::sync::resource
{
@ -62,13 +56,6 @@ ircd::m::sync::args::timeout_default
{ "default", 10 * 1000L },
};
ircd::conf::item<size_t>
ircd::m::sync::shortpoll::flush_hiwat
{
{ "name", "ircd.client.sync.flush.hiwat" },
{ "default", long(24_KiB) },
};
//
// GET sync
//
@ -93,20 +80,27 @@ try
request
};
shortpoll sp
const std::pair<event::idx, event::idx> range
{
client, args
args.since, // start at since token
m::vm::current_sequence // stop at present
};
if(sp.since > sp.current)
stats stats;
data data
{
stats, client, request.user_id, range, args.filter_id
};
if(data.since > data.current + 1)
throw m::NOT_FOUND
{
"Since parameter is in the future..."
"Since parameter is too far in the future..."
};
json::stack::object top
{
sp.out
data.out
};
const size_t linear_delta_max
@ -116,11 +110,11 @@ try
const bool shortpolled
{
sp.delta == 0?
data.delta == 0?
false:
sp.delta > linear_delta_max?
polylog::handle(client, sp, top):
linear::handle(client, sp, top)
data.delta > linear_delta_max?
polylog::handle(client, data, top):
linear::handle(client, data, top)
};
// When shortpoll was successful, do nothing else.
@ -456,7 +450,7 @@ ircd::m::sync::linear::delta_max
bool
ircd::m::sync::linear::handle(client &client,
shortpoll &sp,
data &sp,
json::stack::object &object)
{
uint64_t since
@ -485,7 +479,7 @@ ircd::m::sync::linear::handle(client &client,
json::get<"room_id"_>(event)
};
if(!room.membership(sp.args.request.user_id))
if(!room.membership(sp.user.user_id))
return true;
auto it
@ -631,119 +625,122 @@ ircd::m::sync::linear::handle(client &client,
return true;
}
static long
ircd::m::sync::notification_count(const m::room &room,
const m::event::idx &a,
const m::event::idx &b)
{
return m::count_since(room, a, a < b? b : a);
}
static long
ircd::m::sync::highlight_count(const m::room &r,
const m::user &u,
const m::event::idx &a,
const m::event::idx &b)
{
using namespace ircd::m;
using proto = size_t (const user &, const room &, const event::idx &, const event::idx &);
static mods::import<proto> count
{
"m_user", "highlighted_count__between"
};
return count(u, r, a, a < b? b : a);
}
//
// polylog
//
ircd::conf::item<bool>
ircd::m::sync::polylog::prefetch_state
namespace ircd
{
{ "name", "ircd.client.sync.polylog.prefetch.state" },
{ "default", false },
};
ctx::pool::opts mepool_opts
{
256_KiB
};
ircd::conf::item<bool>
ircd::m::sync::polylog::prefetch_timeline
{
{ "name", "ircd.client.sync.polylog.prefetch.timeline" },
{ "default", false },
ctx::pool mepool
{
"me pool", mepool_opts
};
};
bool
ircd::m::sync::polylog::handle(client &client,
shortpoll &sp,
data &data,
json::stack::object &object)
try
{
// Generate individual stats for sections
thread_local char iecbuf[64], rembuf[128];
sync::stats stats{sp.stats};
thread_local char iecbuf[64], rembuf[128], tmbuf[32];
sync::stats stats{data.stats};
stats.timer = timer{};
{
json::stack::member member{object, "account_data"};
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
auto it(m::sync::item::map.find("account_data"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
item->polylog(data);
}
{
json::stack::member member{object, "rooms"};
json::stack::object object{member};
rooms(sp, object);
sync_rooms(data, object, "invite");
sync_rooms(data, object, "join");
sync_rooms(data, object, "leave");
sync_rooms(data, object, "ban");
}
#ifdef RB_DEBUG
log::debug
{
log, "polylog %s %s rooms %s wc:%zu in %lu$ms",
string(rembuf, ircd::remote(sp.client)),
string_view{sp.request.user_id},
pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)),
sp.stats.flush_count - stats.flush_count,
stats.timer.at<milliseconds>().count()
};
stats = sync::stats{sp.stats};
stats.timer = timer{};
#endif
{
json::stack::member member{object, "presence"};
json::stack::object object{member};
presence(sp, object);
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
auto it(m::sync::item::map.find("presence"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
item->polylog(data);
}
#ifdef RB_DEBUG
log::debug
{
log, "polylog %s %s presence %s wc:%zu in %lu$ms",
string(rembuf, ircd::remote(sp.client)),
string_view{sp.request.user_id},
pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)),
sp.stats.flush_count - stats.flush_count,
stats.timer.at<milliseconds>().count()
};
stats = sync::stats{sp.stats};
stats.timer = timer{};
#endif
{
json::stack::member member{object, "account_data"};
json::stack::object object{member};
account_data(sp, object);
}
#ifdef RB_DEBUG
log::debug
{
log, "polylog %s %s account_data %s wc:%zu in %lu$ms",
string(rembuf, ircd::remote(sp.client)),
string_view{sp.request.user_id},
pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)),
sp.stats.flush_count - stats.flush_count,
stats.timer.at<milliseconds>().count()
};
#endif
{
json::stack::member member
{
object, "next_batch", json::value(lex_cast(int64_t(sp.current)), json::STRING)
object, "next_batch", json::value(lex_cast(int64_t(data.current)), json::STRING)
};
}
log::info
{
log, "polylog %s %s %s wc:%zu in %lu$ms",
string(rembuf, ircd::remote(sp.client)),
string_view{sp.request.user_id},
pretty(iecbuf, iec(sp.stats.flush_bytes)),
sp.stats.flush_count,
sp.stats.timer.at<milliseconds>().count()
log, "polylog %s %s %s wc:%zu in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
pretty(iecbuf, iec(data.stats.flush_bytes)),
data.stats.flush_count,
ircd::pretty(tmbuf, data.stats.timer.at<milliseconds>(), true)
};
return sp.committed;
return data.committed();
}
catch(const std::exception &e)
{
log::error
{
log, "polylog sync FAILED %lu to %lu (vm @ %zu) :%s"
,sp.since
,sp.current
,data.since
,data.current
,m::vm::current_sequence
,e.what()
};
@ -752,137 +749,34 @@ catch(const std::exception &e)
}
void
ircd::m::sync::polylog::presence(shortpoll &sp,
json::stack::object &out)
{
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::user::mitsein mitsein
{
sp.user
};
mitsein.for_each("join", [&sp, &array]
(const m::user &user)
{
const m::user::room user_room{user};
if(head_idx(std::nothrow, user_room) <= sp.since)
return;
//TODO: can't check event_idx cuz only closed presence content
m::presence::get(std::nothrow, user, [&sp, &array]
(const json::object &event)
{
json::stack::object object{array};
// sender
{
json::stack::member member
{
object, "sender", unquote(event.get("user_id"))
};
}
// type
{
json::stack::member member
{
object, "type", json::value{"m.presence"}
};
}
// content
{
json::stack::member member
{
object, "content", event
};
}
});
});
}
void
ircd::m::sync::polylog::account_data(shortpoll &sp,
json::stack::object &out)
try
{
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::room::state state
{
sp.user_room
};
state.for_each("ircd.account_data", [&sp, &array]
(const m::event &event)
{
const auto &event_idx
{
index(event, std::nothrow)
};
if(event_idx < sp.since || event_idx >= sp.current)
return;
json::stack::object object{array};
// type
{
json::stack::member member
{
object, "type", at<"state_key"_>(event)
};
}
// content
{
json::stack::member member
{
object, "content", at<"content"_>(event)
};
}
});
}
catch(const json::not_found &e)
{
log::critical
{
log, "polylog sync account data error %lu to %lu (vm @ %zu) :%s"
,sp.since
,sp.current
,m::vm::current_sequence
,e.what()
};
}
void
ircd::m::sync::polylog::rooms(shortpoll &sp,
json::stack::object &object)
{
sync_rooms(sp, object, "invite");
sync_rooms(sp, object, "join");
sync_rooms(sp, object, "leave");
sync_rooms(sp, object, "ban");
}
void
ircd::m::sync::polylog::sync_rooms(shortpoll &sp,
ircd::m::sync::polylog::sync_rooms(data &data,
json::stack::object &out,
const string_view &membership)
{
json::stack::member rooms_member{out, membership};
json::stack::object rooms_object{rooms_member};
sp.rooms.for_each(membership, [&sp, &rooms_object]
const scope_restore<decltype(data.membership)> theirs
{
data.membership, membership
};
json::stack::member rooms_member
{
out, membership
};
json::stack::object rooms_object
{
rooms_member
};
data.user_rooms.for_each(membership, [&data, &rooms_object]
(const m::room &room, const string_view &membership)
{
if(head_idx(std::nothrow, room) <= sp.since)
if(head_idx(std::nothrow, room) <= data.since)
return;
// Generate individual stats for this room's sync
#ifdef RB_DEBUG
sync::stats stats{sp.stats};
sync::stats stats{data.stats};
stats.timer = timer{};
#endif
@ -891,71 +785,132 @@ ircd::m::sync::polylog::sync_rooms(shortpoll &sp,
{
json::stack::member member{rooms_object, room.room_id};
json::stack::object object{member};
sync_room(sp, object, room, membership);
sync_room(data, object, room);
}
#ifdef RB_DEBUG
thread_local char iecbuf[64], rembuf[128];
thread_local char iecbuf[64], rembuf[128], tmbuf[32];
log::debug
{
log, "polylog %s %s %s %s wc:%zu in %lu$ms",
string(rembuf, ircd::remote(sp.client)),
string_view{sp.request.user_id},
log, "polylog %s %s %s %s wc:%zu in %s",
string(rembuf, ircd::remote(data.client)),
string_view{data.user.user_id},
string_view{room.room_id},
pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)),
sp.stats.flush_count - stats.flush_count,
stats.timer.at<milliseconds>().count()
pretty(iecbuf, iec(data.stats.flush_bytes - stats.flush_bytes)),
data.stats.flush_count - stats.flush_count,
ircd::pretty(tmbuf, stats.timer.at<milliseconds>(), true)
};
#endif
});
}
void
ircd::m::sync::polylog::sync_room(shortpoll &sp,
ircd::m::sync::polylog::sync_room(data &data,
json::stack::object &out,
const m::room &room,
const string_view &membership)
const m::room &room)
try
{
const scope_restore<decltype(data.room)> theirs
{
data.room, &room
};
// timeline
{
auto it(m::sync::item::map.find("rooms...timeline"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member{out, "timeline"};
json::stack::object object{member};
room_timeline(sp, object, room);
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
// state
{
auto it(m::sync::item::map.find("rooms...state"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member
{
out, membership != "invite"?
out, data.membership != "invite"?
"state":
"invite_state"
};
json::stack::object object{member};
room_state(sp, object, room);
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
// ephemeral
{
auto pit
{
m::sync::item::map.equal_range("rooms...ephemeral")
};
assert(pit.first != pit.second);
json::stack::member member{out, "ephemeral"};
json::stack::object object{member};
room_ephemeral(sp, object, room);
const scope_restore<decltype(data.object)> theirs
{
data.object, &object
};
{
json::stack::member member{object, "events"};
json::stack::array array{member};
const scope_restore<decltype(data.array)> theirs
{
data.array, &array
};
for(; pit.first != pit.second; ++pit.first)
{
const auto &item(pit.first->second);
assert(item);
item->polylog(data);
}
}
}
// account_data
{
auto it(m::sync::item::map.find("rooms...account_data"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member{out, "account_data"};
json::stack::object object{member};
room_account_data(sp, object, room);
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
// unread_notifications
{
auto it(m::sync::item::map.find("rooms...unread_notifications"));
assert(it != m::sync::item::map.end());
const auto &item(it->second);
assert(item);
json::stack::member member{out, "unread_notifications"};
json::stack::object object{member};
room_unread_notifications(sp, object, room);
const scope_restore<decltype(data.member)> theirs
{
data.member, &member
};
item->polylog(data);
}
}
catch(const json::not_found &e)
@ -964,362 +919,9 @@ catch(const json::not_found &e)
{
log, "polylog sync room %s error %lu to %lu (vm @ %zu) :%s"
,string_view{room.room_id}
,sp.since
,sp.current
,data.since
,data.current
,m::vm::current_sequence
,e.what()
};
}
void
ircd::m::sync::polylog::room_state(shortpoll &sp,
json::stack::object &out,
const m::room &room)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"content",
"depth",
"event_id",
"origin_server_ts",
"redacts",
"room_id",
"sender",
"state_key",
"type",
},
};
json::stack::member member
{
out, "events"
};
json::stack::array array
{
member
};
m::room::state state
{
room
};
if(bool(prefetch_state))
state.prefetch(sp.since, sp.current);
state.for_each([&sp, &array]
(const m::event::idx &event_idx)
{
if(event_idx < sp.since || event_idx >= sp.current)
return;
const event::fetch event
{
event_idx, std::nothrow, &fopts
};
if(!event.valid || at<"depth"_>(event) >= int64_t(sp.state_at))
return;
array.append(event);
sp.committed = true;
});
}
void
ircd::m::sync::polylog::room_timeline(shortpoll &sp,
json::stack::object &out,
const m::room &room)
{
// events
bool limited{false};
m::event::id::buf prev;
{
json::stack::member member{out, "events"};
json::stack::array array{member};
prev = room_timeline_events(sp, array, room, limited);
}
// prev_batch
{
json::stack::member member
{
out, "prev_batch", string_view{prev}
};
}
// limited
{
json::stack::member member
{
out, "limited", json::value{limited}
};
}
}
ircd::m::event::id::buf
ircd::m::sync::polylog::room_timeline_events(shortpoll &sp,
json::stack::array &out,
const m::room &room,
bool &limited)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"content",
"depth",
"event_id",
"origin_server_ts",
"prev_events",
"redacts",
"room_id",
"sender",
"state_key",
"type",
},
};
// messages seeks to the newest event, but the client wants the oldest
// event first so we seek down first and then iterate back up. Due to
// an issue with rocksdb's prefix-iteration this iterator becomes
// toxic as soon as it becomes invalid. As a result we have to copy the
// event_id on the way down in case of renewing the iterator for the
// way back. This is not a big deal but rocksdb should fix their shit.
ssize_t i(0);
m::event::id::buf event_id;
m::room::messages it
{
room, &fopts
};
for(; it && i < 10; --it, ++i)
{
event_id = it.event_id();
if(it.event_idx() < sp.since)
break;
if(it.event_idx() >= sp.current)
break;
if(bool(prefetch_timeline))
m::prefetch(it.event_idx(), fopts);
}
limited = i >= 10;
sp.committed |= i > 0;
if(i > 0 && !it)
it.seek(event_id);
if(i > 0 && it)
{
const m::event &event{*it};
sp.state_at = at<"depth"_>(event);
}
if(i > 0)
for(; it && i > -1; ++it, --i)
out.append(*it);
return event_id;
}
void
ircd::m::sync::polylog::room_ephemeral(shortpoll &sp,
json::stack::object &out,
const m::room &room)
{
{
json::stack::member member{out, "events"};
json::stack::array array{member};
room_ephemeral_events(sp, array, room);
}
}
void
ircd::m::sync::polylog::room_ephemeral_events(shortpoll &sp,
json::stack::array &out,
const m::room &room)
{
const m::room::members members{room};
members.for_each("join", m::room::members::closure{[&]
(const m::user &user)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"event_id",
"content",
"sender",
},
};
m::user::room user_room{user};
user_room.fopts = &fopts;
if(head_idx(std::nothrow, user_room) <= sp.since)
return;
user_room.get(std::nothrow, "ircd.read", room.room_id, [&]
(const m::event &event)
{
const auto &event_idx
{
index(event, std::nothrow)
};
if(event_idx < sp.since || event_idx >= sp.current)
return;
sp.committed = true;
json::stack::object object{out};
// type
{
json::stack::member member
{
object, "type", "m.receipt"
};
}
// content
{
const json::object data
{
at<"content"_>(event)
};
thread_local char buf[1024];
const json::members reformat
{
{ unquote(data.at("event_id")),
{
{ "m.read",
{
{ at<"sender"_>(event),
{
{ "ts", data.at("ts") }
}}
}}
}}
};
json::stack::member member
{
object, "content", json::stringify(mutable_buffer{buf}, reformat)
};
}
});
}});
}
void
ircd::m::sync::polylog::room_account_data(shortpoll &sp,
json::stack::object &out,
const m::room &room)
{
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::room::state state
{
sp.user_room
};
char typebuf[288]; //TODO: room_account_data_typebuf_size
const auto type
{
m::user::_account_data_type(typebuf, room.room_id)
};
state.for_each(type, [&sp, &array]
(const m::event &event)
{
const auto &event_idx
{
index(event, std::nothrow)
};
if(event_idx < sp.since || event_idx >= sp.current)
return;
json::stack::object object{array};
// type
{
json::stack::member member
{
object, "type", at<"state_key"_>(event)
};
}
// content
{
json::stack::member member
{
object, "content", at<"content"_>(event)
};
}
});
}
void
ircd::m::sync::polylog::room_unread_notifications(shortpoll &sp,
json::stack::object &out,
const m::room &room)
{
m::event::id::buf last_read;
if(!m::receipt::read(last_read, room.room_id, sp.user))
return;
const auto last_read_idx
{
index(last_read)
};
// highlight_count
json::stack::member
{
out, "highlight_count", json::value
{
highlight_count(room, sp.user, last_read_idx, sp.current)
}
};
// notification_count
json::stack::member
{
out, "notification_count", json::value
{
notification_count(room, last_read_idx, sp.current)
}
};
}
long
ircd::m::sync::highlight_count(const room &r,
const user &u,
const event::idx &a,
const event::idx &b)
{
using proto = size_t (const user &, const room &, const event::idx &, const event::idx &);
static mods::import<proto> count
{
"m_user", "highlighted_count__between"
};
return count(u, r, a, a < b? b : a);
}
long
ircd::m::sync::notification_count(const room &room,
const event::idx &a,
const event::idx &b)
{
return m::count_since(room, a, a < b? b : a);
}

View file

@ -12,9 +12,9 @@ namespace ircd::m::sync
{
struct args;
struct stats;
struct shortpoll;
struct data;
struct response;
extern log::log log;
extern const string_view description;
extern resource resource;
extern resource::method method_get;
@ -70,27 +70,14 @@ namespace ircd::m::sync::linear
{
extern conf::item<size_t> delta_max;
static bool handle(client &, shortpoll &, json::stack::object &);
static bool handle(client &, data &, json::stack::object &);
}
namespace ircd::m::sync::polylog
{
extern conf::item<bool> prefetch_state;
extern conf::item<bool> prefetch_timeline;
static void room_state(shortpoll &, json::stack::object &, const m::room &);
static m::event::id::buf room_timeline_events(shortpoll &, json::stack::array &, const m::room &, bool &limited);
static void room_timeline(shortpoll &, json::stack::object &, const m::room &);
static void room_ephemeral_events(shortpoll &, json::stack::array &, const m::room &);
static void room_ephemeral(shortpoll &, json::stack::object &, const m::room &);
static void room_account_data(shortpoll &, json::stack::object &, const m::room &);
static void room_unread_notifications(shortpoll &, json::stack::object &, const m::room &);
static void sync_room(shortpoll &, json::stack::object &, const m::room &, const string_view &membership);
static void sync_rooms(shortpoll &, json::stack::object &, const string_view &membership);
static void rooms(shortpoll &, json::stack::object &);
static void presence(shortpoll &, json::stack::object &);
static void account_data(shortpoll &, json::stack::object &);
static bool handle(client &, shortpoll &, json::stack::object &);
static void sync_room(data &, json::stack::object &, const m::room &);
static void sync_rooms(data &, json::stack::object &, const string_view &membership);
static bool handle(client &, data &, json::stack::object &);
}
/// Argument parser for the client's /sync request
@ -151,113 +138,3 @@ struct ircd::m::sync::args
request.query.get("set_presence", true)
};
};
struct ircd::m::sync::stats
{
ircd::timer timer;
size_t flush_bytes {0};
size_t flush_count {0};
};
struct ircd::m::sync::shortpoll
{
static conf::item<size_t> flush_hiwat;
shortpoll(ircd::client &client,
const sync::args &args)
:client{client}
,args{args}
{}
sync::stats stats;
ircd::client &client;
const sync::args &args;
const resource::request &request
{
args.request
};
const uint64_t &since
{
args.since
};
const uint64_t current
{
m::vm::current_sequence
};
const uint64_t delta
{
current - since
};
const m::user user
{
request.user_id
};
const std::string filter_buf
{
args.filter_id?
user.filter(std::nothrow, args.filter_id):
std::string{}
};
const m::filter filter
{
json::object{filter_buf}
};
const m::user::room user_room
{
user
};
const m::user::rooms rooms
{
user
};
uint64_t state_at
{
0
};
bool committed
{
false
};
unique_buffer<mutable_buffer> buf
{
std::max(size_t(96_KiB), size_t(flush_hiwat))
};
std::unique_ptr<resource::response::chunked> response;
json::stack out
{
buf, std::bind(&shortpoll::flush, this, ph::_1), size_t(flush_hiwat)
};
void commit()
{
response = std::make_unique<resource::response::chunked>
(
client, http::OK, "application/json; charset=utf-8"
);
}
const_buffer flush(const const_buffer &buf)
{
if(!committed)
return buf;
if(!response)
commit();
stats.flush_bytes += response->write(buf);
stats.flush_count++;
return buf;
}
};

View file

@ -0,0 +1,64 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Account Data"
};
namespace ircd::m::sync
{
static bool account_data_polylog(data &);
extern item account_data;
}
decltype(ircd::m::sync::account_data)
ircd::m::sync::account_data
{
"account_data",
account_data_polylog
};
bool
ircd::m::sync::account_data_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::room::state state{data.user_room};
state.for_each("ircd.account_data", [&data, &array]
(const m::event &event)
{
const auto &event_idx(index(event, std::nothrow));
if(event_idx < data.since || event_idx > data.current)
return;
json::stack::object object{array};
// type
{
json::stack::member member
{
object, "type", at<"state_key"_>(event)
};
}
// content
{
json::stack::member member
{
object, "content", at<"content"_>(event)
};
}
});
return true;
}

View file

@ -0,0 +1,114 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Presence"
};
namespace ircd::m::sync
{
static bool presence_polylog(data &);
extern item presence;
}
decltype(ircd::m::sync::presence)
ircd::m::sync::presence
{
"presence",
presence_polylog
};
namespace ircd
{
ctx::pool::opts meepool_opts
{
256_KiB
};
ctx::pool meepool
{
"meepool", meepool_opts
};
};
bool
ircd::m::sync::presence_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::user::mitsein mitsein
{
data.user
};
ctx::mutex mutex;
const auto closure{[&data, &array, &mutex]
(const json::object &event)
{
// Lock the json::stack for the append operations. This mutex will only be
// contended during a json::stack flush to the client; not during database
// queries leading to this.
const std::lock_guard<decltype(mutex)> l{mutex};
json::stack::object object{array};
// sender
json::stack::member
{
object, "sender", unquote(event.get("user_id"))
};
// type
json::stack::member
{
object, "type", json::value{"m.presence"}
};
// content
json::stack::member
{
object, "content", event
};
}};
const auto each_user{[&data, &closure]
(const m::user::id &user_id)
{
const m::user user{user_id};
const m::user::room user_room{user};
//TODO: can't check event_idx cuz only closed presence content
if(head_idx(std::nothrow, user_room) > data.since)
m::presence::get(std::nothrow, user, closure);
}};
//TODO: conf
static const size_t fibers(24);
string_view q[fibers];
char buf[fibers][256];
ctx::parallel<string_view> parallel
{
meepool, q, each_user
};
const auto paraclosure{[&parallel, &q, &buf]
(const m::user &u)
{
assert(parallel.snd < fibers);
strlcpy(buf[parallel.snd], string_view{u.user_id});
q[parallel.snd] = buf[parallel.snd];
parallel();
}};
mitsein.for_each("join", paraclosure);
// mitsein.for_each("join", each_user);
return true;
}

View file

@ -0,0 +1,68 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Room Account Data"
};
namespace ircd::m::sync
{
static bool room_account_data_polylog(data &);
extern item room_account_data;
}
decltype(ircd::m::sync::room_account_data)
ircd::m::sync::room_account_data
{
"rooms...account_data",
room_account_data_polylog
};
bool
ircd::m::sync::room_account_data_polylog(data &data)
{
json::stack::object out{*data.member};
json::stack::member member{out, "events"};
json::stack::array array{member};
const m::room::state state
{
data.user_room
};
char typebuf[288]; //TODO: room_account_data_typebuf_size
const auto type
{
m::user::_account_data_type(typebuf, data.room->room_id)
};
state.for_each(type, [&data, &array]
(const m::event &event)
{
const auto &event_idx(index(event, std::nothrow));
if(event_idx < data.since || event_idx >= data.current)
return;
json::stack::object object{array};
json::stack::member
{
object, "type", at<"state_key"_>(event)
};
json::stack::member
{
object, "content", at<"content"_>(event)
};
});
return true;
}

View file

@ -0,0 +1,99 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Room Receipts"
};
namespace ircd::m::sync
{
static bool room_ephemeral_m_receipt_m_read_polylog(data &);
extern item room_ephemeral_m_receipt_m_read;
}
decltype(ircd::m::sync::room_ephemeral_m_receipt_m_read)
ircd::m::sync::room_ephemeral_m_receipt_m_read
{
"rooms...ephemeral",
room_ephemeral_m_receipt_m_read_polylog
};
bool
ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data)
{
const m::room &room{*data.room};
const m::room::members members{room};
const m::room::members::closure closure{[&]
(const m::user::id &user_id)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"event_id",
"content",
"sender",
},
};
const m::user user{user_id};
m::user::room user_room{user};
user_room.fopts = &fopts;
if(head_idx(std::nothrow, user_room) <= data.since)
return;
user_room.get(std::nothrow, "ircd.read", room.room_id, [&]
(const m::event &event)
{
const auto &event_idx(index(event, std::nothrow));
if(event_idx < data.since || event_idx >= data.current)
return;
data.commit();
json::stack::object object{*data.array};
// type
json::stack::member
{
object, "type", "m.receipt"
};
// content
const json::object data
{
at<"content"_>(event)
};
thread_local char buf[1024];
const json::members reformat
{
{ unquote(data.at("event_id")),
{
{ "m.read",
{
{ at<"sender"_>(event),
{
{ "ts", data.at("ts") }
}}
}}
}}
};
json::stack::member
{
object, "content", json::stringify(mutable_buffer{buf}, reformat)
};
});
}};
return true;
}

View file

@ -0,0 +1,92 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Room State"
};
namespace ircd::m::sync
{
static bool room_state_polylog(data &);
extern item room_state;
}
decltype(ircd::m::sync::room_state)
ircd::m::sync::room_state
{
"rooms...state",
room_state_polylog
};
bool
ircd::m::sync::room_state_polylog(data &data)
{
static const m::event::keys::include default_keys
{
"content",
"depth",
"event_id",
"origin_server_ts",
"redacts",
"room_id",
"sender",
"state_key",
"type",
};
static const m::event::fetch::opts fopts
{
default_keys
};
json::stack::object out{*data.member};
json::stack::member member
{
out, "events"
};
json::stack::array array
{
member
};
ctx::mutex mutex;
const event::closure_idx each_idx{[&data, &array, &mutex]
(const m::event::idx &event_idx)
{
assert(event_idx);
const event::fetch event
{
event_idx, std::nothrow, &fopts
};
if(!event.valid || at<"depth"_>(event) >= int64_t(data.state_at))
return;
const std::lock_guard<decltype(mutex)> lock{mutex};
array.append(event);
data.commit();
}};
const event::closure_idx _each_idx{[&data, &each_idx]
(const m::event::idx &event_idx)
{
assert(event_idx);
if(event_idx >= data.since && event_idx <= data.current)
each_idx(event_idx);
}};
const m::room &room{*data.room};
const m::room::state state{room};
state.for_each(_each_idx);
return true;
}

View file

@ -0,0 +1,124 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Room Timeline"
};
namespace ircd::m::sync
{
static event::id::buf _room_timeline_events(data &, json::stack::array &, const m::room &, bool &);
static bool room_timeline_polylog(data &);
extern item room_timeline;
}
decltype(ircd::m::sync::room_timeline)
ircd::m::sync::room_timeline
{
"rooms...timeline",
room_timeline_polylog
};
bool
ircd::m::sync::room_timeline_polylog(data &data)
{
json::stack::object out{*data.member};
// events
bool limited{false};
m::event::id::buf prev;
{
json::stack::member member{out, "events"};
json::stack::array array{member};
prev = _room_timeline_events(data, array, *data.room, limited);
}
// prev_batch
json::stack::member
{
out, "prev_batch", string_view{prev}
};
// limited
json::stack::member
{
out, "limited", json::value{limited}
};
return true;
}
ircd::m::event::id::buf
ircd::m::sync::_room_timeline_events(data &data,
json::stack::array &out,
const m::room &room,
bool &limited)
{
static const m::event::fetch::opts fopts
{
m::event::keys::include
{
"content",
"depth",
"event_id",
"origin_server_ts",
"prev_events",
"redacts",
"room_id",
"sender",
"state_key",
"type",
},
};
// messages seeks to the newest event, but the client wants the oldest
// event first so we seek down first and then iterate back up. Due to
// an issue with rocksdb's prefix-iteration this iterator becomes
// toxic as soon as it becomes invalid. As a result we have to copy the
// event_id on the way down in case of renewing the iterator for the
// way back. This is not a big deal but rocksdb should fix their shit.
ssize_t i(0);
m::event::id::buf event_id;
m::room::messages it
{
room, &fopts
};
for(; it && i < 10; --it, ++i)
{
event_id = it.event_id();
if(it.event_idx() < data.since)
break;
if(it.event_idx() > data.current)
break;
}
limited = i >= 10;
if(i > 0)
data.commit();
if(i > 0 && !it)
it.seek(event_id);
if(i > 0 && it)
{
const m::event &event{*it};
data.state_at = at<"depth"_>(event);
}
if(i > 0)
for(; it && i > -1; ++it, --i)
out.append(*it);
return event_id;
}

View file

@ -0,0 +1,89 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Room Unread Notifications"
};
namespace ircd::m::sync
{
static long _notification_count(const room &, const event::idx &a, const event::idx &b);
static long _highlight_count(const room &, const user &u, const event::idx &a, const event::idx &b);
static bool room_unread_notifications_polylog(data &);
extern item room_unread_notifications;
}
decltype(ircd::m::sync::room_unread_notifications)
ircd::m::sync::room_unread_notifications
{
"rooms...unread_notifications",
room_unread_notifications_polylog
};
bool
ircd::m::sync::room_unread_notifications_polylog(data &data)
{
auto &room{*data.room};
m::event::id::buf last_read;
if(!m::receipt::read(last_read, room.room_id, data.user))
return false;
json::stack::object out{*data.member};
const auto last_read_idx
{
index(last_read)
};
// highlight_count
json::stack::member
{
out, "highlight_count", json::value
{
_highlight_count(room, data.user, last_read_idx, data.current)
}
};
// notification_count
json::stack::member
{
out, "notification_count", json::value
{
_notification_count(room, last_read_idx, data.current)
}
};
return true;
}
long
ircd::m::sync::_notification_count(const room &room,
const event::idx &a,
const event::idx &b)
{
return m::count_since(room, a, a < b? b : a);
}
long
ircd::m::sync::_highlight_count(const room &r,
const user &u,
const event::idx &a,
const event::idx &b)
{
using proto = size_t (const user &, const room &, const event::idx &, const event::idx &);
static mods::import<proto> count
{
"m_user", "highlighted_count__between"
};
return count(u, r, a, a < b? b : a);
}