diff --git a/include/ircd/net/acceptor.h b/include/ircd/net/acceptor.h index 5e2c7c6ed..5f1496a5a 100644 --- a/include/ircd/net/acceptor.h +++ b/include/ircd/net/acceptor.h @@ -59,3 +59,32 @@ struct ircd::net::listener::acceptor ~acceptor() noexcept; }; + +struct ircd::net::listener_udp::acceptor +{ + using error_code = boost::system::error_code; + + static constexpr log::log &log {listener::acceptor::log}; + static ip::udp::socket::message_flags flags(const flag &); + + std::string name; + std::string opts; + ip::udp::endpoint ep; + ip::udp::socket a; + size_t waiting {0}; + ctx::dock joining; + + // Yield context for datagram. + datagram &operator()(datagram &); + + // Acceptor shutdown + bool interrupt() noexcept; + void join() noexcept; + + acceptor(const string_view &name, + const json::object &opts); + + ~acceptor() noexcept; + + friend std::ostream &operator<<(std::ostream &s, const acceptor &); +}; diff --git a/include/ircd/net/listener.h b/include/ircd/net/listener.h index c812a0b4a..c089ed1bf 100644 --- a/include/ircd/net/listener.h +++ b/include/ircd/net/listener.h @@ -14,6 +14,7 @@ namespace ircd::net { struct listener; + struct listener_udp; } struct ircd::net::listener @@ -38,3 +39,43 @@ struct ircd::net::listener ~listener() noexcept; }; + +struct ircd::net::listener_udp +{ + struct acceptor; + struct datagram; + enum flag :uint; + + IRCD_EXCEPTION(net::error, error) + + private: + std::unique_ptr acceptor; + + public: + datagram &operator()(datagram &); + + listener_udp(const string_view &name, + const json::object &options); + + explicit + listener_udp(const string_view &name, + const std::string &options); + + ~listener_udp() noexcept; +}; + +struct ircd::net::listener_udp::datagram +{ + mutable_buffer buf; + ipport remote; + enum flag flag {(enum flag)0}; + vector_view bufs { &buf, 1 }; + + datagram() = default; +}; + +enum ircd::net::listener_udp::flag +:uint +{ + PEEK = 0x01, +}; diff --git a/ircd/net.cc b/ircd/net.cc index 8a3330643..b282fc67a 100644 --- a/ircd/net.cc +++ b/ircd/net.cc @@ -885,6 +885,10 @@ ircd::net::blocking(const socket &socket) // net/listener.h // +// +// listener +// + ircd::net::listener::listener(const string_view &name, const std::string &opts, callback cb) @@ -918,11 +922,51 @@ noexcept acceptor->join(); } +// +// listener_udp +// + +ircd::net::listener_udp::listener_udp(const string_view &name, + const std::string &opts) +:listener_udp +{ + name, json::object{opts} +} +{ +} + +ircd::net::listener_udp::listener_udp(const string_view &name, + const json::object &opts) +:acceptor +{ + std::make_unique(name, opts) +} +{ +} + +ircd::net::listener_udp::~listener_udp() +noexcept +{ + if(acceptor) + acceptor->join(); +} + +ircd::net::listener_udp::datagram & +ircd::net::listener_udp::operator()(datagram &datagram) +{ + assert(acceptor); + return acceptor->operator()(datagram); +} + /////////////////////////////////////////////////////////////////////////////// // // net/acceptor.h // +// +// listener::acceptor +// + ircd::log::log ircd::net::listener::acceptor::log { @@ -936,6 +980,10 @@ ircd::net::listener::acceptor::timeout { "default", 12000L }, }; +// +// listener::acceptor::acceptor +// + ircd::net::listener::acceptor::acceptor(const string_view &name, const json::object &opts, listener::callback cb) @@ -1395,6 +1443,141 @@ const }; } +// +// listener_udp::acceptor +// + +std::ostream & +ircd::net::operator<<(std::ostream &s, const struct listener_udp::acceptor &a) +{ + s << "'" << a.name << "' @ [" << string(a.ep.address()) << "]:" << a.ep.port(); + return s; +} + +// +// listener_udp::acceptor::acceptor +// + +ircd::net::listener_udp::acceptor::acceptor(const string_view &name, + const json::object &opts) +try +:name +{ + name +} +,opts +{ + opts +} +,ep +{ + ip::address::from_string(unquote(opts.get("host", "0.0.0.0"s))), + opts.get("port", 8448L) +} +,a +{ + *ircd::ios +} +{ + static const ip::udp::socket::reuse_address reuse_address + { + true + }; + + a.open(ep.protocol()); + a.set_option(reuse_address); + log::debug + { + log, "%s opened listener socket", string(*this) + }; + + a.bind(ep); + log::debug + { + log, "%s bound listener socket", string(*this) + }; +} +catch(const boost::system::system_error &e) +{ + throw error + { + "listener_udp: %s", e.what() + }; +} + +ircd::net::listener_udp::acceptor::~acceptor() +noexcept +{ +} + +void +ircd::net::listener_udp::acceptor::join() +noexcept try +{ + interrupt(); + joining.wait([this] + { + return waiting == 0; + }); +} +catch(const std::exception &e) +{ + log::error + { + log, "acceptor(%p) join: %s", this, e.what() + }; +} + +bool +ircd::net::listener_udp::acceptor::interrupt() +noexcept try +{ + a.cancel(); + return true; +} +catch(const boost::system::system_error &e) +{ + log::error + { + log, "acceptor(%p) interrupt: %s", this, string(e) + }; + + return false; +} + +ircd::net::listener_udp::datagram & +ircd::net::listener_udp::acceptor::operator()(datagram &datagram) +{ + assert(ctx::current); + + this->waiting++; + const unwind dec{[this] + { + this->waiting--; + }}; + + ip::udp::endpoint ep; + const size_t rlen + { + a.async_receive_from(datagram.bufs, ep, flags(datagram.flag), yield_context{to_asio{}}) + }; + + datagram.remote = make_ipport(ep); + datagram.buf = {data(datagram.buf), rlen}; + return datagram; +} + +boost::asio::ip::udp::socket::message_flags +ircd::net::listener_udp::acceptor::flags(const flag &flag) +{ + ip::udp::socket::message_flags ret{0}; + + if(flag & flag::PEEK) + ret |= ip::udp::socket::message_peek; + + return ret; +} + /////////////////////////////////////////////////////////////////////////////// // // net/scope_timeout.h