0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-10 05:58:56 +02:00

ircd:Ⓜ️:fetch: Add fetch+eval workers; start central interface; various.

This commit is contained in:
Jason Volk 2019-04-10 21:16:00 -07:00
parent 1209d47fa5
commit 562651aa7d
7 changed files with 549 additions and 75 deletions

26
include/ircd/m/fetch.h Normal file
View file

@ -0,0 +1,26 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2019 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_M_FETCH_H
/// Event Fetcher (remote). This is probably not the interface you want
/// because there is no ordinary reason for developers fetch a remote event
/// directly. This is an interface to the low-level fetch system.
///
namespace ircd::m::fetch
{
struct request;
void state_ids(const room &, const net::hostport &);
void state_ids(const room &);
extern log::log log;
}

View file

@ -62,6 +62,7 @@ namespace ircd::m::vm
#include "feds.h"
#include "app.h"
#include "sync.h"
#include "fetch.h"
struct ircd::m::init
{

View file

@ -429,6 +429,44 @@ ircd::m::self::init::init(const string_view &origin,
};
}
///////////////////////////////////////////////////////////////////////////////
//
// m/fetch.h
//
decltype(ircd::m::fetch::log)
ircd::m::fetch::log
{
"matrix.fetch"
};
void
ircd::m::fetch::state_ids(const room &r)
{
using prototype = void (const room &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::state_ids"
};
call(r);
}
void
ircd::m::fetch::state_ids(const room &r,
const net::hostport &hp)
{
using prototype = void (const room &, const net::hostport &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::state_ids"
};
call(r, hp);
}
///////////////////////////////////////////////////////////////////////////////
//
// m/sync.h

View file

@ -240,12 +240,8 @@ bootstrap(const string_view &host,
};
m::vm::copts opts;
opts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
opts.prev_check_exists = false;
opts.head_must_exist = false;
opts.history = false;
opts.infolog_accept = true;
opts.eval = false;
const m::event::id::buf event_id
{
m::vm::eval

View file

@ -10599,7 +10599,7 @@ console_cmd__fed__sync(opt &out, const string_view &line)
m::vm::opts vmopts;
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
vmopts.prev_check_exists = false;
vmopts.head_must_exist = false;
vmopts.state_must_exist = false;
vmopts.history = false;
vmopts.verify = false;
vmopts.notify = false;
@ -10708,7 +10708,7 @@ console_cmd__fed__state(opt &out, const string_view &line)
m::vm::opts vmopts;
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
vmopts.prev_check_exists = false;
vmopts.head_must_exist = false;
vmopts.state_must_exist = false;
vmopts.verify = false;
vmopts.history = false;
vmopts.notify = false;
@ -10867,7 +10867,7 @@ console_cmd__fed__backfill(opt &out, const string_view &line)
m::vm::opts vmopts;
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
vmopts.prev_check_exists = false;
vmopts.head_must_exist = false;
vmopts.state_must_exist = false;
vmopts.history = false;
vmopts.verify = false;
vmopts.notify = false;
@ -10982,6 +10982,7 @@ console_cmd__fed__event(opt &out, const string_view &line)
m::v1::event::opts opts;
opts.remote = remote;
opts.dynamic = false;
const unique_buffer<mutable_buffer> buf
{
96_KiB
@ -11044,17 +11045,16 @@ console_cmd__fed__event(opt &out, const string_view &line)
m::vm::opts vmopts;
vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
vmopts.prev_check_exists = false;
vmopts.head_must_exist = false;
vmopts.prev_check_exists = true;
vmopts.state_must_exist = true;
vmopts.history = false;
vmopts.notify = false;
vmopts.verify = false;
//vmopts.verify = false;
m::vm::eval eval
{
vmopts
event, vmopts
};
eval(event);
return true;
}
@ -11981,3 +11981,33 @@ console_cmd__mc__register__available(opt &out, const string_view &line)
out << uint(code) << ": " << string_view{response} << std::endl;
return true;
}
//
// fetch
//
bool
console_cmd__fetch(opt &out, const string_view &line)
{
const params param{line, " ",
{
"room_id", "remote"
}};
const auto room_id
{
m::room_id(param.at("room_id"))
};
const net::hostport hostport
{
param["remote"]?
param.at("remote"):
room_id.host()
};
m::fetch::state_ids(room_id, hostport);
out << "done" << std::endl;
return true;
}

