From 767f6cbae56a51dbf471b9f6a12d08459dfad2f5 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 16 Mar 2023 20:39:17 -0700 Subject: [PATCH] ircd::m::vm::fetch: Eliminate the remaining coarse sleep for prev fetch+eval. --- matrix/vm_fetch.cc | 110 +++++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/matrix/vm_fetch.cc b/matrix/vm_fetch.cc index 3114c6894..c32699f4e 100644 --- a/matrix/vm_fetch.cc +++ b/matrix/vm_fetch.cc @@ -13,7 +13,7 @@ namespace ircd::m::vm::fetch static void prev_check(const event &, vm::eval &); static bool prev_wait(const event &, vm::eval &); static std::forward_list> prev_fetch(const event &, vm::eval &, const room &); - static void prev_eval(const event &, vm::eval &, ctx::future &); + static void prev_eval(const event &, vm::eval &, ctx::future &, const system_point &); static void prev(const event &, vm::eval &, const room &); static std::forward_list> state_fetch(const event &, vm::eval &, const room &); static void state(const event &, vm::eval &, const room &); @@ -22,9 +22,8 @@ namespace ircd::m::vm::fetch static void auth(const event &, vm::eval &, const room &); static void handle(const event &, vm::eval &); - extern conf::item prev_fetch_check_interval; + extern conf::item prev_preempt_time; extern conf::item prev_wait_time; - extern conf::item prev_wait_count; extern conf::item prev_backfill_limit; extern conf::item event_timeout; extern conf::item state_timeout; @@ -104,25 +103,18 @@ ircd::m::vm::fetch::prev_backfill_limit { "default", 128L }, }; -decltype(ircd::m::vm::fetch::prev_wait_count) -ircd::m::vm::fetch::prev_wait_count -{ - { "name", "ircd.m.vm.fetch.prev.wait.count" }, - { "default", 4L }, -}; - decltype(ircd::m::vm::fetch::prev_wait_time) ircd::m::vm::fetch::prev_wait_time { { "name", "ircd.m.vm.fetch.prev.wait.time" }, - { "default", 200L }, + { "default", 750L }, }; -decltype(ircd::m::vm::fetch::prev_fetch_check_interval) -ircd::m::vm::fetch::prev_fetch_check_interval +decltype(ircd::m::vm::fetch::prev_preempt_time) +ircd::m::vm::fetch::prev_preempt_time { - { "name", "ircd.m.vm.fetch.prev.fetch.check_interval" }, - { "default", 500L }, + { "name", "ircd.m.vm.fetch.prev.preempt.time" }, + { "default", 5000L }, }; // @@ -508,11 +500,13 @@ ircd::m::vm::fetch::prev(const event &event, if(!m::vm::fetch::enable) { + // No fetches will take place; only check if satisfied. prev_check(event, eval); return; } - auto futures + // Launch fetches for missing prev events + auto fetching { prev_fetch(event, eval, room) }; @@ -526,43 +520,62 @@ ircd::m::vm::fetch::prev(const event &event, at<"depth"_>(event), prev_count, prev_count - prev_exists, - std::distance(begin(futures), end(futures)), - }; - - auto fetching - { - ctx::when_all(begin(futures), end(futures)) - }; - - const auto timeout - { - now() + seconds(event_timeout) - }; - - const milliseconds &check_interval - { - prev_fetch_check_interval + std::distance(begin(fetching), end(fetching)), }; // Rather than waiting for all of the events to arrive or for the entire // timeout to expire, we check if the sought events made it to the server - // in the meantime. If so we can drop these requests and bail. - //TODO: Ideally should be replaced with listener/notification/hook on the - //TODO: events arriving rather than this coarse sleep cycles. - while(now() < timeout) - { - // Wait for an interval to give this loop some iterations. - if(fetching.wait(check_interval, std::nothrow)) - break; + // in the meantime. If so we can drop these fetches and bail. + std::optional evaling[prev_count]; + for(size_t i(0); i < prev_count; ++i) + evaling[i].emplace(prev.prev_event(i)); - // Check for satisfaction. - if(prev.prev_events_exist() == prev_count) - return; + // Either all of the fetches are done and we can start evaluating or all + // of the events arrived elsehow and we don't need any of the fetches. + // XXX: Ideally this could be refactored with mix-and-match granularity + // but at this time it's unknown if there's practical benefit. + ctx::future<> when[] + { + ctx::when_all(begin(fetching), end(fetching)), + ctx::when_all(evaling, evaling + prev_count, [] + (auto &optional) -> ctx::future<> & + { + return optional->value(); + }), + }; + + // Represents one of the two outcomes. + auto future + { + ctx::when_any(begin(when), end(when)) + }; + + const auto prev_wait_until + { + now() + milliseconds(prev_preempt_time) + }; + + // Wait for one of the two outcomes. + const bool finished + { + future.wait_until(prev_wait_until, std::nothrow) + }; + + // Check for satisfaction. + if(prev.prev_events_exist() == prev_count) + { + assert(finished); + return; } - // evaluate results - for(auto &future : futures) - prev_eval(event, eval, future); + const auto event_wait_until + { + now() + seconds(event_timeout) + }; + + // If we're not satisfied we commit to evaluating the fetches. + for(auto &fetch : fetching) + prev_eval(event, eval, fetch, event_wait_until); // check if result evals have satisfied this eval now; or throw prev_check(event, eval); @@ -571,12 +584,13 @@ ircd::m::vm::fetch::prev(const event &event, void ircd::m::vm::fetch::prev_eval(const event &event, vm::eval &eval, - ctx::future &future) + ctx::future &future, + const system_point &until) try { m::fetch::result result { - future.get() + future.get_until(until) }; const json::object content