0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-25 23:14:13 +01:00

ircd::net::dns: Refactor system for the !dns room.

This commit is contained in:
Jason Volk 2019-03-21 18:24:36 -07:00
parent e133cd5a25
commit 68e3655a1d
11 changed files with 1361 additions and 1352 deletions

View file

@ -19,27 +19,21 @@
///
namespace ircd::net::dns
{
struct tag;
struct opts extern const opts_default;
using answers = vector_view<const rfc1035::answer>;
using records = vector_view<const rfc1035::record *>;
using callback = std::function<void (std::exception_ptr, const hostport &, const records &)>;
using callback_A_one = std::function<void (std::exception_ptr, const hostport &, const rfc1035::record::A &)>;
using callback_SRV_one = std::function<void (std::exception_ptr, const hostport &, const rfc1035::record::SRV &)>;
using callback_ipport_one = std::function<void (std::exception_ptr, const hostport &, const ipport &)>;
using answers_callback = std::function<void (std::exception_ptr, const tag &, const answers &)>;
// Cache warming drop-in callbacks.
extern const callback_A_one prefetch_A;
extern const callback_SRV_one prefetch_SRV;
extern const callback_ipport_one prefetch_ipport;
using callback = std::function<void (const hostport &, const json::array &)>;
using callback_one = std::function<void (const hostport &, const json::object &)>;
using callback_ipport = std::function<void (std::exception_ptr, const hostport &, const ipport &)>;
// Callback-based interface
void resolve(const hostport &, const opts &, callback);
void resolve(const hostport &, const opts &, callback_A_one);
void resolve(const hostport &, const opts &, callback_SRV_one);
void resolve(const hostport &, const opts &, callback_ipport_one);
// Callback-based interface (default options)
template<class Callback> void resolve(const hostport &, Callback&&);
void resolve(const hostport &, const opts &, callback_one); // convenience
void resolve(const hostport &, const opts &, callback_ipport); // convenience
// (internal) generate strings for rfc1035 questions or dns::cache keys.
string_view make_SRV_key(const mutable_buffer &out, const hostport &, const opts &);
@ -76,9 +70,7 @@ struct ircd::net::dns::opts
bool cache_check {true};
/// Whether the result of this lookup from the nameserver should be
/// added to the cache. If true, the TTL value in result records will be
/// modified to an absolute expiration time. If false, no modification
/// occurs from the original value.
/// added to the cache.
bool cache_result {true};
/// When false, nxdomain errors are not treated as exceptions and the
@ -94,19 +86,14 @@ struct ircd::net::dns::opts
/// (internal) DNS cache
namespace ircd::net::dns::cache
{
using closure = std::function<bool (const string_view &, const rfc1035::record &)>;
using closure = std::function<bool (const string_view &, const json::object &)>;
bool for_each(const uint16_t &type, const closure &);
bool for_each(const string_view &type, const closure &);
string_view make_type(const mutable_buffer &out, const string_view &);
string_view make_type(const mutable_buffer &out, const uint16_t &);
bool for_each(const string_view &type, const closure &); // do not make_type() here
bool for_each(const hostport &, const opts &, const closure &);
bool get(const hostport &, const opts &, const callback &);
rfc1035::record *put(const rfc1035::question &, const rfc1035::answer &);
rfc1035::record *put_error(const rfc1035::question &, const uint &code);
bool put(const hostport &, const opts &, const records &);
bool put(const hostport &, const opts &, const uint &code, const string_view &msg = {});
};
template<class Callback>
void
ircd::net::dns::resolve(const hostport &hostport,
Callback&& callback)
{
resolve(hostport, opts_default, std::forward<Callback>(callback));
}

View file

@ -668,7 +668,7 @@ ircd::net::open(socket &socket,
handler(std::move(eptr));
}};
auto connector{[&socket, opts, complete(std::move(complete))]
const dns::callback_ipport connector{[&socket, opts, complete(std::move(complete))]
(std::exception_ptr eptr, const hostport &hp, const ipport &ipport)
{
if(eptr)
@ -679,7 +679,7 @@ ircd::net::open(socket &socket,
}};
if(!opts.ipport)
dns::resolve(opts.hostport, std::move(connector));
dns::resolve(opts.hostport, dns::opts_default, std::move(connector));
else
connector({}, opts.hostport, opts.ipport);
}
@ -3278,80 +3278,36 @@ ircd::net::dns::log
decltype(ircd::net::dns::opts_default)
ircd::net::dns::opts_default;
decltype(ircd::net::dns::prefetch_ipport)
ircd::net::dns::prefetch_ipport{[]
(std::exception_ptr, const auto &hostport, const auto &record)
{
// Do nothing; cache already updated if necessary
}};
decltype(ircd::net::dns::prefetch_SRV)
ircd::net::dns::prefetch_SRV{[]
(std::exception_ptr, const auto &hostport, const auto &record)
{
// Do nothing; cache already updated if necessary
}};
decltype(ircd::net::dns::prefetch_A)
ircd::net::dns::prefetch_A{[]
(std::exception_ptr, const auto &hostport, const auto &record)
{
// Do nothing; cache already updated if necessary
}};
/// Convenience composition with a single ipport callback. This is the result of
/// an automatic chain of queries such as SRV and A/AAAA based on the input and
/// intermediate results.
void
ircd::net::dns::resolve(const hostport &hp,
const opts &op,
callback_ipport_one cb)
callback_ipport cb)
{
using prototype = void (const hostport &, opts, callback_ipport_one);
using prototype = void (const hostport &, const opts &, callback_ipport);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_resolve_ipport"
"s_dns", "ircd::net::dns::resolve"
};
function(hp, op, std::move(cb));
call(hp, op, std::move(cb));
}
/// Convenience callback with a single SRV record which was selected from
/// the vector with stochastic respect for weighting and priority.
void
ircd::net::dns::resolve(const hostport &hp,
const opts &op,
callback_SRV_one cb)
callback_one cb)
{
using prototype = void (const hostport &, opts, callback_SRV_one);
using prototype = void (const hostport &, const opts &, callback_one);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_resolve__SRV"
"s_dns", "ircd::net::dns::resolve"
};
function(hp, op, std::move(cb));
call(hp, op, std::move(cb));
}
/// Convenience callback with a single A record which was selected from
/// the vector randomly.
void
ircd::net::dns::resolve(const hostport &hp,
const opts &op,
callback_A_one cb)
{
using prototype = void (const hostport &, opts, callback_A_one);
static mods::import<prototype> function
{
"s_dns", "_resolve__A"
};
function(hp, op, std::move(cb));
}
/// Fundamental callback with a vector of abstract resource records.
void
ircd::net::dns::resolve(const hostport &hp,
const opts &op,
@ -3359,12 +3315,12 @@ ircd::net::dns::resolve(const hostport &hp,
{
using prototype = void (const hostport &, const opts &, callback);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_resolve__"
"s_dns", "ircd::net::dns::resolve"
};
function(hp, op, std::move(cb));
call(hp, op, std::move(cb));
}
/// Really assumptional and hacky right now. We're just assuming the SRV
@ -3407,56 +3363,61 @@ ircd::net::dns::make_SRV_key(const mutable_buffer &out,
// cache
//
ircd::rfc1035::record *
ircd::net::dns::cache::put_error(const rfc1035::question &question,
const uint &code)
bool
ircd::net::dns::cache::put(const hostport &h,
const opts &o,
const uint &r,
const string_view &m)
try
{
using prototype = rfc1035::record *(const rfc1035::question &, const uint &);
using prototype = bool (const hostport &, const opts &, const uint &, const string_view &);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_put_error"
"s_dns", "ircd::net::dns::cache::put"
};
return function(question, code);
return call(h, o, r, m);
}
catch(const mods::unavailable &e)
{
thread_local char buf[128];
log::dwarning
{
log, "Failed to put error for '%s' in DNS cache :%s",
question.name,
string(buf, h),
e.what()
};
return nullptr;
return false;
}
ircd::rfc1035::record *
ircd::net::dns::cache::put(const rfc1035::question &question,
const rfc1035::answer &answer)
bool
ircd::net::dns::cache::put(const hostport &h,
const opts &o,
const records &r)
try
{
using prototype = rfc1035::record *(const rfc1035::question &, const rfc1035::answer &);
using prototype = bool (const hostport &, const opts &, const records &);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_put"
"s_dns", "ircd::net::dns::cache::put"
};
return function(question, answer);
return call(h, o, r);
}
catch(const mods::unavailable &e)
{
thread_local char buf[128];
log::dwarning
{
log, "Failed to put '%s' in DNS cache :%s",
question.name,
string(buf, h),
e.what()
};
return nullptr;
return false;
}
/// This function has an opportunity to respond from the DNS cache. If it
@ -3466,19 +3427,19 @@ catch(const mods::unavailable &e)
/// be of a cached successful result, or a cached error. Both will return
/// true.
bool
ircd::net::dns::cache::get(const hostport &hp,
ircd::net::dns::cache::get(const hostport &h,
const opts &o,
const callback &cb)
const callback &c)
try
{
using prototype = bool (const hostport &, const opts &, const callback &);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_get"
"s_dns", "ircd::net::dns::cache::get"
};
return function(hp, o, cb);
return call(h, o, c);
}
catch(const mods::unavailable &e)
{
@ -3486,7 +3447,7 @@ catch(const mods::unavailable &e)
log::dwarning
{
log, "Failed to get '%s' from DNS cache :%s",
string(buf, hp),
string(buf, h),
e.what()
};
@ -3494,24 +3455,57 @@ catch(const mods::unavailable &e)
}
bool
ircd::net::dns::cache::for_each(const string_view &type,
const closure &closure)
ircd::net::dns::cache::for_each(const hostport &h,
const opts &o,
const closure &c)
{
return for_each(rfc1035::qtype.at(type), closure);
using prototype = bool (const hostport &, const opts &, const closure &);
static mods::import<prototype> call
{
"s_dns", "ircd::net::dns::cache::for_each"
};
return call(h, o, c);
}
bool
ircd::net::dns::cache::for_each(const uint16_t &type,
ircd::net::dns::cache::for_each(const string_view &type,
const closure &c)
{
using prototype = bool (const uint16_t &, const closure &);
using prototype = bool (const string_view &, const closure &);
static mods::import<prototype> function
static mods::import<prototype> call
{
"s_dns", "_for_each"
"s_dns", "ircd::net::dns::cache::for_each"
};
return function(type, c);
return call(type, c);
}
ircd::string_view
ircd::net::dns::cache::make_type(const mutable_buffer &out,
const uint16_t &type)
try
{
return make_type(out, rfc1035::rqtype.at(type));
}
catch(const std::out_of_range &)
{
throw error
{
"Record type[%u] is not recognized", type
};
}
ircd::string_view
ircd::net::dns::cache::make_type(const mutable_buffer &out,
const string_view &type)
{
return fmt::sprintf
{
out, "ircd.dns.rrs.%s", type
};
}
///////////////////////////////////////////////////////////////////////////////

View file

@ -731,7 +731,7 @@ try
return;
// Make a query through SRV and A records.
net::dns::resolve(origin, net::dns::prefetch_ipport);
//net::dns::resolve(origin, net::dns::prefetch_ipport);
}
catch(const std::exception &e)
{

View file

@ -931,7 +931,8 @@ ircd::server::peer::resolve(const hostport &hostport)
};
op_resolve = true;
net::dns::resolve(hostport, std::move(handler));
net::dns::opts opts;
net::dns::resolve(hostport, opts, std::move(handler));
}
void

