0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-16 15:00:51 +01:00

modules: Add net with Node.js compatible net.Socket / net.Server.

This commit is contained in:
Jason Volk 2016-11-02 17:55:30 -07:00
parent 7b4c06575f
commit 848337a76b
5 changed files with 675 additions and 0 deletions

View file

@ -41,6 +41,12 @@ dns_module_LTLIBRARIES = dns/dns.la
dns_dns_la_SOURCES = dns/dns.cc \ dns_dns_la_SOURCES = dns/dns.cc \
dns/lookup.cc dns/lookup.cc
net_moduledir=@moduledir@
net_module_LTLIBRARIES = net/net.la
net_net_la_SOURCES = net/net.cc \
net/socket.cc \
net/server.cc
moduledir=@moduledir@ moduledir=@moduledir@
future_la_SOURCES = future.cc future_la_SOURCES = future.cc
require_la_SOURCES = require.cc require_la_SOURCES = require.cc

81
modules/net/net.cc Normal file
View file

@ -0,0 +1,81 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "net.h"
namespace ircd {
namespace js {
extern trap socket; // socket.cc
extern trap server; // server.cc
struct net
:trap
{
bool on_has(object::handle obj, id::handle id) override
{
if(id == "Server")
return true;
if(id == "Socket")
return true;
return false;
}
value on_get(object::handle obj, id::handle id, value::handle val) override;
void on_new(object::handle outer, object &that, const args &args) override
{
}
using trap::trap;
}
net
__attribute__((init_priority(1000)))
{
"net"
};
} // namespace js
} // namespace ircd
ircd::js::value
ircd::js::net::on_get(object::handle obj,
id::handle id,
value::handle val)
{
if(!undefined(val))
return val;
if(id == "Server")
return ctor(server);
if(id == "Socket")
return ctor(socket);
return val;
}
ircd::mapi::header IRCD_MODULE
{
"Network server and socket support."
};

58
modules/net/net.h Normal file
View file

@ -0,0 +1,58 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <ircd/js/js.h>
#include <boost/asio.hpp>
namespace ip = boost::asio::ip;
namespace ircd {
namespace js {
struct socket
:trap
{
struct state;
struct close;
struct write;
struct read;
struct connect;
void on_new(object::handle, object &, const args &) override;
void on_gc(JSObject *const &) override;
socket();
};
struct socket::state
:priv_data
{
ip::tcp::socket socket;
boost::asio::streambuf in;
boost::asio::streambuf out;
state(const std::pair<size_t, size_t> &buffer_size = { 1024, 1024 });
~state() noexcept;
};
} // namespace js
} // namespace ircd

239
modules/net/server.cc Normal file
View file

