From 6300b5a9fbc249f061cd4e94e9561e4022f53b9a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 5 Sep 2019 16:30:51 -0700 Subject: [PATCH] modules/m_vm_fetch: Split and reorg handler stack; add conf items. --- modules/m_vm_fetch.cc | 425 +++++++++++++++++++++++++++--------------- 1 file changed, 279 insertions(+), 146 deletions(-) diff --git a/modules/m_vm_fetch.cc b/modules/m_vm_fetch.cc index 3b1346986..428ba9518 100644 --- a/modules/m_vm_fetch.cc +++ b/modules/m_vm_fetch.cc @@ -10,11 +10,15 @@ namespace ircd::m::vm::fetch { - static void hook_handle_prev(const event &, vm::eval &, const room &); - static void auth_chain(const room &, const string_view &remote); - static void hook_handle_auth(const event &, vm::eval &, const room &); - static void hook_handle(const event &, vm::eval &); + static void prev_check(const event &, vm::eval &); + static std::forward_list> prev_fetch(const event &, vm::eval &, const room &); + static void prev(const event &, vm::eval &, const room &); + static void auth_chain_eval(const event &, vm::eval &, const room &, const json::array &); + static void auth_chain(const event &, vm::eval &, const room &); + static void auth(const event &, vm::eval &, const room &); + static void handle(const event &, vm::eval &); + extern conf::item prev_backfill_limit; extern conf::item auth_timeout; extern conf::item enable; extern hookfn hook; @@ -33,6 +37,15 @@ ircd::m::vm::fetch::log "m.vm.fetch" }; +decltype(ircd::m::vm::fetch::hook) +ircd::m::vm::fetch::hook +{ + handle, + { + { "_site", "vm.fetch" } + } +}; + decltype(ircd::m::vm::fetch::enable) ircd::m::vm::fetch::enable { @@ -47,13 +60,11 @@ ircd::m::vm::fetch::auth_timeout { "default", 15L }, }; -decltype(ircd::m::vm::fetch::hook) -ircd::m::vm::fetch::hook +decltype(ircd::m::vm::fetch::prev_backfill_limit) +ircd::m::vm::fetch::prev_backfill_limit { - hook_handle, - { - { "_site", "vm.fetch" } - } + { "name", "ircd.m.vm.fetch.prev.backfill.limit" }, + { "default", 128L }, }; // @@ -61,13 +72,16 @@ ircd::m::vm::fetch::hook // void -ircd::m::vm::fetch::hook_handle(const event &event, - vm::eval &eval) +ircd::m::vm::fetch::handle(const event &event, + vm::eval &eval) try { assert(eval.opts); assert(eval.opts->fetch); - const auto &opts{*eval.opts}; + const auto &opts + { + *eval.opts + }; const auto &type { @@ -94,17 +108,10 @@ try room.event_id = event_id; if(opts.fetch_auth) - hook_handle_auth(event, eval, room); + auth(event, eval, room); if(opts.fetch_prev) - hook_handle_prev(event, eval, room); - - log::debug - { - log, "%s in %s complete", - loghead(eval), - json::get<"room_id"_>(event), - }; + prev(event, eval, room); } catch(const std::exception &e) { @@ -118,31 +125,40 @@ catch(const std::exception &e) throw; } -void -ircd::m::vm::fetch::hook_handle_auth(const event &event, - vm::eval &eval, - const room &room) +// +// auth_events handler stack +// +void +ircd::m::vm::fetch::auth(const event &event, + vm::eval &eval, + const room &room) + +try { // Count how many of the auth_events provided exist locally. const auto &opts{*eval.opts}; const event::prev prev{event}; + const size_t auth_count + { + prev.auth_events_count() + }; - size_t exists(0); - for(size_t i(0); i < prev.auth_events_count(); ++i) + size_t auth_exists{0}; + for(size_t i(0); i < auth_count; ++i) { const auto &auth_id { prev.auth_event(i) }; - exists += bool(m::exists(auth_id)); + auth_exists += bool(m::exists(auth_id)); } // We are satisfied at this point if all auth_events for this event exist, // as those events have themselves been successfully evaluated and so forth. - assert(exists <= prev.auth_events_count()); - if(exists == prev.auth_events_count()) + assert(auth_exists <= auth_count); + if(auth_exists == auth_count) return; // At this point we are missing one or more auth_events for this event. @@ -150,62 +166,80 @@ ircd::m::vm::fetch::hook_handle_auth(const event &event, { log, "%s auth_events:%zu miss:%zu", loghead(eval), - prev.auth_events_count(), - exists - prev.auth_events_count(), + auth_count, + auth_exists - auth_count, }; - // We need to figure out where best to sling a request to fetch these - // missing auth_events. We prefer the remote client conducting this eval - // with their /federation/send/ request which we stored in the opts. - const string_view &remote - { - opts.node_id? - opts.node_id: - !my_host(json::get<"origin"_>(event))? - string_view(json::get<"origin"_>(event)): - !my_host(room.room_id.host())? //TODO: XXX - room.room_id.host(): - string_view{} - }; - - // Bail out here if we can't or won't attempt fetching auth_events. - if(!opts.fetch_auth || !bool(m::vm::fetch::enable) || !remote) + if(!bool(m::vm::fetch::enable)) throw vm::error { - vm::fault::EVENT, "Failed to fetch auth_events for %s in %s", - string_view{event.event_id}, - json::get<"room_id"_>(event) + vm::fault::AUTH, "Fetching auth_events disabled by configuration", + }; + + if(!opts.fetch_auth) + throw vm::error + { + vm::fault::AUTH, "Not fetching auth_events for this evaluation", }; // This is a blocking call to recursively fetch and evaluate the auth_chain // for this event. Upon return all of the auth_events for this event will - // have themselves been fetched and auth'ed recursively or throws. - auth_chain(room, remote); + // have themselves been fetched and auth'ed recursively. + auth_chain(event, eval, room); +} +catch(const std::exception &e) +{ + throw vm::error + { + vm::fault::AUTH, "Failed to fetch all auth_events :%s", + string_view{event.event_id}, + json::get<"room_id"_>(event), + e.what() + }; } void -ircd::m::vm::fetch::auth_chain(const room &room, - const string_view &remote) +ircd::m::vm::fetch::auth_chain(const event &event, + vm::eval &eval, + const room &room) try { - log::debug - { - log, "Fetching auth chain for %s in %s (hint: %s)", - string_view{room.event_id}, - string_view{room.room_id}, - remote, - }; - + assert(eval.opts); m::fetch::opts opts; opts.op = m::fetch::op::auth; opts.room_id = room.room_id; opts.event_id = room.event_id; - opts.hint = remote; + + // Figure out a remote hint as the primary target to request the missing + // auth events from; if provided, m::fetch will ask this remote first. We + // try to use the eval.node_id, which is set to a server that is conducting + // the eval (i.e in a /send/ or when processing some response data from + // them); next we try the origin of the event itself. These remotes are + // most likely to provide a satisfying response. + opts.hint = + { + eval.opts->node_id? + eval.opts->node_id: + !my_host(json::get<"origin"_>(event))? + string_view(json::get<"origin"_>(event)): + string_view{} + }; + + log::debug + { + log, "Fetching auth chain for %s in %s hint:%s", + string_view{room.event_id}, + string_view{room.room_id}, + opts.hint, + }; + + // send auto future { m::fetch::start(opts) }; + // recv const auto result { future.get(seconds(auth_timeout)) @@ -216,38 +250,25 @@ try result }; + // parse const json::array &auth_chain { response["auth_chain"] }; - log::debug - { - log, "Evaluating %zu auth events in chain for %s in %s", - auth_chain.size(), - string_view{room.event_id}, - string_view{room.room_id}, - }; - - m::vm::opts vmopts; - vmopts.infolog_accept = true; - vmopts.fetch_prev = false; - vmopts.fetch_state = false; - vmopts.warnlog &= ~vm::fault::EXISTS; - m::vm::eval - { - auth_chain, vmopts - }; + auth_chain_eval(event, eval, room, auth_chain); +} +catch(const vm::error &e) +{ + throw; } catch(const std::exception &e) { - thread_local char rembuf[64]; log::error { - log, "Fetching auth chain for %s in %s from %s :%s", + log, "Fetching auth chain for %s in %s :%s", string_view{room.event_id}, string_view{room.room_id}, - string(rembuf, remote), e.what(), }; @@ -255,9 +276,54 @@ catch(const std::exception &e) } void -ircd::m::vm::fetch::hook_handle_prev(const event &event, - vm::eval &eval, - const room &room) +ircd::m::vm::fetch::auth_chain_eval(const event &event, + vm::eval &eval, + const room &room, + const json::array &auth_chain) +try +{ + assert(eval.opts); + m::vm::opts opts; + opts.node_id = eval.opts->node_id; + opts.fetch_prev = false; + opts.fetch_state = false; + opts.infolog_accept = true; + opts.warnlog &= ~vm::fault::EXISTS; + log::debug + { + log, "Evaluating auth chain for %s in %s events:%zu", + string_view{room.event_id}, + string_view{room.room_id}, + auth_chain.size(), + }; + + // eval + m::vm::eval + { + auth_chain, opts + }; +} +catch(const std::exception &e) +{ + log::error + { + log, "Evaluating auth chain for %s in %s :%s", + string_view{room.event_id}, + string_view{room.room_id}, + e.what(), + }; + + throw; +} + +// +// prev_events handler stack +// + +void +ircd::m::vm::fetch::prev(const event &event, + vm::eval &eval, + const room &room) { const auto &opts{*eval.opts}; const event::prev prev{event}; @@ -266,66 +332,36 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event, prev.prev_events_count() }; - size_t prev_exists(0); - std::list> futures; - for(size_t i(0); i < prev_count; ++i) + const size_t prev_exists { - const auto &prev_id - { - prev.prev_event(i) - }; + prev.prev_events_exist() + }; - if(m::exists(prev_id)) - { - ++prev_exists; - continue; - } + assert(prev_exists <= prev_count); + if(prev_count == prev_exists) + return; - if(!opts.fetch_prev || !m::vm::fetch::enable) - continue; - - const int64_t room_depth - { - m::depth(std::nothrow, room) - }; - - //TODO: XXX - const bool recent_event - { - at<"depth"_>(event) >= room_depth - 20L //TODO: XXX - }; - - if(!recent_event) - continue; - - const ssize_t limit - { - at<"depth"_>(event) - room_depth - }; - - m::fetch::opts opts; - opts.op = m::fetch::op::backfill; - opts.limit = std::min(limit, 32L); - opts.room_id = room.room_id; - opts.event_id = prev_id; - futures.emplace_back(m::fetch::start(opts)); + if(!opts.fetch_prev || !m::vm::fetch::enable) + { + prev_check(event, eval); + return; } - // If we have all of the referenced prev_events we are satisfied here. - assert(prev_exists < prev_count); - if(prev_exists == prev_count) - return; + auto futures + { + prev_fetch(event, eval, room) + }; // At this point one or more prev_events are missing; the fetches were // launched asynchronously if the options allowed for it. log::dwarning { - log, "%s prev_events:%zu miss:%zu fetching:%zu", + log, "%s depth:%ld prev_events:%zu miss:%zu fetching:%zu fetching ...", loghead(eval), + at<"depth"_>(event), prev_count, - prev_exists, prev_count - prev_exists, - futures.size(), + std::distance(begin(futures), end(futures)), }; auto fetching @@ -333,7 +369,10 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event, ctx::when_all(begin(futures), end(futures)) }; + // yields context fetching.wait(); + + // evaluate results for(auto &future : futures) try { m::fetch::result result @@ -353,7 +392,7 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event, pdus.size(), }; - m::vm::eval + vm::eval { pdus, opts }; @@ -368,19 +407,113 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event, }; } - const bool recount + // check if result evals have satisfied this eval now; or throw + prev_check(event, eval); +} + +std::forward_list +< + ircd::ctx::future +> +ircd::m::vm::fetch::prev_fetch(const event &event, + vm::eval &eval, + const room &room) +{ + const long room_depth { - (opts.fetch_prev_any && !prev_exists) - || opts.fetch_prev_all + m::depth(std::nothrow, room) }; - if(recount) + const long viewport_depth { - prev_exists = 0; - for(size_t i(0); i < prev_count; ++i) - prev_exists += bool(m::exists(prev.prev_event(i))); + room_depth - long(room::events::viewport_size) + }; + + std::forward_list + < + ctx::future + > + ret; + const event::prev prev{event}; + for(size_t i(0); i < prev.prev_events_count(); ++i) + { + const auto &prev_id + { + prev.prev_event(i) + }; + + if(m::exists(prev_id)) + continue; + + const bool recent_event + { + at<"depth"_>(event) >= viewport_depth + }; + + if(!recent_event) + { + log::dwarning + { + log, "%s no action for missing prev %s; depth:%ld room:%ld viewport:%ld", + loghead(eval), + string_view{prev_id}, + at<"depth"_>(event), + room_depth, + viewport_depth, + }; + + continue; + } + + const long depth_gap + { + std::max(std::abs(at<"depth"_>(event) - room_depth), 1L) + }; + + m::fetch::opts opts; + opts.op = m::fetch::op::backfill; + opts.room_id = room.room_id; + opts.event_id = prev_id; + opts.limit = size_t(depth_gap); + opts.limit = std::min(opts.limit, eval.opts->fetch_prev_limit); + opts.limit = std::min(opts.limit, size_t(prev_backfill_limit)); + log::debug + { + log, "%s requesting backfill off %s; depth:%ld viewport:%ld room:%ld gap:%ld limit:%zu", + loghead(eval), + string_view{prev_id}, + at<"depth"_>(event), + viewport_depth, + room_depth, + depth_gap, + opts.limit, + }; + + ret.emplace_front(m::fetch::start(opts)); } + return ret; +} + +void +ircd::m::vm::fetch::prev_check(const event &event, + vm::eval &eval) +{ + const auto &opts + { + *eval.opts + }; + + const event::prev prev + { + event + }; + + const size_t prev_exists + { + prev.prev_events_exist() + }; + // Aborts this event if the options want us to guarantee at least one // prev_event was fetched and evaluated for this event. This is generally // used in conjunction with the fetch_prev_wait option to be effective. @@ -388,19 +521,19 @@ ircd::m::vm::fetch::hook_handle_prev(const event &event, throw vm::error { vm::fault::EVENT, "Failed to fetch any of the %zu prev_events for %s in %s", - prev_count, + prev.prev_events_count(), string_view{event.event_id}, json::get<"room_id"_>(event) }; // Aborts this event if the options want us to guarantee ALL of the // prev_events were fetched and evaluated for this event. - if(opts.fetch_prev_all && prev_exists < prev_count) + if(opts.fetch_prev_all && prev_exists < prev.prev_events_count()) throw vm::error { vm::fault::EVENT, "Missing %zu of %zu required prev_events for %s in %s", prev_exists, - prev_count, + prev.prev_events_count(), string_view{event.event_id}, json::get<"room_id"_>(event) };