0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-10 14:08:56 +02:00

modules: Rename vm_fetch to s_fetch.

This commit is contained in:
Jason Volk 2018-10-23 09:13:48 -07:00
parent 94fe5090f5
commit c83d17396b
4 changed files with 144 additions and 93 deletions

View file

@ -162,7 +162,6 @@ ircd::m::init::modules::init_imports()
// Manually load first modules
mods::imports.emplace("vm"s, "vm"s);
mods::imports.emplace("vm_fetch"s, "vm_fetch"s);
// The order of these prefixes will be the loading order. Order of
// specific modules within a prefix is not determined here.

View file

@ -47,13 +47,11 @@ moduledir = @moduledir@
webroot_la_SOURCES = webroot.cc
console_la_SOURCES = console.cc
vm_la_SOURCES = vm.cc
vm_fetch_la_SOURCES = vm_fetch.cc
module_LTLIBRARIES = \
webroot.la \
console.la \
vm.la \
vm_fetch.la \
###
###############################################################################
@ -69,6 +67,7 @@ s_dns_la_SOURCES = s_dns.cc s_dns_cache.cc s_dns_resolver.cc
s_node_la_SOURCES = s_node.cc
s_listen_la_SOURCES = s_listen.cc
s_keys_la_SOURCES = s_keys.cc
s_fetch_la_SOURCES = s_fetch.cc
s_module_LTLIBRARIES = \
s_conf.la \
@ -77,6 +76,7 @@ s_module_LTLIBRARIES = \
s_node.la \
s_listen.la \
s_keys.la \
s_fetch.la \
###
###############################################################################

View file

