mirror of
https://github.com/matrix-construct/construct
synced 2024-12-26 07:23:53 +01:00
ircd:Ⓜ️:dbs: Add prefetch loop for horizon resolver.
This commit is contained in:
parent
0335bfa948
commit
076384d697
1 changed files with 82 additions and 65 deletions
|
@ -143,97 +143,114 @@ ircd::m::dbs::desc::event_horizon
|
|||
// indexer
|
||||
//
|
||||
|
||||
namespace ircd::m::dbs
|
||||
{
|
||||
static void _index_event_horizon_resolve_one(db::txn &, const event &, const write_opts &, const event::idx &);
|
||||
}
|
||||
|
||||
// NOTE: QUERY
|
||||
void
|
||||
ircd::m::dbs::_index_event_horizon_resolve(db::txn &txn,
|
||||
const event &event,
|
||||
const write_opts &opts)
|
||||
{
|
||||
char buf[EVENT_HORIZON_KEY_MAX_SIZE];
|
||||
assert(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE));
|
||||
assert(opts.event_idx != 0);
|
||||
assert(event.event_id);
|
||||
|
||||
char buf[EVENT_HORIZON_KEY_MAX_SIZE];
|
||||
const string_view &key
|
||||
{
|
||||
event_horizon_key(buf, event.event_id)
|
||||
};
|
||||
|
||||
auto it
|
||||
auto it(dbs::event_horizon.begin(key)); do
|
||||
{
|
||||
dbs::event_horizon.begin(key)
|
||||
};
|
||||
static const size_t max{32};
|
||||
|
||||
for(; it; ++it)
|
||||
{
|
||||
const auto parts
|
||||
size_t num(0);
|
||||
event::idx event_idx[max];
|
||||
for(; num < max && it; ++it)
|
||||
{
|
||||
event_horizon_key(it->first)
|
||||
};
|
||||
|
||||
const auto &event_idx
|
||||
{
|
||||
std::get<0>(parts)
|
||||
};
|
||||
|
||||
assert(event_idx != 0);
|
||||
assert(event_idx != opts.event_idx);
|
||||
const event::fetch _event
|
||||
{
|
||||
std::nothrow, event_idx
|
||||
};
|
||||
|
||||
if(!_event.valid)
|
||||
{
|
||||
log::dwarning
|
||||
{
|
||||
log, "Horizon resolve for %s @%lu not possible @%lu",
|
||||
string_view{event.event_id},
|
||||
opts.event_idx,
|
||||
event_idx,
|
||||
};
|
||||
|
||||
continue;
|
||||
event_idx[num] = std::get<0>(event_horizon_key(it->first));
|
||||
num += event_idx[num] != 0;
|
||||
}
|
||||
|
||||
log::debug
|
||||
for(size_t i(0); i < num; ++i)
|
||||
m::prefetch(event_idx[i]);
|
||||
|
||||
for(size_t i(0); i < num; ++i)
|
||||
_index_event_horizon_resolve_one(txn, event, opts, event_idx[i]);
|
||||
}
|
||||
while(it);
|
||||
}
|
||||
|
||||
// NOTE: QUERY
|
||||
void
|
||||
ircd::m::dbs::_index_event_horizon_resolve_one(db::txn &txn,
|
||||
const event &event,
|
||||
const write_opts &opts,
|
||||
const event::idx &event_idx)
|
||||
{
|
||||
assert(event_idx != 0);
|
||||
assert(event_idx != opts.event_idx);
|
||||
|
||||
const event::fetch _event
|
||||
{
|
||||
std::nothrow, event_idx
|
||||
};
|
||||
|
||||
if(!_event.valid)
|
||||
{
|
||||
log::dwarning
|
||||
{
|
||||
log, "Horizon resolve for %s @%lu; remisé %s @%lu",
|
||||
log, "Horizon resolve for %s @%lu not possible @%lu",
|
||||
string_view{event.event_id},
|
||||
opts.event_idx,
|
||||
string_view{_event.event_id},
|
||||
event_idx,
|
||||
};
|
||||
|
||||
// Make the references on behalf of the future event
|
||||
write_opts _opts;
|
||||
_opts.op = opts.op;
|
||||
_opts.event_idx = event_idx;
|
||||
_opts.appendix.reset();
|
||||
_opts.appendix.set(appendix::EVENT_REFS);
|
||||
_opts.appendix.set(appendix::ROOM_REDACT);
|
||||
_opts.appendix.set(appendix::ROOM_HEAD_RESOLVE);
|
||||
_opts.event_refs = opts.horizon_resolve;
|
||||
_opts.interpose = &txn;
|
||||
write(txn, _event, _opts);
|
||||
|
||||
// Delete the event_horizon entry after resolving.
|
||||
thread_local char buf[EVENT_HORIZON_KEY_MAX_SIZE];
|
||||
const string_view &key
|
||||
{
|
||||
event_horizon_key(buf, event.event_id, event_idx)
|
||||
};
|
||||
|
||||
db::txn::append
|
||||
{
|
||||
txn, dbs::event_horizon,
|
||||
{
|
||||
opts.op == db::op::SET?
|
||||
db::op::DELETE:
|
||||
db::op::SET,
|
||||
key
|
||||
}
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
log::debug
|
||||
{
|
||||
log, "Horizon resolve for %s @%lu; remisé %s @%lu",
|
||||
string_view{event.event_id},
|
||||
opts.event_idx,
|
||||
string_view{_event.event_id},
|
||||
event_idx,
|
||||
};
|
||||
|
||||
// Make the references on behalf of the future event
|
||||
write_opts _opts;
|
||||
_opts.op = opts.op;
|
||||
_opts.event_idx = event_idx;
|
||||
_opts.appendix.reset();
|
||||
_opts.appendix.set(appendix::EVENT_REFS);
|
||||
_opts.appendix.set(appendix::ROOM_REDACT);
|
||||
_opts.appendix.set(appendix::ROOM_HEAD_RESOLVE);
|
||||
_opts.event_refs = opts.horizon_resolve;
|
||||
_opts.interpose = &txn;
|
||||
write(txn, _event, _opts);
|
||||
|
||||
// Delete the event_horizon entry after resolving.
|
||||
thread_local char buf[EVENT_HORIZON_KEY_MAX_SIZE];
|
||||
const string_view &key
|
||||
{
|
||||
event_horizon_key(buf, event.event_id, event_idx)
|
||||
};
|
||||
|
||||
db::txn::append
|
||||
{
|
||||
txn, dbs::event_horizon,
|
||||
{
|
||||
opts.op == db::op::SET?
|
||||
db::op::DELETE:
|
||||
db::op::SET,
|
||||
key
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
void
|
||||
|
|
Loading…
Reference in a new issue