diff --git a/include/ircd/m/acquire.h b/include/ircd/m/acquire.h new file mode 100644 index 000000000..b86d2b801 --- /dev/null +++ b/include/ircd/m/acquire.h @@ -0,0 +1,32 @@ +// The Construct +// +// Copyright (C) The Construct Developers, Authors & Contributors +// Copyright (C) 2016-2020 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +#pragma once +#define HAVE_IRCD_M_ACQUIRE_H + +namespace ircd::m::acquire +{ + struct opts; + struct acquire; + + extern log::log log; +}; + +struct ircd::m::acquire::acquire +{ + acquire(const room &, const opts &); +}; + +struct ircd::m::acquire::opts +{ + string_view hint; + + bool hint_only {false}; +}; diff --git a/include/ircd/m/m.h b/include/ircd/m/m.h index cd60f00ed..8890173f4 100644 --- a/include/ircd/m/m.h +++ b/include/ircd/m/m.h @@ -95,6 +95,7 @@ namespace ircd #include "media.h" #include "search.h" #include "gossip.h" +#include "acquire.h" #include "burst.h" #include "resource.h" #include "homeserver.h" diff --git a/matrix/Makefile.am b/matrix/Makefile.am index 126ade7ae..897e92dbf 100644 --- a/matrix/Makefile.am +++ b/matrix/Makefile.am @@ -125,6 +125,7 @@ libircd_matrix_la_SOURCES += user_room_account_data.cc libircd_matrix_la_SOURCES += user_room_tags.cc libircd_matrix_la_SOURCES += user_rooms.cc libircd_matrix_la_SOURCES += user_tokens.cc +libircd_matrix_la_SOURCES += acquire.cc libircd_matrix_la_SOURCES += bridge.cc libircd_matrix_la_SOURCES += breadcrumb_rooms.cc libircd_matrix_la_SOURCES += burst.cc diff --git a/matrix/acquire.cc b/matrix/acquire.cc new file mode 100644 index 000000000..389944617 --- /dev/null +++ b/matrix/acquire.cc @@ -0,0 +1,342 @@ +// The Construct +// +// Copyright (C) The Construct Developers, Authors & Contributors +// Copyright (C) 2016-2020 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +namespace ircd::m::acquire +{ + static bool handle_event(const room &, const event::id &, const opts &); + static void handle_missing(const room &, const opts &); + static void handle_room(const room &, const opts &); + static void handle(const room::id &, const opts &); +}; + +decltype(ircd::m::acquire::log) +ircd::m::acquire::log +{ + "m.acquire" +}; + +// +// acquire::acquire +// + +ircd::m::acquire::acquire::acquire(const room &room, + const opts &opts) +{ + handle_room(room, opts); + ctx::interruption_point(); + + handle_missing(room, opts); + ctx::interruption_point(); +} + +// +// internal +// + +void +ircd::m::acquire::handle_room(const room &room, + const opts &opts) +try +{ + const room::origins origins + { + room + }; + + // When the room isn't public we need to supply a user_id of one of our + // users in the room to satisfy matrix protocol requirements upstack. + const auto user_id + { + m::any_user(room, my_host(), "join") + }; + + size_t respond(0), behind(0), equal(0), ahead(0); + size_t exists(0), fetching(0), evaluated(0); + std::set> errors; + const auto &[top_event_id, top_event_depth, top_event_idx] + { + m::top(std::nothrow, room) + }; + + log::info + { + log, "Resynchronizing %s from %s [idx:%lu depth:%ld] from %zu joined servers...", + string_view{room.room_id}, + string_view{top_event_id}, + top_event_idx, + top_event_depth, + origins.count(), + }; + + feds::opts fopts; + fopts.op = feds::op::head; + fopts.room_id = room.room_id; + fopts.user_id = user_id; + fopts.closure_errors = false; // exceptions wil not propagate feds::execute + fopts.exclude_myself = true; + const auto &top_depth(top_event_depth); // clang structured-binding & closure oops + feds::execute(fopts, [&](const auto &result) + { + const m::event event + { + result.object.get("event") + }; + + // The depth comes back as one greater than any existing + // depth so we subtract one. + const auto &depth + { + std::max(json::get<"depth"_>(event) - 1L, 0L) + }; + + ++respond; + ahead += depth > top_depth; + equal += depth == top_depth; + behind += depth < top_depth; + const event::prev prev + { + event + }; + + return m::for_each(prev, [&](const event::id &event_id) + { + if(unlikely(ctx::interruption_requested())) + return false; + + if(errors.count(event_id)) + return true; + + if(!m::exists(event::id(event_id))) + { + ++fetching; + m::acquire::opts _opts(opts); + _opts.hint = result.origin; + _opts.hint_only = true; + if(!handle_event(room, event_id, _opts)) + { + // If we fail the process the event we cache that and cease here. + errors.emplace(event_id); + return true; + } + else ++evaluated; + } + else ++exists; + + // If the event already exists or was successfully obtained we + // reward the remote with gossip of events which reference this + // event which it is unlikely to have. + //if(gossip_enable) + // gossip(room_id, event_id, result.origin); + + return true; + }); + }); + + if(unlikely(ctx::interruption_requested())) + return; + + log::info + { + log, "Acquired %s remote head; servers:%zu online:%zu" + " depth:%ld lt:eq:gt %zu:%zu:%zu exist:%zu eval:%zu error:%zu", + string_view{room.room_id}, + origins.count(), + origins.count_online(), + top_depth, + behind, + equal, + ahead, + exists, + evaluated, + errors.size(), + }; + + assert(ahead + equal + behind == respond); +} +catch(const std::exception &e) +{ + log::error + { + log, "Failed to synchronize recent %s :%s", + string_view{room.room_id}, + e.what(), + }; +} + +void +ircd::m::acquire::handle_missing(const room &room, + const opts &opts) +try +{ + const m::room::events::missing missing + { + room + }; + + const int64_t &room_depth + { + m::depth(std::nothrow, room) + }; + + const ssize_t &viewport_size + { + m::room::events::viewport_size + }; + + const int64_t min_depth + { + std::max(room_depth - viewport_size * 2, 0L) + }; + + ssize_t attempted(0); + std::set> fail; + missing.for_each(min_depth, [&] + (const auto &event_id, const int64_t &ref_depth, const auto &ref_idx) + { + if(unlikely(ctx::interruption_requested())) + return false; + + auto it{fail.lower_bound(event_id)}; + if(it == end(fail) || *it != event_id) + { + log::debug + { + log, "Fetching missing %s ref_depth:%zd in %s head_depth:%zu min_depth:%zd", + string_view{event_id}, + ref_depth, + string_view{room.room_id}, + room_depth, + min_depth, + }; + + if(!handle_event(room, event_id, opts)) + fail.emplace_hint(it, event_id); + } + + ++attempted; + return true; + }); + + if(unlikely(ctx::interruption_requested())) + return; + + if(attempted - ssize_t(fail.size()) > 0L) + log::info + { + log, "Fetched %zu recent missing events in %s attempted:%zu fail:%zu", + attempted - fail.size(), + string_view{room.room_id}, + attempted, + fail.size(), + }; +} +catch(const std::exception &e) +{ + log::error + { + log, "Failed to synchronize missing %s :%s", + string_view{room.room_id}, + e.what(), + }; +} + +bool +ircd::m::acquire::handle_event(const room &room, + const event::id &event_id, + const opts &opts) +try +{ + fetch::opts fopts; + fopts.op = fetch::op::event; + fopts.room_id = room.room_id; + fopts.event_id = event_id; + fopts.backfill_limit = 1; + fopts.hint = opts.hint; + fopts.attempt_limit = opts.hint_only; + auto future + { + fetch::start(fopts) + }; + + m::fetch::result result + { + future.get() + }; + + const json::object response + { + result + }; + + const json::array &pdus + { + response["pdus"] + }; + + const m::event event + { + pdus.at(0), event_id + }; + + const auto &[viewport_depth, _] + { + m::viewport(room.room_id) + }; + + const bool below_viewport + { + json::get<"depth"_>(event) < viewport_depth + }; + + if(below_viewport) + log::debug + { + log, "Will not fetch children of %s depth:%ld below viewport:%ld in %s", + string_view{event_id}, + json::get<"depth"_>(event), + viewport_depth, + string_view{room.room_id}, + }; + + m::vm::opts vmopts; + vmopts.infolog_accept = true; + vmopts.fetch_prev = !below_viewport; + vmopts.fetch_state = below_viewport; + vmopts.warnlog &= ~vm::fault::EXISTS; + vmopts.node_id = opts.hint; + m::vm::eval eval + { + event, vmopts + }; + + log::info + { + log, "acquired %s in %s depth:%ld viewport:%ld state:%b", + string_view{event_id}, + string_view{room.room_id}, + json::get<"depth"_>(event), + viewport_depth, + defined(json::get<"state_key"_>(event)), + }; + + return true; +} +catch(const std::exception &e) +{ + log::derror + { + log, "Failed to acquire %s synchronizing %s :%s", + string_view{event_id}, + string_view{room.room_id}, + e.what(), + }; + + return false; +} diff --git a/matrix/init_backfill.cc b/matrix/init_backfill.cc index 3d445aa66..dbedd0bb8 100644 --- a/matrix/init_backfill.cc +++ b/matrix/init_backfill.cc @@ -10,19 +10,16 @@ namespace ircd::m::init::backfill { - extern conf::item gossip_enable; - extern conf::item gossip_timeout; void gossip(const room::id &, const event::id &, const string_view &remote); - - bool handle_event(const room::id &, const event::id &, const string_view &hint, const bool &ask_one); - void handle_missing(const room::id &); void handle_room(const room::id &); void worker(); extern std::unique_ptr worker_context; - extern conf::item enable; - extern conf::item pool_size; + extern conf::item gossip_timeout; + extern conf::item gossip_enable; extern conf::item local_joined_only; + extern conf::item pool_size; + extern conf::item enable; extern log::log log; }; @@ -169,52 +166,41 @@ try ctx::pool pool { - "m.init.backfill", pool_opts + "m.init.backfill", + pool_opts }; - ctx::dock dock; - size_t count(0), complete(0); - const auto each_room{[&estimate, &count, &complete, &dock] - (const room::id &room_id) - { - const unwind completed{[&complete, &dock] - { - ++complete; - dock.notify_one(); - }}; - - handle_room(room_id); - ctx::interruption_point(); - - handle_missing(room_id); - ctx::interruption_point(); - - log::info - { - log, "Initial backfill of %s complete:%zu", //estimate:%zu %02.2lf%%", - string_view{room_id}, - complete, - estimate, - (complete / double(estimate)) * 100.0, - }; - - return true; - }}; - // Iterate the room_id's, submitting a copy of each to the next pool // worker; the submission blocks when all pool workers are busy, as per // the pool::opts. + ctx::dock dock; + size_t count(0), complete(0); const ctx::uninterruptible ui; - rooms::for_each(opts, [&pool, &each_room, &count] + rooms::for_each(opts, [&pool, &count, &complete, &estimate, &dock] (const room::id &room_id) { if(unlikely(ctx::interruption_requested())) return false; ++count; - pool([&each_room, room_id(std::string(room_id))] + pool([&, room_id(std::string(room_id))] // asynchronous { - each_room(room_id); + const unwind completed{[&complete, &dock] + { + ++complete; + dock.notify_one(); + }}; + + handle_room(room_id); + + log::info + { + log, "Initial backfill of %s complete:%zu", //estimate:%zu %02.2lf%%", + string_view{room_id}, + complete, + estimate, + (complete / double(estimate)) * 100.0, + }; }); return true; @@ -277,309 +263,12 @@ catch(const std::exception &e) void ircd::m::init::backfill::handle_room(const room::id &room_id) -try { - const m::room room + m::acquire::opts opts; + m::acquire::acquire { - room_id + room_id, opts }; - - const room::origins origins - { - room - }; - - // When the room isn't public we need to supply a user_id of one of our - // users in the room to satisfy matrix protocol requirements upstack. - const auto user_id - { - m::any_user(room, my_host(), "join") - }; - - size_t respond(0), behind(0), equal(0), ahead(0); - size_t exists(0), fetching(0), evaluated(0); - std::set> errors; - const auto &[top_event_id, top_event_depth, top_event_idx] - { - m::top(std::nothrow, room) - }; - - log::info - { - log, "Resynchronizing %s from %s [idx:%lu depth:%ld] from %zu joined servers...", - string_view{room_id}, - string_view{top_event_id}, - top_event_idx, - top_event_depth, - origins.count(), - }; - - feds::opts opts; - opts.op = feds::op::head; - opts.room_id = room_id; - opts.user_id = user_id; - opts.closure_errors = false; // exceptions wil not propagate feds::execute - opts.exclude_myself = true; - const auto &top_depth(top_event_depth); // clang structured-binding & closure oops - feds::execute(opts, [&](const auto &result) - { - const m::event event - { - result.object.get("event") - }; - - // The depth comes back as one greater than any existing - // depth so we subtract one. - const auto &depth - { - std::max(json::get<"depth"_>(event) - 1L, 0L) - }; - - ++respond; - ahead += depth > top_depth; - equal += depth == top_depth; - behind += depth < top_depth; - const event::prev prev - { - event - }; - - return m::for_each(prev, [&](const event::id &event_id) - { - if(unlikely(ctx::interruption_requested())) - return false; - - if(errors.count(event_id)) - return true; - - if(!m::exists(event::id(event_id))) - { - ++fetching; - if(!handle_event(room_id, event_id, result.origin, true)) - { - // If we fail the process the event we cache that and cease here. - errors.emplace(event_id); - return true; - } - else ++evaluated; - } - else ++exists; - - // If the event already exists or was successfully obtained we - // reward the remote with gossip of events which reference this - // event which it is unlikely to have. - if(gossip_enable) - gossip(room_id, event_id, result.origin); - - return true; - }); - }); - - if(unlikely(ctx::interruption_requested())) - return; - - log::info - { - log, "Acquired %s remote head; servers:%zu online:%zu" - " depth:%ld lt:eq:gt %zu:%zu:%zu exist:%zu eval:%zu error:%zu", - string_view{room_id}, - origins.count(), - origins.count_online(), - top_depth, - behind, - equal, - ahead, - exists, - evaluated, - errors.size(), - }; - - assert(ahead + equal + behind == respond); -} -catch(const std::exception &e) -{ - log::error - { - log, "Failed to synchronize recent %s :%s", - string_view{room_id}, - e.what(), - }; -} - -void -ircd::m::init::backfill::handle_missing(const room::id &room_id) -try -{ - const m::room room - { - room_id - }; - - const m::room::events::missing missing - { - room - }; - - const int64_t &room_depth - { - m::depth(std::nothrow, room) - }; - - const ssize_t &viewport_size - { - m::room::events::viewport_size - }; - - const int64_t min_depth - { - std::max(room_depth - viewport_size * 2, 0L) - }; - - ssize_t attempted(0); - std::set> fail; - missing.for_each(min_depth, [&room_id, &fail, &attempted, &room_depth, &min_depth] - (const auto &event_id, const int64_t &ref_depth, const auto &ref_idx) - { - if(unlikely(ctx::interruption_requested())) - return false; - - auto it{fail.lower_bound(event_id)}; - if(it == end(fail) || *it != event_id) - { - log::debug - { - log, "Fetching missing %s ref_depth:%zd in %s head_depth:%zu min_depth:%zd", - string_view{event_id}, - ref_depth, - string_view{room_id}, - room_depth, - min_depth, - }; - - if(!handle_event(room_id, event_id, string_view{}, false)) - fail.emplace_hint(it, event_id); - } - - ++attempted; - return true; - }); - - if(unlikely(ctx::interruption_requested())) - return; - - if(attempted - ssize_t(fail.size()) > 0L) - log::info - { - log, "Fetched %zu recent missing events in %s attempted:%zu fail:%zu", - attempted - fail.size(), - string_view{room_id}, - attempted, - fail.size(), - }; -} -catch(const std::exception &e) -{ - log::error - { - log, "Failed to synchronize missing %s :%s", - string_view{room_id}, - e.what(), - }; -} - -bool -ircd::m::init::backfill::handle_event(const room::id &room_id, - const event::id &event_id, - const string_view &hint, - const bool &ask_hint_only) -try -{ - fetch::opts opts; - opts.op = fetch::op::event; - opts.room_id = room_id; - opts.event_id = event_id; - opts.backfill_limit = 1; - opts.hint = hint; - opts.attempt_limit = ask_hint_only; - auto future - { - fetch::start(opts) - }; - - m::fetch::result result - { - future.get() - }; - - const json::object response - { - result - }; - - const json::array &pdus - { - json::object(result).at("pdus") - }; - - const m::event event - { - pdus.at(0), event_id - }; - - const auto &[viewport_depth, _] - { - m::viewport(room_id) - }; - - const bool below_viewport - { - json::get<"depth"_>(event) < viewport_depth - }; - - if(below_viewport) - log::debug - { - log, "Will not fetch children of %s depth:%ld below viewport:%ld in %s", - string_view{event_id}, - json::get<"depth"_>(event), - viewport_depth, - string_view{room_id}, - }; - - m::vm::opts vmopts; - vmopts.infolog_accept = true; - vmopts.fetch_prev = !below_viewport; - vmopts.fetch_state = below_viewport; - vmopts.warnlog &= ~vm::fault::EXISTS; - vmopts.node_id = hint; - m::vm::eval eval - { - event, vmopts - }; - - log::info - { - log, "acquired %s in %s depth:%ld viewport:%ld state:%b", - string_view{event_id}, - string_view{room_id}, - json::get<"depth"_>(event), - viewport_depth, - defined(json::get<"state_key"_>(event)), - }; - - return true; -} -catch(const std::exception &e) -{ - log::derror - { - log, "Failed to acquire %s synchronizing %s :%s", - string_view{event_id}, - string_view{room_id}, - e.what(), - }; - - return false; } void