diff --git a/include/ircd/m/acquire.h b/include/ircd/m/acquire.h index ed5bd5877..5bfe4cefc 100644 --- a/include/ircd/m/acquire.h +++ b/include/ircd/m/acquire.h @@ -50,11 +50,23 @@ struct ircd::m::acquire::opts size_t viewport_size {0}; /// The number of rounds the algorithm runs for. - size_t rounds {1}; + size_t rounds {-1UL}; /// Total event limit over all operations. size_t fetch_max {-1UL}; /// Limit the number of requests in flight at any given time. size_t fetch_width {128}; + + /// Avoids filling gaps with a depth sounding lte this value + size_t gap_min {0}; + + /// Avoids filling gaps with a depth sounding greater than this value + size_t gap_max {-1UL}; + + /// Won't fetch missing unless ref gt (newer) than this idx. + event::idx ref_min {0}; + + /// Won't fetch missing unless ref lt (older) than this idx. + event::idx ref_max {-1UL}; }; diff --git a/matrix/acquire.cc b/matrix/acquire.cc index 69963e1fa..14316de86 100644 --- a/matrix/acquire.cc +++ b/matrix/acquire.cc @@ -17,7 +17,7 @@ namespace ircd::m::acquire static bool handle(const opts &, ctx::future &); static bool handle(const opts &, list &); static void fetch_head(const opts &, list &); - static bool fetch_missing(const opts &, list &); + static event::idx fetch_missing(const opts &, list &); static void submit(const opts &, list &); }; @@ -42,8 +42,9 @@ ircd::m::acquire::log // execute::execute // -ircd::m::acquire::execute::execute(const opts &opts) +ircd::m::acquire::execute::execute(const opts &opts_) { + auto opts(opts_); list fetching; // Branch to acquire head @@ -53,36 +54,61 @@ ircd::m::acquire::execute::execute(const opts &opts) // Branch to acquire missing if(opts.missing) for(size_t i(0); i < opts.rounds; ++i) - if(!fetch_missing(opts, fetching)) + { + event::idx last; + if(!(last = fetch_missing(opts, fetching))) break; + // After each round, set the floor for the next round. + opts.ref_min = opts_.ref_min?: last + 1; + if(opts.ref_min > opts.ref_max) + break; + } + // Complete all work before returning, otherwise everything // will be cancelled on unwind. - while(handle(opts, fetching)); + while(!fetching.empty()) + while(handle(opts, fetching)); } -bool -ircd::m::acquire::fetch_missing(const opts &opts, +ircd::m::event::idx +ircd::m::acquire::fetch_missing(const opts &opts_, list &fetching) { + auto opts(opts_); const auto top { m::top(opts.room.room_id) }; + const auto &[top_id, top_depth, top_idx] + { + top + }; + + // When the viewport will be counted we seek to a depth near the room head + // by default unless the caller forced another depth floor. + if(!opts.depth.first && opts.viewport_size) + opts.depth.first = top_depth - std::clamp(long(opts.viewport_size), 1024L, top_depth); + m::room::events::missing missing { opts.room }; - bool ret(false); - missing.for_each(opts.depth, [&opts, &fetching, &top, &ret] + event::idx last(0); + missing.for_each(opts.depth, [&opts, &fetching, &top, &last] (const event::id &event_id, const int64_t &ref_depth, const event::idx &ref_idx) { - // Bail if interrupted if(ctx::interruption_requested()) return false; + if(ref_idx < opts.ref_min) + return true; + + if(ref_idx > opts.ref_max) + return true; + // Branch if we have to measure the viewportion if(opts.viewport_size) { @@ -94,7 +120,7 @@ ircd::m::acquire::fetch_missing(const opts &opts, // Bail if this event sits above the viewport. if(m::room::events::count(opts.room, range) > opts.viewport_size) - return false; + return true; } const auto ref_id @@ -119,15 +145,22 @@ ircd::m::acquire::fetch_missing(const opts &opts, std::make_pair(0L, 0UL) }; + const auto gap + { + sound_depth >= twain_depth? + size_t(sound_depth - twain_depth): + 0UL + }; + + if(gap < opts.gap_min || gap > opts.gap_max) + return true; + auto _opts(opts); _opts.room.event_id = event_id; _opts.hint = opts.hint; - _opts.viewport_size = twain_depth? - std::clamp(sound_depth - twain_depth, 1L, 48L): - 1UL; - + _opts.viewport_size = std::clamp(gap, 1UL, 48UL); submit(_opts, fetching); - ret = true; + log::debug { log, "Fetch %s miss prev of %s @%lu in %s @%lu sound:%lu twain:%ld fetching:%zu", @@ -141,17 +174,23 @@ ircd::m::acquire::fetch_missing(const opts &opts, fetching.size(), }; + last = std::max(last, ref_idx); return true; }); - return ret; + return last; } void ircd::m::acquire::fetch_head(const opts &opts, list &fetching) { - const auto handle_head{[&opts, &fetching] + const auto top + { + m::top(opts.room.room_id) + }; + + const auto handle_head{[&opts, &fetching, &top] (const m::event &result) { // Bail if interrupted @@ -162,20 +201,21 @@ ircd::m::acquire::fetch_head(const opts &opts, if(json::get<"depth"_>(result) < opts.depth.first) return false; + const auto &[top_id, top_depth, top_idx] {top}; + const auto gap + { + json::get<"depth"_>(result) - top_depth + }; + auto _opts(opts); _opts.room.event_id = result.event_id; _opts.hint = json::get<"origin"_>(result); _opts.hint_only = true; - _opts.viewport_size = 1; //XXX + _opts.viewport_size = std::clamp(gap, 1L, 48L); submit(_opts, fetching); return true; }}; - const auto top - { - m::top(opts.room.room_id) - }; - m::room::head::fetch::opts hfopts; hfopts.room_id = opts.room.room_id; hfopts.top = top; @@ -190,9 +230,7 @@ ircd::m::acquire::submit(const opts &opts, list &fetching) { start(opts, fetching); - while(!fetching.empty()) - if(!handle(opts, fetching)) - break; + while(handle(opts, fetching)); } void @@ -213,6 +251,9 @@ bool ircd::m::acquire::handle(const opts &opts, list &fetching) { + if(fetching.empty()) + return false; + const bool full { fetching.size() >= opts.fetch_width @@ -278,7 +319,8 @@ try m::vm::opts vmopts; vmopts.infolog_accept = true; vmopts.warnlog &= ~vm::fault::EXISTS; - //vmopts.phase.set(m::vm::phase::NOTIFY, false); + vmopts.notify_servers = false; + vmopts.phase.set(m::vm::phase::NOTIFY, false); vmopts.phase.set(m::vm::phase::FETCH_PREV, false); vmopts.phase.set(m::vm::phase::FETCH_STATE, false); vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false); diff --git a/matrix/init_backfill.cc b/matrix/init_backfill.cc index 7480e609a..b15e5f424 100644 --- a/matrix/init_backfill.cc +++ b/matrix/init_backfill.cc @@ -305,7 +305,6 @@ ircd::m::init::backfill::handle_room(const room::id &room_id) opts.room = room_id; opts.viewport_size = ssize_t(m::room::events::viewport_size); opts.viewport_size *= size_t(viewports); - opts.rounds = opts.viewport_size / 2; m::acquire::execute { opts diff --git a/modules/console.cc b/modules/console.cc index 916e10149..7aec826a1 100644 --- a/modules/console.cc +++ b/modules/console.cc @@ -11013,7 +11013,7 @@ console_cmd__room__acquire(opt &out, const string_view &line) { const params param{line, " ", { - "room_id", "depth_start", "depth_stop", "viewport_size", "rounds" + "room_id", "depth_start", "depth_stop", "viewport_size", "gap_min", "rounds" }}; const auto &room_id @@ -11036,9 +11036,14 @@ console_cmd__room__acquire(opt &out, const string_view &line) param.at("viewport_size", 0L) }; + const auto gap_min + { + param.at("gap_min", 0UL) + }; + const auto rounds { - param.at("rounds", 1L) + param.at("rounds", -1UL) }; const m::room room @@ -11053,6 +11058,7 @@ console_cmd__room__acquire(opt &out, const string_view &line) opts.viewport_size = viewport_size; opts.rounds = rounds; opts.head = depth_stop == 0; + opts.gap_min = gap_min; m::acquire::execute { opts