mirror of
https://github.com/matrix-construct/construct
synced 2025-02-18 09:40:12 +01:00
modules/m_vm_fetch: Preliminary state acquire hook.
This commit is contained in:
parent
0bbc45e634
commit
1afa0daf3f
1 changed files with 181 additions and 1 deletions
|
@ -13,6 +13,8 @@ namespace ircd::m::vm::fetch
|
|||
static void prev_check(const event &, vm::eval &);
|
||||
static std::forward_list<ctx::future<m::fetch::result>> prev_fetch(const event &, vm::eval &, const room &);
|
||||
static void prev(const event &, vm::eval &, const room &);
|
||||
static std::forward_list<ctx::future<m::fetch::result>> state_fetch(const event &, vm::eval &, const room &);
|
||||
static void state(const event &, vm::eval &, const room &);
|
||||
static void auth_chain_eval(const event &, vm::eval &, const room &, const json::array &);
|
||||
static void auth_chain(const event &, vm::eval &, const room &);
|
||||
static void auth(const event &, vm::eval &, const room &);
|
||||
|
@ -112,6 +114,9 @@ try
|
|||
|
||||
if(opts.fetch_prev)
|
||||
prev(event, eval, room);
|
||||
|
||||
if(opts.fetch_state)
|
||||
state(event, eval, room);
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
|
@ -316,6 +321,181 @@ catch(const std::exception &e)
|
|||
throw;
|
||||
}
|
||||
|
||||
//
|
||||
// state handler stack
|
||||
//
|
||||
|
||||
void
|
||||
ircd::m::vm::fetch::state(const event &event,
|
||||
vm::eval &eval,
|
||||
const room &room)
|
||||
{
|
||||
const event::prev prev{event};
|
||||
const size_t prev_miss
|
||||
{
|
||||
prev.prev_events_count() - prev.prev_events_exist()
|
||||
};
|
||||
|
||||
if(!prev_miss)
|
||||
return;
|
||||
|
||||
const auto &[sounding_depth, sounding_idx]
|
||||
{
|
||||
m::sounding(room)
|
||||
};
|
||||
|
||||
if(at<"depth"_>(event) < sounding_depth)
|
||||
return;
|
||||
|
||||
auto futures
|
||||
{
|
||||
state_fetch(event, eval, room)
|
||||
};
|
||||
|
||||
if(!std::distance(begin(futures), end(futures)))
|
||||
return;
|
||||
|
||||
auto fetching
|
||||
{
|
||||
ctx::when_all(begin(futures), end(futures))
|
||||
};
|
||||
|
||||
log::info
|
||||
{
|
||||
log, "%s fetching %zu missing state events in %s",
|
||||
loghead(eval),
|
||||
std::distance(begin(futures), end(futures)),
|
||||
string_view{room.room_id},
|
||||
};
|
||||
|
||||
// yields context
|
||||
fetching.wait();
|
||||
|
||||
// evaluate results
|
||||
size_t good(0), fail(0);
|
||||
for(auto &future : futures) try
|
||||
{
|
||||
m::fetch::result result
|
||||
{
|
||||
future.get()
|
||||
};
|
||||
|
||||
const json::array &pdus
|
||||
{
|
||||
json::object(result).get("pdus")
|
||||
};
|
||||
|
||||
auto opts(*eval.opts);
|
||||
opts.fetch_prev = false;
|
||||
opts.fetch_state = false;
|
||||
vm::eval
|
||||
{
|
||||
pdus, opts
|
||||
};
|
||||
|
||||
++good;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
++fail;
|
||||
log::derror
|
||||
{
|
||||
log, "%s state eval :%s",
|
||||
loghead(eval),
|
||||
e.what(),
|
||||
};
|
||||
}
|
||||
|
||||
log::info
|
||||
{
|
||||
log, "%s evaluated missing state in %s fetched:%zu good:%zu fail:%zu",
|
||||
loghead(eval),
|
||||
string_view{room.room_id},
|
||||
std::distance(begin(futures), end(futures)),
|
||||
good,
|
||||
fail,
|
||||
};
|
||||
}
|
||||
|
||||
std::forward_list
|
||||
<
|
||||
ircd::ctx::future<ircd::m::fetch::result>
|
||||
>
|
||||
ircd::m::vm::fetch::state_fetch(const event &event,
|
||||
vm::eval &eval,
|
||||
const room &room)
|
||||
{
|
||||
feds::opts opts;
|
||||
opts.op = feds::op::state;
|
||||
opts.event_id = room.event_id;
|
||||
opts.room_id = room.room_id;
|
||||
opts.arg[0] = "ids";
|
||||
opts.exclude_myself = true;
|
||||
opts.closure_errors = false;
|
||||
opts.nothrow_closure = true;
|
||||
log::debug
|
||||
{
|
||||
log, "%s acquire state event ids in %s from %zu servers (est).",
|
||||
loghead(eval),
|
||||
string_view{room.room_id},
|
||||
};
|
||||
|
||||
std::set<std::string, std::less<>> req;
|
||||
std::forward_list<ctx::future<m::fetch::result>> ret;
|
||||
feds::execute(opts, [&eval, &ret, &req]
|
||||
(const auto &result)
|
||||
{
|
||||
const auto each_state_id{[&eval, &ret, &req, &result]
|
||||
(const m::event::id &event_id)
|
||||
{
|
||||
if(m::exists(event_id))
|
||||
return;
|
||||
|
||||
auto it(req.lower_bound(event_id));
|
||||
if(it != end(req) && *it == event_id)
|
||||
return;
|
||||
|
||||
req.emplace_hint(it, event_id);
|
||||
|
||||
m::fetch::opts opts;
|
||||
opts.op = m::fetch::op::event;
|
||||
opts.room_id = result.request->room_id;
|
||||
opts.event_id = event_id;
|
||||
ret.emplace_front(m::fetch::start(opts));
|
||||
|
||||
assert(std::distance(begin(ret), end(ret)) <= ssize_t(req.size()));
|
||||
log::debug
|
||||
{
|
||||
log, "%s requesting state event %s off %s in %s reqs:%zu",
|
||||
loghead(eval),
|
||||
string_view{event_id},
|
||||
string_view{result.request->event_id},
|
||||
string_view{result.request->room_id},
|
||||
};
|
||||
}};
|
||||
|
||||
const json::array &auth_chain_ids
|
||||
{
|
||||
result.object["auth_chain_ids"]
|
||||
};
|
||||
|
||||
for(const json::string &auth_chain_id : auth_chain_ids)
|
||||
each_state_id(auth_chain_id);
|
||||
|
||||
const json::array &pdus
|
||||
{
|
||||
result.object["pdus"]
|
||||
};
|
||||
|
||||
for(const json::string &pdu : pdus)
|
||||
each_state_id(pdu);
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
//
|
||||
// prev_events handler stack
|
||||
//
|
||||
|
@ -403,7 +583,7 @@ ircd::m::vm::fetch::prev(const event &event,
|
|||
{
|
||||
log::derror
|
||||
{
|
||||
log, "%s :%s",
|
||||
log, "%s prev fetch/eval :%s",
|
||||
loghead(eval),
|
||||
e.what(),
|
||||
};
|
||||
|
|
Loading…
Add table
Reference in a new issue