0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-09-28 11:48:54 +02:00

ircd:Ⓜ️:dbs: Add the event_horizon column.

This commit is contained in:
Jason Volk 2019-05-09 00:02:33 -07:00
parent a8a6e1039d
commit ed3a7343cf
5 changed files with 242 additions and 7 deletions

View file

@ -47,6 +47,13 @@ enum ircd::m::dbs::appendix::index
/// types are involved.
EVENT_REFS,
/// Involves the event_horizon column which saves the event_id of any
/// unresolved event_refs at the time of the transaction. This is important
/// for out-of-order writes to the database. When the unresolved prev_event
/// is encountered later and finds its event_id in event_horizon it can
/// properly complete the event_refs graph to all the referencing events.
EVENT_HORIZON,
/// Involves the event_sender column (reverse index on the event sender).
EVENT_SENDER,
@ -101,6 +108,7 @@ namespace ircd::m::dbs
string_view _index_room(db::txn &, const event &, const write_opts &);
void _index_event_type(db::txn &, const event &, const write_opts &);
void _index_event_sender(db::txn &, const event &, const write_opts &);
void _index_event_horizon(db::txn &, const event &, const write_opts &, const id::event &);
void _index_event_refs_m_room_redaction(db::txn &, const event &, const write_opts &);
void _index_event_refs_m_receipt_m_read(db::txn &, const event &, const write_opts &);
void _index_event_refs_m_relates_m_reply(db::txn &, const event &, const write_opts &);

View file

@ -36,6 +36,7 @@ namespace ircd::m::dbs
extern db::column event_idx; // event_id => event_idx
extern db::column event_json; // event_idx => full json
extern db::index event_refs; // event_idx | ref_type, event_idx
extern db::index event_horizon; // event_id | event_idx
extern db::index event_type; // type | event_idx
extern db::index event_sender; // host | local, event_idx
extern db::index room_head; // room_id | event_id => event_idx

View file

@ -101,6 +101,14 @@ namespace ircd::m::dbs::desc
extern const db::comparator events__event_refs__cmp;
extern const db::descriptor events__event_refs;
// event horizon
extern conf::item<size_t> events__event_horizon__block__size;
extern conf::item<size_t> events__event_horizon__meta_block__size;
extern conf::item<size_t> events__event_horizon__cache__size;
extern conf::item<size_t> events__event_horizon__cache_comp__size;
extern const db::prefix_transform events__event_horizon__pfx;
extern const db::descriptor events__event_horizon;
// events sender
extern conf::item<size_t> events__event_sender__block__size;
extern conf::item<size_t> events__event_sender__meta_block__size;

View file

@ -20,6 +20,10 @@ namespace ircd::m::dbs
std::tuple<ref, event::idx> event_refs_key(const string_view &amalgam);
string_view reflect(const ref &);
constexpr size_t EVENT_HORIZON_KEY_MAX_SIZE {id::MAX_SIZE + 1 + 8};
string_view event_horizon_key(const mutable_buffer &out, const id::event &, const event::idx &);
std::tuple<event::idx> event_horizon_key(const string_view &amalgam);
constexpr size_t EVENT_SENDER_KEY_MAX_SIZE {id::MAX_SIZE + 1 + 8};
string_view event_sender_key(const mutable_buffer &out, const string_view &origin, const string_view &localpart = {}, const event::idx & = 0);
string_view event_sender_key(const mutable_buffer &out, const id::user &, const event::idx &);

View file

