mirror of
https://github.com/matrix-construct/construct
synced 2025-01-14 00:34:18 +01:00
ircd:Ⓜ️:acquire: Add state dispatch component; minor cleanup.
This commit is contained in:
parent
0110e803ef
commit
e1d52b28ce
2 changed files with 96 additions and 6 deletions
|
@ -27,6 +27,7 @@ struct ircd::m::acquire
|
|||
const struct opts &opts;
|
||||
vm::opts head_vmopts;
|
||||
vm::opts history_vmopts;
|
||||
vm::opts state_vmopts;
|
||||
std::list<result> fetching;
|
||||
|
||||
private:
|
||||
|
@ -44,6 +45,9 @@ struct ircd::m::acquire
|
|||
bool fetch_history(event::idx &);
|
||||
void acquire_history();
|
||||
|
||||
bool fetch_state(const m::event::id &, const string_view &);
|
||||
void acquire_state();
|
||||
|
||||
public:
|
||||
acquire(const struct opts &);
|
||||
acquire(const acquire &) = delete;
|
||||
|
|
|
@ -38,8 +38,19 @@ ircd::m::acquire::acquire::acquire(const struct opts &opts)
|
|||
if(opts.head)
|
||||
{
|
||||
head_vmopts.notify_servers = false;
|
||||
head_vmopts.phase.set(m::vm::phase::NOTIFY, false);
|
||||
head_vmopts.phase.set(m::vm::phase::FETCH_PREV, false);
|
||||
head_vmopts.phase.set(m::vm::phase::FETCH_STATE, opts.state);
|
||||
head_vmopts.phase.set(m::vm::phase::FETCH_STATE, false);
|
||||
head_vmopts.non_conform.set(event::conforms::MISMATCH_HASHES);
|
||||
}
|
||||
|
||||
if(opts.state)
|
||||
{
|
||||
state_vmopts.notify_servers = false;
|
||||
state_vmopts.phase.set(m::vm::phase::FETCH_PREV, false);
|
||||
state_vmopts.phase.set(m::vm::phase::FETCH_STATE, false);
|
||||
state_vmopts.non_conform.set(event::conforms::MISMATCH_HASHES);
|
||||
state_vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false);
|
||||
}
|
||||
|
||||
if(opts.history)
|
||||
|
@ -48,17 +59,23 @@ ircd::m::acquire::acquire::acquire(const struct opts &opts)
|
|||
history_vmopts.phase.set(m::vm::phase::NOTIFY, false);
|
||||
history_vmopts.phase.set(m::vm::phase::FETCH_PREV, false);
|
||||
history_vmopts.phase.set(m::vm::phase::FETCH_STATE, false);
|
||||
history_vmopts.non_conform.set(event::conforms::MISMATCH_HASHES);
|
||||
history_vmopts.wopts.appendix.set(dbs::appendix::ROOM_HEAD, false);
|
||||
}
|
||||
|
||||
// Branch to acquire head
|
||||
if(opts.head)
|
||||
acquire_head();
|
||||
if(!opts.depth.second)
|
||||
acquire_head();
|
||||
|
||||
// Branch to acquire history
|
||||
if(opts.history)
|
||||
acquire_history();
|
||||
|
||||
// Branch to acquire state
|
||||
if(opts.state)
|
||||
acquire_state();
|
||||
|
||||
// Complete all work before returning, otherwise everything
|
||||
// will be cancelled on unwind.
|
||||
while(!fetching.empty())
|
||||
|
@ -230,6 +247,72 @@ ircd::m::acquire::fetch_history(event::idx &ref_min)
|
|||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::acquire::acquire_state()
|
||||
{
|
||||
m::event::id::buf event_id;
|
||||
if(opts.room.event_id)
|
||||
event_id = opts.room.event_id;
|
||||
|
||||
if(!event_id && opts.viewport_size)
|
||||
event_id = m::event_id(std::nothrow, m::viewport(opts.room).second); //TODO: opts.viewport_size
|
||||
|
||||
if(!event_id && opts.history)
|
||||
event_id = m::event_id(std::nothrow, m::sounding(opts.room).second);
|
||||
|
||||
if(!event_id && opts.head)
|
||||
event_id = m::head(opts.room);
|
||||
|
||||
if(!event_id)
|
||||
return;
|
||||
|
||||
m::room::state::fetch::opts sfopts;
|
||||
sfopts.room.room_id = opts.room.room_id;
|
||||
sfopts.room.event_id = event_id;
|
||||
m::room::state::fetch
|
||||
{
|
||||
sfopts, [this](const m::event::id &event_id, const string_view &remote)
|
||||
{
|
||||
return fetch_state(event_id, remote);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
bool
|
||||
ircd::m::acquire::fetch_state(const m::event::id &event_id,
|
||||
const string_view &remote)
|
||||
{
|
||||
// Bail if interrupted
|
||||
if(ctx::interruption_requested())
|
||||
return false;
|
||||
|
||||
const auto hostpart
|
||||
{
|
||||
event_id.host()
|
||||
};
|
||||
|
||||
const auto hint
|
||||
{
|
||||
hostpart? hostpart: remote
|
||||
};
|
||||
|
||||
const bool submitted
|
||||
{
|
||||
submit(event_id, hint, false, 1, &state_vmopts)
|
||||
};
|
||||
|
||||
if(submitted)
|
||||
log::debug
|
||||
{
|
||||
log, "Fetch %s in state of %s fetching:%zu",
|
||||
string_view{event_id},
|
||||
string_view{opts.room.room_id},
|
||||
fetching.size(),
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::m::acquire::acquire_head()
|
||||
{
|
||||
|
@ -240,10 +323,6 @@ ircd::m::acquire::acquire_head()
|
|||
{
|
||||
hfopts, [this, &hfopts](const m::event &result)
|
||||
{
|
||||
// Bail if interrupted
|
||||
if(ctx::interruption_requested())
|
||||
return false;
|
||||
|
||||
const auto &[top_id, top_depth, top_idx]
|
||||
{
|
||||
hfopts.top
|
||||
|
@ -258,6 +337,10 @@ bool
|
|||
ircd::m::acquire::fetch_head(const m::event &result,
|
||||
const int64_t &top_depth)
|
||||
{
|
||||
// Bail if interrupted
|
||||
if(ctx::interruption_requested())
|
||||
return false;
|
||||
|
||||
// Bail if the depth is below the window
|
||||
if(json::get<"depth"_>(result) < opts.depth.first)
|
||||
return false;
|
||||
|
@ -324,6 +407,8 @@ ircd::m::acquire::start(const m::event::id &event_id,
|
|||
const vm::opts *const &vmopts)
|
||||
try
|
||||
{
|
||||
assert(vmopts);
|
||||
|
||||
fetch::opts fopts;
|
||||
fopts.op = fetch::op::backfill;
|
||||
fopts.room_id = opts.room.room_id;
|
||||
|
@ -438,6 +523,7 @@ try
|
|||
false
|
||||
|| result.vmopts == &this->head_vmopts
|
||||
|| result.vmopts == &this->history_vmopts
|
||||
|| result.vmopts == &this->state_vmopts
|
||||
);
|
||||
|
||||
m::vm::eval
|
||||
|
|
Loading…
Reference in a new issue