0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 10:12:39 +01:00

modules: Checkpoint vm_fetch fwiw (disabled from active eval codepaths).

This commit is contained in:
Jason Volk 2018-09-04 23:27:01 -07:00
parent 4251a55e53
commit 6dc3f4044a
4 changed files with 738 additions and 73 deletions

View file

@ -34,6 +34,8 @@ namespace ircd::m::vm
const uint64_t &sequence(const eval &); const uint64_t &sequence(const eval &);
uint64_t retired_sequence(id::event::buf &); uint64_t retired_sequence(id::event::buf &);
uint64_t retired_sequence(); uint64_t retired_sequence();
ctx::future<> evaluated(const event::id &);
} }
/// Event Evaluation Device /// Event Evaluation Device

View file

@ -383,6 +383,12 @@ ircd::m::feds::state::state(const m::room::id &room_id,
// m/vm.h // m/vm.h
// //
namespace ircd::m::vm
{
extern std::map<event::id::buf, ctx::promise<>> depends;
static void notify_depends(const event::id &, std::exception_ptr);
}
decltype(ircd::m::vm::log) decltype(ircd::m::vm::log)
ircd::m::vm::log ircd::m::vm::log
{ {
@ -405,6 +411,115 @@ decltype(ircd::m::vm::default_copts)
ircd::m::vm::default_copts ircd::m::vm::default_copts
{}; {};
decltype(ircd::m::vm::depends)
ircd::m::vm::depends
{};
void
ircd::m::vm::notify_depends(const event::id &event_id,
std::exception_ptr eptr)
{
const auto it
{
depends.find(event_id)
};
if(it == end(depends))
return;
auto &promise(it->second);
if(eptr)
promise.set_exception(std::move(eptr));
else
promise.set_value();
depends.erase(it);
}
ircd::ctx::future<>
ircd::m::vm::evaluated(const event::id &event_id)
{
if(exists(event_id))
return ctx::future<>::already;
const auto iit
{
depends.emplace(event_id, ctx::promise<>{})
};
return ctx::future<>
{
iit.first->second
};
}
const uint64_t &
ircd::m::vm::sequence(const eval &eval)
{
return eval.sequence;
}
uint64_t
ircd::m::vm::retired_sequence()
{
event::id::buf event_id;
return retired_sequence(event_id);
}
uint64_t
ircd::m::vm::retired_sequence(event::id::buf &event_id)
{
static constexpr auto column_idx
{
json::indexof<event, "event_id"_>()
};
auto &column
{
dbs::event_column.at(column_idx)
};
const auto it
{
column.rbegin()
};
if(!it)
{
// If this iterator is invalid the events db should
// be completely fresh.
assert(db::sequence(*dbs::events) == 0);
return 0;
}
const auto &ret
{
byte_view<uint64_t>(it->first)
};
event_id = it->second;
return ret;
}
ircd::string_view
ircd::m::vm::reflect(const enum fault &code)
{
switch(code)
{
case fault::ACCEPT: return "ACCEPT";
case fault::EXISTS: return "EXISTS";
case fault::INVALID: return "INVALID";
case fault::DEBUGSTEP: return "DEBUGSTEP";
case fault::BREAKPOINT: return "BREAKPOINT";
case fault::GENERAL: return "GENERAL";
case fault::EVENT: return "EVENT";
case fault::STATE: return "STATE";
case fault::INTERRUPT: return "INTERRUPT";
}
return "??????";
}
// //
// Eval // Eval
// //
@ -525,6 +640,7 @@ ircd::m::vm::eval::operator()(json::iov &event,
enum ircd::m::vm::fault enum ircd::m::vm::fault
ircd::m::vm::eval::operator()(const event &event) ircd::m::vm::eval::operator()(const event &event)
try
{ {
using prototype = fault (eval &, const m::event &); using prototype = fault (eval &, const m::event &);
@ -533,74 +649,36 @@ ircd::m::vm::eval::operator()(const event &event)
"vm", "eval__event" "vm", "eval__event"
}; };
return function(*this, event); const vm::fault ret
}
const uint64_t &
ircd::m::vm::sequence(const eval &eval)
{
return eval.sequence;
}
uint64_t
ircd::m::vm::retired_sequence()
{
event::id::buf event_id;
return retired_sequence(event_id);
}
uint64_t
ircd::m::vm::retired_sequence(event::id::buf &event_id)
{
static constexpr auto column_idx
{ {
json::indexof<event, "event_id"_>() function(*this, event)
}; };
auto &column if(json::get<"event_id"_>(event)) switch(ret)
{ {
dbs::event_column.at(column_idx) case fault::ACCEPT:
}; notify_depends(at<"event_id"_>(event), std::exception_ptr{});
break;
const auto it default:
{ {
column.rbegin() notify_depends(at<"event_id"_>(event), std::make_exception_ptr(error
}; {
ret, "fault"
}));
if(!it) break;
{ }
// If this iterator is invalid the events db should
// be completely fresh.
assert(db::sequence(*dbs::events) == 0);
return 0;
} }
const auto &ret
{
byte_view<uint64_t>(it->first)
};
event_id = it->second;
return ret; return ret;
} }
catch(...)
ircd::string_view
ircd::m::vm::reflect(const enum fault &code)
{ {
switch(code) if(json::get<"event_id"_>(event))
{ notify_depends(at<"event_id"_>(event), std::current_exception());
case fault::ACCEPT: return "ACCEPT";
case fault::EXISTS: return "EXISTS";
case fault::INVALID: return "INVALID";
case fault::DEBUGSTEP: return "DEBUGSTEP";
case fault::BREAKPOINT: return "BREAKPOINT";
case fault::GENERAL: return "GENERAL";
case fault::EVENT: return "EVENT";
case fault::STATE: return "STATE";
case fault::INTERRUPT: return "INTERRUPT";
}
return "??????"; throw;
} }
// //

View file

@ -10,32 +10,531 @@
using namespace ircd; using namespace ircd;
m::vm::phase #include "vm_fetch.int.h"
fetch_phase
{
"fetch"
};
void init()
void fini();
mapi::header
IRCD_MODULE
{
"Matrix Virtual Machine: fetch unit",
init, fini
};
// //
// init // init
// //
void void
init() _init()
{ {
m::vm::fetch::context = ctx::context
{
"fetcher",
128_KiB,
std::bind(&m::vm::fetch::worker),
ctx::context::POST
};
log::debug
{
m::vm::log, "Fetch unit ready."
};
} }
void void
fini() _fini()
{ {
log::debug
{
m::vm::log, "Shutting down fetch unit..."
};
m::vm::fetch::context.terminate();
m::vm::fetch::context.join();
log::debug
{
m::vm::log, "Fetch unit complete."
};
}
//
// fetch_phase
//
decltype(m::vm::fetch::phase)
m::vm::fetch::phase
{
"fetch", enter
};
void
m::vm::fetch::enter(eval &eval)
{
assert(eval.event_);
assert(eval.opts);
const event::prev prev
{
*eval.event_
};
const auto &prev_events
{
json::get<"prev_events"_>(prev)
};
const size_t &prev_count
{
size(prev_events)
};
for(size_t i(0); i < prev_count; ++i)
{
const auto &prev_id{prev.prev_event(i)};
if(!exists(prev_id))
{
log::warning
{
log, "Missing prev %s in %s in %s",
string_view{prev_id},
json::get<"event_id"_>(*eval.event_),
json::get<"room_id"_>(*eval.event_)
};
/*
auto future
{
vm::evaluated(prev_id)
};
*/
//future.wait();
// std::cout << "got " << prev_id << " for " << json::get<"event_id"_>(*eval.event_) << std::endl;
}
if(eval.opts->prev_check_exists && !exists(prev_id))
throw error
{
fault::EVENT, "Missing prev event %s", prev_id
};
}
}
//
// API interface
//
json::object
m::vm::fetch::acquire(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer &buf)
{
auto &request
{
_fetch(room_id, event_id)
};
request.dock.wait([&request]
{
return request.finished;
});
const unwind _remove_{[&event_id]
{
remove(event_id);
}};
if(request.eptr)
std::rethrow_exception(request.eptr);
const json::object &event
{
request
};
return json::object
{
data(buf), copy(buf, event)
};
}
bool
m::vm::fetch::prefetch(const m::room::id &room_id,
const m::event::id &event_id)
{
_fetch(room_id, event_id);
return true;
}
bool
ircd::m::vm::fetch::remove(const m::event::id &event_id)
{
const auto it
{
fetching.find(event_id)
};
if(it == end(fetching))
return false;
const request &r{*it};
if(!r.dock.empty())
return false;
for(auto it(begin(fetched)); it != end(fetched); ++it)
if(*it == &r)
{
fetched.erase(it);
break;
}
fetching.erase(it);
return true;
}
m::vm::fetch::request &
m::vm::fetch::_fetch(const m::room::id &room_id,
const m::event::id &event_id)
{
auto it
{
fetching.lower_bound(event_id)
};
if(it == end(fetching) || it->event_id != event_id)
it = fetching.emplace_hint(it, room_id, event_id);
else
return const_cast<struct request &>(*it);
auto &request
{
const_cast<struct request &>(*it)
};
request.start();
dock.notify_all();
return request;
}
//
// fetcher
//
decltype(m::vm::fetch::dock)
m::vm::fetch::dock;
decltype(m::vm::fetch::fetching)
m::vm::fetch::fetching;
decltype(m::vm::fetch::fetched)
m::vm::fetch::fetched;
/// The fetch context is an internal worker which drives the fetch process
/// and then indicates completion to anybody waiting on a fetch. This involves
/// handling errors/timeouts from a fetch attempt and retrying with another
/// server etc.
///
decltype(m::vm::fetch::context)
m::vm::fetch::context;
void
ircd::m::vm::fetch::worker()
try
{
while(1)
{
dock.wait(requesting);
handle();
}
}
catch(const std::exception &e)
{
log::critical
{
"Fetch worker :%s", e.what()
};
}
bool
ircd::m::vm::fetch::handle()
try
{
auto next
{
ctx::when_any(begin(fetching), end(fetching))
};
if(!next.wait(seconds(2), std::nothrow))
return true;
const auto it
{
next.get()
};
if(it == end(fetching))
{
std::cout << "got nil" << std::endl;
return false;
}
auto &request
{
const_cast<struct request &>(*it)
};
if(request.finished || request.eptr)
return true;
request.handle();
return true;
}
catch(const std::exception &e)
{
log::error
{
"Fetch worker :%s", e.what()
};
return true;
}
bool
ircd::m::vm::fetch::requesting()
{
return std::any_of(begin(fetching), end(fetching), []
(const request &request)
{
return !request.finished;
});
}
//
// fetch::request
//
ircd::m::vm::fetch::request::request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer &buf)
:room_id{room_id}
,event_id{event_id}
,_buf
{
!buf?
unique_buffer<mutable_buffer>{96_KiB}:
unique_buffer<mutable_buffer>{}
}
,buf
{
empty(buf)? _buf: buf
}
{
// Ensure buffer has enough room for a worst-case event, request
// headers and response headers.
assert(size(this->buf) >= 64_KiB + 8_KiB + 8_KiB);
}
void
ircd::m::vm::fetch::request::start()
{
m::v1::event::opts opts;
opts.dynamic = false;
opts.remote = origin?: select_random_origin();
start(std::move(opts));
}
void
ircd::m::vm::fetch::request::start(m::v1::event::opts &&opts)
{
if(!started)
started = ircd::time();
last = ircd::time();
static_cast<m::v1::event &>(*this) =
{
this->event_id, this->buf, std::move(opts)
};
}
ircd::string_view
ircd::m::vm::fetch::request::select_random_origin()
{
const m::room::origins origins
{
room_id
};
// copies randomly selected origin into the attempted set.
const auto closure{[this]
(const string_view &origin)
{
this->select_origin(origin);
}};
// Tests if origin is potentially viable
const auto proffer{[this]
(const string_view &origin)
{
// Don't want to use a peer we already tried and failed with.
if(attempted.count(origin))
return false;
// Don't want to use a peer marked with an error by ircd::server
if(ircd::server::errmsg(origin))
return false;
return true;
}};
if(!origins.random(closure, proffer))
throw m::NOT_FOUND
{
"Cannot find any server to fetch %s in %s",
string_view{event_id},
string_view{room_id},
};
return this->origin;
}
ircd::string_view
ircd::m::vm::fetch::request::select_origin(const string_view &origin)
{
const auto iit
{
attempted.emplace(std::string{origin})
};
this->origin = *iit.first;
return this->origin;
}
void
ircd::m::vm::fetch::request::handle()
{
auto &future
{
static_cast<m::v1::event &>(*this)
};
future.wait(); try
{
future.get();
}
catch(...)
{
eptr = std::current_exception();
}
if(!eptr)
finish();
else
retry();
}
void
ircd::m::vm::fetch::request::retry()
try
{
eptr = std::exception_ptr{};
origin = {};
start();
}
catch(...)
{
eptr = std::current_exception();
finish();
}
void
ircd::m::vm::fetch::request::finish()
{
finished = ircd::time();
fetched.emplace_back(this);
dock.notify_all();
}
bool
ircd::m::vm::fetch::request::operator()(const request &a,
const request &b)
const
{
return a.event_id < b.event_id;
}
bool
ircd::m::vm::fetch::request::operator()(const request &a,
const string_view &b)
const
{
return a.event_id < b;
}
bool
ircd::m::vm::fetch::request::operator()(const string_view &a,
const request &b)
const
{
return a < b.event_id;
}
//
//
//
extern "C" void
auth_chain_fetch(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote,
const milliseconds &timeout,
const std::function<bool (const m::event &)> &closure)
{
m::v1::event_auth::opts opts;
opts.remote = remote;
opts.dynamic = true;
const unique_buffer<mutable_buffer> buf
{
16_KiB
};
m::v1::event_auth request
{
room_id, event_id, buf, std::move(opts)
};
request.wait(timeout);
request.get();
const json::array &auth_chain
{
request
};
std::vector<m::event> events{auth_chain.count()};
std::transform(begin(auth_chain), end(auth_chain), begin(events), []
(const json::object &pdu)
{
return m::event{pdu};
});
std::sort(begin(events), end(events));
for(const auto &event : events)
if(!closure(event))
return;
}
extern "C" void
auth_chain_eval(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote)
{
m::vm::opts opts;
opts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);
opts.non_conform.set(m::event::conforms::MISSING_MEMBERSHIP);
opts.infolog_accept = true;
opts.warnlog |= m::vm::fault::STATE;
opts.warnlog &= ~m::vm::fault::EXISTS;
opts.errorlog &= ~m::vm::fault::STATE;
auth_chain_fetch(room_id, event_id, remote, seconds(30), [&opts]
(const auto &event)
{
m::vm::eval
{
event, opts
};
return true;
});
} }

