diff --git a/include/ircd/m/vm.h b/include/ircd/m/vm.h index d82d13275..bf2e62b92 100644 --- a/include/ircd/m/vm.h +++ b/include/ircd/m/vm.h @@ -22,23 +22,25 @@ namespace ircd::m::vm enum fault :uint; using fault_t = std::underlying_type::type; - extern log::log log; - extern const opts default_opts; - extern const copts default_copts; - string_view reflect(const fault &); http::code http_code(const fault &); - string_view loghead(const mutable_buffer &, const eval &); string_view loghead(const eval &); // single tls buffer + + extern const opts default_opts; + extern const copts default_copts; + extern log::log log; + extern ctx::dock dock; + extern bool ready; } namespace ircd::m::vm::sequence { + extern ctx::dock dock; extern uint64_t retired; // already written; always monotonic extern uint64_t committed; // pending write; usually monotonic extern uint64_t uncommitted; // evaluating; not monotonic - extern ctx::dock dock; + static uint pending; const uint64_t &get(const eval &); uint64_t get(id::event::buf &); // [GET] @@ -58,11 +60,13 @@ struct ircd::m::vm::eval :instance_list { static uint64_t id_ctr; + static uint executing; + static uint injecting; + static uint injecting_room; const vm::opts *opts {&default_opts}; const vm::copts *copts {nullptr}; ctx::ctx *ctx {ctx::current}; - uint64_t id {++id_ctr}; uint64_t sequence {0}; uint64_t sequence_shared[2] {0}; // min, max @@ -256,7 +260,7 @@ struct ircd::m::vm::opts /// Extension structure to vm::opts which includes additional options for /// commissioning events originating from this server which are then passed -/// through eval (this process is committing). +/// through eval (this process is also known as issuing). struct ircd::m::vm::copts :opts { diff --git a/ircd/m.cc b/ircd/m.cc index 1d5aec23a..55539fd2e 100644 --- a/ircd/m.cc +++ b/ircd/m.cc @@ -1038,19 +1038,23 @@ ircd::m::feds::head::head(const m::room::id &room_id, // m/vm.h // +decltype(ircd::m::vm::default_opts) +ircd::m::vm::default_opts; + +decltype(ircd::m::vm::default_copts) +ircd::m::vm::default_copts; + decltype(ircd::m::vm::log) ircd::m::vm::log { "vm", 'v' }; -decltype(ircd::m::vm::default_opts) -ircd::m::vm::default_opts -{}; +decltype(ircd::m::vm::dock) +ircd::m::vm::dock; -decltype(ircd::m::vm::default_copts) -ircd::m::vm::default_copts -{}; +decltype(ircd::m::vm::ready) +ircd::m::vm::ready; ircd::string_view ircd::m::vm::loghead(const eval &eval) @@ -1121,8 +1125,16 @@ ircd::util::instance_list::list {}; decltype(ircd::m::vm::eval::id_ctr) -ircd::m::vm::eval::id_ctr -{}; +ircd::m::vm::eval::id_ctr; + +decltype(ircd::m::vm::eval::executing) +ircd::m::vm::eval::executing; + +decltype(ircd::m::vm::eval::injecting) +ircd::m::vm::eval::injecting; + +decltype(ircd::m::vm::eval::injecting_room) +ircd::m::vm::eval::injecting_room; void ircd::m::vm::eval::seqsort() @@ -1394,6 +1406,11 @@ ircd::m::vm::eval::operator()(const room &room, "vm", "ircd::m::vm::inject" }; + vm::dock.wait([] + { + return vm::ready; + }); + return call(*this, room, event, contents); } @@ -1410,6 +1427,11 @@ ircd::m::vm::eval::operator()(json::iov &event, "vm", "ircd::m::vm::inject" }; + vm::dock.wait([] + { + return vm::ready; + }); + return call(*this, event, contents); } @@ -1423,6 +1445,11 @@ ircd::m::vm::eval::operator()(const event &event) "vm", "ircd::m::vm::execute" }; + vm::dock.wait([] + { + return vm::ready; + }); + return call(*this, event); } @@ -1430,6 +1457,9 @@ ircd::m::vm::eval::operator()(const event &event) // sequence // +decltype(ircd::m::vm::sequence::dock) +ircd::m::vm::sequence::dock; + decltype(ircd::m::vm::sequence::retired) ircd::m::vm::sequence::retired; @@ -1439,9 +1469,6 @@ ircd::m::vm::sequence::committed; decltype(ircd::m::vm::sequence::uncommitted) ircd::m::vm::sequence::uncommitted; -decltype(ircd::m::vm::sequence::dock) -ircd::m::vm::sequence::dock; - uint64_t ircd::m::vm::sequence::min() { diff --git a/modules/vm.cc b/modules/vm.cc index 58743a3fc..94acb4a47 100644 --- a/modules/vm.cc +++ b/modules/vm.cc @@ -148,6 +148,10 @@ ircd::m::vm::init() sequence::committed = sequence::retired; sequence::uncommitted = sequence::committed; + //pool.min(size_t(pool_size)); + vm::ready = true; + vm::dock.notify_all(); + log::info { log, "BOOT %s @%lu [%s]", @@ -155,17 +159,32 @@ ircd::m::vm::init() sequence::retired, sequence::retired? string_view{event_id} : "NO EVENTS"_sv }; - - //pool.min(size_t(pool_size)); } void ircd::m::vm::fini() { + vm::ready = false; pool.terminate(); - pool.join(); - assert(eval::list.empty()); + if(!eval::list.empty()) + log::warning + { + log, "Waiting for %zu evals (exec:%zu inject:%zu room:%zu pending:%zu)", + eval::list.size(), + eval::executing, + eval::injecting, + eval::injecting_room, + sequence::pending, + }; + + vm::dock.wait([] + { + return !eval::executing && !eval::injecting && !eval::injecting_room; + }); + + pool.join(); + assert(!sequence::pending); event::id::buf event_id; const auto retired @@ -183,6 +202,8 @@ ircd::m::vm::fini() sequence::committed, sequence::uncommitted }; + + assert(retired == sequence::retired); } // @@ -196,6 +217,10 @@ ircd::m::vm::inject(eval &eval, json::iov &event, const json::iov &contents) { + // m::vm bookkeeping that someone entered this function + const scope_count injecting_room{eval::injecting_room}; + const scope_notify notify{vm::dock}; + // This eval entry point is only used for commits. We try to find the // commit opts the user supplied directly to this eval or with the room. if(!eval.copts) @@ -335,6 +360,10 @@ ircd::m::vm::inject(eval &eval, json::iov &event, const json::iov &contents) { + // m::vm bookkeeping that someone entered this function + const scope_count injecting{eval::injecting}; + const scope_notify notify{vm::dock}; + // This eval entry point is only used for commits. If the user did not // supply commit opts we supply the default ones here. if(!eval.copts) @@ -488,6 +517,10 @@ ircd::m::vm::execute(eval &eval, const event &event) try { + // m::vm bookkeeping that someone entered this function + const scope_count executing{eval::executing}; + const scope_notify notify{vm::dock}; + // Set a member pointer to the event currently being evaluated. This // allows other parallel evals to have deep access to exactly what this // eval is working on. @@ -637,6 +670,11 @@ enum ircd::m::vm::fault ircd::m::vm::execute_pdu(eval &eval, const event &event) { + const scope_count pending + { + sequence::pending + }; + assert(eval.opts); const auto &opts {