0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-14 00:34:18 +01:00

ircd:Ⓜ️:room::state: Improve coarse rebuild interface.

This commit is contained in:
Jason Volk 2019-08-22 04:32:15 -07:00
parent cf662a833a
commit 652ffa8ea2
4 changed files with 174 additions and 95 deletions

View file

@ -25,6 +25,7 @@ struct ircd::m::room::state
struct opts;
struct space;
struct history;
struct rebuild;
using closure = std::function<void (const string_view &, const string_view &, const event::idx &)>;
using closure_bool = std::function<bool (const string_view &, const string_view &, const event::idx &)>;
@ -97,8 +98,13 @@ struct ircd::m::room::state
static size_t prefetch(const state &, const string_view &, const event::idx_range &);
static bool present(const event::idx &);
static size_t rebuild_present(const room::id &);
static size_t purge_replaced(const room::id &);
static bool is(std::nothrow_t, const event::idx &);
static bool is(const event::idx &);
};
struct ircd::m::room::state::rebuild
{
rebuild(const room::id &);
rebuild();
};

View file

@ -44,5 +44,6 @@ struct ircd::m::room::state::space
struct ircd::m::room::state::space::rebuild
{
rebuild(const room::id &);
rebuild();
};

View file

@ -36,6 +36,98 @@ ircd::m::room::purge(const room &room)
return ret;
}
ircd::m::room::state::rebuild::rebuild()
{
rooms::opts opts;
opts.remote_joined_only = true;
rooms::for_each(opts, [](const auto &room_id)
{
rebuild{room_id};
return true;
});
}
ircd::m::room::state::rebuild::rebuild(const room::id &room_id)
{
const m::event::id::buf event_id
{
m::head(room_id)
};
const m::room::state::history history
{
room_id, event_id
};
const m::room::state present_state
{
room_id
};
m::dbs::write_opts opts;
opts.appendix.reset();
opts.appendix.set(dbs::appendix::ROOM_STATE);
opts.appendix.set(dbs::appendix::ROOM_JOINED);
db::txn txn
{
*m::dbs::events
};
ssize_t deleted(0);
present_state.for_each([&opts, &txn, &deleted]
(const auto &type, const auto &state_key, const auto &event_idx)
{
const m::event::fetch &event
{
event_idx, std::nothrow
};
if(!event.valid)
return true;
auto _opts(opts);
_opts.op = db::op::DELETE;
_opts.event_idx = event_idx;
dbs::write(txn, event, _opts);
++deleted;
return true;
});
ssize_t added(0);
history.for_each([&opts, &txn, &added]
(const auto &type, const auto &state_key, const auto &depth, const auto &event_idx)
{
const m::event::fetch &event
{
event_idx, std::nothrow
};
if(!event.valid)
return true;
auto _opts(opts);
_opts.op = db::op::SET;
_opts.event_idx = event_idx;
dbs::write(txn, event, _opts);
++added;
return true;
});
log::info
{
log, "Present state of %s @ %s rebuild complete with %zu size:%s del:%zd add:%zd (%zd)",
string_view{room_id},
string_view{event_id},
txn.size(),
pretty(iec(txn.bytes())),
deleted,
added,
(added - deleted),
};
txn();
}
bool
ircd::m::room::state::is(const event::idx &event_idx)
{
@ -95,48 +187,6 @@ ircd::m::room::state::purge_replaced(const room::id &room_id)
return ret;
}
size_t
ircd::m::room::state::rebuild_present(const room::id &room_id)
{
size_t ret{0};
m::room::messages it
{
room_id, uint64_t(0)
};
if(!it)
return ret;
const m::room::state state
{
room_id
};
db::txn txn
{
*m::dbs::events
};
for(; it; ++it)
{
const m::event::idx &event_idx{it.event_idx()};
if(!state.is(std::nothrow, event_idx))
continue;
const m::event &event{*it};
m::dbs::write_opts opts;
opts.event_idx = event_idx;
opts.appendix.reset();
opts.appendix.set(dbs::appendix::ROOM_STATE);
opts.appendix.set(dbs::appendix::ROOM_JOINED);
dbs::write(txn, event, opts);
++ret;
}
txn();
return ret;
}
bool
ircd::m::room::state::present(const event::idx &event_idx)
{
@ -2941,56 +2991,67 @@ const
ircd::m::room::state::space::rebuild::rebuild()
{
static const size_t interval
rooms::opts opts;
opts.remote_joined_only = true;
rooms::for_each(opts, [](const auto &room_id)
{
500000UL
};
rebuild{room_id};
return true;
});
}
auto &column
{
dbs::event_column.at(json::indexof<m::event>("state_key"_sv))
};
size_t total(0);
size_t current(0);
ircd::m::room::state::space::rebuild::rebuild(const room::id &room_id)
{
db::txn txn
{
*m::dbs::events
};
dbs::write_opts wopts;
wopts.appendix.reset();
wopts.appendix.set(dbs::appendix::ROOM_STATE_SPACE);
event::fetch event;
for(auto it(column.begin()); it; ++it) try
m::room::messages it
{
const event::idx &event_idx
room_id, uint64_t(0)
};
if(!it)
return;
size_t state_count(0), messages_count(0), state_deleted(0);
for(; it; ++it, ++messages_count) try
{
const m::event::idx &event_idx
{
byte_view<event::idx>(it->first)
it.event_idx()
};
seek(event, event_idx);
wopts.event_idx = event_idx;
dbs::write(txn, event, wopts);
if(!state::is(std::nothrow, event_idx))
continue;
++total;
++current;
if(current >= interval)
++state_count;
const m::event &event{*it};
const auto &[pass, reason]
{
log::info
m::room::auth::check_static(event)
};
if(!pass)
log::dwarning
{
log, "room::state::space::rebuild total:%zu committing events:%zu elems:%zu size:%s",
total,
current,
txn.size(),
pretty(iec(txn.bytes()))
log, "%s in %s erased from state space :%s",
string_view{event.event_id},
string_view{room_id},
what(reason),
};
txn();
txn.clear();
current = 0;
}
dbs::write_opts opts;
opts.event_idx = event_idx;
opts.appendix.reset();
opts.appendix.set(dbs::appendix::ROOM_STATE_SPACE);
opts.op = pass? db::op::SET : db::op::DELETE;
state_deleted += !pass;
dbs::write(txn, event, opts);
}
catch(const ctx::interrupted &e)
{
@ -3013,9 +3074,11 @@ ircd::m::room::state::space::rebuild::rebuild()
log::info
{
log, "room::state::space::rebuild total:%zu final transaction events:%zu elems:%zu size:%s",
total,
current,
log, "room::state::space::rebuild %s complete msgs:%zu state:%zu del:%zu transaction elems:%zu size:%s",
string_view{room_id},
messages_count,
state_count,
state_deleted,
txn.size(),
pretty(iec(txn.bytes()))
};

View file

@ -9167,7 +9167,23 @@ console_cmd__room__state__space(opt &out, const string_view &line)
bool
console_cmd__room__state__space__rebuild(opt &out, const string_view &line)
{
m::room::state::space::rebuild{};
const params param{line, " ",
{
"room_id",
}};
const auto room_id
{
param["room_id"]?
m::room_id(param.at("room_id")):
m::room::id::buf{}
};
if(room_id)
m::room::state::space::rebuild{room_id};
else
m::room::state::space::rebuild{};
return true;
}
@ -9194,7 +9210,7 @@ console_cmd__room__state__purge__replaced(opt &out, const string_view &line)
}
bool
console_cmd__room__state__rebuild__present(opt &out, const string_view &line)
console_cmd__room__state__rebuild(opt &out, const string_view &line)
{
const params param{line, " ",
{
@ -9212,19 +9228,12 @@ console_cmd__room__state__rebuild__present(opt &out, const string_view &line)
{
m::rooms::opts opts;
opts.remote_joined_only = room_id == "remote_joined_only";
m::rooms::for_each(opts, [&out]
m::rooms::for_each(opts, []
(const m::room::id &room_id)
{
const size_t count
m::room::state::rebuild
{
m::room::state::rebuild_present(room_id)
};
log::info
{
"Rebuild of %s complete with %zu present state events.",
string_view{room_id},
count,
room_id
};
return true;
@ -9233,12 +9242,12 @@ console_cmd__room__state__rebuild__present(opt &out, const string_view &line)
return true;
}
const size_t count
m::room::state::rebuild
{
m::room::state::rebuild_present(room_id)
room_id
};
out << "done " << room_id << " " << count << std::endl;
out << "done" << std::endl;
return true;
}