From cc586a352e0f2da3dbaa30457ecc8ffd8497cb1f Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 26 Feb 2019 17:02:21 -0800 Subject: [PATCH] modules/client/sync: Implement iteration base of linear sync. --- include/ircd/m/sync.h | 3 +- ircd/m.cc | 3 + modules/client/sync.cc | 202 +++++++++++++++++++++++++++++++++-------- modules/client/sync.h | 15 +-- 4 files changed, 173 insertions(+), 50 deletions(-) diff --git a/include/ircd/m/sync.h b/include/ircd/m/sync.h index 91d53b167..0a963e951 100644 --- a/include/ircd/m/sync.h +++ b/include/ircd/m/sync.h @@ -95,7 +95,8 @@ struct ircd::m::sync::data const m::event *event {nullptr}; const m::room *room {nullptr}; string_view membership; - event::idx room_head {0}; + event::idx room_head {0}; // if *room + event::idx event_idx {0}; // if *event data(const m::user &user, const m::events::range &range, diff --git a/ircd/m.cc b/ircd/m.cc index 673dde76b..fddf32013 100644 --- a/ircd/m.cc +++ b/ircd/m.cc @@ -755,6 +755,9 @@ bool ircd::m::sync::item::linear(data &data) try { + if(!enable) + return false; + return _linear(data); } catch(const std::bad_function_call &e) diff --git a/modules/client/sync.cc b/modules/client/sync.cc index f6ff871d2..fe118fab3 100644 --- a/modules/client/sync.cc +++ b/modules/client/sync.cc @@ -49,8 +49,8 @@ ircd::m::sync::buffer_size { "default", long(128_KiB) }, }; -decltype(ircd::m::sync::linear::delta_max) -ircd::m::sync::linear::delta_max +decltype(ircd::m::sync::linear_delta_max) +ircd::m::sync::linear_delta_max { { "name", "ircd.client.sync.linear.delta.max" }, { "default", 1024 }, @@ -145,9 +145,9 @@ ircd::m::sync::handle_get(client &client, { range.first > range.second? false: - range.second - range.first <= size_t(linear::delta_max)? - polylog::handle(data): - polylog::handle(data) + range.second - range.first <= size_t(linear_delta_max)? + polylog_handle(data): + polylog_handle(data) }; // When shortpoll was successful, do nothing else. @@ -208,12 +208,20 @@ ircd::m::sync::flush(data &data, }; } -// // polylog // +// Random access approach for large `since` ranges. The /sync schema itself is +// recursed. For every component in the schema, the handler seeks the events +// appropriate for the user and appends it to the output. Concretely, this +// involves a full iteration of the rooms a user is a member of, and a full +// iteration of the presence status for all users visible to a user, etc. +// +// This entire process occurs in a single pass. The schema is traced with +// json::stack and its buffer is flushed to the client periodically with +// chunked encoding. bool -ircd::m::sync::polylog::handle(data &data) +ircd::m::sync::polylog_handle(data &data) try { json::stack::checkpoint checkpoint @@ -279,49 +287,72 @@ catch(const std::exception &e) // // linear // +// Approach for small `since` ranges. The range of events is iterated and +// the event itself is presented to each handler in the schema. This also +// involves a json::stack trace of the schema so that if the handler determines +// the event is appropriate for syncing to the user the output buffer will +// contain a residue of a /sync response with a single event. +// +// After the iteration of events is complete we are left with several buffers +// of properly formatted individual /sync responses which we rewrite into a +// single response to overcome the inefficiency of request ping-pong under +// heavy load. + +namespace ircd::m::sync +{ + static bool linear_proffer_event_one(data &); + static size_t linear_proffer_event(data &, const mutable_buffer &); + static event::idx linear_proffer(data &, window_buffer &); + static void linear_rewrite(data &, const json::vector &); +} bool -ircd::m::sync::linear::handle(data &data) +ircd::m::sync::linear_handle(data &data) try { - bool ret{false}; - m::events::for_each(data.range, [&data, &ret] - (const m::event::idx &event_idx, const m::event &event) + json::stack::checkpoint checkpoint { - const scope_restore theirs + *data.out + }; + + const unique_buffer buf + { + 96_KiB //TODO: XXX + }; + + window_buffer wb{buf}; + const event::idx last + { + linear_proffer(data, wb) + }; + + const json::vector vector + { + wb.completed() + }; + + if(last) + { + json::stack::member { - data.event, &event + *data.out, "next_batch", json::value + { + lex_cast(last+1), json::STRING + } }; - m::sync::for_each(string_view{}, [&data, &ret] - (item &item) - { - json::stack::member member - { - data.out, item.member_name() - }; - - ret |= item.linear(data); - return true; - }); - - return true; - }); - - json::stack::member - { - data.out, "next_batch", json::value - { - lex_cast(data.range.second), json::STRING - } - }; + linear_rewrite(data, vector); + } + else checkpoint.rollback(); log::debug { - log, "linear %s complete", loghead(data) + log, "linear %s last:%lu complete", + loghead(data), + last }; - return ret; + return last; } catch(const std::exception &e) { @@ -335,6 +366,103 @@ catch(const std::exception &e) throw; } +void +ircd::m::sync::linear_rewrite(data &data, + const json::vector &vector) +{ + +} + +/// Iterates the events in the data.range and creates a json::vector in +/// the supplied window_buffer. The return value is the event_idx of the +/// last event which fit in the buffer, or 0 of nothing was of interest +/// to our client in the event iteration. +ircd::m::event::idx +ircd::m::sync::linear_proffer(data &data, + window_buffer &wb) +{ + event::idx ret(0); + m::events::for_each(data.range, [&data, &wb, &ret] + (const m::event::idx &event_idx, const m::event &event) + { + const scope_restore their_event + { + data.event, &event + }; + + const scope_restore their_event_idx + { + data.event_idx, event_idx + }; + + wb([&data, &ret, &event_idx] + (const mutable_buffer &buf) + { + const auto consumed + { + linear_proffer_event(data, buf) + }; + + if(consumed) + ret = event_idx; + + return consumed; + }); + + return wb.remaining() >= 65_KiB; //TODO: XXX + }); + + return ret; +} + +/// Sets up a json::stack for the iteration of handlers for +/// one event. +size_t +ircd::m::sync::linear_proffer_event(data &data, + const mutable_buffer &buf) +{ + json::stack out{buf}; + const scope_restore their_out + { + data.out, &out + }; + + return linear_proffer_event_one(data)? + size(out.completed()): + 0UL; +} + +/// Generates a candidate /sync response for a single event by +/// iterating all of the handlers. +bool +ircd::m::sync::linear_proffer_event_one(data &data) +{ + json::stack::object top + { + *data.out + }; + + return !m::sync::for_each(string_view{}, [&data] + (item &item) + { + json::stack::checkpoint checkpoint + { + *data.out + }; + + json::stack::object object + { + *data.out, item.member_name() + }; + + if(item.linear(data)) + return false; + + checkpoint.rollback(); + return true; + }); +} + // // longpoll // diff --git a/modules/client/sync.h b/modules/client/sync.h index 3d1b25c30..e00d70db0 100644 --- a/modules/client/sync.h +++ b/modules/client/sync.h @@ -21,9 +21,12 @@ namespace ircd::m::sync extern conf::item flush_hiwat; extern conf::item buffer_size; + extern conf::item linear_delta_max; static const_buffer flush(data &, resource::response::chunked &, const const_buffer &); static void empty_response(data &); + static bool linear_handle(data &); + static bool polylog_handle(data &); static resource::response handle_get(client &, const resource::request &); } @@ -65,18 +68,6 @@ namespace ircd::m::sync::longpoll extern m::hookfn notified; } -namespace ircd::m::sync::linear -{ - extern conf::item delta_max; - - static bool handle(data &); -} - -namespace ircd::m::sync::polylog -{ - static bool handle(data &); -} - struct ircd::m::sync::args { static conf::item timeout_max;