0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-25 08:12:37 +01:00

ircd::net: Socket interface package.

This commit is contained in:
Jason Volk 2018-01-01 02:42:00 -07:00
parent 9ad618a329
commit acc73957f8
3 changed files with 222 additions and 248 deletions

View file

@ -31,8 +31,6 @@
namespace ircd::net
{
using asio::steady_timer;
struct socket;
extern asio::ssl::context sslv23_client;
@ -45,29 +43,31 @@ struct ircd::net::socket
{
struct io;
struct stat;
struct xfer;
struct scope_timeout;
using endpoint = ip::tcp::endpoint;
using wait_type = ip::tcp::socket::wait_type;
using message_flags = asio::socket_base::message_flags;
using handshake_type = asio::ssl::stream<ip::tcp::socket>::handshake_type;
using handler = std::function<void (const error_code &) noexcept>;
using xfer_handler = std::function<void (xfer) noexcept>;
struct stat
{
size_t bytes {0};
size_t calls {0};
};
using message_flags = boost::asio::socket_base::message_flags;
using handshake_type = asio::ssl::stream<ip::tcp::socket>::handshake_type;
using handler = std::function<void (const error_code &) noexcept>;
using xfer_handler = std::function<void (const error_code &, const size_t &) noexcept>;
ip::tcp::socket sd;
asio::ssl::stream<ip::tcp::socket &> ssl;
steady_timer timer;
stat in, out;
bool timedout {false};
void call_user(const handler &, const error_code &) noexcept;
bool handle_error(const error_code &ec);
void call_user(const handler &, const error_code &ec) noexcept;
void handle_timeout(std::weak_ptr<socket> wp, const error_code &ec) noexcept;
void handle(std::weak_ptr<socket>, handler, const error_code &, const size_t &) noexcept;
void handle(std::weak_ptr<socket>, handler, const error_code &) noexcept;
public:
operator const ip::tcp::socket &() const { return sd; }
@ -75,25 +75,29 @@ struct ircd::net::socket
operator const SSL &() const;
operator SSL &();
ip::tcp::endpoint remote() const; // getpeername(); throws if not conn
ip::tcp::endpoint local() const; // getsockname(); throws if not conn/bound
bool connected() const noexcept; // false on any sock errs
endpoint remote() const; // getpeername(); throws if not conn
endpoint local() const; // getsockname(); throws if not conn/bound
size_t available() const; // throws on errors; use friend variant for noex..
size_t readable() const; // throws on errors; ioctl
size_t rbufsz() const; // throws on errors; SO_RCVBUF
size_t wbufsz() const; // throws on errors; SO_SNDBUF
bool blocking() const; // throws on errors;
void rbufsz(const size_t &); // throws; set SO_RCVBUF bytes
void wbufsz(const size_t &); // throws; set SO_RCVBUF bytes
void blocking(const bool &); // throws; set blocking
// low level read suite
template<class iov> auto read_some(const iov &, xfer_handler);
template<class iov> auto read_some(const iov &, error_code &);
template<class iov> auto read_some(const iov &);
template<class iov> auto read(const iov &, xfer_handler);
template<class iov> auto read(const iov &, error_code &);
template<class iov> auto read(const iov &);
// low level write suite
template<class iov> auto write_some(const iov &, xfer_handler);
template<class iov> auto write_some(const iov &, error_code &);
template<class iov> auto write_some(const iov &);
template<class iov> auto write(const iov &, xfer_handler);
template<class iov> auto write(const iov &, error_code &);
template<class iov> auto write(const iov &);
// Timer for this socket
@ -101,15 +105,14 @@ struct ircd::net::socket
void set_timeout(const milliseconds &);
error_code cancel_timeout() noexcept;
// Asynchronous callback when socket ready for read
void operator()(const milliseconds &timeout, handler);
void operator()(handler);
// Asynchronous callback when socket ready
void operator()(const wait_type &, const milliseconds &timeout, handler);
void operator()(const wait_type &, handler);
bool cancel() noexcept;
// Connect to host; synchronous (yield) and asynchronous (callback) variants
void connect(const ip::tcp::endpoint &ep, const milliseconds &timeout, handler callback);
void connect(const ip::tcp::endpoint &ep, const milliseconds &timeout = 30000ms);
void connect(const net::remote &, const milliseconds &timeout = 30000ms);
void connect(const endpoint &ep, const milliseconds &timeout, handler callback);
void connect(const endpoint &ep, const milliseconds &timeout = 30000ms);
bool disconnect(const dc &type);
socket(asio::ssl::context &ssl = sslv23_client,
@ -152,25 +155,22 @@ class ircd::net::socket::io
io(struct socket &, struct stat &, const std::function<size_t ()> &closure);
};
struct ircd::net::socket::xfer
{
std::exception_ptr eptr;
size_t bytes {0};
xfer(const error_code &ec = {}, const size_t &bytes = 0);
};
template<class iov>
auto
ircd::net::socket::write(const iov &bufs)
{
return io(*this, out, [&]
return io{*this, out, [&]
{
return async_write(ssl, bufs, asio::transfer_all(), yield_context{to_asio{}});
});
}
template<class iov>
auto
ircd::net::socket::write(const iov &bufs,
error_code &ec)
{
return io(*this, out, [&]
{
return async_write(ssl, bufs, asio::transfer_all(), yield_context{to_asio{}}[ec]);
});
}};
}
template<class iov>
@ -183,7 +183,7 @@ ircd::net::socket::write(const iov &bufs,
noexcept
{
io{*this, out, bytes};
handler(ec, bytes);
handler(xfer{ec, bytes});
});
}
@ -191,21 +191,10 @@ template<class iov>
auto
ircd::net::socket::write_some(const iov &bufs)
{
return io(*this, out, [&]
return io{*this, out, [&]
{
return ssl.async_write_some(bufs, yield_context{to_asio{}});
});
}
template<class iov>
auto
ircd::net::socket::write_some(const iov &bufs,
error_code &ec)
{
return io(*this, out, [&]
{
return ssl.async_write_some(bufs, yield_context{to_asio{}}[ec]);
});
}};
}
template<class iov>
@ -218,7 +207,7 @@ ircd::net::socket::write_some(const iov &bufs,
noexcept
{
io{*this, out, bytes};
handler(ec, bytes);
handler(xfer{ec, bytes});
});
}
@ -226,26 +215,18 @@ template<class iov>
auto
ircd::net::socket::read(const iov &bufs)
{
return io(*this, in, [&]
return io{*this, in, [&]
{
const size_t ret(async_read(ssl, bufs, yield_context{to_asio{}}));
const size_t ret
{
async_read(ssl, bufs, yield_context{to_asio{}})
};
if(unlikely(!ret))
throw boost::system::system_error(boost::asio::error::eof);
return ret;
});
}
template<class iov>
auto
ircd::net::socket::read(const iov &bufs,
error_code &ec)
{
return io(*this, in, [&]
{
return async_read(ssl, bufs, yield_context{to_asio{}}[ec]);
});
}};
}
template<class iov>
@ -258,7 +239,7 @@ ircd::net::socket::read(const iov &bufs,
noexcept
{
io{*this, in, bytes};
handler(ec, bytes);
handler(xfer{ec, bytes});
});
}
@ -266,26 +247,18 @@ template<class iov>
auto
ircd::net::socket::read_some(const iov &bufs)
{
return io(*this, in, [&]
return io{*this, in, [&]
{
const size_t ret(ssl.async_read_some(bufs, yield_context{to_asio{}}));
const size_t ret
{
ssl.async_read_some(bufs, yield_context{to_asio{}})
};
if(unlikely(!ret))
throw boost::system::system_error(boost::asio::error::eof);
return ret;
});
}
template<class iov>
auto
ircd::net::socket::read_some(const iov &bufs,
error_code &ec)
{
return io(*this, in, [&]
{
return ssl.async_read_some(bufs, yield_context{to_asio{}}[ec]);
});
}};
}
template<class iov>
@ -298,6 +271,6 @@ ircd::net::socket::read_some(const iov &bufs,
noexcept
{
io{*this, in, bytes};
handler(ec, bytes);
handler(xfer{ec, bytes});
});
}

