0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-26 00:32:35 +01:00

ircd:Ⓜ️:vm: Integrate sequence counting.

This commit is contained in:
Jason Volk 2018-04-16 14:21:54 -07:00
parent d5087f4e8f
commit b320d8ece1
3 changed files with 119 additions and 7 deletions

View file

@ -85,6 +85,7 @@ struct ircd::m::init
json::object config;
keys::init _keys;
dbs::init _dbs;
vm::init _vm;
std::unique_ptr<modules> modules;
std::unique_ptr<listeners> listeners;

View file

@ -15,11 +15,12 @@
///
namespace ircd::m::vm
{
enum fault :uint;
struct init;
struct error; // custom exception
struct opts;
struct eval;
struct accepted;
enum fault :uint;
using fault_t = std::underlying_type<fault>::type;
using closure = std::function<void (const event &)>;
@ -29,8 +30,16 @@ namespace ircd::m::vm
extern uint64_t current_sequence;
extern ctx::shared_view<accepted> accept;
extern const opts default_opts;
uint64_t sequence(const eval &);
}
struct ircd::m::vm::init
{
init();
~init() noexcept;
};
/// Event Evaluation Device
///
/// This object conducts the evaluation of an event or a tape of multiple

View file

@ -10,6 +10,12 @@
#include <ircd/m/m.h>
namespace ircd::m::vm
{
uint64_t last_sequence(id::event::buf &);
uint64_t last_sequence();
}
decltype(ircd::m::vm::log)
ircd::m::vm::log
{
@ -32,6 +38,37 @@ decltype(ircd::m::vm::default_commit_opts)
ircd::m::vm::default_commit_opts
{};
//
// init
//
ircd::m::vm::init::init()
{
id::event::buf event_id;
current_sequence = last_sequence(event_id);
log.info("Initializing from vm sequence %lu [%s]",
current_sequence,
current_sequence? string_view{event_id} : "NO EVENTS"_sv);
}
ircd::m::vm::init::~init()
{
id::event::buf event_id;
const auto current_sequence
{
last_sequence(event_id)
};
log.info("Shutting down @ %lu [%s]",
current_sequence,
current_sequence? string_view{event_id} : "NO EVENTS"_sv);
}
//
// commit
//
/// This function takes an event object vector and adds our origin and event_id
/// and hashes and signature and attempts to inject the event into the core.
///
@ -164,7 +201,7 @@ ircd::m::vm::commit(const event &event,
const opts::commit &opts)
{
if(opts.debuglog_precommit)
log.debug("injecting event(mark: %ld) %s",
log.debug("injecting event(mark +%ld) %s",
vm::current_sequence,
pretty_oneline(event));
@ -189,9 +226,10 @@ ircd::m::vm::commit(const event &event,
eval(event)
};
if(opts.infolog_postcommit)
log.info("@%lu %s",
vm::current_sequence,
if(opts_.infolog_postcommit)
log.info("%s @%lu %s",
reflect(ret),
sequence(eval),
pretty_oneline(event, false));
return ret;
@ -395,6 +433,22 @@ ircd::m::vm::_eval_pdu(eval &eval,
"Signature verification failed"
};
// Obtain sequence number here
const uint64_t sequence_number
{
++vm::current_sequence
};
db::txn::append
{
eval.txn, dbs::event_seq,
{
db::op::SET,
byte_view<string_view>(sequence_number),
event_id
}
};
eval_hook(event);
const auto &depth
@ -480,8 +534,56 @@ ircd::m::vm::write(eval &eval)
eval.txn.bytes());
eval.txn();
vm::current_sequence++;
eval.txn.clear();
}
uint64_t
ircd::m::vm::sequence(const eval &eval)
{
uint64_t ret;
eval.txn.at(db::op::SET, "_event_seq", [&ret]
(const auto &delta)
{
const byte_view<uint64_t> seqnum
{
std::get<delta.KEY>(delta)
};
ret = seqnum;
});
return ret;
}
uint64_t
ircd::m::vm::last_sequence()
{
id::event::buf event_id;
return last_sequence(event_id);
}
uint64_t
ircd::m::vm::last_sequence(id::event::buf &event_id)
{
auto &column
{
dbs::event_seq
};
const auto it
{
column.rbegin()
};
if(!it)
{
// If this iterator is invalid the events db should
// be completely fresh.
assert(db::sequence(*dbs::events) == 0);
return 0;
}
event_id = it->second;
return byte_view<uint64_t>(it->first);
}
ircd::string_view