86
modules/vm_fetch.int.h Normal file
View file

@ -0,0 +1,86 @@
// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
using namespace ircd;
static void _init();
static void _fini();
mapi::header
IRCD_MODULE
{
"Matrix Virtual Machine: Fetch Unit", _init, _fini
};
// Fetch unit state
namespace ircd::m::vm::fetch
{
struct request;
extern ctx::dock dock;
extern std::set<request, request> fetching;
extern std::deque<request *> fetched;
extern ctx::context context;
extern "C" vm::phase phase;
// worker stack
static bool requesting();
static bool handle();
static void worker();
// interface stack
static request &_fetch(const m::room::id &, const m::event::id &);
static bool remove(const m::event::id &);
extern "C" bool prefetch(const m::room::id &, const m::event::id &);
extern "C" json::object acquire(const m::room::id &, const m::event::id &, const mutable_buffer &);
// phase stack
static void enter(m::vm::eval &);
}
/// Fetch entity state
struct ircd::m::vm::fetch::request
:m::v1::event
{
using is_transparent = void;
m::room::id::buf room_id;
m::event::id::buf event_id;
unique_buffer<mutable_buffer> _buf;
mutable_buffer buf;
std::set<std::string, std::less<>> attempted;
string_view origin;
time_t started {0};
time_t last {0};
time_t finished {0};
std::exception_ptr eptr;
ctx::dock dock;
bool operator()(const request &a, const request &b) const;
bool operator()(const request &a, const string_view &b) const;
bool operator()(const string_view &a, const request &b) const;
void finish();
void retry();
void handle();
string_view select_origin(const string_view &);
string_view select_random_origin();
void start(m::v1::event::opts &&);
void start();
request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer & = {});
request() = default;
request(request &&) = delete;
request(const request &) = delete;
};