diff --git a/modules/m_bridge.cc b/modules/m_bridge.cc index 806cc2d07..135253adc 100644 --- a/modules/m_bridge.cc +++ b/modules/m_bridge.cc @@ -11,13 +11,14 @@ namespace ircd::m::bridge { static size_t make_txn(const config &, json::stack &, events::range &); - static event::idx worker_handle(const config &, const rfc3986::uri &, const events::range &, window_buffer); + static event::idx worker_handle(const config &, const net::hostport &, const events::range &, window_buffer); static void worker_loop(const config &, const rfc3986::uri &, const mutable_buffer &); - static void worker(std::string config); + static void worker(std::string event, std::string event_id); static void handle_event(const event &, vm::eval &); static void fini(); static void init(); + extern conf::item enable; extern conf::item timeout; extern ctx::dock worker_dock; extern std::vector worker_context; @@ -32,6 +33,13 @@ IRCD_MODULE ircd::m::bridge::fini, }; +decltype(ircd::m::bridge::enable) +ircd::m::bridge::enable +{ + { "name", "ircd.m.bridge.enable" }, + { "default", true }, +}; + decltype(ircd::m::bridge::timeout) ircd::m::bridge::timeout { @@ -54,22 +62,6 @@ ircd::m::bridge::notify_hook } }; -void -ircd::m::bridge::init() -{ - -} - -void -ircd::m::bridge::fini() -{ - for(auto &worker : worker_context) - worker.terminate(); - - for(auto &worker : worker_context) - worker.join(); -} - void ircd::m::bridge::handle_event(const m::event &event, vm::eval &eval) @@ -98,12 +90,64 @@ catch(const std::exception &e) } void -ircd::m::bridge::worker(std::string config_) +ircd::m::bridge::init() +{ + if(!enable) + return; + + config::for_each([] + (const event::idx &event_idx, const event &event, const config &config) + { + log::debug + { + log, "Found configuration for '%s' in %s by %s", + json::get<"id"_>(config), + json::get<"room_id"_>(event), + string_view{event.event_id}, + }; + + worker_context.emplace_back(context + { + "m.bridge", + 512_KiB, + context::POST, + std::bind(&bridge::worker, std::string(event.source), std::string(event.event_id)), + }); + + return true; + }); +} + +void +ircd::m::bridge::fini() +{ + for(auto &worker : worker_context) + worker.terminate(); + + if(!worker_context.empty()) + log::debug + { + log, "Waiting for %zu bridge workers...", + worker_context.size(), + }; + + for(auto &worker : worker_context) + worker.join(); +} + +void +ircd::m::bridge::worker(const std::string event_, + const std::string event_id) try { + const m::event event + { + json::object{event_}, event_id + }; + const bridge::config config { - config_ + json::get<"content"_>(event) }; const rfc3986::uri uri @@ -116,8 +160,29 @@ try event::MAX_SIZE * 8 }; - // Wait for run::level RUN before entering work loop. + log::notice + { + log, "Bridging to '%s' via %s by %s", + json::get<"id"_>(config), + uri.remote, + event_id, + }; + run::barrier {}; + const bool prelinking + { + server::prelink(uri.remote) + }; + + if(!prelinking) + log::warning + { + log, "Bridging to '%s' via %s may not be possible :%s", + json::get<"id"_>(config), + uri.remote, + server::errmsg(uri.remote), + }; + worker_loop(config, uri, buf); } catch(const ctx::interrupted &) @@ -139,6 +204,11 @@ ircd::m::bridge::worker_loop(const config &config, const mutable_buffer &buf) try { + const net::hostport target + { + uri.remote + }; + auto since {vm::sequence::retired}; do { worker_dock.wait([&since] @@ -146,18 +216,36 @@ try return since < vm::sequence::retired; }); + // Wait here if the bridge is down. + while(unlikely(server::errant(target))) + { + log::error + { + log, "Waiting for '%s' at %s with error :%s", + json::get<"id"_>(config), + uri.remote, + server::errmsg(target), + }; + + sleep(15s); + continue; + } + const events::range range { since, vm::sequence::retired + 1 }; - since = worker_handle(config, uri, range, buf); + since = worker_handle(config, target, range, buf); assert(since >= range.first); assert(since <= range.second); // Prevent spin for retrying the same range on handled exception. if(unlikely(since == range.first)) + { sleep(15s); + continue; + } } while(run::level == run::level::RUN); } @@ -176,7 +264,7 @@ catch(const std::exception &e) ircd::m::event::idx ircd::m::bridge::worker_handle(const config &config, - const rfc3986::uri &uri, + const net::hostport &target, const events::range &range_, window_buffer buf) try @@ -229,11 +317,6 @@ try count, }; - const net::hostport target - { - uri.remote - }; - // HTTP request sans http::request {