0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-05-29 08:13:46 +02:00

ircd:Ⓜ️:fetch: Add additional interface tools; consolidate interface; cleanup/reorg.

ircd:Ⓜ️:fetch: Add backfill suite.
This commit is contained in:
Jason Volk 2019-04-12 04:02:09 -07:00
parent 7d27126f96
commit eb33688bc1
8 changed files with 367 additions and 220 deletions

View file

@ -19,10 +19,21 @@ namespace ircd::m::fetch
{
struct request;
static bool for_each(const std::function<bool (request &)> &);
// Observers
bool for_each(const std::function<bool (request &)> &);
bool exists(const m::event::id &);
// Control panel
bool cancel(request &);
void start(const m::room::id &, const m::event::id &);
bool prefetch(const m::room::id &, const m::event::id &);
// Composed operations
void auth_chain(const room &, const net::hostport &);
void state_ids(const room &, const net::hostport &);
void state_ids(const room &);
void backfill(const room &, const net::hostport &);
void backfill(const room &);
extern log::log log;
}
@ -47,7 +58,4 @@ struct ircd::m::fetch::request
request(const m::room::id &, const m::event::id &, const size_t &bufsz = 8_KiB);
request(request &&) = delete;
request(const request &) = delete;
static void start(const m::room::id &, const m::event::id &);
static bool prefetch(const m::room::id &, const m::event::id &);
};

View file

@ -22,10 +22,6 @@ struct ircd::m::room::auth
static bool for_each(const auth &, const closure_bool &);
static void make_refs(const auth &, json::stack::array &, const types &, const m::id::user & = {});
using fetch_closure = std::function<bool (const json::object &)>;
static bool chain_fetch(const auth &, const net::hostport &, const fetch_closure &);
static void chain_eval(const auth &, const net::hostport &);
m::room room;
public:

127
ircd/m.cc
View file

@ -440,6 +440,33 @@ ircd::m::fetch::log
"matrix.fetch"
};
void
ircd::m::fetch::backfill(const room &r)
{
using prototype = void (const room &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::backfill"
};
call(r);
}
void
ircd::m::fetch::backfill(const room &r,
const net::hostport &hp)
{
using prototype = void (const room &, const net::hostport &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::backfill"
};
call(r, hp);
}
void
ircd::m::fetch::state_ids(const room &r)
{
@ -467,6 +494,74 @@ ircd::m::fetch::state_ids(const room &r,
call(r, hp);
}
void
ircd::m::fetch::auth_chain(const room &r,
const net::hostport &hp)
{
using prototype = void (const room &, const net::hostport &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::auth_chain"
};
call(r, hp);
}
bool
ircd::m::fetch::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
using prototype = bool (const m::room::id &, const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::prefetch"
};
return call(room_id, event_id);
}
void
ircd::m::fetch::start(const m::room::id &room_id,
const m::event::id &event_id)
{
using prototype = void (const m::room::id &, const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::start"
};
return call(room_id, event_id);
}
bool
ircd::m::fetch::cancel(request &r)
{
using prototype = bool (request &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::cancel"
};
return call(r);
}
bool
ircd::m::fetch::exists(const m::event::id &event_id)
{
using prototype = bool (const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::exists"
};
return call(event_id);
}
bool
ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
{
@ -480,38 +575,6 @@ ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
return call(closure);
}
//
// request
//
bool
ircd::m::fetch::request::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
using prototype = bool (const m::room::id &, const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::request::prefetch"
};
return call(room_id, event_id);
}
void
ircd::m::fetch::request::start(const m::room::id &room_id,
const m::event::id &event_id)
{
using prototype = void (const m::room::id &, const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::request::start"
};
return call(room_id, event_id);
}
///////////////////////////////////////////////////////////////////////////////
//
// m/sync.h

View file

@ -2281,35 +2281,6 @@ ircd::m::room::head::reset(const head &h)
// room::auth
//
void
ircd::m::room::auth::chain_eval(const auth &a,
const net::hostport &h)
{
using prototype = bool (const auth &, const net::hostport &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::room::auth::chain_eval"
};
call(a, h);
}
bool
ircd::m::room::auth::chain_fetch(const auth &a,
const net::hostport &h,
const fetch_closure &c)
{
using prototype = bool (const auth &, const net::hostport &, const fetch_closure &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::room::auth::chain_fetch"
};
return call(a, h, c);
}
ircd::json::array
ircd::m::room::auth::make_refs(const mutable_buffer &buf,
const types &types,

View file

@ -260,14 +260,5 @@ get__initialsync_remote(client &client,
room_.event_id.host()
};
m::room::auth auth{room_};
m::room::auth::chain_eval(auth, remote);
using prototype = void (const m::room &);
static mods::import<prototype> state_ids
{
"s_fetch", "ircd::m::fetch::state_ids"
};
state_ids(room_);
m::fetch::state_ids(room_);
}

View file

@ -12307,7 +12307,7 @@ console_cmd__fetch(opt &out, const string_view &line)
param.at("event_id")
};
m::fetch::request::start(room_id, event_id);
m::fetch::start(room_id, event_id);
out << "in work..." << std::endl;
return true;
}

