0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-05-29 08:13:46 +02:00

ircd:Ⓜ️:fetch: Cleanup/document interface; add basis for operation abstraction.

This commit is contained in:
Jason Volk 2019-08-29 18:30:47 -07:00
parent fb8e3de485
commit ac573cd21d
4 changed files with 445 additions and 227 deletions

View file

@ -11,54 +11,172 @@
#pragma once
#define HAVE_IRCD_M_FETCH_H
/// Event Fetcher (remote). This is probably not the interface you want
/// because there is no ordinary reason for developers fetch a remote event
/// directly. This is an interface to the low-level fetch system.
/// Event Fetcher (remote).
///
/// This is a federation network interface to find and retrieve datas from
/// remote parties serially. It operates by querying servers in a room until
/// one server can provide a satisfying response. The exact method for
/// determining who to contact, when and how is encapsulated internally for
/// further development, but it is primarily stochastic. This is liable to be
/// optimized with further development of selection algorithms and hinting.
/// All viable servers in a room are exhausted before an error is the result.
///
/// This is an asynchronous promise/future based interface. The result package
/// is delivered by a ctx::future. Note that while fetch::start() is not
/// intended to yield the ircd::ctx, though it is possible in rare cases.
///
/// Due to the composition of multiple operations performed internally, result
/// future has no real timeout control over the operation as a whole. While it
/// can always go out of scope for an effective cancelation, internal conf::item
/// are used to timeout failures after a deterministic `timeout * servers`.
/// This means the user is not required to wait_for() or wait_until() on the
/// future unless they want a stricter timeout; that may miss a valid response
/// for a rare piece of data held by a minority of servers.
///
/// Alternatively, m::feds is another federation network interface geared to
/// conducting a parallel request to every server in a room; this conducts a
/// serial request to every server in a room (and stopping when satisfied).
///
namespace ircd::m::fetch
{
struct opts;
struct result;
struct request;
enum class op :uint8_t;
// Observers
string_view reflect(const op &);
bool for_each(const std::function<bool (request &)> &);
bool exists(const m::event::id &);
bool exists(const opts &);
size_t count();
// Primary operations
ctx::future<result> start(const m::room::id &, const m::event::id &);
ctx::future<result> start(opts);
}
struct ircd::m::fetch::result
:m::event
enum class ircd::m::fetch::op
:uint8_t
{
unique_buffer<mutable_buffer> buf;
result(request &);
result() = default;
noop,
event,
};
/// Fetch entity state. This is not meant for construction by users of this
/// interface.
struct ircd::m::fetch::opts
{
/// Operation to perform.
fetch::op op {op::noop};
/// room::id apropos. Many federation requests require a room_id, but
/// nevertheless a room_id is still used by this unit as a pool of servers.
room::id room_id;
/// event::id apropos. For op::event operations this is being sought, but
/// for others it may be required as a reference point. If not supplied and
/// required, we'll try to use the top head from any room_id.
event::id event_id;
/// If the op makes use of a spec limit parameter that can be controlled
/// by the user here. The default of 0 will be replaced by some internal
/// configured limit like 8 or 16 etc.
size_t limit {0};
/// The principal allocation size. This is passed up the stack to m::fed,
/// server::request and ends up containing the request head and content,
/// and response head. The response content is usually dynamically
/// allocated and that buffer is the one which ends up in result. Note
/// that sufficiently large values here may allow for eliding the content
/// allocation based on the following formula: >= 16_KiB + (64_KiB * limit)
/// where 16_KiB is [current server default] for headers and 64_KiB is
/// m::event::MAX_SIZE.
size_t bufsz {0};
};
struct ircd::m::fetch::result
{
/// Backing buffer for any data pointed to by this result.
unique_buffer<mutable_buffer> buf;
/// The backing buffer may contain other data ahead of the response
/// content; in any case this points to a view of the response content.
/// User access to response content should be via a json conversion rather
/// than this reference.
string_view content;
/// JSON result conversion. Note that developers should not let the result
/// instance go out of scope by making this conversion.
explicit operator json::object() const;
explicit operator json::array() const;
};
/// Fetch entity state. DO NOT CONSTRUCT. This is an internal structure but we
/// expose it here for examination, statistics and hacking since it has no
/// non-standard symbols; this is simpler than creating some accessor suite.
/// Instances of this object are created and managed internally by the m::fetch
/// unit after a fetch::start() is called. This definition is not required to
/// operate the m::fetch interface as a user.
struct ircd::m::fetch::request
:m::v1::event
{
using is_transparent = void;
m::room::id::buf room_id;
m::event::id::buf event_id;
ctx::promise<result> promise;
unique_buffer<mutable_buffer> buf;
/// Copy of the user's request options. Note that the backing of strings in
/// opts was changed to point at this structure; allowing safe access.
fetch::opts opts;
/// Time the first attempt was made; this value is not modified so it can
/// be used to measure the total time of all attempts.
system_point started;
/// Time the last attempt was started
system_point last;
/// Time the request entered the finished state. This being non-zero
/// indicates a finished state; may be difficult to observe.
system_point finished;
/// State for failed attempts; the names of servers which failed are
/// stored here. Failure here means the request succeeded but the server
/// did not provide a satisfying response. Appearing in this list prevents
/// a server from being selected for the next attempt.
std::set<std::string, std::less<>> attempted;
/// Reference to the current server being attempted. This string is placed
/// in the attempted set at the start of an attempt.
string_view origin;
time_t started {0};
time_t last {0};
time_t finished {0};
/// Our future for the server::request. Since we make
std::unique_ptr<server::request> future;
/// Promise for our user's future of this request.
ctx::promise<result> promise;
/// Error pointer state for an attempt. This is cleared each attempt.
std::exception_ptr eptr;
request(const m::room::id &, const m::event::id &, const size_t &bufsz = 8_KiB);
/// Buffer backing for opts
unique_buffer<mutable_buffer> buf;
m::event::id::buf event_id;
m::room::id::buf room_id;
/// Internal
request(const fetch::opts &);
request(request &&) = delete;
request(const request &) = delete;
request &operator=(request &&) = delete;
request &operator=(const request &) = delete;
};
inline
ircd::m::fetch::result::operator
json::array()
const
{
return content;
}
inline
ircd::m::fetch::result::operator
json::object()
const
{
return content;
}

