0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-15 14:31:11 +01:00

ircd::net: Integrate SRV query composition; RFC1035 records to the user interface.

This commit is contained in:
Jason Volk 2018-02-03 13:15:55 -08:00
parent b00129071e
commit 565a760255
3 changed files with 241 additions and 185 deletions

View file

@ -23,29 +23,50 @@ namespace ircd::net
/// ///
struct ircd::net::dns struct ircd::net::dns
{ {
struct opts;
struct resolver; struct resolver;
struct resolver static *resolver; struct resolver static *resolver;
struct opts static const opts_default;
public: public:
enum flag :uint; using callback = std::function<void (std::exception_ptr, vector_view<const rfc1035::record *>)>;
using callback_A_one = std::function<void (std::exception_ptr, const rfc1035::record::A &)>;
using callback_one = std::function<void (std::exception_ptr, const ipport &)>; using callback_SRV_one = std::function<void (std::exception_ptr, const rfc1035::record::SRV &)>;
using callback_many = std::function<void (std::exception_ptr, vector_view<const ipport>)>; using callback_ipport_one = std::function<void (std::exception_ptr, const ipport &)>;
using callback_reverse = std::function<void (std::exception_ptr, const string_view &)>;
// Callback-based interface // Callback-based interface
void operator()(const ipport &, callback_reverse); void operator()(const hostport &, const opts &, callback);
void operator()(const hostport &, callback_many); void operator()(const hostport &, const opts &, callback_A_one);
void operator()(const hostport &, callback_one); void operator()(const hostport &, const opts &, callback_SRV_one);
void operator()(const hostport &, const opts &, callback_ipport_one);
// Future-based interface // Callback-based interface (default options)
ctx::future<ipport> operator()(const hostport &); template<class Callback> void operator()(const hostport &, Callback&&);
ctx::future<std::string> operator()(const ipport &);
}; };
enum ircd::net::dns::flag /// DNS resolution options
:uint struct ircd::net::dns::opts
{ {
/// Overrides the SRV query to make for this resolution. If empty an
/// SRV query may still be made from other deductions. This string is
/// copied at the start of the resolution. It must be a fully qualified
/// SRV query string: Example: "_matrix._tcp."
string_view srv;
/// Specifies the SRV protocol part when deducing the SRV query to be
/// made. This is used when the net::hostport.service string is just
/// the name of the service like "matrix" so we then use this value
/// to build the protocol part of the SRV query string. It is ignored
/// if `srv` is set or if no service is specified (thus no SRV query is
/// made in the first place).
string_view proto{"tcp"};
}; };
template<class Callback>
void
ircd::net::dns::operator()(const hostport &hostport,
Callback&& callback)
{
operator()(hostport, opts_default, std::forward<Callback>(callback));
}

View file

@ -30,8 +30,8 @@ struct ircd::net::dns::resolver
std::map<uint16_t, tag> tags; // The active requests std::map<uint16_t, tag> tags; // The active requests
ip::udp::socket ns; // A pollable activity object ip::udp::socket ns; // A pollable activity object
ip::udp::endpoint reply_from; ip::udp::endpoint reply_from; // Remote addr of recv
char reply[64_KiB] alignas(16); char reply[64_KiB] alignas(16); // Buffer for recv
bool handle_error(const error_code &ec) const; bool handle_error(const error_code &ec) const;
void handle_reply(const header &, const const_buffer &body, tag &); void handle_reply(const header &, const const_buffer &body, tag &);
@ -42,8 +42,9 @@ struct ircd::net::dns::resolver
void send_query(const ip::udp::endpoint &, const const_buffer &); void send_query(const ip::udp::endpoint &, const const_buffer &);
void send_query(const const_buffer &); void send_query(const const_buffer &);
void operator()(const hostport &, const flag &, callback_many); tag &set_tag(tag &&);
void operator()(const ipport &, const flag &, callback_reverse); const_buffer make_query(const mutable_buffer &buf, const tag &) const;
void operator()(const hostport &, const opts &, callback);
bool check_timeout(const uint16_t &id, tag &, const steady_point &now); bool check_timeout(const uint16_t &id, tag &, const steady_point &now);
void check_timeouts(); void check_timeouts();
@ -56,25 +57,21 @@ struct ircd::net::dns::resolver
struct ircd::net::dns::resolver::tag struct ircd::net::dns::resolver::tag
{ {
uint16_t id {0};
hostport hp; hostport hp;
ipport ipp; dns::opts opts;
flag flags; callback cb;
callback_many cb_many;
callback_reverse cb_reverse;
steady_point last {ircd::now<steady_point>()}; steady_point last {ircd::now<steady_point>()};
uint8_t tries {0}; uint8_t tries {0};
void set_exception(std::exception_ptr &&); tag(const hostport &, const dns::opts &, callback);
tag(const hostport &hp, const flag &flags, callback_many cb_many)
:hp{hp}
,flags{flags}
,cb_many{std::move(cb_many)}
{}
tag(const ipport &ipp, const flag &flags, callback_reverse cb_reverse)
:ipp{ipp}
,flags{flags}
,cb_reverse{std::move(cb_reverse)}
{}
}; };
inline
ircd::net::dns::resolver::tag::tag(const hostport &hp,
const dns::opts &opts,
callback cb)
:hp{hp}
,opts{opts}
,cb{std::move(cb)}
{}

