0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-17 10:01:51 +01:00

[temp] checkpoint some crap which isn't going to stick.

This commit is contained in:
Jason Volk 2016-09-22 23:59:24 -07:00
parent c0ce3f4808
commit 757d7e10a8
7 changed files with 385 additions and 157 deletions

View file

@ -30,6 +30,12 @@
#ifdef __cplusplus
namespace ircd {
IRCD_EXCEPTION(ircd::error, client_error)
IRCD_EXCEPTION(client_error, broken_pipe)
IRCD_EXCEPTION(client_error, disconnected)
// ctx::timeout also relevant
// ctx::interrupted also relevant
struct sock;
struct client;
@ -57,24 +63,14 @@ enum class dc
FIN_RECV, // graceful shutdown recv side
};
bool connected(const client &) noexcept;
bool disconnect(std::nothrow_t, client &, const dc & = dc::FIN) noexcept;
void disconnect(client &, const dc & = dc::FIN);
void recv_cancel(client &);
void recv_next(client &, const std::chrono::milliseconds &timeout);
void recv_next(client &);
void disconnect_all();
class sendf
:std::array<char, BUFSIZE>
,public fmt::snprintf
{
void flush(client &);
public:
template<class... Args>
sendf(client &, const char *const &fmt, Args&&... args);
};
void async_recv_cancel(client &);
void async_recv_next(client &, const std::chrono::milliseconds &timeout);
void async_recv_next(client &);
// Destroys a client. This only removes the client from the clients list,
// and may result in a destruction and disconnect, or it may not.
@ -87,17 +83,97 @@ std::shared_ptr<client> add_client(std::shared_ptr<struct sock>);
using clist = std::list<std::shared_ptr<client>>;
const clist &clients();
extern hook::sequence<client &> h_client_added;
template<class... Args>
sendf::sendf(client &client,
const char *const &fmt,
Args&&... args)
:fmt::snprintf
void execute(client &client, line);
void execute(client &client, tape &);
void execute(client &client, const std::string &line);
void execute(client &client, const uint8_t *const &line, const size_t &len);
//
// contexted I/O
//
IRCD_OVERLOAD(text_raw); // Sends string as-is
IRCD_OVERLOAD(text_terminate); // Sends terminator "\r\n" after string
void send(client &, text_raw_t, const char *const &, const size_t &len);
void send(client &, text_raw_t, const char *const &);
void send(client &, text_raw_t, const std::string &);
void send(client &, text_terminate_t, const char *const &, const size_t &len);
void send(client &, text_terminate_t, const char *const &);
void send(client &, text_terminate_t, const std::string &);
template<class... A> ssize_t snsendf(client &, char *const &buf, const size_t &max, const char *const &fmt, A&&... args);
template<class... A> ssize_t sendf(client &, const char *const &fmt, A&&... args);
size_t recv(client &, char *const &buf, const size_t &max, milliseconds &timeout);
template<class duration> size_t recv(client &, char *const &buf, const size_t &max, const duration &timeout = -1s);
template<class duration> uint recv(client &, tape &, const duration &timeout = -1s);
template<class duration> line recv(client &, const duration &timeout = -1s);
struct client_init
{
array::data(), array::size(), fmt, std::forward<Args>(args)...
client_init();
~client_init() noexcept;
};
template<class duration>
line
recv(client &client,
const duration &timeout)
{
return {};
}
template<class duration>
uint
recv(client &client,
tape &tape,
const duration &timeout)
{
flush(client);
size_t len(0);
size_t before(tape.size());
milliseconds remaining(timeout);
char buf[BUFSIZE]; do
{
len += recv(client, buf + len, sizeof(buf) - len, remaining);
}
while(!tape.append(buf, len));
return tape.size() - before;
}
template<class duration>
size_t
recv(client &client,
char *const &buf,
const size_t &max,
const duration &timeout)
{
return recv(client, buf, max, milliseconds(timeout));
}
template<class... A>
ssize_t
sendf(client &client,
const char *const &fmt,
A&&... args)
{
char buf[BUFSIZE];
return snsendf(client, buf, sizeof(buf), fmt, std::forward<A>(args)...);
}
template<class... A>
ssize_t
snsendf(client &client,
char *const &buf,
const size_t &max,
const char *const &fmt,
A&&... args)
{
const ssize_t len{fmt::snprintf{buf, max, fmt, std::forward<A>(args)...}};
send(client, text_terminate, buf, std::min(len, ssize_t(BUFSIZE-2)));
return len;
}
} // namespace ircd

View file

