0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-02 18:18:56 +02:00

ircd: Add low-level net utils for client (non-abstract).

This commit is contained in:
Jason Volk 2016-09-12 14:07:46 -07:00
parent 221145b31b
commit e87a8cc5d2
3 changed files with 242 additions and 80 deletions

View file

@ -32,22 +32,38 @@ namespace ircd {
struct sock;
struct client;
using clist = std::list<std::shared_ptr<client>>;
std::shared_ptr<client> shared_from(client &);
std::weak_ptr<client> weak_from(client &);
// Client socket addressing
bool has_socket(const client &);
const sock &socket(const client &);
sock &socket(client &);
using ip_port_pair = std::pair<std::string, uint16_t>;
using ip_port = IRCD_WEAK_T(ip_port_pair);
ip_port remote_address(const client &);
ip_port local_address(const client &);
std::string string(const ip_port &);
std::shared_ptr<client> shared_from(client &);
std::weak_ptr<client> weak_from(client &);
enum class dc
{
RST, // hardest disconnect
FIN, // graceful shutdown both directions
FIN_SEND, // graceful shutdown send side
FIN_RECV, // graceful shutdown recv side
};
bool connected(const client &) noexcept;
bool disconnect(std::nothrow_t, client &, const dc & = dc::FIN) noexcept;
void disconnect(client &, const dc & = dc::FIN);
// Makes a client
std::shared_ptr<client> add_client();
std::shared_ptr<client> add_client(std::unique_ptr<struct sock>);
using clist = std::list<std::shared_ptr<client>>;
const clist &clients();
} // namespace ircd

View file

