From 848337a76bc1f614d2ea8999cfa064240ebbd861 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 2 Nov 2016 17:55:30 -0700 Subject: [PATCH] modules: Add net with Node.js compatible net.Socket / net.Server. --- modules/Makefile.am | 6 + modules/net/net.cc | 81 ++++++++++++ modules/net/net.h | 58 +++++++++ modules/net/server.cc | 239 ++++++++++++++++++++++++++++++++++ modules/net/socket.cc | 291 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 675 insertions(+) create mode 100644 modules/net/net.cc create mode 100644 modules/net/net.h create mode 100644 modules/net/server.cc create mode 100644 modules/net/socket.cc diff --git a/modules/Makefile.am b/modules/Makefile.am index 8b49ec5cc..cba24e941 100644 --- a/modules/Makefile.am +++ b/modules/Makefile.am @@ -41,6 +41,12 @@ dns_module_LTLIBRARIES = dns/dns.la dns_dns_la_SOURCES = dns/dns.cc \ dns/lookup.cc +net_moduledir=@moduledir@ +net_module_LTLIBRARIES = net/net.la +net_net_la_SOURCES = net/net.cc \ + net/socket.cc \ + net/server.cc + moduledir=@moduledir@ future_la_SOURCES = future.cc require_la_SOURCES = require.cc diff --git a/modules/net/net.cc b/modules/net/net.cc new file mode 100644 index 000000000..8ad3a83bb --- /dev/null +++ b/modules/net/net.cc @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2016 Charybdis Development Team + * Copyright (C) 2016 Jason Volk + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice is present in all copies. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "net.h" + +namespace ircd { +namespace js { + +extern trap socket; // socket.cc +extern trap server; // server.cc + +struct net +:trap +{ + bool on_has(object::handle obj, id::handle id) override + { + if(id == "Server") + return true; + + if(id == "Socket") + return true; + + return false; + } + + value on_get(object::handle obj, id::handle id, value::handle val) override; + + void on_new(object::handle outer, object &that, const args &args) override + { + } + + using trap::trap; +} +net +__attribute__((init_priority(1000))) +{ + "net" +}; + +} // namespace js +} // namespace ircd + +ircd::js::value +ircd::js::net::on_get(object::handle obj, + id::handle id, + value::handle val) +{ + if(!undefined(val)) + return val; + + if(id == "Server") + return ctor(server); + + if(id == "Socket") + return ctor(socket); + + return val; +} + +ircd::mapi::header IRCD_MODULE +{ + "Network server and socket support." +}; diff --git a/modules/net/net.h b/modules/net/net.h new file mode 100644 index 000000000..df92d45a1 --- /dev/null +++ b/modules/net/net.h @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2016 Charybdis Development Team + * Copyright (C) 2016 Jason Volk + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice is present in all copies. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +namespace ip = boost::asio::ip; + +namespace ircd { +namespace js { + +struct socket +:trap +{ + struct state; + + struct close; + struct write; + struct read; + struct connect; + + void on_new(object::handle, object &, const args &) override; + void on_gc(JSObject *const &) override; + + socket(); +}; + +struct socket::state +:priv_data +{ + ip::tcp::socket socket; + boost::asio::streambuf in; + boost::asio::streambuf out; + + state(const std::pair &buffer_size = { 1024, 1024 }); + ~state() noexcept; +}; + +} // namespace js +} // namespace ircd diff --git a/modules/net/server.cc b/modules/net/server.cc new file mode 100644 index 000000000..7d9b5863d --- /dev/null +++ b/modules/net/server.cc @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2016 Charybdis Development Team + * Copyright (C) 2016 Jason Volk + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice is present in all copies. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "net.h" + +namespace ircd { +namespace js { + +extern trap socket; + +struct server +:trap +{ + struct state; // privdata + + struct listen; + struct close; + + void on_new(object::handle, object &, const args &) override; + void on_trace(const JSObject *const &) override; + void on_gc(JSObject *const &that) override; + + server(); +} +server __attribute__((init_priority(1002))); + +struct server::state +:priv_data +{ + ip::tcp::endpoint ep; + ip::tcp::acceptor acceptor {*ios}; + + state(const string &host, const value &port); + ~state() noexcept; +}; + +struct server::listen +:trap::function +{ + trap &future{trap::find("future")}; + + value on_call(object::handle, value::handle, const args &args) override; + using trap::function::function; +} +static listen{server, "listen"}; + +struct server::close +:trap::function +{ + trap &future{trap::find("future")}; + + value on_call(object::handle, value::handle, const args &args) override; + using trap::function::function; +} +static close{server, "close"}; + +} // namespace js +} // namespace ircd + +ircd::js::server::state::state(const string &host, + const value &port) +:ep{ip::address::from_string(host), uint16_t(port)} +{ +} + +ircd::js::server::state::~state() +noexcept +{ +} + +ircd::js::server::server() +:trap +{ + "Server", // Uppercase for Node.js compatibility. + JSCLASS_HAS_PRIVATE, +} +{ + parent_prototype = &trap::find("events"); +} + +void +ircd::js::server::on_new(object::handle that, + object &obj, + const args &args) +{ +} + +void +ircd::js::server::on_trace(const JSObject *const &that) +{ + trap::on_trace(that); +} + +void +ircd::js::server::on_gc(JSObject *const &that) +try +{ + const scope always_parent([this, &that] + { + trap::on_gc(that); + }); + + if(!has(that, priv)) + return; + + auto &state(get(that, priv)); + auto &acceptor(state.acceptor); + boost::system::error_code ec; + acceptor.cancel(ec); +} +catch(const std::exception &e) +{ + log.warning("server::on_gc(%p): %s\n", + (const void *)this, + e.what()); +} + +ircd::js::value +ircd::js::server::close::on_call(object::handle obj, + value::handle _that, + const args &args) +{ + object that(_that); + + object emission; + set(emission, "event", "close"); + set(emission, "emitter", that); + + contract result(ctor(future)); + set(result.future, "emit", emission); + if(args.has(0)) + set(result.future, "callback", args[0]); + + auto &state(get(that, priv)); + auto &acceptor(state.acceptor); + result([&acceptor] + { + boost::system::error_code ec; + acceptor.close(ec); + jserror e("%s", ec.message().c_str()); + return e.val.get(); + }); + + return result; +} + + +ircd::js::value +ircd::js::server::listen::on_call(object::handle obj, + value::handle _that, + const args &args) +{ + object that(_that); + object opts(args[0]); + + const value port + { + has(opts, "port")? get(opts, "port") : value{6667} + }; + + const string host + { + has(opts, "host")? get(opts, "host") : value{"localhost"} + }; + + const value backlog + { + has(opts, "backlog")? get(opts, "backlog") : value{4096} + }; + + const string path + { + has(opts, "path")? get(opts, "path") : value{} + }; + + const bool exclusive + { + has(opts, "exclusive")? bool(get(opts, "exclusive")) : false + }; + + if(!has(that, priv)) + { + auto state(std::make_shared(host, port)); + state->acceptor.open(state->ep.protocol()); + state->acceptor.set_option(ip::tcp::socket::reuse_address(true)); + state->acceptor.bind(state->ep); + state->acceptor.listen(uint(backlog)); + set(that, state); + } + + auto &state(get(that, priv)); + + object emission; + set(emission, "event", "connection"); + set(emission, "emitter", that); + + contract result(ctor(future)); + set(result.future, "emit", emission); + set(result.future, "cancel", get(that, "close")); + if(args.has(1)) + set(result.future, "callback", args[1]); + + object socket_instance(ctor(socket)); + auto &sstate(get(socket_instance, priv)); + auto accepted([result, socket_instance(heap_object(socket_instance)), state(shared_from(state)), sstate(shared_from(sstate))] + (const boost::system::error_code &e) + mutable + { + result([&e, &result, &state, &socket_instance]() -> value + { + if(e) + throw jserror("%s", e.message().c_str()); + + call("read", socket_instance); + return socket_instance; + }); + }); + + state.acceptor.async_accept(sstate.socket, std::move(accepted)); + return value(result); +} diff --git a/modules/net/socket.cc b/modules/net/socket.cc new file mode 100644 index 000000000..46d103fcc --- /dev/null +++ b/modules/net/socket.cc @@ -0,0 +1,291 @@ +/* + * Copyright (C) 2016 Charybdis Development Team + * Copyright (C) 2016 Jason Volk + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice is present in all copies. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "net.h" +#include + +namespace ircd { +namespace js { + +struct socket socket __attribute__((init_priority(1001))); + +ircd::js::socket::socket() +:trap +{ + "Socket", // Uppercase for Node.js compatibility. + JSCLASS_HAS_PRIVATE +} +{ + parent_prototype = &trap::find("stream"); +} + +ircd::js::socket::state::state(const std::pair &buffer_size) +:socket{*ios} +,in{buffer_size.first} +,out{buffer_size.second} +{ +} + +ircd::js::socket::state::~state() +noexcept +{ +} + +void +ircd::js::socket::on_gc(JSObject *const &that) +try +{ + const scope always_parent([this, &that] + { + trap::on_gc(that); + }); + + if(!has(that, priv)) + return; + + auto &state(get(that, priv)); + auto &socket(state.socket); + boost::system::error_code ec; + socket.cancel(ec); +} +catch(const std::exception &e) +{ + log.warning("socket::on_gc(%p): %s\n", + (const void *)this, + e.what()); +} + +void +ircd::js::socket::on_new(object::handle that, + object &obj, + const args &args) +{ +// auto &stream(trap::find("stream")); +// obj.prototype(ctor(stream)); + +// set(obj, "connect", connect(obj)); +// set(obj, "close", close(obj)); +// set(obj, "write", write(obj)); +// set(obj, "read", read(obj)); + + set(obj, std::make_shared()); +} + +struct socket::close +:trap::function +{ + trap &future{trap::find("future")}; + + value on_call(object::handle obj, value::handle, const args &args) override; + using trap::function::function; +} +static close{socket, "close"}; + +ircd::js::value +ircd::js::socket::close::on_call(object::handle obj, + value::handle _that, + const args &args) +{ + object that(_that); + + object emission; + set(emission, "event", "close"); + set(emission, "emitter", that); + + contract result(ctor(future)); + set(result.future, "emit", emission); + if(args.has(0)) + set(result.future, "callback", args[0]); + + auto &state(get(that, priv)); + auto &socket(state.socket); + result([&socket] + { + boost::system::error_code ec; + socket.shutdown(ip::tcp::socket::shutdown_send, ec); + socket.shutdown(ip::tcp::socket::shutdown_receive, ec); + jserror e("%s", ec.message().c_str()); + return e.val.get(); + }); + + return result; +} + +struct socket::read +:trap::function +{ + trap &future{trap::find("future")}; + + value on_call(object::handle obj, value::handle, const args &args) override; + using trap::function::function; +} +static read{socket, "read"}; + +ircd::js::value +ircd::js::socket::read::on_call(object::handle obj, + value::handle _that, + const args &args) +{ + object that(_that); + auto &state(get(that, priv)); + auto &socket(state.socket); + + object emission; + set(emission, "event", "data"); + set(emission, "emitter", that); + + contract result(ctor(future)); + set(result.future, "emit", emission); + set(result.future, "cancel", get(that, "close")); + + const auto finisher([result, that(heap_object(that)), state(shared_from(state))] + (const boost::system::error_code &e, const size_t bytes) + mutable -> void + { + result([&]() -> value + { + if(e) + throw jserror("%s", e.message().c_str()); + + if(!bytes) + throw jserror("empty message"); + + string ret(buffer_cast(state->in.data()), bytes); + state->in.consume(bytes); + call("read", that); + return ret; + }); + }); + + const auto condition([&state] + (const boost::system::error_code &e, const size_t bytes) + -> size_t + { + if(e) + return 0; + + if(bytes > 0) + return 0; + + return state.in.max_size() - state.in.size(); + }); + + boost::asio::async_read(socket, state.in, condition, finisher); + return result; +} + +struct socket::write +:trap::function +{ + trap &future{trap::find("future")}; + + value on_call(object::handle, value::handle, const args &args) override; + using trap::function::function; +} +static write{socket, "write"}; + +ircd::js::value +ircd::js::socket::write::on_call(object::handle obj, + value::handle _that, + const args &args) +{ + const object that(_that); + auto &state(get(that, priv)); + auto &socket(state.socket); + + const string data(args[0]); + auto buffer(state.out.prepare(size(data))); + copy(const_buffer(data.c_str(), data.size()), buffer); + state.out.commit(size(data)); + + object emission; + set(emission, "event", "drain"); + set(emission, "emitter", that); + + contract result(ctor(future)); + set(result.future, "emit", emission); + set(result.future, "cancel", get(that, "close")); + + boost::asio::async_write(socket, state.out, [result, state(shared_from(state))] + (const boost::system::system_error &e, const size_t &bytes) + mutable + { + result([&]() -> value + { + if(e.code()) + throw jserror("%s", e.what()); + + return bytes; + }); + }); + + return result; +} + +struct socket::connect +:trap::function +{ + trap &future{trap::find("future")}; + + value on_call(object::handle obj, value::handle, const args &args) override; + using trap::function::function; +} +static connect{socket, "connect"}; + +ircd::js::value +ircd::js::socket::connect::on_call(object::handle obj, + value::handle _that, + const args &args) +{ + const object that(_that); + const object options(args[0]); + const std::string host(get(options, "host")); + const uint16_t port(get(options, "port")); + ip::tcp::endpoint ep(ip::address::from_string(host), port); + + object emission; + set(emission, "event", "connect"); + set(emission, "emitter", that); + + contract result(ctor(future)); + set(result.future, "emit", emission); + set(result.future, "cancel", get(that, "close")); + + auto &state(get(that, priv)); + state.socket.async_connect(ep, [result, that(heap_object(that)), state(shared_from(state))] + (const boost::system::system_error &e) + mutable + { + result([&e, &that]() -> value + { + if(e.code()) + throw jserror("%s", e.what()); + + call("read", that); + return true; + }); + }); + + return result; +} + +} // namespace js +} // namespace ircd