// Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors // Copyright (C) 2016-2018 Jason Volk // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. #include "sender.int.h" std::list txns; std::map> nodes; void remove_node(const node &); static void recv_timeout(txn &, node &); static void recv_timeouts(); static bool recv_handle(txn &, node &); static void recv(); static void recv_worker(); ctx::dock recv_action; static void send(const m::event &, const m::user::id &user_id); static void send(const m::event &, const m::room::id &room_id); static void send(const m::event &); static void send_worker(); static void handle_notify(const m::event &, m::vm::eval &); context sender { "fedsnd S", 1_MiB, &send_worker, context::POST, }; context receiver { "fedsnd R", 1_MiB, &recv_worker, context::POST, }; mapi::header IRCD_MODULE { "federation sender", nullptr, [] { sender.terminate(); receiver.terminate(); sender.join(); receiver.join(); } }; std::deque notified_queue; ctx::dock notified_dock; m::hookfn notified { handle_notify, { { "_site", "vm.notify" }, } }; void handle_notify(const m::event &event, m::vm::eval &eval) { if(!my(event)) return; assert(eval.opts); if(!eval.opts->notify_servers) return; notified_queue.emplace_back(json::strung{event}); notified_dock.notify_all(); } void __attribute__((noreturn)) send_worker() { while(1) try { notified_dock.wait([] { return !notified_queue.empty(); }); const unwind pop{[] { assert(!notified_queue.empty()); notified_queue.pop_front(); }}; const m::event event { json::object{notified_queue.front()} }; send(event); } catch(const std::exception &e) { log::error { "sender worker: %s", e.what() }; } } void send(const m::event &event) { const auto &room_id { json::get<"room_id"_>(event) }; if(json::get<"depth"_>(event) == json::undefined_number) return; if(valid(m::id::ROOM, room_id)) return send(event, m::room::id{room_id}); if(valid(m::id::USER, room_id)) return send(event, m::user::id{room_id}); } void send(const m::event &event, const m::room::id &room_id) { // Unit is not allocated until we find another server in the room. std::shared_ptr unit; const m::room room{room_id}; const m::room::origins origins{room}; origins.for_each([&unit, &event] (const string_view &origin) { if(my_host(origin)) return; auto it{nodes.lower_bound(origin)}; if(it == end(nodes) || it->first != origin) { if(server::errmsg(origin)) return; it = nodes.emplace_hint(it, origin, origin); } auto &node{it->second}; if(node.err) return; if(!unit) unit = std::make_shared(event); node.push(unit); node.flush(); }); } void send(const m::event &event, const m::user::id &user_id) { const string_view &remote { user_id.host() }; if(my_host(remote)) return; auto it{nodes.lower_bound(remote)}; if(it == end(nodes) || it->first != remote) { if(server::errmsg(remote)) return; it = nodes.emplace_hint(it, remote, remote); } auto &node{it->second}; if(node.err) return; auto unit { std::make_shared(event) }; node.push(std::move(unit)); node.flush(); } void node::push(std::shared_ptr su) { q.emplace_back(std::move(su)); } bool node::flush() try { if(q.empty()) return true; if(curtxn) return true; size_t pdus{0}, edus{0}; for(const auto &unit : q) switch(unit->type) { case unit::PDU: ++pdus; break; case unit::EDU: ++edus; break; default: break; } size_t pc(0), ec(0); std::vector units(pdus + edus); for(const auto &unit : q) switch(unit->type) { case unit::PDU: units.at(pc++) = string_view{unit->s}; break; case unit::EDU: units.at(pdus + ec++) = string_view{unit->s}; break; default: break; } m::v1::send::opts opts; opts.remote = remote; opts.sopts = &sopts; const vector_view pduv { units.data(), units.data() + pc }; const vector_view eduv { units.data() + pdus, units.data() + pdus + ec }; std::string content { m::txn::create(pduv, eduv) }; txns.emplace_back(*this, std::move(content), std::move(opts)); const unwind::nominal::assertion na; curtxn = &txns.back(); q.clear(); recv_action.notify_one(); return true; } catch(const std::exception &e) { log::error { "flush error to %s :%s", remote, e.what() }; err = true; return false; } void __attribute__((noreturn)) recv_worker() { while(1) { recv_action.wait([] { return !txns.empty(); }); recv(); recv_timeouts(); } } void recv() try { auto next { ctx::when_any(begin(txns), end(txns)) }; if(!next.wait(seconds(2), std::nothrow)) //TODO: conf return; const auto it { next.get() }; if(unlikely(it == end(txns))) { assert(0); return; } auto &txn { *it }; assert(txn.node); auto &node{*txn.node}; const auto ret { recv_handle(txn, node) }; node.curtxn = nullptr; txns.erase(it); if(node.err) return remove_node(node); if(!ret) return; node.flush(); } catch(const std::exception &e) { ircd::panicking(e); } bool recv_handle(txn &txn, node &node) try { const auto code { txn.get() }; const json::object obj { txn }; const m::v1::send::response resp { obj }; if(code != http::OK) log::dwarning { "%u %s from %s for %s", ushort(code), http::status(code), node.remote, txn.txnid }; resp.for_each_pdu([&txn, &node] (const m::event::id &event_id, const json::object &error) { if(empty(error)) return; log::derror { "Error from %s in %s for %s :%s", node.remote, txn.txnid, string_view{event_id}, string_view{error} }; }); return true; } catch(const http::error &e) { log::derror { "%u %s from %s for %s :%s", ushort(e.code), http::status(e.code), node.remote, txn.txnid, e.what() }; node.err = true; return false; } catch(const std::exception &e) { log::derror { "Error from %s for %s :%s", node.remote, txn.txnid, e.what() }; node.err = true; return false; } void recv_timeouts() { const auto &now { ircd::now() }; auto it(begin(txns)); for(; it != end(txns); ++it) { auto &txn(*it); assert(txn.node); if(txn.node->err) continue; if(txn.timeout + seconds(45) < now) //TODO: conf recv_timeout(txn, *txn.node); } } void recv_timeout(txn &txn, node &node) { log::dwarning { "Timeout to %s for txn %s", node.remote, txn.txnid }; cancel(txn); node.err = true; } void remove_node(const node &node) { const auto it { nodes.find(node.remote) }; assert(it != end(nodes)); nodes.erase(it); }