// The Construct // // Copyright (C) The Construct Developers, Authors & Contributors // Copyright (C) 2016-2020 Jason Volk // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. /// Residence of the events database instance pointer. decltype(ircd::m::dbs::events) ircd::m::dbs::events; /// Coarse variable for enabling the uncompressed cache on the events database; /// note this conf item is only effective by setting an environmental variable /// before daemon startup. It has no effect in any other regard. decltype(ircd::m::dbs::cache_enable) ircd::m::dbs::cache_enable { { "name", "ircd.m.dbs.cache.enable" }, { "default", true }, }; /// Coarse variable for enabling the compressed cache on the events database; /// note this conf item is only effective by setting an environmental variable /// before daemon startup. It has no effect in any other regard. decltype(ircd::m::dbs::cache_comp_enable) ircd::m::dbs::cache_comp_enable { { "name", "ircd.m.dbs.cache.comp.enable" }, { "default", false }, }; /// The size of the memory buffer for new writes to the DB (backed by the WAL /// on disk). When this buffer is full it is flushed to sorted SST files on /// disk. If this is 0, a per-column value can be used; otherwise this value /// takes precedence as a total value for all columns. (db_write_buffer_size) decltype(ircd::m::dbs::mem_write_buffer_size) ircd::m::dbs::mem_write_buffer_size { { "name", "ircd.m.dbs.mem.write_buffer_size" }, { "default", 0L }, }; /// Value determines the size of writes when creating SST files (i.e during /// compaction). Consider that write calls are yield-points for IRCd and the /// time spent filling the write buffer between calls may hog the CPU doing /// compression during that time etc. (writable_file_max_buffer_size) decltype(ircd::m::dbs::sst_write_buffer_size) ircd::m::dbs::sst_write_buffer_size { { { "name", "ircd.m.dbs.sst.write_buffer_size" }, { "default", long(1_MiB) }, }, [] { static const string_view key{"writable_file_max_buffer_size"}; const size_t &value{sst_write_buffer_size}; if(events && !events->slave) db::setopt(*events, key, lex_cast(value)); } }; // // init // /// Initializes the m::dbs subsystem; sets up the events database. Held/called /// by m::init. Most of the extern variables in m::dbs are not ready until /// this call completes. /// /// We also update the fs::basepath for the database directory to include our /// servername in the path component. The fs::base::DB setting was generated /// during the build and install process, and is unaware of our servername /// at runtime. This change deconflicts multiple instances of IRCd running in /// the same installation prefix using different servernames (i.e clustering /// on the same machine). /// ircd::m::dbs::init::init(const string_view &servername, std::string dbopts) :our_dbpath { fs::path_string(fs::path_views { fs::base::db, servername }) } ,their_dbpath { fs::base::db } { // NOTE that this is a global change that leaks outside of ircd::m. The // database directory for the entire process is being changed here. fs::base::db.set(our_dbpath); // Recall the db directory init manually with the now-updated basepath db::init::directory(); // Open the events database static const string_view &dbname{"events"}; events = std::make_shared(dbname, std::move(dbopts), desc::events); // Cache the columns for the event tuple in order for constant time lookup assert(event_columns == event::size()); std::array keys; //TODO: why did this happen? _key_transform(event{}, begin(keys), end(keys)); //TODO: how did this happen? // Construct global convenience references for the event property columns. for(size_t i(0); i < keys.size(); ++i) event_column.at(i) = db::column { *events, keys.at(i), std::nothrow }; // Construct global convenience references for the metadata columns event_idx = db::column{*events, desc::event_idx.name}; event_json = db::column{*events, desc::event_json.name}; event_refs = db::domain{*events, desc::event_refs.name}; event_horizon = db::domain{*events, desc::event_horizon.name}; event_sender = db::domain{*events, desc::event_sender.name}; event_type = db::domain{*events, desc::event_type.name}; event_state = db::domain{*events, desc::event_state.name}; room_head = db::domain{*events, desc::room_head.name}; room_events = db::domain{*events, desc::room_events.name}; room_type = db::domain{*events, desc::room_type.name}; room_joined = db::domain{*events, desc::room_joined.name}; room_state = db::domain{*events, desc::room_state.name}; room_state_space = db::domain{*events, desc::room_state_space.name}; } /// Shuts down the m::dbs subsystem; closes the events database. The extern /// variables in m::dbs will no longer be functioning after this call. ircd::m::dbs::init::~init() noexcept { // Unref DB (should close) events = {}; // restore the fs::base::DB path the way we found it. fs::base::db.set(their_dbpath); } // // write_opts // decltype(ircd::m::dbs::write_opts::event_refs_all) ircd::m::dbs::write_opts::event_refs_all{[] { char full[event_refs_all.size()]; memset(full, '1', sizeof(full)); return decltype(event_refs_all) { full, sizeof(full) }; }()}; decltype(ircd::m::dbs::write_opts::appendix_all) ircd::m::dbs::write_opts::appendix_all{[] { char full[appendix_all.size()]; memset(full, '1', sizeof(full)); return decltype(appendix_all) { full, sizeof(full) }; }()}; // // Basic write suite // namespace ircd::m::dbs { static size_t _prefetch(db::txn &, const event &, const write_opts &); static size_t _index(db::txn &, const event &, const write_opts &); static size_t blacklist(db::txn &txn, const event::id &, const write_opts &); } size_t ircd::m::dbs::write(db::txn &txn, const event &event, const write_opts &opts) try { if(opts.event_idx == 0 && opts.blacklist) return blacklist(txn, event.event_id, opts); if(unlikely(opts.event_idx == 0)) throw panic { "Cannot write to database: no index specified for event." }; size_t ret(0); if(likely(opts.prefetch)) ret = _prefetch(txn, event, opts); if(likely(opts.index)) ret = _index(txn, event, opts); return ret; } catch(const std::exception &e) { log::error { log, "Event %s txn building error :%s", string_view{event.event_id}, e.what() }; throw; } size_t ircd::m::dbs::blacklist(db::txn &txn, const event::id &event_id, const write_opts &opts) { // An entry in the event_idx column with a value 0 is blacklisting // because 0 is not a valid event_idx. Thus a value here can only // have the value zero. assert(opts.event_idx == 0); assert(!event_id.empty()); static const m::event::idx &zero_idx{0UL}; static const byte_view zero_value { zero_idx }; db::txn::append { txn, event_idx, { opts.op, string_view{event_id}, zero_value } }; return true; } // // Internal interface // namespace ircd::m::dbs { static size_t _prefetch_room_redact(db::txn &, const event &, const write_opts &); static void _index_room_redact(db::txn &, const event &, const write_opts &); static size_t _prefetch_room(db::txn &, const event &, const write_opts &); static void _index_room(db::txn &, const event &, const write_opts &); static size_t _prefetch_event(db::txn &, const event &, const write_opts &); static void _index_event(db::txn &, const event &, const write_opts &); } size_t ircd::m::dbs::_index(db::txn &txn, const event &event, const write_opts &opts) { size_t ret(0); _index_event(txn, event, opts); if(json::get<"room_id"_>(event)) _index_room(txn, event, opts); return ret; } size_t ircd::m::dbs::_prefetch(db::txn &txn, const event &event, const write_opts &opts) { size_t ret(0); ret += _prefetch_event(txn, event, opts); if(json::get<"room_id"_>(event)) ret += _prefetch_room(txn, event, opts); return ret; } void ircd::m::dbs::_index_event(db::txn &txn, const event &event, const write_opts &opts) { if(opts.appendix.test(appendix::EVENT_ID)) _index_event_id(txn, event, opts); if(opts.appendix.test(appendix::EVENT_COLS)) _index_event_cols(txn, event, opts); if(opts.appendix.test(appendix::EVENT_JSON)) _index_event_json(txn, event, opts); if(opts.appendix.test(appendix::EVENT_SENDER)) _index_event_sender(txn, event, opts); if(opts.appendix.test(appendix::EVENT_TYPE)) _index_event_type(txn, event, opts); if(opts.appendix.test(appendix::EVENT_STATE)) _index_event_state(txn, event, opts); if(opts.appendix.test(appendix::EVENT_REFS) && opts.event_refs.any()) _index_event_refs(txn, event, opts); if(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE) && opts.horizon_resolve.any()) _index_event_horizon_resolve(txn, event, opts); } size_t ircd::m::dbs::_prefetch_event(db::txn &txn, const event &event, const write_opts &opts) { size_t ret(0); if(opts.appendix.test(appendix::EVENT_ID)) ;//ret += _prefetch_event_id(txn, event, opts); if(opts.appendix.test(appendix::EVENT_COLS)) ;//ret += _prefetch_event_cols(txn, event, opts); if(opts.appendix.test(appendix::EVENT_JSON)) ;//ret += _prefetch_event_json(txn, event, opts); if(opts.appendix.test(appendix::EVENT_SENDER)) ;//ret += _prefetch_event_sender(txn, event, opts); if(opts.appendix.test(appendix::EVENT_TYPE)) ;//ret += _prefetch_event_type(txn, event, opts); if(opts.appendix.test(appendix::EVENT_STATE)) ;//ret += _prefetch_event_state(txn, event, opts); if(opts.appendix.test(appendix::EVENT_REFS) && opts.event_refs.any()) ret += _prefetch_event_refs(txn, event, opts); if(opts.appendix.test(appendix::EVENT_HORIZON_RESOLVE) && opts.horizon_resolve.any()) ret += _prefetch_event_horizon_resolve(txn, event, opts); return ret; } void ircd::m::dbs::_index_room(db::txn &txn, const event &event, const write_opts &opts) { assert(!empty(json::get<"room_id"_>(event))); if(opts.appendix.test(appendix::ROOM_EVENTS)) _index_room_events(txn, event, opts); if(opts.appendix.test(appendix::ROOM_TYPE)) _index_room_type(txn, event, opts); if(opts.appendix.test(appendix::ROOM_HEAD)) _index_room_head(txn, event, opts); if(opts.appendix.test(appendix::ROOM_HEAD_RESOLVE)) _index_room_head_resolve(txn, event, opts); if(defined(json::get<"state_key"_>(event))) { if(opts.appendix.test(appendix::ROOM_STATE)) _index_room_state(txn, event, opts); if(opts.appendix.test(appendix::ROOM_STATE_SPACE)) _index_room_state_space(txn, event, opts); if(opts.appendix.test(appendix::ROOM_JOINED) && at<"type"_>(event) == "m.room.member") _index_room_joined(txn, event, opts); } if(opts.appendix.test(appendix::ROOM_REDACT) && json::get<"type"_>(event) == "m.room.redaction") _index_room_redact(txn, event, opts); } size_t ircd::m::dbs::_prefetch_room(db::txn &txn, const event &event, const write_opts &opts) { assert(!empty(json::get<"room_id"_>(event))); size_t ret(0); if(opts.appendix.test(appendix::ROOM_EVENTS)) ;//ret += _prefetch_room_events(txn, event, opts); if(opts.appendix.test(appendix::ROOM_TYPE)) ;//ret += _prefetch_room_type(txn, event, opts); if(opts.appendix.test(appendix::ROOM_HEAD)) ;//ret += _prefetch_room_head(txn, event, opts); if(opts.appendix.test(appendix::ROOM_HEAD_RESOLVE)) ;//ret += _prefetch_room_head_resolve(txn, event, opts); if(defined(json::get<"state_key"_>(event))) { if(opts.appendix.test(appendix::ROOM_STATE)) ;//ret += _prefetch_room_state(txn, event, opts); if(opts.appendix.test(appendix::ROOM_STATE_SPACE)) ;//ret += _prefetch_room_state_space(txn, event, opts); if(opts.appendix.test(appendix::ROOM_JOINED) && at<"type"_>(event) == "m.room.member") ;//ret += _prefetch_room_joined(txn, event, opts); } if(opts.appendix.test(appendix::ROOM_REDACT) && json::get<"type"_>(event) == "m.room.redaction") ret += _prefetch_room_redact(txn, event, opts); return ret; } // NOTE: QUERY void ircd::m::dbs::_index_room_redact(db::txn &txn, const event &event, const write_opts &opts) { assert(opts.appendix.test(appendix::ROOM_REDACT)); assert(json::get<"type"_>(event) == "m.room.redaction"); const auto &target_id { at<"redacts"_>(event) }; const m::event::idx target_idx { find_event_idx(target_id, opts) }; if(unlikely(!target_idx)) { log::dwarning { "Redaction from '%s' missing redaction target '%s'", string_view{event.event_id}, target_id }; if(opts.appendix.test(appendix::EVENT_HORIZON)) _index_event_horizon(txn, event, opts, target_id); return; } char state_key_buf[event::STATE_KEY_MAX_SIZE]; const string_view &state_key { m::get(std::nothrow, target_idx, "state_key", state_key_buf) }; if(!state_key) return; char type_buf[event::TYPE_MAX_SIZE]; const string_view &type { m::get(std::nothrow, target_idx, "type", type_buf) }; assert(!empty(type)); const ctx::critical_assertion ca; thread_local char buf[ROOM_STATE_SPACE_KEY_MAX_SIZE]; const string_view &key { room_state_key(buf, at<"room_id"_>(event), type, state_key) }; db::txn::append { txn, room_state, { db::op::DELETE, key, } }; } size_t ircd::m::dbs::_prefetch_room_redact(db::txn &txn, const event &event, const write_opts &opts) { assert(opts.appendix.test(appendix::ROOM_REDACT)); assert(json::get<"type"_>(event) == "m.room.redaction"); const auto &target_id { at<"redacts"_>(event) }; // If the prefetch was launched we can't do anything more here. if(prefetch_event_idx(target_id, opts)) return 1; // If the result is cached we can peek at it for more prefetches. const m::event::idx target_idx { find_event_idx(target_id, opts) }; if(unlikely(!target_idx)) return 0; size_t ret(0); ret += m::prefetch(target_idx, "state_key"); ret += m::prefetch(target_idx, "type"); return ret; } // NOTE: QUERY size_t ircd::m::dbs::find_event_idx(const vector_view &idx, const vector_view &event_id, const write_opts &wopts) { const size_t num { std::min(idx.size(), event_id.size()) }; size_t ret(0); if(wopts.interpose) for(size_t i(0); i < num; ++i) { idx[i] = wopts.interpose->val(db::op::SET, "_event_idx", event_id[i], 0UL); assert(!idx[i] || idx[i] >= vm::sequence::retired); ret += idx[i] != 0; } // Taken when everything satisfied by interpose if(ret == num || !wopts.allow_queries) return ret; // Only do parallel m::index() if there's no results from the prior // queries; they'll get clobbered by the parallel m::index(). if(likely(!ret)) return m::index(idx, event_id); // Fallback to serial queries. for(size_t i(0); i < num; ++i) { idx[i] = m::index(std::nothrow, event_id[i]); ret += idx[i] != 0; } return ret; } size_t ircd::m::dbs::prefetch_event_idx(const vector_view &event_id, const write_opts &wopts) { size_t ret(0); for(size_t i(0); i < event_id.size(); ++i) { if(wopts.interpose) if(wopts.interpose->has(db::op::SET, "_event_idx", event_id[i])) continue; if(wopts.allow_queries) ret += m::prefetch(event_id[i], "_event_idx"); } return ret; }