mirror of
https://github.com/matrix-construct/construct
synced 2024-06-02 18:18:56 +02:00
modules/s_dns_resolver: Improve the worker shutdown process and related.
This commit is contained in:
parent
bf4cdbf878
commit
727b9fb16a
|
@ -126,12 +126,13 @@ ircd::net::dns::resolver::~resolver()
|
|||
noexcept
|
||||
{
|
||||
ns.close();
|
||||
sendq_context.terminate();
|
||||
timeout_context.interrupt();
|
||||
while(!tags.empty())
|
||||
{
|
||||
log::warning
|
||||
{
|
||||
log, "Waiting for %zu unfinished DNS resolutions",
|
||||
tags.size()
|
||||
log, "Waiting for %zu unfinished DNS resolutions", tags.size()
|
||||
};
|
||||
|
||||
ctx::sleep(3);
|
||||
|
@ -146,25 +147,31 @@ ircd::net::dns::resolver::sendq_worker()
|
|||
{
|
||||
while(1)
|
||||
{
|
||||
assert(sendq.empty() || !tags.empty());
|
||||
dock.wait([this]
|
||||
{
|
||||
assert(sendq.empty() || !tags.empty());
|
||||
return !sendq.empty();
|
||||
});
|
||||
|
||||
assert(sendq.size() < 65535);
|
||||
assert(sendq.size() <= tags.size());
|
||||
if(tags.size() > size_t(send_burst))
|
||||
ctx::sleep(milliseconds(send_rate));
|
||||
|
||||
const unwind::nominal::assertion na;
|
||||
assert(!sendq.empty());
|
||||
const uint16_t next(sendq.front());
|
||||
sendq.pop_front();
|
||||
flush(next);
|
||||
sendq_work();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ircd::net::dns::resolver::sendq_work()
|
||||
{
|
||||
assert(!sendq.empty());
|
||||
assert(sendq.size() < 65535);
|
||||
assert(sendq.size() <= tags.size());
|
||||
const unwind::nominal::assertion na;
|
||||
const uint16_t next(sendq.front());
|
||||
sendq.pop_front();
|
||||
flush(next);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::net::dns::resolver::flush(const uint16_t &next)
|
||||
try
|
||||
|
@ -180,13 +187,13 @@ catch(const std::out_of_range &e)
|
|||
{
|
||||
log::error
|
||||
{
|
||||
"Queued tag id[%u] is no longer mapped", next
|
||||
log, "Queued tag id[%u] is no longer mapped", next
|
||||
};
|
||||
}
|
||||
|
||||
__attribute__((noreturn))
|
||||
void
|
||||
ircd::net::dns::resolver::timeout_worker()
|
||||
try
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
|
@ -199,6 +206,29 @@ ircd::net::dns::resolver::timeout_worker()
|
|||
check_timeouts(milliseconds(timeout));
|
||||
}
|
||||
}
|
||||
catch(const ctx::interrupted &)
|
||||
{
|
||||
cancel_all_tags();
|
||||
return;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::net::dns::resolver::cancel_all_tags()
|
||||
{
|
||||
static const std::system_error ec
|
||||
{
|
||||
make_error_code(std::errc::operation_canceled)
|
||||
};
|
||||
|
||||
if(!tags.empty())
|
||||
log::dwarning
|
||||
{
|
||||
log, "Attempting to cancel all %zu pending tags.", tags.size()
|
||||
};
|
||||
|
||||
for(auto &p : tags)
|
||||
post_error(p.second, ec);
|
||||
}
|
||||
|
||||
void
|
||||
ircd::net::dns::resolver::check_timeouts(const milliseconds &timeout)
|
||||
|
@ -241,43 +271,64 @@ ircd::net::dns::resolver::check_timeout(const uint16_t &id,
|
|||
};
|
||||
|
||||
tag.last = steady_point{};
|
||||
if(ns.is_open() && tag.tries < size_t(retry_max))
|
||||
if(tag.tries < size_t(retry_max))
|
||||
{
|
||||
submit(tag);
|
||||
return false;
|
||||
}
|
||||
|
||||
static const std::system_error ec
|
||||
{
|
||||
make_error_code(std::errc::timed_out)
|
||||
};
|
||||
|
||||
post_error(tag, ec);
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
ircd::net::dns::resolver::post_error(tag &tag,
|
||||
const std::system_error &ec)
|
||||
{
|
||||
// We ignore this request to post an error here if the tag has no callback
|
||||
// function set. Nulling the callback is used as hack-hoc state to indicate
|
||||
// that something else has called back the user and will unmap this tag so
|
||||
// there's no reason for us to post this.
|
||||
if(!tag.cb)
|
||||
return true;
|
||||
return;
|
||||
|
||||
auto handler
|
||||
{
|
||||
std::bind(&resolver::handle_post_error, this, tag.id, std::ref(tag), std::cref(ec))
|
||||
};
|
||||
|
||||
// Callback gets a fresh stack off this timeout worker ctx's stack.
|
||||
ircd::post([this, id, &tag]
|
||||
ircd::post(std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
ircd::net::dns::resolver::handle_post_error(const uint16_t id,
|
||||
tag &tag,
|
||||
const std::system_error &ec)
|
||||
{
|
||||
// Have to check if the tag is still mapped at this point. It may
|
||||
// have been removed if a belated reply came in while this closure
|
||||
// was posting. If so, that's good news and we bail on the timeout.
|
||||
if(!tags.count(id))
|
||||
return;
|
||||
|
||||
log::error
|
||||
{
|
||||
using boost::system::system_error;
|
||||
static const error_code ec
|
||||
{
|
||||
boost::system::errc::timed_out, boost::system::system_category()
|
||||
};
|
||||
log, "DNS error id:%u for '%s' :%s",
|
||||
id,
|
||||
string(tag.hp),
|
||||
string(ec)
|
||||
};
|
||||
|
||||
// Have to check if the tag is still mapped at this point. It may
|
||||
// have been removed if a belated reply came in while this closure
|
||||
// was posting. If so, that's good news and we bail on the timeout.
|
||||
if(!tags.count(id))
|
||||
return;
|
||||
|
||||
log::error
|
||||
{
|
||||
log, "DNS timeout id:%u for '%s'",
|
||||
id,
|
||||
string(tag.hp)
|
||||
};
|
||||
|
||||
tag.cb(std::make_exception_ptr(system_error{ec}), tag.hp, {});
|
||||
const auto erased(tags.erase(tag.id));
|
||||
assert(erased == 1);
|
||||
});
|
||||
|
||||
return false;
|
||||
assert(tag.cb);
|
||||
tag.cb(std::make_exception_ptr(ec), tag.hp, {});
|
||||
const auto erased(tags.erase(tag.id));
|
||||
assert(erased == 1);
|
||||
}
|
||||
|
||||
/// Internal resolver entry interface.
|
||||
|
@ -372,6 +423,7 @@ ircd::net::dns::resolver::queue_query(tag &tag)
|
|||
void
|
||||
ircd::net::dns::resolver::submit(tag &tag)
|
||||
{
|
||||
assert(ns.is_open());
|
||||
const auto rate(milliseconds(send_rate) / server.size());
|
||||
const auto elapsed(now<steady_point>() - send_last);
|
||||
if(elapsed >= rate || tags.size() < size_t(send_burst))
|
||||
|
|
|
@ -49,12 +49,17 @@ struct ircd::net::dns::resolver
|
|||
static const_buffer make_query(const mutable_buffer &buf, const tag &);
|
||||
void operator()(const hostport &, const opts &, callback &&);
|
||||
|
||||
void handle_post_error(const uint16_t id, tag &, const std::system_error &);
|
||||
void post_error(tag &, const std::system_error &ec);
|
||||
bool check_timeout(const uint16_t &id, tag &, const steady_point &expired);
|
||||
void check_timeouts(const milliseconds &timeout);
|
||||
void cancel_all_tags();
|
||||
void timeout_worker();
|
||||
ctx::context timeout_context;
|
||||
|
||||
void flush(const uint16_t &);
|
||||
void sendq_work();
|
||||
void sendq_clear();
|
||||
void sendq_worker();
|
||||
ctx::context sendq_context;
|
||||
|
||||
|
|
Loading…
Reference in a new issue