From 652ffa8ea2ceea7c49669f60205ce96f3d498e9a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 22 Aug 2019 04:32:15 -0700 Subject: [PATCH] ircd::m::room::state: Improve coarse rebuild interface. --- include/ircd/m/room/state.h | 8 +- include/ircd/m/room/state_space.h | 1 + ircd/m_room.cc | 221 +++++++++++++++++++----------- modules/console.cc | 39 ++++-- 4 files changed, 174 insertions(+), 95 deletions(-) diff --git a/include/ircd/m/room/state.h b/include/ircd/m/room/state.h index e0cc63c70..24b58676f 100644 --- a/include/ircd/m/room/state.h +++ b/include/ircd/m/room/state.h @@ -25,6 +25,7 @@ struct ircd::m::room::state struct opts; struct space; struct history; + struct rebuild; using closure = std::function; using closure_bool = std::function; @@ -97,8 +98,13 @@ struct ircd::m::room::state static size_t prefetch(const state &, const string_view &, const event::idx_range &); static bool present(const event::idx &); - static size_t rebuild_present(const room::id &); static size_t purge_replaced(const room::id &); static bool is(std::nothrow_t, const event::idx &); static bool is(const event::idx &); }; + +struct ircd::m::room::state::rebuild +{ + rebuild(const room::id &); + rebuild(); +}; diff --git a/include/ircd/m/room/state_space.h b/include/ircd/m/room/state_space.h index 7378844f1..83855c88a 100644 --- a/include/ircd/m/room/state_space.h +++ b/include/ircd/m/room/state_space.h @@ -44,5 +44,6 @@ struct ircd::m::room::state::space struct ircd::m::room::state::space::rebuild { + rebuild(const room::id &); rebuild(); }; diff --git a/ircd/m_room.cc b/ircd/m_room.cc index 100ba0324..f4a42ed53 100644 --- a/ircd/m_room.cc +++ b/ircd/m_room.cc @@ -36,6 +36,98 @@ ircd::m::room::purge(const room &room) return ret; } +ircd::m::room::state::rebuild::rebuild() +{ + rooms::opts opts; + opts.remote_joined_only = true; + rooms::for_each(opts, [](const auto &room_id) + { + rebuild{room_id}; + return true; + }); +} + +ircd::m::room::state::rebuild::rebuild(const room::id &room_id) +{ + const m::event::id::buf event_id + { + m::head(room_id) + }; + + const m::room::state::history history + { + room_id, event_id + }; + + const m::room::state present_state + { + room_id + }; + + m::dbs::write_opts opts; + opts.appendix.reset(); + opts.appendix.set(dbs::appendix::ROOM_STATE); + opts.appendix.set(dbs::appendix::ROOM_JOINED); + db::txn txn + { + *m::dbs::events + }; + + ssize_t deleted(0); + present_state.for_each([&opts, &txn, &deleted] + (const auto &type, const auto &state_key, const auto &event_idx) + { + const m::event::fetch &event + { + event_idx, std::nothrow + }; + + if(!event.valid) + return true; + + auto _opts(opts); + _opts.op = db::op::DELETE; + _opts.event_idx = event_idx; + dbs::write(txn, event, _opts); + ++deleted; + return true; + }); + + ssize_t added(0); + history.for_each([&opts, &txn, &added] + (const auto &type, const auto &state_key, const auto &depth, const auto &event_idx) + { + const m::event::fetch &event + { + event_idx, std::nothrow + }; + + if(!event.valid) + return true; + + auto _opts(opts); + _opts.op = db::op::SET; + _opts.event_idx = event_idx; + dbs::write(txn, event, _opts); + ++added; + return true; + }); + + log::info + { + log, "Present state of %s @ %s rebuild complete with %zu size:%s del:%zd add:%zd (%zd)", + string_view{room_id}, + string_view{event_id}, + txn.size(), + pretty(iec(txn.bytes())), + deleted, + added, + (added - deleted), + }; + + txn(); +} + bool ircd::m::room::state::is(const event::idx &event_idx) { @@ -95,48 +187,6 @@ ircd::m::room::state::purge_replaced(const room::id &room_id) return ret; } -size_t -ircd::m::room::state::rebuild_present(const room::id &room_id) -{ - size_t ret{0}; - m::room::messages it - { - room_id, uint64_t(0) - }; - - if(!it) - return ret; - - const m::room::state state - { - room_id - }; - - db::txn txn - { - *m::dbs::events - }; - - for(; it; ++it) - { - const m::event::idx &event_idx{it.event_idx()}; - if(!state.is(std::nothrow, event_idx)) - continue; - - const m::event &event{*it}; - m::dbs::write_opts opts; - opts.event_idx = event_idx; - opts.appendix.reset(); - opts.appendix.set(dbs::appendix::ROOM_STATE); - opts.appendix.set(dbs::appendix::ROOM_JOINED); - dbs::write(txn, event, opts); - ++ret; - } - - txn(); - return ret; -} - bool ircd::m::room::state::present(const event::idx &event_idx) { @@ -2941,56 +2991,67 @@ const ircd::m::room::state::space::rebuild::rebuild() { - static const size_t interval + rooms::opts opts; + opts.remote_joined_only = true; + rooms::for_each(opts, [](const auto &room_id) { - 500000UL - }; + rebuild{room_id}; + return true; + }); +} - auto &column - { - dbs::event_column.at(json::indexof("state_key"_sv)) - }; - - size_t total(0); - size_t current(0); +ircd::m::room::state::space::rebuild::rebuild(const room::id &room_id) +{ db::txn txn { *m::dbs::events }; - dbs::write_opts wopts; - wopts.appendix.reset(); - wopts.appendix.set(dbs::appendix::ROOM_STATE_SPACE); - - event::fetch event; - for(auto it(column.begin()); it; ++it) try + m::room::messages it { - const event::idx &event_idx + room_id, uint64_t(0) + }; + + if(!it) + return; + + size_t state_count(0), messages_count(0), state_deleted(0); + for(; it; ++it, ++messages_count) try + { + const m::event::idx &event_idx { - byte_view(it->first) + it.event_idx() }; - seek(event, event_idx); - wopts.event_idx = event_idx; - dbs::write(txn, event, wopts); + if(!state::is(std::nothrow, event_idx)) + continue; - ++total; - ++current; - if(current >= interval) + ++state_count; + const m::event &event{*it}; + const auto &[pass, reason] { - log::info + m::room::auth::check_static(event) + }; + + if(!pass) + log::dwarning { - log, "room::state::space::rebuild total:%zu committing events:%zu elems:%zu size:%s", - total, - current, - txn.size(), - pretty(iec(txn.bytes())) + log, "%s in %s erased from state space :%s", + string_view{event.event_id}, + string_view{room_id}, + what(reason), }; - txn(); - txn.clear(); - current = 0; - } + dbs::write_opts opts; + opts.event_idx = event_idx; + + opts.appendix.reset(); + opts.appendix.set(dbs::appendix::ROOM_STATE_SPACE); + + opts.op = pass? db::op::SET : db::op::DELETE; + state_deleted += !pass; + + dbs::write(txn, event, opts); } catch(const ctx::interrupted &e) { @@ -3013,9 +3074,11 @@ ircd::m::room::state::space::rebuild::rebuild() log::info { - log, "room::state::space::rebuild total:%zu final transaction events:%zu elems:%zu size:%s", - total, - current, + log, "room::state::space::rebuild %s complete msgs:%zu state:%zu del:%zu transaction elems:%zu size:%s", + string_view{room_id}, + messages_count, + state_count, + state_deleted, txn.size(), pretty(iec(txn.bytes())) }; diff --git a/modules/console.cc b/modules/console.cc index 84c86aa8d..eda4f434f 100644 --- a/modules/console.cc +++ b/modules/console.cc @@ -9167,7 +9167,23 @@ console_cmd__room__state__space(opt &out, const string_view &line) bool console_cmd__room__state__space__rebuild(opt &out, const string_view &line) { - m::room::state::space::rebuild{}; + const params param{line, " ", + { + "room_id", + }}; + + const auto room_id + { + param["room_id"]? + m::room_id(param.at("room_id")): + m::room::id::buf{} + }; + + if(room_id) + m::room::state::space::rebuild{room_id}; + else + m::room::state::space::rebuild{}; + return true; } @@ -9194,7 +9210,7 @@ console_cmd__room__state__purge__replaced(opt &out, const string_view &line) } bool -console_cmd__room__state__rebuild__present(opt &out, const string_view &line) +console_cmd__room__state__rebuild(opt &out, const string_view &line) { const params param{line, " ", { @@ -9212,19 +9228,12 @@ console_cmd__room__state__rebuild__present(opt &out, const string_view &line) { m::rooms::opts opts; opts.remote_joined_only = room_id == "remote_joined_only"; - m::rooms::for_each(opts, [&out] + m::rooms::for_each(opts, [] (const m::room::id &room_id) { - const size_t count + m::room::state::rebuild { - m::room::state::rebuild_present(room_id) - }; - - log::info - { - "Rebuild of %s complete with %zu present state events.", - string_view{room_id}, - count, + room_id }; return true; @@ -9233,12 +9242,12 @@ console_cmd__room__state__rebuild__present(opt &out, const string_view &line) return true; } - const size_t count + m::room::state::rebuild { - m::room::state::rebuild_present(room_id) + room_id }; - out << "done " << room_id << " " << count << std::endl; + out << "done" << std::endl; return true; }