@ -26,8 +26,9 @@
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include "bufs.h"
#include "ctx_ctx.h"
namespace ircd {
namespace ircd {
namespace ip = boost::asio::ip;
using boost::system::error_code;
@ -35,22 +36,21 @@ using boost::asio::steady_timer;
struct sock
{
using message_flags = boost::asio::socket_base::message_flags;
ip::tcp::socket sd;
steady_timer timer;
std::exception_ptr eptr;
std::array<char, BUFSIZE> rbuf alignas(16);
uint16_t checked;
uint16_t length;
tape reel;
operator const ip::tcp::socket &() const { return sd; }
operator ip::tcp::socket &() { return sd; }
ip::tcp::endpoint remote() const { return sd.remote_endpoint(); }
ip::tcp::endpoint local() const { return sd.local_endpoint(); }
bool terminated() const;
uint16_t remaining() const;
size_t handle_pck(const error_code &, const size_t) noexcept;
template<class mutable_buffers> auto recv_some(const mutable_buffers &, const message_flags & = 0);
template<class mutable_buffers> auto recv(const mutable_buffers &);
template<class const_buffers> auto send_some(const const_buffers &, const message_flags & = 0);
template<class const_buffers> auto send(const const_buffers &);
sock(boost::asio::io_service *const &ios = ircd::ios);
};
@ -62,6 +62,47 @@ ip::address local_address(const sock &);
std::string local_ip(const sock &);
uint16_t local_port(const sock &);
inline
sock::sock(boost::asio::io_service *const &ios)
:sd{*ios}
,timer{*ios}
{
}
// Block until entirely transmitted
template<class const_buffers>
auto
sock::send(const const_buffers &bufs)
{
return async_write(sd, bufs, yield(continuation()));
}
// Block until something transmitted, returns amount
template<class const_buffers>
auto
sock::send_some(const const_buffers &bufs,
const message_flags &flags)
{
return sd.async_send(bufs, flags, yield(continuation()));
}
// Block until the buffers are completely full
template<class mutable_buffers>
auto
sock::recv(const mutable_buffers &bufs)
{
return async_read(sd, bufs, yield(continuation()));
}
// Block until something in buffers, returns size
template<class mutable_buffers>
auto
sock::recv_some(const mutable_buffers &bufs,
const message_flags &flags)
{
return sd.async_receive(bufs, flags, yield(continuation()));
}
inline uint16_t
local_port(const sock &sock)
{
@ -98,8 +139,5 @@ remote_address(const sock &sock)
return sock.remote().address();
}
static_assert(BUFSIZE == 512, "");
} // namespace ircd
#endif // __cplusplus

View file

@ -28,11 +28,91 @@
#include <boost/asio.hpp>
#include <ircd/sock.h>
#ifdef __cplusplus
namespace ircd {
struct rbuf
{
tape reel;
std::exception_ptr eptr;
std::array<char, BUFSIZE> buf alignas(16);
uint16_t checked;
uint16_t length;
bool terminated() const; // rbuf has CRLF termination
uint16_t remaining() const; // bytes remaining in rbuf
size_t handle_pck(const error_code &, const size_t) noexcept;
void reset();
rbuf();
};
static_assert(BUFSIZE == 512, "");
inline
rbuf::rbuf()
:checked{0}
,length{0}
{
}
inline void
rbuf::reset()
{
checked = 0;
length = 0;
}
inline size_t
rbuf::handle_pck(const error_code &ec,
const size_t bytes)
noexcept try
{
if(ec)
return 0;
length += bytes;
if(reel.append(buf.data(), length))
return 0;
if(terminated())
throw rfc1459::syntax_error("invalid syntax"); //TODO: eps + ERR_
checked = length;
return remaining()?: throw rfc1459::syntax_error("message length exceeded"); //TODO: ERR_
}
catch(...)
{
eptr = std::current_exception();
return 0;
}
inline uint16_t
ircd::rbuf::remaining()
const
{
return buf.size() - length;
}
inline bool
rbuf::terminated()
const
{
const auto b(std::next(buf.rbegin(), remaining()));
const auto e(std::next(buf.rbegin(), buf.size() - checked));
return std::find(b, e, '\n') != e;
}
} // namespace ircd
#endif // __cplusplus
namespace ircd
{
struct client
:std::enable_shared_from_this<client>
{
struct rbuf rbuf;
clist::const_iterator clit;
std::unique_ptr<struct sock> sock;
@ -87,16 +167,55 @@ ircd::add_client()
return client;
}
std::weak_ptr<client>
ircd::weak_from(client &client)
bool
ircd::disconnect(std::nothrow_t,
client &client,
const dc &type)
noexcept try
{
return shared_from(client);
disconnect(client, type);
return true;
}
catch(...)
{
return false;
}
std::shared_ptr<client>
ircd::shared_from(client &client)
void
ircd::disconnect(client &client,
const dc &type)
{
return client.shared_from_this();
auto &sd(socket(client).sd);
switch(type)
{
case dc::RST:
sd.close();
break;
case dc::FIN:
sd.shutdown(ip::tcp::socket::shutdown_both);
break;
case dc::FIN_SEND:
sd.shutdown(ip::tcp::socket::shutdown_send);
break;
case dc::FIN_RECV:
sd.shutdown(ip::tcp::socket::shutdown_receive);
break;
}
}
bool
ircd::connected(const client &client)
noexcept
try
{
return socket(client).sd.is_open();
}
catch(...)
{
return false;
}
void
@ -104,12 +223,12 @@ ircd::set_recv(client &client)
{
using boost::asio::async_read;
auto &sock(*client.sock);
sock.checked = 0;
sock.length = 0;
auto &rbuf(client.rbuf);
rbuf.reset();
async_read(sock.sd, mutable_buffers_1(sock.rbuf.data(), sock.rbuf.size()),
std::bind(&sock::handle_pck, &sock, ph::_1, ph::_2),
auto &sock(*client.sock);
async_read(sock.sd, mutable_buffers_1(rbuf.buf.data(), rbuf.buf.size()),
std::bind(&rbuf::handle_pck, &rbuf, ph::_1, ph::_2),
std::bind(&ircd::handle_recv, std::ref(client), ph::_1, ph::_2));
}
@ -122,7 +241,8 @@ try
if(!handle_error(client, ec))
return;
auto &reel(client.sock->reel);
auto &rbuf(client.rbuf);
auto &reel(rbuf.reel);
for(const auto &line : reel)
std::cout << line << std::endl;
@ -145,8 +265,8 @@ ircd::handle_error(client &client,
using namespace boost::system::errc;
using boost::asio::error::eof;
if(client.sock && client.sock->eptr)
std::rethrow_exception(client.sock->eptr);
if(client.rbuf.eptr)
std::rethrow_exception(client.rbuf.eptr);
switch(ec.value())
{
@ -167,72 +287,60 @@ ircd::string(const ip_port &pair)
ircd::ip_port
ircd::local_address(const client &client)
try
{
if(!client.sock)
return { "0.0.0.0"s, 0 };
return { local_ip(*client.sock), local_port(*client.sock) };
const auto &sock(socket(client));
return { local_ip(sock), local_port(sock) };
}
catch(const std::bad_weak_ptr &)
{
return { "0.0.0.0"s, 0 };
}
ircd::ip_port
ircd::remote_address(const client &client)
try
{
if(!client.sock)
return { "0.0.0.0"s, 0 };
return { remote_ip(*client.sock), remote_port(*client.sock) };
const auto &sock(socket(client));
return { remote_ip(sock), remote_port(sock) };
}
catch(const std::bad_weak_ptr &)
{
return { "0.0.0.0"s, 0 };
}
///////////////////////////////////////////////////////////////////////////////
//
// client_sock.h
//
ircd::sock::sock(boost::asio::io_service *const &ios)
:sd{*ios}
,timer{*ios}
,checked{0}
,length{0}
sock &
ircd::socket(client &client)
{
if(unlikely(!has_socket(client)))
throw std::bad_weak_ptr();
return *client.sock;
}
size_t
ircd::sock::handle_pck(const error_code &ec,
const size_t bytes)
noexcept try
const sock &
ircd::socket(const client &client)
{
if(ec)
return 0;
if(unlikely(!has_socket(client)))
throw std::bad_weak_ptr();
length += bytes;
if(reel.append(rbuf.data(), length))
return 0;
if(terminated())
throw rfc1459::syntax_error("invalid syntax"); //TODO: eps + ERR_
checked = length;
return remaining()?: throw rfc1459::syntax_error("message length exceeded"); //TODO: ERR_
}
catch(...)
{
eptr = std::current_exception();
return 0;
}
uint16_t
ircd::sock::remaining()
const
{
return sizeof(rbuf) - length;
return *client.sock;
}
bool
ircd::sock::terminated()
const
ircd::has_socket(const client &client)
{
const auto b(std::next(rbuf.rbegin(), remaining()));
const auto e(std::next(rbuf.rbegin(), sizeof(rbuf) - checked));
return std::find(b, e, '\n') != e;
return bool(client.sock);
}
std::weak_ptr<client>
ircd::weak_from(client &client)
{
return shared_from(client);
}
std::shared_ptr<client>
ircd::shared_from(client &client)
{
return client.shared_from_this();
}