@ -0,0 +1,239 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "net.h"
namespace ircd {
namespace js {
extern trap socket;
struct server
:trap
{
struct state; // privdata
struct listen;
struct close;
void on_new(object::handle, object &, const args &) override;
void on_trace(const JSObject *const &) override;
void on_gc(JSObject *const &that) override;
server();
}
server __attribute__((init_priority(1002)));
struct server::state
:priv_data
{
ip::tcp::endpoint ep;
ip::tcp::acceptor acceptor {*ios};
state(const string &host, const value &port);
~state() noexcept;
};
struct server::listen
:trap::function
{
trap &future{trap::find("future")};
value on_call(object::handle, value::handle, const args &args) override;
using trap::function::function;
}
static listen{server, "listen"};
struct server::close
:trap::function
{
trap &future{trap::find("future")};
value on_call(object::handle, value::handle, const args &args) override;
using trap::function::function;
}
static close{server, "close"};
} // namespace js
} // namespace ircd
ircd::js::server::state::state(const string &host,
const value &port)
:ep{ip::address::from_string(host), uint16_t(port)}
{
}
ircd::js::server::state::~state()
noexcept
{
}
ircd::js::server::server()
:trap
{
"Server", // Uppercase for Node.js compatibility.
JSCLASS_HAS_PRIVATE,
}
{
parent_prototype = &trap::find("events");
}
void
ircd::js::server::on_new(object::handle that,
object &obj,
const args &args)
{
}
void
ircd::js::server::on_trace(const JSObject *const &that)
{
trap::on_trace(that);
}
void
ircd::js::server::on_gc(JSObject *const &that)
try
{
const scope always_parent([this, &that]
{
trap::on_gc(that);
});
if(!has(that, priv))
return;
auto &state(get<struct state>(that, priv));
auto &acceptor(state.acceptor);
boost::system::error_code ec;
acceptor.cancel(ec);
}
catch(const std::exception &e)
{
log.warning("server::on_gc(%p): %s\n",
(const void *)this,
e.what());
}
ircd::js::value
ircd::js::server::close::on_call(object::handle obj,
value::handle _that,
const args &args)
{
object that(_that);
object emission;
set(emission, "event", "close");
set(emission, "emitter", that);
contract result(ctor(future));
set(result.future, "emit", emission);
if(args.has(0))
set(result.future, "callback", args[0]);
auto &state(get<struct state>(that, priv));
auto &acceptor(state.acceptor);
result([&acceptor]
{
boost::system::error_code ec;
acceptor.close(ec);
jserror e("%s", ec.message().c_str());
return e.val.get();
});
return result;
}
ircd::js::value
ircd::js::server::listen::on_call(object::handle obj,
value::handle _that,
const args &args)
{
object that(_that);
object opts(args[0]);
const value port
{
has(opts, "port")? get(opts, "port") : value{6667}
};
const string host
{
has(opts, "host")? get(opts, "host") : value{"localhost"}
};
const value backlog
{
has(opts, "backlog")? get(opts, "backlog") : value{4096}
};
const string path
{
has(opts, "path")? get(opts, "path") : value{}
};
const bool exclusive
{
has(opts, "exclusive")? bool(get(opts, "exclusive")) : false
};
if(!has(that, priv))
{
auto state(std::make_shared<struct state>(host, port));
state->acceptor.open(state->ep.protocol());
state->acceptor.set_option(ip::tcp::socket::reuse_address(true));
state->acceptor.bind(state->ep);
state->acceptor.listen(uint(backlog));
set(that, state);
}
auto &state(get<struct state>(that, priv));
object emission;
set(emission, "event", "connection");
set(emission, "emitter", that);
contract result(ctor(future));
set(result.future, "emit", emission);
set(result.future, "cancel", get(that, "close"));
if(args.has(1))
set(result.future, "callback", args[1]);
object socket_instance(ctor(socket));
auto &sstate(get<socket::state>(socket_instance, priv));
auto accepted([result, socket_instance(heap_object(socket_instance)), state(shared_from(state)), sstate(shared_from(sstate))]
(const boost::system::error_code &e)
mutable
{
result([&e, &result, &state, &socket_instance]() -> value
{
if(e)
throw jserror("%s", e.message().c_str());
call("read", socket_instance);
return socket_instance;
});
});
state.acceptor.async_accept(sstate.socket, std::move(accepted));
return value(result);
}

291
modules/net/socket.cc Normal file
View file