View file

@ -233,7 +233,8 @@ ircd::async_recv_next(std::shared_ptr<client> client,
// its stack while waiting for activity on idle connections between requests.
auto &sock(*client->sock);
sock(timeout, [client(std::move(client)), timeout](const net::error_code &ec)
static const auto op{sock.sd.wait_read};
sock(op, timeout, [client(std::move(client)), timeout](const net::error_code &ec)
noexcept
{
// Right here this handler is executing on the main stack (not in any

View file

@ -552,7 +552,7 @@ ircd::net::listener::acceptor::configure(const json::object &opts)
{
log.debug("%s preparing listener socket configuration...",
std::string(*this));
/*
ssl.set_options
(
//ssl.default_workarounds
@ -561,10 +561,9 @@ ircd::net::listener::acceptor::configure(const json::object &opts)
//| ssl.no_tlsv1_2
//| ssl.no_sslv2
//| ssl.no_sslv3
ssl.single_dh_use
//| ssl.single_dh_use
);
*/
//TODO: XXX
ssl.set_password_callback([this]
(const auto &size, const auto &purpose)
@ -811,6 +810,17 @@ ircd::net::socket::scope_timeout::release()
return s != nullptr;
}
//
// socket::xfer
//
ircd::net::socket::xfer::xfer(const error_code &error_code,
const size_t &bytes)
:eptr{make_eptr(error_code)}
,bytes{bytes}
{
}
//
// socket
//
@ -888,27 +898,6 @@ catch(const std::exception &e)
throw;
}
/// Attempt to connect and ssl handshake remote; yields ircd::ctx; throws timeout
///
void
ircd::net::socket::connect(const net::remote &remote,
const milliseconds &timeout)
{
const ip::tcp::endpoint ep
{
is_v6(remote)? asio::ip::tcp::endpoint
{
asio::ip::address_v6 { std::get<remote.IP>(remote) }, port(remote)
}
: asio::ip::tcp::endpoint
{
asio::ip::address_v4 { host4(remote) }, port(remote)
},
};
this->connect(ep, timeout);
}
/// Attempt to connect and ssl handshake; asynchronous, callback when done.
///
void
@ -1067,14 +1056,15 @@ bool
ircd::net::socket::cancel()
noexcept
{
static const auto good{[](const auto &ec)
{
return ec == boost::system::errc::success;
}};
boost::system::error_code ec[2];
sd.cancel(ec[0]);
timer.cancel(ec[1]);
return std::all_of(begin(ec), end(ec), [](const auto &ec)
{
return ec == boost::system::errc::success;
});
return std::all_of(begin(ec), end(ec), good);
}
/// Asynchronous callback when the socket is ready
@ -1082,79 +1072,86 @@ noexcept
/// Overload for operator() without a timeout. see: operator()
///
void
ircd::net::socket::operator()(handler h)
ircd::net::socket::operator()(const wait_type &type,
handler h)
{
operator()(milliseconds(-1), std::move(h));
operator()(type, milliseconds(-1), std::move(h));
}
/// Asynchronous callback when the socket is ready
///
/// This function calls back the handler when the socket has received
/// something and is ready to be read from.
///
/// The purpose here is to allow waiting for data from the socket without
/// blocking any context and using any stack space whatsoever, i.e full
/// asynchronous mode.
/// This function calls back the handler when the socket is ready
/// for the operation of the specified type.
///
void
ircd::net::socket::operator()(const milliseconds &timeout,
ircd::net::socket::operator()(const wait_type &type,
const milliseconds &timeout,
handler callback)
{
static const auto flags
auto handle
{
ip::tcp::socket::message_peek
};
static char buffer[0];
static const asio::mutable_buffers_1 buffers
{
buffer, sizeof(buffer)
};
auto handler
{
std::bind(&socket::handle, this, weak_from(*this), std::move(callback), ph::_1, ph::_2)
std::bind(&socket::handle, this, weak_from(*this), std::move(callback), ph::_1)
};
assert(connected());
set_timeout(timeout);
sd.async_receive(buffers, flags, std::move(handler));
switch(type)
{
case wait_type::wait_error:
case wait_type::wait_write:
{
sd.async_wait(type, std::move(handle));
break;
}
// There might be a bug in boost on linux which is only reproducible
// when serving a large number of assets: a ready status for the socket
// is not indicated when it ought to be, at random. This is fixed below
// by doing it the old way (pre boost-1.66 sd.async_wait()) with the
// proper peek.
case wait_type::wait_read:
{
static const auto flags{ip::tcp::socket::message_peek};
sd.async_receive(buffer::null_buffers, flags, std::move(handle));
break;
}
}
}
void
ircd::net::socket::handle(const std::weak_ptr<socket> wp,
const handler callback,
const error_code &ec,
const size_t &bytes)
const error_code &ec)
noexcept try
{
using namespace boost::system::errc;
using boost::system::system_category;
// After life_guard is constructed it is safe to use *this in this frame.
const life_guard<socket> s{wp};
/*
log.debug("socket(%p): %zu bytes; %s: %s",
log.debug("socket(%p): handle: (%s)",
this,
bytes,
string(ec));
*/
// This handler and the timeout handler are responsible for canceling each other
// when one or the other is entered. If the timeout handler has already fired for
// a timeout on the socket, `timedout` will be `true` and this handler will be
// entered with an `operation_canceled` error.
if(!timedout)
cancel_timeout();
else
assert(ec == boost::system::errc::operation_canceled);
// We can handle a few errors at this level which don't ever need to invoke the
// user's callback. Otherwise they are passed up.
if(!handle_error(ec))
if(ec.category() == system_category()) switch(ec.value())
{
log.error("socket(%p): %s",
this,
string(ec));
return;
case operation_canceled:
if(timedout)
break;
return;
// This is a condition which we hide from the user.
case bad_file_descriptor:
return;
// Everything else is passed up to the user.
default:
break;
}
call_user(callback, ec);
@ -1169,88 +1166,12 @@ catch(const std::bad_weak_ptr &e)
e.what());
assert(0);
}
catch(const boost::system::system_error &e)
{
log.error("socket(%p): handle: %s %s",
this,
string(ec));
assert(0);
}
catch(const std::exception &e)
{
log.error("socket(%p): handle: %s",
this,
e.what());
assert(0);
}
void
ircd::net::socket::call_user(const handler &callback,
const error_code &ec)
noexcept try
{
callback(ec);
}
catch(const std::exception &e)
{
log.critical("socket(%p): async handler: unhandled exception: %s",
log.critical("socket(%p): handle: %s",
this,
e.what());
}
bool
ircd::net::socket::handle_error(const error_code &ec)
{
using namespace boost::system::errc;
using boost::system::system_category;
using boost::asio::error::get_ssl_category;
using boost::asio::error::get_misc_category;
if(likely(ec == success))
return true;
log.warning("socket(%p): handle error: %s: %s",
this,
string(ec));
if(ec.category() == system_category()) switch(ec.value())
{
// A cancel is triggered either by the timeout handler or by
// a request to shutdown/close the socket. We only call the user's
// handler for a timeout, otherwise this is hidden from the user.
case operation_canceled:
return timedout;
// This is a condition which we hide from the user.
case bad_file_descriptor:
return false;
// Everything else is passed up to the user.
default:
return true;
}
else if(ec.category() == get_ssl_category()) switch(uint8_t(ec.value()))
{
// Docs say this means we read less bytes off the socket than desired.
case SSL_R_SHORT_READ:
return true;
default:
return true;
}
else if(ec.category() == get_misc_category()) switch(ec.value())
{
// This indicates the remote closed the socket, we still
// pass this up to the user so they can know that too.
case boost::asio::error::eof:
return true;
default:
return true;
}
assert(0);
return true;
}
void
@ -1265,9 +1186,9 @@ noexcept try
// A 'success' for this handler means there was a timeout on the socket
case success:
{
sd.cancel();
assert(timedout == false);
timedout = true;
sd.cancel();
break;
}
@ -1287,9 +1208,10 @@ noexcept try
}
catch(const boost::system::system_error &e)
{
log.error("socket(%p): handle_timeout: unexpected: %s\n",
(const void *)this,
e.what());
log.critical("socket(%p): handle_timeout: unexpected: %s\n",
(const void *)this,
e.what());
assert(0);
}
catch(const std::exception &e)
{
@ -1298,6 +1220,84 @@ catch(const std::exception &e)
e.what());
}
void
ircd::net::socket::call_user(const handler &callback,
const error_code &ec)
noexcept try
{
callback(ec);
}
catch(const std::exception &e)
{
log.critical("socket(%p): async handler: unhandled exception: %s",
this,
e.what());
}
void
ircd::net::socket::blocking(const bool &b)
{
sd.non_blocking(b);
}
void
ircd::net::socket::wbufsz(const size_t &bytes)
{
assert(bytes <= std::numeric_limits<int>::max());
ip::tcp::socket::send_buffer_size option
{
int(bytes)
};
sd.set_option(option);
}
void
ircd::net::socket::rbufsz(const size_t &bytes)
{
assert(bytes <= std::numeric_limits<int>::max());
ip::tcp::socket::receive_buffer_size option
{
int(bytes)
};
sd.set_option(option);
}
bool
ircd::net::socket::blocking()
const
{
return !sd.non_blocking();
}
size_t
ircd::net::socket::wbufsz()
const
{
ip::tcp::socket::send_buffer_size option{};
sd.get_option(option);
return option.value();
}
size_t
ircd::net::socket::rbufsz()
const
{
ip::tcp::socket::receive_buffer_size option{};
sd.get_option(option);
return option.value();
}
size_t
ircd::net::socket::readable()
const
{
ip::tcp::socket::bytes_readable command{true};
const_cast<ip::tcp::socket &>(sd).io_control(command);
return command.get();
}
size_t
ircd::net::socket::available()
const
@ -1305,6 +1305,20 @@ const
return sd.available();
}
boost::asio::ip::tcp::endpoint
ircd::net::socket::local()
const
{
return sd.local_endpoint();
}
boost::asio::ip::tcp::endpoint
ircd::net::socket::remote()
const
{
return sd.remote_endpoint();
}
bool
ircd::net::socket::connected()
const noexcept try
@ -1326,20 +1340,6 @@ noexcept
return ec;
}
boost::asio::ip::tcp::endpoint
ircd::net::socket::local()
const
{
return sd.local_endpoint();
}
boost::asio::ip::tcp::endpoint
ircd::net::socket::remote()
const
{
return sd.remote_endpoint();
}
void
ircd::net::socket::set_timeout(const milliseconds &t)
{