0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-25 05:18:23 +02:00

ircd:Ⓜ️:dbs: Add prefetch handlers; mechanism and related options.

This commit is contained in:
Jason Volk 2020-10-29 23:03:32 -07:00
parent ae2d112612
commit d3b33ec811
6 changed files with 457 additions and 10 deletions

View file

@ -28,7 +28,7 @@ namespace ircd::m::dbs
extern std::shared_ptr<db::database> events;
// [SET (txn)] Basic write suite
void write(db::txn &, const event &, const write_opts &);
size_t write(db::txn &, const event &, const write_opts &);
}
/// Database description
@ -113,6 +113,17 @@ struct ircd::m::dbs::write_opts
/// and "blacklist" they must know that `event_id => 0` was *found* to be
/// zero.
bool blacklist {false};
/// Whether index operations should be performed. This effectively toggles
/// building of the transaction; if set to false, write() will not append
/// to the transaction.
bool index {true};
/// Perform a round of prefetches for data which may block queries made
/// during indexing. Note that when index=false, only prefetches are made
/// and write() should not have a reason to block; the return value becomes
/// the number of prefetches launched.
bool prefetch {true};
};
/// Values which represent some element(s) included in a transaction or
@ -201,6 +212,9 @@ struct ircd::m::dbs::init
// Internal utils (here for now)
namespace ircd::m::dbs
{
size_t prefetch_event_idx(const vector_view<const event::id> &in, const write_opts &);
bool prefetch_event_idx(const event::id &, const write_opts &);
size_t find_event_idx(const vector_view<event::idx> &out, const vector_view<const event::id> &in, const write_opts &);
event::idx find_event_idx(const event::id &, const write_opts &);
}
@ -215,3 +229,11 @@ ircd::m::dbs::find_event_idx(const event::id &event_id,
find_event_idx(out, in, wopts);
return ret;
}
inline bool
ircd::m::dbs::prefetch_event_idx(const event::id &event_id,
const write_opts &wopts)
{
const vector_view<const event::id> in(&event_id, 1);
return prefetch_event_idx(in, wopts);
}

View file

@ -22,6 +22,7 @@ namespace ircd::m::dbs
string_view event_horizon_key(const mutable_buffer &out, const id::event &);
std::tuple<event::idx> event_horizon_key(const string_view &amalgam);
size_t _prefetch_event_horizon_resolve(db::txn &, const event &, const write_opts &);
void _index_event_horizon_resolve(db::txn &, const event &, const write_opts &); //query
void _index_event_horizon(db::txn &, const event &, const write_opts &, const id::event &);

View file

@ -40,6 +40,7 @@ namespace ircd::m::dbs
string_view
reflect(const ref &);
size_t _prefetch_event_refs(db::txn &, const event &, const write_opts &);
void _index_event_refs(db::txn &, const event &, const write_opts &);
// event_idx | ref_type, event_idx

View file

