
ircd::m::fed::well_known: Refactor to single-worker pattern.

Jason Volk 2020-10-15 04:03:53 -07:00
parent 8d174ea80d
commit c11e8e85ed
2 changed files with 413 additions and 217 deletions


@@ -11,20 +11,30 @@
 #pragma once
 #define HAVE_IRCD_M_FED_WELL_KNOWN_H

+/// well-known for server name resolution.
+///
+/// This is a future-based interface. It performs local caching in the !dns
+/// room as well as conducting network requests. The cache is queried on the
+/// callers ircd::ctx and valid results cheaply return an already-satisfied
+/// future. In the case of expired or missing results, a request structure is
+/// allocated and managed internally and an unsatisfied future is returned;
+/// an internal worker will resolve the promise asynchronously.
+///
 namespace ircd::m::fed::well_known
 {
+    struct request;
     struct opts;

+    // Primary query interface
     ctx::future<string_view>
     get(const mutable_buffer &out, const string_view &name, const opts &);

-    extern conf::item<size_t> fetch_redirects;
-    extern conf::item<seconds> fetch_timeout;
     extern conf::item<seconds> cache_max;
     extern conf::item<seconds> cache_error;
     extern conf::item<seconds> cache_default;
 }

+/// Options used for resolving well-known.
 struct ircd::m::fed::well_known::opts
 {
     /// Whether to check the cache before making any request.
@@ -39,3 +49,35 @@ struct ircd::m::fed::well_known::opts
     /// Whether to cache the result of any request.
     bool cache_result {true};
 };
+
+/// Internal request structure; do not instantiate or manage manually. The
+/// request::list allows traversal of all requests and observation of their
+/// state.
+struct ircd::m::fed::well_known::request
+:instance_list<request>
+{
+    static const string_view path, type;
+    static const server::request::opts sopts;
+    static conf::item<size_t> redirects_max;
+    static conf::item<seconds> timeout;
+    static ctx::mutex mutex;
+    static uint64_t id_ctr;
+
+    mutable_buffer out;              // only safe if promise valid
+    string_view target;
+    well_known::opts opts;
+    uint64_t id {++id_ctr};
+    system_point expires;
+    ctx::promise<string_view> promise;
+    unique_mutable_buffer carry;
+    rfc3986::uri uri;
+    server::request req;
+    http::code code {0};
+    http::response::head head;
+    string_view location;
+    size_t redirects {0};
+    json::object response;
+    json::string m_server;
+    char tgtbuf[rfc3986::DOMAIN_BUFSIZE];
+    char buf[15_KiB];
+};
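A minimal caller-side sketch of the future-based interface declared above (illustrative, not part of this commit; the target name, buffer size and using-directive are assumptions):

using namespace ircd;

// The output buffer must stay valid until the future is resolved or dropped;
// a valid cache hit returns an already-satisfied future, otherwise the
// internal worker fulfills the promise asynchronously.
char buf[rfc3986::DOMAIN_BUFSIZE];
m::fed::well_known::opts opts;    // default options; see struct opts above

ctx::future<string_view> fut
{
    m::fed::well_known::get(buf, "example.org", opts)
};

// Yields e.g. "matrix.example.org:8448", or the input name itself when no
// delegation could be obtained.
const string_view delegated
{
    fut.get()
};

Because the target string is copied into the internal request, the caller may also abandon the future early; the resulting promise invalidation tells the worker not to write into the caller's buffer.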


@@ -10,32 +10,30 @@
 namespace ircd::m::fed::well_known
 {
-    struct request;
+    static net::hostport make_remote(const string_view &);
     static void submit(request &);
     static void receive(request &);
-    static int handle(request &);
-    static string_view fetch(const mutable_buffer &, const string_view &);
+    static void finish(request &);
+    static bool handle(request &);
+    static void worker();

+    static server::request request_skip;
+    extern ctx::dock worker_dock;
+    extern ctx::context worker_context;
+    extern run::changed handle_quit;
     extern log::log log;
 }

-struct ircd::m::fed::well_known::request
-{
-    static const string_view path;
-    static const string_view type;
-    static const server::request::opts sopts;
-
-    unique_mutable_buffer buf;
-    unique_mutable_buffer carry;
-    rfc3986::uri uri;
-    server::request req;
-    http::code code {0};
-    http::response::head head;
-    string_view location;
-    size_t redirects {0};
-    json::object response;
-    json::string m_server;
-};
+template<>
+decltype(ircd::util::instance_list<ircd::m::fed::well_known::request>::allocator)
+ircd::util::instance_list<ircd::m::fed::well_known::request>::allocator
+{};
+
+template<>
+decltype(ircd::util::instance_list<ircd::m::fed::well_known::request>::list)
+ircd::util::instance_list<ircd::m::fed::well_known::request>::list
+{
+    allocator
+};

 decltype(ircd::m::fed::well_known::log)
@@ -44,24 +42,6 @@ ircd::m::fed::well_known::log
     "m.well-known"
 };

-decltype(ircd::m::fed::well_known::request::path)
-ircd::m::fed::well_known::request::path
-{
-    "/.well-known/matrix/server"
-};
-
-decltype(ircd::m::fed::well_known::request::type)
-ircd::m::fed::well_known::request::type
-{
-    "well-known.matrix.server"
-};
-
-decltype(ircd::m::fed::well_known::request::sopts)
-ircd::m::fed::well_known::request::sopts
-{
-    false // http_exceptions
-};
-
 decltype(ircd::m::fed::well_known::cache_default)
 ircd::m::fed::well_known::cache_default
 {
@@ -84,40 +64,85 @@ ircd::m::fed::well_known::cache_max
     { "default", 48 * 60 * 60L },
 };

-decltype(ircd::m::fed::well_known::fetch_timeout)
-ircd::m::fed::well_known::fetch_timeout
-{
-    { "name", "ircd.m.fed.well-known.fetch.timeout" },
-    { "default", 15L },
-};
-
-decltype(ircd::m::fed::well_known::fetch_redirects)
-ircd::m::fed::well_known::fetch_redirects
-{
-    { "name", "ircd.m.fed.well-known.fetch.redirects" },
-    { "default", 2L },
-};
+decltype(ircd::m::fed::well_known::request::path)
+ircd::m::fed::well_known::request::path
+{
+    "/.well-known/matrix/server"
+};
+
+decltype(ircd::m::fed::well_known::request::type)
+ircd::m::fed::well_known::request::type
+{
+    "well-known.matrix.server"
+};
+
+decltype(ircd::m::fed::well_known::request::sopts)
+ircd::m::fed::well_known::request::sopts
+{
+    false // http_exceptions
+};
+
+decltype(ircd::m::fed::well_known::request::timeout)
+ircd::m::fed::well_known::request::timeout
+{
+    { "name", "ircd.m.fed.well-known.request.timeout" },
+    { "default", 15L },
+};
+
+decltype(ircd::m::fed::well_known::request::redirects_max)
+ircd::m::fed::well_known::request::redirects_max
+{
+    { "name", "ircd.m.fed.well-known.request.redirects.max" },
+    { "default", 2L },
+};
+
+decltype(ircd::m::fed::well_known::request::id_ctr)
+ircd::m::fed::well_known::request::id_ctr;
+
+decltype(ircd::m::fed::well_known::request::mutex)
+ircd::m::fed::well_known::request::mutex;
+
+decltype(ircd::m::fed::well_known::worker_dock)
+ircd::m::fed::well_known::worker_dock;
+
+decltype(ircd::m::fed::well_known::worker_context)
+ircd::m::fed::well_known::worker_context
+{
+    "m.fed.well_known",
+    512_KiB,
+    &worker,
+    context::POST
+};
+
+decltype(ircd::m::fed::well_known::handle_quit)
+ircd::m::fed::well_known::handle_quit
+{
+    run::level::QUIT, []
+    {
+        worker_dock.notify_all();
+    }
+};

 ircd::ctx::future<ircd::string_view>
 ircd::m::fed::well_known::get(const mutable_buffer &buf,
-                              const string_view &origin,
+                              const string_view &target,
                               const opts &opts)
 try
 {
-    const m::room::id::buf room_id
+    const m::room::id::buf cache_room_id
     {
         "dns", m::my_host()
     };

-    const m::room room
+    const m::room cache_room
     {
-        room_id
+        cache_room_id
     };

     const m::event::idx event_idx
     {
         likely(opts.cache_check)?
-            room.get(std::nothrow, request::type, origin):
+            cache_room.get(std::nothrow, request::type, target):
             0UL
     };
@@ -148,114 +173,115 @@ try
         ircd::now<system_point>() > expires
     };

-    const string_view cached
-    {
-        data(buf), move(buf, json::string(content["m.server"]))
-    };
-
-    const bool valid
-    {
-        !empty(cached)
-    };
-
-    // Branch on valid cache hit to return result.
-    if(valid && (!expired || opts.expired))
-    {
-        char tmbuf[48];
-        log::debug
-        {
-            log, "%s found in cache delegated to %s event_idx:%u expires %s",
-            origin,
-            cached,
-            event_idx,
-            timef(tmbuf, expires, localtime),
-        };
-
-        return ctx::future<string_view>
-        {
-            ctx::already, cached
-        };
-    }
-
-    const string_view fetched
-    {
-        opts.request?
-            fetch(buf + size(cached), origin):
-            string_view{}
-    };
-
-    const string_view delegated
-    {
-        data(buf), move(buf, fetched?: origin)
-    };
-
-    // Conditions for valid expired cache hit w/ failure to reacquire.
-    const bool fallback
-    {
-        valid
-        && expired
-        && cached != delegated
-        && origin == delegated
-        && now<system_point>() < expires + seconds(cache_max)
-    };
-
-    if(fallback)
-    {
-        char tmbuf[48];
-        log::debug
-        {
-            log, "%s found in cache delegated to %s event_idx:%u expired %s",
-            origin,
-            cached,
-            event_idx,
-            timef(tmbuf, expires, localtime),
-        };
-
-        assert(opts.cache_check);
-        return ctx::future<string_view>
-        {
-            ctx::already, cached
-        };
-    }
-
-    if(likely(opts.request && opts.cache_result))
-    {
-        // Any time the well-known result is the same as the origin (that
-        // includes legitimate errors where fetch_well_known() returns the
-        // origin to default) we consider that an error and use the error
-        // TTL value. Sorry, no exponential backoff implemented yet.
-        const auto cache_ttl
-        {
-            origin == delegated?
-                seconds(cache_error).count():
-                seconds(cache_default).count()
-        };
-
-        // Write our record to the cache room; note that this doesn't really
-        // match the format of other DNS records in this room since it's a bit
-        // simpler, but we don't share the ircd.dns.rr type prefix anyway.
-        const auto cache_id
-        {
-            m::send(room, m::me(), request::type, origin, json::members
-            {
-                { "ttl", cache_ttl },
-                { "m.server", delegated },
-            })
-        };
-
-        log::debug
-        {
-            log, "%s caching delegation to %s to cache in %s",
-            origin,
-            delegated,
-            string_view{cache_id},
-        };
-    }
-
-    return ctx::future<string_view>
-    {
-        ctx::already, delegated
-    };
+    const json::string cached
+    {
+        content["m.server"]
+    };
+
+    const bool valid
+    {
+        // entry must not be blank
+        !empty(cached)
+
+        // entry must not be expired unless options allow expired hits
+        && (!expired || opts.expired)
+    };
+
+    // Branch to return cache hit
+    if(likely(valid))
+        return ctx::future<string_view>
+        {
+            ctx::already, string_view
+            {
+                data(buf), move(buf, cached)
+            }
+        };
+
+    const net::hostport remote
+    {
+        make_remote(target)
+    };
+
+    const bool fetch
+    {
+        // options must allow network request
+        opts.request
+
+        // check if the peer already has a cached error in server::
+        && !server::errant(remote)
+    };
+
+    // Branch if won't fetch; return target itself as result
+    if(!fetch)
+        return ctx::future<string_view>
+        {
+            ctx::already, string_view
+            {
+                data(buf), move(buf, target)
+            }
+        };
+
+    if(opts.cache_check)
+    {
+        char tmbuf[48];
+        log::dwarning
+        {
+            log, "%s cache invalid %s event_idx:%u expires %s",
+            target,
+            cached?: json::string{"<not found>"},
+            event_idx,
+            cached? timef(tmbuf, expires, localtime): "<never>"_sv,
+        };
+    }
+
+    // Synchronize modification of the request::list
+    const std::lock_guard request_lock
+    {
+        request::mutex
+    };
+
+    // Start request
+    auto req
+    {
+        std::make_unique<request>()
+    };
+
+    // req->target is a copy in the request struct so the caller can disappear.
+    req->target = string_view
+    {
+        req->tgtbuf, copy(req->tgtbuf, target)
+    };
+
+    // but req->out is not safe once the caller destroys their future, which
+    // is indicated by req->promise's invalidation. Do not write to this
+    // unless the promise is valid.
+    req->out = buf;
+
+    // all other properties are independent of the caller
+    req->opts = opts;
+    req->m_server = cached;
+    req->expires = expires;
+    req->uri.path = request::path;
+    req->uri.remote = req->target;
+
+    ctx::future<string_view> ret{req->promise};
+    try
+    {
+        submit(*req);
+        req.release();
+    }
+    catch(const std::exception &e)
+    {
+        log::derror
+        {
+            log, "request submit for %s :%s",
+            target,
+            e.what(),
+        };
+
+        const ctx::exception_handler eh;
+        finish(*req);
+    }
+
+    return ret;
 }
 catch(const ctx::interrupted &)
 {
@@ -265,8 +291,8 @@ catch(const std::exception &e)
 {
     log::error
     {
-        log, "%s :%s",
-        origin,
+        log, "get %s :%s",
+        target,
         e.what(),
     };
@@ -274,94 +300,93 @@ catch(const std::exception &e)
     {
         ctx::already, string_view
         {
-            data(buf), move(buf, origin)
+            data(buf), move(buf, target)
         }
     };
 }

-ircd::string_view
-ircd::m::fed::well_known::fetch(const mutable_buffer &user_buf,
-                                const string_view &target)
-try
-{
-    request req;
-    req.uri.path = request::path;
-    req.uri.remote = target;
-    req.buf = unique_mutable_buffer
-    {
-        8_KiB
-    };
-
-    for(; req.redirects < fetch_redirects; ++req.redirects)
-    {
-        submit(req);
-        receive(req);
-        switch(handle(req))
-        {
-            case -1:
-                continue;
-
-            case false:
-                break;
-
-            case true:
-                return string_view
-                {
-                    data(user_buf), move(user_buf, req.m_server)
-                };
-        }
-    }
-
-    return {};
-}
-catch(const ctx::interrupted &)
-{
-    throw;
-}
-catch(const std::exception &e)
-{
-    log::derror
-    {
-        log, "%s in network query :%s",
-        target,
-        e.what(),
-    };
-
-    return {};
-}
+void
+ircd::m::fed::well_known::worker()
+try
+{
+    // Wait for runlevel RUN before proceeding...
+    run::barrier<ctx::interrupted>{};
+
+    while(!request::list.empty() || run::level == run::level::RUN)
+    {
+        worker_dock.wait([]
+        {
+            return !request::list.empty() || run::level != run::level::RUN;
+        });
+
+        if(request::list.empty())
+            break;
+
+        auto next
+        {
+            ctx::when_any(std::begin(request::list), std::end(request::list), []
+            (auto &it) -> server::request &
+            {
+                return !(*it)->req? request_skip: (*it)->req;
+            })
+        };
+
+        const ctx::uninterruptible::nothrow ui;
+        if(!next.wait(milliseconds(250), std::nothrow))
+            continue;
+
+        const auto it
+        {
+            next.get()
+        };
+
+        assert(it != std::end(request::list));
+        if(unlikely(it == std::end(request::list)))
+            continue;
+
+        const std::lock_guard request_lock
+        {
+            request::mutex
+        };
+
+        std::unique_ptr<request> req
+        {
+            *it
+        };
+
+        if(!handle(*req)) // redirect
+            req.release();
+        else
+            finish(*req);
+    }
+
+    assert(request::list.empty());
+}
+catch(const std::exception &e)
+{
+    log::critical
+    {
+        log, "Worker unhandled :%s",
+        e.what(),
+    };
+}

-int
+bool
 ircd::m::fed::well_known::handle(request &req)
+try
 {
+    receive(req);
+
     // Successful result
     if(likely(req.code < 300))
-    {
-        req.m_server = req.response["m.server"];
-
-        // This construction validates we didn't get a junk string
-        volatile const net::hostport ret
-        {
-            req.m_server
-        };
-
-        log::debug
-        {
-            log, "query to %s found delegation to %s",
-            req.uri.remote,
-            req.m_server,
-        };
-
-        assert(bool(req.m_server));
         return true;
-    }

     // Successful error; bail
     if(req.code >= 400)
-        return false;
+        return true;

     // Indirection code, but no location response header
     if(!req.location)
-        return false;
+        return true;

     // Redirection; carry over the new target by copying it because it's
     // in the buffer which we'll be overwriting for the new request.
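The worker() introduced above is the core of the single-worker pattern named in the commit title: get() appends a request to request::list and notifies worker_dock, and the one worker context waits for whichever in-flight server::request completes first via ctx::when_any, then either resubmits it (redirect) or finishes it. A rough standard-library analogy of that shape, illustrative only, since the real code runs on ircd::ctx cooperative contexts with ctx::dock and ctx::when_any rather than std::thread primitives, and waits on network completion rather than a simple FIFO:

#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>

struct pending
{
    // one queued lookup: target name, promise, response state, ...
};

std::mutex mtx;
std::condition_variable dock;                 // stands in for ctx::dock
std::list<std::unique_ptr<pending>> list;
bool quitting {false};

void submit(std::unique_ptr<pending> req)
{
    const std::lock_guard lock{mtx};
    list.push_back(std::move(req));
    dock.notify_all();                        // wake the single worker
}

void worker()
{
    std::unique_lock lock{mtx};
    while(!quitting || !list.empty())
    {
        dock.wait(lock, [] { return quitting || !list.empty(); });
        while(!list.empty())
        {
            auto req{std::move(list.front())};
            list.pop_front();
            // ...complete *req here, satisfying its promise, or requeue it
            // for another round (the redirect case in the real code). Unlike
            // this sketch, the real worker holds request::mutex only around
            // list manipulation, not while waiting on the network.
        }
    }
}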
@@ -370,21 +395,133 @@ ircd::m::fed::well_known::handle(request &req)
     // Indirection code, bad location header.
     if(!req.uri.path || !req.uri.remote)
-        return false;
+        return true;
+
+    if(req.redirects++ >= request::redirects_max)
+        return true;

     // Redirect
-    return -1;
+    submit(req);
+    return false;
+}
+catch(const std::exception &e)
+{
+    log::derror
+    {
+        log, "%s handling :%s",
+        req.target,
+        e.what(),
+    };
+
+    return true;
+}
+
+void
+ircd::m::fed::well_known::finish(request &req)
+try
+{
+    json::string result;
+    const unwind resolve{[&req, &result]
+    {
+        if(req.promise.valid())
+            req.promise.set_value(string_view
+            {
+                data(req.out), move(req.out, result?: req.target)
+            });
+    }};
+
+    if(json::valid(req.response, std::nothrow))
+        result = req.response["m.server"];
+
+    if(!result)
+        result = req.m_server;
+
+    if(!result)
+        result = req.target;
+
+    // This construction validates we didn't get a junk string
+    volatile const net::hostport ret
+    {
+        result
+    };
+
+    if(result != req.target)
+        log::debug
+        {
+            log, "query to %s for %s found delegation to %s",
+            req.uri.remote,
+            req.target,
+            result,
+        };
+
+    const bool cache_expired
+    {
+        req.expires + seconds(cache_max) < now<system_point>()
+    };
+
+    const bool cache_result
+    {
+        result
+        && req.opts.cache_result
+        && req.opts.request
+        && (cache_expired || result != req.m_server)
+    };
+
+    req.m_server = result;
+    if(!cache_result)
+        return;
+
+    // Any time the well-known result is the same as the req.target (that
+    // includes legitimate errors where fetch_well_known() returns the
+    // req.target to default) we consider that an error and use the error
+    // TTL value. Sorry, no exponential backoff implemented yet.
+    const auto cache_ttl
+    {
+        req.target == req.m_server?
+            seconds(cache_error).count():
+            seconds(cache_default).count()
+    };
+
+    const m::room::id::buf cache_room_id
+    {
+        "dns", m::my_host()
+    };
+
+    // Write our record to the cache room; note that this doesn't really
+    // match the format of other DNS records in this room since it's a bit
+    // simpler, but we don't share the ircd.dns.rr type prefix anyway.
+    const auto cache_id
+    {
+        m::send(cache_room_id, m::me(), request::type, req.target, json::members
+        {
+            { "ttl", cache_ttl },
+            { "m.server", req.m_server },
+        })
+    };
+
+    log::debug
+    {
+        log, "%s cached delegation to %s with %s ttl:%ld",
+        req.target,
+        req.m_server,
+        string_view{cache_id},
+        cache_ttl,
+    };
+}
+catch(const std::exception &e)
+{
+    log::error
+    {
+        log, "%s completion :%s",
+        req.target,
+        e.what(),
+    };
 }

 void
 ircd::m::fed::well_known::receive(request &req)
 {
-    const seconds timeout
-    {
-        fetch_timeout
-    };
-
-    req.code = req.req.get(timeout);
+    req.code = req.req.get(seconds(request::timeout));
     req.head = req.req.in.gethead(req.req);
     req.location = req.head.location;
     req.response = json::object
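One detail of finish() above worth noting: the unwind guard runs on every exit path, so the caller's promise is always satisfied, with the parsed m.server value when one was obtained and with the original target otherwise. A bare-bones standard-C++ rendering of that guarantee (illustrative; ircd::util::unwind is the utility actually used, and the types here are simplified stand-ins):

#include <future>
#include <string>

// Minimal scope guard standing in for ircd::util::unwind.
template<class F>
struct unwind_guard
{
    F func;
    ~unwind_guard() { func(); }
};

template<class F> unwind_guard(F) -> unwind_guard<F>;

void finish_example(std::promise<std::string> &promise,
                    const std::string &target)
{
    std::string result;
    const unwind_guard resolve
    {
        [&]
        {
            // Runs last on every path out of this function, including throws,
            // so the waiter is never left hanging.
            promise.set_value(!result.empty()? result: target);
        }
    };

    // ... parse the HTTP response; may throw or may leave result empty ...
    result = "matrix.example.org:8448";   // hypothetical parsed delegation
}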
@@ -395,7 +532,7 @@ ircd::m::fed::well_known::receive(request &req)
     char dom_buf[rfc3986::DOMAIN_BUFSIZE];
     log::debug
     {
-        log, "fetch from %s %s :%u %s",
+        log, "fetch to %s %s :%u %s",
         req.uri.remote,
         req.uri.path,
         uint(req.code),
@@ -406,15 +543,9 @@
 void
 ircd::m::fed::well_known::submit(request &req)
 {
-    const net::hostport &remote
-    {
-        req.uri.remote
-    };
-
-    // Hard target https service; do not inherit any matrix service from remote.
     const net::hostport target
     {
-        net::host(remote), "https", net::port(remote)
+        make_remote(req.uri.remote)
     };

     const http::header headers[]
@@ -440,11 +571,34 @@ ircd::m::fed::well_known::submit(request &req)
     // recognized by ircd::server to place received content directly after
     // head in this buffer without any additional dynamic allocation.
     server::in in;
-    in.head = req.buf + size(out.head);
+    in.head = mutable_buffer{req.buf} + size(out.head);
     in.content = in.head;

+    req.code = http::code(0);
+    req.head = {};
+    req.response = {};
+    req.location = {};
     req.req = server::request
     {
         target, std::move(out), std::move(in), &req.sopts
     };
+
+    worker_dock.notify();
+}
+
+ircd::net::hostport
+ircd::m::fed::well_known::make_remote(const string_view &target)
+{
+    const net::hostport remote
+    {
+        target
+    };
+
+    // Hard target https service; do not inherit any matrix service from remote.
+    const net::hostport ret
+    {
+        net::host(remote), "https", net::port(remote)
+    };
+
+    return ret;
 }
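For context on what these requests fetch: a delegating Matrix host answers GET https://<target>/.well-known/matrix/server with a small JSON body naming the real federation endpoint, and finish() above extracts its m.server key. A hypothetical response and a sketch of reading it with the ircd json wrappers (host names are placeholders):

// Body returned by the hypothetical target "example.org":
const ircd::json::object response
{
    R"({"m.server": "matrix.example.org:8448"})"
};

// finish() extracts the delegated name; this is what ends up cached in the
// !dns room and returned through the caller's future.
const ircd::json::string m_server
{
    response["m.server"]    // "matrix.example.org:8448"
};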