0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-25 08:12:37 +01:00

ircd:Ⓜ️:acquire: Add divulgence acquisition algorithm to suite.

This commit is contained in:
Jason Volk 2020-12-16 18:16:05 -08:00
parent 45591b6877
commit 018f2655b6
2 changed files with 187 additions and 3 deletions

View file

@ -45,6 +45,9 @@ struct ircd::m::acquire
bool fetch_history(event::idx &);
void acquire_history();
bool fetch_timeline(event::idx &);
void acquire_timeline();
bool fetch_state(const m::event::id &, const string_view &);
void acquire_state();
@ -74,10 +77,14 @@ struct ircd::m::acquire::opts
/// a depth ceiling effectively makes this false.
bool head {true};
/// Perform history acquisition. Setting this to false disables operations
/// which fill in gaps in the timeline below the head.
/// Perform history acquisition. Setting this to false disables depthwise
/// operations which fill in timeline gaps below the head.
bool history {true};
/// Perform timeline acquisition. Setting this to false disables
/// breadthwise operations which fill in timeline gaps below the head.
bool timeline {true};
/// Perform state acquisition. Setting this to false may result in an
/// acquisition that is missing state events and subject to inconsistency
/// from the ABA problem etc.
@ -109,6 +116,9 @@ struct ircd::m::acquire::opts
/// unlimited and that's usually a waste of time in practice.
size_t attempt_max {16};
/// Limit on the depth of leafs pursued by the timeline acquisition.
size_t leaf_depth {0};
/// Default vm::opts to be used during eval; some options are
/// unconditionally overriden to perform some evals. Use caution, setting
/// options may cause results not expected from this interface.

View file

@ -48,7 +48,7 @@ ircd::m::acquire::acquire::acquire(const struct opts &opts)
state_vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false);
}
if(opts.history)
if(opts.history || opts.timeline)
{
history_vmopts.notify_servers = false;
history_vmopts.phase.set(m::vm::phase::NOTIFY, false);
@ -66,6 +66,10 @@ ircd::m::acquire::acquire::acquire(const struct opts &opts)
if(opts.history)
acquire_history();
// Branch to acquire timeline
if(opts.timeline)
acquire_timeline();
// Branch to acquire state
if(opts.state)
acquire_state();
@ -248,6 +252,176 @@ ircd::m::acquire::fetch_history(event::idx &ref_min)
return ret;
}
void
ircd::m::acquire::acquire_timeline()
{
event::idx ref_min
{
opts.ref.first
};
for(size_t i(0); i < opts.rounds; ++i)
{
if(!fetch_timeline(ref_min))
break;
if(ref_min > opts.ref.second)
break;
}
}
bool
ircd::m::acquire::fetch_timeline(event::idx &ref_min)
{
bool ret(false);
auto _ref_min(ref_min);
std::set<event::idx> pe;
std::deque<event::idx> pq;
event::idx _event_idx;
if(opts.room.event_id)
if((_event_idx = m::index(std::nothrow, opts.room.event_id)))
pq.emplace_back(_event_idx);
if(pq.empty())
m::room::head(opts.room).for_each([&pq]
(const auto &event_idx, const auto &event_id)
{
pq.emplace_back(event_idx);
return pq.size() < event::prev::MAX;
});
if(pq.empty())
pq.emplace_back(m::head_idx(opts.room));
size_t submits(0);
size_t leaf_ctr(0);
m::event::fetch e, p; do
{
const auto ref_idx
{
pq.front()
};
pq.pop_front();
if(ref_idx < opts.ref.first || ref_idx < ref_min)
continue;
if(ref_idx > opts.ref.second)
continue;
if(!seek(std::nothrow, e, ref_idx))
continue;
const event::prev prev{e};
event::id _prev_id[prev.MAX];
const auto &prev_id
{
prev.ids(_prev_id)
};
assert(prev_id.size() <= prev.MAX);
event::idx _prev_idx[prev.MAX];
const auto prev_idx
{
prev.idxs(_prev_idx)
};
size_t fetched(0);
for(size_t i(0); i < prev_idx.size(); ++i)
{
if(prev_idx[i])
continue;
const bool submitted
{
submit(prev_id[i], opts.hint, false, 1, &history_vmopts)
};
if(!submitted)
continue;
log::debug
{
log, "Fetch from %s (%lu) miss prev %s fetch:%zu in %s pe:%zu pq:%zu fetching:%zu",
string_view{e.event_id},
ref_idx,
string_view{prev_id[i]},
fetched,
string_view{opts.room.room_id},
pe.size(),
pq.size(),
fetching.size(),
};
++fetched;
++submits;
ret |= true;
}
if(pq.size() >= (opts.leaf_depth?: prev.MAX))
continue;
if(opts.leaf_depth || opts.viewport_size)
{
if(prev_id.size() == 1)
{
if(++leaf_ctr % (opts.viewport_size?: opts.leaf_depth) == 0)
continue;
}
else leaf_ctr = 0;
}
size_t pushed(0);
for(size_t i(0); i < prev_idx.size(); ++i)
{
if(!prev_idx[i])
continue;
auto it(pe.lower_bound(prev_idx[i]));
if(it != end(pe) && *it == prev_idx[i])
continue;
pe.emplace_hint(it, prev_idx[i]);
if(!seek(std::nothrow, p, prev_idx[i]))
continue;
pq.emplace_back(prev_idx[i]);
_ref_min = std::max(_ref_min, prev_idx[i]);
++pushed;
log::debug
{
log, "Queue from %s (%lu) next prev %s (%lu) push:%zu in %s pe:%zu pq:%zu fetching:%zu",
string_view{e.event_id},
ref_idx,
string_view{prev_id[i]},
prev_idx[i],
pushed,
string_view{opts.room.room_id},
pe.size(),
pq.size(),
fetching.size(),
};
}
}
while(!pq.empty() && submits < opts.fetch_max);
log::debug
{
log, "Round in %s pe:%zu pq:%zu submits:%zu fetching:%zu ref_min:%lu:%lu",
string_view{opts.room.room_id},
pe.size(),
pq.size(),
submits,
fetching.size(),
ref_min,
_ref_min,
};
ref_min = std::max(ref_min, _ref_min);
return ret;
}
void
ircd::m::acquire::acquire_state()
{