@ -174,11 +174,12 @@ ircd::m::dbs::write_opts::appendix_all{[]
namespace ircd::m::dbs
{
static void _index(db::txn &, const event &, const write_opts &);
static void blacklist(db::txn &txn, const event::id &, const write_opts &);
static size_t _prefetch(db::txn &, const event &, const write_opts &);
static size_t _index(db::txn &, const event &, const write_opts &);
static size_t blacklist(db::txn &txn, const event::id &, const write_opts &);
}
void
size_t
ircd::m::dbs::write(db::txn &txn,
const event &event,
const write_opts &opts)
@ -193,7 +194,14 @@ try
"Cannot write to database: no index specified for event."
};
_index(txn, event, opts);
size_t ret(0);
if(likely(opts.prefetch))
ret = _prefetch(txn, event, opts);
if(likely(opts.index))
ret = _index(txn, event, opts);
return ret;
}
catch(const std::exception &e)
{
@ -207,7 +215,7 @@ catch(const std::exception &e)
throw;
}
void
size_t
ircd::m::dbs::blacklist(db::txn &txn,
const event::id &event_id,
const write_opts &opts)
@ -233,6 +241,8 @@ ircd::m::dbs::blacklist(db::txn &txn,
zero_value
}
};
return true;
}
//
@ -241,20 +251,42 @@ ircd::m::dbs::blacklist(db::txn &txn,
namespace ircd::m::dbs
{
static size_t _prefetch_room_redact(db::txn &, const event &, const write_opts &);
static void _index_room_redact(db::txn &, const event &, const write_opts &);
static size_t _prefetch_room(db::txn &, const event &, const write_opts &);
static void _index_room(db::txn &, const event &, const write_opts &);
static size_t _prefetch_event(db::txn &, const event &, const write_opts &);
static void _index_event(db::txn &, const event &, const write_opts &);
}
void
size_t
ircd::m::dbs::_index(db::txn &txn,
const event &event,
const write_opts &opts)
{
size_t ret(0);
_index_event(txn, event, opts);
if(json::get<"room_id"_>(event))
_index_room(txn, event, opts);
return ret;
}
size_t
ircd::m::dbs::_prefetch(db::txn &txn,
const event &event,
const write_opts &opts)
{
size_t ret(0);
ret += _prefetch_event(txn, event, opts);
if(json::get<"room_id"_>(event))
ret += _prefetch_room(txn, event, opts);
return ret;
}
void
@ -287,6 +319,39 @@ ircd::m::dbs::_index_event(db::txn &txn,
_index_event_horizon_resolve(txn, event, opts);
}
size_t
ircd::m::dbs::_prefetch_event(db::txn &txn,
const event &event,
const write_opts &opts)
{
size_t ret(0);
if(opts.appendix.test(appendix::EVENT_ID))
;//ret += _prefetch_event_id(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_COLS))
;//ret += _prefetch_event_cols(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_JSON))
;//ret += _prefetch_event_json(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_SENDER))
;//ret += _prefetch_event_sender(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_TYPE))
;//ret += _prefetch_event_type(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_STATE))
;//ret += _prefetch_event_state(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_REFS) && opts.event_refs.any())
ret += _prefetch_event_refs(txn, event, opts);
if(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE) && opts.horizon_resolve.any())
ret += _prefetch_event_horizon_resolve(txn, event, opts);
return ret;
}
void
ircd::m::dbs::_index_room(db::txn &txn,
const event &event,
@ -322,6 +387,44 @@ ircd::m::dbs::_index_room(db::txn &txn,
_index_room_redact(txn, event, opts);
}
size_t
ircd::m::dbs::_prefetch_room(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(!empty(json::get<"room_id"_>(event)));
size_t ret(0);
if(opts.appendix.test(appendix::ROOM_EVENTS))
;//ret += _prefetch_room_events(txn, event, opts);
if(opts.appendix.test(appendix::ROOM_TYPE))
;//ret += _prefetch_room_type(txn, event, opts);
if(opts.appendix.test(appendix::ROOM_HEAD))
;//ret += _prefetch_room_head(txn, event, opts);
if(opts.appendix.test(appendix::ROOM_HEAD_RESOLVE))
;//ret += _prefetch_room_head_resolve(txn, event, opts);
if(defined(json::get<"state_key"_>(event)))
{
if(opts.appendix.test(appendix::ROOM_STATE))
;//ret += _prefetch_room_state(txn, event, opts);
if(opts.appendix.test(appendix::ROOM_STATE_SPACE))
;//ret += _prefetch_room_state_space(txn, event, opts);
if(opts.appendix.test(appendix::ROOM_JOINED) && at<"type"_>(event) == "m.room.member")
;//ret += _prefetch_room_joined(txn, event, opts);
}
if(opts.appendix.test(appendix::ROOM_REDACT) && json::get<"type"_>(event) == "m.room.redaction")
ret += _prefetch_room_redact(txn, event, opts);
return ret;
}
// NOTE: QUERY
void
ircd::m::dbs::_index_room_redact(db::txn &txn,
@ -389,6 +492,38 @@ ircd::m::dbs::_index_room_redact(db::txn &txn,
};
}
size_t
ircd::m::dbs::_prefetch_room_redact(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::ROOM_REDACT));
assert(json::get<"type"_>(event) == "m.room.redaction");
const auto &target_id
{
at<"redacts"_>(event)
};
// If the prefetch was launched we can't do anything more here.
if(prefetch_event_idx(target_id, opts))
return 1;
// If the result is cached we can peek at it for more prefetches.
const m::event::idx target_idx
{
find_event_idx(target_id, opts)
};
if(unlikely(!target_idx))
return 0;
size_t ret(0);
ret += m::prefetch(target_idx, "state_key");
ret += m::prefetch(target_idx, "type");
return ret;
}
// NOTE: QUERY
size_t
ircd::m::dbs::find_event_idx(const vector_view<event::idx> &idx,
@ -427,3 +562,21 @@ ircd::m::dbs::find_event_idx(const vector_view<event::idx> &idx,
return ret;
}
size_t
ircd::m::dbs::prefetch_event_idx(const vector_view<const event::id> &event_id,
const write_opts &wopts)
{
size_t ret(0);
for(size_t i(0); i < event_id.size(); ++i)
{
if(wopts.interpose)
if(wopts.interpose->has(db::op::SET, "_event_idx", event_id[i]))
continue;
if(wopts.allow_queries)
ret += m::prefetch(event_id[i], "_event_idx");
}
return ret;
}

View file

@ -183,15 +183,41 @@ ircd::m::dbs::_index_event_horizon_resolve(db::txn &txn,
num += event_idx[num] != 0;
}
for(size_t i(0); i < num; ++i)
m::prefetch(event_idx[i]);
for(size_t i(0); i < num; ++i)
_index_event_horizon_resolve_one(txn, event, opts, event_idx[i]);
}
while(it);
}
size_t
ircd::m::dbs::_prefetch_event_horizon_resolve(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE));
assert(opts.event_idx != 0);
assert(event.event_id);
char buf[EVENT_HORIZON_KEY_MAX_SIZE];
const string_view &key
{
event_horizon_key(buf, event.event_id)
};
size_t ret(0);
for(auto it(dbs::event_horizon.begin(key)); it; ++it)
{
const auto event_idx
{
std::get<0>(event_horizon_key(it->first))
};
ret += m::prefetch(event_idx);
}
return ret;
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_horizon_resolve_one(db::txn &txn,

View file

@ -10,12 +10,19 @@
namespace ircd::m::dbs
{
static size_t _prefetch_event_refs_m_room_redaction(db::txn &, const event &, const write_opts &);
static void _index_event_refs_m_room_redaction(db::txn &, const event &, const write_opts &); //query
static size_t _prefetch_event_refs_m_receipt_m_read(db::txn &, const event &, const write_opts &);
static void _index_event_refs_m_receipt_m_read(db::txn &, const event &, const write_opts &); //query
static size_t _prefetch_event_refs_m_relates_m_reply(db::txn &, const event &, const write_opts &);
static void _index_event_refs_m_relates_m_reply(db::txn &, const event &, const write_opts &); //query
static size_t _prefetch_event_refs_m_relates(db::txn &, const event &, const write_opts &);
static void _index_event_refs_m_relates(db::txn &, const event &, const write_opts &); //query
static size_t _prefetch_event_refs_state(db::txn &, const event &, const write_opts &);
static void _index_event_refs_state(db::txn &, const event &, const write_opts &); // query
static size_t _prefetch_event_refs_auth(db::txn &, const event &, const write_opts &);
static void _index_event_refs_auth(db::txn &, const event &, const write_opts &); //query
static size_t _prefetch_event_refs_prev(db::txn &, const event &, const write_opts &);
static void _index_event_refs_prev(db::txn &, const event &, const write_opts &); //query
static bool event_refs__cmp_less(const string_view &a, const string_view &b);
}
@ -204,6 +211,39 @@ ircd::m::dbs::_index_event_refs(db::txn &txn,
_index_event_refs_m_room_redaction(txn, event, opts);
}
size_t
ircd::m::dbs::_prefetch_event_refs(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
size_t ret(0);
if(opts.event_refs.test(uint(ref::NEXT)))
ret += _prefetch_event_refs_prev(txn, event, opts);
if(opts.event_refs.test(uint(ref::NEXT_AUTH)))
ret += _prefetch_event_refs_auth(txn, event, opts);
if(opts.event_refs.test(uint(ref::NEXT_STATE)) ||
opts.event_refs.test(uint(ref::PREV_STATE)))
ret += _prefetch_event_refs_state(txn, event, opts);
if(opts.event_refs.test(uint(ref::M_RECEIPT__M_READ)))
ret += _prefetch_event_refs_m_receipt_m_read(txn, event, opts);
if(opts.event_refs.test(uint(ref::M_RELATES)))
ret += _prefetch_event_refs_m_relates(txn, event, opts);
if(opts.event_refs.test(uint(ref::M_RELATES)))
ret += _prefetch_event_refs_m_relates_m_reply(txn, event, opts);
if(opts.event_refs.test(uint(ref::M_ROOM_REDACTION)))
ret += _prefetch_event_refs_m_room_redaction(txn, event, opts);
return ret;
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_prev(db::txn &txn,
@ -271,6 +311,31 @@ ircd::m::dbs::_index_event_refs_prev(db::txn &txn,
}
}
size_t
ircd::m::dbs::_prefetch_event_refs_prev(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::NEXT)));
const event::prev prev
{
event
};
const size_t count
{
std::min(prev.prev_events_count(), event::prev::MAX)
};
event::id prev_id[count];
for(size_t i(0); i < count; ++i)
prev_id[i] = prev.prev_event(i);
return prefetch_event_idx({prev_id, count}, opts);
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_auth(db::txn &txn,
@ -338,6 +403,34 @@ ircd::m::dbs::_index_event_refs_auth(db::txn &txn,
}
}
size_t
ircd::m::dbs::_prefetch_event_refs_auth(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::NEXT_AUTH)));
if(!m::room::auth::is_power_event(event))
return false;
const event::prev prev
{
event
};
const size_t count
{
std::min(prev.auth_events_count(), event::prev::MAX)
};
event::id auth_id[count];
for(size_t i(0); i < count; ++i)
auth_id[i] = prev.auth_event(i);
return prefetch_event_idx({auth_id, count}, opts);
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_state(db::txn &txn,
@ -418,6 +511,33 @@ ircd::m::dbs::_index_event_refs_state(db::txn &txn,
}
}
size_t
ircd::m::dbs::_prefetch_event_refs_state(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::NEXT_STATE)) ||
opts.event_refs.test(uint(ref::PREV_STATE)));
assert(json::get<"type"_>(event));
assert(json::get<"room_id"_>(event));
if(!json::get<"state_key"_>(event))
return false;
const m::room room
{
json::get<"room_id"_>(event)
};
const m::room::state state
{
room
};
return state.prefetch(json::get<"type"_>(event), json::get<"state_key"_>(event));
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn,
@ -440,6 +560,9 @@ ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn,
json::get<"content"_>(event).get("event_id")
};
if(!valid(m::id::EVENT, event_id))
return;
const event::idx &event_idx
{
find_event_idx(event_id, opts)
@ -479,6 +602,31 @@ ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn,
};
}
size_t
ircd::m::dbs::_prefetch_event_refs_m_receipt_m_read(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::M_RECEIPT__M_READ)));
if(json::get<"type"_>(event) != "ircd.read")
return false;
if(!my_host(json::get<"origin"_>(event)))
return false;
const json::string &event_id
{
json::get<"content"_>(event).get("event_id")
};
if(!valid(m::id::EVENT, event_id))
return false;
return prefetch_event_idx(event_id, opts);
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_m_relates(db::txn &txn,
@ -560,6 +708,36 @@ ircd::m::dbs::_index_event_refs_m_relates(db::txn &txn,
};
}
size_t
ircd::m::dbs::_prefetch_event_refs_m_relates(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::M_RELATES)));
if(!json::get<"content"_>(event).has("m.relates_to"))
return false;
if(!json::type(json::get<"content"_>(event).get("m.relates_to"), json::OBJECT))
return false;
const json::object &m_relates_to
{
json::get<"content"_>(event).get("m.relates_to")
};
const json::string &event_id
{
m_relates_to.get("event_id")
};
if(!valid(m::id::EVENT, event_id))
return false;
return prefetch_event_idx(event_id, opts);
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_m_relates_m_reply(db::txn &txn,
@ -658,6 +836,50 @@ ircd::m::dbs::_index_event_refs_m_relates_m_reply(db::txn &txn,
};
}
size_t
ircd::m::dbs::_prefetch_event_refs_m_relates_m_reply(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::M_RELATES)));
if(json::get<"type"_>(event) != "m.room.message")
return false;
if(!json::get<"content"_>(event).has("m.relates_to"))
return false;
if(!json::type(json::get<"content"_>(event).get("m.relates_to"), json::OBJECT))
return false;
const json::object &m_relates_to
{
json::get<"content"_>(event).get("m.relates_to")
};
if(!m_relates_to.has("m.in_reply_to"))
return false;
if(!json::type(m_relates_to.get("m.in_reply_to"), json::OBJECT))
return false;
const json::object &m_in_reply_to
{
m_relates_to.get("m.in_reply_to")
};
const json::string &event_id
{
m_in_reply_to.get("event_id")
};
if(!valid(m::id::EVENT, event_id))
return false;
return prefetch_event_idx(event_id, opts);
}
// NOTE: QUERY
void
ircd::m::dbs::_index_event_refs_m_room_redaction(db::txn &txn,
@ -717,6 +939,28 @@ ircd::m::dbs::_index_event_refs_m_room_redaction(db::txn &txn,
};
}
size_t
ircd::m::dbs::_prefetch_event_refs_m_room_redaction(db::txn &txn,
const event &event,
const write_opts &opts)
{
assert(opts.appendix.test(appendix::EVENT_REFS));
assert(opts.event_refs.test(uint(ref::M_ROOM_REDACTION)));
if(json::get<"type"_>(event) != "m.room.redaction")
return false;
const auto &event_id
{
json::get<"redacts"_>(event)
};
if(!valid(m::id::EVENT, event_id))
return false;
return prefetch_event_idx(event_id, opts);
}
//
// cmp
//