ircd:Ⓜ️:vm::fetch: Eliminate the remaining coarse sleep for prev fetch+eval.
This commit is contained in:
parent
dcfae310ab
commit
767f6cbae5
|
@ -13,7 +13,7 @@ namespace ircd::m::vm::fetch
|
|||
static void prev_check(const event &, vm::eval &);
|
||||
static bool prev_wait(const event &, vm::eval &);
|
||||
static std::forward_list<ctx::future<m::fetch::result>> prev_fetch(const event &, vm::eval &, const room &);
|
||||
static void prev_eval(const event &, vm::eval &, ctx::future<m::fetch::result> &);
|
||||
static void prev_eval(const event &, vm::eval &, ctx::future<m::fetch::result> &, const system_point &);
|
||||
static void prev(const event &, vm::eval &, const room &);
|
||||
static std::forward_list<ctx::future<m::fetch::result>> state_fetch(const event &, vm::eval &, const room &);
|
||||
static void state(const event &, vm::eval &, const room &);
|
||||
|
@ -22,9 +22,8 @@ namespace ircd::m::vm::fetch
|
|||
static void auth(const event &, vm::eval &, const room &);
|
||||
static void handle(const event &, vm::eval &);
|
||||
|
||||
extern conf::item<milliseconds> prev_fetch_check_interval;
|
||||
extern conf::item<milliseconds> prev_preempt_time;
|
||||
extern conf::item<milliseconds> prev_wait_time;
|
||||
extern conf::item<size_t> prev_wait_count;
|
||||
extern conf::item<size_t> prev_backfill_limit;
|
||||
extern conf::item<seconds> event_timeout;
|
||||
extern conf::item<seconds> state_timeout;
|
||||
|
@ -104,25 +103,18 @@ ircd::m::vm::fetch::prev_backfill_limit
|
|||
{ "default", 128L },
|
||||
};
|
||||
|
||||
decltype(ircd::m::vm::fetch::prev_wait_count)
|
||||
ircd::m::vm::fetch::prev_wait_count
|
||||
{
|
||||
{ "name", "ircd.m.vm.fetch.prev.wait.count" },
|
||||
{ "default", 4L },
|
||||
};
|
||||
|
||||
decltype(ircd::m::vm::fetch::prev_wait_time)
|
||||
ircd::m::vm::fetch::prev_wait_time
|
||||
{
|
||||
{ "name", "ircd.m.vm.fetch.prev.wait.time" },
|
||||
{ "default", 200L },
|
||||
{ "default", 750L },
|
||||
};
|
||||
|
||||
decltype(ircd::m::vm::fetch::prev_fetch_check_interval)
|
||||
ircd::m::vm::fetch::prev_fetch_check_interval
|
||||
decltype(ircd::m::vm::fetch::prev_preempt_time)
|
||||
ircd::m::vm::fetch::prev_preempt_time
|
||||
{
|
||||
{ "name", "ircd.m.vm.fetch.prev.fetch.check_interval" },
|
||||
{ "default", 500L },
|
||||
{ "name", "ircd.m.vm.fetch.prev.preempt.time" },
|
||||
{ "default", 5000L },
|
||||
};
|
||||
|
||||
//
|
||||
|
@ -508,11 +500,13 @@ ircd::m::vm::fetch::prev(const event &event,
|
|||
|
||||
if(!m::vm::fetch::enable)
|
||||
{
|
||||
// No fetches will take place; only check if satisfied.
|
||||
prev_check(event, eval);
|
||||
return;
|
||||
}
|
||||
|
||||
auto futures
|
||||
// Launch fetches for missing prev events
|
||||
auto fetching
|
||||
{
|
||||
prev_fetch(event, eval, room)
|
||||
};
|
||||
|
@ -526,43 +520,62 @@ ircd::m::vm::fetch::prev(const event &event,
|
|||
at<"depth"_>(event),
|
||||
prev_count,
|
||||
prev_count - prev_exists,
|
||||
std::distance(begin(futures), end(futures)),
|
||||
};
|
||||
|
||||
auto fetching
|
||||
{
|
||||
ctx::when_all(begin(futures), end(futures))
|
||||
};
|
||||
|
||||
const auto timeout
|
||||
{
|
||||
now<system_point>() + seconds(event_timeout)
|
||||
};
|
||||
|
||||
const milliseconds &check_interval
|
||||
{
|
||||
prev_fetch_check_interval
|
||||
std::distance(begin(fetching), end(fetching)),
|
||||
};
|
||||
|
||||
// Rather than waiting for all of the events to arrive or for the entire
|
||||
// timeout to expire, we check if the sought events made it to the server
|
||||
// in the meantime. If so we can drop these requests and bail.
|
||||
//TODO: Ideally should be replaced with listener/notification/hook on the
|
||||
//TODO: events arriving rather than this coarse sleep cycles.
|
||||
while(now<system_point>() < timeout)
|
||||
{
|
||||
// Wait for an interval to give this loop some iterations.
|
||||
if(fetching.wait(check_interval, std::nothrow))
|
||||
break;
|
||||
// in the meantime. If so we can drop these fetches and bail.
|
||||
std::optional<vm::notify::future> evaling[prev_count];
|
||||
for(size_t i(0); i < prev_count; ++i)
|
||||
evaling[i].emplace(prev.prev_event(i));
|
||||
|
||||
// Check for satisfaction.
|
||||
if(prev.prev_events_exist() == prev_count)
|
||||
return;
|
||||
// Either all of the fetches are done and we can start evaluating or all
|
||||
// of the events arrived elsehow and we don't need any of the fetches.
|
||||
// XXX: Ideally this could be refactored with mix-and-match granularity
|
||||
// but at this time it's unknown if there's practical benefit.
|
||||
ctx::future<> when[]
|
||||
{
|
||||
ctx::when_all(begin(fetching), end(fetching)),
|
||||
ctx::when_all(evaling, evaling + prev_count, []
|
||||
(auto &optional) -> ctx::future<> &
|
||||
{
|
||||
return optional->value();
|
||||
}),
|
||||
};
|
||||
|
||||
// Represents one of the two outcomes.
|
||||
auto future
|
||||
{
|
||||
ctx::when_any(begin(when), end(when))
|
||||
};
|
||||
|
||||
const auto prev_wait_until
|
||||
{
|
||||
now<system_point>() + milliseconds(prev_preempt_time)
|
||||
};
|
||||
|
||||
// Wait for one of the two outcomes.
|
||||
const bool finished
|
||||
{
|
||||
future.wait_until(prev_wait_until, std::nothrow)
|
||||
};
|
||||
|
||||
// Check for satisfaction.
|
||||
if(prev.prev_events_exist() == prev_count)
|
||||
{
|
||||
assert(finished);
|
||||
return;
|
||||
}
|
||||
|
||||
// evaluate results
|
||||
for(auto &future : futures)
|
||||
prev_eval(event, eval, future);
|
||||
const auto event_wait_until
|
||||
{
|
||||
now<system_point>() + seconds(event_timeout)
|
||||
};
|
||||
|
||||
// If we're not satisfied we commit to evaluating the fetches.
|
||||
for(auto &fetch : fetching)
|
||||
prev_eval(event, eval, fetch, event_wait_until);
|
||||
|
||||
// check if result evals have satisfied this eval now; or throw
|
||||
prev_check(event, eval);
|
||||
|
@ -571,12 +584,13 @@ ircd::m::vm::fetch::prev(const event &event,
|
|||
void
|
||||
ircd::m::vm::fetch::prev_eval(const event &event,
|
||||
vm::eval &eval,
|
||||
ctx::future<m::fetch::result> &future)
|
||||
ctx::future<m::fetch::result> &future,
|
||||
const system_point &until)
|
||||
try
|
||||
{
|
||||
m::fetch::result result
|
||||
{
|
||||
future.get()
|
||||
future.get_until(until)
|
||||
};
|
||||
|
||||
const json::object content
|
||||
|
|
Loading…
Reference in New Issue