mirror of
https://github.com/matrix-construct/construct
synced 2025-01-20 03:21:54 +01:00
782 lines
16 KiB
C++
782 lines
16 KiB
C++
// The Construct
|
|
//
|
|
// Copyright (C) The Construct Developers, Authors & Contributors
|
|
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
|
|
//
|
|
// Permission to use, copy, modify, and/or distribute this software for any
|
|
// purpose with or without fee is hereby granted, provided that the above
|
|
// copyright notice and this permission notice is present in all copies. The
|
|
// full license for this software is available in the LICENSE file.
|
|
|
|
namespace ircd::m::vm::fetch
|
|
{
|
|
static void prev_check(const event &, vm::eval &);
|
|
static bool prev_wait(const event &, vm::eval &);
|
|
static std::forward_list<ctx::future<m::fetch::result>> prev_fetch(const event &, vm::eval &, const room &);
|
|
static void prev(const event &, vm::eval &, const room &);
|
|
static std::forward_list<ctx::future<m::fetch::result>> state_fetch(const event &, vm::eval &, const room &);
|
|
static void state(const event &, vm::eval &, const room &);
|
|
static void auth_chain_eval(const event &, vm::eval &, const room &, const json::array &, const string_view &);
|
|
static void auth_chain(const event &, vm::eval &, const room &);
|
|
static void auth(const event &, vm::eval &, const room &);
|
|
static void handle(const event &, vm::eval &);
|
|
|
|
extern conf::item<milliseconds> prev_fetch_check_interval;
|
|
extern conf::item<milliseconds> prev_wait_time;
|
|
extern conf::item<size_t> prev_wait_count;
|
|
extern conf::item<size_t> prev_backfill_limit;
|
|
extern conf::item<seconds> event_timeout;
|
|
extern conf::item<seconds> state_timeout;
|
|
extern conf::item<seconds> auth_timeout;
|
|
extern conf::item<bool> enable;
|
|
extern hookfn<vm::eval &> auth_hook;
|
|
extern hookfn<vm::eval &> prev_hook;
|
|
extern hookfn<vm::eval &> state_hook;
|
|
extern log::log log;
|
|
}
|
|
|
|
decltype(ircd::m::vm::fetch::log)
|
|
ircd::m::vm::fetch::log
|
|
{
|
|
"m.vm.fetch"
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::auth_hook)
|
|
ircd::m::vm::fetch::auth_hook
|
|
{
|
|
handle,
|
|
{
|
|
{ "_site", "vm.fetch.auth" }
|
|
}
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::prev_hook)
|
|
ircd::m::vm::fetch::prev_hook
|
|
{
|
|
handle,
|
|
{
|
|
{ "_site", "vm.fetch.prev" }
|
|
}
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::state_hook)
|
|
ircd::m::vm::fetch::state_hook
|
|
{
|
|
handle,
|
|
{
|
|
{ "_site", "vm.fetch.state" }
|
|
}
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::enable)
|
|
ircd::m::vm::fetch::enable
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.enable" },
|
|
{ "default", true },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::auth_timeout)
|
|
ircd::m::vm::fetch::auth_timeout
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.auth.timeout" },
|
|
{ "default", 15L },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::state_timeout)
|
|
ircd::m::vm::fetch::state_timeout
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.state.timeout" },
|
|
{ "default", 20L },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::event_timeout)
|
|
ircd::m::vm::fetch::event_timeout
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.event.timeout" },
|
|
{ "default", 10L },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::prev_backfill_limit)
|
|
ircd::m::vm::fetch::prev_backfill_limit
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.prev.backfill.limit" },
|
|
{ "default", 128L },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::prev_wait_count)
|
|
ircd::m::vm::fetch::prev_wait_count
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.prev.wait.count" },
|
|
{ "default", 4L },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::prev_wait_time)
|
|
ircd::m::vm::fetch::prev_wait_time
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.prev.wait.time" },
|
|
{ "default", 200L },
|
|
};
|
|
|
|
decltype(ircd::m::vm::fetch::prev_fetch_check_interval)
|
|
ircd::m::vm::fetch::prev_fetch_check_interval
|
|
{
|
|
{ "name", "ircd.m.vm.fetch.prev.fetch.check_interval" },
|
|
{ "default", 500L },
|
|
};
|
|
|
|
//
|
|
// fetch_phase
|
|
//
|
|
|
|
void
|
|
ircd::m::vm::fetch::handle(const event &event,
|
|
vm::eval &eval)
|
|
try
|
|
{
|
|
if(eval.room_internal)
|
|
return;
|
|
|
|
assert(eval.opts);
|
|
const auto &opts
|
|
{
|
|
*eval.opts
|
|
};
|
|
|
|
const auto &type
|
|
{
|
|
at<"type"_>(event)
|
|
};
|
|
|
|
if(type == "m.room.create")
|
|
return;
|
|
|
|
const m::event::id &event_id
|
|
{
|
|
event.event_id
|
|
};
|
|
|
|
const m::room::id &room_id
|
|
{
|
|
at<"room_id"_>(event)
|
|
};
|
|
|
|
// Can't construct m::room with the event_id argument because it
|
|
// won't be found (we're evaluating that event here!) so we just set
|
|
// the member manually to make further use of the room struct.
|
|
m::room room{room_id};
|
|
room.event_id = event_id;
|
|
|
|
if(eval.phase == phase::FETCH_AUTH)
|
|
return auth(event, eval, room);
|
|
|
|
if(eval.phase == phase::FETCH_PREV)
|
|
return prev(event, eval, room);
|
|
|
|
if(eval.phase == phase::FETCH_STATE)
|
|
return state(event, eval, room);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::derror
|
|
{
|
|
log, "%s :%s",
|
|
loghead(eval),
|
|
e.what(),
|
|
};
|
|
|
|
throw;
|
|
}
|
|
|
|
//
|
|
// auth_events handler stack
|
|
//
|
|
|
|
void
|
|
ircd::m::vm::fetch::auth(const event &event,
|
|
vm::eval &eval,
|
|
const room &room)
|
|
|
|
{
|
|
// Count how many of the auth_events provided exist locally.
|
|
const auto &opts{*eval.opts};
|
|
const event::auth prev
|
|
{
|
|
event
|
|
};
|
|
|
|
const size_t auth_count
|
|
{
|
|
prev.auth_events_count()
|
|
};
|
|
|
|
const size_t auth_exists
|
|
{
|
|
prev.auth_events_exist()
|
|
};
|
|
|
|
assert(auth_exists <= auth_count);
|
|
if(auth_exists != auth_count) try
|
|
{
|
|
log::dwarning
|
|
{
|
|
log, "%s auth_events:%zu miss:%zu",
|
|
loghead(eval),
|
|
auth_count,
|
|
auth_count - auth_exists,
|
|
};
|
|
|
|
if(!bool(m::vm::fetch::enable))
|
|
throw vm::error
|
|
{
|
|
vm::fault::AUTH, "Fetching auth_events disabled by configuration",
|
|
};
|
|
|
|
// This is a blocking call to recursively fetch and evaluate the auth_chain
|
|
// for this event. Upon return all of the auth_events for this event will
|
|
// have themselves been fetched and auth'ed recursively.
|
|
auth_chain(event, eval, room);
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
throw vm::error
|
|
{
|
|
vm::fault::AUTH, "Failed to fetch %zu of %zu auth_events :%s",
|
|
auth_count - prev.auth_events_exist(),
|
|
auth_count,
|
|
e.what()
|
|
};
|
|
}
|
|
}
|
|
|
|
void
|
|
ircd::m::vm::fetch::auth_chain(const event &event,
|
|
vm::eval &eval,
|
|
const room &room)
|
|
try
|
|
{
|
|
assert(eval.opts);
|
|
m::fetch::opts opts;
|
|
opts.op = m::fetch::op::auth;
|
|
opts.room_id = room.room_id;
|
|
opts.event_id = room.event_id;
|
|
opts.check_hashes = false;
|
|
|
|
// Figure out a remote hint as the primary target to request the missing
|
|
// auth events from; if provided, m::fetch will ask this remote first. We
|
|
// try to use the eval.node_id, which is set to a server that is conducting
|
|
// the eval (i.e in a /send/ or when processing some response data from
|
|
// them); next we try the origin of the event itself. These remotes are
|
|
// most likely to provide a satisfying response.
|
|
opts.hint =
|
|
{
|
|
eval.opts->node_id && !my_host(eval.opts->node_id)?
|
|
eval.opts->node_id:
|
|
|
|
event.event_id.host() && !my_host(event.event_id.host())?
|
|
event.event_id.host():
|
|
|
|
!my_host(json::get<"origin"_>(event))?
|
|
string_view(json::get<"origin"_>(event)):
|
|
|
|
room.room_id.host() && !my_host(room.room_id.host())?
|
|
room.room_id.host():
|
|
|
|
string_view{}
|
|
};
|
|
|
|
log::debug
|
|
{
|
|
log, "Fetching auth chain for %s in %s hint:%s",
|
|
string_view{room.event_id},
|
|
string_view{room.room_id},
|
|
opts.hint,
|
|
};
|
|
|
|
// send
|
|
auto future
|
|
{
|
|
m::fetch::start(opts)
|
|
};
|
|
|
|
// recv
|
|
const auto result
|
|
{
|
|
future.get(seconds(auth_timeout))
|
|
};
|
|
|
|
const json::object response
|
|
{
|
|
result
|
|
};
|
|
|
|
// parse
|
|
const json::array &auth_chain
|
|
{
|
|
response["auth_chain"]
|
|
};
|
|
|
|
auth_chain_eval(event, eval, room, auth_chain, result.origin);
|
|
}
|
|
catch(const vm::error &e)
|
|
{
|
|
throw;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "Fetching auth chain for %s in %s :%s",
|
|
string_view{room.event_id},
|
|
string_view{room.room_id},
|
|
e.what(),
|
|
};
|
|
|
|
throw;
|
|
}
|
|
|
|
void
|
|
ircd::m::vm::fetch::auth_chain_eval(const event &event,
|
|
vm::eval &eval,
|
|
const room &room,
|
|
const json::array &auth_chain_,
|
|
const string_view &origin)
|
|
try
|
|
{
|
|
assert(eval.opts);
|
|
auto opts(*eval.opts);
|
|
opts.fetch = false;
|
|
opts.infolog_accept = true;
|
|
opts.warnlog &= ~vm::fault::EXISTS;
|
|
opts.notify_servers = false;
|
|
opts.node_id = origin;
|
|
|
|
std::vector<m::event> auth_chain
|
|
(
|
|
std::begin(auth_chain_), std::end(auth_chain_)
|
|
);
|
|
|
|
// pre-sort here and indicate that to eval.
|
|
std::sort(begin(auth_chain), end(auth_chain));
|
|
opts.ordered = true;
|
|
|
|
log::debug
|
|
{
|
|
log, "Evaluating auth chain for %s in %s events:%zu",
|
|
string_view{room.event_id},
|
|
string_view{room.room_id},
|
|
auth_chain.size(),
|
|
};
|
|
|
|
// eval
|
|
m::vm::eval
|
|
{
|
|
auth_chain, opts
|
|
};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "Evaluating auth chain for %s in %s :%s",
|
|
string_view{room.event_id},
|
|
string_view{room.room_id},
|
|
e.what(),
|
|
};
|
|
|
|
throw;
|
|
}
|
|
|
|
//
|
|
// state handler stack
|
|
//
|
|
|
|
void
|
|
ircd::m::vm::fetch::state(const event &event,
|
|
vm::eval &eval,
|
|
const room &room)
|
|
try
|
|
{
|
|
const auto &opts{*eval.opts};
|
|
const event::prev prev
|
|
{
|
|
event
|
|
};
|
|
|
|
const bool prev_exist
|
|
{
|
|
prev.prev_exist()
|
|
};
|
|
|
|
if(opts.fetch_state_any)
|
|
if(prev_exist && prev.prev_events_count() == prev.prev_events_exist())
|
|
return;
|
|
|
|
if(likely(!opts.fetch_state_any))
|
|
if(prev_exist)
|
|
return;
|
|
|
|
if(likely(!opts.fetch_state_shallow))
|
|
{
|
|
const auto &[sounding_depth, sounding_idx]
|
|
{
|
|
m::sounding(room.room_id)
|
|
};
|
|
|
|
if(at<"depth"_>(event) > sounding_depth)
|
|
return;
|
|
}
|
|
|
|
log::dwarning
|
|
{
|
|
log, "%s fetching possible missing state in %s",
|
|
loghead(eval),
|
|
string_view{room.room_id},
|
|
};
|
|
|
|
struct m::acquire::opts acq_opts;
|
|
acq_opts.room = room;
|
|
acq_opts.head = false;
|
|
acq_opts.history = false;
|
|
acq_opts.state = true;
|
|
acq_opts.hint = opts.node_id;
|
|
m::acquire acq
|
|
{
|
|
acq_opts
|
|
};
|
|
|
|
//TODO: XXX
|
|
log::info
|
|
{
|
|
log, "%s evaluated missing state in %s fetched:-- good:-- fail:--",
|
|
loghead(eval),
|
|
string_view{room.room_id},
|
|
0,
|
|
0,
|
|
0,
|
|
};
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "%s state fetch in %s :%s",
|
|
loghead(eval),
|
|
string_view{room.room_id},
|
|
e.what(),
|
|
};
|
|
|
|
throw;
|
|
}
|
|
|
|
//
|
|
// prev_events handler stack
|
|
//
|
|
|
|
void
|
|
ircd::m::vm::fetch::prev(const event &event,
|
|
vm::eval &eval,
|
|
const room &room)
|
|
{
|
|
const auto &opts{*eval.opts};
|
|
const event::prev prev{event};
|
|
const size_t prev_count
|
|
{
|
|
prev.prev_events_count()
|
|
};
|
|
|
|
const size_t prev_exists
|
|
{
|
|
prev.prev_events_exist()
|
|
};
|
|
|
|
assert(prev_exists <= prev_count);
|
|
if(prev_count == prev_exists)
|
|
return;
|
|
|
|
// Attempt to wait for missing prev_events without issuing fetches here.
|
|
if(prev_wait(event, eval))
|
|
return;
|
|
|
|
if(!m::vm::fetch::enable)
|
|
{
|
|
prev_check(event, eval);
|
|
return;
|
|
}
|
|
|
|
auto futures
|
|
{
|
|
prev_fetch(event, eval, room)
|
|
};
|
|
|
|
// At this point one or more prev_events are missing; the fetches were
|
|
// launched asynchronously if the options allowed for it.
|
|
log::dwarning
|
|
{
|
|
log, "%s depth:%ld prev_events:%zu miss:%zu fetching:%zu fetching ...",
|
|
loghead(eval),
|
|
at<"depth"_>(event),
|
|
prev_count,
|
|
prev_count - prev_exists,
|
|
std::distance(begin(futures), end(futures)),
|
|
};
|
|
|
|
auto fetching
|
|
{
|
|
ctx::when_all(begin(futures), end(futures))
|
|
};
|
|
|
|
const auto timeout
|
|
{
|
|
now<system_point>() + seconds(event_timeout)
|
|
};
|
|
|
|
const milliseconds &check_interval
|
|
{
|
|
prev_fetch_check_interval
|
|
};
|
|
|
|
// Rather than waiting for all of the events to arrive or for the entire
|
|
// timeout to expire, we check if the sought events made it to the server
|
|
// in the meantime. If so we can drop these requests and bail.
|
|
//TODO: Ideally should be replaced with listener/notification/hook on the
|
|
//TODO: events arriving rather than this coarse sleep cycles.
|
|
while(now<system_point>() < timeout)
|
|
{
|
|
// Wait for an interval to give this loop some iterations.
|
|
if(fetching.wait(check_interval, std::nothrow))
|
|
break;
|
|
|
|
// Check for satisfaction.
|
|
if(prev.prev_events_exist() == prev_count)
|
|
return;
|
|
}
|
|
|
|
// evaluate results
|
|
for(auto &future : futures) try
|
|
{
|
|
m::fetch::result result
|
|
{
|
|
future.get()
|
|
};
|
|
|
|
const json::object content
|
|
{
|
|
result
|
|
};
|
|
|
|
const json::array &pdus
|
|
{
|
|
content["pdus"]
|
|
};
|
|
|
|
auto opts(*eval.opts);
|
|
opts.phase.set(m::vm::phase::FETCH_PREV, false);
|
|
opts.phase.set(m::vm::phase::FETCH_STATE, false);
|
|
opts.notify_servers = false;
|
|
opts.node_id = result.origin;
|
|
log::debug
|
|
{
|
|
log, "%s fetched %zu pdus; evaluating...",
|
|
loghead(eval),
|
|
pdus.size(),
|
|
};
|
|
|
|
vm::eval
|
|
{
|
|
pdus, opts
|
|
};
|
|
}
|
|
catch(const ctx::interrupted &)
|
|
{
|
|
throw;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::derror
|
|
{
|
|
log, "%s prev fetch/eval :%s",
|
|
loghead(eval),
|
|
e.what(),
|
|
};
|
|
}
|
|
|
|
// check if result evals have satisfied this eval now; or throw
|
|
prev_check(event, eval);
|
|
}
|
|
|
|
std::forward_list
|
|
<
|
|
ircd::ctx::future<ircd::m::fetch::result>
|
|
>
|
|
ircd::m::vm::fetch::prev_fetch(const event &event,
|
|
vm::eval &eval,
|
|
const room &room)
|
|
{
|
|
const long room_depth
|
|
{
|
|
m::depth(std::nothrow, room)
|
|
};
|
|
|
|
const long viewport_depth
|
|
{
|
|
room_depth - room::events::viewport_size
|
|
};
|
|
|
|
std::forward_list
|
|
<
|
|
ctx::future<m::fetch::result>
|
|
>
|
|
ret;
|
|
const event::prev prev{event};
|
|
for(size_t i(0); i < prev.prev_events_count(); ++i) try
|
|
{
|
|
const auto &prev_id
|
|
{
|
|
prev.prev_event(i)
|
|
};
|
|
|
|
if(m::exists(prev_id))
|
|
continue;
|
|
|
|
const long depth_gap
|
|
{
|
|
std::max(std::abs(at<"depth"_>(event) - room_depth), 1L)
|
|
};
|
|
|
|
m::fetch::opts opts;
|
|
opts.op = m::fetch::op::backfill;
|
|
opts.room_id = room.room_id;
|
|
opts.event_id = prev_id;
|
|
opts.backfill_limit = size_t(depth_gap);
|
|
opts.backfill_limit = std::min(opts.backfill_limit, eval.opts->fetch_prev_limit);
|
|
opts.backfill_limit = std::min(opts.backfill_limit, size_t(prev_backfill_limit));
|
|
opts.hint =
|
|
{
|
|
eval.opts->node_id && !my_host(eval.opts->node_id)?
|
|
eval.opts->node_id:
|
|
|
|
prev_id.host() && !my_host(prev_id.host())?
|
|
prev_id.host():
|
|
|
|
!my_host(json::get<"origin"_>(event))?
|
|
string_view(json::get<"origin"_>(event)):
|
|
|
|
room.room_id.host() && !my_host(room.room_id.host())?
|
|
room.room_id.host():
|
|
|
|
string_view{}
|
|
};
|
|
|
|
log::debug
|
|
{
|
|
log, "%s requesting backfill off %s; depth:%ld viewport:%ld room:%ld gap:%ld limit:%zu",
|
|
loghead(eval),
|
|
string_view{prev_id},
|
|
at<"depth"_>(event),
|
|
viewport_depth,
|
|
room_depth,
|
|
depth_gap,
|
|
opts.backfill_limit,
|
|
};
|
|
|
|
ret.emplace_front(m::fetch::start(opts));
|
|
}
|
|
catch(const ctx::interrupted &)
|
|
{
|
|
throw;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::derror
|
|
{
|
|
log, "%s requesting backfill off prev %s; depth:%ld :%s",
|
|
loghead(eval),
|
|
string_view{prev.prev_event(i)},
|
|
json::get<"depth"_>(event),
|
|
e.what(),
|
|
};
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
//TODO: Adjust when PDU lookahead/lookaround is fixed in the vm::eval iface.
|
|
//TODO: Wait on another eval completion instead of just coarse sleep()'s.
|
|
bool
|
|
ircd::m::vm::fetch::prev_wait(const event &event,
|
|
vm::eval &eval)
|
|
{
|
|
const auto &opts(*eval.opts);
|
|
const event::prev prev(event);
|
|
const size_t prev_count
|
|
{
|
|
prev.prev_events_count()
|
|
};
|
|
|
|
const size_t &wait_count
|
|
{
|
|
ssize_t(opts.fetch_prev_wait_count) >= 0?
|
|
opts.fetch_prev_wait_count:
|
|
size_t(prev_wait_count)
|
|
};
|
|
|
|
const milliseconds &wait_time
|
|
{
|
|
opts.fetch_prev_wait_time >= 0ms?
|
|
opts.fetch_prev_wait_time:
|
|
milliseconds(prev_wait_time)
|
|
};
|
|
|
|
size_t i(0); while(i < wait_count)
|
|
{
|
|
sleep(milliseconds(++i * wait_time));
|
|
if(prev_count == prev.prev_events_exist())
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
void
|
|
ircd::m::vm::fetch::prev_check(const event &event,
|
|
vm::eval &eval)
|
|
{
|
|
const auto &opts
|
|
{
|
|
*eval.opts
|
|
};
|
|
|
|
const event::prev prev
|
|
{
|
|
event
|
|
};
|
|
|
|
const size_t prev_exists
|
|
{
|
|
prev.prev_events_exist()
|
|
};
|
|
|
|
// Aborts this event if the options want us to guarantee at least one
|
|
// prev_event was fetched and evaluated for this event. This is generally
|
|
// used in conjunction with the fetch_prev_wait option to be effective.
|
|
if(opts.fetch_prev_any && !prev_exists)
|
|
throw vm::error
|
|
{
|
|
vm::fault::EVENT, "Failed to fetch any of the %zu prev_events for %s in %s",
|
|
prev.prev_events_count(),
|
|
string_view{event.event_id},
|
|
json::get<"room_id"_>(event)
|
|
};
|
|
|
|
// Aborts this event if the options want us to guarantee ALL of the
|
|
// prev_events were fetched and evaluated for this event.
|
|
if(opts.fetch_prev_all && prev_exists < prev.prev_events_count())
|
|
throw vm::error
|
|
{
|
|
vm::fault::EVENT, "Missing %zu of %zu required prev_events for %s in %s",
|
|
prev_exists,
|
|
prev.prev_events_count(),
|
|
string_view{event.event_id},
|
|
json::get<"room_id"_>(event)
|
|
};
|
|
}
|