View file

@ -77,7 +77,7 @@ s_moduledir = @moduledir@
s_conf_la_SOURCES = s_conf.cc
s_control_la_SOURCES = s_control.cc
s_dns_la_SOURCES = s_dns.cc s_dns_cache.cc s_dns_resolver.cc
s_dns_la_SOURCES = s_dns.cc s_dns_resolver.cc
s_node_la_SOURCES = s_node.cc
s_listen_la_SOURCES = s_listen.cc
s_keys_la_SOURCES = s_keys.cc

View file

@ -4184,7 +4184,7 @@ console_cmd__net__host(opt &out, const string_view &line)
{
const params param{line, " ",
{
"hostport"
"hostport", "qtype"
}};
const net::hostport hostport
@ -4192,18 +4192,41 @@ console_cmd__net__host(opt &out, const string_view &line)
param["hostport"]
};
const string_view &qtype
{
param["qtype"]
};
ctx::dock dock;
bool done{false};
net::ipport ipport;
std::string res[2];
std::exception_ptr eptr;
net::dns::resolve(hostport, [&done, &dock, &eptr, &ipport]
(std::exception_ptr eptr_, const net::hostport &, const net::ipport &ipport_)
net::dns::opts opts;
opts.qtype = qtype? rfc1035::qtype.at(qtype) : 0;
const net::dns::callback_ipport cbipp{[&done, &dock, &eptr, &res]
(std::exception_ptr eptr_, const net::hostport &hp, const net::ipport &ipport)
{
eptr = std::move(eptr_);
ipport = ipport_;
res[0] = string(hp);
res[1] = string(ipport);
done = true;
dock.notify_one();
});
}};
const net::dns::callback cbarr{[&done, &dock, &eptr, &res]
(const net::hostport &hp, const json::array &rrs)
{
res[0] = string(hp);
res[1] = rrs;
done = true;
dock.notify_one();
}};
if(!opts.qtype)
net::dns::resolve(hostport, opts, cbipp);
else
net::dns::resolve(hostport, opts, cbarr);
while(!done)
dock.wait();
@ -4211,7 +4234,7 @@ console_cmd__net__host(opt &out, const string_view &line)
if(eptr)
std::rethrow_exception(eptr);
else
out << ipport << std::endl;
out << res[0] << " : " << res[1] << std::endl;
return true;
}
@ -4223,96 +4246,45 @@ console_cmd__host(opt &out, const string_view &line)
}
bool
console_cmd__net__host__cache__A(opt &out, const string_view &line)
{
net::dns::cache::for_each("A", [&]
(const auto &host, const auto &r)
{
const auto &record
{
dynamic_cast<const rfc1035::record::A &>(r)
};
const net::ipport ipp{record.ip4, 0};
out << std::setw(48) << std::right << host
<< " => " << std::setw(21) << std::left << ipp
<< " expires " << timestr(record.ttl, ircd::localtime)
<< " (" << record.ttl << ")"
<< std::endl;
return true;
});
return true;
}
bool
console_cmd__net__host__cache__A__count(opt &out, const string_view &line)
{
size_t count[2] {0};
net::dns::cache::for_each("A", [&]
(const auto &host, const auto &r)
{
const auto &record
{
dynamic_cast<const rfc1035::record::A &>(r)
};
++count[bool(record.ip4)];
return true;
});
out << "resolved: " << count[1] << std::endl;
out << "error: " << count[0] << std::endl;
return true;
}
bool
console_cmd__net__host__cache__A__clear(opt &out, const string_view &line)
console_cmd__net__host__cache(opt &out, const string_view &line)
{
const params param{line, " ",
{
"hostport"
"qtype", "hostport"
}};
if(!param.count())
const string_view &qtype
{
out << "NOT IMPLEMENTED" << std::endl;
param["qtype"]
};
if(!param["hostport"])
{
net::dns::cache::for_each(qtype, [&]
(const string_view &host, const auto &r)
{
out << std::left << std::setw(48) << host
<< r
<< std::endl;
return true;
});
return true;
}
const net::hostport hostport
{
param.at("hostport")
param["hostport"]
};
out << "NOT IMPLEMENTED" << std::endl;
return true;
}
bool
console_cmd__net__host__cache__SRV(opt &out, const string_view &line)
{
net::dns::cache::for_each("SRV", [&]
(const auto &key, const auto &r)
net::dns::opts opts;
opts.qtype = rfc1035::qtype.at(qtype);
net::dns::cache::for_each(hostport, opts, [&]
(const auto &host, const auto &r)
{
const auto &record
{
dynamic_cast<const rfc1035::record::SRV &>(r)
};
thread_local char buf[256];
const string_view remote{fmt::sprintf
{
buf, "%s:%u",
rstrip(record.tgt, '.'),
record.port
}};
out << std::setw(48) << std::right << key
<< " => " << std::setw(48) << std::left << remote
<< " expires " << timestr(record.ttl, ircd::localtime)
<< " (" << record.ttl << ")"
out << std::left << std::setw(48) << host
<< r
<< std::endl;
return true;
@ -4322,18 +4294,23 @@ console_cmd__net__host__cache__SRV(opt &out, const string_view &line)
}
bool
console_cmd__net__host__cache__SRV__count(opt &out, const string_view &line)
console_cmd__net__host__cache__count(opt &out, const string_view &line)
{
const params param{line, " ",
{
"qtype"
}};
const string_view &qtype
{
param["qtype"]
};
size_t count[2] {0};
net::dns::cache::for_each("SRV", [&]
net::dns::cache::for_each(qtype, [&]
(const auto &host, const auto &r)
{
const auto &record
{
dynamic_cast<const rfc1035::record::SRV &>(r)
};
++count[bool(record.tgt)];
++count[bool(r.size() > 1)];
return true;
});
@ -4343,73 +4320,17 @@ console_cmd__net__host__cache__SRV__count(opt &out, const string_view &line)
}
bool
console_cmd__net__host__cache__SRV__clear(opt &out, const string_view &line)
console_cmd__net__host__cache__clear(opt &out, const string_view &line)
{
const params param{line, " ",
{
"hostport", "[service]"
}};
if(!param.count())
{
out << "NOT IMPLEMENTED" << std::endl;
return true;
}
const net::hostport hostport
{
param.at("hostport")
};
net::dns::opts opts;
opts.srv = param.at("[service]", "_matrix._tcp."_sv);
thread_local char srv_key_buf[128];
const auto srv_key
{
net::dns::make_SRV_key(srv_key_buf, hostport, opts)
};
out << "NOT IMPLEMENTED" << std::endl;
return true;
}
bool
console_cmd__net__host__prefetch(opt &out, const string_view &line)
{
const params param{line, " ",
{
"room_id",
}};
const auto &room_id
{
m::room_id(param.at(0))
};
const m::room room
{
room_id
};
const m::room::origins origins
{
room
};
size_t count{0};
origins.for_each([&count](const string_view &origin)
{
net::dns::resolve(origin, net::dns::prefetch_ipport);
++count;
});
out << "Prefetch resolving " << count << " origins."
<< std::endl;
return true;
}
bool
console_cmd__net__listen__list(opt &out, const string_view &line)
{

View file

@ -16,7 +16,7 @@ IRCD_MODULE
"Domain Name System Client, Cache & Components",
[] // init
{
ircd::net::dns::resolver_init();
ircd::net::dns::resolver_init(ircd::net::dns::handle_resolved);
},
[] // fini
{
@ -24,202 +24,132 @@ IRCD_MODULE
}
};
/// Convenience composition with a single ipport callback. This is the result of
/// an automatic chain of queries such as SRV and A/AAAA based on the input and
/// intermediate results.
void
ircd::net::dns::_resolve_ipport(const hostport &hp,
opts opts,
callback_ipport_one callback)
decltype(ircd::net::dns::cache::error_ttl)
ircd::net::dns::cache::error_ttl
{
auto handler
{
std::bind(&handle_ipport__A, std::move(callback), ph::_1, ph::_2, ph::_3)
};
{ "name", "ircd.net.dns.cache.error_ttl" },
{ "default", 1200L },
};
if(!hp.service)
return _resolve__A(hp, opts, std::move(handler));
decltype(ircd::net::dns::cache::nxdomain_ttl)
ircd::net::dns::cache::nxdomain_ttl
{
{ "name", "ircd.net.dns.cache.nxdomain_ttl" },
{ "default", 43200L },
};
decltype(ircd::net::dns::cache::min_ttl)
ircd::net::dns::cache::min_ttl
{
{ "name", "ircd.net.dns.cache.min_ttl" },
{ "default", 1200L },
};
decltype(ircd::net::dns::cache::room_id)
ircd::net::dns::cache::room_id
{
"dns", my_host()
};
decltype(ircd::net::dns::cache::hook)
ircd::net::dns::cache::hook
{
handle_cached,
{
{ "_site", "vm.notify" },
{ "room_id", string_view{room_id} },
}
};
decltype(ircd::net::dns::waiting)
ircd::net::dns::waiting;
void
IRCD_MODULE_EXPORT
ircd::net::dns::resolve(const hostport &hp,
const opts &opts_,
callback_ipport callback)
{
if(unlikely(!port(hp) && !hp.service))
throw error
{
"Port or service is required for this query"
};
dns::opts opts(opts_);
opts.qtype = 33;
opts.nxdomain_exceptions = false;
_resolve__SRV(hp, opts, [opts(opts), handler(std::move(handler))]
(std::exception_ptr eptr, hostport hp, const rfc1035::record::SRV &record)
resolve(hp, opts, dns::callback_one{[opts, callback(std::move(callback))]
(const hostport &hp, const json::object &rr)
mutable
{
if(eptr)
if(rr.has("error"))
{
static const rfc1035::record::A empty;
return handler(std::move(eptr), hp, empty);
const json::string &error(rr.get("error"));
const auto eptr(make_exception_ptr<rfc1035::error>("%s", error));
return callback(eptr, {host(hp), 0}, {});
}
opts.qtype = 0;
const net::hostport target
{
rr.has("tgt")?
rstrip(unquote(rr.at("tgt")), '.'):
host(hp),
rr.has("port")?
rr.get<uint16_t>("port"):
port(hp)
};
opts.qtype = 1;
opts.nxdomain_exceptions = true;
hp.host = record.tgt?: unmake_SRV_key(hp.host);
hp.port = record.port? record.port : hp.port;
_resolve__A(hp, opts, std::move(handler));
});
resolve(target, opts, dns::callback_one{[callback(std::move(callback)), target]
(const hostport &hp, const json::object &rr)
{
const json::string &error(rr.get("error"));
const auto eptr
{
!empty(error)?
make_exception_ptr<rfc1035::error>("%s", error):
std::exception_ptr{}
};
const json::string &ip(rr.get("ip", "0.0.0.0"));
const net::ipport ipport(ip, port(target));
return callback(eptr, {host(hp), port(target)}, ipport);
}});
}});
}
void
ircd::net::dns::handle_ipport__A(callback_ipport_one callback,
std::exception_ptr eptr,
const hostport &hp,
const rfc1035::record::A &record)
IRCD_MODULE_EXPORT
ircd::net::dns::resolve(const hostport &hp,
const opts &opts,
callback_one callback)
{
if(!eptr && !record.ip4)
eptr = make_exception_ptr<not_found>("Host has no A record");
const ipport ipport
{
record.ip4, port(hp)
};
callback(std::move(eptr), hp, ipport);
}
/// Convenience callback with a single SRV record which was selected from
/// the vector with stochastic respect for weighting and priority.
void
ircd::net::dns::_resolve__SRV(const hostport &hp,
opts opts,
callback_SRV_one callback)
{
static const auto &qtype
{
rfc1035::qtype.at("SRV")
};
if(unlikely(opts.qtype && opts.qtype != qtype))
if(unlikely(!opts.qtype))
throw error
{
"Specified query type '%s' (%u) but user's callback is for SRV records only.",
rfc1035::rqtype.at(opts.qtype),
opts.qtype
"A query type is required; not specified; cannot be deduced here."
};
if(!opts.qtype)
opts.qtype = qtype;
auto handler
resolve(hp, opts, dns::callback{[callback(std::move(callback))]
(const hostport &hp, const json::array &rrs)
{
std::bind(&handle__SRV, std::move(callback), ph::_1, ph::_2, ph::_3)
};
_resolve__(hp, opts, std::move(handler));
const size_t &count(rrs.size());
const auto choice(count? rand::integer(0, count - 1) : 0UL);
const json::object &rr(rrs[choice]);
callback(hp, rr);
}});
}
void
ircd::net::dns::handle__SRV(callback_SRV_one callback,
std::exception_ptr eptr,
const hostport &hp,
const records &rrs)
{
static const rfc1035::record::SRV empty;
static const auto &qtype
{
rfc1035::qtype.at("SRV")
};
if(eptr)
return callback(std::move(eptr), hp, empty);
//TODO: prng on weight / prio plz
for(size_t i(0); i < rrs.size(); ++i)
{
const auto &rr(*rrs.at(i));
if(rr.type != qtype)
continue;
const auto &record(rr.as<const rfc1035::record::SRV>());
return callback(std::move(eptr), hp, record);
}
return callback(std::move(eptr), hp, empty);
}
/// Convenience callback with a single A record which was selected from
/// the vector randomly.
void
ircd::net::dns::_resolve__A(const hostport &hp,
opts opts,
callback_A_one callback)
{
static const auto &qtype
{
rfc1035::qtype.at("A")
};
if(unlikely(opts.qtype && opts.qtype != qtype))
throw error
{
"Specified query type '%s' (%u) but user's callback is for A records only.",
rfc1035::rqtype.at(opts.qtype),
opts.qtype
};
if(!opts.qtype)
opts.qtype = qtype;
auto handler
{
std::bind(&handle__A, std::move(callback), ph::_1, ph::_2, ph::_3)
};
_resolve__(hp, opts, std::move(handler));
}
void
ircd::net::dns::handle__A(callback_A_one callback,
std::exception_ptr eptr,
const hostport &hp,
const records &rrs)
{
static const rfc1035::record::A empty;
static const auto &qtype
{
rfc1035::qtype.at("A")
};
if(eptr)
return callback(std::move(eptr), hp, empty);
// Get the actual number of A records in these results
size_t rec_count(0);
for(size_t i(0); i < rrs.size(); ++i)
rec_count += rrs.at(i)->type == qtype;
// Make a random selection for round-robin; rand::integer's range
// is inclusive so it's shifted down by one.
uint64_t selection
{
rec_count > 1?
rand::integer(1, rec_count) - 1:
0
};
assert(!rec_count || selection < rec_count);
assert(rec_count || selection == 0);
for(size_t i(0); i < rrs.size(); ++i)
{
const auto &rr(*rrs.at(i));
if(rr.type != qtype)
continue;
if(selection-- != 0)
continue;
const auto &record(rr.as<const rfc1035::record::A>());
return callback(std::move(eptr), hp, record);
}
return callback(std::move(eptr), hp, empty);
}
/// Fundamental callback with a vector of abstract resource records.
void
ircd::net::dns::_resolve__(const hostport &hp,
const opts &opts,
callback cb)
IRCD_MODULE_EXPORT
ircd::net::dns::resolve(const hostport &hp,
const opts &opts,
callback cb)
{
assert(ctx::current);
if(unlikely(!opts.qtype))
throw error
{
@ -230,5 +160,609 @@ ircd::net::dns::_resolve__(const hostport &hp,
if(cache::get(hp, opts, cb))
return;
resolver_call(hp, opts, std::move(cb));
waiting.emplace_front([cb(std::move(cb)), opts, h(std::string(host(hp)))]
(const string_view &type, const string_view &key, const json::array &rrs)
{
if(type != rfc1035::rqtype.at(opts.qtype))
return false;
if(cache::get(hostport(h), opts, cb))
return true;
cb(hostport(h), rrs);
return true;
});
resolver_call(hp, opts);
}
void
ircd::net::dns::handle_cached(const m::event &event,
m::vm::eval &eval)
try
{
const string_view &full_type
{
json::get<"type"_>(event)
};
if(!startswith(full_type, "ircd.dns.rrs."))
return;
const string_view &type
{
lstrip(full_type, "ircd.dns.rrs.")
};
const string_view &state_key
{
json::get<"state_key"_>(event)
};
const json::array &rrs
{
json::get<"content"_>(event).get("")
};
auto it(begin(waiting));
while(it != end(waiting)) try
{
const auto &proffer(*it);
if(proffer(type, state_key, rrs))
it = waiting.erase(it);
else
++it;
}
catch(const std::exception &e)
{
++it;
log::error
{
log, "proffer :%s", e.what()
};
}
}
catch(const std::exception &e)
{
log::critical
{
log, "handle_cached() :%s", e.what()
};
}
/// Called back from the dns::resolver with a vector of answers to the
/// question (we get the whole tag here).
///
/// This is being invoked on the dns::resolver's receiver context stack
/// under lock preventing any other activity with the resolver.
///
/// We process these results and insert them into our cache. The cache
/// insertion involves sending a message to the DNS room. Matrix hooks
/// on that room will catch this message for the user(s) which initiated
/// this query; we don't callback or deal with said users here.
///
void
ircd::net::dns::handle_resolved(std::exception_ptr eptr,
const tag &tag,
const answers &an)
try
{
static const size_t recsz(1024);
thread_local char recbuf[recsz * MAX_COUNT];
thread_local std::array<const rfc1035::record *, MAX_COUNT> record;
size_t i(0);
mutable_buffer buf{recbuf};
for(; i < an.size(); ++i) switch(an.at(i).qtype)
{
case 1:
record.at(i) = new_record<rfc1035::record::A>(buf, an.at(i));
continue;
case 5:
record.at(i) = new_record<rfc1035::record::CNAME>(buf, an.at(i));
continue;
case 28:
record.at(i) = new_record<rfc1035::record::AAAA>(buf, an.at(i));
continue;
case 33:
record.at(i) = new_record<rfc1035::record::SRV>(buf, an.at(i));
continue;
default:
record.at(i) = new_record<rfc1035::record>(buf, an.at(i));
continue;
}
// Sort the records by type so we can create smaller vectors to send to the
// cache. nulls from running out of space should be pushed to the back.
std::sort(begin(record), begin(record) + an.size(), []
(const auto *const &a, const auto *const &b)
{
if(!a)
return false;
if(!b)
return true;
return a->type < b->type;
});
//TODO: don't send cache ephemeral rcodes
// Bail on error here; send the cache the message
if(eptr)
{
cache::put(tag.hp, tag.opts, tag.rcode, what(eptr));
return;
}
// Branch on no records with no error
if(!i)
{
static const records empty;
cache::put(tag.hp, tag.opts, empty);
return;
}
// Iterate the record vector which was sorted by type;
// send the cache an individual view of each type since
// the cache is organized by record type.
size_t s(0), e(0);
auto last(record.at(e)->type);
for(++e; e <= i; ++e)
{
if(e < i && record.at(e)->type == last)
continue;
const vector_view<const rfc1035::record *> records
{
record.data() + s, record.data() + e
};
cache::put(tag.hp, tag.opts, records);
if(e < i)
{
last = record.at(e)->type;
s = e;
}
}
}
catch(const std::exception &e)
{
log::error
{
log, "handle resolved: tag[%u] :%s",
tag.id,
e.what()
};
throw;
}
template<class type>
ircd::rfc1035::record *
ircd::net::dns::new_record(mutable_buffer &buf,
const rfc1035::answer &answer)
{
if(unlikely(sizeof(type) > size(buf)))
return nullptr;
const auto pos(data(buf));
consume(buf, sizeof(type));
return new (data(buf)) type(answer);
}
//
// cache
//
bool
IRCD_MODULE_EXPORT
ircd::net::dns::cache::put(const hostport &hp,
const opts &opts,
const uint &code,
const string_view &msg)
{
char type_buf[64];
const string_view type
{
make_type(type_buf, opts.qtype)
};
char state_key_buf[rfc1035::NAME_BUF_SIZE * 2];
const string_view &state_key
{
opts.qtype == 33?
make_SRV_key(state_key_buf, host(hp), opts):
host(hp)
};
char content_buf[768];
json::stack out{content_buf};
json::stack::object content{out};
json::stack::array array
{
content, ""
};
json::stack::object rr0
{
array
};
json::stack::member
{
rr0, "errcode", lex_cast(code)
};
json::stack::member
{
rr0, "error", msg
};
json::stack::member
{
rr0, "ttl", json::value
{
code == 3?
long(seconds(nxdomain_ttl).count()):
long(seconds(error_ttl).count())
}
};
rr0.~object();
array.~array();
content.~object();
send(room_id, m::me, type, state_key, json::object(out.completed()));
return true;
}
bool
IRCD_MODULE_EXPORT
ircd::net::dns::cache::put(const hostport &hp,
const opts &opts,
const records &rrs)
{
const auto &type_code
{
!rrs.empty()? rrs.at(0)->type : opts.qtype
};
char type_buf[48];
const string_view type
{
make_type(type_buf, type_code)
};
char state_key_buf[rfc1035::NAME_BUF_SIZE * 2];
const string_view &state_key
{
opts.qtype == 33?
make_SRV_key(state_key_buf, host(hp), opts):
host(hp)
};
const unique_buffer<mutable_buffer> buf
{
8_KiB
};
json::stack out{buf};
json::stack::object content{out};
json::stack::array array
{
content, ""
};
if(rrs.empty())
{
// Add one object to the array with nothing except a ttl indicating no
// records (and no error) so we can cache that for the ttl. We use the
// nxdomain ttl for this value.
json::stack::object rr0{array};
json::stack::member
{
rr0, "ttl", json::value
{
long(seconds(nxdomain_ttl).count())
}
};
}
else for(const auto &record : rrs)
{
switch(record->type)
{
case 1: // A
{
json::stack::object object{array};
dynamic_cast<const rfc1035::record::A *>(record)->append(object);
continue;
}
case 5: // CNAME
{
json::stack::object object{array};
dynamic_cast<const rfc1035::record::CNAME *>(record)->append(object);
continue;
}
case 28: // AAAA
{
json::stack::object object{array};
dynamic_cast<const rfc1035::record::AAAA *>(record)->append(object);
continue;
}
case 33: // SRV
{
json::stack::object object{array};
dynamic_cast<const rfc1035::record::SRV *>(record)->append(object);
continue;
}
}
}
array.~array();
content.~object();
send(room_id, m::me, type, state_key, json::object{out.completed()});
return true;
}
bool
IRCD_MODULE_EXPORT
ircd::net::dns::cache::get(const hostport &hp,
const opts &opts,
const callback &closure)
{
char type_buf[48];
const string_view type
{
make_type(type_buf, opts.qtype)
};
char state_key_buf[rfc1035::NAME_BUF_SIZE * 2];
const string_view &state_key
{
opts.qtype == 33?
make_SRV_key(state_key_buf, host(hp), opts):
host(hp)
};
const m::room::state state
{
room_id
};
const m::event::idx &event_idx
{
state.get(std::nothrow, type, state_key)
};
if(!event_idx)
return false;
time_t origin_server_ts;
if(!m::get<time_t>(event_idx, "origin_server_ts", origin_server_ts))
return false;
bool ret{false};
const time_t ts{origin_server_ts / 1000L};
m::get(std::nothrow, event_idx, "content", [&hp, &closure, &ret, &ts]
(const json::object &content)
{
const json::array &rrs
{
content.get("")
};
// If all records are expired then skip; otherwise since this closure
// expects a single array we reveal both expired and valid records.
ret = !std::all_of(begin(rrs), end(rrs), [&ts]
(const json::object &rr)
{
return expired(rr, ts);
});
if(ret)
closure(hp, rrs);
});
return ret;
}
bool
IRCD_MODULE_EXPORT
ircd::net::dns::cache::for_each(const hostport &hp,
const opts &opts,
const closure &closure)
{
char type_buf[48];
const string_view type
{
make_type(type_buf, opts.qtype)
};
char state_key_buf[rfc1035::NAME_BUF_SIZE * 2];
const string_view &state_key
{
opts.qtype == 33?
make_SRV_key(state_key_buf, host(hp), opts):
host(hp)
};
const m::room::state state
{
room_id
};
const m::event::idx &event_idx
{
state.get(std::nothrow, type, state_key)
};
if(!event_idx)
return false;
time_t origin_server_ts;
if(!m::get<time_t>(event_idx, "origin_server_ts", origin_server_ts))
return false;
bool ret{true};
const time_t ts{origin_server_ts / 1000L};
m::get(std::nothrow, event_idx, "content", [&state_key, &closure, &ret, &ts]
(const json::object &content)
{
for(const json::object &rr : json::array(content.get("")))
{
if(expired(rr, ts))
continue;
if(!(ret = closure(state_key, rr)))
break;
}
});
return ret;
}
bool
IRCD_MODULE_EXPORT
ircd::net::dns::cache::for_each(const string_view &type,
const closure &closure)
{
char type_buf[48];
const string_view full_type
{
make_type(type_buf, type)
};
const m::room::state state
{
room_id
};
return state.for_each(full_type, [&closure]
(const string_view &, const string_view &state_key, const m::event::idx &event_idx)
{
time_t origin_server_ts;
if(!m::get<time_t>(event_idx, "origin_server_ts", origin_server_ts))
return true;
bool ret{true};
const time_t ts{origin_server_ts / 1000L};
m::get(std::nothrow, event_idx, "content", [&state_key, &closure, &ret, &ts]
(const json::object &content)
{
for(const json::object &rr : json::array(content.get("")))
{
if(expired(rr, ts))
continue;
if(!(ret = closure(state_key, rr)))
break;
}
});
return ret;
});
}
bool
ircd::net::dns::cache::expired(const json::object &rr,
const time_t &ts)
{
const auto ttl(get_ttl(rr));
return ts + ttl < ircd::time();
}
time_t
ircd::net::dns::cache::get_ttl(const json::object &rr)
{
const seconds &min_ttl_s(min_ttl);
const seconds &err_ttl_s(error_ttl);
const time_t min_ttl_t(min_ttl_s.count());
const time_t err_ttl_t(err_ttl_s.count());
const time_t rr_ttl
{
rr.get<time_t>("ttl", err_ttl_t)
};
return std::max(rr_ttl, min_ttl_t);
}
//
// cache room creation
//
namespace ircd::net::dns::cache
{
static void create_room();
extern bool room_exists;
extern const m::hookfn<m::vm::eval &> create_room_hook;
extern const ircd::run::changed create_room_hook_alt;
}
decltype(ircd::net::dns::cache::room_exists)
ircd::net::dns::cache::room_exists
{
m::exists(room_id)
};
decltype(ircd::net::dns::cache::create_room_hook)
ircd::net::dns::cache::create_room_hook
{
{
{ "_site", "vm.effect" },
{ "room_id", "!ircd" },
{ "type", "m.room.create" },
},
[](const m::event &, m::vm::eval &)
{
create_room();
}
};
/// This is for existing installations that won't catch an
/// !ircd room create and must create this room.
decltype(ircd::net::dns::cache::create_room_hook_alt)
ircd::net::dns::cache::create_room_hook_alt{[]
(const auto &level)
{
if(level != run::level::RUN || room_exists)
return;
context{[]
{
if(m::exists(m::my_room)) // if false, the other hook will succeed.
create_room();
}};
}};
void
ircd::net::dns::cache::create_room()
try
{
const m::room room
{
m::create(room_id, m::me, "internal")
};
log::debug
{
m::log, "Created '%s' for the DNS cache module.",
string_view{room.room_id}
};
}
catch(const std::exception &e)
{
log::critical
{
m::log, "Creating the '%s' room failed :%s",
string_view{room_id},
e.what()
};
}

View file

@ -8,50 +8,36 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include <ircd/asio.h>
extern ircd::mapi::header
IRCD_MODULE;
namespace ircd::net::dns
{
// Maximum number of records we present in result vector to any closure
struct waiter;
using proffer = std::function<bool (const string_view &, const string_view &, const json::array &)>;
constexpr const size_t MAX_COUNT {64};
static void handle__A(callback_A_one, std::exception_ptr, const hostport &, const records &);
static void handle__SRV(callback_SRV_one, std::exception_ptr, const hostport &, const records &);
static void handle_ipport__A(callback_ipport_one, std::exception_ptr, const hostport &, const rfc1035::record::A &);
template<class T> rfc1035::record *new_record(mutable_buffer &, const rfc1035::answer &);
void handle_resolved(std::exception_ptr, const tag &, const answers &);
void handle_cached(const m::event &, m::vm::eval &);
extern "C" void _resolve__(const hostport &, const opts &, callback);
extern "C" void _resolve__A(const hostport &, opts, callback_A_one);
extern "C" void _resolve__SRV(const hostport &, opts, callback_SRV_one);
extern "C" void _resolve_ipport(const hostport &, opts, callback_ipport_one);
extern std::list<proffer> waiting;
}
//
// s_dns_cache.cc
//
namespace ircd::net::dns::cache
{
static time_t get_ttl(const json::object &rr);
static bool expired(const json::object &rr, const time_t &ts);
extern conf::item<seconds> min_ttl;
extern conf::item<seconds> clear_nxdomain;
extern conf::item<seconds> error_ttl;
extern conf::item<seconds> nxdomain_ttl;
extern const m::room::id::buf room_id;
extern std::multimap<std::string, rfc1035::record::A, std::less<>> cache_A;
extern std::multimap<std::string, rfc1035::record::SRV, std::less<>> cache_SRV;
template<class Map>
static bool _for_each_(Map &, const closure &);
template<class Map>
static rfc1035::record *_cache_answer(Map &, const string_view &host, const rfc1035::answer &);
template<class T, class Map>
static rfc1035::record *_cache_error(Map &, const string_view &host);
extern "C" rfc1035::record *_put(const rfc1035::question &, const rfc1035::answer &);
extern "C" rfc1035::record *_put_error(const rfc1035::question &, const uint &code);
extern "C" bool _get(const hostport &, const opts &, const callback &);
extern "C" bool _for_each(const uint16_t &type, const closure &);
extern m::hookfn<m::vm::eval &> hook;
extern ctx::dock dock;
}
//
@ -63,9 +49,104 @@ namespace ircd::net::dns
// Resolver instance
struct resolver extern *resolver;
// Interface to resolver because it is not included here to avoid requiring
// boost headers (ircd/asio.h) for units other than s_dns_resolver.cc
void resolver_call(const hostport &, const opts &, callback &&);
void resolver_init();
void resolver_call(const hostport &, const opts &);
void resolver_init(answers_callback);
void resolver_fini();
}
struct ircd::net::dns::resolver
{
using header = rfc1035::header;
static conf::item<std::string> servers;
static conf::item<milliseconds> timeout;
static conf::item<milliseconds> send_rate;
static conf::item<size_t> send_burst;
static conf::item<size_t> retry_max;
answers_callback callback;
std::vector<ip::udp::endpoint> server; // The list of active servers
size_t server_next{0}; // Round-robin state to hit servers
ctx::dock dock, done;
ctx::mutex mutex;
std::map<uint16_t, tag> tags; // The active requests
steady_point send_last; // Time of last send
std::deque<uint16_t> sendq; // Queue of frames for rate-limiting
ip::udp::socket ns; // A pollable activity object
// util
void add_server(const ipport &);
void add_server(const string_view &);
void set_servers(const string_view &list);
void set_servers();
// removal (must have lock)
void unqueue(tag &);
void remove(tag &);
decltype(tags)::iterator remove(tag &, const decltype(tags)::iterator &);
void error_one(tag &, const std::exception_ptr &);
void error_one(tag &, const std::system_error &);
void error_all(const std::error_code &);
void cancel_all();
// reception
bool handle_error(const header &, tag &);
void handle_reply(const header &, const const_buffer &body, tag &);
void handle_reply(const ipport &, const header &, const const_buffer &body);
void handle(const ipport &, const mutable_buffer &);
void recv_worker();
ctx::context recv_context;
// submission
void send_query(const ip::udp::endpoint &, tag &);
void queue_query(tag &);
void send_query(tag &);
void submit(tag &);
// timeout
bool check_timeout(const uint16_t &id, tag &, const steady_point &expired);
void check_timeouts(const milliseconds &timeout);
void timeout_worker();
ctx::context timeout_context;
// sendq
void flush(const uint16_t &);
void sendq_work();
void sendq_clear();
void sendq_worker();
ctx::context sendq_context;
template<class... A> tag &set_tag(A&&...);
const_buffer make_query(const mutable_buffer &buf, tag &);
uint16_t operator()(const hostport &, const opts &);
resolver(answers_callback);
~resolver() noexcept;
};
struct ircd::net::dns::tag
{
uint16_t id {0};
hostport hp;
dns::opts opts; // note: invalid after query sent
const_buffer question;
steady_point last {steady_point::min()};
uint8_t tries {0};
uint rcode {0};
ipport server;
char hostbuf[rfc1035::NAME_BUF_SIZE];
char qbuf[512];
tag(const hostport &, const dns::opts &);
tag(tag &&) = delete;
tag(const tag &) = delete;
};
inline
ircd::net::dns::tag::tag(const hostport &hp,
const dns::opts &opts)
:hp{hp}
,opts{opts}
{
this->hp.host = { hostbuf, copy(hostbuf, hp.host) };
}

View file

@ -1,365 +0,0 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include "s_dns.h"
decltype(ircd::net::dns::cache::clear_nxdomain)
ircd::net::dns::cache::clear_nxdomain
{
{ "name", "ircd.net.dns.cache.clear_nxdomain" },
{ "default", 43200L },
};
decltype(ircd::net::dns::cache::min_ttl)
ircd::net::dns::cache::min_ttl
{
{ "name", "ircd.net.dns.cache.min_ttl" },
{ "default", 900L },
};
decltype(ircd::net::dns::cache::room_id)
ircd::net::dns::cache::room_id
{
"dns", my_host()
};
decltype(ircd::net::dns::cache::cache_A)
ircd::net::dns::cache::cache_A;
decltype(ircd::net::dns::cache::cache_SRV)
ircd::net::dns::cache::cache_SRV;
bool
ircd::net::dns::cache::_for_each(const uint16_t &type,
const closure &closure)
{
switch(type)
{
case 1: // A
return _for_each_(cache_A, closure);
case 33: // SRV
return _for_each_(cache_SRV, closure);
default:
return true;
}
}
/// This function has an opportunity to respond from the DNS cache. If it
/// returns true, that indicates it responded by calling back the user and
/// nothing further should be done for them. If it returns false, that
/// indicates it did not respond and to proceed normally. The response can
/// be of a cached successful result, or a cached error. Both will return
/// true.
bool
ircd::net::dns::cache::_get(const hostport &hp,
const opts &opts,
const callback &cb)
{
// It's no use putting the result record array on the stack in case this
// function is either called from an ircd::ctx or calls back an ircd::ctx.
// If the ctx yields the records can still be evicted from the cache.
// It's better to just force the user to conform here rather than adding
// ref counting and other pornographic complications to this cache.
const ctx::critical_assertion ca;
thread_local std::array<const rfc1035::record *, MAX_COUNT> record;
std::exception_ptr eptr;
size_t count{0};
if(opts.qtype == 33) // deduced SRV query
{
assert(!empty(host(hp)));
thread_local char srvbuf[512];
const string_view srvhost
{
make_SRV_key(srvbuf, hp, opts)
};
auto &map{cache_SRV};
const auto pit{map.equal_range(srvhost)};
if(pit.first == pit.second)
return false;
const auto &now{ircd::time()};
for(auto it(pit.first); it != pit.second; )
{
const auto &rr{it->second};
// Cached entry is too old, ignore and erase
if(rr.ttl < now)
{
it = map.erase(it);
continue;
}
// Cached entry is a cached error, we set the eptr, but also
// include the record and increment the count like normal.
if((!rr.tgt || !rr.port) && opts.nxdomain_exceptions && !eptr)
{
//TODO: we don't cache what the error was, assuming it's
//TODO: NXDomain can be incorrect and in bad ways downstream...
static const auto rcode{3}; //NXDomain
eptr = make_exception_ptr<rfc1035::error>
(
"protocol error #%u (cached) :%s",
rcode,
rfc1035::rcode.at(rcode)
);
}
if(count < record.size())
record.at(count++) = &rr;
++it;
}
}
else if(opts.qtype == 1)
{
auto &map{cache_A};
const auto &key{rstrip(host(hp), '.')};
if(unlikely(empty(key)))
return false;
const auto pit{map.equal_range(key)};
if(pit.first == pit.second)
return false;
const auto &now{ircd::time()};
for(auto it(pit.first); it != pit.second; )
{
const auto &rr{it->second};
// Cached entry is too old, ignore and erase
if(rr.ttl < now)
{
it = map.erase(it);
continue;
}
// Cached entry is a cached error, we set the eptr, but also
// include the record and increment the count like normal.
if(!rr.ip4 && !eptr)
{
//TODO: we don't cache what the error was, assuming it's
//TODO: NXDomain can be incorrect and in bad ways downstream...
static const auto rcode{3}; //NXDomain
eptr = make_exception_ptr<rfc1035::error>
(
"protocol error #%u (cached) :%s",
rcode,
rfc1035::rcode.at(rcode)
);
}
if(count < record.size())
record.at(count++) = &rr;
++it;
}
}
assert(count || !eptr); // no error if no cache response
assert(!eptr || count == 1); // if error, should only be one entry.
if(count)
cb(std::move(eptr), hp, vector_view<const rfc1035::record *>(record.data(), count));
return count;
}
ircd::rfc1035::record *
ircd::net::dns::cache::_put(const rfc1035::question &question,
const rfc1035::answer &answer)
{
const auto &host
{
rstrip(question.name, '.')
};
assert(!empty(host));
switch(answer.qtype)
{
case 1: // A
return _cache_answer(cache_A, host, answer);
case 33: // SRV
return _cache_answer(cache_SRV, host, answer);
default:
return nullptr;
}
}
ircd::rfc1035::record *
ircd::net::dns::cache::_put_error(const rfc1035::question &question,
const uint &code)
{
const auto &host
{
rstrip(question.name, '.')
};
assert(!empty(host));
switch(question.qtype)
{
case 1: // A
return _cache_error<rfc1035::record::A>(cache_A, host);
case 33: // SRV
return _cache_error<rfc1035::record::SRV>(cache_SRV, host);
default:
return nullptr;
}
}
template<class Map>
ircd::rfc1035::record *
ircd::net::dns::cache::_cache_answer(Map &map,
const string_view &host,
const rfc1035::answer &answer)
{
auto pit
{
map.equal_range(host)
};
auto it(pit.first);
while(it != pit.second)
{
const auto &rr{it->second};
if(rr == answer)
it = map.erase(it);
else
++it;
}
const auto &iit
{
map.emplace_hint(it, host, answer)
};
return &iit->second;
}
template<class T,
class Map>
ircd::rfc1035::record *
ircd::net::dns::cache::_cache_error(Map &map,
const string_view &host)
{
auto pit
{
map.equal_range(host)
};
auto it
{
pit.first != pit.second?
map.erase(pit.first, pit.second):
pit.first
};
T record;
record.ttl = ircd::time() + seconds(dns::cache::clear_nxdomain).count(); //TODO: code
it = map.emplace_hint(it, host, record);
return &it->second;
}
template<class Map>
bool
ircd::net::dns::cache::_for_each_(Map &map,
const closure &closure)
{
for(const auto &pair : map)
{
const auto &host(pair.first);
const auto &record(pair.second);
if(!closure(host, record))
return false;
}
return true;
}
//
// cache room creation
//
namespace ircd::net::dns::cache
{
static void create_room();
extern bool room_exists;
extern const m::hookfn<m::vm::eval &> create_room_hook;
extern const ircd::run::changed create_room_hook_alt;
}
decltype(ircd::net::dns::cache::room_exists)
ircd::net::dns::cache::room_exists
{
m::exists(room_id)
};
decltype(ircd::net::dns::cache::create_room_hook)
ircd::net::dns::cache::create_room_hook
{
{
{ "_site", "vm.effect" },
{ "room_id", "!ircd" },
{ "type", "m.room.create" },
},
[](const m::event &, m::vm::eval &)
{
create_room();
}
};
/// This is for existing installations that won't catch an
/// !ircd room create and must create this room.
decltype(ircd::net::dns::cache::create_room_hook_alt)
ircd::net::dns::cache::create_room_hook_alt{[]
(const auto &level)
{
if(level != run::level::RUN || room_exists)
return;
context{[]
{
if(m::exists(m::my_room)) // if false, the other hook will succeed.
create_room();
}};
}};
void
ircd::net::dns::cache::create_room()
try
{
const m::room room
{
m::create(room_id, m::me, "internal")
};
log::debug
{
m::log, "Created '%s' for the DNS cache module.",
string_view{room.room_id}
};
}
catch(const std::exception &e)
{
log::critical
{
m::log, "Creating the '%s' room failed :%s",
string_view{room_id},
e.what()
};
}

View file

@ -8,9 +8,7 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#include <ircd/asio.h>
#include "s_dns.h"
#include "s_dns_resolver.h"
decltype(ircd::net::dns::resolver)
ircd::net::dns::resolver;
@ -61,10 +59,13 @@ ircd::net::dns::resolver::retry_max
//
void
ircd::net::dns::resolver_init()
ircd::net::dns::resolver_init(answers_callback callback)
{
assert(!ircd::net::dns::resolver);
ircd::net::dns::resolver = new typename ircd::net::dns::resolver{};
ircd::net::dns::resolver = new typename ircd::net::dns::resolver
{
std::move(callback)
};
}
void
@ -76,8 +77,7 @@ ircd::net::dns::resolver_fini()
void
ircd::net::dns::resolver_call(const hostport &hp,
const opts &opts,
callback &&cb)
const opts &opts)
{
if(unlikely(!resolver))
throw error
@ -94,18 +94,25 @@ ircd::net::dns::resolver_call(const hostport &hp,
host(hp)
};
resolver(hp, opts, std::move(cb));
resolver(hp, opts);
}
//
// resolver::resolver
//
ircd::net::dns::resolver::resolver()
:ns{ios::get()}
,reply
ircd::net::dns::resolver::resolver(answers_callback callback)
:callback
{
64_KiB // worst-case UDP datagram size
std::move(callback)
}
,ns
{
ios::get()
}
,recv_context
{
"dnsres R", 128_KiB, std::bind(&resolver::recv_worker, this), context::POST
}
,timeout_context
{
@ -119,30 +126,118 @@ ircd::net::dns::resolver::resolver()
ns.open(ip::udp::v4());
ns.non_blocking(true);
set_servers();
set_handle();
}
ircd::net::dns::resolver::~resolver()
noexcept
{
ns.close();
sendq_context.terminate();
timeout_context.terminate();
while(!tags.empty())
{
log::warning
{
log, "Waiting for %zu unfinished DNS resolutions", tags.size()
};
if(ns.is_open())
ns.close();
ctx::sleep(3);
}
timeout_context.terminate();
sendq_context.terminate();
recv_context.terminate();
done.wait([this]
{
const bool ret(tags.empty());
if(!ret)
log::warning
{
log, "Waiting for %zu unfinished DNS resolutions", tags.size()
};
return ret;
});
assert(tags.empty());
}
__attribute__((noreturn))
/// Internal resolver entry interface.
uint16_t
ircd::net::dns::resolver::operator()(const hostport &hp,
const opts &opts)
{
auto &tag(set_tag(hp, opts)); try
{
tag.question = make_query(tag.qbuf, tag);
tag.hp.host = strlcpy(tag.hostbuf, host(hp));
tag.hp.service = {};
submit(tag);
}
catch(...)
{
remove(tag);
throw;
}
return tag.id;
}
ircd::const_buffer
ircd::net::dns::resolver::make_query(const mutable_buffer &buf,
tag &tag)
{
thread_local char hostbuf[rfc1035::NAME_BUF_SIZE * 2];
string_view hoststr;
switch(tag.opts.qtype)
{
case 0: throw error
{
"A query type is required to form a question."
};
case 33: // SRV
hoststr = make_SRV_key(hostbuf, host(tag.hp), tag.opts);
break;
default:
hoststr = host(tag.hp);
break;
}
assert(hoststr);
assert(tag.opts.qtype);
const rfc1035::question question
{
hoststr, tag.opts.qtype
};
return rfc1035::make_query(buf, tag.id, question);
}
template<class... A>
ircd::net::dns::tag &
ircd::net::dns::resolver::set_tag(A&&... args)
{
while(tags.size() < 65535)
{
auto id
{
ircd::rand::integer(1, 65535)
};
auto it{tags.lower_bound(id)};
if(it != end(tags) && it->first == id)
continue;
it = tags.emplace_hint(it,
std::piecewise_construct,
std::forward_as_tuple(id),
std::forward_as_tuple(std::forward<A>(args)...));
it->second.id = id;
dock.notify_one();
return it->second;
}
throw panic
{
"Too many DNS queries"
};
}
void
__attribute__((noreturn))
ircd::net::dns::resolver::sendq_worker()
{
while(1)
@ -163,6 +258,11 @@ ircd::net::dns::resolver::sendq_worker()
void
ircd::net::dns::resolver::sendq_work()
{
const std::lock_guard lock
{
mutex
};
assert(!sendq.empty());
assert(sendq.size() < 65535);
assert(sendq.size() <= tags.size());
@ -207,30 +307,18 @@ try
}
catch(const ctx::terminated &)
{
cancel_all_tags();
}
void
ircd::net::dns::resolver::cancel_all_tags()
{
static const std::system_error ec
{
make_error_code(std::errc::operation_canceled)
};
if(!tags.empty())
log::dwarning
{
log, "Attempting to cancel all %zu pending tags.", tags.size()
};
for(auto &p : tags)
post_error(p.second, ec);
const ctx::exception_handler eh;
cancel_all();
}
void
ircd::net::dns::resolver::check_timeouts(const milliseconds &timeout)
{
const std::lock_guard lock
{
mutex
};
const auto cutoff
{
now<steady_point>() - timeout
@ -280,185 +368,13 @@ ircd::net::dns::resolver::check_timeout(const uint16_t &id,
make_error_code(std::errc::timed_out)
};
post_error(tag, ec);
error_one(tag, ec);
return true;
}
void
ircd::net::dns::resolver::post_error(tag &tag,
const std::system_error &ec)
{
// We ignore this request to post an error here if the tag has no callback
// function set. Nulling the callback is used as hack-hoc state to indicate
// that something else has called back the user and will unmap this tag so
// there's no reason for us to post this.
if(!tag.cb)
return;
auto handler
{
std::bind(&resolver::handle_post_error, this, tag.id, std::ref(tag), std::cref(ec))
};
// Callback gets a fresh stack off this timeout worker ctx's stack.
ircd::post(std::move(handler));
}
void
ircd::net::dns::resolver::handle_post_error(const uint16_t id,
tag &tag,
const std::system_error &ec)
{
// Have to check if the tag is still mapped at this point. It may
// have been removed if a belated reply came in while this closure
// was posting. If so, that's good news and we bail on the timeout.
if(!tags.count(id))
return;
log::error
{
log, "DNS error id:%u for '%s' :%s",
id,
string(tag.hp),
string(ec)
};
assert(tag.cb);
tag.cb(std::make_exception_ptr(ec), tag.hp, {});
remove(tag);
}
/// Internal resolver entry interface.
void
ircd::net::dns::resolver::operator()(const hostport &hp,
const opts &opts,
callback &&callback)
{
auto &tag(set_tag(hp, opts, std::move(callback))); try
{
tag.question = make_query(tag.qbuf, tag);
submit(tag);
}
catch(...)
{
remove(tag);
throw;
}
}
ircd::const_buffer
ircd::net::dns::resolver::make_query(const mutable_buffer &buf,
const tag &tag)
{
thread_local char hostbuf[rfc1035::NAME_BUF_SIZE * 2];
string_view hoststr;
switch(tag.opts.qtype)
{
case 0: throw error
{
"A query type is required to form a question."
};
case 33: // SRV
{
hoststr = make_SRV_key(hostbuf, host(tag.hp), tag.opts);
break;
}
default:
hoststr = host(tag.hp);
break;
}
assert(hoststr);
assert(tag.opts.qtype);
const rfc1035::question question
{
hoststr, tag.opts.qtype
};
return rfc1035::make_query(buf, tag.id, question);
}
template<class... A>
ircd::net::dns::resolver::tag &
ircd::net::dns::resolver::set_tag(A&&... args)
{
while(tags.size() < 65535)
{
auto id(ircd::rand::integer(1, 65535));
auto it{tags.lower_bound(id)};
if(it != end(tags) && it->first == id)
continue;
it = tags.emplace_hint(it,
std::piecewise_construct,
std::forward_as_tuple(id),
std::forward_as_tuple(std::forward<A>(args)...));
it->second.id = id;
dock.notify_one();
return it->second;
}
throw panic
{
"Too many DNS queries"
};
}
void
ircd::net::dns::resolver::remove(tag &tag)
{
remove(tag, tags.find(tag.id));
}
decltype(ircd::net::dns::resolver::tags)::iterator
ircd::net::dns::resolver::remove(tag &tag,
const decltype(tags)::iterator &it)
{
log::debug
{
log, "dns tag:%u t:%u qtype:%u removing (tags:%zu sendq:%zu)",
tag.id,
tag.tries,
tag.opts.qtype,
tags.size(),
sendq.size()
};
unqueue(tag);
return it != end(tags)? tags.erase(it) : it;
}
void
ircd::net::dns::resolver::unqueue(tag &tag)
{
const auto it
{
std::find(begin(sendq), end(sendq), tag.id)
};
if(it != end(sendq))
sendq.erase(it);
}
void
ircd::net::dns::resolver::queue_query(tag &tag)
{
assert(sendq.size() <= tags.size());
sendq.emplace_back(tag.id);
dock.notify_one();
log::debug
{
log, "dns tag:%u t:%u qtype:%u added to sendq (tags:%zu sendq:%zu)",
tag.id,
tag.tries,
tag.opts.qtype,
tags.size(),
sendq.size()
};
}
//
// submit
//
void
ircd::net::dns::resolver::submit(tag &tag)
@ -518,6 +434,24 @@ catch(const std::out_of_range &)
};
}
void
ircd::net::dns::resolver::queue_query(tag &tag)
{
assert(sendq.size() <= tags.size());
sendq.emplace_back(tag.id);
dock.notify_one();
log::debug
{
log, "dns tag:%u t:%u qtype:%u added to sendq (tags:%zu sendq:%zu)",
tag.id,
tag.tries,
tag.opts.qtype,
tags.size(),
sendq.size()
};
}
void
ircd::net::dns::resolver::send_query(const ip::udp::endpoint &ep,
tag &tag)
@ -537,47 +471,87 @@ ircd::net::dns::resolver::send_query(const ip::udp::endpoint &ep,
tag.tries++;
}
//
// recv
//
void
ircd::net::dns::resolver::set_handle()
ircd::net::dns::resolver::recv_worker()
try
{
auto handler
const unique_buffer<mutable_buffer> buf
{
std::bind(&resolver::handle, this, ph::_1, ph::_2)
64_KiB
};
const asio::mutable_buffers_1 bufs{reply};
ns.async_receive_from(bufs, reply_from, std::move(handler));
const auto interruption{[this]
(ctx::ctx *const &)
{
if(this->ns.is_open())
this->ns.cancel();
}};
const asio::mutable_buffers_1 bufs{buf};
ip::udp::endpoint ep;
while(ns.is_open()) try
{
size_t recv; continuation
{
continuation::asio_predicate, interruption, [this, &bufs, &recv, &ep]
(auto &yield)
{
recv = ns.async_receive_from(bufs, ep, yield);
}
};
const mutable_buffer &reply
{
data(buf), recv
};
const net::ipport &from
{
make_ipport(ep)
};
handle(from, reply);
}
catch(const boost::system::system_error &e)
{
switch(make_error_code(e).value())
{
case int(std::errc::operation_canceled):
break;
default:
throw;
}
}
}
catch(const std::exception &e)
{
log::error
{
log, "%s", e.what()
};
}
void
ircd::net::dns::resolver::handle(const error_code &ec,
const size_t &bytes)
noexcept try
ircd::net::dns::resolver::handle(const ipport &from,
const mutable_buffer &buf)
try
{
if(!handle_error(ec))
return;
const unwind reset{[this]
{
set_handle();
}};
if(unlikely(bytes < sizeof(rfc1035::header)))
if(unlikely(size(buf) < sizeof(rfc1035::header)))
throw rfc1035::error
{
"Got back %zu bytes < rfc1035 %zu byte header",
bytes,
size(buf),
sizeof(rfc1035::header)
};
char *const reply
{
data(this->reply)
};
rfc1035::header &header
{
*reinterpret_cast<rfc1035::header *>(reply)
*reinterpret_cast<rfc1035::header *>(data(buf))
};
bswap(&header.qdcount);
@ -587,41 +561,51 @@ noexcept try
const const_buffer body
{
reply + sizeof(header), bytes - sizeof(header)
data(buf) + sizeof(header), size(buf) - sizeof(header)
};
handle_reply(header, body);
handle_reply(from, header, body);
}
catch(const std::exception &e)
{
throw panic
log::error
{
"resolver::handle_reply(): %s", e.what()
log, "%s", e.what()
};
}
void
ircd::net::dns::resolver::handle_reply(const header &header,
ircd::net::dns::resolver::handle_reply(const ipport &from,
const header &header,
const const_buffer &body)
try
{
thread_local char addr_strbuf[2][128];
const std::lock_guard lock
{
// The primary mutex is locked here while this result is
// processed. This locks out the sendq and timeout worker.
mutex
};
const auto it
{
tags.find(header.id)
};
const auto it{tags.find(header.id)};
if(it == end(tags))
throw error
{
"DNS reply from %s for unrecognized tag id:%u",
string(addr_strbuf[0], reply_from),
string(addr_strbuf[0], from),
header.id
};
auto &tag{it->second};
if(make_ipport(reply_from) != tag.server)
if(from != tag.server)
throw error
{
"DNS reply from %s for tag:%u which we sent to %s",
string(addr_strbuf[0], reply_from),
string(addr_strbuf[0], from),
header.id,
string(addr_strbuf[1], tag.server)
};
@ -634,7 +618,7 @@ try
log::debug
{
log, "dns %s recv tag:%u t:%u qtype:%u qd:%u an:%u ns:%u ar:%u",
string(addr_strbuf[0], make_ipport(reply_from)),
string(addr_strbuf[0], from),
tag.id,
tag.tries,
tag.opts.qtype,
@ -646,17 +630,9 @@ try
assert(tag.tries > 0);
tag.last = steady_point::min();
tag.rcode = header.rcode;
handle_reply(header, body, tag);
}
catch(const std::exception &e)
{
log::error
{
log, "%s", e.what()
};
return;
}
void
ircd::net::dns::resolver::handle_reply(const header &header,
@ -676,6 +652,20 @@ try
"Response contains too many sections..."
};
if(header.qdcount < 1)
throw error
{
"Response does not contain the question."
};
if(!handle_error(header, tag))
throw rfc1035::error
{
"protocol error #%u :%s",
header.rcode,
rfc1035::rcode.at(header.rcode)
};
const_buffer buffer
{
body
@ -686,118 +676,22 @@ try
for(size_t i(0); i < header.qdcount; ++i)
consume(buffer, size(qd.at(i).parse(buffer)));
if(!handle_error(header, qd.at(0), tag))
throw rfc1035::error
{
"protocol error #%u :%s", header.rcode, rfc1035::rcode.at(header.rcode)
};
// Answers are parsed into this buffer
thread_local std::array<rfc1035::answer, MAX_COUNT> an;
for(size_t i(0); i < header.ancount; ++i)
consume(buffer, size(an.at(i).parse(buffer)));
if(tag.opts.cache_result)
const vector_view<const rfc1035::answer> answers
{
// We convert all TTL values in the answers to absolute epoch time
// indicating when they expire. This makes more sense for our caches.
const auto &now{ircd::time()};
for(size_t i(0); i < header.ancount; ++i)
{
const uint &min_ttl(seconds(cache::min_ttl).count());
an.at(i).ttl = now + std::max(an.at(i).ttl, min_ttl);
}
}
an.data(), header.ancount
};
// The callback to the user will be passed a vector_view of pointers
// to this array. The actual record instances will either be located
// in the cache map or placement-newed to the buffer below.
thread_local std::array<const rfc1035::record *, MAX_COUNT> record;
// This will be where we place the record instances which are dynamically
// laid out and sized types. 512 bytes is assumed as a soft maximum for
// each RR instance.
static const size_t recsz(512);
thread_local uint8_t recbuf[recsz * MAX_COUNT];
size_t i(0);
uint8_t *pos{recbuf};
for(; i < header.ancount; ++i) switch(an.at(i).qtype)
{
case 1: // A records are inserted into cache
{
using type = rfc1035::record::A;
if(!tag.opts.cache_result)
{
if(unlikely(pos + sizeof(type) > recbuf + sizeof(recbuf)))
break;
record.at(i) = new (pos) type(an.at(i));
pos += sizeof(type);
continue;
}
record.at(i) = cache::put(qd.at(0), an.at(i));
continue;
}
case 5:
{
using type = rfc1035::record::CNAME;
if(unlikely(pos + sizeof(type) > recbuf + sizeof(recbuf)))
break;
record.at(i) = new (pos) type(an.at(i));
pos += sizeof(type);
continue;
}
case 33:
{
using type = rfc1035::record::SRV;
if(!tag.opts.cache_result)
{
if(unlikely(pos + sizeof(type) > recbuf + sizeof(recbuf)))
break;
record.at(i) = new (pos) type(an.at(i));
pos += sizeof(type);
continue;
}
record.at(i) = cache::put(qd.at(0), an.at(i));
continue;
}
default:
{
using type = rfc1035::record;
if(unlikely(pos + sizeof(type) > recbuf + sizeof(recbuf)))
break;
record.at(i) = new (pos) type(an.at(i));
pos += sizeof(type);
continue;
}
}
// Cache no answers here.
if(!header.ancount && tag.opts.cache_result)
cache::put_error(qd.at(0), header.rcode);
if(tag.cb)
{
static const std::exception_ptr no_exception{};
const vector_view<const rfc1035::record *> records
{
record.data(), i
};
tag.cb(no_exception, tag.hp, records);
}
callback({}, tag, answers);
}
catch(const std::exception &e)
{
const ctx::exception_handler eh;
// There's no need to flash red to the log for NXDOMAIN which is
// common in this system when probing SRV.
if(unlikely(header.rcode != 3))
@ -808,16 +702,12 @@ catch(const std::exception &e)
e.what()
};
if(tag.cb)
{
assert(header.rcode != 3 || tag.opts.nxdomain_exceptions);
tag.cb(std::current_exception(), tag.hp, {});
}
assert(header.rcode != 3 || tag.opts.nxdomain_exceptions);
callback(std::current_exception(), tag, answers{});
}
bool
ircd::net::dns::resolver::handle_error(const header &header,
const rfc1035::question &question,
tag &tag)
{
switch(header.rcode)
@ -826,55 +716,122 @@ ircd::net::dns::resolver::handle_error(const header &header,
return true;
case 3: // NXDomain; exception
{
if(!tag.opts.cache_result)
return false;
const auto *record
if(!tag.opts.nxdomain_exceptions)
{
cache::put_error(question, header.rcode)
};
// When the user doesn't want an eptr for nxdomain we just make
// their callback here and then null the cb pointer so it's not
// called again. It is done here because we have a reference to
// the cached error record readily accessible.
if(!tag.opts.nxdomain_exceptions && tag.cb)
{
assert(record);
tag.cb({}, tag.hp, vector_view<const rfc1035::record *>(&record, 1));
tag.cb = {};
callback({}, tag, answers{});
return true;
}
return false;
}
default: // Unhandled error; exception
return false;
}
}
bool
ircd::net::dns::resolver::handle_error(const error_code &ec)
const
//
// removal
//
// This whole stack must be called under lock
//
void
ircd::net::dns::resolver::cancel_all()
{
using std::errc;
if(system_category(ec)) switch(ec.value())
static const std::error_code &ec
{
case 0:
return true;
make_error_code(std::errc::operation_canceled)
};
case int(errc::operation_canceled):
return false;
default:
break;
}
throw std::system_error{ec};
error_all(ec);
}
void
ircd::net::dns::resolver::error_all(const std::error_code &ec)
{
if(tags.empty())
return;
log::dwarning
{
log, "Attempting to cancel all %zu pending tags.", tags.size()
};
const auto eptr
{
make_system_eptr(ec)
};
for(auto &p : tags)
error_one(p.second, eptr);
}
void
ircd::net::dns::resolver::error_one(tag &tag,
const std::system_error &se)
{
error_one(tag, std::make_exception_ptr(se));
}
void
ircd::net::dns::resolver::error_one(tag &tag,
const std::exception_ptr &eptr)
{
thread_local char hpbuf[128];
log::error
{
log, "DNS error id:%u for '%s' :%s",
tag.id,
string(hpbuf, tag.hp),
what(eptr)
};
static const answers empty;
callback(eptr, tag, empty);
remove(tag);
}
void
ircd::net::dns::resolver::remove(tag &tag)
{
remove(tag, tags.find(tag.id));
}
decltype(ircd::net::dns::resolver::tags)::iterator
ircd::net::dns::resolver::remove(tag &tag,
const decltype(tags)::iterator &it)
{
log::debug
{
log, "dns tag:%u t:%u qtype:%u removing (tags:%zu sendq:%zu)",
tag.id,
tag.tries,
tag.opts.qtype,
tags.size(),
sendq.size()
};
unqueue(tag);
done.notify_all();
return it != end(tags)? tags.erase(it) : it;
}
void
ircd::net::dns::resolver::unqueue(tag &tag)
{
const auto it
{
std::find(begin(sendq), end(sendq), tag.id)
};
if(it != end(sendq))
sendq.erase(it);
}
//
// util
//
void
ircd::net::dns::resolver::set_servers()
try

View file

@ -1,101 +0,0 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
struct ircd::net::dns::resolver
{
struct tag;
using header = rfc1035::header;
static conf::item<std::string> servers;
static conf::item<milliseconds> timeout;
static conf::item<milliseconds> send_rate;
static conf::item<size_t> send_burst;
static conf::item<size_t> retry_max;
std::vector<ip::udp::endpoint> server; // The list of active servers
size_t server_next{0}; // Round-robin state to hit servers
ctx::dock dock;
std::map<uint16_t, tag> tags; // The active requests
steady_point send_last; // Time of last send
std::deque<uint16_t> sendq; // Queue of frames for rate-limiting
ip::udp::socket ns; // A pollable activity object
ip::udp::endpoint reply_from; // Remote addr of recv
unique_buffer<mutable_buffer> reply; // Buffer for recv
void add_server(const ipport &);
void add_server(const string_view &);
void set_servers(const string_view &list);
void set_servers();
bool handle_error(const error_code &ec) const;
bool handle_error(const header &, const rfc1035::question &, tag &);
void handle_reply(const header &, const const_buffer &body, tag &);
void handle_reply(const header &, const const_buffer &body);
void handle(const error_code &ec, const size_t &) noexcept;
void set_handle();
void send_query(const ip::udp::endpoint &, tag &);
void queue_query(tag &);
void send_query(tag &);
void submit(tag &);
void unqueue(tag &);
void remove(tag &);
decltype(tags)::iterator remove(tag &, const decltype(tags)::iterator &);
template<class... A> tag &set_tag(A&&...);
static const_buffer make_query(const mutable_buffer &buf, const tag &);
void operator()(const hostport &, const opts &, callback &&);
void handle_post_error(const uint16_t id, tag &, const std::system_error &);
void post_error(tag &, const std::system_error &ec);
bool check_timeout(const uint16_t &id, tag &, const steady_point &expired);
void check_timeouts(const milliseconds &timeout);
void cancel_all_tags();
void timeout_worker();
ctx::context timeout_context;
void flush(const uint16_t &);
void sendq_work();
void sendq_clear();
void sendq_worker();
ctx::context sendq_context;
resolver();
~resolver() noexcept;
};
struct ircd::net::dns::resolver::tag
{
uint16_t id {0};
hostport hp;
dns::opts opts; // note: invalid after query sent
const_buffer question;
callback cb;
steady_point last {steady_point::min()};
uint8_t tries {0};
ipport server;
char hostbuf[rfc1035::NAME_BUF_SIZE];
char qbuf[384];
tag(const hostport &, const dns::opts &, callback &&);
tag(tag &&) = delete;
tag(const tag &) = delete;
};
inline
ircd::net::dns::resolver::tag::tag(const hostport &hp,
const dns::opts &opts,
callback &&cb)
:hp{hp}
,opts{opts}
,cb{std::move(cb)}
{
this->hp.host = { hostbuf, copy(hostbuf, hp.host) };
}