@ -51,12 +51,18 @@ struct sock
void handle_timeout(const std::weak_ptr<sock>, const error_code &);
template<class duration> void set_timeout(const duration &);
template<class mutable_buffers> auto recv_some(const mutable_buffers &, error_code &ec, const message_flags & = 0);
template<class mutable_buffers> auto recv_some(const mutable_buffers &, const message_flags & = 0);
template<class mutable_buffers> auto recv(const mutable_buffers &, error_code &ec);
template<class mutable_buffers> auto recv(const mutable_buffers &);
template<class const_buffers> auto send_some(const const_buffers &, error_code &ec, const message_flags & = 0);
template<class const_buffers> auto send_some(const const_buffers &, const message_flags & = 0);
template<class const_buffers> auto send(const const_buffers &, error_code &ec);
template<class const_buffers> auto send(const const_buffers &);
void disconnect(const dc &type = dc::FIN);
sock(boost::asio::io_service *const &ios = ircd::ios);
};
@ -74,6 +80,19 @@ sock::sock(boost::asio::io_service *const &ios)
{
}
inline void
sock::disconnect(const dc &type)
{
if(sd.is_open()) switch(type)
{
default:
case dc::RST: sd.close(); break;
case dc::FIN: sd.shutdown(ip::tcp::socket::shutdown_both); break;
case dc::FIN_SEND: sd.shutdown(ip::tcp::socket::shutdown_send); break;
case dc::FIN_RECV: sd.shutdown(ip::tcp::socket::shutdown_receive); break;
}
}
// Block until entirely transmitted
template<class const_buffers>
auto
@ -84,6 +103,16 @@ sock::send(const const_buffers &bufs)
return ret;
}
template<class const_buffers>
auto
sock::send(const const_buffers &bufs,
error_code &ec)
{
const auto ret(async_write(sd, bufs, yield(continuation())[ec]));
timer.cancel();
return ret;
}
// Block until something transmitted, returns amount
template<class const_buffers>
auto
@ -93,6 +122,15 @@ sock::send_some(const const_buffers &bufs,
return sd.async_send(bufs, flags, yield(continuation()));
}
template<class const_buffers>
auto
sock::send_some(const const_buffers &bufs,
error_code &ec,
const message_flags &flags)
{
return sd.async_send(bufs, flags, yield(continuation())[ec]);
}
// Block until the buffers are completely full
template<class mutable_buffers>
auto
@ -103,6 +141,16 @@ sock::recv(const mutable_buffers &bufs)
return ret;
}
template<class mutable_buffers>
auto
sock::recv(const mutable_buffers &bufs,
error_code &ec)
{
const auto ret(async_read(sd, bufs, yield(continuation())[ec]));
timer.cancel();
return ret;
}
// Block until something in buffers, returns size
template<class mutable_buffers>
auto
@ -112,6 +160,15 @@ sock::recv_some(const mutable_buffers &bufs,
return sd.async_receive(bufs, flags, yield(continuation()));
}
template<class mutable_buffers>
auto
sock::recv_some(const mutable_buffers &bufs,
error_code &ec,
const message_flags &flags)
{
return sd.async_receive(bufs, flags, yield(continuation())[ec]);
}
template<class duration>
void
sock::set_timeout(const duration &t)

View file

@ -22,23 +22,9 @@
#pragma once
#define HAVE_IRCD_VM_H
#ifdef __cplusplus
namespace ircd {
namespace vm {
struct pool
{
};
void execute(client &client, line);
void execute(client &client, tape &);
void execute(client &client, const std::string &line);
void execute(client &client, const uint8_t *const &line, const size_t &len);
} // namespace vm
using vm::execute;
} // namespace ircd
#endif

View file