View file

@ -2147,86 +2147,97 @@ decltype(ircd::net::dns::resolver)
ircd::net::dns::resolver ircd::net::dns::resolver
{}; {};
/// Resolve a numerical address to a hostname string. This is a PTR record /// Linkage for default opts
/// query or 'reverse DNS' lookup. decltype(ircd::net::dns::opts_default)
ircd::ctx::future<std::string> ircd::net::dns::opts_default
ircd::net::dns::operator()(const ipport &ipport) {};
{
ctx::promise<std::string> p;
ctx::future<std::string> ret{p};
operator()(ipport, [p(std::move(p))]
(std::exception_ptr eptr, const string_view &ptr)
mutable
{
if(eptr)
p.set_exception(std::move(eptr));
else
p.set_value(std::string{ptr});
});
return ret; /// Convenience composition with a single ipport callback. This is the result of
} /// an automatic chain of queries such as SRV and A/AAAA based on the input and
/// intermediate results.
/// Resolve a hostname (with service name/portnum) to a numerical address. This
/// is an A or AAAA query (with automatic SRV query) returning a single result.
ircd::ctx::future<ircd::net::ipport>
ircd::net::dns::operator()(const hostport &hostport)
{
ctx::promise<ipport> p;
ctx::future<ipport> ret{p};
operator()(hostport, [p(std::move(p))]
(std::exception_ptr eptr, const ipport &ip)
mutable
{
if(eptr)
p.set_exception(std::move(eptr));
else
p.set_value(ip);
});
return ret;
}
/// Lower-level A or AAAA query (with automatic SRV query) with asynchronous
/// callback interface. This returns only one result.
void void
ircd::net::dns::operator()(const hostport &hostport, ircd::net::dns::operator()(const hostport &hostport,
callback_one callback) const opts &opts,
callback_ipport_one callback)
{ {
assert(bool(ircd::net::dns::resolver)); operator()(hostport, opts, [this, hostport(hostport), opts(opts), callback(std::move(callback))]
operator()(hostport, [callback(std::move(callback))] (std::exception_ptr eptr, const rfc1035::record::SRV &record)
(std::exception_ptr eptr, const vector_view<const ipport> &results) mutable
{ {
if(eptr) if(eptr)
return callback(std::move(eptr), {}); return callback(std::move(eptr), {});
if(results.empty()) if(!record.tgt.empty())
return callback(std::make_exception_ptr(nxdomain{}), {}); host(hostport) = record.tgt;
callback(std::move(eptr), results.at(0)); if(record.port != 0)
port(hostport) = record.port;
// Have to kill the service name to not run another SRV query now.
hostport.service = {};
opts.srv = {};
this->operator()(hostport, opts, [hostport, callback(std::move(callback))]
(std::exception_ptr eptr, const rfc1035::record::A &record)
{
if(eptr)
return callback(std::move(eptr), {});
const ipport ipport{record.ip4, port(hostport)};
callback(std::move(eptr), ipport);
});
}); });
} }
/// Lower-level A+AAAA query (with automatic SRV query). This returns a vector /// Convenience callback with a single SRV record which was selected from
/// of all results in the callback. /// the vector with stochastic respect for weighting and priority.
void void
ircd::net::dns::operator()(const hostport &hostport, ircd::net::dns::operator()(const hostport &hostport,
callback_many callback) const opts &opts,
callback_SRV_one callback)
{ {
static const flag flags{};
assert(bool(ircd::net::dns::resolver)); assert(bool(ircd::net::dns::resolver));
(*resolver)(hostport, flags, std::move(callback)); operator()(hostport, opts, [callback(std::move(callback))]
(std::exception_ptr eptr, const vector_view<const rfc1035::record *> rrs)
{
if(eptr || rrs.empty())
return callback(std::move(eptr), {});
//TODO: prng on weight / prio plz
const auto &rr{*rrs.at(0)};
const auto &record(rr.as<const rfc1035::record::SRV>());
callback(std::move(eptr), record);
});
} }
/// Lower-level PTR query (i.e "reverse DNS") with asynchronous callback /// Convenience callback with a single A record which was selected from
/// interface. /// the vector randomly.
void void
ircd::net::dns::operator()(const ipport &ipport, ircd::net::dns::operator()(const hostport &hostport,
callback_reverse callback) const opts &opts,
callback_A_one callback)
{ {
static const flag flags{};
assert(bool(ircd::net::dns::resolver)); assert(bool(ircd::net::dns::resolver));
(*resolver)(ipport, flags, std::move(callback)); operator()(hostport, opts, [callback(std::move(callback))]
(std::exception_ptr eptr, const vector_view<const rfc1035::record *> rrs)
{
if(eptr || rrs.empty())
return callback(std::move(eptr), {});
//TODO: prng plz
const auto &rr{*rrs.at(0)};
const auto &record(rr.as<const rfc1035::record::A>());
callback(std::move(eptr), record);
});
}
/// Fundamental callback with a vector of abstract resource records.
void
ircd::net::dns::operator()(const hostport &hostport,
const opts &opts,
callback cb)
{
assert(bool(ircd::net::dns::resolver));
(*resolver)(hostport, opts, std::move(cb));
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -2309,71 +2320,83 @@ ircd::net::dns::resolver::check_timeout(const uint16_t &id,
boost::system::errc::timed_out, boost::system::system_category() boost::system::errc::timed_out, boost::system::system_category()
}; };
tag.set_exception(make_eptr(ec)); if(tag.cb)
tag.cb(make_eptr(ec), {});
return false; return false;
} }
/// Internal A/AAAA record resolver function /// Internal resolver entry interface.
void void
ircd::net::dns::resolver::operator()(const hostport &hostport, ircd::net::dns::resolver::operator()(const hostport &hostport,
const flag &flags, const opts &opts,
callback_many callback) callback callback)
{ {
uint16_t id; do const auto &tag
{ {
id = ircd::rand::integer(1, 65535); set_tag(resolver::tag
assert(tags.size() < 65535); {
const auto it{tags.lower_bound(id)}; hostport, opts, std::move(callback)
if(it != end(tags) && it->first == id) })
continue; };
tags.emplace_hint(it, id, tag{hostport, flags, std::move(callback)});
dock.notify_one();
break;
}
while(1);
// Escape trunk // Escape trunk
const unwind::exceptional untag{[this, &id] const unwind::exceptional untag{[this, &tag]
{ {
tags.erase(id); tags.erase(tag.id);
}}; }};
// Trivial host string
const string_view &host
{
hostport.host
};
// Determine if the port is a string or requires a lex_cast to one.
char portbuf[8];
const string_view &port
{
hostport.portnum? lex_cast(hostport.portnum, portbuf) : hostport.port
};
// Generate the question
const rfc1035::question question
{
host, "A"
};
thread_local char buf[64_KiB]; thread_local char buf[64_KiB];
const auto query send_query(make_query(buf, tag));
{
rfc1035::make_query(buf, id, question)
};
send_query(query);
} }
/// Internal PTR record resolver function ircd::const_buffer
void ircd::net::dns::resolver::make_query(const mutable_buffer &buf,
ircd::net::dns::resolver::operator()(const ipport &ipport, const tag &tag)
const flag &flags, const
callback_reverse callback)
{ {
throw not_implemented{}; if(tag.hp.service || tag.opts.srv)
{
thread_local char srvbuf[512];
string_view srvhost;
if(!tag.opts.srv)
srvhost = fmt::sprintf
{
srvbuf, "_%s._%s.%s", service(tag.hp), tag.opts.proto, host(tag.hp)
};
else
srvhost = fmt::sprintf
{
srvbuf, "%s%s", tag.opts.srv, host(tag.hp)
};
const rfc1035::question question{srvhost, "SRV"};
return rfc1035::make_query(buf, tag.id, question);
}
const rfc1035::question question{host(tag.hp), "A"};
return rfc1035::make_query(buf, tag.id, question);
}
ircd::net::dns::resolver::tag &
ircd::net::dns::resolver::set_tag(tag &&tag)
{
while(tags.size() < 65535)
{
tag.id = ircd::rand::integer(1, 65535);
auto it{tags.lower_bound(tag.id)};
if(it != end(tags) && it->first == tag.id)
continue;
it = tags.emplace_hint(it, tag.id, std::move(tag));
dock.notify_one();
return it->second;
}
throw assertive
{
"Too many DNS queries"
};
} }
void void
@ -2512,60 +2535,84 @@ try
"protocol error: %s", rfc1035::rcode.at(header.rcode) "protocol error: %s", rfc1035::rcode.at(header.rcode)
}; };
if(header.qdcount > 8 || header.ancount > 8) // The maximum number of records we're accepting for a section
static const size_t MAX_COUNT
{
64
};
if(header.qdcount > MAX_COUNT || header.ancount > MAX_COUNT)
throw error throw error
{ {
"Response contains too many sections..." "Response contains too many sections..."
}; };
if(!header.ancount) const_buffer buffer
{ {
tag.cb_many({}, {}); body
return; };
}
const_buffer buf{body}; // Questions are regurgitated back to us so they must be parsed first
rfc1035::question qd[header.qdcount]; thread_local rfc1035::question qd[MAX_COUNT];
for(size_t i(0); i < header.qdcount; ++i) for(size_t i(0); i < header.qdcount; ++i)
consume(buf, size(qd[i].parse(buf))); consume(buffer, size(qd[i].parse(buffer)));
rfc1035::answer an[header.ancount]; // Answers are parsed into this buffer
thread_local rfc1035::answer an[MAX_COUNT];
for(size_t i(0); i < header.ancount; ++i) for(size_t i(0); i < header.ancount; ++i)
consume(buf, size(an[i].parse(buf))); consume(buffer, size(an[i].parse(buffer)));
size_t ippi(0); // This will be where we place the record instances which are dynamically
net::ipport ipp[header.ancount]; // laid out and sized types; this is an alternative to new'ing them
for(size_t i(0); i < header.ancount; ++i) // without placement. 512 bytes is assumed as a soft maximum for each RR.
thread_local uint8_t recbuf[MAX_COUNT * 512];
thread_local const rfc1035::record *record[MAX_COUNT];
size_t i(0);
uint8_t *pos{recbuf};
for(; i < header.ancount; ++i) switch(an[i].qtype)
{ {
switch(an[i].qtype) case 1:
{ {
case 0x01: record[i] = new (pos) rfc1035::record::A(an[i]);
{ pos += sizeof(rfc1035::record::A);
const rfc1035::record::A rr(an[i].rdata); continue;
ipp[ippi++] = { rr.ip4, port(tag.hp) }; }
continue;
}
case 0x21: case 5:
{ {
const rfc1035::record::SRV rr(an[i].rdata); record[i] = new (pos) rfc1035::record::CNAME(an[i]);
continue; pos += sizeof(rfc1035::record::CNAME);
} continue;
}
case 0x05: case 33:
{ {
const rfc1035::record::CNAME rr(an[i].rdata); record[i] = new (pos) rfc1035::record::SRV(an[i]);
continue; pos += sizeof(rfc1035::record::SRV);
} continue;
}
default:
{
record[i] = new (pos) rfc1035::record(an[i]);
pos += sizeof(rfc1035::record);
continue;
} }
} }
if(tag.cb_many) if(tag.cb)
tag.cb_many({}, vector_view<const ipport>(ipp, header.ancount)); tag.cb({}, vector_view<const rfc1035::record *>(record, i));
} }
catch(...) catch(const std::exception &e)
{ {
tag.set_exception(std::current_exception()); log.error("resolver tag:%u [%s]: %s",
tag.id,
string(tag.hp),
e.what());
if(tag.cb)
tag.cb(std::current_exception(), {});
} }
bool bool
@ -2609,15 +2656,6 @@ ircd::net::dns::resolver::init_servers()
}); });
} }
void
ircd::net::dns::resolver::tag::set_exception(std::exception_ptr &&eptr)
{
if(cb_many)
cb_many(std::move(eptr), {});
else if(cb_reverse)
cb_reverse(std::move(eptr), {});
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// //
// net/remote.h // net/remote.h