0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-05-29 08:13:46 +02:00

ircd:Ⓜ️:fetch: Reorient interface to expose fetch::request.

This commit is contained in:
Jason Volk 2019-04-12 03:13:40 -07:00
parent eace5e309c
commit c14d67102a
5 changed files with 310 additions and 223 deletions

View file

@ -19,8 +19,35 @@ namespace ircd::m::fetch
{
struct request;
static bool for_each(const std::function<bool (request &)> &);
void state_ids(const room &, const net::hostport &);
void state_ids(const room &);
extern log::log log;
}
/// Fetch entity state. This is not meant for construction by users of this
/// interface.
struct ircd::m::fetch::request
:m::v1::event
{
using is_transparent = void;
m::room::id::buf room_id;
m::event::id::buf event_id;
unique_buffer<mutable_buffer> buf;
std::set<std::string, std::less<>> attempted;
string_view origin;
time_t started {0};
time_t last {0};
time_t finished {0};
std::exception_ptr eptr;
request(const m::room::id &, const m::event::id &, const size_t &bufsz = 8_KiB);
request(request &&) = delete;
request(const request &) = delete;
static void start(const m::room::id &, const m::event::id &);
static bool prefetch(const m::room::id &, const m::event::id &);
};

View file

@ -467,6 +467,51 @@ ircd::m::fetch::state_ids(const room &r,
call(r, hp);
}
bool
ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
{
using prototype = bool (const std::function<bool (request &)> &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::for_each"
};
return call(closure);
}
//
// request
//
bool
ircd::m::fetch::request::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
using prototype = bool (const m::room::id &, const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::request::prefetch"
};
return call(room_id, event_id);
}
void
ircd::m::fetch::request::start(const m::room::id &room_id,
const m::event::id &event_id)
{
using prototype = void (const m::room::id &, const m::event::id &);
static mods::import<prototype> call
{
"s_fetch", "ircd::m::fetch::request::start"
};
return call(room_id, event_id);
}
///////////////////////////////////////////////////////////////////////////////
//
// m/sync.h

View file

@ -12257,32 +12257,57 @@ console_cmd__mc__register__available(opt &out, const string_view &line)
// fetch
//
bool
console_cmd__fetch__list(opt &out, const string_view &line)
{
m::fetch::for_each([&out]
(const m::fetch::request &request)
{
out
<< std::left << std::setw(48) << request.event_id << " "
<< std::left << std::setw(32) << request.room_id << " "
<< std::left << std::setw(32) << trunc(request.origin, 32) << " "
<< std::left << "S:" << request.started << " "
<< std::left << "A:" << request.attempted.size() << " "
<< std::left << "E:" << bool(request.eptr) << " "
<< std::left << "F:" << request.finished << " "
;
return true;
});
return true;
}
bool
console_cmd__fetch(opt &out, const string_view &line)
{
const params param{line, " ",
{
"room_id", "remote"
"room_id", "event_id"
}};
if(!param.count())
return console_cmd__fetch__list(out, line);
const auto room_id
{
m::room_id(param.at("room_id"))
};
if(!param["remote"])
if(!param["event_id"])
{
m::fetch::state_ids(room_id);
out << "done" << std::endl;
return true;
}
const net::hostport hostport
const m::event::id event_id
{
param["remote"]
param.at("event_id")
};
m::fetch::state_ids(room_id, hostport);
out << "done" << std::endl;
m::fetch::request::start(room_id, event_id);
out << "in work..." << std::endl;
return true;
}

View file

