0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-17 23:40:57 +01:00

modules/federation/federation: Abstract the m::feds request process.

This commit is contained in:
Jason Volk 2019-04-12 00:09:32 -07:00
parent 4cd8a0c80d
commit 2ce53fef3c

View file

@ -18,7 +18,16 @@ IRCD_MODULE
namespace ircd::m::feds
{
template<class T> struct request;
template<class T>
struct request;
template<class T>
static bool
handler(const opts &, const closure &, std::list<request<T>> &);
template<class T>
static std::list<request<T>>
creator(const opts &, const std::function<T (request<T> &, const string_view &origin)> &);
}
template<class T>
@ -38,37 +47,27 @@ ircd::m::feds::request<T>::request(const std::function<T (request &)> &closure)
:T{closure(*this)}
{}
bool
IRCD_MODULE_EXPORT
ircd::m::feds::version(const opts &opts,
const closure &closure)
template<class T>
std::list<m::feds::request<T>>
ircd::m::feds::creator(const opts &opts,
const std::function<T (request<T> &, const string_view &origin)> &closure)
{
static const auto make_request{[]
(auto &request, const auto &origin)
assert(opts.room_id);
const m::room::origins origins
{
m::v1::version::opts opts;
opts.dynamic = false;
opts.remote = string_view
{
request.origin, strlcpy{request.origin, origin}
};
opts.room_id
};
return m::v1::version
{
request.buf, std::move(opts)
};
}};
std::list<m::feds::request<m::v1::version>> reqs;
m::room::origins{opts.room_id}.for_each([&reqs]
std::list<m::feds::request<T>> ret;
origins.for_each([&ret, &closure]
(const string_view &origin)
{
if(!server::errmsg(origin)) try
{
reqs.emplace_back([&origin]
ret.emplace_back([&closure, &origin]
(auto &request)
{
return make_request(request, origin);
return closure(request, origin);
});
}
catch(const std::exception &)
@ -77,6 +76,15 @@ ircd::m::feds::version(const opts &opts,
}
});
return ret;
}
template<class T>
bool
ircd::m::feds::handler(const opts &opts,
const closure &closure,
std::list<request<T>> &reqs)
{
const auto when
{
now<steady_point>() + opts.timeout
@ -100,8 +108,9 @@ ircd::m::feds::version(const opts &opts,
auto &req{*it}; try
{
const auto code{req.get()};
const json::object &response{req};
if(!closure({req.origin, {}, response}))
const json::array &array{req.in.content};
const json::object &object{req.in.content};
if(!closure({req.origin, {}, object, array}))
return false;
}
catch(const std::exception &)
@ -117,6 +126,35 @@ ircd::m::feds::version(const opts &opts,
return true;
}
bool
IRCD_MODULE_EXPORT
ircd::m::feds::version(const opts &opts,
const closure &closure)
{
static const auto make_request{[]
(auto &request, const auto &origin)
{
m::v1::version::opts opts;
opts.dynamic = false;
opts.remote = string_view
{
request.origin, strlcpy{request.origin, origin}
};
return m::v1::version
{
request.buf, std::move(opts)
};
}};
auto requests
{
creator<m::v1::version>(opts, make_request)
};
return handler(opts, closure, requests);
}
bool
IRCD_MODULE_EXPORT
ircd::m::feds::state(const opts &opts,
@ -140,62 +178,12 @@ ircd::m::feds::state(const opts &opts,
};
}};
std::list<m::feds::request<m::v1::state>> reqs;
m::room::origins{opts.room_id}.for_each([&reqs, &make_request]
(const string_view &origin)
auto requests
{
if(!server::errmsg(origin)) try
{
reqs.emplace_back([&origin, &make_request]
(auto &request)
{
return make_request(request, origin);
});
}
catch(const std::exception &)
{
return;
}
});
const auto when
{
now<steady_point>() + opts.timeout
creator<m::v1::state>(opts, make_request)
};
while(!reqs.empty())
{
auto next
{
ctx::when_any(begin(reqs), end(reqs))
};
if(!next.wait_until(when, std::nothrow))
break;
const auto it
{
next.get()
};
auto &req{*it}; try
{
const auto code{req.get()};
const json::object &response{req};
if(!closure({req.origin, {}, response}))
return false;
}
catch(const std::exception &)
{
const ctx::exception_handler eptr;
if(!closure({req.origin, eptr}))
return false;
}
reqs.erase(it);
}
return true;
return handler(opts, closure, requests);
}
bool
@ -218,64 +206,18 @@ ircd::m::feds::head(const opts &opts,
};
}};
std::list<m::feds::request<m::v1::make_join>> reqs;
m::room::origins{opts.room_id}.for_each([&reqs, &make_request]
(const string_view &origin)
auto requests
{
if(!server::errmsg(origin)) try
{
reqs.emplace_back([&origin, &make_request]
(auto &request)
{
return make_request(request, origin);
});
}
catch(const std::exception &)
{
return;
}
});
const auto when
{
now<steady_point>() + opts.timeout
creator<m::v1::make_join>(opts, make_request)
};
while(!reqs.empty())
{
auto next
{
ctx::when_any(begin(reqs), end(reqs))
};
if(!next.wait_until(when, std::nothrow))
break;
const auto it
{
next.get()
};
auto &req{*it}; try
{
const auto code{req.get()};
const json::object &response{req};
if(!closure({req.origin, {}, response}))
return false;
}
catch(const std::exception &)
{
const ctx::exception_handler eptr;
if(!closure({req.origin, eptr}))
return false;
}
reqs.erase(it);
}
return true;
return handler(opts, closure, requests);
}
//
// legacy
//
extern "C" void
feds__event(const m::event::id &event_id, std::ostream &out)
{