diff --git a/include/ircd/m/dbs/appendix.h b/include/ircd/m/dbs/appendix.h index 266c717c0..0c74fac20 100644 --- a/include/ircd/m/dbs/appendix.h +++ b/include/ircd/m/dbs/appendix.h @@ -47,6 +47,13 @@ enum ircd::m::dbs::appendix::index /// types are involved. EVENT_REFS, + /// Involves the event_horizon column which saves the event_id of any + /// unresolved event_refs at the time of the transaction. This is important + /// for out-of-order writes to the database. When the unresolved prev_event + /// is encountered later and finds its event_id in event_horizon it can + /// properly complete the event_refs graph to all the referencing events. + EVENT_HORIZON, + /// Involves the event_sender column (reverse index on the event sender). EVENT_SENDER, @@ -101,6 +108,7 @@ namespace ircd::m::dbs string_view _index_room(db::txn &, const event &, const write_opts &); void _index_event_type(db::txn &, const event &, const write_opts &); void _index_event_sender(db::txn &, const event &, const write_opts &); + void _index_event_horizon(db::txn &, const event &, const write_opts &, const id::event &); void _index_event_refs_m_room_redaction(db::txn &, const event &, const write_opts &); void _index_event_refs_m_receipt_m_read(db::txn &, const event &, const write_opts &); void _index_event_refs_m_relates_m_reply(db::txn &, const event &, const write_opts &); diff --git a/include/ircd/m/dbs/dbs.h b/include/ircd/m/dbs/dbs.h index 98513e287..b9f55c2b8 100644 --- a/include/ircd/m/dbs/dbs.h +++ b/include/ircd/m/dbs/dbs.h @@ -36,6 +36,7 @@ namespace ircd::m::dbs extern db::column event_idx; // event_id => event_idx extern db::column event_json; // event_idx => full json extern db::index event_refs; // event_idx | ref_type, event_idx + extern db::index event_horizon; // event_id | event_idx extern db::index event_type; // type | event_idx extern db::index event_sender; // host | local, event_idx extern db::index room_head; // room_id | event_id => event_idx diff --git 
a/include/ircd/m/dbs/desc.h b/include/ircd/m/dbs/desc.h index 6308b48ff..d95330063 100644 --- a/include/ircd/m/dbs/desc.h +++ b/include/ircd/m/dbs/desc.h @@ -101,6 +101,14 @@ namespace ircd::m::dbs::desc extern const db::comparator events__event_refs__cmp; extern const db::descriptor events__event_refs; + // event horizon + extern conf::item events__event_horizon__block__size; + extern conf::item events__event_horizon__meta_block__size; + extern conf::item events__event_horizon__cache__size; + extern conf::item events__event_horizon__cache_comp__size; + extern const db::prefix_transform events__event_horizon__pfx; + extern const db::descriptor events__event_horizon; + // events sender extern conf::item events__event_sender__block__size; extern conf::item events__event_sender__meta_block__size; diff --git a/include/ircd/m/dbs/util.h b/include/ircd/m/dbs/util.h index 618500f01..4e080495b 100644 --- a/include/ircd/m/dbs/util.h +++ b/include/ircd/m/dbs/util.h @@ -20,6 +20,10 @@ namespace ircd::m::dbs std::tuple event_refs_key(const string_view &amalgam); string_view reflect(const ref &); + constexpr size_t EVENT_HORIZON_KEY_MAX_SIZE {id::MAX_SIZE + 1 + 8}; + string_view event_horizon_key(const mutable_buffer &out, const id::event &, const event::idx &); + std::tuple event_horizon_key(const string_view &amalgam); + constexpr size_t EVENT_SENDER_KEY_MAX_SIZE {id::MAX_SIZE + 1 + 8}; string_view event_sender_key(const mutable_buffer &out, const string_view &origin, const string_view &localpart = {}, const event::idx & = 0); string_view event_sender_key(const mutable_buffer &out, const id::user &, const event::idx &); diff --git a/ircd/m_dbs.cc b/ircd/m_dbs.cc index dd82ad802..7117ea4b0 100644 --- a/ircd/m_dbs.cc +++ b/ircd/m_dbs.cc @@ -36,6 +36,11 @@ decltype(ircd::m::dbs::event_refs) ircd::m::dbs::event_refs {}; +/// Linkage for a reference to the event_horizon column. 
+decltype(ircd::m::dbs::event_horizon) +ircd::m::dbs::event_horizon +{}; + /// Linkage for a reference to the event_sender column. decltype(ircd::m::dbs::event_sender) ircd::m::dbs::event_sender @@ -150,6 +155,7 @@ ircd::m::dbs::init::init(std::string dbopts) event_idx = db::column{*events, desc::events__event_idx.name}; event_json = db::column{*events, desc::events__event_json.name}; event_refs = db::index{*events, desc::events__event_refs.name}; + event_horizon = db::index{*events, desc::events__event_horizon.name}; event_sender = db::index{*events, desc::events__event_sender.name}; event_type = db::index{*events, desc::events__event_type.name}; room_head = db::index{*events, desc::events__room_head.name}; @@ -432,9 +438,14 @@ ircd::m::dbs::_index_event_refs_prev(db::txn &txn, m::index(prev_id, std::nothrow) // query }; - if(!prev_idx) + if(opts.appendix.test(appendix::EVENT_HORIZON) && !prev_idx) { - log::warning + _index_event_horizon(txn, event, opts, prev_id); + continue; + } + else if(!prev_idx) + { + log::derror { log, "No index found to ref %s PREV of %s", string_view{prev_id}, @@ -487,6 +498,9 @@ ircd::m::dbs::_index_event_refs_auth(db::txn &txn, if(unlikely(!auth_idx)) { + if(opts.appendix.test(appendix::EVENT_HORIZON)) + _index_event_horizon(txn, event, opts, auth_id); + log::error { log, "No index found to ref %s AUTH of %s", @@ -548,7 +562,10 @@ ircd::m::dbs::_index_event_refs_state(db::txn &txn, state.get(std::nothrow, at<"type"_>(event), at<"state_key"_>(event)) // query }; - if(!prev_state_idx || prev_state_idx == opts.event_idx) + if(!prev_state_idx) + return; + + if(prev_state_idx >= opts.event_idx) return; thread_local char buf[EVENT_REFS_KEY_MAX_SIZE]; @@ -614,7 +631,12 @@ ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn, m::index(event_id, std::nothrow) // query }; - if(!event_idx) + if(opts.appendix.test(appendix::EVENT_HORIZON) && !event_idx) + { + _index_event_horizon(txn, event, opts, event_id); + return; + } + else 
if(!event_idx) { log::derror { @@ -622,6 +644,7 @@ ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn, string_view{event_id}, json::get<"event_id"_>(event) }; + return; } @@ -705,9 +728,14 @@ ircd::m::dbs::_index_event_refs_m_relates_m_reply(db::txn &txn, m::index(event_id, std::nothrow) // query }; - if(!event_idx) + if(opts.appendix.test(appendix::EVENT_HORIZON) && !event_idx) { - log::dwarning + _index_event_horizon(txn, event, opts, event_id); + return; + } + else if(!event_idx) + { + log::derror { log, "Cannot index m.in_reply_to in %s; referenced %s not found.", json::get<"event_id"_>(event), @@ -758,8 +786,22 @@ ircd::m::dbs::_index_event_refs_m_room_redaction(db::txn &txn, m::index(event_id, std::nothrow) // query }; - if(!event_idx) + if(opts.appendix.test(appendix::EVENT_HORIZON) && !event_idx) + { + _index_event_horizon(txn, event, opts, event_id); return; + } + else if(!event_idx) + { + log::derror + { + log, "Cannot index m.room.redaction in %s; referenced %s not found.", + json::get<"event_id"_>(event), + string_view{event_id} + }; + + return; + } thread_local char buf[EVENT_REFS_KEY_MAX_SIZE]; assert(opts.event_idx != 0 && event_idx != 0); @@ -778,6 +820,28 @@ ircd::m::dbs::_index_event_refs_m_room_redaction(db::txn &txn, }; } +void +ircd::m::dbs::_index_event_horizon(db::txn &txn, + const event &event, + const write_opts &opts, + const m::event::id &unresolved_id) +{ + thread_local char buf[EVENT_HORIZON_KEY_MAX_SIZE]; + assert(opts.event_idx != 0 && unresolved_id); + const string_view &key + { + event_horizon_key(buf, unresolved_id, opts.event_idx) + }; + + db::txn::append + { + txn, dbs::event_horizon, + { + opts.op, key, string_view{} + } + }; +} + void ircd::m::dbs::_index_event_sender(db::txn &txn, const event &event, @@ -1708,6 +1772,152 @@ ircd::m::dbs::desc::events__event_refs size_t(events__event_refs__meta_block__size), }; +// +// event_horizon +// + +decltype(ircd::m::dbs::desc::events__event_horizon__block__size) 
+ircd::m::dbs::desc::events__event_horizon__block__size +{ + { "name", "ircd.m.dbs.events._event_horizon.block.size" }, + { "default", 512L }, +}; + +decltype(ircd::m::dbs::desc::events__event_horizon__meta_block__size) +ircd::m::dbs::desc::events__event_horizon__meta_block__size +{ + { "name", "ircd.m.dbs.events._event_horizon.meta_block.size" }, + { "default", 1024L }, +}; + +decltype(ircd::m::dbs::desc::events__event_horizon__cache__size) +ircd::m::dbs::desc::events__event_horizon__cache__size +{ + { + { "name", "ircd.m.dbs.events._event_horizon.cache.size" }, + { "default", long(16_MiB) }, + }, [] + { + const size_t &value{events__event_horizon__cache__size}; + db::capacity(db::cache(event_horizon), value); + } +}; + +decltype(ircd::m::dbs::desc::events__event_horizon__cache_comp__size) +ircd::m::dbs::desc::events__event_horizon__cache_comp__size +{ + { + { "name", "ircd.m.dbs.events._event_horizon.cache_comp.size" }, + { "default", long(0_MiB) }, + }, [] + { + const size_t &value{events__event_horizon__cache_comp__size}; + db::capacity(db::cache_compressed(event_horizon), value); + } +}; + +ircd::string_view +ircd::m::dbs::event_horizon_key(const mutable_buffer &out, + const event::id &event_id, + const event::idx &event_idx) +{ + mutable_buffer buf(out); + consume(buf, copy(buf, event_id)); + consume(buf, copy(buf, "\0"_sv)); + consume(buf, copy(buf, byte_view(event_idx))); + return + { + data(out), data(buf) + }; +} + +std::tuple +ircd::m::dbs::event_horizon_key(const string_view &amalgam) +{ + return + { + byte_view(lstrip(amalgam, "\0"_sv)) + }; +} + +const ircd::db::prefix_transform +ircd::m::dbs::desc::events__event_horizon__pfx +{ + "_event_horizon", + [](const string_view &key) + { + return has(key, "\0"_sv); + }, + + [](const string_view &key) + { + assert(size(key) >= sizeof(event::idx)); + return split(key, "\0"_sv).first; + } +}; + +const ircd::db::descriptor +ircd::m::dbs::desc::events__event_horizon +{ + // name + "_event_horizon", + + // 
explanation + R"(Unresolved references in the reverse reference graph of events. + + event_id | event_idx => -- + + The first part of the key is an event_id which the server does not have. + The suffix of the key is the index number of an event which the server + does have and it contains a reference to event_id. + + We use the information in this column to find all of the events which + have an unresolved reference to this event and complete the holes in the + event_refs graph which could not be completed without this event. + + When a new event is written to the database the event_horizon column is + queried seeking the event's ID. Each entry in event_horizon is the index + of an event which we previously wrote to the database without knowing the + index of the event currently being written (an out-of-order write). + + )", + + // typing (key, value) + { + typeid(string_view), typeid(string_view) + }, + + // options + {}, + + // comparator + {}, + + // prefix transform + events__event_horizon__pfx, + + // drop column + false, + + // cache size + bool(events_cache_enable)? -1 : 0, //uses conf item + + // cache size for compressed assets + bool(events_cache_comp_enable)? -1 : 0, + + // bloom filter bits + 0, + + // expect queries hit + false, + + // block size + size_t(events__event_horizon__block__size), + + // meta_block size + size_t(events__event_horizon__meta_block__size), +}; + // // event_sender // @@ -4220,6 +4430,10 @@ ircd::m::dbs::desc::events // Reverse mapping of the event reference graph. events__event_refs, + // event_id | event_idx + // Mapping of unresolved event refs. + events__event_horizon, + // origin | sender, event_idx // Mapping of senders to event_idx's they are the sender of. events__event_sender,