diff --git a/include/ircd/m/vm.h b/include/ircd/m/vm.h index 58213e691..e643c80ce 100644 --- a/include/ircd/m/vm.h +++ b/include/ircd/m/vm.h @@ -34,6 +34,8 @@ namespace ircd::m::vm const uint64_t &sequence(const eval &); uint64_t retired_sequence(id::event::buf &); uint64_t retired_sequence(); + + ctx::future<> evaluated(const event::id &); } /// Event Evaluation Device diff --git a/ircd/m/m.cc b/ircd/m/m.cc index ee191ef91..09715aad4 100644 --- a/ircd/m/m.cc +++ b/ircd/m/m.cc @@ -383,6 +383,12 @@ ircd::m::feds::state::state(const m::room::id &room_id, // m/vm.h // +namespace ircd::m::vm +{ + extern std::map> depends; + static void notify_depends(const event::id &, std::exception_ptr); +} + decltype(ircd::m::vm::log) ircd::m::vm::log { @@ -405,6 +411,115 @@ decltype(ircd::m::vm::default_copts) ircd::m::vm::default_copts {}; +decltype(ircd::m::vm::depends) +ircd::m::vm::depends +{}; + +void +ircd::m::vm::notify_depends(const event::id &event_id, + std::exception_ptr eptr) +{ + const auto it + { + depends.find(event_id) + }; + + if(it == end(depends)) + return; + + auto &promise(it->second); + if(eptr) + promise.set_exception(std::move(eptr)); + else + promise.set_value(); + + depends.erase(it); +} + +ircd::ctx::future<> +ircd::m::vm::evaluated(const event::id &event_id) +{ + if(exists(event_id)) + return ctx::future<>::already; + + const auto iit + { + depends.emplace(event_id, ctx::promise<>{}) + }; + + return ctx::future<> + { + iit.first->second + }; +} + +const uint64_t & +ircd::m::vm::sequence(const eval &eval) +{ + return eval.sequence; +} + +uint64_t +ircd::m::vm::retired_sequence() +{ + event::id::buf event_id; + return retired_sequence(event_id); +} + +uint64_t +ircd::m::vm::retired_sequence(event::id::buf &event_id) +{ + static constexpr auto column_idx + { + json::indexof() + }; + + auto &column + { + dbs::event_column.at(column_idx) + }; + + const auto it + { + column.rbegin() + }; + + if(!it) + { + // If this iterator is invalid the events db should + // be completely fresh. + assert(db::sequence(*dbs::events) == 0); + return 0; + } + + const auto &ret + { + byte_view(it->first) + }; + + event_id = it->second; + return ret; +} + +ircd::string_view +ircd::m::vm::reflect(const enum fault &code) +{ + switch(code) + { + case fault::ACCEPT: return "ACCEPT"; + case fault::EXISTS: return "EXISTS"; + case fault::INVALID: return "INVALID"; + case fault::DEBUGSTEP: return "DEBUGSTEP"; + case fault::BREAKPOINT: return "BREAKPOINT"; + case fault::GENERAL: return "GENERAL"; + case fault::EVENT: return "EVENT"; + case fault::STATE: return "STATE"; + case fault::INTERRUPT: return "INTERRUPT"; + } + + return "??????"; +} + // // Eval // @@ -525,6 +640,7 @@ ircd::m::vm::eval::operator()(json::iov &event, enum ircd::m::vm::fault ircd::m::vm::eval::operator()(const event &event) +try { using prototype = fault (eval &, const m::event &); @@ -533,74 +649,36 @@ ircd::m::vm::eval::operator()(const event &event) "vm", "eval__event" }; - return function(*this, event); -} - -const uint64_t & -ircd::m::vm::sequence(const eval &eval) -{ - return eval.sequence; -} - -uint64_t -ircd::m::vm::retired_sequence() -{ - event::id::buf event_id; - return retired_sequence(event_id); -} - -uint64_t -ircd::m::vm::retired_sequence(event::id::buf &event_id) -{ - static constexpr auto column_idx + const vm::fault ret { - json::indexof() + function(*this, event) }; - auto &column + if(json::get<"event_id"_>(event)) switch(ret) { - dbs::event_column.at(column_idx) - }; + case fault::ACCEPT: + notify_depends(at<"event_id"_>(event), std::exception_ptr{}); + break; - const auto it - { - column.rbegin() - }; + default: + { + notify_depends(at<"event_id"_>(event), std::make_exception_ptr(error + { + ret, "fault" + })); - if(!it) - { - // If this iterator is invalid the events db should - // be completely fresh. - assert(db::sequence(*dbs::events) == 0); - return 0; + break; + } } - const auto &ret - { - byte_view(it->first) - }; - - event_id = it->second; return ret; } - -ircd::string_view -ircd::m::vm::reflect(const enum fault &code) +catch(...) { - switch(code) - { - case fault::ACCEPT: return "ACCEPT"; - case fault::EXISTS: return "EXISTS"; - case fault::INVALID: return "INVALID"; - case fault::DEBUGSTEP: return "DEBUGSTEP"; - case fault::BREAKPOINT: return "BREAKPOINT"; - case fault::GENERAL: return "GENERAL"; - case fault::EVENT: return "EVENT"; - case fault::STATE: return "STATE"; - case fault::INTERRUPT: return "INTERRUPT"; - } + if(json::get<"event_id"_>(event)) + notify_depends(at<"event_id"_>(event), std::current_exception()); - return "??????"; + throw; } // diff --git a/modules/vm_fetch.cc b/modules/vm_fetch.cc index 4770d8bb5..5e631ba2b 100644 --- a/modules/vm_fetch.cc +++ b/modules/vm_fetch.cc @@ -10,32 +10,531 @@ using namespace ircd; -m::vm::phase -fetch_phase -{ - "fetch" -}; - -void init() -void fini(); - -mapi::header -IRCD_MODULE -{ - "Matrix Virtual Machine: fetch unit", - init, fini -}; +#include "vm_fetch.int.h" // // init // void -init() +_init() { + m::vm::fetch::context = ctx::context + { + "fetcher", + 128_KiB, + std::bind(&m::vm::fetch::worker), + ctx::context::POST + }; + + log::debug + { + m::vm::log, "Fetch unit ready." + }; } void -fini() +_fini() { + log::debug + { + m::vm::log, "Shutting down fetch unit..." + }; + + m::vm::fetch::context.terminate(); + m::vm::fetch::context.join(); + + log::debug + { + m::vm::log, "Fetch unit complete." + }; +} + +// +// fetch_phase +// + +decltype(m::vm::fetch::phase) +m::vm::fetch::phase +{ + "fetch", enter +}; + +void +m::vm::fetch::enter(eval &eval) +{ + assert(eval.event_); + assert(eval.opts); + + const event::prev prev + { + *eval.event_ + }; + + const auto &prev_events + { + json::get<"prev_events"_>(prev) + }; + + const size_t &prev_count + { + size(prev_events) + }; + + for(size_t i(0); i < prev_count; ++i) + { + const auto &prev_id{prev.prev_event(i)}; + if(!exists(prev_id)) + { + log::warning + { + log, "Missing prev %s in %s in %s", + string_view{prev_id}, + json::get<"event_id"_>(*eval.event_), + json::get<"room_id"_>(*eval.event_) + }; +/* + auto future + { + vm::evaluated(prev_id) + }; +*/ + //future.wait(); +// std::cout << "got " << prev_id << " for " << json::get<"event_id"_>(*eval.event_) << std::endl; + } + + if(eval.opts->prev_check_exists && !exists(prev_id)) + throw error + { + fault::EVENT, "Missing prev event %s", prev_id + }; + } +} + +// +// API interface +// + +json::object +m::vm::fetch::acquire(const m::room::id &room_id, + const m::event::id &event_id, + const mutable_buffer &buf) +{ + auto &request + { + _fetch(room_id, event_id) + }; + + request.dock.wait([&request] + { + return request.finished; + }); + + const unwind _remove_{[&event_id] + { + remove(event_id); + }}; + + if(request.eptr) + std::rethrow_exception(request.eptr); + + const json::object &event + { + request + }; + + return json::object + { + data(buf), copy(buf, event) + }; +} + +bool +m::vm::fetch::prefetch(const m::room::id &room_id, + const m::event::id &event_id) +{ + _fetch(room_id, event_id); + return true; +} + +bool +ircd::m::vm::fetch::remove(const m::event::id &event_id) +{ + const auto it + { + fetching.find(event_id) + }; + + if(it == end(fetching)) + return false; + + const request &r{*it}; + if(!r.dock.empty()) + return false; + + for(auto it(begin(fetched)); it != end(fetched); ++it) + if(*it == &r) + { + fetched.erase(it); + break; + } + + fetching.erase(it); + return true; +} + +m::vm::fetch::request & +m::vm::fetch::_fetch(const m::room::id &room_id, + const m::event::id &event_id) +{ + auto it + { + fetching.lower_bound(event_id) + }; + + if(it == end(fetching) || it->event_id != event_id) + it = fetching.emplace_hint(it, room_id, event_id); + else + return const_cast(*it); + + auto &request + { + const_cast(*it) + }; + + request.start(); + dock.notify_all(); + return request; +} + +// +// fetcher +// + +decltype(m::vm::fetch::dock) +m::vm::fetch::dock; + +decltype(m::vm::fetch::fetching) +m::vm::fetch::fetching; + +decltype(m::vm::fetch::fetched) +m::vm::fetch::fetched; + +/// The fetch context is an internal worker which drives the fetch process +/// and then indicates completion to anybody waiting on a fetch. This involves +/// handling errors/timeouts from a fetch attempt and retrying with another +/// server etc. +/// +decltype(m::vm::fetch::context) +m::vm::fetch::context; + +void +ircd::m::vm::fetch::worker() +try +{ + while(1) + { + dock.wait(requesting); + handle(); + } +} +catch(const std::exception &e) +{ + log::critical + { + "Fetch worker :%s", e.what() + }; +} + +bool +ircd::m::vm::fetch::handle() +try +{ + auto next + { + ctx::when_any(begin(fetching), end(fetching)) + }; + + if(!next.wait(seconds(2), std::nothrow)) + return true; + + const auto it + { + next.get() + }; + + if(it == end(fetching)) + { + std::cout << "got nil" << std::endl; + return false; + } + + auto &request + { + const_cast(*it) + }; + + if(request.finished || request.eptr) + return true; + + request.handle(); + return true; +} +catch(const std::exception &e) +{ + log::error + { + "Fetch worker :%s", e.what() + }; + + return true; +} + +bool +ircd::m::vm::fetch::requesting() +{ + return std::any_of(begin(fetching), end(fetching), [] + (const request &request) + { + return !request.finished; + }); +} + +// +// fetch::request +// + +ircd::m::vm::fetch::request::request(const m::room::id &room_id, + const m::event::id &event_id, + const mutable_buffer &buf) +:room_id{room_id} +,event_id{event_id} +,_buf +{ + !buf? + unique_buffer{96_KiB}: + unique_buffer{} +} +,buf +{ + empty(buf)? _buf: buf +} +{ + // Ensure buffer has enough room for a worst-case event, request + // headers and response headers. + assert(size(this->buf) >= 64_KiB + 8_KiB + 8_KiB); +} + +void +ircd::m::vm::fetch::request::start() +{ + m::v1::event::opts opts; + opts.dynamic = false; + opts.remote = origin?: select_random_origin(); + start(std::move(opts)); +} + +void +ircd::m::vm::fetch::request::start(m::v1::event::opts &&opts) +{ + if(!started) + started = ircd::time(); + + last = ircd::time(); + static_cast(*this) = + { + this->event_id, this->buf, std::move(opts) + }; +} + +ircd::string_view +ircd::m::vm::fetch::request::select_random_origin() +{ + const m::room::origins origins + { + room_id + }; + + // copies randomly selected origin into the attempted set. + const auto closure{[this] + (const string_view &origin) + { + this->select_origin(origin); + }}; + + // Tests if origin is potentially viable + const auto proffer{[this] + (const string_view &origin) + { + // Don't want to use a peer we already tried and failed with. + if(attempted.count(origin)) + return false; + + // Don't want to use a peer marked with an error by ircd::server + if(ircd::server::errmsg(origin)) + return false; + + return true; + }}; + + if(!origins.random(closure, proffer)) + throw m::NOT_FOUND + { + "Cannot find any server to fetch %s in %s", + string_view{event_id}, + string_view{room_id}, + }; + + return this->origin; +} + +ircd::string_view +ircd::m::vm::fetch::request::select_origin(const string_view &origin) +{ + const auto iit + { + attempted.emplace(std::string{origin}) + }; + + this->origin = *iit.first; + return this->origin; +} + +void +ircd::m::vm::fetch::request::handle() +{ + auto &future + { + static_cast(*this) + }; + + future.wait(); try + { + future.get(); + } + catch(...) + { + eptr = std::current_exception(); + } + + if(!eptr) + finish(); + else + retry(); +} + +void +ircd::m::vm::fetch::request::retry() +try +{ + eptr = std::exception_ptr{}; + origin = {}; + start(); +} +catch(...) +{ + eptr = std::current_exception(); + finish(); +} + +void +ircd::m::vm::fetch::request::finish() +{ + finished = ircd::time(); + fetched.emplace_back(this); + dock.notify_all(); +} + +bool +ircd::m::vm::fetch::request::operator()(const request &a, + const request &b) +const +{ + return a.event_id < b.event_id; +} + +bool +ircd::m::vm::fetch::request::operator()(const request &a, + const string_view &b) +const +{ + return a.event_id < b; +} + +bool +ircd::m::vm::fetch::request::operator()(const string_view &a, + const request &b) +const +{ + return a < b.event_id; +} + +// +// +// + +extern "C" void +auth_chain_fetch(const m::room::id &room_id, + const m::event::id &event_id, + const net::hostport &remote, + const milliseconds &timeout, + const std::function &closure) +{ + m::v1::event_auth::opts opts; + opts.remote = remote; + opts.dynamic = true; + const unique_buffer buf + { + 16_KiB + }; + + m::v1::event_auth request + { + room_id, event_id, buf, std::move(opts) + }; + + request.wait(timeout); + request.get(); + + const json::array &auth_chain + { + request + }; + + std::vector events{auth_chain.count()}; + std::transform(begin(auth_chain), end(auth_chain), begin(events), [] + (const json::object &pdu) + { + return m::event{pdu}; + }); + + std::sort(begin(events), end(events)); + for(const auto &event : events) + if(!closure(event)) + return; +} + +extern "C" void +auth_chain_eval(const m::room::id &room_id, + const m::event::id &event_id, + const net::hostport &remote) +{ + m::vm::opts opts; + opts.non_conform.set(m::event::conforms::MISSING_PREV_STATE); + opts.non_conform.set(m::event::conforms::MISSING_MEMBERSHIP); + opts.infolog_accept = true; + opts.warnlog |= m::vm::fault::STATE; + opts.warnlog &= ~m::vm::fault::EXISTS; + opts.errorlog &= ~m::vm::fault::STATE; + + auth_chain_fetch(room_id, event_id, remote, seconds(30), [&opts] + (const auto &event) + { + m::vm::eval + { + event, opts + }; + + return true; + }); } diff --git a/modules/vm_fetch.int.h b/modules/vm_fetch.int.h new file mode 100644 index 000000000..36a851a22 --- /dev/null +++ b/modules/vm_fetch.int.h @@ -0,0 +1,86 @@ +// Matrix Construct +// +// Copyright (C) Matrix Construct Developers, Authors & Contributors +// Copyright (C) 2016-2018 Jason Volk +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice is present in all copies. The +// full license for this software is available in the LICENSE file. + +using namespace ircd; + +static void _init(); +static void _fini(); + +mapi::header +IRCD_MODULE +{ + "Matrix Virtual Machine: Fetch Unit", _init, _fini +}; + +// Fetch unit state +namespace ircd::m::vm::fetch +{ + struct request; + + extern ctx::dock dock; + extern std::set fetching; + extern std::deque fetched; + extern ctx::context context; + extern "C" vm::phase phase; + + // worker stack + static bool requesting(); + static bool handle(); + static void worker(); + + // interface stack + static request &_fetch(const m::room::id &, const m::event::id &); + static bool remove(const m::event::id &); + extern "C" bool prefetch(const m::room::id &, const m::event::id &); + extern "C" json::object acquire(const m::room::id &, const m::event::id &, const mutable_buffer &); + + // phase stack + static void enter(m::vm::eval &); +} + +/// Fetch entity state +struct ircd::m::vm::fetch::request +:m::v1::event +{ + using is_transparent = void; + + m::room::id::buf room_id; + m::event::id::buf event_id; + unique_buffer _buf; + mutable_buffer buf; + std::set> attempted; + string_view origin; + time_t started {0}; + time_t last {0}; + time_t finished {0}; + std::exception_ptr eptr; + ctx::dock dock; + + bool operator()(const request &a, const request &b) const; + bool operator()(const request &a, const string_view &b) const; + bool operator()(const string_view &a, const request &b) const; + + void finish(); + void retry(); + void handle(); + + string_view select_origin(const string_view &); + string_view select_random_origin(); + void start(m::v1::event::opts &&); + void start(); + + request(const m::room::id &room_id, + const m::event::id &event_id, + const mutable_buffer & = {}); + + request() = default; + request(request &&) = delete; + request(const request &) = delete; +};