From 16898ba19eef6fb6d69051257b6ff5b62f3133a3 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 2 Feb 2021 07:52:37 -0800 Subject: [PATCH] modules/m_bridge: Add worker stack; push loop. --- modules/m_bridge.cc | 309 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 301 insertions(+), 8 deletions(-) diff --git a/modules/m_bridge.cc b/modules/m_bridge.cc index 5c95a2ab0..806cc2d07 100644 --- a/modules/m_bridge.cc +++ b/modules/m_bridge.cc @@ -8,18 +8,42 @@ // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. +namespace ircd::m::bridge +{ + static size_t make_txn(const config &, json::stack &, events::range &); + static event::idx worker_handle(const config &, const rfc3986::uri &, const events::range &, window_buffer); + static void worker_loop(const config &, const rfc3986::uri &, const mutable_buffer &); + static void worker(std::string config); + static void handle_event(const event &, vm::eval &); + static void fini(); + static void init(); + + extern conf::item timeout; + extern ctx::dock worker_dock; + extern std::vector worker_context; + extern hookfn notify_hook; +} + ircd::mapi::header IRCD_MODULE { - "Bridges (Application Services)" + "Bridges (Application Services)", + ircd::m::bridge::init, + ircd::m::bridge::fini, }; -namespace ircd::m::bridge +decltype(ircd::m::bridge::timeout) +ircd::m::bridge::timeout { - static void handle_event(const m::event &, vm::eval &); + { "name", "ircd.m.bridge.txn.timeout" }, + { "default", 10L }, +}; - extern hookfn notify_hook; -} +decltype(ircd::m::bridge::worker_dock) +ircd::m::bridge::worker_dock; + +decltype(ircd::m::bridge::worker_context) +ircd::m::bridge::worker_context; decltype(ircd::m::bridge::notify_hook) ircd::m::bridge::notify_hook @@ -30,26 +54,295 @@ ircd::m::bridge::notify_hook } }; +void +ircd::m::bridge::init() +{ + +} + +void +ircd::m::bridge::fini() +{ + for(auto &worker : worker_context) + worker.terminate(); + + for(auto &worker : worker_context) + worker.join(); +} + void ircd::m::bridge::handle_event(const m::event &event, vm::eval &eval) try { - // Drop internal room traffic if(eval.room_internal) return; - // Drop EDU's ??? if(!event.event_id) return; + worker_dock.notify_all(); +} +catch(const ctx::interrupted &) +{ + throw; +} +catch(const std::exception &e) +{ + log::critical + { + log, "Failed to handle %s notify :%s", + string_view{event.event_id}, + e.what(), + }; +} +void +ircd::m::bridge::worker(std::string config_) +try +{ + const bridge::config config + { + config_ + }; + + const rfc3986::uri uri + { + at<"url"_>(config) + }; + + const unique_mutable_buffer buf + { + event::MAX_SIZE * 8 + }; + + // Wait for run::level RUN before entering work loop. + run::barrier {}; + worker_loop(config, uri, buf); +} +catch(const ctx::interrupted &) +{ + throw; } catch(const std::exception &e) { log::error { - log, "Failed to handle for bridgelication services :%s", + log, "Worker failed to initialize :%s", e.what(), }; } + +void +ircd::m::bridge::worker_loop(const config &config, + const rfc3986::uri &uri, + const mutable_buffer &buf) +try +{ + auto since {vm::sequence::retired}; do + { + worker_dock.wait([&since] + { + return since < vm::sequence::retired; + }); + + const events::range range + { + since, vm::sequence::retired + 1 + }; + + since = worker_handle(config, uri, range, buf); + assert(since >= range.first); + assert(since <= range.second); + + // Prevent spin for retrying the same range on handled exception. + if(unlikely(since == range.first)) + sleep(15s); + } + while(run::level == run::level::RUN); +} +catch(const ctx::interrupted &) +{ + throw; +} +catch(const std::exception &e) +{ + log::critical + { + log, "Worker unhandled :%s", + e.what(), + }; +} + +ircd::m::event::idx +ircd::m::bridge::worker_handle(const config &config, + const rfc3986::uri &uri, + const events::range &range_, + window_buffer buf) +try +{ + size_t count {0}; + auto range {range_}; + buf([&config, &count, &range] + (const mutable_buffer &buf) + { + json::stack out + { + buf + }; + + count += make_txn(config, out, range); + return out.completed(); + }); + + if(!count) + return range.second; + + const json::object content + { + buf.completed() + }; + + buf = window_buffer + { + buf.remains() + }; + + // Generate URL for the PUT + char uribuf[448], txnidbuf[64]; + const string_view url + { + make_uri(uribuf, config, fmt::sprintf + { + txnidbuf, "transactions/%lu", range.first, + }) + }; + + log::debug + { + log, "[%s] PUT txn:%lu:%lu (%lu:%lu) events:%zu", + json::get<"id"_>(config), + range.first, + range.second, + range_.first, + range_.second, + count, + }; + + const net::hostport target + { + uri.remote + }; + + // HTTP request sans + http::request + { + buf, + host(target), + "PUT", + url, + size(string_view(content)), + "application/json; charset=utf-8"_sv, + }; + + // Outputs from consumed buffer + server::out out; + out.head = buf.completed(); + out.content = content; + + // Inputs to remaining buffer + server::in in; + in.head = buf.remains(); + in.content = in.head; + + // Send to bridge + server::request::opts sopts; + server::request req + { + target, std::move(out), std::move(in), &sopts + }; + + // Recv response + const auto code + { + req.get(seconds(timeout)) + }; + + log::logf + { + log, log::level::DEBUG, + "[%s] %u txn:%lu:%lu (%lu:%lu) events:%zu :%s", + json::get<"id"_>(config), + uint(code), + range.first, + range.second, + range_.first, + range_.second, + count, + http::status(code), + }; + + return range.second + 1; +} +catch(const ctx::interrupted &) +{ + throw; +} +catch(const std::exception &e) +{ + log::error + { + log, "worker handle range:%lu:%lu :%s", + range_.first, + range_.second, + e.what(), + }; + + return range_.first; +} + +size_t +ircd::m::bridge::make_txn(const config &config, + json::stack &out, + events::range &range) +{ + json::stack::object top + { + out + }; + + json::stack::array events + { + top, "events" + }; + + size_t count {0}; + m::events::for_each(events::range{range}, [&] + (const event::idx &event_idx, m::event event) + { + if(m::internal(json::get<"room_id"_>(event))) + return true; + + m::event::append::opts opts; + opts.event_idx = &event_idx; + opts.query_txnid = false; + opts.query_prev_state = true; + opts.query_redacted = false; + m::event::append + { + events, event + }; + + range.second = event_idx; + ++count; + + const bool sufficient_buffer + { + out.remaining() > event::MAX_SIZE + 16_KiB + }; + + return sufficient_buffer; + }); + + return count; +}