// Matrix Construct // // Copyright (C) Matrix Construct Developers, Authors & Contributors // Copyright (C) 2016-2019 Jason Volk <jason@zemos.net> // // Permission to use, copy, modify, and/or distribute this software for any // purpose with or without fee is hereby granted, provided that the above // copyright notice and this permission notice is present in all copies. The // full license for this software is available in the LICENSE file. namespace ircd::m::sync { extern const ctx::pool::opts pool_opts; } decltype(ircd::m::sync::stats_info) ircd::m::sync::stats_info { { "name", "ircd.m.sync.stats.info" }, { "default", false }, }; decltype(ircd::m::sync::log) ircd::m::sync::log { "m.sync", 's' }; decltype(ircd::m::sync::pool_opts) ircd::m::sync::pool_opts { ctx::DEFAULT_STACK_SIZE, 0, -1, 0 }; decltype(ircd::m::sync::pool) ircd::m::sync::pool { "m.sync", pool_opts }; template<> decltype(ircd::util::instance_multimap<std::string, ircd::m::sync::item, std::less<>>::map) ircd::util::instance_multimap<std::string, ircd::m::sync::item, std::less<>>::map {}; template<> decltype(ircd::util::instance_list<ircd::m::sync::data>::allocator) ircd::util::instance_list<ircd::m::sync::data>::allocator {}; template<> decltype(ircd::util::instance_list<ircd::m::sync::data>::list) ircd::util::instance_list<ircd::m::sync::data>::list { allocator }; bool ircd::m::sync::for_each(const item_closure_bool &closure) { auto it(begin(item::map)); for(; it != end(item::map); ++it) if(!closure(*it->second)) return false; return true; } bool ircd::m::sync::for_each(const string_view &prefix, const item_closure_bool &closure) { const auto depth { token_count(prefix, '.') }; auto it { item::map.lower_bound(prefix) }; for(; it != end(item::map); ++it) { const auto item_depth { token_count(it->first, '.') }; if(item_depth > depth + 1) continue; if(it->first == prefix) continue; if(item_depth < depth + 1) break; if(!closure(*it->second)) return false; } return true; } bool ircd::m::sync::apropos(const data &d, const event &event) { return apropos(d, index(event, std::nothrow)); } bool ircd::m::sync::apropos(const data &d, const event::id &event_id) { return apropos(d, index(event_id, std::nothrow)); } bool ircd::m::sync::apropos(const data &d, const event::idx &event_idx) { return d.phased || (event_idx >= d.range.first && event_idx < d.range.second); } ircd::string_view ircd::m::sync::loghead(const data &data) { thread_local char headbuf[256], rembuf[128], iecbuf[2][64], tmbuf[32]; const auto remstr { data.client? string(rembuf, ircd::remote(*data.client)): string_view{} }; const auto flush_bytes { data.stats? data.stats->flush_bytes: 0U }; const auto flush_count { data.stats? data.stats->flush_count: 0U }; const auto tmstr { data.stats? ircd::pretty(tmbuf, data.stats->timer.at<milliseconds>(), true): string_view{} }; return fmt::sprintf { headbuf, "%s %s %ld:%lu|%lu%s chunk:%zu sent:%s of %s in %s", remstr, string_view{data.user.user_id}, data.range.first, data.range.second, vm::sequence::retired, data.phased? "|P"_sv : ""_sv, flush_count, ircd::pretty(iecbuf[1], iec(flush_bytes)), data.out? ircd::pretty(iecbuf[0], iec(flush_bytes + size(data.out->completed()))): string_view{}, tmstr }; } // // item // // // item::item // ircd::m::sync::item::item(std::string name, handle polylog, handle linear, const json::members &feature) :instance_multimap { std::move(name) } ,conf_name { fmt::snstringf{128, "ircd.m.sync.%s.enable", this->name()}, fmt::snstringf{128, "ircd.m.sync.%s.stats.debug", this->name()}, } ,enable { { "name", conf_name[0] }, { "default", true }, } ,stats_debug { { "name", conf_name[1] }, { "default", false }, } ,_polylog { std::move(polylog) } ,_linear { std::move(linear) } ,feature { feature } ,opts { this->feature } ,phased { opts.get<bool>("phased", false) } { log::debug { log, "Registered sync item(%p) '%s' (%zu features)", this, this->name(), opts.size(), }; } ircd::m::sync::item::~item() noexcept { log::debug { log, "Unregistered sync item(%p) '%s'", this, this->name() }; } bool ircd::m::sync::item::polylog(data &data) try { // Skip the item if disabled by configuration if(!enable) return false; if(data.phased && !phased && int64_t(data.range.first) < 0) return false; #ifdef RB_DEBUG sync::stats stats { data.stats && (stats_info || stats_debug)? *data.stats: sync::stats{} }; if(data.stats && (stats_info || stats_debug)) stats.timer = {}; #endif const bool ret { _polylog(data) }; #ifdef RB_DEBUG if(data.stats && (stats_info || stats_debug)) { //data.out.flush(); thread_local char tmbuf[32]; log::debug { log, "polylog %s commit:%b '%s' %s", loghead(data), ret, name(), ircd::pretty(tmbuf, stats.timer.at<microseconds>(), true) }; } #endif this_ctx::interruption_point(); return ret; } catch(const std::bad_function_call &e) { log::dwarning { log, "polylog %s '%s' missing handler :%s", loghead(data), name(), e.what() }; return false; } catch(const m::error &e) { log::derror { log, "polylog %s '%s' :%s %s", loghead(data), name(), e.what(), e.content }; return false; } catch(const std::exception &e) { log::derror { log, "polylog %s '%s' :%s", loghead(data), name(), e.what() }; throw; } bool ircd::m::sync::item::linear(data &data) try { if(!enable) return false; return _linear(data); } catch(const std::bad_function_call &e) { thread_local char rembuf[128]; log::dwarning { log, "linear %s '%s' missing handler :%s", loghead(data), name(), e.what() }; return false; } catch(const m::error &e) { log::derror { log, "linear %s '%s' :%s %s", loghead(data), name(), e.what(), e.content }; return false; } catch(const std::exception &e) { log::derror { log, "linear %s '%s' :%s", loghead(data), name(), e.what() }; throw; } size_t ircd::m::sync::item::children() const { size_t ret(0); sync::for_each(this->name(), [&ret] (auto &item) { ++ret; return true; }); return ret; } ircd::string_view ircd::m::sync::item::member_name() const { return token_last(name(), '.'); } ircd::string_view ircd::m::sync::item::name() const { return this->instance_multimap::it->first; }