0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-28 00:14:07 +01:00

modules/m_vm_fetch: Split and reorg handler stack; add conf items.

This commit is contained in:
Jason Volk 2019-09-05 16:30:51 -07:00
parent 6831f195aa
commit 6300b5a9fb

View file

@ -10,11 +10,15 @@
namespace ircd::m::vm::fetch namespace ircd::m::vm::fetch
{ {
static void hook_handle_prev(const event &, vm::eval &, const room &); static void prev_check(const event &, vm::eval &);
static void auth_chain(const room &, const string_view &remote); static std::forward_list<ctx::future<m::fetch::result>> prev_fetch(const event &, vm::eval &, const room &);
static void hook_handle_auth(const event &, vm::eval &, const room &); static void prev(const event &, vm::eval &, const room &);
static void hook_handle(const event &, vm::eval &); static void auth_chain_eval(const event &, vm::eval &, const room &, const json::array &);
static void auth_chain(const event &, vm::eval &, const room &);
static void auth(const event &, vm::eval &, const room &);
static void handle(const event &, vm::eval &);
extern conf::item<size_t> prev_backfill_limit;
extern conf::item<seconds> auth_timeout; extern conf::item<seconds> auth_timeout;
extern conf::item<bool> enable; extern conf::item<bool> enable;
extern hookfn<vm::eval &> hook; extern hookfn<vm::eval &> hook;
@ -33,6 +37,15 @@ ircd::m::vm::fetch::log
"m.vm.fetch" "m.vm.fetch"
}; };
decltype(ircd::m::vm::fetch::hook)
ircd::m::vm::fetch::hook
{
handle,
{
{ "_site", "vm.fetch" }
}
};
decltype(ircd::m::vm::fetch::enable) decltype(ircd::m::vm::fetch::enable)
ircd::m::vm::fetch::enable ircd::m::vm::fetch::enable
{ {
@ -47,13 +60,11 @@ ircd::m::vm::fetch::auth_timeout
{ "default", 15L }, { "default", 15L },
}; };
decltype(ircd::m::vm::fetch::hook) decltype(ircd::m::vm::fetch::prev_backfill_limit)
ircd::m::vm::fetch::hook ircd::m::vm::fetch::prev_backfill_limit
{ {
hook_handle, { "name", "ircd.m.vm.fetch.prev.backfill.limit" },
{ { "default", 128L },
{ "_site", "vm.fetch" }
}
}; };
// //
@ -61,13 +72,16 @@ ircd::m::vm::fetch::hook
// //
void void
ircd::m::vm::fetch::hook_handle(const event &event, ircd::m::vm::fetch::handle(const event &event,
vm::eval &eval) vm::eval &eval)
try try
{ {
assert(eval.opts); assert(eval.opts);
assert(eval.opts->fetch); assert(eval.opts->fetch);
const auto &opts{*eval.opts}; const auto &opts
{
*eval.opts
};
const auto &type const auto &type
{ {
@ -94,17 +108,10 @@ try
room.event_id = event_id; room.event_id = event_id;
if(opts.fetch_auth) if(opts.fetch_auth)
hook_handle_auth(event, eval, room); auth(event, eval, room);
if(opts.fetch_prev) if(opts.fetch_prev)
hook_handle_prev(event, eval, room); prev(event, eval, room);
log::debug
{
log, "%s in %s complete",
loghead(eval),
json::get<"room_id"_>(event),
};
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
@ -118,31 +125,40 @@ catch(const std::exception &e)
throw; throw;
} }
void //
ircd::m::vm::fetch::hook_handle_auth(const event &event, // auth_events handler stack
vm::eval &eval, //
const room &room)
void
ircd::m::vm::fetch::auth(const event &event,
vm::eval &eval,
const room &room)
try
{ {
// Count how many of the auth_events provided exist locally. // Count how many of the auth_events provided exist locally.
const auto &opts{*eval.opts}; const auto &opts{*eval.opts};
const event::prev prev{event}; const event::prev prev{event};
const size_t auth_count
{
prev.auth_events_count()
};
size_t exists(0); size_t auth_exists{0};
for(size_t i(0); i < prev.auth_events_count(); ++i) for(size_t i(0); i < auth_count; ++i)
{ {
const auto &auth_id const auto &auth_id
{ {
prev.auth_event(i) prev.auth_event(i)
}; };
exists += bool(m::exists(auth_id)); auth_exists += bool(m::exists(auth_id));
} }
// We are satisfied at this point if all auth_events for this event exist, // We are satisfied at this point if all auth_events for this event exist,
// as those events have themselves been successfully evaluated and so forth. // as those events have themselves been successfully evaluated and so forth.
assert(exists <= prev.auth_events_count()); assert(auth_exists <= auth_count);
if(exists == prev.auth_events_count()) if(auth_exists == auth_count)
return; return;
// At this point we are missing one or more auth_events for this event. // At this point we are missing one or more auth_events for this event.
@ -150,62 +166,80 @@ ircd::m::vm::fetch::hook_handle_auth(const event &event,
{ {
log, "%s auth_events:%zu miss:%zu", log, "%s auth_events:%zu miss:%zu",
loghead(eval), loghead(eval),
prev.auth_events_count(), auth_count,
exists - prev.auth_events_count(), auth_exists - auth_count,
}; };
// We need to figure out where best to sling a request to fetch these if(!bool(m::vm::fetch::enable))
// missing auth_events. We prefer the remote client conducting this eval
// with their /federation/send/ request which we stored in the opts.
const string_view &remote
{
opts.node_id?
opts.node_id:
!my_host(json::get<"origin"_>(event))?
string_view(json::get<"origin"_>(event)):
!my_host(room.room_id.host())? //TODO: XXX
room.room_id.host():
string_view{}
};
// Bail out here if we can't or won't attempt fetching auth_events.
if(!opts.fetch_auth || !bool(m::vm::fetch::enable) || !remote)
throw vm::error throw vm::error
{ {
vm::fault::EVENT, "Failed to fetch auth_events for %s in %s", vm::fault::AUTH, "Fetching auth_events disabled by configuration",
string_view{event.event_id}, };
json::get<"room_id"_>(event)
if(!opts.fetch_auth)
throw vm::error
{
vm::fault::AUTH, "Not fetching auth_events for this evaluation",
}; };
// This is a blocking call to recursively fetch and evaluate the auth_chain // This is a blocking call to recursively fetch and evaluate the auth_chain
// for this event. Upon return all of the auth_events for this event will // for this event. Upon return all of the auth_events for this event will
// have themselves been fetched and auth'ed recursively or throws. // have themselves been fetched and auth'ed recursively.
auth_chain(room, remote); auth_chain(event, eval, room);
}
catch(const std::exception &e)
{
throw vm::error
{
vm::fault::AUTH, "Failed to fetch all auth_events :%s",
string_view{event.event_id},
json::get<"room_id"_>(event),
e.what()
};
} }
void void
ircd::m::vm::fetch::auth_chain(const room &room, ircd::m::vm::fetch::auth_chain(const event &event,
const string_view &remote) vm::eval &eval,
const room &room)
try try
{ {
log::debug assert(eval.opts);
{
log, "Fetching auth chain for %s in %s (hint: %s)",
string_view{room.event_id},
string_view{room.room_id},
remote,
};
m::fetch::opts opts; m::fetch::opts opts;
opts.op = m::fetch::op::auth; opts.op = m::fetch::op::auth;
opts.room_id = room.room_id; opts.room_id = room.room_id;
opts.event_id = room.event_id; opts.event_id = room.event_id;
opts.hint = remote;
// Figure out a remote hint as the primary target to request the missing
// auth events from; if provided, m::fetch will ask this remote first. We
// try to use the eval.node_id, which is set to a server that is conducting
// the eval (i.e in a /send/ or when processing some response data from
// them); next we try the origin of the event itself. These remotes are
// most likely to provide a satisfying response.
opts.hint =
{
eval.opts->node_id?
eval.opts->node_id:
!my_host(json::get<"origin"_>(event))?
string_view(json::get<"origin"_>(event)):
string_view{}
};
log::debug
{
log, "Fetching auth chain for %s in %s hint:%s",
string_view{room.event_id},
string_view{room.room_id},
opts.hint,
};
// send
auto future auto future
{ {
m::fetch::start(opts) m::fetch::start(opts)
}; };
// recv
const auto result const auto result
{ {
future.get(seconds(auth_timeout)) future.get(seconds(auth_timeout))
@ -216,38 +250,25 @@ try
result result
}; };
// parse
const json::array &auth_chain const json::array &auth_chain
{ {
response["auth_chain"] response["auth_chain"]
}; };
log::debug auth_chain_eval(event, eval, room, auth_chain);
{ }
log, "Evaluating %zu auth events in chain for %s in %s", catch(const vm::error &e)
auth_chain.size(), {
string_view{room.event_id}, throw;
string_view{room.room_id},
};
m::vm::opts vmopts;
vmopts.infolog_accept = true;
vmopts.fetch_prev = false;
vmopts.fetch_state = false;
vmopts.warnlog &= ~vm::fault::EXISTS;
m::vm::eval
{
auth_chain, vmopts
};
} }
catch(const std::exception &e) catch(const std::exception &e)
{ {
thread_local char rembuf[64];
log::error log::error
{ {
log, "Fetching auth chain for %s in %s from %s :%s", log, "Fetching auth chain for %s in %s :%s",
string_view{room.event_id}, string_view{room.event_id},
string_view{room.room_id}, string_view{room.room_id},
string(rembuf, remote),
e.what(), e.what(),
}; };
@ -255,9 +276,54 @@ catch(const std::exception &e)
} }
void void
ircd::m::vm::fetch::hook_handle_prev(const event &event, ircd::m::vm::fetch::auth_chain_eval(const event &event,
vm::eval &eval, vm::eval &eval,
const room &room) const room &room,
const json::array &auth_chain)
try
{
assert(eval.opts);
m::vm::opts opts;
opts.node_id = eval.opts->node_id;
opts.fetch_prev = false;
opts.fetch_state = false;
opts.infolog_accept = true;
opts.warnlog &= ~vm::fault::EXISTS;
log::debug
{
log, "Evaluating auth chain for %s in %s events:%zu",
string_view{room.event_id},
string_view{room.room_id},
auth_chain.size(),
};
// eval
m::vm::eval
{
auth_chain, opts
};
}
catch(const std::exception &e)
{
log::error
{
log, "Evaluating auth chain for %s in %s :%s",
string_view{room.event_id},
string_view{room.room_id},
e.what(),
};
throw;
}
//
// prev_events handler stack
//
void
ircd::m::vm::fetch::prev(const event &event,
vm::eval &eval,
const room &room)
{ {
const auto &opts{*eval.opts}; const auto &opts{*eval.opts};
const event::prev prev{event}; const event::prev prev{event};
@ -266,66 +332,36 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event,
prev.prev_events_count() prev.prev_events_count()
}; };
size_t prev_exists(0); const size_t prev_exists
std::list<ctx::future<m::fetch::result>> futures;
for(size_t i(0); i < prev_count; ++i)
{ {
const auto &prev_id prev.prev_events_exist()
{ };
prev.prev_event(i)
};
if(m::exists(prev_id)) assert(prev_exists <= prev_count);
{ if(prev_count == prev_exists)
++prev_exists; return;
continue;
}
if(!opts.fetch_prev || !m::vm::fetch::enable) if(!opts.fetch_prev || !m::vm::fetch::enable)
continue; {
prev_check(event, eval);
const int64_t room_depth return;
{
m::depth(std::nothrow, room)
};
//TODO: XXX
const bool recent_event
{
at<"depth"_>(event) >= room_depth - 20L //TODO: XXX
};
if(!recent_event)
continue;
const ssize_t limit
{
at<"depth"_>(event) - room_depth
};
m::fetch::opts opts;
opts.op = m::fetch::op::backfill;
opts.limit = std::min(limit, 32L);
opts.room_id = room.room_id;
opts.event_id = prev_id;
futures.emplace_back(m::fetch::start(opts));
} }
// If we have all of the referenced prev_events we are satisfied here. auto futures
assert(prev_exists < prev_count); {
if(prev_exists == prev_count) prev_fetch(event, eval, room)
return; };
// At this point one or more prev_events are missing; the fetches were // At this point one or more prev_events are missing; the fetches were
// launched asynchronously if the options allowed for it. // launched asynchronously if the options allowed for it.
log::dwarning log::dwarning
{ {
log, "%s prev_events:%zu miss:%zu fetching:%zu", log, "%s depth:%ld prev_events:%zu miss:%zu fetching:%zu fetching ...",
loghead(eval), loghead(eval),
at<"depth"_>(event),
prev_count, prev_count,
prev_exists,
prev_count - prev_exists, prev_count - prev_exists,
futures.size(), std::distance(begin(futures), end(futures)),
}; };
auto fetching auto fetching
@ -333,7 +369,10 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event,
ctx::when_all(begin(futures), end(futures)) ctx::when_all(begin(futures), end(futures))
}; };
// yields context
fetching.wait(); fetching.wait();
// evaluate results
for(auto &future : futures) try for(auto &future : futures) try
{ {
m::fetch::result result m::fetch::result result
@ -353,7 +392,7 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event,
pdus.size(), pdus.size(),
}; };
m::vm::eval vm::eval
{ {
pdus, opts pdus, opts
}; };
@ -368,19 +407,113 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event,
}; };
} }
const bool recount // check if result evals have satisfied this eval now; or throw
prev_check(event, eval);
}
std::forward_list
<
ircd::ctx::future<ircd::m::fetch::result>
>
ircd::m::vm::fetch::prev_fetch(const event &event,
vm::eval &eval,
const room &room)
{
const long room_depth
{ {
(opts.fetch_prev_any && !prev_exists) m::depth(std::nothrow, room)
|| opts.fetch_prev_all
}; };
if(recount) const long viewport_depth
{ {
prev_exists = 0; room_depth - long(room::events::viewport_size)
for(size_t i(0); i < prev_count; ++i) };
prev_exists += bool(m::exists(prev.prev_event(i)));
std::forward_list
<
ctx::future<m::fetch::result>
>
ret;
const event::prev prev{event};
for(size_t i(0); i < prev.prev_events_count(); ++i)
{
const auto &prev_id
{
prev.prev_event(i)
};
if(m::exists(prev_id))
continue;
const bool recent_event
{
at<"depth"_>(event) >= viewport_depth
};
if(!recent_event)
{
log::dwarning
{
log, "%s no action for missing prev %s; depth:%ld room:%ld viewport:%ld",
loghead(eval),
string_view{prev_id},
at<"depth"_>(event),
room_depth,
viewport_depth,
};
continue;
}
const long depth_gap
{
std::max(std::abs(at<"depth"_>(event) - room_depth), 1L)
};
m::fetch::opts opts;
opts.op = m::fetch::op::backfill;
opts.room_id = room.room_id;
opts.event_id = prev_id;
opts.limit = size_t(depth_gap);
opts.limit = std::min(opts.limit, eval.opts->fetch_prev_limit);
opts.limit = std::min(opts.limit, size_t(prev_backfill_limit));
log::debug
{
log, "%s requesting backfill off %s; depth:%ld viewport:%ld room:%ld gap:%ld limit:%zu",
loghead(eval),
string_view{prev_id},
at<"depth"_>(event),
viewport_depth,
room_depth,
depth_gap,
opts.limit,
};
ret.emplace_front(m::fetch::start(opts));
} }
return ret;
}
void
ircd::m::vm::fetch::prev_check(const event &event,
vm::eval &eval)
{
const auto &opts
{
*eval.opts
};
const event::prev prev
{
event
};
const size_t prev_exists
{
prev.prev_events_exist()
};
// Aborts this event if the options want us to guarantee at least one // Aborts this event if the options want us to guarantee at least one
// prev_event was fetched and evaluated for this event. This is generally // prev_event was fetched and evaluated for this event. This is generally
// used in conjunction with the fetch_prev_wait option to be effective. // used in conjunction with the fetch_prev_wait option to be effective.
@ -388,19 +521,19 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event,
throw vm::error throw vm::error
{ {
vm::fault::EVENT, "Failed to fetch any of the %zu prev_events for %s in %s", vm::fault::EVENT, "Failed to fetch any of the %zu prev_events for %s in %s",
prev_count, prev.prev_events_count(),
string_view{event.event_id}, string_view{event.event_id},
json::get<"room_id"_>(event) json::get<"room_id"_>(event)
}; };
// Aborts this event if the options want us to guarantee ALL of the // Aborts this event if the options want us to guarantee ALL of the
// prev_events were fetched and evaluated for this event. // prev_events were fetched and evaluated for this event.
if(opts.fetch_prev_all && prev_exists < prev_count) if(opts.fetch_prev_all && prev_exists < prev.prev_events_count())
throw vm::error throw vm::error
{ {
vm::fault::EVENT, "Missing %zu of %zu required prev_events for %s in %s", vm::fault::EVENT, "Missing %zu of %zu required prev_events for %s in %s",
prev_exists, prev_exists,
prev_count, prev.prev_events_count(),
string_view{event.event_id}, string_view{event.event_id},
json::get<"room_id"_>(event) json::get<"room_id"_>(event)
}; };