@ -29,32 +29,48 @@
#include <ircd/sock.h>
#include "rbuf.h"
namespace ircd
namespace ircd {
struct client
:std::enable_shared_from_this<client>
{
struct client
:std::enable_shared_from_this<client>
{
struct rbuf rbuf;
clist::const_iterator clit;
std::shared_ptr<struct sock> sock;
struct rbuf rbuf;
clist::const_iterator clit;
std::shared_ptr<struct sock> sock;
client();
client(const client &) = delete;
client &operator=(const client &) = delete;
~client() noexcept;
};
client();
client(const client &) = delete;
client &operator=(const client &) = delete;
~client() noexcept;
};
clist clients_list;
std::unique_ptr<db::handle> client_db;
clist client_list;
bool handle_ec_eof(client &);
bool handle_ec_cancel(client &);
bool handle_ec_success(client &);
bool handle_ec(client &, const error_code &);
void handle_recv(client &, const error_code &, const size_t);
}
hook::sequence<client &> h_client_added;
bool handle_ec_eof(client &);
bool handle_ec_cancel(client &);
bool handle_ec_success(client &);
bool handle_ec(client &, const error_code &);
void handle_recv(client &, const error_code &, const size_t);
} // namespace ircd
using namespace ircd;
client_init::client_init()
{
client_db.reset(new db::handle("clients"));
}
client_init::~client_init()
noexcept
{
disconnect_all();
client_db.reset(nullptr);
}
client::client()
{
}
@ -67,7 +83,7 @@ noexcept
const clist &
ircd::clients()
{
return clients_list;
return client_list;
}
std::shared_ptr<client>
@ -79,7 +95,8 @@ ircd::add_client(std::shared_ptr<struct sock> sock)
string(remote_address(*client)).c_str(),
string(local_address(*client)).c_str());
recv_next(*client);
h_client_added(*client);
async_recv_next(*client);
return client;
}
@ -87,34 +104,137 @@ std::shared_ptr<client>
ircd::add_client()
{
auto client(std::make_shared<client>());
client->clit = clients_list.emplace(end(clients_list), client);
client->clit = client_list.emplace(end(client_list), client);
return client;
}
void
ircd::sendf::flush(client &client)
ircd::finished(client &client)
{
static const char *const terminator
{
"\r\n"
};
const auto p(shared_from(client));
client_list.erase(client.clit);
log::debug("client[%p] finished. (refs: %zu)", (const void *)p.get(), p.use_count());
}
size_t
ircd::recv(client &client,
char *const &buf,
const size_t &max,
milliseconds &timeout)
try
{
using std::chrono::duration_cast;
auto &sock(socket(client));
sock.set_timeout(timeout);
const auto ret(sock.recv_some(mutable_buffers_1(buf, max)));
if(timeout < milliseconds(0))
return ret;
sock.timer.cancel();
timeout = duration_cast<milliseconds>(sock.timer.expires_from_now());
return ret;
}
catch(const boost::system::system_error &e)
{
using namespace boost::system::errc;
using boost::asio::error::eof;
switch(e.code().value())
{
case success:
return 0;
case eof:
throw disconnected();
case operation_canceled:
if(socket(client).timedout)
throw ctx::timeout();
else
throw ctx::interrupted();
default:
throw error("%s", e.what());
}
}
void
ircd::send(client &client,
text_terminate_t,
const std::string &text)
{
send(client, text_terminate, text.c_str(), text.size());
}
void
ircd::send(client &client,
text_terminate_t,
const char *const &text)
{
send(client, text_terminate, text, strlen(text));
}
void
ircd::send(client &client,
text_terminate_t,
const char *const &text,
const size_t &len)
{
assert(len <= 510);
const std::array<const_buffer, 2> buffers
{{
{ buffer(), std::min(consumed(), size_t(510)) }, //TODO: framemax
{ terminator, 2 }
{ text, len },
{ "\r\n", 2 }
}};
auto &sock(socket(client));
sock.send(buffers);
}
void
ircd::send(client &client,
text_raw_t,
const std::string &text)
{
send(client, text_raw, text.c_str(), text.size());
}
void
ircd::send(client &client,
text_raw_t,
const char *const &text)
{
send(client, text_raw, text, strlen(text));
}
void
ircd::send(client &client,
text_raw_t,
const char *const &text,
const size_t &len)
{
assert(len <= 512);
auto &sock(socket(client));
sock.send(const_buffers_1(text, len));
}
void
ircd::disconnect_all()
{
for(auto &client : client_list)
disconnect(std::nothrow, *client, dc::RST);
}
bool
ircd::disconnect(std::nothrow_t,
client &client,
const dc &type)
noexcept try
{
if(!client.sock)
return true;
disconnect(client, type);
return true;
}
@ -127,25 +247,8 @@ void
ircd::disconnect(client &client,
const dc &type)
{
auto &sd(socket(client).sd);
switch(type)
{
case dc::RST:
sd.close();
break;
case dc::FIN:
sd.shutdown(ip::tcp::socket::shutdown_both);
break;
case dc::FIN_SEND:
sd.shutdown(ip::tcp::socket::shutdown_send);
break;
case dc::FIN_RECV:
sd.shutdown(ip::tcp::socket::shutdown_receive);
break;
}
auto &sock(socket(client));
sock.disconnect(type);
}
bool
@ -161,14 +264,14 @@ catch(...)
}
void
ircd::recv_next(client &client)
ircd::async_recv_next(client &client)
{
recv_next(client, std::chrono::milliseconds(-1));
async_recv_next(client, std::chrono::milliseconds(-1));
}
void
ircd::recv_next(client &client,
const std::chrono::milliseconds &timeout)
ircd::async_recv_next(client &client,
const std::chrono::milliseconds &timeout)
{
using boost::asio::async_read;
@ -183,7 +286,7 @@ ircd::recv_next(client &client,
}
void
ircd::recv_cancel(client &client)
ircd::async_recv_cancel(client &client)
{
auto &sock(socket(client));
sock.sd.cancel();
@ -272,11 +375,66 @@ ircd::handle_ec_eof(client &client)
}
void
ircd::finished(client &client)
ircd::execute(client &client,
const uint8_t *const &ptr,
const size_t &len)
{
const auto p(shared_from(client));
clients_list.erase(client.clit);
log::debug("client[%p] finished. (refs: %zu)", (const void *)p.get(), p.use_count());
execute(client, line(ptr, len));
}
void
ircd::execute(client &client,
const std::string &l)
{
execute(client, line(l));
}
void
ircd::execute(client &client,
tape &reel)
{
context([wp(weak_from(client)), &client, &reel]
{
// Hold the client for the lifetime of this context
const life_guard<struct client> lg(wp);
while(!reel.empty()) try
{
auto &line(reel.front());
const scope pop([&reel]
{
reel.pop_front();
});
if(line.empty())
continue;
auto &handle(cmds::find(command(line)));
handle(client, std::move(line));
}
catch(const std::exception &e)
{
log::error("vm: %s", e.what());
disconnect(client);
finished(client);
return;
}
async_recv_next(client);
},
ctx::DEFER_POST | ctx::SELF_DESTRUCT);
}
void
ircd::execute(client &c,
line line)
{
if(line.empty())
return;
auto &handle(cmds::find(command(line)));
handle(c, std::move(line));
}
std::string

View file

@ -94,6 +94,7 @@ noexcept try
ctx::ole::init _ole_;
mods::init _mods_;
db::init _db_;
client_init _client_;
// Create IRCd's agency
ircd::me = add_client();

View file

@ -20,68 +20,10 @@
*/
namespace ircd {
namespace vm {
} // namespace vm
} // namespace ircd
void
ircd::vm::execute(client &client,
const uint8_t *const &ptr,
const size_t &len)
{
execute(client, line(ptr, len));
}
void
ircd::vm::execute(client &client,
const std::string &l)
{
execute(client, line(l));
}
void
ircd::vm::execute(client &client,
tape &reel)
{
context([wp(weak_from(client)), &client, &reel]
{
// Hold the client for the lifetime of this context
const lifeguard<struct client> lg(wp);
while(!reel.empty()) try
{
auto &line(reel.front());
const scope pop([&reel]
{
reel.pop_front();
});
if(line.empty())
continue;
auto &handle(cmds::find(command(line)));
handle(client, std::move(line));
}
catch(const std::exception &e)
{
log::error("vm: %s", e.what());
disconnect(client);
finished(client);
return;
}
recv_next(client);
},
ctx::SELF_DESTRUCT);
}
void
ircd::vm::execute(client &client,
line line)
{
if(line.empty())
return;
auto &handle(cmds::find(command(line)));
handle(client, std::move(line));
}
using namespace ircd;

View file

@ -43,9 +43,16 @@ ip::tcp::resolver tcp_resolver
*ircd::ios
};
ctx::pool pool
{
1, // There can be X concurrent hostname lookups (if posted to this pool)
16_KiB // The stack size is 16 kilobytes (which is pretty small, so just resolve hostnames)
};
mapi::fini fini{[]
{
tcp_resolver.cancel();
pool.del(pool.size());
}};
mapi::header IRCD_MODULE
@ -60,15 +67,16 @@ m_host::operator()(client &client,
line line)
try
{
const lifeguard<struct client> lg(client);
const life_guard<struct client> lg(client);
const auto &host(line[0]);
const auto &port(has(line, 1)? line[1] : std::string{});
const ip::tcp::resolver::query query(host, port);
const ip::tcp::resolver::query query(host, std::string{});
auto epit(tcp_resolver.async_resolve(query, yield(continuation())));
static const ip::tcp::resolver::iterator end;
for(; epit != end; ++epit)
std::cout << "ep: " << epit->endpoint().address().to_string() << std::endl;
sendf(client, "lookup for %s returned [%s]",
host,
epit->endpoint().address().to_string());
}
catch(const std::exception &e)
{