@ -8,45 +8,49 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
using namespace ircd;
#include "s_fetch.h"
#include "vm_fetch.int.h"
ircd::mapi::header
IRCD_MODULE
{
"Event Fetch Unit", ircd::m::fetch::init, ircd::m::fetch::fini
};
//
// init
//
void
_init()
ircd::m::fetch::init()
{
m::vm::fetch::context = ctx::context
context = ctx::context
{
"fetcher",
128_KiB,
std::bind(&m::vm::fetch::worker),
std::bind(&ircd::m::fetch::worker),
ctx::context::POST
};
log::debug
{
m::vm::log, "Fetch unit ready."
vm::log, "Fetch unit ready."
};
}
void
_fini()
ircd::m::fetch::fini()
{
log::debug
{
m::vm::log, "Shutting down fetch unit..."
vm::log, "Shutting down fetch unit..."
};
m::vm::fetch::context.terminate();
m::vm::fetch::context.join();
context.terminate();
context.join();
log::debug
{
m::vm::log, "Fetch unit complete."
vm::log, "Fetch unit complete."
};
}
@ -54,8 +58,8 @@ _fini()
// fetch_phase
//
decltype(m::vm::fetch::hook)
m::vm::fetch::hook
decltype(ircd::m::fetch::hook)
ircd::m::fetch::hook
{
enter,
{
@ -64,8 +68,8 @@ m::vm::fetch::hook
};
void
m::vm::fetch::enter(const event &event,
eval &eval)
ircd::m::fetch::enter(const event &event,
vm::eval &eval)
{
assert(eval.opts);
const auto &opts{*eval.opts};
@ -83,9 +87,10 @@ m::vm::fetch::enter(const event &event,
//m::join(m::room{room_id}, user_id);
}
else if(type != "m.room.create")
throw error
throw vm::error
{
fault::STATE, "Found nothing for room %s", string_view{room_id}
vm::fault::STATE, "Found nothing for room %s",
string_view{room_id}
};
}
@ -106,9 +111,8 @@ m::vm::fetch::enter(const event &event,
for(size_t i(0); i < prev_count; ++i)
{
const auto &prev_id{prev.prev_event(i)};
const auto &prev_id(prev.prev_event(i));
if(!exists(prev_id))
{
log::warning
{
log, "Missing prev %s in %s in %s",
@ -116,12 +120,33 @@ m::vm::fetch::enter(const event &event,
json::get<"event_id"_>(*eval.event_),
json::get<"room_id"_>(*eval.event_)
};
}
if(eval.opts->prev_check_exists && !exists(prev_id))
throw error
if(!eval.opts->prev_check_exists || exists(prev_id))
continue;
auto &request
{
fetch(json::get<"room_id"_>(*eval.event_), prev_id)
};
++request.refcnt;
request.dock.wait([&request]
{
return request.eval;
});
const unwind uw{[&request]
{
if(!--request.refcnt)
fetching.erase(request);
}};
if(request.eptr)
throw vm::error
{
fault::EVENT, "Missing prev event %s", prev_id
vm::fault::EVENT, "Missing prev event %s (%s)",
prev_id,
what(request.eptr)
};
}
}
@ -130,9 +155,9 @@ m::vm::fetch::enter(const event &event,
// API interface
//
m::vm::fetch::request &
m::vm::fetch::fetch(const m::room::id &room_id,
const m::event::id &event_id)
ircd::m::fetch::request &
ircd::m::fetch::fetch(const m::room::id &room_id,
const m::event::id &event_id)
{
auto it
{
@ -158,28 +183,45 @@ m::vm::fetch::fetch(const m::room::id &room_id,
// fetcher
//
decltype(m::vm::fetch::dock)
m::vm::fetch::dock;
decltype(ircd::m::fetch::dock)
ircd::m::fetch::dock;
decltype(m::vm::fetch::fetching)
m::vm::fetch::fetching;
decltype(ircd::m::fetch::fetching)
ircd::m::fetch::fetching;
decltype(ircd::m::fetch::completed)
ircd::m::fetch::completed;
/// The fetch context is an internal worker which drives the fetch process
/// and then indicates completion to anybody waiting on a fetch. This involves
/// handling errors/timeouts from a fetch attempt and retrying with another
/// server etc.
///
decltype(m::vm::fetch::context)
m::vm::fetch::context;
decltype(ircd::m::fetch::context)
ircd::m::fetch::context;
void
ircd::m::vm::fetch::worker()
ircd::m::fetch::worker()
try
{
while(1)
{
dock.wait(requesting);
handle();
if(!handle())
continue;
while(!completed.empty())
{
auto *const &request
{
completed.front()
};
completed.pop_front();
request->eval = true;
std::cout << "evals " << request->event_id << " ref " << request->refcnt << std::endl;
request->dock.notify_all();
}
}
}
catch(const std::exception &e)
@ -191,7 +233,7 @@ catch(const std::exception &e)
}
bool
ircd::m::vm::fetch::handle()
ircd::m::fetch::handle()
try
{
auto next
@ -200,7 +242,7 @@ try
};
if(!next.wait(seconds(2), std::nothrow))
return true;
return false;
const auto it
{
@ -208,21 +250,26 @@ try
};
if(it == end(fetching))
{
std::cout << "got nil" << std::endl;
return false;
}
auto &request
{
const_cast<struct request &>(*it)
};
if(request.finished || request.eptr)
return true;
if(!request.finished && !request.eptr)
if(!request.handle())
return false;
request.handle();
return true;
assert(request.finished || request.eptr);
if(!request.completed)
{
completed.emplace_back(&request);
request.completed = true;
return true;
}
return false;
}
catch(const std::exception &e)
{
@ -231,11 +278,11 @@ catch(const std::exception &e)
"Fetch worker :%s", e.what()
};
return true;
return false;
}
bool
ircd::m::vm::fetch::requesting()
ircd::m::fetch::requesting()
{
return std::any_of(begin(fetching), end(fetching), []
(const request &request)
@ -248,9 +295,9 @@ ircd::m::vm::fetch::requesting()
// fetch::request
//
ircd::m::vm::fetch::request::request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer &buf)
ircd::m::fetch::request::request(const m::room::id &room_id,
const m::event::id &event_id,
const mutable_buffer &buf)
:room_id{room_id}
,event_id{event_id}
,_buf
@ -270,7 +317,7 @@ ircd::m::vm::fetch::request::request(const m::room::id &room_id,
}
void
ircd::m::vm::fetch::request::start()
ircd::m::fetch::request::start()
{
m::v1::event::opts opts;
opts.dynamic = false;
@ -279,7 +326,7 @@ ircd::m::vm::fetch::request::start()
}
void
ircd::m::vm::fetch::request::start(m::v1::event::opts &&opts)
ircd::m::fetch::request::start(m::v1::event::opts &&opts)
{
if(!started)
started = ircd::time();
@ -292,7 +339,7 @@ ircd::m::vm::fetch::request::start(m::v1::event::opts &&opts)
}
ircd::string_view
ircd::m::vm::fetch::request::select_random_origin()
ircd::m::fetch::request::select_random_origin()
{
const m::room::origins origins
{
@ -337,7 +384,7 @@ ircd::m::vm::fetch::request::select_random_origin()
}
ircd::string_view
ircd::m::vm::fetch::request::select_origin(const string_view &origin)
ircd::m::fetch::request::select_origin(const string_view &origin)
{
const auto iit
{
@ -348,8 +395,8 @@ ircd::m::vm::fetch::request::select_origin(const string_view &origin)
return this->origin;
}
void
ircd::m::vm::fetch::request::handle()
bool
ircd::m::fetch::request::handle()
{
auto &future
{
@ -369,10 +416,12 @@ ircd::m::vm::fetch::request::handle()
finish();
else
retry();
return finished;
}
void
ircd::m::vm::fetch::request::retry()
ircd::m::fetch::request::retry()
try
{
eptr = std::exception_ptr{};
@ -386,46 +435,54 @@ catch(...)
}
void
ircd::m::vm::fetch::request::finish()
ircd::m::fetch::request::finish()
{
finished = ircd::time();
dock.notify_all();
}
bool
ircd::m::vm::fetch::request::operator()(const request &a,
const request &b)
ircd::m::fetch::request::operator()(const request &a,
const request &b)
const
{
return a.event_id < b.event_id;
}
bool
ircd::m::vm::fetch::request::operator()(const request &a,
const string_view &b)
ircd::m::fetch::request::operator()(const request &a,
const string_view &b)
const
{
return a.event_id < b;
}
bool
ircd::m::vm::fetch::request::operator()(const string_view &a,
const request &b)
ircd::m::fetch::request::operator()(const string_view &a,
const request &b)
const
{
return a < b.event_id;
}
//
//
//
namespace ircd::m::fetch
{
extern "C" void auth_chain_fetch(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote,
const milliseconds &timeout,
const std::function<bool (const m::event &)> &closure);
extern "C" void auth_chain_eval(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote);
}
extern "C" void
auth_chain_fetch(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote,
const milliseconds &timeout,
const std::function<bool (const m::event &)> &closure)
ircd::m::fetch::auth_chain_fetch(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote,
const milliseconds &timeout,
const std::function<bool (const m::event &)> &closure)
{
m::v1::event_auth::opts opts;
opts.remote = remote;
@ -462,9 +519,9 @@ auth_chain_fetch(const m::room::id &room_id,
}
extern "C" void
auth_chain_eval(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote)
ircd::m::fetch::auth_chain_eval(const m::room::id &room_id,
const m::event::id &event_id,
const net::hostport &remote)
{
m::vm::opts opts;
opts.non_conform.set(m::event::conforms::MISSING_PREV_STATE);

View file

@ -8,25 +8,15 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
using namespace ircd;
static void _init();
static void _fini();
mapi::header
IRCD_MODULE
{
"Matrix Virtual Machine: Fetch Unit", _init, _fini
};
// Fetch unit state
namespace ircd::m::vm::fetch
namespace ircd::m::fetch
{
struct request;
extern ctx::dock dock;
extern std::set<request, request> fetching;
extern hookfn<eval &> hook;
extern std::deque<request *> completed;
extern hookfn<vm::eval &> hook;
extern ctx::context context;
// worker stack
@ -35,12 +25,14 @@ namespace ircd::m::vm::fetch
static void worker();
static request &fetch(const m::room::id &, const m::event::id &);
static void enter(const event &, vm::eval &);
static void init();
static void fini();
}
/// Fetch entity state
struct ircd::m::vm::fetch::request
struct ircd::m::fetch::request
:m::v1::event
{
using is_transparent = void;
@ -56,6 +48,9 @@ struct ircd::m::vm::fetch::request
time_t finished {0};
std::exception_ptr eptr;
ctx::dock dock;
bool eval {false};
bool completed {false};
size_t refcnt {0};
bool operator()(const request &a, const request &b) const;
bool operator()(const request &a, const string_view &b) const;
@ -63,7 +58,7 @@ struct ircd::m::vm::fetch::request
void finish();
void retry();
void handle();
bool handle();
string_view select_origin(const string_view &);
string_view select_random_origin();