0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-15 14:31:11 +01:00

ircd::net::acceptor: Track and limit operations based on configuration.

This commit is contained in:
Jason Volk 2019-06-01 13:54:55 -07:00
parent d7edd1960e
commit dafb8bd42f
3 changed files with 149 additions and 43 deletions

View file

@ -23,11 +23,15 @@ struct ircd::net::acceptor
using error_code = boost::system::error_code; using error_code = boost::system::error_code;
using callback = listener::callback; using callback = listener::callback;
using proffer = listener::proffer; using proffer = listener::proffer;
using sockets = std::list<std::shared_ptr<socket>>;
IRCD_EXCEPTION(listener::error, error) IRCD_EXCEPTION(listener::error, error)
IRCD_EXCEPTION(error, sni_warning) IRCD_EXCEPTION(error, sni_warning)
static log::log log; static log::log log;
static conf::item<size_t> accepting_max;
static conf::item<size_t> handshaking_max;
static conf::item<size_t> handshaking_max_per_peer;
static conf::item<milliseconds> timeout; static conf::item<milliseconds> timeout;
static conf::item<std::string> ssl_curve_list; static conf::item<std::string> ssl_curve_list;
static conf::item<std::string> ssl_cipher_list; static conf::item<std::string> ssl_cipher_list;
@ -42,10 +46,9 @@ struct ircd::net::acceptor
asio::ssl::context ssl; asio::ssl::context ssl;
ip::tcp::endpoint ep; ip::tcp::endpoint ep;
ip::tcp::acceptor a; ip::tcp::acceptor a;
size_t accepting {0}; sockets accepting;
size_t handshaking {0}; sockets handshaking;
bool interrupting {false}; bool interrupting {false};
bool handle_set {false};
ctx::dock joining; ctx::dock joining;
void configure(const json::object &opts); void configure(const json::object &opts);
@ -54,14 +57,15 @@ struct ircd::net::acceptor
bool handle_sni(SSL &, int &ad); bool handle_sni(SSL &, int &ad);
string_view handle_alpn(SSL &, const vector_view<const string_view> &in); string_view handle_alpn(SSL &, const vector_view<const string_view> &in);
void check_handshake_error(const error_code &ec, socket &); void check_handshake_error(const error_code &ec, socket &);
void handshake(const error_code &ec, std::shared_ptr<socket>, std::weak_ptr<acceptor>) noexcept; void handshake(const error_code &, const std::shared_ptr<socket>, const decltype(handshaking)::const_iterator) noexcept;
// Acceptance stack // Acceptance stack
bool check_accept_error(const error_code &ec, socket &); bool check_accept_error(const error_code &ec, socket &);
void accept(const error_code &ec, std::shared_ptr<socket>, std::weak_ptr<acceptor>) noexcept; void accept(const error_code &, const std::shared_ptr<socket>, const decltype(accepting)::const_iterator) noexcept;
// Accept next // Accept next
bool set_handle(); bool set_handle();
size_t set_handles();
// Acceptor shutdown // Acceptor shutdown
bool interrupt() noexcept; bool interrupt() noexcept;

View file

@ -23,6 +23,11 @@ namespace ircd::net
string_view name(const acceptor &); string_view name(const acceptor &);
ipport binder(const acceptor &); ipport binder(const acceptor &);
ipport local(const acceptor &); ipport local(const acceptor &);
size_t handshaking_count(const acceptor &, const ipaddr &);
size_t handshaking_count(const acceptor &);
size_t accepting_count(const acceptor &);
string_view loghead(const mutable_buffer &, const acceptor &); string_view loghead(const mutable_buffer &, const acceptor &);
string_view loghead(const acceptor &); string_view loghead(const acceptor &);
std::ostream &operator<<(std::ostream &s, const acceptor &); std::ostream &operator<<(std::ostream &s, const acceptor &);

View file