@ -0,0 +1,291 @@
/*
* Copyright (C) 2016 Charybdis Development Team
* Copyright (C) 2016 Jason Volk <jason@zemos.net>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice is present in all copies.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "net.h"
#include <ircd/bufs.h>
namespace ircd {
namespace js {
struct socket socket __attribute__((init_priority(1001)));
ircd::js::socket::socket()
:trap
{
"Socket", // Uppercase for Node.js compatibility.
JSCLASS_HAS_PRIVATE
}
{
parent_prototype = &trap::find("stream");
}
ircd::js::socket::state::state(const std::pair<size_t, size_t> &buffer_size)
:socket{*ios}
,in{buffer_size.first}
,out{buffer_size.second}
{
}
ircd::js::socket::state::~state()
noexcept
{
}
void
ircd::js::socket::on_gc(JSObject *const &that)
try
{
const scope always_parent([this, &that]
{
trap::on_gc(that);
});
if(!has(that, priv))
return;
auto &state(get<socket::state>(that, priv));
auto &socket(state.socket);
boost::system::error_code ec;
socket.cancel(ec);
}
catch(const std::exception &e)
{
log.warning("socket::on_gc(%p): %s\n",
(const void *)this,
e.what());
}
void
ircd::js::socket::on_new(object::handle that,
object &obj,
const args &args)
{
// auto &stream(trap::find("stream"));
// obj.prototype(ctor(stream));
// set(obj, "connect", connect(obj));
// set(obj, "close", close(obj));
// set(obj, "write", write(obj));
// set(obj, "read", read(obj));
set(obj, std::make_shared<socket::state>());
}
struct socket::close
:trap::function
{
trap &future{trap::find("future")};
value on_call(object::handle obj, value::handle, const args &args) override;
using trap::function::function;
}
static close{socket, "close"};
ircd::js::value
ircd::js::socket::close::on_call(object::handle obj,
value::handle _that,
const args &args)
{
object that(_that);
object emission;
set(emission, "event", "close");
set(emission, "emitter", that);
contract result(ctor(future));
set(result.future, "emit", emission);
if(args.has(0))
set(result.future, "callback", args[0]);
auto &state(get<socket::state>(that, priv));
auto &socket(state.socket);
result([&socket]
{
boost::system::error_code ec;
socket.shutdown(ip::tcp::socket::shutdown_send, ec);
socket.shutdown(ip::tcp::socket::shutdown_receive, ec);
jserror e("%s", ec.message().c_str());
return e.val.get();
});
return result;
}
struct socket::read
:trap::function
{
trap &future{trap::find("future")};
value on_call(object::handle obj, value::handle, const args &args) override;
using trap::function::function;
}
static read{socket, "read"};
ircd::js::value
ircd::js::socket::read::on_call(object::handle obj,
value::handle _that,
const args &args)
{
object that(_that);
auto &state(get<socket::state>(that, priv));
auto &socket(state.socket);
object emission;
set(emission, "event", "data");
set(emission, "emitter", that);
contract result(ctor(future));
set(result.future, "emit", emission);
set(result.future, "cancel", get(that, "close"));
const auto finisher([result, that(heap_object(that)), state(shared_from(state))]
(const boost::system::error_code &e, const size_t bytes)
mutable -> void
{
result([&]() -> value
{
if(e)
throw jserror("%s", e.message().c_str());
if(!bytes)
throw jserror("empty message");
string ret(buffer_cast<const char *>(state->in.data()), bytes);
state->in.consume(bytes);
call("read", that);
return ret;
});
});
const auto condition([&state]
(const boost::system::error_code &e, const size_t bytes)
-> size_t
{
if(e)
return 0;
if(bytes > 0)
return 0;
return state.in.max_size() - state.in.size();
});
boost::asio::async_read(socket, state.in, condition, finisher);
return result;
}
struct socket::write
:trap::function
{
trap &future{trap::find("future")};
value on_call(object::handle, value::handle, const args &args) override;
using trap::function::function;
}
static write{socket, "write"};
ircd::js::value
ircd::js::socket::write::on_call(object::handle obj,
value::handle _that,
const args &args)
{
const object that(_that);
auto &state(get<socket::state>(that, priv));
auto &socket(state.socket);
const string data(args[0]);
auto buffer(state.out.prepare(size(data)));
copy(const_buffer(data.c_str(), data.size()), buffer);
state.out.commit(size(data));
object emission;
set(emission, "event", "drain");
set(emission, "emitter", that);
contract result(ctor(future));
set(result.future, "emit", emission);
set(result.future, "cancel", get(that, "close"));
boost::asio::async_write(socket, state.out, [result, state(shared_from(state))]
(const boost::system::system_error &e, const size_t &bytes)
mutable
{
result([&]() -> value
{
if(e.code())
throw jserror("%s", e.what());
return bytes;
});
});
return result;
}
struct socket::connect
:trap::function
{
trap &future{trap::find("future")};
value on_call(object::handle obj, value::handle, const args &args) override;
using trap::function::function;
}
static connect{socket, "connect"};
ircd::js::value
ircd::js::socket::connect::on_call(object::handle obj,
value::handle _that,
const args &args)
{
const object that(_that);
const object options(args[0]);
const std::string host(get(options, "host"));
const uint16_t port(get(options, "port"));
ip::tcp::endpoint ep(ip::address::from_string(host), port);
object emission;
set(emission, "event", "connect");
set(emission, "emitter", that);
contract result(ctor(future));
set(result.future, "emit", emission);
set(result.future, "cancel", get(that, "close"));
auto &state(get<socket::state>(that, priv));
state.socket.async_connect(ep, [result, that(heap_object(that)), state(shared_from(state))]
(const boost::system::system_error &e)
mutable
{
result([&e, &that]() -> value
{
if(e.code())
throw jserror("%s", e.what());
call("read", that);
return true;
});
});
return result;
}
} // namespace js
} // namespace ircd