0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-26 08:42:34 +01:00

modules/s_feds: Execute vector of operations concurrently.

This commit is contained in:
Jason Volk 2019-04-18 18:24:12 -07:00
parent 9907f7f477
commit ebe0f10e28

View file

@ -19,25 +19,20 @@ IRCD_MODULE
namespace ircd::m::feds
{
struct request_base;
template<class T> struct request;
using request_list = std::list<std::unique_ptr<request_base>>;
template<class T> using create_closure = std::function<T (request<T> &, const string_view &origin)>;
template<class T>
struct request;
template<class T> static request_list creator(const opts &, const create_closure<T> &);
static bool handler(request_list &, const milliseconds &, const closure &);
template<class T>
static bool
handler(const opts &, const closure &, std::list<std::unique_ptr<request_base>> &);
template<class T>
static std::list<std::unique_ptr<request_base>>
creator(const opts &, const std::function<T (request<T> &, const string_view &origin)> &);
static bool head(const opts &, const closure &);
static bool auth(const opts &, const closure &);
static bool event(const opts &, const closure &);
static bool state(const opts &, const closure &);
static bool backfill(const opts &, const closure &);
static bool version(const opts &, const closure &);
static bool keys(const opts &, const closure &);
static request_list head(const opts &, const closure &);
static request_list auth(const opts &, const closure &);
static request_list event(const opts &, const closure &);
static request_list state(const opts &, const closure &);
static request_list backfill(const opts &, const closure &);
static request_list version(const opts &, const closure &);
static request_list keys(const opts &, const closure &);
bool execute(const vector_view<const opts> &opts, const closure &closure);
}
@ -50,6 +45,7 @@ namespace ircd::m::feds
struct ircd::m::feds::request_base
{
const feds::opts *opts {nullptr};
char origin[256];
request_base(const feds::opts &opts)
:opts{&opts}
@ -73,7 +69,6 @@ struct ircd::m::feds::request
:request_base
,T
{
char origin[256];
char buf[8_KiB];
request(const feds::opts &opts, const std::function<T (request &)> &closure)
@ -107,22 +102,50 @@ IRCD_MODULE_EXPORT
ircd::m::feds::execute(const vector_view<const opts> &optsv,
const closure &closure)
{
request_list list;
for(const auto &opts : optsv) switch(opts.op)
{
case op::head: return head(opts, closure);
case op::auth: return auth(opts, closure);
case op::event: return event(opts, closure);
case op::state: return state(opts, closure);
case op::backfill: return backfill(opts, closure);
case op::version: return version(opts, closure);
case op::keys: return keys(opts, closure);
case op::noop: break;
case op::head:
list.splice(list.end(), head(opts, closure));
continue;
case op::auth:
list.splice(list.end(), auth(opts, closure));
continue;
case op::event:
list.splice(list.end(), event(opts, closure));
continue;
case op::state:
list.splice(list.end(), state(opts, closure));
continue;
case op::backfill:
list.splice(list.end(), backfill(opts, closure));
continue;
case op::version:
list.splice(list.end(), version(opts, closure));
continue;
case op::keys:
list.splice(list.end(), keys(opts, closure));
continue;
case op::noop:
continue;
}
return true;
milliseconds timeout {0};
for(const auto &opts : optsv)
if(opts.timeout > timeout)
timeout = opts.timeout;
return handler(list, timeout, closure);
}
bool
ircd::m::feds::request_list
ircd::m::feds::keys(const opts &opts,
const closure &closure)
{
@ -147,15 +170,10 @@ ircd::m::feds::keys(const opts &opts,
};
}};
auto requests
{
creator<m::v1::key::query>(opts, make_request)
};
return handler<m::v1::key::query>(opts, closure, requests);
return creator<m::v1::key::query>(opts, make_request);
}
bool
ircd::m::feds::request_list
ircd::m::feds::version(const opts &opts,
const closure &closure)
{
@ -175,15 +193,10 @@ ircd::m::feds::version(const opts &opts,
};
}};
auto requests
{
creator<m::v1::version>(opts, make_request)
};
return handler<m::v1::version>(opts, closure, requests);
return creator<m::v1::version>(opts, make_request);
}
bool
ircd::m::feds::request_list
ircd::m::feds::backfill(const opts &opts,
const closure &closure)
{
@ -205,15 +218,10 @@ ircd::m::feds::backfill(const opts &opts,
};
}};
auto requests
{
creator<m::v1::backfill>(opts, make_request)
};
return handler<m::v1::backfill>(opts, closure, requests);
return creator<m::v1::backfill>(opts, make_request);
}
bool
ircd::m::feds::request_list
ircd::m::feds::state(const opts &opts,
const closure &closure)
{
@ -235,15 +243,10 @@ ircd::m::feds::state(const opts &opts,
};
}};
auto requests
{
creator<m::v1::state>(opts, make_request)
};
return handler<m::v1::state>(opts, closure, requests);
return creator<m::v1::state>(opts, make_request);
}
bool
ircd::m::feds::request_list
ircd::m::feds::event(const opts &opts,
const closure &closure)
{
@ -263,15 +266,10 @@ ircd::m::feds::event(const opts &opts,
};
}};
auto requests
{
creator<m::v1::event>(opts, make_request)
};
return handler<m::v1::event>(opts, closure, requests);
return creator<m::v1::event>(opts, make_request);
}
bool
ircd::m::feds::request_list
ircd::m::feds::auth(const opts &opts,
const closure &closure)
{
@ -291,15 +289,10 @@ ircd::m::feds::auth(const opts &opts,
};
}};
auto requests
{
creator<m::v1::event_auth>(opts, make_request)
};
return handler<m::v1::event_auth>(opts, closure, requests);
return creator<m::v1::event_auth>(opts, make_request);
}
bool
ircd::m::feds::request_list
ircd::m::feds::head(const opts &opts,
const closure &closure)
{
@ -318,12 +311,7 @@ ircd::m::feds::head(const opts &opts,
};
}};
auto requests
{
creator<m::v1::make_join>(opts, make_request)
};
return handler<m::v1::make_join>(opts, closure, requests);
return creator<m::v1::make_join>(opts, make_request);
}
///////////////////////////////////////////////////////////////////////////////
@ -331,8 +319,64 @@ ircd::m::feds::head(const opts &opts,
// (internal)
//
bool
ircd::m::feds::handler(request_list &reqs,
const milliseconds &timeout,
const closure &closure)
{
const auto when
{
now<steady_point>() + timeout
};
while(!reqs.empty())
{
static const auto dereferencer{[]
(auto &iterator) -> server::request &
{
return dynamic_cast<server::request &>(**iterator);
}};
auto next
{
ctx::when_any(begin(reqs), end(reqs), dereferencer)
};
if(!next.wait_until(when, std::nothrow))
break;
const auto it
{
next.get()
};
assert(it != end(reqs));
request_base &req(**it);
server::request &sreq(dynamic_cast<server::request &>(req)); try
{
const auto code{sreq.get()};
const json::array &array{sreq.in.content};
const json::object &object{sreq.in.content};
if(!closure({req.opts, req.origin, {}, object, array}))
return false;
}
catch(const std::exception &)
{
const ctx::exception_handler eptr;
const std::exception_ptr &eptr_(eptr);
if(!closure({req.opts, req.origin, eptr_}))
return false;
}
reqs.erase(it);
}
return true;
}
template<class T>
std::list<std::unique_ptr<m::feds::request_base>>
ircd::m::feds::request_list
ircd::m::feds::creator(const opts &opts,
const std::function<T (request<T> &, const string_view &origin)> &closure)
{
@ -342,7 +386,7 @@ ircd::m::feds::creator(const opts &opts,
opts.room_id
};
std::list<std::unique_ptr<request_base>> ret;
request_list ret;
origins.for_each([&opts, &ret, &closure]
(const string_view &origin)
{
@ -362,59 +406,3 @@ ircd::m::feds::creator(const opts &opts,
return ret;
}
template<class T>
bool
ircd::m::feds::handler(const opts &opts,
const closure &closure,
std::list<std::unique_ptr<request_base>> &reqs)
{
const auto when
{
now<steady_point>() + opts.timeout
};
while(!reqs.empty())
{
static const auto dereferencer{[]
(auto &iterator) -> T &
{
return dynamic_cast<T &>(**iterator);
}};
auto next
{
ctx::when_any(begin(reqs), end(reqs), dereferencer)
};
if(!next.wait_until(when, std::nothrow))
break;
const auto it
{
next.get()
};
assert(it != end(reqs));
auto &req(dynamic_cast<feds::request<T> &>(**it)); try
{
const auto code{req.get()};
const json::array &array{req.in.content};
const json::object &object{req.in.content};
if(!closure({&opts, req.origin, {}, object, array}))
return false;
}
catch(const std::exception &)
{
const ctx::exception_handler eptr;
const std::exception_ptr &eptr_(eptr);
if(!closure({&opts, req.origin, eptr_}))
return false;
}
reqs.erase(it);
}
return true;
}