diff --git a/include/ircd/m/acquire.h b/include/ircd/m/acquire.h index dcfa29890..0ca811139 100644 --- a/include/ircd/m/acquire.h +++ b/include/ircd/m/acquire.h @@ -45,6 +45,9 @@ struct ircd::m::acquire bool fetch_history(event::idx &); void acquire_history(); + bool fetch_timeline(event::idx &); + void acquire_timeline(); + bool fetch_state(const m::event::id &, const string_view &); void acquire_state(); @@ -74,10 +77,14 @@ struct ircd::m::acquire::opts /// a depth ceiling effectively makes this false. bool head {true}; - /// Perform history acquisition. Setting this to false disables operations - /// which fill in gaps in the timeline below the head. + /// Perform history acquisition. Setting this to false disables depthwise + /// operations which fill in timeline gaps below the head. bool history {true}; + /// Perform timeline acquisition. Setting this to false disables + /// breadthwise operations which fill in timeline gaps below the head. + bool timeline {true}; + /// Perform state acquisition. Setting this to false may result in an /// acquisition that is missing state events and subject to inconsistency /// from the ABA problem etc. @@ -109,6 +116,9 @@ struct ircd::m::acquire::opts /// unlimited and that's usually a waste of time in practice. size_t attempt_max {16}; + /// Limit on the depth of leafs pursued by the timeline acquisition. + size_t leaf_depth {0}; + /// Default vm::opts to be used during eval; some options are /// unconditionally overriden to perform some evals. Use caution, setting /// options may cause results not expected from this interface. diff --git a/matrix/acquire.cc b/matrix/acquire.cc index 0ac9cfee3..c11797a13 100644 --- a/matrix/acquire.cc +++ b/matrix/acquire.cc @@ -48,7 +48,7 @@ ircd::m::acquire::acquire::acquire(const struct opts &opts) state_vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false); } - if(opts.history) + if(opts.history || opts.timeline) { history_vmopts.notify_servers = false; history_vmopts.phase.set(m::vm::phase::NOTIFY, false); @@ -66,6 +66,10 @@ ircd::m::acquire::acquire::acquire(const struct opts &opts) if(opts.history) acquire_history(); + // Branch to acquire timeline + if(opts.timeline) + acquire_timeline(); + // Branch to acquire state if(opts.state) acquire_state(); @@ -248,6 +252,176 @@ ircd::m::acquire::fetch_history(event::idx &ref_min) return ret; } +void +ircd::m::acquire::acquire_timeline() +{ + event::idx ref_min + { + opts.ref.first + }; + + for(size_t i(0); i < opts.rounds; ++i) + { + if(!fetch_timeline(ref_min)) + break; + + if(ref_min > opts.ref.second) + break; + } +} + +bool +ircd::m::acquire::fetch_timeline(event::idx &ref_min) +{ + bool ret(false); + auto _ref_min(ref_min); + std::set pe; + std::deque pq; + + event::idx _event_idx; + if(opts.room.event_id) + if((_event_idx = m::index(std::nothrow, opts.room.event_id))) + pq.emplace_back(_event_idx); + + if(pq.empty()) + m::room::head(opts.room).for_each([&pq] + (const auto &event_idx, const auto &event_id) + { + pq.emplace_back(event_idx); + return pq.size() < event::prev::MAX; + }); + + if(pq.empty()) + pq.emplace_back(m::head_idx(opts.room)); + + size_t submits(0); + size_t leaf_ctr(0); + m::event::fetch e, p; do + { + const auto ref_idx + { + pq.front() + }; + + pq.pop_front(); + if(ref_idx < opts.ref.first || ref_idx < ref_min) + continue; + + if(ref_idx > opts.ref.second) + continue; + + if(!seek(std::nothrow, e, ref_idx)) + continue; + + const event::prev prev{e}; + event::id _prev_id[prev.MAX]; + const auto &prev_id + { + prev.ids(_prev_id) + }; + + assert(prev_id.size() <= prev.MAX); + event::idx _prev_idx[prev.MAX]; + const auto prev_idx + { + prev.idxs(_prev_idx) + }; + + size_t fetched(0); + for(size_t i(0); i < prev_idx.size(); ++i) + { + if(prev_idx[i]) + continue; + + const bool submitted + { + submit(prev_id[i], opts.hint, false, 1, &history_vmopts) + }; + + if(!submitted) + continue; + + log::debug + { + log, "Fetch from %s (%lu) miss prev %s fetch:%zu in %s pe:%zu pq:%zu fetching:%zu", + string_view{e.event_id}, + ref_idx, + string_view{prev_id[i]}, + fetched, + string_view{opts.room.room_id}, + pe.size(), + pq.size(), + fetching.size(), + }; + + ++fetched; + ++submits; + ret |= true; + } + + if(pq.size() >= (opts.leaf_depth?: prev.MAX)) + continue; + + if(opts.leaf_depth || opts.viewport_size) + { + if(prev_id.size() == 1) + { + if(++leaf_ctr % (opts.viewport_size?: opts.leaf_depth) == 0) + continue; + } + else leaf_ctr = 0; + } + + size_t pushed(0); + for(size_t i(0); i < prev_idx.size(); ++i) + { + if(!prev_idx[i]) + continue; + + auto it(pe.lower_bound(prev_idx[i])); + if(it != end(pe) && *it == prev_idx[i]) + continue; + + pe.emplace_hint(it, prev_idx[i]); + if(!seek(std::nothrow, p, prev_idx[i])) + continue; + + pq.emplace_back(prev_idx[i]); + _ref_min = std::max(_ref_min, prev_idx[i]); + ++pushed; + log::debug + { + log, "Queue from %s (%lu) next prev %s (%lu) push:%zu in %s pe:%zu pq:%zu fetching:%zu", + string_view{e.event_id}, + ref_idx, + string_view{prev_id[i]}, + prev_idx[i], + pushed, + string_view{opts.room.room_id}, + pe.size(), + pq.size(), + fetching.size(), + }; + } + } + while(!pq.empty() && submits < opts.fetch_max); + + log::debug + { + log, "Round in %s pe:%zu pq:%zu submits:%zu fetching:%zu ref_min:%lu:%lu", + string_view{opts.room.room_id}, + pe.size(), + pq.size(), + submits, + fetching.size(), + ref_min, + _ref_min, + }; + + ref_min = std::max(ref_min, _ref_min); + return ret; +} + void ircd::m::acquire::acquire_state() {