// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.

/// Residence of the events database instance pointer.
decltype(ircd::m::dbs::events)
ircd::m::dbs::events;

/// Coarse variable for enabling the uncompressed cache on the events database;
/// note this conf item is only effective by setting an environment variable
/// before daemon startup. It has no effect in any other regard.
decltype(ircd::m::dbs::cache_enable)
ircd::m::dbs::cache_enable
{
	{ "name",     "ircd.m.dbs.cache.enable" },
	{ "default",  true                      },
};

/// Coarse variable for enabling the compressed cache on the events database;
/// note this conf item is only effective by setting an environment variable
/// before daemon startup. It has no effect in any other regard.
decltype(ircd::m::dbs::cache_comp_enable)
ircd::m::dbs::cache_comp_enable
{
	{ "name",     "ircd.m.dbs.cache.comp.enable" },
	{ "default",  false                          },
};

/// Coarse toggle for the prefetch phase before the transaction-building
/// handlers (indexers) are called. If this is false, prefetching is disabled
/// entirely; otherwise the opts passed to write() control whether prefetching
/// is conducted.
decltype(ircd::m::dbs::prefetch_enable)
ircd::m::dbs::prefetch_enable
{
	{ "name",     "ircd.m.dbs.prefetch.enable" },
	{ "default",  true                         },
};

/// The size of the memory buffer for new writes to the DB (backed by the WAL
/// on disk). When this buffer is full it is flushed to sorted SST files on
/// disk. If this is 0, a per-column value can be used; otherwise this value
/// takes precedence as a total value for all columns. (db_write_buffer_size)
decltype(ircd::m::dbs::mem_write_buffer_size)
ircd::m::dbs::mem_write_buffer_size
{
	{ "name",     "ircd.m.dbs.mem.write_buffer_size" },
	{ "default",  0L                                 },
};
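
// A minimal read-side sketch (the caller here is assumed; it is not part of
// this unit): conf items convert to their underlying value type, as the
// write-buffer callback below does with sst_write_buffer_size, so the totals
// above can be inspected directly, e.g.
//
//   if(long(ircd::m::dbs::mem_write_buffer_size) == 0)
//   {
//       // per-column write buffer sizes are in effect (db_write_buffer_size=0)
//   }
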
/// Value determines the size of writes when creating SST files (i.e. during
/// compaction). Consider that write calls are yield-points for IRCd and the
/// time spent filling the write buffer between calls may hog the CPU doing
/// compression during that time, etc. (writable_file_max_buffer_size)
decltype(ircd::m::dbs::sst_write_buffer_size)
ircd::m::dbs::sst_write_buffer_size
{
	{
		{ "name",     "ircd.m.dbs.sst.write_buffer_size" },
		{ "default",  long(1_MiB)                        },
	},
	[](conf::item<void> &)
	{
		static const string_view key{"writable_file_max_buffer_size"};
		const size_t &value{sst_write_buffer_size};
		if(events && !events->slave)
			db::setopt(*events, key, lex_cast(value));
	}
};

//
// opts
//

decltype(ircd::m::dbs::opts::event_refs_all)
ircd::m::dbs::opts::event_refs_all{[]
{
	char full[event_refs_all.size()];
	memset(full, '1', sizeof(full));
	return decltype(event_refs_all)
	{
		full, sizeof(full)
	};
}()};

decltype(ircd::m::dbs::opts::appendix_all)
ircd::m::dbs::opts::appendix_all{[]
{
	char full[appendix_all.size()];
	memset(full, '1', sizeof(full));
	return decltype(appendix_all)
	{
		full, sizeof(full)
	};
}()};

//
// Basic write suite
//

namespace ircd::m::dbs
{
	static size_t _prefetch(const event &, const opts &);
	static size_t _index(db::txn &, const event &, const opts &);
	static size_t blacklist(db::txn &txn, const event::id &, const opts &);
}

size_t
ircd::m::dbs::write(db::txn &txn,
                    const event &event,
                    const opts &opts)
try
{
	if(opts.event_idx == 0 && opts.blacklist)
		return blacklist(txn, event.event_id, opts);

	if(unlikely(opts.event_idx == 0))
		throw panic
		{
			"Cannot write to database: no index specified for event."
		};

	return _index(txn, event, opts);
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Event %s txn building error :%s",
		string_view{event.event_id},
		e.what()
	};

	throw;
}

size_t
ircd::m::dbs::prefetch(const event &event,
                       const opts &opts)
try
{
	if(!prefetch_enable)
		return false;

	return _prefetch(event, opts);
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Event %s txn prefetching error :%s",
		string_view{event.event_id},
		e.what()
	};

	return false;
}
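
// A minimal caller sketch of the write suite (hypothetical names; how the
// event index is reserved and how the transaction is ultimately committed
// are assumptions outside this unit): fill opts with the assigned event_idx,
// build against a db::txn on the events database, then commit the batch.
//
//   db::txn txn
//   {
//       *m::dbs::events
//   };
//
//   m::dbs::opts wopts;
//   wopts.event_idx = reserved_idx;      // hypothetical; assigned elsewhere
//   m::dbs::write(txn, event, wopts);
//   // ... commit txn against *m::dbs::events when the batch is complete
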
size_t
ircd::m::dbs::blacklist(db::txn &txn,
                        const event::id &event_id,
                        const opts &opts)
{
	// An entry in the event_idx column with a value of 0 is a blacklist,
	// because 0 is not a valid event_idx. Thus the value written here is
	// always zero.
	assert(opts.event_idx == 0);
	assert(!event_id.empty());

	static const m::event::idx &zero_idx{0UL};
	static const byte_view<string_view> zero_value
	{
		zero_idx
	};

	db::txn::append
	{
		txn, event_idx,
		{
			opts.op,
			string_view{event_id},
			zero_value
		}
	};

	return true;
}

//
// Internal interface
//

namespace ircd::m::dbs
{
	static size_t _prefetch_room_redact(const event &, const opts &);
	static void _index_room_redact(db::txn &, const event &, const opts &);
	static size_t _prefetch_room(const event &, const opts &);
	static void _index_room(db::txn &, const event &, const opts &);
	static size_t _prefetch_event(const event &, const opts &);
	static void _index_event(db::txn &, const event &, const opts &);
}

size_t
ircd::m::dbs::_index(db::txn &txn,
                     const event &event,
                     const opts &opts)
{
	size_t ret(0);
	_index_event(txn, event, opts);

	if(json::get<"room_id"_>(event))
		_index_room(txn, event, opts);

	return ret;
}

size_t
ircd::m::dbs::_prefetch(const event &event,
                        const opts &opts)
{
	size_t ret(0);
	ret += _prefetch_event(event, opts);

	if(json::get<"room_id"_>(event))
		ret += _prefetch_room(event, opts);

	return ret;
}

void
ircd::m::dbs::_index_event(db::txn &txn,
                           const event &event,
                           const opts &opts)
{
	if(opts.appendix.test(appendix::EVENT_ID))
		_index_event_id(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_COLS))
		_index_event_cols(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_JSON))
		_index_event_json(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_SENDER))
		_index_event_sender(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_TYPE))
		_index_event_type(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_STATE))
		_index_event_state(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_REFS) && opts.event_refs.any())
		_index_event_refs(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE) && opts.horizon_resolve.any())
		_index_event_horizon_resolve(txn, event, opts);
}

size_t
ircd::m::dbs::_prefetch_event(const event &event,
                              const opts &opts)
{
	size_t ret(0);

	if(opts.appendix.test(appendix::EVENT_ID))
		;//ret += _prefetch_event_id(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_COLS))
		;//ret += _prefetch_event_cols(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_JSON))
		;//ret += _prefetch_event_json(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_SENDER))
		;//ret += _prefetch_event_sender(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_TYPE))
		;//ret += _prefetch_event_type(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_STATE))
		;//ret += _prefetch_event_state(txn, event, opts);

	if(opts.appendix.test(appendix::EVENT_REFS) && opts.event_refs.any())
		ret += _prefetch_event_refs(event, opts);

	if(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE) && opts.horizon_resolve.any())
		ret += _prefetch_event_horizon_resolve(event, opts);

	return ret;
}
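
// The indexers above and below are gated by the opts.appendix bitset. A
// caller-side sketch of narrowing a write to fewer appendices (assuming the
// default opts enable all appendices and that opts.appendix has bitset
// semantics, as its test()/any() usage here suggests):
//
//   m::dbs::opts wopts;
//   wopts.appendix.reset(m::dbs::appendix::ROOM_HEAD);
//   wopts.appendix.reset(m::dbs::appendix::ROOM_HEAD_RESOLVE);
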
void
ircd::m::dbs::_index_room(db::txn &txn,
                          const event &event,
                          const opts &opts)
{
	assert(!empty(json::get<"room_id"_>(event)));

	if(opts.appendix.test(appendix::ROOM_EVENTS))
		_index_room_events(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_TYPE))
		_index_room_type(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_HEAD))
		_index_room_head(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_HEAD_RESOLVE))
		_index_room_head_resolve(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_STATE))
		_index_room_state(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_STATE_SPACE))
		_index_room_state_space(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_JOINED) && at<"type"_>(event) == "m.room.member")
		_index_room_joined(txn, event, opts);

	if(opts.appendix.test(appendix::ROOM_REDACT) && json::get<"type"_>(event) == "m.room.redaction")
		_index_room_redact(txn, event, opts);
}

size_t
ircd::m::dbs::_prefetch_room(const event &event,
                             const opts &opts)
{
	assert(!empty(json::get<"room_id"_>(event)));

	size_t ret(0);

	if(opts.appendix.test(appendix::ROOM_EVENTS))
		;//ret += _prefetch_room_events(event, opts);

	if(opts.appendix.test(appendix::ROOM_TYPE))
		;//ret += _prefetch_room_type(event, opts);

	if(opts.appendix.test(appendix::ROOM_HEAD))
		;//ret += _prefetch_room_head(event, opts);

	if(opts.appendix.test(appendix::ROOM_HEAD_RESOLVE))
		;//ret += _prefetch_room_head_resolve(event, opts);

	if(opts.appendix.test(appendix::ROOM_STATE))
		;//ret += _prefetch_room_state(event, opts);

	if(opts.appendix.test(appendix::ROOM_STATE_SPACE))
		;//ret += _prefetch_room_state_space(event, opts);

	if(opts.appendix.test(appendix::ROOM_JOINED) && at<"type"_>(event) == "m.room.member")
		;//ret += _prefetch_room_joined(event, opts);

	if(opts.appendix.test(appendix::ROOM_REDACT) && json::get<"type"_>(event) == "m.room.redaction")
		ret += _prefetch_room_redact(event, opts);

	return ret;
}

// NOTE: QUERY
void
ircd::m::dbs::_index_room_redact(db::txn &txn,
                                 const event &event,
                                 const opts &opts)
{
	assert(opts.appendix.test(appendix::ROOM_REDACT));
	assert(json::get<"type"_>(event) == "m.room.redaction");

	const auto &target_id
	{
		at<"redacts"_>(event)
	};

	const m::event::idx target_idx
	{
		find_event_idx(target_id, opts)
	};

	if(unlikely(!target_idx))
	{
		log::dwarning
		{
			"Redaction from '%s' missing redaction target '%s'",
			string_view{event.event_id},
			target_id
		};

		if(opts.appendix.test(appendix::EVENT_HORIZON))
			_index_event_horizon(txn, event, opts, target_id);

		return;
	}

	char state_key_buf[event::STATE_KEY_MAX_SIZE];
	const string_view &state_key
	{
		m::get(std::nothrow, target_idx, "state_key", state_key_buf)
	};

	if(!state_key)
		return;

	char type_buf[event::TYPE_MAX_SIZE];
	const string_view &type
	{
		m::get(std::nothrow, target_idx, "type", type_buf)
	};

	assert(!empty(type));
	const ctx::critical_assertion ca;
	thread_local char buf[ROOM_STATE_SPACE_KEY_MAX_SIZE];
	const string_view &key
	{
		room_state_key(buf, at<"room_id"_>(event), type, state_key)
	};

	db::txn::append
	{
		txn, room_state,
		{
			db::op::DELETE,
			key,
		}
	};
}
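
// The prefetcher below mirrors the indexer above: it warms the "_event_idx"
// lookup for the redaction target, and when that index is already resolvable
// it also prefetches the target's "state_key" and "type" cells, which are the
// same cells _index_room_redact() reads via m::get() when building the delete.
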
size_t
ircd::m::dbs::_prefetch_room_redact(const event &event,
                                    const opts &opts)
{
	assert(opts.appendix.test(appendix::ROOM_REDACT));
	assert(json::get<"type"_>(event) == "m.room.redaction");

	const auto &target_id
	{
		at<"redacts"_>(event)
	};

	// If the prefetch was launched we can't do anything more here.
	if(prefetch_event_idx(target_id, opts))
		return 1;

	// If the result is cached we can peek at it for more prefetches.
	const m::event::idx target_idx
	{
		find_event_idx(target_id, opts)
	};

	if(unlikely(!target_idx))
		return 0;

	size_t ret(0);
	ret += m::prefetch(target_idx, "state_key");
	ret += m::prefetch(target_idx, "type");
	return ret;
}

// NOTE: QUERY
size_t
ircd::m::dbs::find_event_idx(const vector_view<event::idx> &idx,
                             const vector_view<const event::id> &event_id,
                             const opts &wopts)
{
	const size_t num
	{
		std::min(idx.size(), event_id.size())
	};

	size_t ret(0);
	if(wopts.interpose)
		for(size_t i(0); i < num; ++i)
		{
			idx[i] = wopts.interpose->val(db::op::SET, "_event_idx", event_id[i], 0UL);
			assert(!idx[i] || idx[i] >= vm::sequence::retired);
			ret += idx[i] != 0;
		}

	// Taken when everything was satisfied by the interpose.
	if(ret == num || !wopts.allow_queries)
		return ret;

	// Only do the parallel m::index() if there are no results from the prior
	// queries; they would get clobbered by the parallel m::index().
	if(likely(!ret))
		return m::index(idx, event_id);

	// Fallback to serial queries.
	for(size_t i(0); i < num; ++i)
	{
		idx[i] = m::index(std::nothrow, event_id[i]);
		ret += idx[i] != 0;
	}

	return ret;
}

size_t
ircd::m::dbs::prefetch_event_idx(const vector_view<const event::id> &event_id,
                                 const opts &wopts)
{
	size_t ret(0);
	for(size_t i(0); i < event_id.size(); ++i)
	{
		if(wopts.interpose)
			if(wopts.interpose->has(db::op::SET, "_event_idx", event_id[i]))
				continue;

		if(wopts.allow_queries)
			ret += m::prefetch(event_id[i], "_event_idx");
	}

	return ret;
}
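
// A batch-resolution sketch for find_event_idx() (hypothetical inputs; the
// { pointer, count } construction of the views is an assumption): values
// already interposed in the pending transaction are preferred, then parallel
// or serial m::index() queries are used as commented above.
//
//   m::event::idx idx[2] {0, 0};
//   const m::event::id ids[2] { ... };            // ids obtained by the caller
//   const size_t found
//   {
//       find_event_idx({ idx, 2 }, { ids, 2 }, wopts)
//   };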