@ -36,6 +36,11 @@ decltype(ircd::m::dbs::event_refs)
ircd::m::dbs::event_refs
{};
/// Linkage for a reference to the event_horizon column.
decltype(ircd::m::dbs::event_horizon)
ircd::m::dbs::event_horizon
{};
/// Linkage for a reference to the event_sender column.
decltype(ircd::m::dbs::event_sender)
ircd::m::dbs::event_sender
@ -150,6 +155,7 @@ ircd::m::dbs::init::init(std::string dbopts)
event_idx = db::column{*events, desc::events__event_idx.name};
event_json = db::column{*events, desc::events__event_json.name};
event_refs = db::index{*events, desc::events__event_refs.name};
event_horizon = db::index{*events, desc::events__event_horizon.name};
event_sender = db::index{*events, desc::events__event_sender.name};
event_type = db::index{*events, desc::events__event_type.name};
room_head = db::index{*events, desc::events__room_head.name};
@ -432,9 +438,14 @@ ircd::m::dbs::_index_event_refs_prev(db::txn &txn,
m::index(prev_id, std::nothrow) // query
};
if(!prev_idx)
if(opts.appendix.test(appendix::EVENT_HORIZON) && !prev_idx)
{
log::warning
_index_event_horizon(txn, event, opts, prev_id);
continue;
}
else if(!prev_idx)
{
log::derror
{
log, "No index found to ref %s PREV of %s",
string_view{prev_id},
@ -487,6 +498,9 @@ ircd::m::dbs::_index_event_refs_auth(db::txn &txn,
if(unlikely(!auth_idx))
{
if(opts.appendix.test(appendix::EVENT_HORIZON))
_index_event_horizon(txn, event, opts, auth_id);
log::error
{
log, "No index found to ref %s AUTH of %s",
@ -548,7 +562,10 @@ ircd::m::dbs::_index_event_refs_state(db::txn &txn,
state.get(std::nothrow, at<"type"_>(event), at<"state_key"_>(event)) // query
};
if(!prev_state_idx || prev_state_idx == opts.event_idx)
if(!prev_state_idx)
return;
if(prev_state_idx >= opts.event_idx)
return;
thread_local char buf[EVENT_REFS_KEY_MAX_SIZE];
@ -614,7 +631,12 @@ ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn,
m::index(event_id, std::nothrow) // query
};
if(!event_idx)
if(opts.appendix.test(appendix::EVENT_HORIZON) && !event_idx)
{
_index_event_horizon(txn, event, opts, event_id);
return;
}
else if(!event_idx)
{
log::derror
{
@ -622,6 +644,7 @@ ircd::m::dbs::_index_event_refs_m_receipt_m_read(db::txn &txn,
string_view{event_id},
json::get<"event_id"_>(event)
};
return;
}
@ -705,9 +728,14 @@ ircd::m::dbs::_index_event_refs_m_relates_m_reply(db::txn &txn,
m::index(event_id, std::nothrow) // query
};
if(!event_idx)
if(opts.appendix.test(appendix::EVENT_HORIZON) && !event_idx)
{
log::dwarning
_index_event_horizon(txn, event, opts, event_id);
return;
}
else if(!event_idx)
{
log::derror
{
log, "Cannot index m.in_reply_to in %s; referenced %s not found.",
json::get<"event_id"_>(event),
@ -758,8 +786,22 @@ ircd::m::dbs::_index_event_refs_m_room_redaction(db::txn &txn,
m::index(event_id, std::nothrow) // query
};
if(!event_idx)
if(opts.appendix.test(appendix::EVENT_HORIZON) && !event_idx)
{
_index_event_horizon(txn, event, opts, event_id);
return;
}
else if(!event_idx)
{
log::derror
{
log, "Cannot index m.room.redaction in %s; referenced %s not found.",
json::get<"event_id"_>(event),
string_view{event_id}
};
return;
}
thread_local char buf[EVENT_REFS_KEY_MAX_SIZE];
assert(opts.event_idx != 0 && event_idx != 0);
@ -778,6 +820,28 @@ ircd::m::dbs::_index_event_refs_m_room_redaction(db::txn &txn,
};
}
void
ircd::m::dbs::_index_event_horizon(db::txn &txn,
const event &event,
const write_opts &opts,
const m::event::id &unresolved_id)
{
thread_local char buf[EVENT_HORIZON_KEY_MAX_SIZE];
assert(opts.event_idx != 0 && unresolved_id);
const string_view &key
{
event_horizon_key(buf, unresolved_id, opts.event_idx)
};
db::txn::append
{
txn, dbs::event_horizon,
{
opts.op, key, string_view{}
}
};
}
void
ircd::m::dbs::_index_event_sender(db::txn &txn,
const event &event,
@ -1708,6 +1772,152 @@ ircd::m::dbs::desc::events__event_refs
size_t(events__event_refs__meta_block__size),
};
//
// event_horizon
//
decltype(ircd::m::dbs::desc::events__event_horizon__block__size)
ircd::m::dbs::desc::events__event_horizon__block__size
{
{ "name", "ircd.m.dbs.events._event_horizon.block.size" },
{ "default", 512L },
};
decltype(ircd::m::dbs::desc::events__event_horizon__meta_block__size)
ircd::m::dbs::desc::events__event_horizon__meta_block__size
{
{ "name", "ircd.m.dbs.events._event_horizon.meta_block.size" },
{ "default", 1024L },
};
decltype(ircd::m::dbs::desc::events__event_horizon__cache__size)
ircd::m::dbs::desc::events__event_horizon__cache__size
{
{
{ "name", "ircd.m.dbs.events._event_horizon.cache.size" },
{ "default", long(16_MiB) },
}, []
{
const size_t &value{events__event_horizon__cache__size};
db::capacity(db::cache(event_horizon), value);
}
};
decltype(ircd::m::dbs::desc::events__event_horizon__cache_comp__size)
ircd::m::dbs::desc::events__event_horizon__cache_comp__size
{
{
{ "name", "ircd.m.dbs.events._event_horizon.cache_comp.size" },
{ "default", long(0_MiB) },
}, []
{
const size_t &value{events__event_horizon__cache_comp__size};
db::capacity(db::cache_compressed(event_horizon), value);
}
};
ircd::string_view
ircd::m::dbs::event_horizon_key(const mutable_buffer &out,
const event::id &event_id,
const event::idx &event_idx)
{
mutable_buffer buf(out);
consume(buf, copy(buf, event_id));
consume(buf, copy(buf, "\0"_sv));
consume(buf, copy(buf, byte_view<string_view>(event_idx)));
return
{
data(out), data(buf)
};
}
std::tuple<ircd::m::event::idx>
ircd::m::dbs::event_horizon_key(const string_view &amalgam)
{
return
{
byte_view<event::idx>(lstrip(amalgam, "\0"_sv))
};
}
const ircd::db::prefix_transform
ircd::m::dbs::desc::events__event_horizon__pfx
{
"_event_horizon",
[](const string_view &key)
{
return has(key, "\0"_sv);
},
[](const string_view &key)
{
assert(size(key) >= sizeof(event::idx));
return split(key, "\0"_sv).first;
}
};
const ircd::db::descriptor
ircd::m::dbs::desc::events__event_horizon
{
// name
"_event_horizon",
// explanation
R"(Unresolved references in the reverse reference graph of events.
event_id | event_idx => --
The first part of the key is an event_id which the server does not have.
The suffix of the key is the index number of an event which the server
does have and it contains a reference to event_id.
We use the information in this column to find all of the events which
have an unresolved reference to this event and complete the holes in the
event_refs graph which could not be completed without this event.
When a new event is written to the database the event_horizon column is
queried seeking the event's ID. Each entry in event_horizon is the index
of an event which we previously wrote to the database without knowing the
index of the event currently being written (an out-of-order write).
)",
// typing (key, value)
{
typeid(string_view), typeid(string_view)
},
// options
{},
// comparator
{},
// prefix transform
events__event_horizon__pfx,
// drop column
false,
// cache size
bool(events_cache_enable)? -1 : 0, //uses conf item
// cache size for compressed assets
bool(events_cache_comp_enable)? -1 : 0,
// bloom filter bits
0,
// expect queries hit
false,
// block size
size_t(events__event_horizon__block__size),
// meta_block size
size_t(events__event_horizon__meta_block__size),
};
//
// event_sender
//
@ -4220,6 +4430,10 @@ ircd::m::dbs::desc::events
// Reverse mapping of the event reference graph.
events__event_refs,
// event_idx | event_idx
// Mapping of unresolved event refs.
events__event_horizon,
// origin | sender, event_idx
// Mapping of senders to event_idx's they are the sender of.
events__event_sender,