// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2019 Jason Volk
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.

namespace ircd::m::init::backfill
{
	extern conf::item<bool> gossip_enable;
	extern conf::item<seconds> gossip_timeout;
	size_t gossip(const room::id &, const event::id &, const string_view &remote);
	bool handle_event(const room::id &, const event::id &, const string_view &hint, const bool &ask_one);
	void handle_missing(const room::id &);
	void handle_room(const room::id &);
	void worker();

	extern std::unique_ptr<context> worker_context;
	extern conf::item<bool> enable;
	extern conf::item<size_t> pool_size;
	extern conf::item<bool> local_joined_only;
	extern log::log log;
};

decltype(ircd::m::init::backfill::log)
ircd::m::init::backfill::log
{
	"m.init.backfill"
};

decltype(ircd::m::init::backfill::enable)
ircd::m::init::backfill::enable
{
	{ "name",     "m.init.backfill.enable" },
	{ "default",  true                     },
};

decltype(ircd::m::init::backfill::pool_size)
ircd::m::init::backfill::pool_size
{
	{ "name",     "m.init.backfill.pool_size" },
	{ "default",  12L                         },
};

decltype(ircd::m::init::backfill::local_joined_only)
ircd::m::init::backfill::local_joined_only
{
	{ "name",     "m.init.backfill.local_joined_only" },
	{ "default",  true                                },
};

decltype(ircd::m::init::backfill::worker_context)
ircd::m::init::backfill::worker_context;

void
ircd::m::init::backfill::init()
{
	if(!enable)
	{
		log::warning
		{
			log, "Initial synchronization of rooms from remote servers has"
			" been disabled by the configuration. Not fetching latest events."
		};

		return;
	}

	if(ircd::read_only || ircd::write_avoid)
	{
		log::warning
		{
			log, "Not performing initial backfill because the read-only or"
			" write-avoid flag is set."
		};

		return;
	}

	assert(!worker_context);
	worker_context.reset(new context
	{
		"m.init.backfill",
		512_KiB,
		&worker,
		context::POST
	});
}

void
ircd::m::init::backfill::fini()
noexcept
{
	if(!worker_context)
		return;

	log::debug
	{
		log, "Terminating worker context..."
	};

	worker_context.reset(nullptr);
}

void
ircd::m::init::backfill::worker()
try
{
	// Wait for runlevel RUN before proceeding...
	run::barrier{};

	// Set a low priority for this context; see related pool_opts
	ionice(ctx::cur(), 4);
	nice(ctx::cur(), 4);

	// Prepare to iterate all of the rooms this server is aware of which
	// contain at least one member from another server in any state, and
	// one member from our server in a joined state.
	rooms::opts opts;
	opts.remote_only = true;
	opts.local_joined_only = local_joined_only;

	// This is only an estimate because the rooms on the server can change
	// before this task completes.
	const auto estimate
	{
		1UL //rooms::count(opts)
	};

	if(!estimate)
		return;

	log::notice
	{
		log, "Starting initial backfill of rooms from other servers...",
		estimate,
	};

	// Prepare a pool of child contexts to process rooms concurrently.
	// The context pool lives directly in this frame.
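	// A rough sketch of the bookkeeping used below (names as in this frame;
	// nothing beyond what the code itself does): every room submitted to the
	// pool bumps `count`; each_room's unwind bumps `complete` and notifies
	// `dock`, so the final wait cannot return before every submitted room
	// has been handled:
	//
	//   ++count;                                    // producer, per room
	//   pool([...]{ each_room(room_id); });         // consumer, concurrent
	//   ...
	//   dock.wait([&]{ return complete >= count; });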
	static const ctx::pool::opts pool_opts
	{
		512_KiB,            // stack sz
		size_t(pool_size),  // pool sz
		-1,                 // queue max hard
		0,                  // queue max soft
		true,               // queue max blocking
		true,               // queue max warning
		3,                  // ionice
		3,                  // nice
	};

	ctx::pool pool
	{
		"m.init.backfill", pool_opts
	};

	ctx::dock dock;
	size_t count(0), complete(0);
	const auto each_room{[&estimate, &count, &complete, &dock]
	(const room::id &room_id)
	{
		const unwind completed{[&complete, &dock]
		{
			++complete;
			dock.notify_one();
		}};

		handle_room(room_id);
		ctx::interruption_point();

		handle_missing(room_id);
		ctx::interruption_point();

		log::info
		{
			log, "Initial backfill of %s complete:%zu", //estimate:%zu %02.2lf%%",
			string_view{room_id},
			complete,
			estimate,
			(complete / double(estimate)) * 100.0,
		};

		return true;
	}};

	// Iterate the room_id's, submitting a copy of each to the next pool
	// worker; the submission blocks when all pool workers are busy, as per
	// the pool::opts.
	const ctx::uninterruptible ui;
	rooms::for_each(opts, [&pool, &each_room, &count]
	(const room::id &room_id)
	{
		if(unlikely(ctx::interruption_requested()))
			return false;

		++count;
		pool([&each_room, room_id(std::string(room_id))]
		{
			each_room(room_id);
		});

		return true;
	});

	if(complete < count)
		log::dwarning
		{
			log, "Waiting for initial resynchronization count:%zu complete:%zu rooms...",
			count,
			complete,
		};

	if(unlikely(ctx::interruption_requested()))
		pool.terminate();

	// All rooms have been submitted to the pool but the pool workers might
	// still be busy. If we unwind now the pool's dtor will kill the workers
	// so we synchronize their completion here.
	dock.wait([&complete, &count]
	{
		return complete >= count;
	});

	if(unlikely(ctx::interruption_requested()))
		return;

	log::notice
	{
		log, "Initial resynchronization of %zu rooms completed.",
		count,
	};
}
catch(const ctx::interrupted &e)
{
	log::derror
	{
		log, "Worker interrupted without completing resynchronization of all rooms."
	};

	throw;
}
catch(const ctx::terminated &e)
{
	log::error
	{
		log, "Worker terminated without completing resynchronization of all rooms."
	};

	throw;
}
catch(const std::exception &e)
{
	log::critical
	{
		log, "Worker fatal :%s",
		e.what(),
	};
}

void
ircd::m::init::backfill::handle_room(const room::id &room_id)
try
{
	const m::room room
	{
		room_id
	};

	const room::origins origins
	{
		room
	};

	// When the room isn't public we need to supply a user_id of one of our
	// users in the room to satisfy matrix protocol requirements upstack.
	const auto user_id
	{
		m::any_user(room, my_host(), "join")
	};

	size_t respond(0), behind(0), equal(0), ahead(0);
	size_t exists(0), fetching(0), evaluated(0);
	std::set<std::string, std::less<>> errors;
	const auto &[top_event_id, top_event_depth, top_event_idx]
	{
		m::top(std::nothrow, room)
	};

	log::info
	{
		log, "Resynchronizing %s from %s [idx:%lu depth:%ld] from %zu joined servers...",
		string_view{room_id},
		string_view{top_event_id},
		top_event_idx,
		top_event_depth,
		origins.count(),
	};

	feds::opts opts;
	opts.op = feds::op::head;
	opts.room_id = room_id;
	opts.user_id = user_id;
	opts.closure_errors = false; // exceptions will not propagate feds::execute
	opts.exclude_myself = true;
	const auto &top_depth(top_event_depth); // clang structured-binding & closure oops
	feds::execute(opts, [&](const auto &result)
	{
		const m::event event
		{
			result.object.get("event")
		};

		// The depth comes back as one greater than any existing
		// depth so we subtract one.
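		// For example (hypothetical numbers): if the remote's newest events
		// sit at depth 7, the head response reports 8 and we clamp back to 7;
		// the std::max against 0L guards against underflow on a bogus or
		// empty response.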
		const auto &depth
		{
			std::max(json::get<"depth"_>(event) - 1L, 0L)
		};

		++respond;
		ahead += depth > top_depth;
		equal += depth == top_depth;
		behind += depth < top_depth;

		const event::prev prev
		{
			event
		};

		return m::for_each(prev, [&](const event::id &event_id)
		{
			if(unlikely(ctx::interruption_requested()))
				return false;

			if(errors.count(event_id))
				return true;

			if(!m::exists(event::id(event_id)))
			{
				++fetching;
				if(!handle_event(room_id, event_id, result.origin, true))
				{
					// If we fail to process the event we cache that and cease here.
					errors.emplace(event_id);
					return true;
				}
				else ++evaluated;
			}
			else ++exists;

			// If the event already exists or was successfully obtained we
			// reward the remote with gossip of events which reference this
			// event which it is unlikely to have.
			if(gossip_enable)
				gossip(room_id, event_id, result.origin);

			return true;
		});
	});

	if(unlikely(ctx::interruption_requested()))
		return;

	log::info
	{
		log, "Acquired %s remote head; servers:%zu online:%zu"
		" depth:%ld lt:eq:gt %zu:%zu:%zu exist:%zu eval:%zu error:%zu",
		string_view{room_id},
		origins.count(),
		origins.count_online(),
		top_depth,
		behind,
		equal,
		ahead,
		exists,
		evaluated,
		errors.size(),
	};

	assert(ahead + equal + behind == respond);
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Failed to synchronize recent %s :%s",
		string_view{room_id},
		e.what(),
	};
}

void
ircd::m::init::backfill::handle_missing(const room::id &room_id)
try
{
	const m::room room
	{
		room_id
	};

	const m::room::events::missing missing
	{
		room
	};

	const int64_t &room_depth
	{
		m::depth(std::nothrow, room)
	};

	const ssize_t &viewport_size
	{
		m::room::events::viewport_size
	};

	const int64_t min_depth
	{
		std::max(room_depth - viewport_size * 2, 0L)
	};

	ssize_t attempted(0);
	std::set<std::string, std::less<>> fail;
	missing.for_each(min_depth, [&room_id, &fail, &attempted, &room_depth, &min_depth]
	(const auto &event_id, const int64_t &ref_depth, const auto &ref_idx)
	{
		if(unlikely(ctx::interruption_requested()))
			return false;

		auto it{fail.lower_bound(event_id)};
		if(it == end(fail) || *it != event_id)
		{
			log::debug
			{
				log, "Fetching missing %s ref_depth:%zd in %s head_depth:%zu min_depth:%zd",
				string_view{event_id},
				ref_depth,
				string_view{room_id},
				room_depth,
				min_depth,
			};

			if(!handle_event(room_id, event_id, string_view{}, false))
				fail.emplace_hint(it, event_id);
		}

		++attempted;
		return true;
	});

	if(unlikely(ctx::interruption_requested()))
		return;

	if(attempted - ssize_t(fail.size()) > 0L)
		log::info
		{
			log, "Fetched %zu recent missing events in %s attempted:%zu fail:%zu",
			attempted - fail.size(),
			string_view{room_id},
			attempted,
			fail.size(),
		};
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Failed to synchronize missing %s :%s",
		string_view{room_id},
		e.what(),
	};
}

bool
ircd::m::init::backfill::handle_event(const room::id &room_id,
                                      const event::id &event_id,
                                      const string_view &hint,
                                      const bool &ask_hint_only)
try
{
	fetch::opts opts;
	opts.op = fetch::op::event;
	opts.room_id = room_id;
	opts.event_id = event_id;
	opts.backfill_limit = 1;
	opts.hint = hint;
	opts.attempt_limit = ask_hint_only;
	auto future
	{
		fetch::start(opts)
	};

	m::fetch::result result
	{
		future.get()
	};

	const json::object response
	{
		result
	};

	const json::array &pdus
	{
		json::object(result).at("pdus")
	};

	const m::event event
	{
		pdus.at(0), event_id
	};

	const auto &[viewport_depth, _]
	{
		m::viewport(room_id)
	};

	const bool below_viewport
	{
		json::get<"depth"_>(event) < viewport_depth
	};

	if(below_viewport)
		log::debug
		{
			log, "Will not fetch children of %s depth:%ld below viewport:%ld in %s",
			string_view{event_id},
			json::get<"depth"_>(event),
			viewport_depth,
			string_view{room_id},
		};

	m::vm::opts vmopts;
	vmopts.infolog_accept = true;
	vmopts.fetch_prev = !below_viewport;
	vmopts.fetch_state = below_viewport;
	vmopts.warnlog &= ~vm::fault::EXISTS;
	vmopts.node_id = hint;
	m::vm::eval eval
	{
		event, vmopts
	};

	log::info
	{
		log, "Acquired %s in %s depth:%ld viewport:%ld state:%b",
		string_view{event_id},
		string_view{room_id},
		json::get<"depth"_>(event),
		viewport_depth,
		defined(json::get<"state_key"_>(event)),
	};

	return true;
}
catch(const std::exception &e)
{
	log::derror
	{
		log, "Failed to acquire %s synchronizing %s :%s",
		string_view{event_id},
		string_view{room_id},
		e.what(),
	};

	return false;
}

decltype(ircd::m::init::backfill::gossip_enable)
ircd::m::init::backfill::gossip_enable
{
	{ "name",     "m.init.backfill.gossip.enable" },
	{ "default",  true                            },
};

decltype(ircd::m::init::backfill::gossip_timeout)
ircd::m::init::backfill::gossip_timeout
{
	{ "name",     "m.init.backfill.gossip.timeout" },
	{ "default",  5L                               },
};

/// Initial gossip protocol works by sending the remote server some events
/// which reference an event contained in the remote's head which we just
/// obtained. This is part of a family of active measures taken to reduce
/// forward extremities on other servers but without polluting the chain
/// with permanent data for this purpose such as with org.matrix.dummy_event.
size_t
ircd::m::init::backfill::gossip(const room::id &room_id,
                                const event::id &event_id,
                                const string_view &remote)
{
	size_t ret{0};
	const m::event::refs refs
	{
		m::index(std::nothrow, event_id)
	};

	static const size_t max{48};
	const size_t count
	{
		std::min(refs.count(dbs::ref::NEXT), max)
	};

	if(!count)
		return ret;

	const unique_mutable_buffer buf[]
	{
		{ event::MAX_SIZE * (count + 1) },
		{ 16_KiB },
	};

	size_t i{0};
	std::array<event::idx, max> next_idx;
	refs.for_each(dbs::ref::NEXT, [&next_idx, &i]
	(const event::idx &event_idx, const auto &ref_type)
	{
		assert(ref_type == dbs::ref::NEXT);
		next_idx.at(i) = event_idx;
		return ++i < next_idx.size();
	});

	json::stack out{buf[0]};
	{
		json::stack::object top{out};

		json::stack::member
		{
			top, "origin", m::my_host()
		};

		json::stack::member
		{
			top, "origin_server_ts", json::value
			{
				long(ircd::time())
			}
		};

		json::stack::array pdus
		{
			top, "pdus"
		};

		m::event::fetch event;
		for(assert(ret == 0); ret < i; ++ret)
			if(seek(event, next_idx.at(ret), std::nothrow))
				pdus.append(event.source);
	}

	const string_view txn
	{
		out.completed()
	};

	char idbuf[64];
	const string_view txnid
	{
		m::txn::create_id(idbuf, txn)
	};

	m::fed::send::opts opts;
	opts.remote = remote;
	m::fed::send request
	{
		txnid, txn, buf[1], std::move(opts)
	};

	http::code code{0};
	std::exception_ptr eptr;
	if(request.wait(seconds(gossip_timeout), std::nothrow)) try
	{
		code = request.get();
		ret += code == http::OK;
	}
	catch(...)
	{
		eptr = std::current_exception();
	}

	log::logf
	{
		log, code == http::OK? log::DEBUG : log::DERROR,
		"gossip %zu:%zu to %s reference to %s in %s :%s %s",
		ret,
		count,
		remote,
		string_view{event_id},
		string_view{room_id},
		code? status(code) : "failed",
		eptr? what(eptr) : string_view{},
	};

	return ret;
}
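// A rough sketch of the federation transaction gossip() assembles above,
// assuming two referencing events were found (placeholder values; the field
// names are exactly those written by the json::stack code):
//
//   {
//     "origin": "<m::my_host()>",
//     "origin_server_ts": <ircd::time()>,
//     "pdus": [ { ...event A... }, { ...event B... } ]
//   }
//
// The remote thereby learns of events referencing the head it advertised,
// helping it retire that forward extremity without an org.matrix.dummy_event.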