@ -1304,6 +1304,32 @@ ircd::net::acceptor::timeout
{ "default", 12000L }, { "default", 12000L },
}; };
/// The number of sockets we precreate and set accept handles for.
decltype(ircd::net::acceptor::accepting_max)
ircd::net::acceptor::accepting_max
{
{ "name", "ircd.net.acceptor.accepting.max" },
{ "default", 1L },
};
/// The number of simultaneous handshakes we conduct across all clients.
decltype(ircd::net::acceptor::handshaking_max)
ircd::net::acceptor::handshaking_max
{
{ "name", "ircd.net.acceptor.handshaking.max" },
{ "default", 64L },
};
/// The number of simultaneous handshakes we conduct for a single peer (which
/// is an IP without a port in this context). This prevents a peer from
/// reaching the handshaking.max limit to DoS out other peers.
decltype(ircd::net::acceptor::handshaking_max_per_peer)
ircd::net::acceptor::handshaking_max_per_peer
{
{ "name", "ircd.net.acceptor.handshaking.max_per_peer" },
{ "default", 16L },
};
decltype(ircd::net::acceptor::ssl_curve_list) decltype(ircd::net::acceptor::ssl_curve_list)
ircd::net::acceptor::ssl_curve_list ircd::net::acceptor::ssl_curve_list
{ {
@ -1348,10 +1374,7 @@ ircd::net::allow(acceptor &a)
if(unlikely(!a.a.is_open())) if(unlikely(!a.a.is_open()))
return false; return false;
if(unlikely(a.handle_set)) a.set_handles();
return false;
a.set_handle();
return true; return true;
} }
@ -1407,6 +1430,29 @@ ircd::net::config(const acceptor &a)
return a.opts; return a.opts;
} }
size_t
ircd::net::accepting_count(const acceptor &a)
{
return a.accepting.size();
}
size_t
ircd::net::handshaking_count(const acceptor &a)
{
return a.handshaking.size();
}
size_t
ircd::net::handshaking_count(const acceptor &a,
const ipaddr &ipaddr)
{
return std::count_if(begin(a.handshaking), end(a.handshaking), [&ipaddr]
(const auto &socket_p)
{
return remote_ipport(*socket_p) == ipaddr;
});
}
// //
// acceptor::acceptor // acceptor::acceptor
// //
@ -1474,13 +1520,13 @@ catch(const boost::system::system_error &e)
ircd::net::acceptor::~acceptor() ircd::net::acceptor::~acceptor()
noexcept noexcept
{ {
if(accepting || handshaking) if(!accepting.empty() || handshaking.empty())
log::critical log::critical
{ {
"The acceptor must not have clients during destruction!" "The acceptor must not have clients during destruction!"
" (accepting:%zu handshaking:%zu)", " (accepting:%zu handshaking:%zu)",
accepting, accepting.size(),
handshaking handshaking.size(),
}; };
} }
@ -1534,6 +1580,9 @@ ircd::net::acceptor::close()
if(a.is_open()) if(a.is_open())
a.close(); a.close();
for(const auto &sock : handshaking)
sock->cancel();
join(); join();
log::debug log::debug
{ {
@ -1553,7 +1602,7 @@ noexcept try
joining.wait([this] joining.wait([this]
{ {
return !accepting && !handshaking; return accepting.empty() && handshaking.empty();
}); });
interrupting = false; interrupting = false;
@ -1591,6 +1640,16 @@ catch(const boost::system::system_error &e)
return false; return false;
} }
size_t
ircd::net::acceptor::set_handles()
{
size_t ret(0);
while(set_handle())
++ret;
return ret;
}
/// Sets the next asynchronous handler to start the next accept sequence. /// Sets the next asynchronous handler to start the next accept sequence.
/// Each call to next() sets one handler which handles the connect for one /// Each call to next() sets one handler which handles the connect for one
/// socket. After the connect, an asynchronous SSL handshake handler is set /// socket. After the connect, an asynchronous SSL handshake handler is set
@ -1604,20 +1663,25 @@ try
"ircd::net::acceptor accept" "ircd::net::acceptor accept"
}; };
assert(!handle_set); if(accepting.size() >= size_t(accepting_max))
handle_set = true; return false;
auto sock
const auto it
{ {
std::make_shared<ircd::socket>(ssl) accepting.emplace(end(accepting), std::make_shared<ircd::socket>(ssl))
};
const auto &sock
{
*it
}; };
++accepting;
ip::tcp::socket &sd(*sock);
auto handler auto handler
{ {
std::bind(&acceptor::accept, this, ph::_1, sock, weak_from(*this)) std::bind(&acceptor::accept, this, ph::_1, sock, it)
}; };
ip::tcp::socket &sd(*sock);
a.async_accept(sd, ios::handle(desc, std::move(handler))); a.async_accept(sd, ios::handle(desc, std::move(handler)));
return true; return true;
} }
@ -1635,40 +1699,68 @@ catch(const std::exception &e)
void void
ircd::net::acceptor::accept(const error_code &ec, ircd::net::acceptor::accept(const error_code &ec,
const std::shared_ptr<socket> sock, const std::shared_ptr<socket> sock,
const std::weak_ptr<acceptor> a) const decltype(accepting)::const_iterator it)
noexcept try noexcept try
{ {
if(unlikely(a.expired()))
return;
assert(bool(sock)); assert(bool(sock));
assert(handle_set); assert(!accepting.empty());
assert(accepting > 0); assert(it != end(accepting));
handle_set = false;
--accepting;
thread_local char ecbuf[64]; thread_local char ecbuf[64];
log::debug log::debug
{ {
log, "%s: accepted(%zu) %s %s", log, "%s: %s accepted(%zd:%zu) %s",
loghead(*this), loghead(*this),
accepting,
loghead(*sock), loghead(*sock),
std::distance(cbegin(accepting), it),
accepting.size(),
string(ecbuf, ec) string(ecbuf, ec)
}; };
accepting.erase(it);
if(!check_accept_error(ec, *sock)) if(!check_accept_error(ec, *sock))
return; return;
const auto &remote
{
remote_ipport(*sock)
};
// Call the proffer-callback if available. This allows the application // Call the proffer-callback if available. This allows the application
// to check whether to allow or deny this remote before the handshake. // to check whether to allow or deny this remote before the handshake.
if(pcb && !pcb(*listener_, remote_ipport(*sock))) if(pcb && !pcb(*listener_, remote))
{ {
net::close(*sock, dc::RST, close_ignore); net::close(*sock, dc::RST, close_ignore);
return; return;
} }
if(unlikely(handshaking_count(*this) >= size_t(handshaking_max)))
{
log::dwarning
{
log, "%s: refusing to handshake %s; exceeds maximum of %zu handshakes.",
loghead(*this),
loghead(*sock),
size_t(handshaking_max),
};
net::close(*sock, dc::RST, close_ignore);
return;
}
if(unlikely(handshaking_count(*this, remote) >= size_t(handshaking_max_per_peer)))
{
log::dwarning
{
log, "%s: refusing to handshake %s; exceeds maximum of %zu handshakes to them.",
loghead(*this),
loghead(*sock),
size_t(handshaking_max_per_peer),
};
net::close(*sock, dc::RST, close_ignore);
return;
}
static const socket::handshake_type handshake_type static const socket::handshake_type handshake_type
{ {
socket::handshake_type::server socket::handshake_type::server
@ -1679,12 +1771,16 @@ noexcept try
"ircd::net::acceptor async_handshake" "ircd::net::acceptor async_handshake"
}; };
auto handshake const auto it
{ {
std::bind(&acceptor::handshake, this, ph::_1, sock, a) handshaking.emplace(end(handshaking), sock)
};
auto handshake
{
std::bind(&acceptor::handshake, this, ph::_1, sock, it)
}; };
++handshaking;
sock->set_timeout(milliseconds(timeout)); sock->set_timeout(milliseconds(timeout));
sock->ssl.async_handshake(handshake_type, ios::handle(desc, std::move(handshake))); sock->ssl.async_handshake(handshake_type, ios::handle(desc, std::move(handshake)));
} }
@ -1771,14 +1867,12 @@ ircd::net::acceptor::check_accept_error(const error_code &ec,
void void
ircd::net::acceptor::handshake(const error_code &ec, ircd::net::acceptor::handshake(const error_code &ec,
const std::shared_ptr<socket> sock, const std::shared_ptr<socket> sock,
const std::weak_ptr<acceptor> a) const decltype(handshaking)::const_iterator it)
noexcept try noexcept try
{ {
if(unlikely(a.expired()))
return;
--handshaking;
assert(bool(sock)); assert(bool(sock));
assert(!handshaking.empty());
assert(it != end(handshaking));
#ifdef RB_DEBUG #ifdef RB_DEBUG
const auto *const current_cipher const auto *const current_cipher
@ -1791,9 +1885,11 @@ noexcept try
thread_local char ecbuf[64]; thread_local char ecbuf[64];
log::debug log::debug
{ {
log, "%s handshook(%zu) cipher:%s %s", log, "%s: %s handshook(%zd:%zu) cipher:%s %s",
loghead(*this),
loghead(*sock), loghead(*sock),
handshaking, std::distance(cbegin(handshaking), it),
handshaking.size(),
current_cipher? current_cipher?
openssl::name(*current_cipher): openssl::name(*current_cipher):
"<NO CIPHER>"_sv, "<NO CIPHER>"_sv,
@ -1801,6 +1897,7 @@ noexcept try
}; };
#endif #endif
handshaking.erase(it);
check_handshake_error(ec, *sock); check_handshake_error(ec, *sock);
sock->cancel_timeout(); sock->cancel_timeout();
assert(bool(cb)); assert(bool(cb));