View file

@ -16,6 +16,39 @@ IRCD_MODULE
"Event Fetch Unit", ircd::m::fetch::init, ircd::m::fetch::fini
};
decltype(ircd::m::fetch::hook)
ircd::m::fetch::hook
{
hook_handler,
{
{ "_site", "vm.fetch" }
}
};
decltype(ircd::m::fetch::request_context)
ircd::m::fetch::request_context
{
"m::fetch req", 512_KiB, &request_worker, context::POST
};
decltype(ircd::m::fetch::eval_context)
ircd::m::fetch::eval_context
{
"m::fetch eval", 512_KiB, &eval_worker, context::POST
};
decltype(ircd::m::fetch::complete)
ircd::m::fetch::complete;
decltype(ircd::m::fetch::rooms)
ircd::m::fetch::rooms;
decltype(ircd::m::fetch::requests)
ircd::m::fetch::requests;
decltype(ircd::m::fetch::dock)
ircd::m::fetch::dock;
//
// init
//
@ -29,22 +62,16 @@ ircd::m::fetch::init()
void
ircd::m::fetch::fini()
{
request_context.terminate();
eval_context.terminate();
request_context.join();
eval_context.join();
}
//
// fetch_phase
//
decltype(ircd::m::fetch::hook)
ircd::m::fetch::hook
{
hook_handler,
{
{ "_site", "vm.fetch" }
}
};
void
ircd::m::fetch::hook_handler(const event &event,
vm::eval &eval)
@ -69,19 +96,24 @@ ircd::m::fetch::hook_handler(const event &event,
at<"room_id"_>(event)
};
if(!exists(room_id))
if(opts.state_check_exists && !exists(room_id))
{
if((opts.head_must_exist || opts.history) && !opts.fetch_auth_chain)
// Don't pass event_id in ctor here or m::NOT_FOUND.
m::room room{room_id};
room.event_id = event_id;
if(opts.state_must_exist && !opts.state_wait && !opts.state_fetch)
throw vm::error
{
vm::fault::STATE, "Missing state for room %s",
string_view{room_id}
};
m::room room{room_id};
room.event_id = event_id;
const m::room::auth auth{room};
auth.chain_eval(auth, event_id.host());
if(opts.auth_chain_fetch)
{
const m::room::auth auth{room};
auth.chain_eval(auth, event_id.host());
}
}
const event::prev prev
@ -94,6 +126,7 @@ ircd::m::fetch::hook_handler(const event &event,
size(json::get<"prev_events"_>(prev))
};
size_t prev_exists(0);
for(size_t i(0); i < prev_count; ++i)
{
const auto &prev_id
@ -101,22 +134,110 @@ ircd::m::fetch::hook_handler(const event &event,
prev.prev_event(i)
};
if(!eval.opts->prev_check_exists)
if(!opts.prev_check_exists)
continue;
if(exists(prev_id))
continue;
throw vm::error
{
vm::fault::EVENT, "Missing prev %s in %s in %s",
string_view{prev_id},
json::get<"event_id"_>(*eval.event_),
json::get<"room_id"_>(*eval.event_)
};
++prev_exists;
continue;
}
if(!opts.prev_fetch || !opts.prev_wait)
if(opts.prev_must_all_exist)
throw vm::error
{
vm::fault::EVENT, "Missing prev %s in %s in %s",
string_view{prev_id},
json::get<"event_id"_>(*eval.event_),
json::get<"room_id"_>(*eval.event_)
};
}
}
///////////////////////////////////////////////////////////////////////////////
//
// m/fetch.h
//
void
IRCD_MODULE_EXPORT
ircd::m::fetch::state_ids(const room &room)
{
const m::room::origins origins{room};
origins.for_each([&room](const string_view &origin)
{
log::debug
{
log, "Requesting state_ids for %s from '%s'",
string_view{room.room_id},
string_view{origin},
};
try
{
state_ids(room, origin);
}
catch(const std::exception &e)
{
log::error
{
log, "Requesting state_ids for %s from '%s' :%s",
string_view{room.room_id},
origin,
e.what(),
};
}
});
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::state_ids(const room &room,
const net::hostport &remote)
{
m::v1::state::opts opts;
opts.remote = remote;
opts.event_id = room.event_id;
opts.ids_only = true;
opts.dynamic = true;
const unique_buffer<mutable_buffer> buf
{
8_KiB
};
m::v1::state request
{
room.room_id, buf, std::move(opts)
};
request.wait(seconds(20)); //TODO: conf
request.get();
const json::object &response
{
request
};
const json::array &auth_chain_ids
{
response["auth_chain_ids"]
};
const json::array &pdu_ids
{
response["pdu_ids"]
};
for(const json::string &event_id : auth_chain_ids)
if(!exists(m::event::id(event_id)))
start(event_id, room.room_id);
for(const json::string &event_id : pdu_ids)
if(!exists(m::event::id(event_id)))
start(event_id, room.room_id);
}
//
// auth chain fetch
//
@ -187,10 +308,217 @@ ircd::m::room::auth::chain_fetch(const auth &auth,
return true;
}
//
// request worker
//
void
ircd::m::fetch::request_worker()
try
{
while(1)
{
dock.wait([]
{
return std::any_of(begin(requests), end(requests), []
(const request &r)
{
return r.finished == 0;
});
});
request_handle();
}
}
catch(const std::exception &e)
{
log::critical
{
log, "fetch request worker :%s",
e.what()
};
throw;
}
void
ircd::m::fetch::request_handle()
{
auto next
{
ctx::when_any(requests.begin(), requests.end())
};
if(!next.wait(seconds(5), std::nothrow))
{
for(const auto &request_ : requests)
{
auto &request(const_cast<fetch::request &>(request_));
if(!request.finished)
request.retry();
}
return;
}
const auto it
{
next.get()
};
if(it == end(requests))
return;
request_handle(it);
}
void
ircd::m::fetch::request_handle(const decltype(requests)::iterator &it)
try
{
auto &request
{
const_cast<fetch::request &>(*it)
};
if(request.finished)
return;
if(!request.handle())
return;
complete.emplace_back(it);
dock.notify_all();
}
catch(const std::exception &e)
{
log::error
{
log, "fetch eval %s in %s :%s",
string_view{it->event_id},
string_view{it->room_id},
e.what()
};
}
//
// eval worker
//
void
ircd::m::fetch::eval_worker()
try
{
while(1)
{
dock.wait([]
{
return !complete.empty();
});
eval_handle();
}
}
catch(const std::exception &e)
{
log::critical
{
log, "fetch eval worker :%s",
e.what()
};
throw;
}
void
ircd::m::fetch::eval_handle()
{
const unwind pop{[]
{
complete.pop_front();
}};
const auto &it
{
complete.front()
};
const unwind erase{[&it]
{
requests.erase(it);
}};
eval_handle(it);
}
void
ircd::m::fetch::eval_handle(const decltype(requests)::iterator &it)
try
{
auto &request
{
const_cast<fetch::request &>(*it)
};
if(request.eptr)
std::rethrow_exception(request.eptr);
const json::object &event
{
request
};
m::vm::opts opts;
opts.prev_check_exists = false;
opts.infolog_accept = true;
m::vm::eval
{
m::event{event}, opts
};
}
catch(const std::exception &e)
{
log::error
{
log, "fetch eval %s in %s :%s",
string_view{it->event_id},
string_view{it->room_id},
e.what()
};
}
//
// fetch::request
//
bool
ircd::m::fetch::operator<(const request &a,
const request &b)
noexcept
{
return a.event_id < b.event_id;
}
bool
ircd::m::fetch::operator<(const request &a,
const string_view &b)
noexcept
{
return a.event_id < b;
}
bool
ircd::m::fetch::operator<(const string_view &a,
const request &b)
noexcept
{
return a < b.event_id;
}
//
// fetch::request::request
//
ircd::m::fetch::request::request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer &buf)
@ -199,7 +527,7 @@ ircd::m::fetch::request::request(const m::room::id &room_id,
,_buf
{
!buf?
unique_buffer<mutable_buffer>{96_KiB}:
unique_buffer<mutable_buffer>{8_KiB}:
unique_buffer<mutable_buffer>{}
}
,buf
@ -207,22 +535,21 @@ ircd::m::fetch::request::request(const m::room::id &room_id,
empty(buf)? _buf: buf
}
{
// Ensure buffer has enough room for a worst-case event, request
// headers and response headers.
assert(size(this->buf) >= 64_KiB + 8_KiB + 8_KiB);
//assert(size(this->buf) >= 64_KiB + 8_KiB + 8_KiB);
assert(size(this->buf) >= 8_KiB);
}
void
ircd::m::fetch::request::start()
{
m::v1::event::opts opts;
opts.dynamic = false;
opts.dynamic = true;
opts.remote = origin?: select_random_origin();
start(std::move(opts));
start(opts);
}
void
ircd::m::fetch::request::start(m::v1::event::opts &&opts)
ircd::m::fetch::request::start(m::v1::event::opts &opts)
{
if(!started)
started = ircd::time();
@ -232,6 +559,16 @@ ircd::m::fetch::request::start(m::v1::event::opts &&opts)
{
this->event_id, this->buf, std::move(opts)
};
dock.notify_all();
log::debug
{
log, "Started request for %s in %s from '%s'",
string_view{event_id},
string_view{room_id},
string_view{origin},
};
}
ircd::string_view
@ -301,11 +638,33 @@ ircd::m::fetch::request::handle()
future.wait(); try
{
future.get();
const auto code
{
future.get()
};
log::debug
{
log, "%u %s for %s in %s from '%s'",
uint(code),
status(code),
string_view{event_id},
string_view{room_id},
string_view{origin}
};
}
catch(...)
{
eptr = std::current_exception();
log::derror
{
log, "Failure for %s in %s from '%s' :%s",
string_view{event_id},
string_view{room_id},
string_view{origin},
what(eptr),
};
}
if(!eptr)
@ -320,6 +679,7 @@ void
ircd::m::fetch::request::retry()
try
{
server::cancel(*this);
eptr = std::exception_ptr{};
origin = {};
start();
@ -334,28 +694,5 @@ void
ircd::m::fetch::request::finish()
{
finished = ircd::time();
}
bool
ircd::m::fetch::request::operator()(const request &a,
const request &b)
const
{
return a.event_id < b.event_id;
}
bool
ircd::m::fetch::request::operator()(const request &a,
const string_view &b)
const
{
return a.event_id < b;
}
bool
ircd::m::fetch::request::operator()(const string_view &a,
const request &b)
const
{
return a < b.event_id;
dock.notify_all();
}

View file

@ -13,9 +13,26 @@ namespace ircd::m::fetch
{
struct request;
static void hook_handler(const event &, vm::eval &);
static bool operator<(const request &a, const request &b) noexcept;
static bool operator<(const request &a, const string_view &b) noexcept;
static bool operator<(const string_view &a, const request &b) noexcept;
extern ctx::dock dock;
extern std::set<request, std::less<>> requests;
extern std::multimap<m::room::id, request *> rooms;
extern std::deque<decltype(requests)::iterator> complete;
extern ctx::context eval_context;
extern ctx::context request_context;
extern hookfn<vm::eval &> hook;
template<class... args> static void start(const m::event::id &, const m::room::id &, args&&...);
static void eval_handle(const decltype(requests)::iterator &);
static void eval_handle();
static void eval_worker();
static void request_handle(const decltype(requests)::iterator &);
static void request_handle();
static void request_worker();
static void hook_handler(const event &, vm::eval &);
static void init();
static void fini();
}
@ -37,17 +54,13 @@ struct ircd::m::fetch::request
time_t finished {0};
std::exception_ptr eptr;
bool operator()(const request &a, const request &b) const;
bool operator()(const request &a, const string_view &b) const;
bool operator()(const string_view &a, const request &b) const;
void finish();
void retry();
bool handle();
string_view select_origin(const string_view &);
string_view select_random_origin();
void start(m::v1::event::opts &&);
void start(m::v1::event::opts &);
void start();
request(const m::room::id &room_id,
@ -58,3 +71,36 @@ struct ircd::m::fetch::request
request(request &&) = delete;
request(const request &) = delete;
};
template<class... args>
void
ircd::m::fetch::start(const m::event::id &event_id,
const m::room::id &room_id,
args&&... a)
{
auto it
{
requests.lower_bound(string_view(event_id))
};
if(it == end(requests) || it->event_id != event_id) try
{
it = requests.emplace_hint(it, room_id, event_id, std::forward<args>(a)...);
const_cast<request &>(*it).start();
}
catch(const std::exception &e)
{
log::error
{
m::log, "Failed to start fetch for %s in %s :%s",
string_view{event_id},
string_view{room_id},
e.what(),
};
requests.erase(it);
return;
};
assert(it->room_id == room_id);
}