@ -275,6 +275,17 @@ ircd::m::fetch::state_ids(const room &room,
start(event_id, room.room_id);
}
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::for_each(const std::function<bool (request &)> &closure)
{
for(auto &request : requests)
if(!closure(const_cast<fetch::request &>(request)))
return false;
return true;
}
//
// auth chain fetch
//
@ -406,7 +417,7 @@ ircd::m::fetch::request_handle()
{
auto &request(const_cast<fetch::request &>(request_));
if(!request.finished)
request.retry();
retry(request);
}
return;
@ -435,7 +446,7 @@ try
if(request.finished)
return;
if(!request.handle())
if(!handle(request))
return;
complete.emplace_back(it);
@ -510,8 +521,7 @@ try
const unwind free{[&request]
{
request._buf = {};
request.buf = request._buf;
request.buf = {};
}};
if(request.eptr)
@ -553,6 +563,182 @@ catch(const std::exception &e)
// fetch::request
//
bool
IRCD_MODULE_EXPORT
ircd::m::fetch::request::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
if(m::exists(event_id))
return false;
start(room_id, event_id);
return true;
}
void
IRCD_MODULE_EXPORT
ircd::m::fetch::request::start(const m::room::id &room_id,
const m::event::id &event_id)
{
fetch::start(event_id, room_id);
}
void
ircd::m::fetch::start(request &request)
{
m::v1::event::opts opts;
opts.dynamic = true;
opts.remote = request.origin?: select_random_origin(request);
start(request, opts);
}
void
ircd::m::fetch::start(request &request,
m::v1::event::opts &opts)
{
assert(request.finished == 0);
if(!request.started)
request.started = ircd::time();
request.last = ircd::time();
static_cast<m::v1::event &>(request) =
{
request.event_id, request.buf, std::move(opts)
};
log::debug
{
log, "Started request for %s in %s from '%s'",
string_view{request.event_id},
string_view{request.room_id},
string_view{request.origin},
};
dock.notify_all();
}
ircd::string_view
ircd::m::fetch::select_random_origin(request &request)
{
const m::room::origins origins
{
request.room_id
};
// copies randomly selected origin into the attempted set.
const auto closure{[&request]
(const string_view &origin)
{
select_origin(request, origin);
}};
// Tests if origin is potentially viable
const auto proffer{[&request]
(const string_view &origin)
{
// Don't want to request from myself.
if(my_host(origin))
return false;
// Don't want to use a peer we already tried and failed with.
if(request.attempted.count(origin))
return false;
// Don't want to use a peer marked with an error by ircd::server
if(ircd::server::errmsg(origin))
return false;
return true;
}};
if(!origins.random(closure, proffer))
throw m::NOT_FOUND
{
"Cannot find any server to fetch %s in %s",
string_view{request.event_id},
string_view{request.room_id},
};
return request.origin;
}
ircd::string_view
ircd::m::fetch::select_origin(request &request,
const string_view &origin)
{
const auto iit
{
request.attempted.emplace(std::string{origin})
};
request.origin = *iit.first;
return request.origin;
}
bool
ircd::m::fetch::handle(request &request)
{
request.wait(); try
{
const auto code
{
request.get()
};
log::debug
{
log, "%u %s for %s in %s from '%s'",
uint(code),
status(code),
string_view{request.event_id},
string_view{request.room_id},
string_view{request.origin}
};
}
catch(...)
{
request.eptr = std::current_exception();
log::derror
{
log, "Failure for %s in %s from '%s' :%s",
string_view{request.event_id},
string_view{request.room_id},
string_view{request.origin},
what(request.eptr),
};
}
if(!request.eptr)
finish(request);
else
retry(request);
return request.finished;
}
void
ircd::m::fetch::retry(request &request)
try
{
server::cancel(request);
request.eptr = std::exception_ptr{};
request.origin = {};
start(request);
}
catch(...)
{
request.eptr = std::current_exception();
finish(request);
}
void
ircd::m::fetch::finish(request &request)
{
request.finished = ircd::time();
dock.notify_all();
}
bool
ircd::m::fetch::operator<(const request &a,
const request &b)
@ -578,184 +764,14 @@ noexcept
}
//
// fetch::request::request
// request::request
//
ircd::m::fetch::request::request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer &buf)
const size_t &bufsz)
:room_id{room_id}
,event_id{event_id}
,_buf
,buf{bufsz}
{
!buf?
unique_buffer<mutable_buffer>{8_KiB}:
unique_buffer<mutable_buffer>{}
}
,buf
{
empty(buf)? _buf: buf
}
{
//assert(size(this->buf) >= 64_KiB + 8_KiB + 8_KiB);
assert(size(this->buf) >= 8_KiB);
}
void
ircd::m::fetch::request::start()
{
m::v1::event::opts opts;
opts.dynamic = true;
opts.remote = origin?: select_random_origin();
start(opts);
}
void
ircd::m::fetch::request::start(m::v1::event::opts &opts)
{
assert(finished == 0);
if(!started)
started = ircd::time();
last = ircd::time();
static_cast<m::v1::event &>(*this) =
{
this->event_id, this->buf, std::move(opts)
};
dock.notify_all();
log::debug
{
log, "Started request for %s in %s from '%s'",
string_view{event_id},
string_view{room_id},
string_view{origin},
};
}
ircd::string_view
ircd::m::fetch::request::select_random_origin()
{
const m::room::origins origins
{
room_id
};
// copies randomly selected origin into the attempted set.
const auto closure{[this]
(const string_view &origin)
{
this->select_origin(origin);
}};
// Tests if origin is potentially viable
const auto proffer{[this]
(const string_view &origin)
{
// Don't want to request from myself.
if(my_host(origin))
return false;
// Don't want to use a peer we already tried and failed with.
if(attempted.count(origin))
return false;
// Don't want to use a peer marked with an error by ircd::server
if(ircd::server::errmsg(origin))
return false;
return true;
}};
if(!origins.random(closure, proffer))
throw m::NOT_FOUND
{
"Cannot find any server to fetch %s in %s",
string_view{event_id},
string_view{room_id},
};
return this->origin;
}
ircd::string_view
ircd::m::fetch::request::select_origin(const string_view &origin)
{
const auto iit
{
attempted.emplace(std::string{origin})
};
this->origin = *iit.first;
return this->origin;
}
bool
ircd::m::fetch::request::handle()
{
auto &future
{
static_cast<m::v1::event &>(*this)
};
future.wait(); try
{
const auto code
{
future.get()
};
log::debug
{
log, "%u %s for %s in %s from '%s'",
uint(code),
status(code),
string_view{event_id},
string_view{room_id},
string_view{origin}
};
}
catch(...)
{
eptr = std::current_exception();
log::derror
{
log, "Failure for %s in %s from '%s' :%s",
string_view{event_id},
string_view{room_id},
string_view{origin},
what(eptr),
};
}
if(!eptr)
finish();
else
retry();
return finished;
}
void
ircd::m::fetch::request::retry()
try
{
server::cancel(*this);
eptr = std::exception_ptr{};
origin = {};
start();
}
catch(...)
{
eptr = std::current_exception();
finish();
}
void
ircd::m::fetch::request::finish()
{
finished = ircd::time();
dock.notify_all();
}

