diff --git a/include/ircd/m/sync.h b/include/ircd/m/sync.h index c63e62a13..379d1569e 100644 --- a/include/ircd/m/sync.h +++ b/include/ircd/m/sync.h @@ -51,6 +51,7 @@ struct ircd::m::sync::item handle _linear; json::strung feature; json::object opts; + bool phased; public: string_view name() const; @@ -77,6 +78,10 @@ struct ircd::m::sync::data /// index is one beyond the vm::current_sequence and used for next_batch. m::events::range range; + /// Whether to enable phased sync mode. The range.first will be <= 0 + /// in this case, and only handlers with the phased feature + bool phased {false}; + /// Statistics tracking. If null, stats won't be accumulated for the sync. sync::stats *stats {nullptr}; diff --git a/ircd/m.cc b/ircd/m.cc index 2fb18b9db..61f23561b 100644 --- a/ircd/m.cc +++ b/ircd/m.cc @@ -569,7 +569,7 @@ ircd::m::sync::loghead(const data &data) return fmt::sprintf { - headbuf, "%s %s %lu:%lu|%lu chunk:%zu sent:%s of %s in %s", + headbuf, "%s %s %ld:%lu|%lu chunk:%zu sent:%s of %s in %s", remstr, string_view{data.user.user_id}, data.range.first, @@ -697,6 +697,10 @@ ircd::m::sync::item::item(std::string name, { this->feature } +,phased +{ + opts.get("phased", false) +} { log::debug { @@ -722,9 +726,22 @@ bool ircd::m::sync::item::polylog(data &data) try { + // Skip the item if disabled by configuration if(!enable) return false; + // Skip the item for phased-sync ranges if it's not phased-sync aware. + if(!phased && data.phased && int64_t(data.range.first) < 0L) + { + assert(data.phased); + return false; + } + + // Skip the item for the initial-sync pass if it's phased-sync aware; + // it will be called for the first time at the next phase. + if(phased && data.phased && data.range.first == 0UL) + return false; + #ifdef RB_DEBUG sync::stats stats { diff --git a/modules/client/sync.cc b/modules/client/sync.cc index c72e6a6b3..56b0d1fcf 100644 --- a/modules/client/sync.cc +++ b/modules/client/sync.cc @@ -98,6 +98,13 @@ ircd::m::sync::linear_delta_max { "help", linear_delta_max_help }, }; +decltype(ircd::m::sync::polylog_phased) +ircd::m::sync::polylog_phased +{ + { "name", "ircd.client.sync.polylog.phased" }, + { "default", true }, +}; + decltype(ircd::m::sync::polylog_only) ircd::m::sync::polylog_only { @@ -145,18 +152,34 @@ ircd::m::sync::handle_get(client &client, args.since, std::min(args.next_batch, m::vm::sequence::retired + 1) }; - // When the range indexes are the same, the client is polling for the next - // event which doesn't exist yet. There is no reason for the since parameter - // to be greater than that. - if(range.first > range.second) + // The phased initial sync feature uses negative since tokens. + const bool phased_range + { + int64_t(range.first) < 0L + }; + + // Check if the admin disabled phased sync. + if(!polylog_phased && phased_range) throw m::NOT_FOUND { - "Since parameter '%lu' is too far in the future." - " Cannot be greater than '%lu'.", + "Since parameter '%ld' must be >= 0.", range.first, - range.second }; + // When the range indexes are the same, the client is polling for the next + // event which doesn't exist yet. There is no reason for the since parameter + // to be greater than that, unless it's a negative integer and phased + // sync is enabled + if(!polylog_phased || !phased_range) + if(range.first > range.second) + throw m::NOT_FOUND + { + "Since parameter '%lu' is too far in the future." + " Cannot be greater than '%lu'.", + range.first, + range.second + }; + // Keep state for statistics of this sync here on the stack. stats stats; data data @@ -169,6 +192,18 @@ ircd::m::sync::handle_get(client &client, args.filter_id }; + const bool initial_sync + { + range.first == 0UL + }; + + // Conditions for phased sync for this client + data.phased = + { + (polylog_phased && args.phased) && + (phased_range || initial_sync) + }; + // Start the chunked encoded response. resource::response::chunked response { @@ -190,11 +225,13 @@ ircd::m::sync::handle_get(client &client, const bool should_longpoll { + !data.phased && range.first > vm::sequence::retired }; const bool should_linear { + !data.phased && !should_longpoll && !bool(polylog_only) && range.second - range.first <= size_t(linear_delta_max) @@ -213,8 +250,9 @@ ircd::m::sync::handle_get(client &client, if(shortpolled) return {}; - if(longpoll_enable && longpoll::poll(data, args)) - return {}; + if(longpoll_enable && (!data.phased || initial_sync)) + if(longpoll::poll(data, args)) + return {}; const auto &next_batch { @@ -335,23 +373,34 @@ try }); if(ret) + { + const int64_t next_batch + { + data.phased? + int64_t(data.range.first) - 1L: + int64_t(data.range.second) + }; + json::stack::member { *data.out, "next_batch", json::value { - lex_cast(data.range.second), json::STRING + lex_cast(next_batch), json::STRING } }; + } if(!ret) checkpoint.decommit(); - if(stats_info) log::info + if(!data.phased && stats_info) log::info { - log, "request %s polylog commit:%b complete @%lu", + log, "request %s polylog commit:%b complete @%ld", loghead(data), ret, - data.range.second + data.phased? + data.range.first: + data.range.second }; return ret; diff --git a/modules/client/sync.h b/modules/client/sync.h index a55b81005..6fb2b7ee9 100644 --- a/modules/client/sync.h +++ b/modules/client/sync.h @@ -24,6 +24,7 @@ namespace ircd::m::sync extern conf::item linear_buffer_size; extern conf::item linear_delta_max; extern conf::item longpoll_enable; + extern conf::item polylog_phased; extern conf::item polylog_only; static const_buffer flush(data &, resource::response::chunked &, const const_buffer &); @@ -143,5 +144,10 @@ struct ircd::m::sync::args request.query.get("set_presence", true) }; + bool phased + { + request.query.get("phased", true) + }; + args(const resource::request &request); }; diff --git a/modules/client/sync/rooms.cc b/modules/client/sync/rooms.cc index c80155337..58dbf274f 100644 --- a/modules/client/sync/rooms.cc +++ b/modules/client/sync/rooms.cc @@ -19,7 +19,7 @@ namespace ircd::m::sync static bool should_ignore(const data &); static bool _rooms_polylog_room(data &, const m::room &); - static bool _rooms_polylog(data &, const string_view &membership); + static bool _rooms_polylog(data &, const string_view &membership, int64_t &phase); static bool rooms_polylog(data &); static bool _rooms_linear(data &, const string_view &membership); @@ -31,9 +31,10 @@ namespace ircd::m::sync decltype(ircd::m::sync::rooms) ircd::m::sync::rooms { - "rooms", - rooms_polylog, - rooms_linear + "rooms", rooms_polylog, rooms_linear, + { + { "phased", true } + } }; bool @@ -92,16 +93,31 @@ bool ircd::m::sync::rooms_polylog(data &data) { bool ret{false}; - ret |= _rooms_polylog(data, "invite"); - ret |= _rooms_polylog(data, "join"); - ret |= _rooms_polylog(data, "leave"); - ret |= _rooms_polylog(data, "ban"); + int64_t phase(0); + + ret |= _rooms_polylog(data, "join", phase); + if(data.phased && ret) + return ret; + + ret |= _rooms_polylog(data, "invite", phase); + if(data.phased && ret) + return ret; + + ret |= _rooms_polylog(data, "leave", phase); + if(data.phased && ret) + return ret; + + ret |= _rooms_polylog(data, "ban", phase); + if(data.phased && ret) + return ret; + return ret; } bool ircd::m::sync::_rooms_polylog(data &data, - const string_view &membership) + const string_view &membership, + int64_t &phase) { const scope_restore theirs { @@ -114,9 +130,23 @@ ircd::m::sync::_rooms_polylog(data &data, }; bool ret{false}; - data.user_rooms.for_each(membership, [&data, &ret] + const user::rooms::closure_bool closure{[&data, &ret, &phase] (const m::room &room, const string_view &membership_) { + assert(!data.phased || int64_t(data.range.first) < 0L); + + if(data.phased) + { + if(phase >= int64_t(data.range.first)) + { + --phase; + return true; + } + + if(phase < int64_t(data.range.first) && ret) + return false; + } + #if defined(RB_DEBUG) sync::stats stats { @@ -129,7 +159,20 @@ ircd::m::sync::_rooms_polylog(data &data, stats.timer = timer{}; #endif - ret |= _rooms_polylog_room(data, room); + { + const scope_restore range + { + data.range.first, data.phased? 0UL : data.range.first + }; + + ret |= _rooms_polylog_room(data, room); + } + + if(data.phased && !ret) + { + --data.range.first; + return true; + } #if defined(RB_DEBUG) thread_local char tmbuf[32]; @@ -141,7 +184,14 @@ ircd::m::sync::_rooms_polylog(data &data, ircd::pretty(tmbuf, stats.timer.at(), true) }; #endif - }); + + return true; + }}; + + const bool done + { + data.user_rooms.for_each(membership, closure) + }; return ret; }