View file

@ -13770,9 +13770,13 @@ console_cmd__fetch(opt &out, const string_view &line)
m::event::id{param.at("event_id")}
};
m::fetch::opts opts;
opts.op = m::fetch::op::event;
opts.room_id = room_id;
opts.event_id = event_id;
auto future
{
m::fetch::start(room_id, event_id)
m::fetch::start(opts)
};
const auto result
@ -13787,7 +13791,7 @@ console_cmd__fetch(opt &out, const string_view &line)
<< std::endl
;
out << m::event{result}
out << json::object{result}
<< std::endl
;

View file

@ -11,27 +11,29 @@
namespace ircd::m::fetch
{
static bool operator<(const request &a, const request &b) noexcept;
static bool operator<(const request &a, const string_view &b) noexcept;
static bool operator<(const string_view &a, const request &b) noexcept;
static bool operator<(const request &a, const opts &b) noexcept;
static bool operator<(const opts &a, const request &b) noexcept;
extern ctx::dock dock;
extern ctx::mutex requests_mutex;
extern std::set<request, std::less<>> requests;
extern std::multimap<room::id, request *> rooms;
extern ctx::context request_context;
extern conf::item<size_t> requests_max;
extern conf::item<seconds> timeout;
extern conf::item<bool> enable;
extern log::log log;
static bool timedout(const request &, const time_t &now);
static bool timedout(const request &, const system_point &now);
static void _check_event(const request &, const m::event &);
static void check_response__event(const request &, const json::object &);
static void check_response(const request &, const json::object &);
static string_view select_origin(request &, const string_view &);
static string_view select_random_origin(request &);
static void finish(request &);
static void retry(request &);
static bool start(request &, m::v1::event::opts &);
static bool start(request &, const string_view &remote);
static bool start(request &);
static void handle_result(request &);
static bool handle(request &);
static void request_handle(const decltype(requests)::iterator &);
@ -39,13 +41,6 @@ namespace ircd::m::fetch
static size_t request_cleanup();
static void request_worker();
template<class... args>
static ctx::future<result>
submit(const event::id &,
const room::id &,
const size_t &bufsz = 8_KiB,
args&&...);
static void init();
static void fini();
}
@ -89,9 +84,6 @@ ircd::m::fetch::request_context
"m.fetch.req", 512_KiB, &request_worker, context::POST
};
decltype(ircd::m::fetch::rooms)
ircd::m::fetch::rooms;
decltype(ircd::m::fetch::requests)
ircd::m::fetch::requests;
@ -124,71 +116,43 @@ ircd::m::fetch::fini()
// <ircd/m/fetch.h>
//
ircd::ctx::future<ircd::m::fetch::result>
IRCD_MODULE_EXPORT
ircd::m::fetch::start(const m::room::id &room_id,
const m::event::id &event_id)
static void
wait_for_run()
{
ircd::run::changed::dock.wait([]
using namespace ircd::run;
using namespace ircd;
changed::dock.wait([]
{
return run::level == run::level::RUN ||
run::level == run::level::QUIT;
return level == level::RUN || level == level::QUIT;
});
if(unlikely(run::level != run::level::RUN))
if(unlikely(level != level::RUN))
throw m::UNAVAILABLE
{
"Cannot fetch %s in %s in runlevel '%s'",
string_view{event_id},
string_view{room_id},
reflect(run::level)
"Cannot fetch in runlevel '%s'",
reflect(level)
};
}
ircd::ctx::future<ircd::m::fetch::result>
IRCD_MODULE_EXPORT
ircd::m::fetch::start(opts opts)
{
assert(opts.room_id && opts.event_id);
// in case requests are started before runlevel RUN they are stalled here
wait_for_run();
// in case the fetch unit has reached capacity the context will yield.
dock.wait([]
{
return count() < size_t(requests_max);
});
return submit(event_id, room_id);
}
size_t
IRCD_MODULE_EXPORT
ircd::m::fetch::count()
{
return requests.size();
}
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::exists(const m::event::id &event_id)
{
return requests.count(string_view{event_id});
}
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
{
for(auto &request : requests)
if(!closure(const_cast<fetch::request &>(request)))
return false;
return true;
}
//
// internal
//
template<class... args>
ircd::ctx::future<ircd::m::fetch::result>
ircd::m::fetch::submit(const m::event::id &event_id,
const m::room::id &room_id,
const size_t &bufsz,
args&&... a)
{
assert(room_id && event_id);
// the state in this unit is primarily driven by the fetch::context; in
// some cases it may want to prevent others from changing this state until
// it's ready to accept.
std::unique_lock lock
{
requests_mutex
@ -199,14 +163,18 @@ ircd::m::fetch::submit(const m::event::id &event_id,
fetch::dock
};
auto it(requests.lower_bound(string_view(event_id)));
if(it != end(requests) && it->event_id == event_id)
auto it
{
assert(it->room_id == room_id);
requests.lower_bound(opts)
};
if(it != end(requests) && it->opts.event_id == opts.event_id)
{
assert(it->opts.room_id == opts.room_id);
return ctx::future<result>{}; //TODO: shared_future.
}
it = requests.emplace_hint(it, room_id, event_id, bufsz, std::forward<args>(a)...);
it = requests.emplace_hint(it, opts);
auto &request
{
const_cast<fetch::request &>(*it)
@ -221,6 +189,48 @@ ircd::m::fetch::submit(const m::event::id &event_id,
return ret;
}
size_t
IRCD_MODULE_EXPORT
ircd::m::fetch::count()
{
return requests.size();
}
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::exists(const opts &opts)
{
return requests.count(opts);
}
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
{
for(auto &request : requests)
if(!closure(const_cast<fetch::request &>(request)))
return false;
return true;
}
ircd::string_view
IRCD_MODULE_EXPORT
ircd::m::fetch::reflect(const op &op)
{
switch(op)
{
case op::noop: return "noop";
case op::event: return "event";
}
return "?????";
}
//
// internal
//
//
// request worker
//
@ -236,7 +246,7 @@ try
return std::any_of(begin(requests), end(requests), []
(const auto &request)
{
return request.started || request.finished;
return !request.finished;
});
});
@ -262,36 +272,55 @@ ircd::m::fetch::request_handle()
requests_mutex
};
if(requests.empty())
return;
const scope_notify notify
{
fetch::dock
};
const unwind cleanup
{
request_cleanup
};
static const auto dereferencer{[]
(auto &it) -> server::request &
{
auto &request
{
const_cast<fetch::request &>(*it)
};
assert(request.future);
return *request.future;
}};
auto next
{
ctx::when_any(requests.begin(), requests.end())
ctx::when_any(requests.begin(), requests.end(), dereferencer)
};
bool timeout{true};
{
const unlock_guard unlock{lock};
const unlock_guard unlock
{
lock
};
timeout = !next.wait(seconds(timeout), std::nothrow);
};
if(timeout)
{
request_cleanup();
if(unlikely(timeout))
return;
}
const auto it
{
next.get()
};
if(it == end(requests))
if(unlikely(it == end(requests)))
return;
request_handle(it);
dock.notify_all();
}
void
@ -302,8 +331,9 @@ ircd::m::fetch::request_handle(const decltype(requests)::iterator &it)
const_cast<fetch::request &>(*it)
};
if(!request.finished && !handle(request))
return;
if(!request.finished)
if(!handle(request))
return;
requests.erase(it);
}
@ -311,8 +341,12 @@ ircd::m::fetch::request_handle(const decltype(requests)::iterator &it)
size_t
ircd::m::fetch::request_cleanup()
{
const auto now
{
ircd::now<system_point>()
};
size_t ret(0);
const auto now(ircd::time());
for(auto it(begin(requests)); it != end(requests); ++it)
{
auto &request
@ -330,14 +364,14 @@ ircd::m::fetch::request_cleanup()
retry(request);
}
for(auto it(begin(requests)); it != end(requests); )
auto it(begin(requests)); while(it != end(requests))
{
auto &request
{
const_cast<fetch::request &>(*it)
};
if(request.finished)
if(!!request.finished)
{
it = requests.erase(it);
++ret;
@ -355,26 +389,19 @@ ircd::m::fetch::request_cleanup()
bool
ircd::m::fetch::start(request &request) try
{
m::v1::event::opts opts;
opts.dynamic = true;
assert(request.finished == 0);
assert(!request.finished);
if(!request.started)
request.started = ircd::time();
request.started = ircd::now<system_point>();
if(!request.origin)
{
select_random_origin(request);
opts.remote = request.origin;
}
while(request.origin)
{
if(start(request, opts))
if(start(request, request.origin))
return true;
select_random_origin(request);
opts.remote = request.origin;
}
assert(!request.finished);
@ -391,31 +418,41 @@ catch(...)
bool
ircd::m::fetch::start(request &request,
m::v1::event::opts &opts)
const string_view &remote)
try
{
assert(request.finished == 0);
assert(!request.finished);
request.last = ircd::now<system_point>();
if(!request.started)
request.started = ircd::time();
request.started = request.last;
request.last = ircd::time(); try
switch(request.opts.op)
{
*static_cast<m::v1::event *>(&request) =
case op::event:
{
request.event_id, request.buf, std::move(opts)
};
}
catch(...)
{
server::cancel(request);
throw;
v1::event::opts opts;
opts.remote = remote;
opts.dynamic = true;
request.future = std::make_unique<v1::event>
(
request.opts.event_id,
request.buf,
std::move(opts)
);
break;
}
case op::noop:
break;
}
log::debug
{
log, "Starting request for %s in %s from '%s'",
string_view{request.event_id},
string_view{request.room_id},
log, "Starting %s request for %s in %s from '%s'",
reflect(request.opts.op),
string_view{request.opts.event_id},
string_view{request.opts.room_id},
string_view{request.origin},
};
@ -427,9 +464,10 @@ catch(const http::error &e)
log::logf
{
log, run::level == run::level::QUIT? log::DERROR: log::ERROR,
"Starting request for %s in %s to '%s' :%s %s",
string_view{request.event_id},
string_view{request.room_id},
"Starting %s request for %s in %s to '%s' :%s %s",
reflect(request.opts.op),
string_view{request.opts.event_id},
string_view{request.opts.room_id},
string_view{request.origin},
e.what(),
e.content,
@ -442,9 +480,10 @@ catch(const std::exception &e)
log::logf
{
log, run::level == run::level::QUIT? log::DERROR: log::ERROR,
"Starting request for %s in %s to '%s' :%s",
string_view{request.event_id},
string_view{request.room_id},
"Starting %s request for %s in %s to '%s' :%s",
reflect(request.opts.op),
string_view{request.opts.event_id},
string_view{request.opts.room_id},
string_view{request.origin},
e.what()
};
@ -457,7 +496,7 @@ ircd::m::fetch::select_random_origin(request &request)
{
const m::room::origins origins
{
request.room_id
request.opts.room_id
};
// copies randomly selected origin into the attempted set.
@ -491,8 +530,8 @@ ircd::m::fetch::select_random_origin(request &request)
throw m::NOT_FOUND
{
"Cannot find any server to fetch %s in %s",
string_view{request.event_id},
string_view{request.room_id},
string_view{request.opts.event_id},
string_view{request.opts.room_id},
};
return request.origin;
@ -514,52 +553,59 @@ ircd::m::fetch::select_origin(request &request,
bool
ircd::m::fetch::handle(request &request)
{
request.wait(); try
{
const auto code
{
request.get()
};
const json::object response
{
request
};
check_response(request, response);
char pbuf[48];
log::debug
{
log, "Received %u %s good %s in %s from '%s' %s",
uint(code),
status(code),
string_view{request.event_id},
string_view{request.room_id},
string_view{request.origin},
pretty(pbuf, iec(size(string_view(response)))),
};
}
catch(...)
{
request.eptr = std::current_exception();
log::derror
{
log, "Erroneous remote for %s in %s from '%s' :%s",
string_view{request.event_id},
string_view{request.room_id},
string_view{request.origin},
what(request.eptr),
};
}
if(likely(request.future))
handle_result(request);
if(!request.eptr)
finish(request);
else
retry(request);
return request.finished;
return !!request.finished;
}
void
ircd::m::fetch::handle_result(request &request)
try
{
const auto code
{
request.future->get()
};
const string_view &content
{
request.future->in.content
};
check_response(request, content);
char pbuf[48];
log::debug
{
log, "Received %u %s good %s %s in %s from '%s' %s",
uint(code),
status(code),
reflect(request.opts.op),
string_view{request.opts.event_id},
string_view{request.opts.room_id},
string_view{request.origin},
pretty(pbuf, iec(size(content))),
};
}
catch(...)
{
request.eptr = std::current_exception();
log::derror
{
log, "Erroneous remote for %s %s in %s from '%s' :%s",
reflect(request.opts.op),
string_view{request.opts.event_id},
string_view{request.opts.room_id},
string_view{request.origin},
what(request.eptr),
};
}
void
@ -567,9 +613,8 @@ ircd::m::fetch::retry(request &request)
try
{
assert(!request.finished);
assert(request.started && request.last);
assert(!!request.started && !!request.last);
server::cancel(request);
request.eptr = std::exception_ptr{};
request.origin = {};
start(request);
@ -583,17 +628,17 @@ catch(...)
void
ircd::m::fetch::finish(request &request)
{
request.finished = ircd::time();
request.finished = ircd::now<system_point>();
#if 0
log::logf
{
log, request.eptr? log::DERROR: log::DEBUG,
"%s in %s started:%ld finished:%d attempted:%zu abandon:%b %S%s",
string_view{request.event_id},
string_view{request.room_id},
request.started,
request.finished,
string_view{request.opts.event_id},
string_view{request.opts.room_id},
seconds(tse(request.started)).count(),
seconds(tse(request.finished)).count(),
request.attempted.size(),
!request.promise,
request.eptr? " :" : "",
@ -610,7 +655,10 @@ ircd::m::fetch::finish(request &request)
return;
}
request.promise.set_value(result{request});
result res;
res.buf = std::move(request.future->in.dynamic);
res.content = res.buf;
request.promise.set_value(std::move(res));
}
namespace ircd::m::fetch
@ -658,24 +706,56 @@ void
ircd::m::fetch::check_response(const request &request,
const json::object &response)
{
const m::event event
switch(request.opts.op)
{
response, request.event_id
case op::event:
return check_response__event(request, response);
case op::noop:
return;
}
ircd::panic
{
"Unchecked response; fetch op:%u",
uint(request.opts.op),
};
}
void
ircd::m::fetch::check_response__event(const request &request,
const json::object &response)
{
const json::array &pdus
{
response.at("pdus")
};
const m::event event
{
pdus.at(0), request.opts.event_id
};
_check_event(request, event);
}
void
ircd::m::fetch::_check_event(const request &request,
const m::event &event)
{
if(check_event_id && !m::check_id(event))
{
event::id::buf buf;
const m::event &claim
{
buf, response
buf, event.source
};
throw ircd::error
{
"event::id claim:%s != sought:%s",
string_view{claim.event_id},
string_view{request.event_id},
string_view{request.opts.event_id},
};
}
@ -740,10 +820,10 @@ ircd::m::fetch::check_response(const request &request,
bool
ircd::m::fetch::timedout(const request &request,
const time_t &now)
const system_point &now)
{
assert(request.started && request.finished >= 0 && request.last != 0);
return request.last + seconds(timeout).count() < now;
assert(!!request.started && !request.finished && !!request.last);
return request.last + seconds(timeout) < now;
}
bool
@ -751,50 +831,53 @@ ircd::m::fetch::operator<(const request &a,
const request &b)
noexcept
{
return a.event_id < b.event_id;
return uint(a.opts.op) < uint(b.opts.op) &&
a.opts.event_id < b.opts.event_id;
a.opts.room_id < b.opts.room_id;
}
bool
ircd::m::fetch::operator<(const request &a,
const string_view &b)
const opts &b)
noexcept
{
return a.event_id < b;
return uint(a.opts.op) < uint(b.op) &&
a.opts.event_id < b.event_id;
a.opts.room_id < b.room_id;;
}
bool
ircd::m::fetch::operator<(const string_view &a,
ircd::m::fetch::operator<(const opts &a,
const request &b)
noexcept
{
return a < b.event_id;
return uint(a.op) < uint(b.opts.op) &&
a.event_id < b.opts.event_id &&
a.room_id < b.opts.room_id;;
}
//
// request::request
//
ircd::m::fetch::request::request(const m::room::id &room_id,
const m::event::id &event_id,
const size_t &bufsz)
:room_id{room_id}
,event_id{event_id}
,buf{bufsz}
ircd::m::fetch::request::request(const fetch::opts &opts)
:opts
{
}
//
// result::result
//
ircd::m::fetch::result::result(request &request)
:m::event
{
static_cast<json::object>(request)
opts
}
,buf
{
std::move(request.in.dynamic)
opts.bufsz?: 16_KiB
}
,event_id
{
opts.event_id
}
,room_id
{
opts.room_id
}
{
this->opts.event_id = this->event_id;
this->opts.room_id = this->room_id;
}

View file

@ -281,9 +281,13 @@ ircd::m::init::backfill::handle_head(const room::id &room_id,
const event::id &event_id)
try
{
fetch::opts opts;
opts.op = fetch::op::event;
opts.room_id = room_id;
opts.event_id = event_id;
auto future
{
fetch::start(room_id, event_id)
fetch::start(opts)
};
m::fetch::result result
@ -291,12 +295,21 @@ try
future.get()
};
m::vm::opts opts;
opts.infolog_accept = true;
result.event_id = event_id;
const json::array &pdu
{
json::object(result).at("pdus")
};
const m::event event
{
pdu.at(0), event_id
};
m::vm::opts vmopts;
vmopts.infolog_accept = true;
m::vm::eval eval
{
result, opts
event, vmopts
};
return true;