mirror of
https://github.com/matrix-construct/construct
synced 2025-01-02 10:54:16 +01:00
ircd:Ⓜ️:vm::fetch: Replace redundant state fetch/eval w/ m::acquire.
This commit is contained in:
parent
7eaad92e3c
commit
84b5bf536c
1 changed files with 13 additions and 189 deletions
|
@ -423,87 +423,26 @@ try
|
|||
string_view{room.room_id},
|
||||
};
|
||||
|
||||
auto futures
|
||||
struct m::acquire::opts acq_opts;
|
||||
acq_opts.room = room;
|
||||
acq_opts.head = false;
|
||||
acq_opts.history = false;
|
||||
acq_opts.state = true;
|
||||
acq_opts.hint = opts.node_id;
|
||||
m::acquire acq
|
||||
{
|
||||
state_fetch(event, eval, room)
|
||||
acq_opts
|
||||
};
|
||||
|
||||
if(!std::distance(begin(futures), end(futures)))
|
||||
return;
|
||||
|
||||
auto fetching
|
||||
{
|
||||
ctx::when_all(begin(futures), end(futures))
|
||||
};
|
||||
|
||||
log::warning
|
||||
{
|
||||
log, "%s fetching %zu missing state events in %s",
|
||||
loghead(eval),
|
||||
std::distance(begin(futures), end(futures)),
|
||||
string_view{room.room_id},
|
||||
};
|
||||
|
||||
// yields context
|
||||
const bool done
|
||||
{
|
||||
fetching.wait(seconds(state_timeout), std::nothrow)
|
||||
};
|
||||
|
||||
// evaluate results
|
||||
size_t good(0), fail(0);
|
||||
for(auto &future : futures) try
|
||||
{
|
||||
m::fetch::result result
|
||||
{
|
||||
future.get()
|
||||
};
|
||||
|
||||
const json::object content
|
||||
{
|
||||
result
|
||||
};
|
||||
|
||||
const json::array &pdus
|
||||
{
|
||||
content["pdus"]
|
||||
};
|
||||
|
||||
auto opts(*eval.opts);
|
||||
opts.phase.set(m::vm::phase::FETCH_PREV, false);
|
||||
opts.phase.set(m::vm::phase::FETCH_STATE, false);
|
||||
opts.node_id = result.origin;
|
||||
opts.notify_servers = false;
|
||||
vm::eval
|
||||
{
|
||||
pdus, opts
|
||||
};
|
||||
|
||||
++good;
|
||||
}
|
||||
catch(const ctx::interrupted &)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
{
|
||||
++fail;
|
||||
log::derror
|
||||
{
|
||||
log, "%s state eval :%s",
|
||||
loghead(eval),
|
||||
e.what(),
|
||||
};
|
||||
}
|
||||
|
||||
//TODO: XXX
|
||||
log::info
|
||||
{
|
||||
log, "%s evaluated missing state in %s fetched:%zu good:%zu fail:%zu",
|
||||
log, "%s evaluated missing state in %s fetched:-- good:-- fail:--",
|
||||
loghead(eval),
|
||||
string_view{room.room_id},
|
||||
std::distance(begin(futures), end(futures)),
|
||||
good,
|
||||
fail,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
};
|
||||
}
|
||||
catch(const std::exception &e)
|
||||
|
@ -519,121 +458,6 @@ catch(const std::exception &e)
|
|||
throw;
|
||||
}
|
||||
|
||||
std::forward_list
|
||||
<
|
||||
ircd::ctx::future<ircd::m::fetch::result>
|
||||
>
|
||||
ircd::m::vm::fetch::state_fetch(const event &event,
|
||||
vm::eval &eval,
|
||||
const room &room)
|
||||
{
|
||||
feds::opts opts;
|
||||
opts.op = feds::op::state;
|
||||
opts.event_id = room.event_id;
|
||||
opts.room_id = room.room_id;
|
||||
opts.arg[0] = "ids";
|
||||
opts.exclude_myself = true;
|
||||
opts.closure_errors = false;
|
||||
opts.nothrow_closure = true;
|
||||
log::debug
|
||||
{
|
||||
log, "%s acquire state event ids in %s...",
|
||||
loghead(eval),
|
||||
string_view{room.room_id},
|
||||
};
|
||||
|
||||
std::set<std::string, std::less<>> req;
|
||||
std::forward_list<ctx::future<m::fetch::result>> ret;
|
||||
feds::execute(opts, [&eval, &ret, &req]
|
||||
(const auto &result)
|
||||
{
|
||||
const auto each_state_id{[&eval, &ret, &req, &result]
|
||||
(const m::event::id &event_id)
|
||||
{
|
||||
auto it(req.lower_bound(event_id));
|
||||
if(it != end(req) && *it == event_id)
|
||||
return;
|
||||
|
||||
m::fetch::opts opts;
|
||||
opts.op = m::fetch::op::event;
|
||||
opts.room_id = result.request->room_id;
|
||||
opts.event_id = event_id;
|
||||
opts.hint =
|
||||
{
|
||||
event_id.host() && !my_host(event_id.host())?
|
||||
event_id.host():
|
||||
|
||||
!my_host(result.origin)?
|
||||
result.origin:
|
||||
|
||||
eval.opts->node_id && !my_host(eval.opts->node_id)?
|
||||
eval.opts->node_id:
|
||||
|
||||
opts.room_id.host() && !my_host(opts.room_id.host())?
|
||||
opts.room_id.host():
|
||||
|
||||
string_view{}
|
||||
};
|
||||
|
||||
req.emplace_hint(it, event_id);
|
||||
ret.emplace_front(m::fetch::start(opts));
|
||||
|
||||
assert(std::distance(begin(ret), end(ret)) <= ssize_t(req.size()));
|
||||
log::debug
|
||||
{
|
||||
log, "%s requesting state event %s off %s in %s reqs:%zu",
|
||||
loghead(eval),
|
||||
string_view{event_id},
|
||||
string_view{result.request->event_id},
|
||||
string_view{result.request->room_id},
|
||||
};
|
||||
}};
|
||||
|
||||
const auto each_state_set{[&each_state_id]
|
||||
(const json::array &ids)
|
||||
{
|
||||
event::id event_id[32]; //TODO conf
|
||||
auto it(begin(ids)); do
|
||||
{
|
||||
size_t i(0);
|
||||
for(; i < 32 && it != end(ids); ++it)
|
||||
event_id[i++] = json::string{*it};
|
||||
|
||||
const vector_view<const event::id> event_ids
|
||||
(
|
||||
event_id, i
|
||||
);
|
||||
|
||||
const uint64_t exists
|
||||
{
|
||||
m::exists(event_ids)
|
||||
};
|
||||
|
||||
for(size_t j(0); j < i; ++j)
|
||||
if(~exists & (1UL << j))
|
||||
each_state_id(event_id[j]);
|
||||
}
|
||||
while(it != end(ids));
|
||||
}};
|
||||
|
||||
const json::array &auth_chain_ids
|
||||
{
|
||||
result.object["auth_chain_ids"]
|
||||
};
|
||||
|
||||
const json::array &pdu_ids
|
||||
{
|
||||
result.object["pdu_ids"]
|
||||
};
|
||||
|
||||
each_state_set(auth_chain_ids);
|
||||
each_state_set(pdu_ids);
|
||||
return true;
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
//
|
||||
// prev_events handler stack
|
||||
//
|
||||
|
|
Loading…
Reference in a new issue