View file

@ -104,7 +104,7 @@ ircd::m::fetch::hook_handler(const event &event,
at<"room_id"_>(event)
};
if(opts.state_check_exists && !exists(room_id))
if(opts.state_check_exists && !m::exists(room_id))
{
// Don't pass event_id in ctor here or m::NOT_FOUND.
m::room room{room_id};
@ -118,10 +118,7 @@ ircd::m::fetch::hook_handler(const event &event,
};
if(opts.auth_chain_fetch)
{
const m::room::auth auth{room};
auth.chain_eval(auth, event_id.host());
}
fetch::auth_chain(room, room_id.host()); //TODO: XXX
}
const event::prev prev
@ -145,7 +142,7 @@ ircd::m::fetch::hook_handler(const event &event,
if(!opts.prev_check_exists)
continue;
if(exists(prev_id))
if(m::exists(prev_id))
{
++prev_exists;
continue;
@ -166,7 +163,7 @@ ircd::m::fetch::hook_handler(const event &event,
};
if(can_fetch)
start(prev_id, room_id);
start(room_id, prev_id);
if(can_fetch && opts.prev_wait)
dock.wait([&prev_id]
@ -181,6 +178,144 @@ ircd::m::fetch::hook_handler(const event &event,
// m/fetch.h
//
namespace ircd::m::fetch
{
static void handle_backfill(const m::room &, const m::feds::result &);
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::backfill(const room &room)
{
m::feds::opts opts;
opts.room_id = room.room_id;
opts.event_id = room.event_id;
opts.argi[0] = 4;
m::feds::backfill(opts, [&room]
(const auto &result)
{
handle_backfill(room, result);
return true;
});
}
void
ircd::m::fetch::handle_backfill(const m::room &room,
const m::feds::result &result)
try
{
if(result.eptr)
std::rethrow_exception(result.eptr);
const json::array &pdus
{
result.object["pdus"]
};
log::debug
{
log, "Got %zu events for %s off %s from '%s'",
pdus.size(),
string_view{room.room_id},
string_view{room.event_id},
string_view{result.origin},
};
for(const json::object &event : pdus)
{
const m::event::id &event_id
{
unquote(event.get("event_id"))
};
if(m::exists(event_id))
continue;
m::vm::opts vmopts;
vmopts.nothrows = -1;
m::vm::eval
{
m::event{event}, vmopts
};
}
}
catch(const std::exception &e)
{
log::error
{
log, "Requesting backfill for %s off %s from '%s' :%s",
string_view{room.room_id},
string_view{room.event_id},
result.origin,
e.what(),
};
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::backfill(const room &room,
const net::hostport &remote)
{
m::event::id::buf event_id;
if(!room.event_id)
event_id = m::v1::fetch_head(room.room_id, remote, room.any_user(my_host(), "join"));
m::v1::backfill::opts opts;
opts.remote = remote;
opts.event_id = room.event_id?: event_id;
opts.dynamic = true;
const unique_buffer<mutable_buffer> buf
{
8_KiB
};
m::v1::backfill request
{
room.room_id, buf, std::move(opts)
};
request.wait(seconds(20)); //TODO: conf
request.get();
const json::object &response
{
request
};
log::debug
{
log, "Got %zu events for %s off %s from '%s'",
json::array{response["pdus"]}.size(),
string_view{room.room_id},
string_view{room.event_id},
host(remote),
};
for(const json::object &event : json::array(response["pdus"]))
{
const m::event::id &event_id
{
unquote(event.get("event_id"))
};
if(m::exists(event_id))
continue;
m::vm::opts vmopts;
vmopts.nothrows = -1;
m::vm::eval
{
m::event{event}, vmopts
};
}
}
namespace ircd::m::fetch
{
static void handle_state_ids(const m::room &, const m::feds::result &);
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::state_ids(const room &room)
@ -189,45 +324,51 @@ ircd::m::fetch::state_ids(const room &room)
opts.room_id = room.room_id;
opts.event_id = room.event_id;
opts.arg[0] = "ids";
m::feds::state(opts, [&room](const auto &result)
m::feds::state(opts, [&room]
(const auto &result)
{
try
{
if(result.eptr)
std::rethrow_exception(result.eptr);
const json::array &ids
{
result.object["pdu_ids"]
};
log::debug
{
log, "Got %zu state_ids for %s from '%s'",
ids.size(),
string_view{room.room_id},
string_view{result.origin},
};
for(const json::string &event_id : ids)
if(!exists(m::event::id(event_id)))
start(event_id, room.room_id);
}
catch(const std::exception &e)
{
log::error
{
log, "Requesting state_ids for %s from '%s' :%s",
string_view{room.room_id},
result.origin,
e.what(),
};
}
handle_state_ids(room, result);
return true;
});
}
void
ircd::m::fetch::handle_state_ids(const m::room &room,
const m::feds::result &result)
try
{
if(result.eptr)
std::rethrow_exception(result.eptr);
const json::array &ids
{
result.object["pdu_ids"]
};
log::debug
{
log, "Got %zu state_ids for %s from '%s'",
ids.size(),
string_view{room.room_id},
string_view{result.origin},
};
for(const json::string &event_id : ids)
if(!m::exists(m::event::id(event_id)))
start(room.room_id, event_id);
}
catch(const std::exception &e)
{
log::error
{
log, "Requesting state_ids for %s from '%s' :%s",
string_view{room.room_id},
result.origin,
e.what(),
};
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::state_ids(const room &room,
@ -267,12 +408,79 @@ ircd::m::fetch::state_ids(const room &room,
};
for(const json::string &event_id : auth_chain_ids)
if(!exists(m::event::id(event_id)))
start(event_id, room.room_id);
if(!m::exists(m::event::id(event_id)))
start(room.room_id, event_id);
for(const json::string &event_id : pdu_ids)
if(!exists(m::event::id(event_id)))
start(event_id, room.room_id);
if(!m::exists(m::event::id(event_id)))
start(room.room_id, event_id);
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::auth_chain(const room &room,
const net::hostport &remote)
{
m::v1::event_auth::opts opts;
opts.remote = remote;
opts.dynamic = true;
const unique_buffer<mutable_buffer> buf
{
8_KiB
};
m::v1::event_auth request
{
room.room_id, room.event_id, buf, std::move(opts)
};
request.wait(seconds(20)); //TODO: conf
request.get();
const json::array &array
{
request
};
std::vector<json::object> events(array.count());
std::copy(begin(array), end(array), begin(events));
std::sort(begin(events), end(events), []
(const json::object &a, const json::object &b)
{
return a.at<uint64_t>("depth") < b.at<uint64_t>("depth");
});
m::vm::opts vmopts;
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
vmopts.infolog_accept = true;
vmopts.prev_check_exists = false;
vmopts.warnlog |= m::vm::fault::STATE;
vmopts.warnlog &= ~m::vm::fault::EXISTS;
vmopts.errorlog &= ~m::vm::fault::STATE;
for(const auto &event : events)
m::vm::eval
{
m::event{event}, vmopts
};
}
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
if(m::exists(event_id))
return false;
start(room_id, event_id);
return true;
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::start(const m::room::id &room_id,
const m::event::id &event_id)
{
submit(event_id, room_id);
}
bool
@ -286,76 +494,6 @@ ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
return true;
}
//
// auth chain fetch
//
void
IRCD_MODULE_EXPORT
ircd::m::room::auth::chain_eval(const auth &auth,
const net::hostport &remote)
{
m::vm::opts opts;
opts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
opts.infolog_accept = true;
opts.prev_check_exists = false;
opts.warnlog |= m::vm::fault::STATE;
opts.warnlog &= ~m::vm::fault::EXISTS;
opts.errorlog &= ~m::vm::fault::STATE;
chain_fetch(auth, remote, [&opts]
(const json::object &event)
{
m::vm::eval
{
m::event{event}, opts
};
return true;
});
}
bool
IRCD_MODULE_EXPORT
ircd::m::room::auth::chain_fetch(const auth &auth,
const net::hostport &remote,
const fetch_closure &closure)
{
m::v1::event_auth::opts opts;
opts.remote = remote;
opts.dynamic = true;
const unique_buffer<mutable_buffer> buf
{
16_KiB
};
m::v1::event_auth request
{
auth.room.room_id, auth.room.event_id, buf, std::move(opts)
};
request.wait(seconds(20)); //TODO: conf
request.get();
const json::array &auth_chain
{
request
};
std::vector<json::object> events(auth_chain.count());
std::copy(begin(auth_chain), end(auth_chain), begin(events));
std::sort(begin(events), end(events), []
(const json::object &a, const json::object &b)
{
return a.at<uint64_t>("depth") < b.at<uint64_t>("depth");
});
for(const auto &event : events)
if(!closure(event))
return false;
return true;
}
//
// request worker
//
@ -578,26 +716,6 @@ catch(const std::exception &e)
// fetch::request
//
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::request::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
if(m::exists(event_id))
return false;
start(room_id, event_id);
return true;
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::request::start(const m::room::id &room_id,
const m::event::id &event_id)
{
fetch::start(event_id, room_id);
}
void
ircd::m::fetch::start(request &request)
{

View file

@ -34,7 +34,7 @@ namespace ircd::m::fetch
static void start(request &);
static bool handle(request &);
template<class... args> static void start(const event::id &, const room::id &, const size_t &bufsz = 8_KiB, args&&...);
template<class... args> static void submit(const event::id &, const room::id &, const size_t &bufsz = 8_KiB, args&&...);
static void eval_handle(const decltype(requests)::iterator &);
static void eval_handle();
static void eval_worker();
@ -49,10 +49,10 @@ namespace ircd::m::fetch
template<class... args>
void
ircd::m::fetch::start(const m::event::id &event_id,
const m::room::id &room_id,
const size_t &bufsz,
args&&... a)
ircd::m::fetch::submit(const m::event::id &event_id,
const m::room::id &room_id,
const size_t &bufsz,
args&&... a)
{
auto it
{