diff --git a/include/ircd/m/m.h b/include/ircd/m/m.h index f16634a72..d675deb59 100644 --- a/include/ircd/m/m.h +++ b/include/ircd/m/m.h @@ -59,6 +59,7 @@ namespace ircd::m::vm #include "visible.h" #include "feds.h" #include "app.h" +#include "sync.h" struct ircd::m::init { diff --git a/include/ircd/m/sync.h b/include/ircd/m/sync.h new file mode 100644 index 000000000..9a0952fd7 --- /dev/null +++ b/include/ircd/m/sync.h @@ -0,0 +1,109 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_M_SYNC_H + +namespace ircd +{ + struct client; +} + +namespace ircd::m::sync +{ + struct args; + struct stats; + struct data; + struct item; + struct response; + + extern log::log log; +} + +struct ircd::m::sync::item +:instance_multimap> +{ + using handle = std::function; + + handle _polylog; + handle _linear; + handle _longpoll; + + public: + string_view name() const; + + bool polylog(data &); + bool linear(data &, const m::event &); + bool longpoll(data &, const m::event &); + + item(std::string name, + handle polylog = {}, + handle linear = {}, + handle longpoll = {}); + + item(item &&) = delete; + item(const item &) = delete; + ~item() noexcept; +}; + +struct ircd::m::sync::data +{ + sync::stats &stats; + ircd::client &client; + + // Range related + const uint64_t &since; + const uint64_t current; + const uint64_t delta; + + // User related + const m::user user; + const m::user::room user_room; + const m::user::rooms user_rooms; + + // Filter to use + const std::string filter_buf; + const m::filter filter; + + // response state + const std::unique_ptr resp; + json::stack out; + bool committed() const; + bool commit(); + + // apropos contextual + ctx::mutex write_mutex; + json::stack::member *member {nullptr}; + json::stack::object *object {nullptr}; + json::stack::array *array {nullptr}; + const m::event *event {nullptr}; + const m::room *room {nullptr}; + string_view membership; + + // unsorted / misc + uint64_t state_at {0}; + + data(sync::stats &stats, + ircd::client &client, + const m::user &user, + const std::pair &range, + const string_view &filter_id); + + data(data &&) = delete; + data(const data &) = delete; + ~data() noexcept; +}; + +struct ircd::m::sync::stats +{ + ircd::timer timer; + size_t flush_bytes {0}; + size_t flush_count {0}; +}; diff --git a/ircd/m/m.cc b/ircd/m/m.cc index d9d6ed688..9fba3ddb7 100644 --- a/ircd/m/m.cc +++ b/ircd/m/m.cc @@ -389,6 +389,333 @@ ircd::m::self::init::init(const string_view &origin) }; } +/////////////////////////////////////////////////////////////////////////////// +// +// m/sync.h +// + +decltype(ircd::m::sync::log) +ircd::m::sync::log +{ + "sync", 's' +}; + +// +// response +// + +struct ircd::m::sync::response +{ + static conf::item flush_hiwat; + + sync::stats &stats; + ircd::client &client; + unique_buffer buf; + std::unique_ptr resp; + bool committed; + + const_buffer flush(const const_buffer &buf); + void commit(); + + response(sync::stats &stats, ircd::client &client); + ~response() noexcept; +}; + +decltype(ircd::m::sync::response::flush_hiwat) +ircd::m::sync::response::flush_hiwat +{ + { "name", "ircd.m.sync.flush.hiwat" }, + { "default", long(32_KiB) }, +}; + +// +// response::response +// + +ircd::m::sync::response::response(sync::stats &stats, + ircd::client &client) +:stats +{ + stats +} +,client +{ + client +} +,buf +{ + std::max(size_t(96_KiB), size_t(flush_hiwat)) +} +,committed +{ + false +} +{ +} + +ircd::m::sync::response::~response() +noexcept +{ +} + +ircd::const_buffer +ircd::m::sync::response::flush(const const_buffer &buf) +{ + if(!committed) + return buf; + + if(!resp) + commit(); + + stats.flush_bytes += resp->write(buf); + stats.flush_count++; + return buf; +} + +void +ircd::m::sync::response::commit() +{ + static const string_view content_type + { + "application/json; charset=utf-8" + }; + + assert(!resp); + resp = std::make_unique + ( + client, http::OK, content_type + ); +} + +// +// data +// + +ircd::m::sync::data::data(sync::stats &stats, + ircd::client &client, + const m::user &user, + const std::pair &range, + const string_view &filter_id) +:stats{stats} +,client{client} +,since +{ + range.first +} +,current +{ + range.second +} +,delta +{ + current - since +} +,user +{ + user +} +,user_room +{ + user +} +,user_rooms +{ + user +} +,filter_buf +{ + filter_id? + user.filter(std::nothrow, filter_id): + std::string{} +} +,filter +{ + json::object{filter_buf} +} +,resp +{ + std::make_unique(stats, client) +} +,out +{ + resp->buf, std::bind(&response::flush, resp.get(), ph::_1), size_t(resp->flush_hiwat) +} +{ +} + +ircd::m::sync::data::~data() +noexcept +{ +} + +bool +ircd::m::sync::data::commit() +{ + assert(resp); + const auto ret{resp->committed}; + resp->committed = true; + return ret; +} + +bool +ircd::m::sync::data::committed() +const +{ + assert(resp); + return resp->committed; +} + +// +// item +// + +template<> +decltype(ircd::m::sync::item::instance_multimap::map) +ircd::m::sync::item::instance_multimap::map +{}; + +// +// item::item +// + +ircd::m::sync::item::item(std::string name, + handle polylog, + handle linear, + handle longpoll) +:instance_multimap +{ + std::move(name) +} +,_polylog +{ + std::move(polylog) +} +,_linear +{ + std::move(linear) +} +,_longpoll +{ + std::move(longpoll) +} +{ + log::debug + { + log, "Registered sync item(%p) '%s'", + this, + this->name() + }; +} + +ircd::m::sync::item::~item() +noexcept +{ + log::debug + { + log, "Unregistered sync item(%p) '%s'", + this, + this->name() + }; +} + +bool +ircd::m::sync::item::longpoll(data &data, + const m::event &event) +try +{ + const auto ret + { + _longpoll(data) + }; + + return ret; +} +catch(const std::bad_function_call &) +{ + return false; +} + +bool +ircd::m::sync::item::linear(data &data, + const m::event &event) +try +{ + const scope_restore theirs + { + data.event, &event + }; + + const auto ret + { + _linear(data) + }; + + return ret; +} +catch(const std::bad_function_call &) +{ + return false; +} + +bool +ircd::m::sync::item::polylog(data &data) +try +{ + #ifdef RB_DEBUG + sync::stats stats{data.stats}; + stats.timer = {}; + #endif + + const auto ret + { + _polylog(data) + }; + + #ifdef RB_DEBUG + thread_local char rembuf[128], iecbuf[64], tmbuf[32]; + log::debug + { + log, "polylog %s %s '%s' %s wc:%zu in %s", + string(rembuf, ircd::remote(data.client)), + string_view{data.user.user_id}, + name(), + ircd::pretty(iecbuf, iec(data.stats.flush_bytes - stats.flush_bytes)), + data.stats.flush_count - stats.flush_count, + ircd::pretty(tmbuf, stats.timer.at(), true) + }; + #endif + + return ret; +} +catch(const std::bad_function_call &) +{ + return false; +} +catch(const std::exception &e) +{ + thread_local char rembuf[128], iecbuf[64], tmbuf[32]; + log::derror + { + log, "polylog %s %s '%s' %s wc:%zu in %s :%s", + string(rembuf, ircd::remote(data.client)), + string_view{data.user.user_id}, + name(), + ircd::pretty(iecbuf, iec(data.stats.flush_bytes)), + data.stats.flush_count, + ircd::pretty(tmbuf, data.stats.timer.at(), true), + e.what() + }; + + throw; +} + +ircd::string_view +ircd::m::sync::item::name() +const +{ + return this->instance_multimap::it->first; +} + /////////////////////////////////////////////////////////////////////////////// // // m/feds.h diff --git a/modules/Makefile.am b/modules/Makefile.am index 4cdf3cf19..9a67245d2 100644 --- a/modules/Makefile.am +++ b/modules/Makefile.am @@ -258,6 +258,28 @@ client_module_LTLIBRARIES += \ client/client_thirdparty_protocols.la \ ### +# +# client/sync/ +# + +client_client_sync_account_data_la_SOURCES = client/sync/account_data.cc +client_client_sync_presence_la_SOURCES = client/sync/presence.cc +client_client_sync_rooms_account_data_la_SOURCES = client/sync/rooms/account_data.cc +client_client_sync_rooms_receipt_la_SOURCES = client/sync/rooms/receipt.cc +client_client_sync_rooms_state_la_SOURCES = client/sync/rooms/state.cc +client_client_sync_rooms_timeline_la_SOURCES = client/sync/rooms/timeline.cc +client_client_sync_rooms_unread_notifications_la_SOURCES = client/sync/rooms/unread_notifications.cc + +client_module_LTLIBRARIES += \ + client/client_sync_account_data.la \ + client/client_sync_presence.la \ + client/client_sync_rooms_account_data.la \ + client/client_sync_rooms_receipt.la \ + client/client_sync_rooms_state.la \ + client/client_sync_rooms_timeline.la \ + client/client_sync_rooms_unread_notifications.la \ + ### + ############################################################################### # # /_matrix/key/ diff --git a/modules/client/sync.cc b/modules/client/sync.cc index a4cbd456e..98d567d9e 100644 --- a/modules/client/sync.cc +++ b/modules/client/sync.cc @@ -16,12 +16,6 @@ IRCD_MODULE "Client 6.2.1 :Sync" }; -decltype(ircd::m::sync::log) -ircd::m::sync::log -{ - "sync", 's' -}; - decltype(ircd::m::sync::resource) ircd::m::sync::resource { @@ -62,13 +56,6 @@ ircd::m::sync::args::timeout_default { "default", 10 * 1000L }, }; -ircd::conf::item -ircd::m::sync::shortpoll::flush_hiwat -{ - { "name", "ircd.client.sync.flush.hiwat" }, - { "default", long(24_KiB) }, -}; - // // GET sync // @@ -93,20 +80,27 @@ try request }; - shortpoll sp + const std::pair range { - client, args + args.since, // start at since token + m::vm::current_sequence // stop at present }; - if(sp.since > sp.current) + stats stats; + data data + { + stats, client, request.user_id, range, args.filter_id + }; + + if(data.since > data.current + 1) throw m::NOT_FOUND { - "Since parameter is in the future..." + "Since parameter is too far in the future..." }; json::stack::object top { - sp.out + data.out }; const size_t linear_delta_max @@ -116,11 +110,11 @@ try const bool shortpolled { - sp.delta == 0? + data.delta == 0? false: - sp.delta > linear_delta_max? - polylog::handle(client, sp, top): - linear::handle(client, sp, top) + data.delta > linear_delta_max? + polylog::handle(client, data, top): + linear::handle(client, data, top) }; // When shortpoll was successful, do nothing else. @@ -456,7 +450,7 @@ ircd::m::sync::linear::delta_max bool ircd::m::sync::linear::handle(client &client, - shortpoll &sp, + data &sp, json::stack::object &object) { uint64_t since @@ -485,7 +479,7 @@ ircd::m::sync::linear::handle(client &client, json::get<"room_id"_>(event) }; - if(!room.membership(sp.args.request.user_id)) + if(!room.membership(sp.user.user_id)) return true; auto it @@ -631,119 +625,122 @@ ircd::m::sync::linear::handle(client &client, return true; } +static long +ircd::m::sync::notification_count(const m::room &room, + const m::event::idx &a, + const m::event::idx &b) +{ + return m::count_since(room, a, a < b? b : a); +} + +static long +ircd::m::sync::highlight_count(const m::room &r, + const m::user &u, + const m::event::idx &a, + const m::event::idx &b) +{ + using namespace ircd::m; + using proto = size_t (const user &, const room &, const event::idx &, const event::idx &); + + static mods::import count + { + "m_user", "highlighted_count__between" + }; + + return count(u, r, a, a < b? b : a); +} + // // polylog // -ircd::conf::item -ircd::m::sync::polylog::prefetch_state +namespace ircd { - { "name", "ircd.client.sync.polylog.prefetch.state" }, - { "default", false }, -}; + ctx::pool::opts mepool_opts + { + 256_KiB + }; -ircd::conf::item -ircd::m::sync::polylog::prefetch_timeline -{ - { "name", "ircd.client.sync.polylog.prefetch.timeline" }, - { "default", false }, + ctx::pool mepool + { + "me pool", mepool_opts + }; }; bool ircd::m::sync::polylog::handle(client &client, - shortpoll &sp, + data &data, json::stack::object &object) try { // Generate individual stats for sections - thread_local char iecbuf[64], rembuf[128]; - sync::stats stats{sp.stats}; + thread_local char iecbuf[64], rembuf[128], tmbuf[32]; + sync::stats stats{data.stats}; stats.timer = timer{}; + { + json::stack::member member{object, "account_data"}; + const scope_restore theirs + { + data.member, &member + }; + + auto it(m::sync::item::map.find("account_data")); + assert(it != m::sync::item::map.end()); + const auto &item(it->second); + assert(item); + item->polylog(data); + } + { json::stack::member member{object, "rooms"}; json::stack::object object{member}; - rooms(sp, object); + sync_rooms(data, object, "invite"); + sync_rooms(data, object, "join"); + sync_rooms(data, object, "leave"); + sync_rooms(data, object, "ban"); } - #ifdef RB_DEBUG - log::debug - { - log, "polylog %s %s rooms %s wc:%zu in %lu$ms", - string(rembuf, ircd::remote(sp.client)), - string_view{sp.request.user_id}, - pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)), - sp.stats.flush_count - stats.flush_count, - stats.timer.at().count() - }; - stats = sync::stats{sp.stats}; - stats.timer = timer{}; - #endif - { json::stack::member member{object, "presence"}; - json::stack::object object{member}; - presence(sp, object); + const scope_restore theirs + { + data.member, &member + }; + + auto it(m::sync::item::map.find("presence")); + assert(it != m::sync::item::map.end()); + const auto &item(it->second); + assert(item); + item->polylog(data); } - #ifdef RB_DEBUG - log::debug - { - log, "polylog %s %s presence %s wc:%zu in %lu$ms", - string(rembuf, ircd::remote(sp.client)), - string_view{sp.request.user_id}, - pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)), - sp.stats.flush_count - stats.flush_count, - stats.timer.at().count() - }; - stats = sync::stats{sp.stats}; - stats.timer = timer{}; - #endif - - { - json::stack::member member{object, "account_data"}; - json::stack::object object{member}; - account_data(sp, object); - } - - #ifdef RB_DEBUG - log::debug - { - log, "polylog %s %s account_data %s wc:%zu in %lu$ms", - string(rembuf, ircd::remote(sp.client)), - string_view{sp.request.user_id}, - pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)), - sp.stats.flush_count - stats.flush_count, - stats.timer.at().count() - }; - #endif - { json::stack::member member { - object, "next_batch", json::value(lex_cast(int64_t(sp.current)), json::STRING) + object, "next_batch", json::value(lex_cast(int64_t(data.current)), json::STRING) }; } log::info { - log, "polylog %s %s %s wc:%zu in %lu$ms", - string(rembuf, ircd::remote(sp.client)), - string_view{sp.request.user_id}, - pretty(iecbuf, iec(sp.stats.flush_bytes)), - sp.stats.flush_count, - sp.stats.timer.at().count() + log, "polylog %s %s %s wc:%zu in %s", + string(rembuf, ircd::remote(data.client)), + string_view{data.user.user_id}, + pretty(iecbuf, iec(data.stats.flush_bytes)), + data.stats.flush_count, + ircd::pretty(tmbuf, data.stats.timer.at(), true) }; - return sp.committed; + return data.committed(); } catch(const std::exception &e) { log::error { log, "polylog sync FAILED %lu to %lu (vm @ %zu) :%s" - ,sp.since - ,sp.current + ,data.since + ,data.current ,m::vm::current_sequence ,e.what() }; @@ -752,137 +749,34 @@ catch(const std::exception &e) } void -ircd::m::sync::polylog::presence(shortpoll &sp, - json::stack::object &out) -{ - json::stack::member member{out, "events"}; - json::stack::array array{member}; - - const m::user::mitsein mitsein - { - sp.user - }; - - mitsein.for_each("join", [&sp, &array] - (const m::user &user) - { - const m::user::room user_room{user}; - if(head_idx(std::nothrow, user_room) <= sp.since) - return; - - //TODO: can't check event_idx cuz only closed presence content - m::presence::get(std::nothrow, user, [&sp, &array] - (const json::object &event) - { - json::stack::object object{array}; - - // sender - { - json::stack::member member - { - object, "sender", unquote(event.get("user_id")) - }; - } - - // type - { - json::stack::member member - { - object, "type", json::value{"m.presence"} - }; - } - - // content - { - json::stack::member member - { - object, "content", event - }; - } - }); - }); -} - -void -ircd::m::sync::polylog::account_data(shortpoll &sp, - json::stack::object &out) -try -{ - json::stack::member member{out, "events"}; - json::stack::array array{member}; - const m::room::state state - { - sp.user_room - }; - - state.for_each("ircd.account_data", [&sp, &array] - (const m::event &event) - { - const auto &event_idx - { - index(event, std::nothrow) - }; - - if(event_idx < sp.since || event_idx >= sp.current) - return; - - json::stack::object object{array}; - - // type - { - json::stack::member member - { - object, "type", at<"state_key"_>(event) - }; - } - - // content - { - json::stack::member member - { - object, "content", at<"content"_>(event) - }; - } - }); -} -catch(const json::not_found &e) -{ - log::critical - { - log, "polylog sync account data error %lu to %lu (vm @ %zu) :%s" - ,sp.since - ,sp.current - ,m::vm::current_sequence - ,e.what() - }; -} - -void -ircd::m::sync::polylog::rooms(shortpoll &sp, - json::stack::object &object) -{ - sync_rooms(sp, object, "invite"); - sync_rooms(sp, object, "join"); - sync_rooms(sp, object, "leave"); - sync_rooms(sp, object, "ban"); -} - -void -ircd::m::sync::polylog::sync_rooms(shortpoll &sp, +ircd::m::sync::polylog::sync_rooms(data &data, json::stack::object &out, const string_view &membership) { - json::stack::member rooms_member{out, membership}; - json::stack::object rooms_object{rooms_member}; - sp.rooms.for_each(membership, [&sp, &rooms_object] + const scope_restore theirs + { + data.membership, membership + }; + + json::stack::member rooms_member + { + out, membership + }; + + json::stack::object rooms_object + { + rooms_member + }; + + data.user_rooms.for_each(membership, [&data, &rooms_object] (const m::room &room, const string_view &membership) { - if(head_idx(std::nothrow, room) <= sp.since) + if(head_idx(std::nothrow, room) <= data.since) return; // Generate individual stats for this room's sync #ifdef RB_DEBUG - sync::stats stats{sp.stats}; + sync::stats stats{data.stats}; stats.timer = timer{}; #endif @@ -891,71 +785,132 @@ ircd::m::sync::polylog::sync_rooms(shortpoll &sp, { json::stack::member member{rooms_object, room.room_id}; json::stack::object object{member}; - sync_room(sp, object, room, membership); + sync_room(data, object, room); } #ifdef RB_DEBUG - thread_local char iecbuf[64], rembuf[128]; + thread_local char iecbuf[64], rembuf[128], tmbuf[32]; log::debug { - log, "polylog %s %s %s %s wc:%zu in %lu$ms", - string(rembuf, ircd::remote(sp.client)), - string_view{sp.request.user_id}, + log, "polylog %s %s %s %s wc:%zu in %s", + string(rembuf, ircd::remote(data.client)), + string_view{data.user.user_id}, string_view{room.room_id}, - pretty(iecbuf, iec(sp.stats.flush_bytes - stats.flush_bytes)), - sp.stats.flush_count - stats.flush_count, - stats.timer.at().count() + pretty(iecbuf, iec(data.stats.flush_bytes - stats.flush_bytes)), + data.stats.flush_count - stats.flush_count, + ircd::pretty(tmbuf, stats.timer.at(), true) }; #endif }); } void -ircd::m::sync::polylog::sync_room(shortpoll &sp, +ircd::m::sync::polylog::sync_room(data &data, json::stack::object &out, - const m::room &room, - const string_view &membership) + const m::room &room) try { + const scope_restore theirs + { + data.room, &room + }; + // timeline { + auto it(m::sync::item::map.find("rooms...timeline")); + assert(it != m::sync::item::map.end()); + const auto &item(it->second); + assert(item); json::stack::member member{out, "timeline"}; - json::stack::object object{member}; - room_timeline(sp, object, room); + const scope_restore theirs + { + data.member, &member + }; + + item->polylog(data); } // state { + auto it(m::sync::item::map.find("rooms...state")); + assert(it != m::sync::item::map.end()); + const auto &item(it->second); + assert(item); json::stack::member member { - out, membership != "invite"? + out, data.membership != "invite"? "state": "invite_state" }; - json::stack::object object{member}; - room_state(sp, object, room); + const scope_restore theirs + { + data.member, &member + }; + + item->polylog(data); } // ephemeral { + auto pit + { + m::sync::item::map.equal_range("rooms...ephemeral") + }; + + assert(pit.first != pit.second); json::stack::member member{out, "ephemeral"}; json::stack::object object{member}; - room_ephemeral(sp, object, room); + const scope_restore theirs + { + data.object, &object + }; + + { + json::stack::member member{object, "events"}; + json::stack::array array{member}; + const scope_restore theirs + { + data.array, &array + }; + + for(; pit.first != pit.second; ++pit.first) + { + const auto &item(pit.first->second); + assert(item); + item->polylog(data); + } + } } // account_data { + auto it(m::sync::item::map.find("rooms...account_data")); + assert(it != m::sync::item::map.end()); + const auto &item(it->second); + assert(item); json::stack::member member{out, "account_data"}; - json::stack::object object{member}; - room_account_data(sp, object, room); + const scope_restore theirs + { + data.member, &member + }; + + item->polylog(data); } // unread_notifications { + auto it(m::sync::item::map.find("rooms...unread_notifications")); + assert(it != m::sync::item::map.end()); + const auto &item(it->second); + assert(item); json::stack::member member{out, "unread_notifications"}; - json::stack::object object{member}; - room_unread_notifications(sp, object, room); + const scope_restore theirs + { + data.member, &member + }; + + item->polylog(data); } } catch(const json::not_found &e) @@ -964,362 +919,9 @@ catch(const json::not_found &e) { log, "polylog sync room %s error %lu to %lu (vm @ %zu) :%s" ,string_view{room.room_id} - ,sp.since - ,sp.current + ,data.since + ,data.current ,m::vm::current_sequence ,e.what() }; } - -void -ircd::m::sync::polylog::room_state(shortpoll &sp, - json::stack::object &out, - const m::room &room) -{ - static const m::event::fetch::opts fopts - { - m::event::keys::include - { - "content", - "depth", - "event_id", - "origin_server_ts", - "redacts", - "room_id", - "sender", - "state_key", - "type", - }, - }; - - json::stack::member member - { - out, "events" - }; - - json::stack::array array - { - member - }; - - m::room::state state - { - room - }; - - if(bool(prefetch_state)) - state.prefetch(sp.since, sp.current); - - state.for_each([&sp, &array] - (const m::event::idx &event_idx) - { - if(event_idx < sp.since || event_idx >= sp.current) - return; - - const event::fetch event - { - event_idx, std::nothrow, &fopts - }; - - if(!event.valid || at<"depth"_>(event) >= int64_t(sp.state_at)) - return; - - array.append(event); - sp.committed = true; - }); -} - -void -ircd::m::sync::polylog::room_timeline(shortpoll &sp, - json::stack::object &out, - const m::room &room) -{ - // events - bool limited{false}; - m::event::id::buf prev; - { - json::stack::member member{out, "events"}; - json::stack::array array{member}; - prev = room_timeline_events(sp, array, room, limited); - } - - // prev_batch - { - json::stack::member member - { - out, "prev_batch", string_view{prev} - }; - } - - // limited - { - json::stack::member member - { - out, "limited", json::value{limited} - }; - } -} - -ircd::m::event::id::buf -ircd::m::sync::polylog::room_timeline_events(shortpoll &sp, - json::stack::array &out, - const m::room &room, - bool &limited) -{ - static const m::event::fetch::opts fopts - { - m::event::keys::include - { - "content", - "depth", - "event_id", - "origin_server_ts", - "prev_events", - "redacts", - "room_id", - "sender", - "state_key", - "type", - }, - }; - - // messages seeks to the newest event, but the client wants the oldest - // event first so we seek down first and then iterate back up. Due to - // an issue with rocksdb's prefix-iteration this iterator becomes - // toxic as soon as it becomes invalid. As a result we have to copy the - // event_id on the way down in case of renewing the iterator for the - // way back. This is not a big deal but rocksdb should fix their shit. - ssize_t i(0); - m::event::id::buf event_id; - m::room::messages it - { - room, &fopts - }; - - for(; it && i < 10; --it, ++i) - { - event_id = it.event_id(); - if(it.event_idx() < sp.since) - break; - - if(it.event_idx() >= sp.current) - break; - - if(bool(prefetch_timeline)) - m::prefetch(it.event_idx(), fopts); - } - - limited = i >= 10; - sp.committed |= i > 0; - - if(i > 0 && !it) - it.seek(event_id); - - if(i > 0 && it) - { - const m::event &event{*it}; - sp.state_at = at<"depth"_>(event); - } - - if(i > 0) - for(; it && i > -1; ++it, --i) - out.append(*it); - - return event_id; -} - -void -ircd::m::sync::polylog::room_ephemeral(shortpoll &sp, - json::stack::object &out, - const m::room &room) -{ - { - json::stack::member member{out, "events"}; - json::stack::array array{member}; - room_ephemeral_events(sp, array, room); - } -} - -void -ircd::m::sync::polylog::room_ephemeral_events(shortpoll &sp, - json::stack::array &out, - const m::room &room) -{ - const m::room::members members{room}; - members.for_each("join", m::room::members::closure{[&] - (const m::user &user) - { - static const m::event::fetch::opts fopts - { - m::event::keys::include - { - "event_id", - "content", - "sender", - }, - }; - - m::user::room user_room{user}; - user_room.fopts = &fopts; - - if(head_idx(std::nothrow, user_room) <= sp.since) - return; - - user_room.get(std::nothrow, "ircd.read", room.room_id, [&] - (const m::event &event) - { - const auto &event_idx - { - index(event, std::nothrow) - }; - - if(event_idx < sp.since || event_idx >= sp.current) - return; - - sp.committed = true; - json::stack::object object{out}; - - // type - { - json::stack::member member - { - object, "type", "m.receipt" - }; - } - - // content - { - const json::object data - { - at<"content"_>(event) - }; - - thread_local char buf[1024]; - const json::members reformat - { - { unquote(data.at("event_id")), - { - { "m.read", - { - { at<"sender"_>(event), - { - { "ts", data.at("ts") } - }} - }} - }} - }; - - json::stack::member member - { - object, "content", json::stringify(mutable_buffer{buf}, reformat) - }; - } - }); - }}); -} - -void -ircd::m::sync::polylog::room_account_data(shortpoll &sp, - json::stack::object &out, - const m::room &room) -{ - json::stack::member member{out, "events"}; - json::stack::array array{member}; - const m::room::state state - { - sp.user_room - }; - - char typebuf[288]; //TODO: room_account_data_typebuf_size - const auto type - { - m::user::_account_data_type(typebuf, room.room_id) - }; - - state.for_each(type, [&sp, &array] - (const m::event &event) - { - const auto &event_idx - { - index(event, std::nothrow) - }; - - if(event_idx < sp.since || event_idx >= sp.current) - return; - - json::stack::object object{array}; - - // type - { - json::stack::member member - { - object, "type", at<"state_key"_>(event) - }; - } - - // content - { - json::stack::member member - { - object, "content", at<"content"_>(event) - }; - } - }); -} - -void -ircd::m::sync::polylog::room_unread_notifications(shortpoll &sp, - json::stack::object &out, - const m::room &room) -{ - m::event::id::buf last_read; - if(!m::receipt::read(last_read, room.room_id, sp.user)) - return; - - const auto last_read_idx - { - index(last_read) - }; - - // highlight_count - json::stack::member - { - out, "highlight_count", json::value - { - highlight_count(room, sp.user, last_read_idx, sp.current) - } - }; - - // notification_count - json::stack::member - { - out, "notification_count", json::value - { - notification_count(room, last_read_idx, sp.current) - } - }; -} - -long -ircd::m::sync::highlight_count(const room &r, - const user &u, - const event::idx &a, - const event::idx &b) -{ - using proto = size_t (const user &, const room &, const event::idx &, const event::idx &); - - static mods::import count - { - "m_user", "highlighted_count__between" - }; - - return count(u, r, a, a < b? b : a); -} - -long -ircd::m::sync::notification_count(const room &room, - const event::idx &a, - const event::idx &b) -{ - return m::count_since(room, a, a < b? b : a); -} diff --git a/modules/client/sync.h b/modules/client/sync.h index deebb9a0d..14a4d216b 100644 --- a/modules/client/sync.h +++ b/modules/client/sync.h @@ -12,9 +12,9 @@ namespace ircd::m::sync { struct args; struct stats; - struct shortpoll; + struct data; + struct response; - extern log::log log; extern const string_view description; extern resource resource; extern resource::method method_get; @@ -70,27 +70,14 @@ namespace ircd::m::sync::linear { extern conf::item delta_max; - static bool handle(client &, shortpoll &, json::stack::object &); + static bool handle(client &, data &, json::stack::object &); } namespace ircd::m::sync::polylog { - extern conf::item prefetch_state; - extern conf::item prefetch_timeline; - - static void room_state(shortpoll &, json::stack::object &, const m::room &); - static m::event::id::buf room_timeline_events(shortpoll &, json::stack::array &, const m::room &, bool &limited); - static void room_timeline(shortpoll &, json::stack::object &, const m::room &); - static void room_ephemeral_events(shortpoll &, json::stack::array &, const m::room &); - static void room_ephemeral(shortpoll &, json::stack::object &, const m::room &); - static void room_account_data(shortpoll &, json::stack::object &, const m::room &); - static void room_unread_notifications(shortpoll &, json::stack::object &, const m::room &); - static void sync_room(shortpoll &, json::stack::object &, const m::room &, const string_view &membership); - static void sync_rooms(shortpoll &, json::stack::object &, const string_view &membership); - static void rooms(shortpoll &, json::stack::object &); - static void presence(shortpoll &, json::stack::object &); - static void account_data(shortpoll &, json::stack::object &); - static bool handle(client &, shortpoll &, json::stack::object &); + static void sync_room(data &, json::stack::object &, const m::room &); + static void sync_rooms(data &, json::stack::object &, const string_view &membership); + static bool handle(client &, data &, json::stack::object &); } /// Argument parser for the client's /sync request @@ -151,113 +138,3 @@ struct ircd::m::sync::args request.query.get("set_presence", true) }; }; - -struct ircd::m::sync::stats -{ - ircd::timer timer; - size_t flush_bytes {0}; - size_t flush_count {0}; -}; - -struct ircd::m::sync::shortpoll -{ - static conf::item flush_hiwat; - - shortpoll(ircd::client &client, - const sync::args &args) - :client{client} - ,args{args} - {} - - sync::stats stats; - ircd::client &client; - const sync::args &args; - const resource::request &request - { - args.request - }; - - const uint64_t &since - { - args.since - }; - - const uint64_t current - { - m::vm::current_sequence - }; - - const uint64_t delta - { - current - since - }; - - const m::user user - { - request.user_id - }; - - const std::string filter_buf - { - args.filter_id? - user.filter(std::nothrow, args.filter_id): - std::string{} - }; - - const m::filter filter - { - json::object{filter_buf} - }; - - const m::user::room user_room - { - user - }; - - const m::user::rooms rooms - { - user - }; - - uint64_t state_at - { - 0 - }; - - bool committed - { - false - }; - - unique_buffer buf - { - std::max(size_t(96_KiB), size_t(flush_hiwat)) - }; - - std::unique_ptr response; - json::stack out - { - buf, std::bind(&shortpoll::flush, this, ph::_1), size_t(flush_hiwat) - }; - - void commit() - { - response = std::make_unique - ( - client, http::OK, "application/json; charset=utf-8" - ); - } - - const_buffer flush(const const_buffer &buf) - { - if(!committed) - return buf; - - if(!response) - commit(); - - stats.flush_bytes += response->write(buf); - stats.flush_count++; - return buf; - } -}; diff --git a/modules/client/sync/account_data.cc b/modules/client/sync/account_data.cc new file mode 100644 index 000000000..a087b7b29 --- /dev/null +++ b/modules/client/sync/account_data.cc @@ -0,0 +1,64 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Account Data" +}; + +namespace ircd::m::sync +{ + static bool account_data_polylog(data &); + extern item account_data; +} + +decltype(ircd::m::sync::account_data) +ircd::m::sync::account_data +{ + "account_data", + account_data_polylog +}; + +bool +ircd::m::sync::account_data_polylog(data &data) +{ + json::stack::object out{*data.member}; + json::stack::member member{out, "events"}; + json::stack::array array{member}; + const m::room::state state{data.user_room}; + state.for_each("ircd.account_data", [&data, &array] + (const m::event &event) + { + const auto &event_idx(index(event, std::nothrow)); + if(event_idx < data.since || event_idx > data.current) + return; + + json::stack::object object{array}; + + // type + { + json::stack::member member + { + object, "type", at<"state_key"_>(event) + }; + } + + // content + { + json::stack::member member + { + object, "content", at<"content"_>(event) + }; + } + }); + + return true; +} diff --git a/modules/client/sync/presence.cc b/modules/client/sync/presence.cc new file mode 100644 index 000000000..b78dc1c0d --- /dev/null +++ b/modules/client/sync/presence.cc @@ -0,0 +1,114 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Presence" +}; + +namespace ircd::m::sync +{ + static bool presence_polylog(data &); + extern item presence; +} + +decltype(ircd::m::sync::presence) +ircd::m::sync::presence +{ + "presence", + presence_polylog +}; + +namespace ircd +{ + ctx::pool::opts meepool_opts + { + 256_KiB + }; + + ctx::pool meepool + { + "meepool", meepool_opts + }; +}; + +bool +ircd::m::sync::presence_polylog(data &data) +{ + json::stack::object out{*data.member}; + json::stack::member member{out, "events"}; + json::stack::array array{member}; + const m::user::mitsein mitsein + { + data.user + }; + + ctx::mutex mutex; + const auto closure{[&data, &array, &mutex] + (const json::object &event) + { + // Lock the json::stack for the append operations. This mutex will only be + // contended during a json::stack flush to the client; not during database + // queries leading to this. + const std::lock_guard l{mutex}; + json::stack::object object{array}; + + // sender + json::stack::member + { + object, "sender", unquote(event.get("user_id")) + }; + + // type + json::stack::member + { + object, "type", json::value{"m.presence"} + }; + + // content + json::stack::member + { + object, "content", event + }; + }}; + + const auto each_user{[&data, &closure] + (const m::user::id &user_id) + { + const m::user user{user_id}; + const m::user::room user_room{user}; + //TODO: can't check event_idx cuz only closed presence content + if(head_idx(std::nothrow, user_room) > data.since) + m::presence::get(std::nothrow, user, closure); + }}; + + //TODO: conf + static const size_t fibers(24); + string_view q[fibers]; + char buf[fibers][256]; + ctx::parallel parallel + { + meepool, q, each_user + }; + + const auto paraclosure{[¶llel, &q, &buf] + (const m::user &u) + { + assert(parallel.snd < fibers); + strlcpy(buf[parallel.snd], string_view{u.user_id}); + q[parallel.snd] = buf[parallel.snd]; + parallel(); + }}; + + mitsein.for_each("join", paraclosure); +// mitsein.for_each("join", each_user); + return true; +} diff --git a/modules/client/sync/rooms/account_data.cc b/modules/client/sync/rooms/account_data.cc new file mode 100644 index 000000000..d2592a96e --- /dev/null +++ b/modules/client/sync/rooms/account_data.cc @@ -0,0 +1,68 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Room Account Data" +}; + +namespace ircd::m::sync +{ + static bool room_account_data_polylog(data &); + extern item room_account_data; +} + +decltype(ircd::m::sync::room_account_data) +ircd::m::sync::room_account_data +{ + "rooms...account_data", + room_account_data_polylog +}; + +bool +ircd::m::sync::room_account_data_polylog(data &data) +{ + json::stack::object out{*data.member}; + json::stack::member member{out, "events"}; + json::stack::array array{member}; + const m::room::state state + { + data.user_room + }; + + char typebuf[288]; //TODO: room_account_data_typebuf_size + const auto type + { + m::user::_account_data_type(typebuf, data.room->room_id) + }; + + state.for_each(type, [&data, &array] + (const m::event &event) + { + const auto &event_idx(index(event, std::nothrow)); + if(event_idx < data.since || event_idx >= data.current) + return; + + json::stack::object object{array}; + + json::stack::member + { + object, "type", at<"state_key"_>(event) + }; + + json::stack::member + { + object, "content", at<"content"_>(event) + }; + }); + + return true; +} diff --git a/modules/client/sync/rooms/receipt.cc b/modules/client/sync/rooms/receipt.cc new file mode 100644 index 000000000..78a3ca255 --- /dev/null +++ b/modules/client/sync/rooms/receipt.cc @@ -0,0 +1,99 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Room Receipts" +}; + +namespace ircd::m::sync +{ + static bool room_ephemeral_m_receipt_m_read_polylog(data &); + extern item room_ephemeral_m_receipt_m_read; +} + +decltype(ircd::m::sync::room_ephemeral_m_receipt_m_read) +ircd::m::sync::room_ephemeral_m_receipt_m_read +{ + "rooms...ephemeral", + room_ephemeral_m_receipt_m_read_polylog +}; + +bool +ircd::m::sync::room_ephemeral_m_receipt_m_read_polylog(data &data) +{ + const m::room &room{*data.room}; + const m::room::members members{room}; + const m::room::members::closure closure{[&] + (const m::user::id &user_id) + { + static const m::event::fetch::opts fopts + { + m::event::keys::include + { + "event_id", + "content", + "sender", + }, + }; + + const m::user user{user_id}; + m::user::room user_room{user}; + user_room.fopts = &fopts; + if(head_idx(std::nothrow, user_room) <= data.since) + return; + + user_room.get(std::nothrow, "ircd.read", room.room_id, [&] + (const m::event &event) + { + const auto &event_idx(index(event, std::nothrow)); + if(event_idx < data.since || event_idx >= data.current) + return; + + data.commit(); + json::stack::object object{*data.array}; + + // type + json::stack::member + { + object, "type", "m.receipt" + }; + + // content + const json::object data + { + at<"content"_>(event) + }; + + thread_local char buf[1024]; + const json::members reformat + { + { unquote(data.at("event_id")), + { + { "m.read", + { + { at<"sender"_>(event), + { + { "ts", data.at("ts") } + }} + }} + }} + }; + + json::stack::member + { + object, "content", json::stringify(mutable_buffer{buf}, reformat) + }; + }); + }}; + + return true; +} diff --git a/modules/client/sync/rooms/state.cc b/modules/client/sync/rooms/state.cc new file mode 100644 index 000000000..c73dd02d0 --- /dev/null +++ b/modules/client/sync/rooms/state.cc @@ -0,0 +1,92 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Room State" +}; + +namespace ircd::m::sync +{ + static bool room_state_polylog(data &); + extern item room_state; +} + +decltype(ircd::m::sync::room_state) +ircd::m::sync::room_state +{ + "rooms...state", + room_state_polylog +}; + +bool +ircd::m::sync::room_state_polylog(data &data) +{ + static const m::event::keys::include default_keys + { + "content", + "depth", + "event_id", + "origin_server_ts", + "redacts", + "room_id", + "sender", + "state_key", + "type", + }; + + static const m::event::fetch::opts fopts + { + default_keys + }; + + json::stack::object out{*data.member}; + json::stack::member member + { + out, "events" + }; + + json::stack::array array + { + member + }; + + ctx::mutex mutex; + const event::closure_idx each_idx{[&data, &array, &mutex] + (const m::event::idx &event_idx) + { + assert(event_idx); + const event::fetch event + { + event_idx, std::nothrow, &fopts + }; + + if(!event.valid || at<"depth"_>(event) >= int64_t(data.state_at)) + return; + + const std::lock_guard lock{mutex}; + array.append(event); + data.commit(); + }}; + + const event::closure_idx _each_idx{[&data, &each_idx] + (const m::event::idx &event_idx) + { + assert(event_idx); + if(event_idx >= data.since && event_idx <= data.current) + each_idx(event_idx); + }}; + + const m::room &room{*data.room}; + const m::room::state state{room}; + state.for_each(_each_idx); + return true; +} diff --git a/modules/client/sync/rooms/timeline.cc b/modules/client/sync/rooms/timeline.cc new file mode 100644 index 000000000..936482042 --- /dev/null +++ b/modules/client/sync/rooms/timeline.cc @@ -0,0 +1,124 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Room Timeline" +}; + +namespace ircd::m::sync +{ + static event::id::buf _room_timeline_events(data &, json::stack::array &, const m::room &, bool &); + static bool room_timeline_polylog(data &); + extern item room_timeline; +} + +decltype(ircd::m::sync::room_timeline) +ircd::m::sync::room_timeline +{ + "rooms...timeline", + room_timeline_polylog +}; + +bool +ircd::m::sync::room_timeline_polylog(data &data) +{ + json::stack::object out{*data.member}; + + // events + bool limited{false}; + m::event::id::buf prev; + { + json::stack::member member{out, "events"}; + json::stack::array array{member}; + prev = _room_timeline_events(data, array, *data.room, limited); + } + + // prev_batch + json::stack::member + { + out, "prev_batch", string_view{prev} + }; + + // limited + json::stack::member + { + out, "limited", json::value{limited} + }; + + return true; +} + +ircd::m::event::id::buf +ircd::m::sync::_room_timeline_events(data &data, + json::stack::array &out, + const m::room &room, + bool &limited) +{ + static const m::event::fetch::opts fopts + { + m::event::keys::include + { + "content", + "depth", + "event_id", + "origin_server_ts", + "prev_events", + "redacts", + "room_id", + "sender", + "state_key", + "type", + }, + }; + + // messages seeks to the newest event, but the client wants the oldest + // event first so we seek down first and then iterate back up. Due to + // an issue with rocksdb's prefix-iteration this iterator becomes + // toxic as soon as it becomes invalid. As a result we have to copy the + // event_id on the way down in case of renewing the iterator for the + // way back. This is not a big deal but rocksdb should fix their shit. + ssize_t i(0); + m::event::id::buf event_id; + m::room::messages it + { + room, &fopts + }; + + for(; it && i < 10; --it, ++i) + { + event_id = it.event_id(); + if(it.event_idx() < data.since) + break; + + if(it.event_idx() > data.current) + break; + } + + limited = i >= 10; + if(i > 0) + data.commit(); + + if(i > 0 && !it) + it.seek(event_id); + + if(i > 0 && it) + { + const m::event &event{*it}; + data.state_at = at<"depth"_>(event); + } + + if(i > 0) + for(; it && i > -1; ++it, --i) + out.append(*it); + + return event_id; +} diff --git a/modules/client/sync/rooms/unread_notifications.cc b/modules/client/sync/rooms/unread_notifications.cc new file mode 100644 index 000000000..d38faa0bf --- /dev/null +++ b/modules/client/sync/rooms/unread_notifications.cc @@ -0,0 +1,89 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +ircd::mapi::header +IRCD_MODULE +{ + "Client Sync :Room Unread Notifications" +}; + +namespace ircd::m::sync +{ + static long _notification_count(const room &, const event::idx &a, const event::idx &b); + static long _highlight_count(const room &, const user &u, const event::idx &a, const event::idx &b); + static bool room_unread_notifications_polylog(data &); + extern item room_unread_notifications; +} + +decltype(ircd::m::sync::room_unread_notifications) +ircd::m::sync::room_unread_notifications +{ + "rooms...unread_notifications", + room_unread_notifications_polylog +}; + +bool +ircd::m::sync::room_unread_notifications_polylog(data &data) +{ + auto &room{*data.room}; + m::event::id::buf last_read; + if(!m::receipt::read(last_read, room.room_id, data.user)) + return false; + + json::stack::object out{*data.member}; + const auto last_read_idx + { + index(last_read) + }; + + // highlight_count + json::stack::member + { + out, "highlight_count", json::value + { + _highlight_count(room, data.user, last_read_idx, data.current) + } + }; + + // notification_count + json::stack::member + { + out, "notification_count", json::value + { + _notification_count(room, last_read_idx, data.current) + } + }; + + return true; +} + +long +ircd::m::sync::_notification_count(const room &room, + const event::idx &a, + const event::idx &b) +{ + return m::count_since(room, a, a < b? b : a); +} + +long +ircd::m::sync::_highlight_count(const room &r, + const user &u, + const event::idx &a, + const event::idx &b) +{ + using proto = size_t (const user &, const room &, const event::idx &, const event::idx &); + + static mods::import count + { + "m_user", "highlighted_count__between" + }; + + return count(u, r, a, a < b? b : a); +}