View file

@ -11,7 +11,7 @@
// Fetch unit state
namespace ircd::m::fetch
{
struct request;
struct request; // m/fetch.h
static bool operator<(const request &a, const request &b) noexcept;
static bool operator<(const request &a, const string_view &b) noexcept;
@ -19,14 +19,22 @@ namespace ircd::m::fetch
extern ctx::dock dock;
extern std::set<request, std::less<>> requests;
extern std::multimap<m::room::id, request *> rooms;
extern std::multimap<room::id, request *> rooms;
extern std::deque<decltype(requests)::iterator> complete;
extern ctx::context eval_context;
extern ctx::context request_context;
extern hookfn<vm::eval &> hook;
extern conf::item<bool> enable;
template<class... args> static void start(const m::event::id &, const m::room::id &, args&&...);
static string_view select_origin(request &, const string_view &);
static string_view select_random_origin(request &);
static void finish(request &);
static void retry(request &);
static void start(request &, m::v1::event::opts &);
static void start(request &);
static bool handle(request &);
template<class... args> static void start(const event::id &, const room::id &, const size_t &bufsz = 8_KiB, args&&...);
static void eval_handle(const decltype(requests)::iterator &);
static void eval_handle();
static void eval_worker();
@ -39,45 +47,11 @@ namespace ircd::m::fetch
static void fini();
}
/// Fetch entity state
struct ircd::m::fetch::request
:m::v1::event
{
using is_transparent = void;
m::room::id::buf room_id;
m::event::id::buf event_id;
unique_buffer<mutable_buffer> _buf;
mutable_buffer buf;
std::set<std::string, std::less<>> attempted;
string_view origin;
time_t started {0};
time_t last {0};
time_t finished {0};
std::exception_ptr eptr;
void finish();
void retry();
bool handle();
string_view select_origin(const string_view &);
string_view select_random_origin();
void start(m::v1::event::opts &);
void start();
request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer & = {});
request() = default;
request(request &&) = delete;
request(const request &) = delete;
};
template<class... args>
void
ircd::m::fetch::start(const m::event::id &event_id,
const m::room::id &room_id,
const size_t &bufsz,
args&&... a)
{
auto it
@ -87,8 +61,8 @@ ircd::m::fetch::start(const m::event::id &event_id,
if(it == end(requests) || it->event_id != event_id) try
{
it = requests.emplace_hint(it, room_id, event_id, std::forward<args>(a)...);
const_cast<request &>(*it).start();
it = requests.emplace_hint(it, room_id, event_id, bufsz, std::forward<args>(a)...);
start(const_cast<request &>(*it));
}
catch(const std::exception &e)
{