From 5a933ee7136d93143d9cee9a606f7ddbfbb7320a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 14 Mar 2018 13:36:13 -0700 Subject: [PATCH] modules/federation: Checkpoint preliminary sender. --- modules/Makefile.am | 2 + modules/federation/send.cc | 119 +--------- modules/federation/sender.cc | 373 ++++++++++++++++++++++++++++++++ modules/federation/sender.int.h | 86 ++++++++ 4 files changed, 469 insertions(+), 111 deletions(-) create mode 100644 modules/federation/sender.cc create mode 100644 modules/federation/sender.int.h diff --git a/modules/Makefile.am b/modules/Makefile.am index ba7421141..c3fe4e50b 100644 --- a/modules/Makefile.am +++ b/modules/Makefile.am @@ -231,12 +231,14 @@ federation_federation_send_la_SOURCES = federation/send.cc federation_federation_event_la_SOURCES = federation/event.cc federation_federation_get_missing_events_la_SOURCES = federation/get_missing_events.cc federation_federation_version_la_SOURCES = federation/version.cc +federation_federation_sender_la_SOURCES = federation/sender.cc federation_module_LTLIBRARIES = \ federation/federation_send.la \ federation/federation_event.la \ federation/federation_get_missing_events.la \ federation/federation_version.la \ + federation/federation_sender.la \ ### ############################################################################### diff --git a/modules/federation/send.cc b/modules/federation/send.cc index 981516795..8ae934d27 100644 --- a/modules/federation/send.cc +++ b/modules/federation/send.cc @@ -10,33 +10,13 @@ using namespace ircd; -void sender_worker(); -ircd::context sender_context +mapi::header +IRCD_MODULE { - "sender", - 256_KiB, - &sender_worker, - ircd::context::POST, + "federation send" }; -const auto on_unload{[] -{ - sender_context.interrupt(); - sender_context.join(); -}}; - -mapi::header IRCD_MODULE -{ - "federation send", - nullptr, - on_unload -}; - -struct send -:resource -{ - using resource::resource; -} +resource send_resource { "/_matrix/federation/v1/send/", @@ -62,6 +42,8 @@ handle_edu(client &client, vmopts.non_conform.set(m::event::conforms::INVALID_OR_MISSING_EVENT_ID); vmopts.non_conform.set(m::event::conforms::INVALID_OR_MISSING_ROOM_ID); vmopts.non_conform.set(m::event::conforms::INVALID_OR_MISSING_SENDER_ID); + vmopts.non_conform.set(m::event::conforms::MISSING_ORIGIN_SIGNATURE); + vmopts.non_conform.set(m::event::conforms::MISSING_SIGNATURES); vmopts.non_conform.set(m::event::conforms::MISSING_PREV_EVENTS); vmopts.non_conform.set(m::event::conforms::MISSING_PREV_STATE); vmopts.non_conform.set(m::event::conforms::DEPTH_ZERO); @@ -171,7 +153,8 @@ handle_put(client &client, }; } -resource::method method_put +resource::method +method_put { send_resource, "PUT", handle_put, { @@ -179,89 +162,3 @@ resource::method method_put 4_MiB // larger = HTTP 413 //TODO: conf } }; - -// -// Main worker stack -// - -void sender_handle(const m::event &, const m::room::id &room_id); -void sender_handle(const m::event &); - -void -sender_worker() -{ - while(1) try - { - std::unique_lock lock - { - m::vm::inserted - }; - - // reference to the event on the inserter's stack - const auto &event - { - m::vm::inserted.wait(lock) - }; - - sender_handle(event); - } - catch(const ircd::ctx::interrupted &e) - { - ircd::log::debug("sender worker interrupted"); - return; - } - catch(const timeout &e) - { - ircd::log::debug("sender worker: %s", e.what()); - } - catch(const std::exception &e) - { - ircd::log::error("sender worker: %s", e.what()); - } -} - -void -sender_handle(const m::event &event) -{ - const auto &room_id - { - json::get<"room_id"_>(event) - }; - - if(room_id) - { - sender_handle(event, room_id); - return; - } - - assert(0); -} - -ssize_t txn_ctr -{ - 8 -}; - -void -sender_handle(const m::event &event, - const m::room::id &room_id) -try -{ - const m::room room - { - room_id - }; - - const m::event::id &event_id - { - json::get<"event_id"_>(event) - }; -} -catch(const http::error &e) -{ - throw; -} -catch(const std::exception &e) -{ - throw; -} diff --git a/modules/federation/sender.cc b/modules/federation/sender.cc new file mode 100644 index 000000000..5251c64a1 --- /dev/null +++ b/modules/federation/sender.cc @@ -0,0 +1,373 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#include "sender.int.h" + + +std::list txns; +std::map> nodes; + +static void recv_timeout(txn &); +static void recv_timeouts(); +static bool recv_handle(txn &, node &); +static void recv(); +static void recv_worker(); +ctx::dock recv_action; + +static void send(const m::event &, const m::room::id &room_id); +static void send(const m::event &); +static void send(); +static void send_worker(); + +context +sender +{ + "fedsnd S", 1_MiB, &send_worker, context::POST, +}; + +context +receiver +{ + "fedsnd R", 1_MiB, &recv_worker, context::POST, +}; + +mapi::header +IRCD_MODULE +{ + "federation sender", + nullptr, [] + { + sender.interrupt(); + receiver.interrupt(); + sender.join(); + receiver.join(); + } +}; + +void +send_worker() +try +{ + while(1) + send(); +} +catch(const ircd::ctx::interrupted &e) +{ + ircd::log::debug + { + "Sender worker interrupted..." + }; +} + +void +send() +try +{ + std::unique_lock lock + { + m::vm::accept + }; + + // reference to the event on the inserter's stack + const auto &event + { + m::vm::accept.wait(lock) + }; + + if(my(event)) + send(event); +} +catch(const ircd::ctx::interrupted &e) +{ + throw; +} +catch(const std::exception &e) +{ + ircd::log::error + { + "sender worker: %s", e.what() + }; +} + +void +send(const m::event &event) +{ + const auto &room_id + { + json::get<"room_id"_>(event) + }; + + if(room_id) + send(event, room_id); +} + +void +send(const m::event &event, + const m::room::id &room_id) +{ + // Unit is not allocated until we find another server in the room. + std::shared_ptr unit; + + const m::room room{room_id}; + const m::room::origins origins{room}; + origins.for_each([&unit, &event] + (const string_view &origin) + { + if(my_host(origin)) + return; + + auto it{nodes.lower_bound(origin)}; + if(it == end(nodes) || it->first != origin) + it = nodes.emplace_hint(it, origin, origin); + + auto &node{it->second}; + if(!unit) + unit = std::make_shared(event); + + node.push(unit); + }); +} + +void +node::push(std::shared_ptr su) +{ + q.emplace_back(std::move(su)); + flush(); +} + +bool +node::flush() +try +{ + if(q.empty()) + return false; + + if(curtxn) + return false; + + const auto pdus + { + std::count_if(begin(q), end(q), [](const auto &p) + { + return p->type == unit::PDU; + }) + }; + + std::vector pdu(pdus); + for(ssize_t i(0); i < pdus; ++i) + pdu.at(i) = string_view{q.at(i)->s}; + + std::string content + { + m::txn::create({pdu.data(), pdu.size()}, {}) + }; + + m::v1::send::opts opts; + opts.remote = id.host(); + opts.sopts = &sopts; + txns.emplace_back(*this, std::move(content), std::move(opts)); + q.clear(); + recv_action.notify_one(); + return true; +} +catch(const std::exception &e) +{ + log::error + { + "flush error to %s :%s", string_view{id}, e.what() + }; + + return false; +} + +void +recv_worker() +try +{ + while(1) + { + recv_action.wait([] + { + return !txns.empty(); + }); + + recv(); + recv_timeouts(); + } +} +catch(const ircd::ctx::interrupted &e) +{ + log::debug + { + "Receive worker interrupted..." + }; +} + +void +recv() +try +{ + auto next + { + ctx::when_any(begin(txns), end(txns)) + }; + + if(next.wait(seconds(2), std::nothrow) == ctx::future_status::timeout) //TODO: conf + return; + + const auto it + { + next.get() + }; + + if(unlikely(it == end(txns))) + { + assert(0); + return; + } + + auto &txn + { + *it + }; + + assert(txn.node); + auto &node{*txn.node}; + const auto ret + { + recv_handle(txn, node) + }; + + node.curtxn = nullptr; + txns.erase(it); + + if(ret) + node.flush(); +} +catch(const ctx::interrupted &e) +{ + throw; +} +catch(const std::exception &e) +{ + ircd::assertion(e); + return; +} + +bool +recv_handle(txn &txn, + node &node) +try +{ + const auto code + { + txn.get() + }; + + const json::object obj + { + txn + }; + + const m::v1::send::response resp + { + obj + }; + + log::debug + { + "%u %s from %s for %s", + ushort(code), + http::status(code), + string_view{node.id}, + txn.txnid + }; + + resp.for_each_pdu([&txn, &node] + (const m::event::id &event_id, const json::object &error) + { + if(empty(error)) + return; + + log::error + { + "Error from %s in %s for %s :%s", + string_view{node.id}, + txn.txnid, + string_view{event_id}, + string_view{error} + }; + }); + + return true; +} +catch(const http::error &e) +{ + log::error + { + "%u %s from %s for %s :%s", + ushort(e.code), + http::status(e.code), + string_view{node.id}, + txn.txnid, + e.what() + }; + + return false; +} +catch(const std::exception &e) +{ + log::error + { + "Error from %s for %s :%s", + string_view{node.id}, + txn.txnid, + e.what() + }; + + return false; +} + +void +recv_timeouts() +{ + const auto &now + { + ircd::now() + }; + + auto it(begin(txns)); + while(it != end(txns)) + { + auto &txn(*it); + if(txn.timeout + seconds(15) < now) //TODO: conf + { + recv_timeout(txn); + it = txns.erase(it); + } + else ++it; + } +} + +void +recv_timeout(txn &txn) +{ + assert(txn.node); + auto &node(*txn.node); + + log::warning + { + "Timeout to %s for txn %s", + string_view{node.id}, + txn.txnid + }; + + node.curtxn = nullptr; +} diff --git a/modules/federation/sender.int.h b/modules/federation/sender.int.h new file mode 100644 index 000000000..4f87c9375 --- /dev/null +++ b/modules/federation/sender.int.h @@ -0,0 +1,86 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +// +// Internal header for sender.cc only. +// Do not include. +// + +using namespace ircd; + +struct txn; +struct node; + +struct unit +:std::enable_shared_from_this +{ + enum type { PDU, EDU, FAILURE }; + + std::string s; + enum type type; + + unit(std::string s, const enum type &type) + :s{std::move(s)} + ,type{type} + {} + + unit(const m::event &event) + :unit{json::strung{event}, PDU} + {} +}; + +struct txndata +{ + std::string content; + string_view txnid; + char txnidbuf[64]; + + txndata(std::string content) + :content{std::move(content)} + ,txnid{m::txn::create_id(txnidbuf, this->content)} + {} +}; + +struct node +{ + std::deque> q; + m::node::id::buf id; + m::node::room room; + server::request::opts sopts; + txn *curtxn {nullptr}; + + bool flush(); + void push(std::shared_ptr); + + node(const string_view &origin) + :id{"", origin} + ,room{id} + {} +}; + +struct txn +:txndata +,m::v1::send +{ + struct node *node; + steady_point timeout; + char headers[8_KiB]; + + txn(struct node &node, + std::string content, + m::v1::send::opts opts) + :txndata{std::move(content)} + ,send{txnid, string_view{this->content}, headers, std::move(opts)} + ,node{&node} + ,timeout{now()} //TODO: conf + {} + + txn() = default; +};