/* * Copyright (C) 2017 Charybdis Development Team * Copyright (C) 2017 Jason Volk * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice is present in all copies. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ /////////////////////////////////////////////////////////////////////////////// // // m/vm.h // namespace ircd::m::vm { } decltype(ircd::m::vm::log) ircd::m::vm::log { "vm", 'v' }; decltype(ircd::m::vm::fronts) ircd::m::vm::fronts { }; void ircd::m::vm::trace(const id::event &event_id, const tracer &closure) { std::multimap tree; room::branch root{event_id}; event::fetch tab{root.event_id, root.buf}; io::acquire(tab); root.pdu = tab.pdu; const event event{root.pdu}; const auto &depth{at<"depth"_>(event)}; tree.emplace(depth, std::move(root)); for(int64_t d(depth); d > 0; --d) { const auto pit(tree.equal_range(d)); if(pit.first == pit.second) if(tree.lower_bound(d - 1) == end(tree)) break; for(auto it(pit.first); it != pit.second; ++it) { room::branch &b(it->second); const m::event event{b.pdu}; const json::array &prev_events { json::get<"prev_events"_>(event) }; const auto count { prev_events.count() }; room::branch child[count]; event::fetch tab[count]; for(size_t i(0); i < count; ++i) { const json::array &prev_event{prev_events[i]}; child[i] = room::branch { unquote(prev_event[0]) }; tab[i] = { child[i].event_id, child[i].buf }; } io::acquire({tab, count}); for(size_t i(0); i < count; ++i) { child[i].pdu = tab[i].pdu; if(tab[i].error) continue; event::id::buf tmp; const m::event event{child[i].pdu}; if(!closure(event, tmp)) return; const auto &depth{at<"depth"_>(event)}; tree.emplace(depth, std::move(child[i])); } } } for(const auto &pit : tree) { const auto &depth(pit.first); const auto &branch(pit.second); std::cout << pretty_oneline(m::event{branch.pdu}) << std::endl; } } void ircd::m::vm::statefill(const room::id &room_id, const event::id &event_id) try { const unique_buffer buf { 16_MiB //TODO: XXX }; room::state::fetch tab { event_id, room_id, buf }; io::acquire(tab); const json::array &auth_chain { tab.auth_chain }; const json::array &pdus { tab.pdus }; std::vector events(pdus.count()); std::transform(begin(pdus), end(pdus), begin(events), [] (const json::object &event) -> m::event { return event; }); eval{events}; } catch(const std::exception &e) { log.error("Acquiring state for %s at %s: %s", string_view{room_id}, string_view{event_id}, e.what()); throw; } void ircd::m::vm::backfill(const room::id &room_id, const event::id &event_id, const size_t &limit) try { const unique_buffer buf { 16_MiB //TODO: XXX }; room::fetch tab { event_id, room_id, buf }; io::acquire(tab); const json::array &auth_chain { tab.auth_chain }; const json::array &pdus { tab.pdus }; std::vector events(pdus.count()); std::transform(begin(pdus), end(pdus), begin(events), [] (const json::object &event) -> m::event { return event; }); eval{events}; } catch(const std::exception &e) { log.error("Acquiring backfill for %s at %s: %s", string_view{room_id}, string_view{event_id}, e.what()); throw; } ircd::json::object ircd::m::vm::acquire(const id::event &event_id, const mutable_buffer &buf) { const vector_view event_ids(&event_id, &event_id + 1); const vector_view bufs(&buf, &buf + 1); return acquire(event_ids, bufs)? json::object{buf} : json::object{}; } size_t ircd::m::vm::acquire(const vector_view &event_id, const vector_view &buf) { std::vector tabs(event_id.size()); for(size_t i(0); i < event_id.size(); ++i) if(!exists(event_id[i])) tabs[i] = event::fetch { event_id[i], buf[i] }; size_t i(0); io::acquire(vector_view(tabs)); for(const auto &fetch : tabs) if(fetch.pdu) { i++; eval{m::event{fetch.pdu}}; } return i; } /// Federation join user to room /// ircd::m::event::id::buf ircd::m::vm::join(const room::id &room_id, json::iov &iov) { const user::id user_id { iov.at("sender") }; const auto &hostname{room_id.hostname()}; const auto &hostport{room_id.hostport()}; m::log.debug("%s make_join %s to %s from %s:%u", my_host(), string_view{user_id}, string_view{room_id}, hostname, hostport); char room_id_urle_buf[768]; const auto room_id_urle { urlencode(room_id, room_id_urle_buf), }; char user_id_urle_buf[768]; const auto user_id_urle { urlencode(user_id, user_id_urle_buf) }; const fmt::snstringf url { 1024, "_matrix/federation/v1/make_join/%s/%s", room_id_urle, user_id_urle }; m::request request { "GET", url, {}, {} }; struct session session { { std::string(hostname), hostport } }; unique_buffer buf { 64_KiB }; ircd::parse::buffer pb{buf}; const json::object response { session(pb, request) }; const m::event proto { response.at("event") }; //TODO: hash prototype //at<"hashes"_>(proto) //TODO: verify prototype //at<"signatures"_>(proto) m::log.debug("%s make_join %s to %s responded. depth: %ld prev: %s auth: %s", room_id.host(), string_view{user_id}, string_view{room_id}, json::get<"depth"_>(proto), json::get<"prev_events"_>(proto), json::get<"auth_events"_>(proto)); const json::strung content { json::members {{ "membership", "join" }} }; json::iov event; const json::iov::push push[] { { event, { "type", "m.room.member" }}, { event, { "membership", "join" }}, { event, { "room_id", room_id }}, { event, { "origin", my_host() }}, { event, { "sender", user_id }}, { event, { "state_key", user_id }}, { event, { "origin_server_ts", ircd::time() }}, { event, { "depth", at<"depth"_>(proto) }}, { event, { "content", content }}, // { event, { "auth_events", at<"auth_events"_>(proto) }}, // { event, { "prev_events", at<"prev_events"_>(proto) }}, // { event, { "prev_state", at<"prev_state"_>(proto) }}, }; //TODO: XXX auto replaced_auth_events{replace(at<"auth_events"_>(proto), '\\', "")}; auto replaced_prev_events{replace(at<"prev_events"_>(proto), '\\', "")}; auto replaced_prev_state{replace(at<"prev_state"_>(proto), '\\', "")}; const json::iov::push replacements[] { { event, { "auth_events", replaced_auth_events }}, { event, { "prev_events", replaced_prev_events }}, { event, { "prev_state", replaced_prev_state }}, }; const json::strung hash_preimage { event }; const fixed_buffer hash { sha256{const_buffer{hash_preimage}} }; char hashb64[uint(hash.size() * 1.34) + 2]; const json::strung hashes { json::members { { "sha256", b64encode_unpadded(hashb64, hash) } } }; const json::strung event_id_preimage { event }; const fixed_buffer event_id_hash { sha256{const_buffer{event_id_preimage}} }; char event_id_hash_b64[uint(event_id_hash.size() * 1.34) + 2]; const event::id::buf event_id_buf { b64encode_unpadded(event_id_hash_b64, event_id_hash), my_host() }; const json::iov::push _event_id { event, { "event_id", event_id_buf } }; const json::strung signature_preimage { event }; const ed25519::sig sig { self::secret_key.sign(const_buffer{signature_preimage}) }; char signature_buffer[128]; const json::strung signatures{json::members {{ my_host(), json::members { json::member { self::public_key_id, b64encode_unpadded(signature_buffer, sig) } } }}}; const json::iov::push _signatures { event, { "signatures", signatures } }; char event_id_urle_buf[768]; const auto event_id_urle { urlencode(event.at("event_id"), event_id_urle_buf) }; const fmt::bsprintf<1024> send_join_url { "_matrix/federation/v1/send_join/%s/%s", room_id_urle, event_id_urle }; const auto join_event { json::strung(event) }; m::log.debug("%s send_join %s to %s sending: %s membership: %s %s", my_host(), string_view{user_id}, string_view{room_id}, string_view{event.at("type")}, string_view{event.at("membership")}, string_view{event.at("event_id")}); m::request send_join_request { "PUT", send_join_url, {}, join_event }; unique_buffer send_join_buf { 4_MiB }; ircd::parse::buffer sjpb{send_join_buf}; const json::array send_join_response { session(sjpb, send_join_request) }; const auto status { send_join_response.at(0) }; const json::object data { send_join_response.at(1) }; const json::array state { data.at("state") }; const json::array auth_chain { data.at("auth_chain") }; m::log.debug("%s %u send_join %s to %s responded with %zu state and %zu auth_chain events", room_id.host(), status, string_view{user_id}, string_view{room_id}, state.count(), auth_chain.count()); eval{state}; eval{event}; return event_id_buf; } /// Insert a new event originating from this server. /// /// Figure 1: /// in . /// ___:::::::__V <-- this function /// | ||||||| // /// | \\|// //| /// | ||| // | /// | ||// | /// | !!! | /// | * | <----- IRCd core /// | |//|||\\| | /// |/|/|/|\|\|\| <---- release commitment propagation cone /// out /// /// This function takes an event object vector and adds our origin and event_id /// and hashes and signature and attempts to inject the event into the core. /// The caller is expected to have done their best to check if this event will /// succeed once it hits the core because failures blow all this effort. The /// caller's ircd::ctx will obviously yield for evaluation, which may involve /// requests over the internet in the worst case. Nevertheless, the evaluation, /// write and release sequence of the core commitment is designed to allow the /// caller to service a usual HTTP request conveying success or error without /// hanging their client too much. /// ircd::m::event::id::buf ircd::m::vm::commit(json::iov &iov) { const json::iov::set set[] { { iov, { "origin_server_ts", ircd::time() }}, { iov, { "origin", my_host() }}, }; // Need this for now const unique_buffer scratch { 64_KiB }; // derp auto preimage { json::stringify(mutable_buffer{scratch}, iov) }; const fixed_buffer event_id_hash { sha256{const_buffer{preimage}} }; char event_id_hash_b64[uint(event_id_hash.size() * 1.34) + 2]; const event::id::buf event_id_buf { b64encode_unpadded(event_id_hash_b64, event_id_hash), my_host() }; const json::iov::set event_id { iov, { "event_id", event_id_buf } }; // derp preimage = { json::stringify(mutable_buffer{scratch}, iov) }; const fixed_buffer hash { sha256{const_buffer{preimage}} }; fixed_buffer hashb64; const json::iov::set hashes { iov, json::member { "hashes", json::members { { "sha256", b64encode_unpadded(hashb64, hash) } } } }; // derp preimage = { json::stringify(mutable_buffer{scratch}, iov) }; const ed25519::sig sig { self::secret_key.sign(const_buffer{preimage}) }; char signature_buffer[128]; const json::iov::set signatures { iov, json::member { "signatures", json::members {{ my_host(), json::members { json::member { self::public_key_id, b64encode_unpadded(signature_buffer, sig) } } }} } }; const m::event event { iov }; if(!json::get<"type"_>(event)) throw BAD_JSON("Required event field: type"); if(!json::get<"sender"_>(event)) throw BAD_JSON("Required event field: sender"); log.debug("injecting event(mark: %ld) %s", vm::current_sequence, pretty_oneline(event)); ircd::timer timer; eval{event}; log.debug("committed event %s (mark: %ld time: %ld$ms)", at<"event_id"_>(event), vm::current_sequence, timer.at().count()); return event_id_buf; } // // Eval // // Processes any event from any place from any time and does whatever is // necessary to validate, reject, learn from new information, ignore old // information and advance the state of IRCd as best as possible. namespace ircd::m::vm { bool check_fault_resume(eval &); void check_fault_throw(eval &); void write(const event &, db::iov &txn); void write(eval &); int _query(eval &, const query<> &, const closure_bool &); int _query_where(eval &, const query &where, const closure_bool &closure); int _query_where(eval &, const query &where, const closure_bool &closure); bool evaluate(eval &, const vector_view &, const size_t &i); enum eval::fault evaluate(eval &, const event &); enum eval::fault evaluate(eval &, const vector_view &); } decltype(ircd::m::vm::eval::default_opts) ircd::m::vm::eval::default_opts {}; ircd::m::vm::eval::eval() :eval(default_opts) { } ircd::m::vm::eval::eval(const struct opts &opts) :opts{&opts} { } enum ircd::m::vm::eval::fault ircd::m::vm::eval::operator()() { assert(0); return fault::ACCEPT; } enum ircd::m::vm::eval::fault ircd::m::vm::eval::operator()(const json::array &events) { std::vector event; event.reserve(events.count()); auto it(begin(events)); for(; it != end(events); ++it) event.emplace_back(*it); return operator()(vector_view(event)); } enum ircd::m::vm::eval::fault ircd::m::vm::eval::operator()(const json::vector &events) { std::vector event; event.reserve(events.count()); auto it(begin(events)); for(; it != end(events); ++it) event.emplace_back(*it); return operator()(vector_view(event)); } enum ircd::m::vm::eval::fault ircd::m::vm::eval::operator()(const vector_view &events) { static const size_t max { 1024 }; for(size_t i(0); i < events.size(); i += max) { const vector_view evs { events.data() + i, std::min(events.size() - i, size_t(max)) }; enum fault code; switch((code = evaluate(*this, evs))) { case fault::ACCEPT: continue; default: //check_fault_throw(*this); return code; } } return fault::ACCEPT; } enum ircd::m::vm::eval::fault ircd::m::vm::eval::operator()(const event &event) { const auto code { evaluate(*this, {&event, 1}) }; switch(code) { case ACCEPT: break; default: return code; } } enum ircd::m::vm::eval::fault ircd::m::vm::evaluate(eval &eval, const vector_view &events) { const std::unique_ptr port { new eval::port[events.size()] }; const vector_view p { port.get(), events.size() }; auto &ef{eval.ef}; size_t a{0}, b{0}, i{0}; for(size_t i(0); i < events.size(); ++i) p[i].event = events.data() + i; for(; b < events.size() && a < events.size(); ++b, ++i, i %= events.size()) { if(p[i].h) continue; if((p[i].h = evaluate(eval, p, i))) { if(p[i].w) { write(*p[i].event, eval.txn); ++eval.cs; } b = 0; ++a; } } const auto cs{eval.cs}; if(cs) { write(eval); for(size_t i(0); i < p.size(); ++i) { if(p[i].event && p[i].w) { log.info("%s", pretty_oneline(*p[i].event)); vm::inserted.notify(*p[i].event); p[i] = eval::port{}; } } } return cs == events.size()? eval.fault::ACCEPT: eval.fault::EVENT; } bool ircd::m::vm::evaluate(eval &eval, const vector_view &pv, const size_t &i) { auto &ef{eval.ef}; auto &p{pv[i]}; if(!p.event) return true; auto &event{*p.event}; switch((p.code = evaluate(eval, event))) { case eval.fault::ACCEPT: { p.w = true; ef.erase(at<"event_id"_>(event)); return true; } case eval.fault::EVENT: { const auto it(std::find_if(begin(pv), end(pv), [&ef] (const auto &port) -> bool { return port.event? ef.count(at<"event_id"_>(*port.event)) : false; })); return it == end(pv); } case eval.fault::STATE: return true; default: return true; } } enum ircd::m::vm::eval::fault ircd::m::vm::evaluate(eval &eval, const event &event) { const auto &event_id { at<"event_id"_>(event) }; const auto &depth { at<"depth"_>(event) }; auto code { eval.fault::ACCEPT }; const m::event::prev prev{event}; m::for_each(prev, [&eval, &code](const event::id &event_id) { const query q { { "event_id", event_id } }; if(eval.capstan.test(q) != true) { eval.ef.emplace(event_id); code = eval.fault::EVENT; } }); log.debug("%s %s", reflect(code), pretty_oneline(event)); if(code == eval.fault::ACCEPT) eval.capstan.fwd(event); return code; } void ircd::m::vm::write(eval &eval) { log.debug("Committing %zu events to database...", eval.cs); eval.txn(); vm::current_sequence += eval.cs; eval.txn.clear(); eval.cs = 0; } /* bool ircd::m::vm::check_fault_resume(eval &eval) { switch(eval.fault) { case eval.fault::ACCEPT: return true; default: check_fault_throw(eval); return false; } } void ircd::m::vm::check_fault_throw(eval &eval) { // With nothrow there is nothing else for this function to do as the user // will have to test the fault code and error ptr manually. if(eval.opts->nothrow) return; switch(eval.fault) { case eval.fault::ACCEPT: return; case eval.fault::GENERAL: if(eval.error) std::rethrow_exception(eval.error); // [[fallthrough]] default: throw VM_FAULT("(%u): %s", uint(eval.fault), reflect(eval.fault)); } } */ int ircd::m::vm::test(eval &eval, const query<> &where) { return test(eval, where, [](const auto &event) { return true; }); } int ircd::m::vm::test(eval &eval, const query<> &clause, const closure_bool &closure) { return _query(eval, clause, [&closure](const auto &event) { return closure(event); }); } int ircd::m::vm::_query(eval &eval, const query<> &clause, const closure_bool &closure) { switch(clause.type) { case where::equal: { const auto &c { dynamic_cast &>(clause) }; return _query_where(eval, c, closure); } case where::logical_and: { const auto &c { dynamic_cast &>(clause) }; return _query_where(eval, c, closure); } default: return -1; } } int ircd::m::vm::_query_where(eval &eval, const query &where, const closure_bool &closure) { log.debug("eval(%p): Query [%s]: %s", &eval, "where equal", pretty_oneline(where.value)); const int ret { eval.capstan.test(where) }; return ret; } int ircd::m::vm::_query_where(eval &eval, const query &where, const closure_bool &closure) { const auto &lhs{*where.a}, &rhs{*where.b}; const auto reclosure{[&lhs, &rhs, &closure] (const auto &event) { if(!rhs(event)) return false; return closure(event); }}; return _query(eval, lhs, reclosure); } ircd::ctx::view ircd::m::vm::inserted {}; uint64_t ircd::m::vm::current_sequence {}; template<> std::list ircd::m::vm::witness::instance_list::list { }; ircd::m::vm::capstan::capstan() :acc(witness::list.size()) { size_t i(0); const auto &list{vm::witness::list}; for(auto it(begin(list)); it != end(list); ++it, i++) { auto &witness{**it}; this->acc[i] = witness.init(); } } ircd::m::vm::capstan::~capstan() noexcept { } void ircd::m::vm::capstan::fwd(const event &event) { size_t i(0); const auto &list{vm::witness::list}; for(auto it(begin(list)); it != end(list); ++it, i++) { auto &witness{**it}; const auto &acc{this->acc[i]}; witness.add(acc.get(), event); } } void ircd::m::vm::capstan::rev(const event &event) { size_t i(0); const auto &list{vm::witness::list}; for(auto it(begin(list)); it != end(list); ++it, i++) { auto &witness{**it}; const auto &acc{this->acc[i]}; witness.add(acc.get(), event); } } int ircd::m::vm::capstan::test(const query<> &q) const { size_t i(0); const auto &list{vm::witness::list}; for(auto it(begin(list)); it != end(list); ++it, i++) { auto &witness{**it}; const auto &acc{this->acc[i]}; const auto res{witness.test(acc.get(), q)}; if(res >= 0) return res; } return -1; } ssize_t ircd::m::vm::capstan::count(const query<> &q) const { size_t i(0); const auto &list{vm::witness::list}; for(auto it(begin(list)); it != end(list); ++it, i++) { auto &witness{**it}; const auto &acc{this->acc[i]}; const auto res{witness.count(acc.get(), q)}; if(res >= 0) return res; } return -1; } ircd::m::vm::accumulator::~accumulator() noexcept { } ircd::m::vm::witness::witness(std::string name) :name{std::move(name)} { } ircd::m::vm::witness::~witness() noexcept { } ircd::m::vm::front & ircd::m::vm::fronts::get(const room::id &room_id, const event &event) try { front &ret { map[std::string(room_id)] }; if(ret.map.empty()) fetch(room_id, ret, event); return ret; } catch(const std::exception &e) { map.erase(std::string(room_id)); throw; } ircd::m::vm::front & ircd::m::vm::fronts::get(const room::id &room_id) { const auto it { map.find(std::string(room_id)) }; if(it == end(map)) throw m::NOT_FOUND { "No fronts for unknown room %s", room_id }; front &ret{it->second}; if(unlikely(ret.map.empty())) throw m::NOT_FOUND { "No fronts for room %s", room_id }; return ret; } ircd::m::vm::front & ircd::m::vm::fetch(const room::id &room_id, front &front, const event &event) { const query query { { "room_id", room_id }, }; for_each(query, [&front] (const auto &event) { for_each(m::event::prev{event}, [&front, &event] (const auto &key, const auto &prev_events) { for(const json::array &prev_event : prev_events) { const event::id &prev_event_id{unquote(prev_event[0])}; front.map.erase(std::string{prev_event_id}); } const auto &depth { json::get<"depth"_>(event) }; if(depth > front.top) front.top = depth; front.map.emplace(at<"event_id"_>(event), depth); }); }); if(!front.map.empty()) return front; const event::id &event_id { at<"event_id"_>(event) }; /* if(!my_host(room_id.host())) { log.debug("No fronts available for %s; acquiring state eigenvalue at %s...", string_view{room_id}, string_view{event_id}); statefill(room_id, event_id); return front; } */ log.debug("No fronts available for %s using %s", string_view{room_id}, string_view{event_id}); front.map.emplace(at<"event_id"_>(event), json::get<"depth"_>(event)); front.top = json::get<"depth"_>(event); /* if(!my_host(room_id.host())) { backfill(room_id, event_id, 64); return front; } */ return front; } ircd::string_view ircd::m::vm::reflect(const enum eval::fault &code) { switch(code) { case eval::fault::ACCEPT: return "ACCEPT"; case eval::fault::GENERAL: return "GENERAL"; case eval::fault::DEBUGSTEP: return "DEBUGSTEP"; case eval::fault::BREAKPOINT: return "BREAKPOINT"; case eval::fault::EVENT: return "EVENT"; } return "??????"; } // // Write // namespace ircd::m { void append_indexes(const event &, db::iov &); } void ircd::m::vm::write(const event &event, db::iov &txn) { db::iov::append { txn, at<"event_id"_>(event), event }; append_indexes(event, txn); } // // Query // namespace ircd::m::vm { bool _query(const query<> &, const closure_bool &); bool _query_event_id(const query<> &, const closure_bool &); bool _query_in_room_id(const query<> &, const closure_bool &, const room::id &); bool _query_for_type_state_key_in_room_id(const query<> &, const closure_bool &, const room::id &, const string_view &type = {}, const string_view &state_key = {}); int _query_where_event_id(const query &, const closure_bool &); int _query_where_room_id_at_event_id(const query &, const closure_bool &); int _query_where_room_id(const query &, const closure_bool &); int _query_where(const query &where, const closure_bool &closure); int _query_where(const query &where, const closure_bool &closure); const query<> noop; } ircd::m::vm::query<>::~query() noexcept { } bool ircd::m::vm::exists(const event::id &event_id) { db::column column { *event::events, "event_id" }; return has(column, event_id); } bool ircd::m::vm::test(const query<> &where) { return test(where, [](const auto &event) { return true; }); } bool ircd::m::vm::test(const query<> &clause, const closure_bool &closure) { bool ret{false}; _query(clause, [&ret, &closure](const auto &event) { ret = closure(event); return true; }); return ret; } size_t ircd::m::vm::count(const query<> &where) { return count(where, [](const auto &event) { return true; }); } size_t ircd::m::vm::count(const query<> &where, const closure_bool &closure) { size_t i(0); for_each(where, [&closure, &i](const auto &event) { i += closure(event); }); return i; } /* void ircd::m::vm::rfor_each(const closure &closure) { const query where{}; rfor_each(where, closure); } void ircd::m::vm::rfor_each(const query<> &where, const closure &closure) { rquery(where, [&closure](const auto &event) { closure(event); return false; }); } bool ircd::m::vm::rquery(const closure_bool &closure) { const query where{}; return rquery(where, closure); } bool ircd::m::vm::rquery(const query<> &where, const closure_bool &closure) { //return _rquery_(where, closure); return true; } */ void ircd::m::vm::for_each(const closure &closure) { const query where{}; for_each(where, closure); } void ircd::m::vm::for_each(const query<> &clause, const closure &closure) { _query(clause, [&closure](const auto &event) { closure(event); return false; }); } bool ircd::m::vm::_query(const query<> &clause, const closure_bool &closure) { switch(clause.type) { case where::equal: { const auto &c { dynamic_cast &>(clause) }; switch(_query_where(c, closure)) { case 0: return false; case 1: return true; default: break; } break; } case where::logical_and: { const auto &c { dynamic_cast &>(clause) }; switch(_query_where(c, closure)) { case 0: return false; case 1: return true; default: break; } break; } default: { return _query_event_id(clause, closure); } } return _query_event_id(clause, closure); } int ircd::m::vm::_query_where(const query &where, const closure_bool &closure) { log.debug("Query [%s]: %s", "where equal", pretty_oneline(where.value)); const auto &value{where.value}; const auto &room_id{json::get<"room_id"_>(value)}; if(room_id) return _query_where_room_id(where, closure); const auto &event_id{json::get<"event_id"_>(value)}; if(event_id) return _query_where_event_id(where, closure); return -1; } int ircd::m::vm::_query_where(const query &where, const closure_bool &closure) { const auto &lhs{*where.a}, &rhs{*where.b}; const auto reclosure{[&lhs, &rhs, &closure] (const auto &event) { if(!rhs(event)) return false; return closure(event); }}; return _query(lhs, reclosure); } int ircd::m::vm::_query_where_event_id(const query &where, const closure_bool &closure) { const event::id &event_id { at<"event_id"_>(where.value) }; if(my(event_id)) { std::cout << "GET LOCAL? " << event_id << std::endl; if(_query_event_id(where, closure)) return true; } std::cout << "GET REMOTE? " << event_id << std::endl; const unique_buffer buf { 64_KiB }; event::fetch tab { event_id, buf }; const m::event event { io::acquire(tab) }; return closure(event); } int ircd::m::vm::_query_where_room_id_at_event_id(const query &where, const closure_bool &closure) { //std::cout << "where room id at event id?" << std::endl; const auto &value{where.value}; const auto &room_id{json::get<"room_id"_>(value)}; const auto &event_id{json::get<"event_id"_>(value)}; const auto &state_key{json::get<"state_key"_>(value)}; if(!defined(state_key)) return _query_where_event_id(where, closure); if(my(event_id)) { std::cout << "GET LOCAL STATE? " << event_id << std::endl; return -1; } std::cout << "GET REMOTE STATE? " << event_id << std::endl; vm::statefill(room_id, event_id); auto &front{fronts.get(room_id)}; const auto &type{json::get<"type"_>(value)}; if(type && state_key) return _query_for_type_state_key_in_room_id(where, closure, room_id, type, state_key); return _query_in_room_id(where, closure, room_id); } int ircd::m::vm::_query_where_room_id(const query &where, const closure_bool &closure) { //std::cout << "where room id?" << std::endl; const auto &value{where.value}; const auto &room_id{json::get<"room_id"_>(value)}; const auto &event_id{json::get<"event_id"_>(value)}; if(event_id) return _query_where_room_id_at_event_id(where, closure); //auto &front{fronts.get(room_id)}; //std::cout << "where room id..." << std::endl; const auto &type{json::get<"type"_>(value)}; const auto &state_key{json::get<"state_key"_>(value)}; const bool is_state{defined(state_key) == true}; if(type && is_state) return _query_for_type_state_key_in_room_id(where, closure, room_id, type, state_key); if(is_state) return _query_for_type_state_key_in_room_id(where, closure, room_id, type, state_key); //std::cout << "in room id?" << std::endl; return _query_in_room_id(where, closure, room_id); } bool ircd::m::vm::_query_event_id(const query<> &query, const closure_bool &closure) { cursor cursor { "event_id", &query }; for(auto it(cursor.begin()); bool(it); ++it) if(closure(*it)) return true; return false; } bool ircd::m::vm::_query_in_room_id(const query<> &query, const closure_bool &closure, const room::id &room_id) { cursor cursor { "event_id in room_id", &query }; for(auto it(cursor.begin(room_id)); bool(it); ++it) if(closure(*it)) return true; return false; } bool ircd::m::vm::_query_for_type_state_key_in_room_id(const query<> &query, const closure_bool &closure, const room::id &room_id, const string_view &type, const string_view &state_key) { cursor cursor { "event_id for type,state_key in room_id", &query }; static const size_t max_type_size { 255 }; static const size_t max_state_key_size { 255 }; const auto key_max { room::id::buf::SIZE + max_type_size + max_state_key_size + 2 }; size_t key_len; char key[key_max]; key[0] = '\0'; key_len = strlcat(key, room_id, sizeof(key)); key_len = strlcat(key, "..", sizeof(key)); //TODO: prefix protocol key_len = strlcat(key, type, sizeof(key)); //TODO: prefix protocol key_len = strlcat(key, state_key, sizeof(key)); //TODO: prefix protocol for(auto it(cursor.begin(key)); bool(it); ++it) if(closure(*it)) return true; return false; } namespace ircd::m { struct indexer; extern std::set> indexers; } struct ircd::m::indexer { struct concat; struct concat_v; struct concat_2v; std::string name; virtual void operator()(const event &, db::iov &iov) const = 0; indexer(std::string name) :name{std::move(name)} {} virtual ~indexer() noexcept; }; ircd::m::indexer::~indexer() noexcept { } void ircd::m::append_indexes(const event &event, db::iov &iov) { for(const auto &ptr : indexers) { const m::indexer &indexer{*ptr}; indexer(event, iov); } } struct ircd::m::indexer::concat :indexer { std::string col_a; std::string col_b; void operator()(const event &, db::iov &) const override; concat(std::string col_a, std::string col_b) :indexer { fmt::snstringf(512, "%s in %s", col_a, col_b) } ,col_a{col_a} ,col_b{col_b} {} }; void ircd::m::indexer::concat::operator()(const event &event, db::iov &iov) const { if(!iov.has(db::op::SET, col_a) || !iov.has(db::op::SET, col_b)) return; static const size_t buf_max { 1024 }; char index[buf_max]; index[0] = '\0'; const auto function { [&index](auto &val) { strlcat(index, byte_view{val}, buf_max); } }; at(event, col_b, function); at(event, col_a, function); db::iov::append { iov, db::delta { name, // col index, // key {}, // val } }; } struct ircd::m::indexer::concat_v :indexer { std::string col_a; std::string col_b; std::string col_c; void operator()(const event &, db::iov &) const override; concat_v(std::string col_a, std::string col_b, std::string col_c) :indexer { fmt::snstringf(512, "%s for %s in %s", col_a, col_b, col_c) } ,col_a{col_a} ,col_b{col_b} ,col_c{col_c} {} }; void ircd::m::indexer::concat_v::operator()(const event &event, db::iov &iov) const { if(!iov.has(db::op::SET, col_c) || !iov.has(db::op::SET, col_b) || !iov.has(db::op::SET, col_a)) return; static const size_t buf_max { 1024 }; char index[buf_max]; index[0] = '\0'; const auto concat { [&index](auto &val) { strlcat(index, byte_view{val}, buf_max); } }; at(event, col_c, concat); at(event, col_b, concat); string_view val; at(event, col_a, [&val](auto &_val) { val = byte_view{_val}; }); db::iov::append { iov, db::delta { name, // col index, // key val, // val } }; } struct ircd::m::indexer::concat_2v :indexer { std::string col_a; std::string col_b0; std::string col_b1; std::string col_c; void operator()(const event &, db::iov &) const override; concat_2v(std::string col_a, std::string col_b0, std::string col_b1, std::string col_c) :indexer { fmt::snstringf(512, "%s for %s,%s in %s", col_a, col_b0, col_b1, col_c) } ,col_a{col_a} ,col_b0{col_b0} ,col_b1{col_b1} ,col_c{col_c} {} }; void ircd::m::indexer::concat_2v::operator()(const event &event, db::iov &iov) const { if(!iov.has(db::op::SET, col_c) || !iov.has(db::op::SET, col_b0) || !iov.has(db::op::SET, col_b1)) return; static const size_t buf_max { 2048 }; char index[buf_max]; index[0] = '\0'; const auto concat { [&index](auto &val) { strlcat(index, byte_view{val}, buf_max); } }; at(event, col_c, concat); strlcat(index, "..", buf_max); //TODO: special at(event, col_b0, concat); at(event, col_b1, concat); string_view val; at(event, col_a, [&val](auto &_val) { val = byte_view{_val}; }); db::iov::append { iov, db::delta { name, // col index, // key val, // val } }; } std::set> ircd::m::indexers {{ std::make_shared("event_id", "sender"), std::make_shared("event_id", "room_id"), std::make_shared("event_id", "room_id", "type"), std::make_shared("event_id", "room_id", "sender"), std::make_shared("event_id", "type", "state_key", "room_id"), }}; ircd::string_view ircd::m::vm::reflect(const where &w) { switch(w) { case where::noop: return "noop"; case where::test: return "test"; case where::equal: return "equal"; case where::not_equal: return "not_equal"; case where::logical_or: return "logical_or"; case where::logical_and: return "logical_and"; case where::logical_not: return "logical_not"; } return "?????"; }