0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-10 05:58:56 +02:00

modules/vm: Add sequencing points.

ircd:Ⓜ️:vm: Reorganize sequence counters.
This commit is contained in:
Jason Volk 2019-03-19 11:45:01 -07:00
parent 620e352a4f
commit 5b951b55c5
8 changed files with 323 additions and 157 deletions

View file

@ -23,18 +23,26 @@ namespace ircd::m::vm
using fault_t = std::underlying_type<fault>::type;
extern log::log log;
extern uint64_t current_sequence;
extern uint64_t uncommitted_sequence;
extern const opts default_opts;
extern const copts default_copts;
string_view reflect(const fault &);
http::code http_code(const fault &);
const uint64_t &sequence(const eval &);
uint64_t retired_sequence(id::event::buf &);
uint64_t retired_sequence();
}
namespace ircd::m::vm::sequence
{
extern uint64_t retired; // already written; always monotonic
extern uint64_t committed; // pending write; usually monotonic
extern uint64_t uncommitted; // evaluating; not monotonic
extern ctx::dock dock;
uint64_t get(id::event::buf &); // [GET]
const uint64_t &get(const eval &);
uint64_t max();
uint64_t min();
};
/// Event Evaluation Device
///
/// This object conducts the evaluation of an event or a tape of multiple
@ -46,24 +54,25 @@ namespace ircd::m::vm
struct ircd::m::vm::eval
:instance_list<eval>
{
static uint64_t id_ctr; // monotonic
uint64_t id {++id_ctr};
ctx::ctx *ctx {ctx::current};
static uint64_t id_ctr;
const vm::opts *opts {&default_opts};
const vm::copts *copts {nullptr};
event::conforms report;
ctx::ctx *ctx {ctx::current};
uint64_t id {++id_ctr};
uint64_t sequence {0};
db::txn *txn {nullptr};
string_view room_id;
const json::iov *issue {nullptr};
const event *event_ {nullptr};
json::array pdus;
string_view room_id;
event::id::buf event_id;
event::conforms report;
static bool for_each_pdu(const std::function<bool (const json::object &)> &);
public:
operator const event::id::buf &() const;
@ -86,8 +95,11 @@ struct ircd::m::vm::eval
static bool for_each(const std::function<bool (eval &)> &);
static eval *find(const event::id &);
static eval &get(const event::id &);
static bool for_each_pdu(const std::function<bool (const json::object &)> &);
static bool sequnique(const uint64_t &seq);
static eval *seqnext(const uint64_t &seq);
static eval *seqmax();
static eval *seqmin();
static void seqsort();
};
/// Evaluation faults. These are reasons which evaluation has halted but may

245
ircd/m.cc
View file

@ -567,7 +567,7 @@ ircd::m::sync::loghead(const data &data)
string_view{data.user.user_id},
data.range.first,
data.range.second,
vm::current_sequence,
vm::sequence::retired,
flush_count,
ircd::pretty(iecbuf[1], iec(flush_bytes)),
data.out?
@ -1044,14 +1044,6 @@ ircd::m::vm::log
"vm", 'v'
};
decltype(ircd::m::vm::current_sequence)
ircd::m::vm::current_sequence
{};
decltype(ircd::m::vm::uncommitted_sequence)
ircd::m::vm::uncommitted_sequence
{};
decltype(ircd::m::vm::default_opts)
ircd::m::vm::default_opts
{};
@ -1060,54 +1052,6 @@ decltype(ircd::m::vm::default_copts)
ircd::m::vm::default_copts
{};
const uint64_t &
ircd::m::vm::sequence(const eval &eval)
{
return eval.sequence;
}
uint64_t
ircd::m::vm::retired_sequence()
{
event::id::buf event_id;
return retired_sequence(event_id);
}
uint64_t
ircd::m::vm::retired_sequence(event::id::buf &event_id)
{
static constexpr auto column_idx
{
json::indexof<event, "event_id"_>()
};
auto &column
{
dbs::event_column.at(column_idx)
};
const auto it
{
column.rbegin()
};
if(!it)
{
// If this iterator is invalid the events db should
// be completely fresh.
assert(db::sequence(*dbs::events) == 0);
return 0;
}
const auto &ret
{
byte_view<uint64_t>(it->first)
};
event_id = it->second;
return ret;
}
ircd::http::code
ircd::m::vm::http_code(const fault &code)
{
@ -1158,16 +1102,94 @@ decltype(ircd::m::vm::eval::id_ctr)
ircd::m::vm::eval::id_ctr
{};
bool
ircd::m::vm::eval::for_each_pdu(const std::function<bool (const json::object &)> &closure)
void
ircd::m::vm::eval::seqsort()
{
return for_each([&closure](eval &e)
eval::list.sort([]
(const auto *const &a, const auto *const &b)
{
for(const json::object &pdu : e.pdus)
if(!closure(pdu))
if(sequence::get(*a) == 0)
return false;
if(sequence::get(*b) == 0)
return true;
return sequence::get(*a) < sequence::get(*b);
});
}
ircd::m::vm::eval *
ircd::m::vm::eval::seqmin()
{
const auto it
{
std::min_element(begin(eval::list), end(eval::list), []
(const auto *const &a, const auto *const &b)
{
if(sequence::get(*a) == 0)
return false;
return true;
if(sequence::get(*b) == 0)
return true;
return sequence::get(*a) < sequence::get(*b);
})
};
if(it == end(eval::list))
return nullptr;
if(sequence::get(**it) == 0)
return nullptr;
return *it;
}
ircd::m::vm::eval *
ircd::m::vm::eval::seqmax()
{
const auto it
{
std::max_element(begin(eval::list), end(eval::list), []
(const auto *const &a, const auto *const &b)
{
return sequence::get(*a) < sequence::get(*b);
})
};
if(it == end(eval::list))
return nullptr;
if(sequence::get(**it) == 0)
return nullptr;
return *it;
}
ircd::m::vm::eval *
ircd::m::vm::eval::seqnext(const uint64_t &seq)
{
eval *ret{nullptr};
for(auto *const &eval : eval::list)
{
if(sequence::get(*eval) <= seq)
continue;
if(!ret || sequence::get(*eval) < sequence::get(*ret))
ret = eval;
}
assert(!ret || sequence::get(*ret) > seq);
return ret;
}
bool
ircd::m::vm::eval::sequnique(const uint64_t &seq)
{
return 1 == std::count_if(begin(eval::list), end(eval::list), [&seq]
(const auto *const &eval)
{
return sequence::get(*eval) == seq;
});
}
@ -1283,6 +1305,19 @@ const
return event_id;
}
bool
ircd::m::vm::eval::for_each_pdu(const std::function<bool (const json::object &)> &closure)
{
return for_each([&closure](eval &e)
{
for(const json::object &pdu : e.pdus)
if(!closure(pdu))
return false;
return true;
});
}
///
/// Figure 1:
/// in . <-- injection
@ -1352,6 +1387,86 @@ ircd::m::vm::eval::operator()(const event &event)
return ret;
}
//
// sequence
//
decltype(ircd::m::vm::sequence::retired)
ircd::m::vm::sequence::retired;
decltype(ircd::m::vm::sequence::committed)
ircd::m::vm::sequence::committed;
decltype(ircd::m::vm::sequence::uncommitted)
ircd::m::vm::sequence::uncommitted;
decltype(ircd::m::vm::sequence::dock)
ircd::m::vm::sequence::dock;
uint64_t
ircd::m::vm::sequence::min()
{
const auto *const e
{
eval::seqmin()
};
return e? get(*e) : 0;
}
uint64_t
ircd::m::vm::sequence::max()
{
const auto *const e
{
eval::seqmax()
};
return e? get(*e) : 0;
}
uint64_t
ircd::m::vm::sequence::get(id::event::buf &event_id)
{
static constexpr auto column_idx
{
json::indexof<event, "event_id"_>()
};
auto &column
{
dbs::event_column.at(column_idx)
};
const auto it
{
column.rbegin()
};
if(!it)
{
// If this iterator is invalid the events db should
// be completely fresh.
assert(db::sequence(*dbs::events) == 0);
return 0;
}
const auto &ret
{
byte_view<uint64_t>(it->first)
};
event_id = it->second;
return ret;
}
const uint64_t &
ircd::m::vm::sequence::get(const eval &eval)
{
return eval.sequence;
}
///////////////////////////////////////////////////////////////////////////////
//
// m/keys.h
@ -2113,13 +2228,13 @@ ircd::m::events::for_each(const range &range,
{
ascending?
range.first:
std::min(range.first, vm::current_sequence)
std::min(range.first, vm::sequence::retired)
};
const auto stop
{
ascending?
std::min(range.second, vm::current_sequence + 1):
std::min(range.second, vm::sequence::retired + 1):
range.second
};

View file

@ -315,7 +315,7 @@ get_events_from(client &client,
{
unsigned_, "age", json::value
{
long(m::vm::current_sequence - it.event_idx())
long(m::vm::sequence::retired - it.event_idx())
}
};
}

View file

@ -165,7 +165,7 @@ _initialsync(client &client,
const auto next_batch
{
int64_t(m::vm::current_sequence)
int64_t(m::vm::sequence::retired)
};
// rooms

View file

@ -47,7 +47,7 @@ some threshold it becomes too expensive to scan a huge number of events to
grab only those that the client requires; it is cheaper to conduct a series
of random-access queries with polylog-sync instead. Note the exclusive
upper-bound of a sync is determined either by a non-spec query parameter
'next_batch' or the vm::current_sequence+1.
'next_batch' or the vm::sequence::retired+1.
)"};
@ -139,10 +139,10 @@ ircd::m::sync::handle_get(client &client,
// The range to `/sync`. We involve events starting at the range.first
// index in this sync. We will not involve events with an index equal
// or greater than the range.second. In this case the range.second does not
// exist yet because it is one past the server's current_sequence counter.
// exist yet because it is one past the server's sequence::retired counter.
const m::events::range range
{
args.since, std::min(args.next_batch, m::vm::current_sequence + 1)
args.since, std::min(args.next_batch, m::vm::sequence::retired + 1)
};
// When the range indexes are the same, the client is polling for the next
@ -190,7 +190,7 @@ ircd::m::sync::handle_get(client &client,
const bool should_longpoll
{
range.first > vm::current_sequence
range.first > vm::sequence::retired
};
const bool should_linear
@ -702,7 +702,7 @@ ircd::m::sync::longpoll::handle(data &data,
const auto next
{
data.event_idx?
std::min(data.event_idx + 1, vm::current_sequence + 1):
std::min(data.event_idx + 1, vm::sequence::retired + 1):
data.range.first
};

View file

@ -5632,7 +5632,7 @@ console_cmd__events__dump(opt &out, const string_view &line)
const double pct
{
(seq / double(m::vm::current_sequence)) * 100.0
(seq / double(m::vm::sequence::retired)) * 100.0
};
log::info
@ -5642,7 +5642,7 @@ console_cmd__events__dump(opt &out, const string_view &line)
pct,
'%', //TODO: fix gram
seq,
m::vm::current_sequence,
m::vm::sequence::retired,
ecount,
foff,
acount,
@ -11530,7 +11530,7 @@ bool
console_cmd__vm(opt &out, const string_view &line)
{
out << "sequence: "
<< std::right << std::setw(10) << m::vm::current_sequence
<< std::right << std::setw(10) << m::vm::sequence::retired
<< std::endl;
out << "eval total: "

View file

@ -541,7 +541,7 @@ ircd::m::append(json::stack::object &object,
unsigned_, "age", json::value
{
has_event_idx && opts.age != std::numeric_limits<long>::min()?
long(vm::current_sequence - *opts.event_idx):
long(vm::sequence::retired - *opts.event_idx):
opts.age != std::numeric_limits<long>::min()?
opts.age:
0L
@ -658,7 +658,7 @@ ircd::m::event::refs::rebuild()
m::log, "Refs builder @%zu:%zu of %lu (@idx: %lu)",
i,
j,
m::vm::current_sequence,
m::vm::sequence::retired,
event_idx
};

View file

@ -116,15 +116,16 @@ void
ircd::m::vm::init()
{
id::event::buf event_id;
current_sequence = retired_sequence(event_id);
uncommitted_sequence = current_sequence;
sequence::retired = sequence::get(event_id);
sequence::committed = sequence::retired;
sequence::uncommitted = sequence::committed;
log::info
{
log, "BOOT %s @%lu [%s]",
string_view{m::my_node.node_id},
current_sequence,
current_sequence? string_view{event_id} : "NO EVENTS"_sv
sequence::retired,
sequence::retired? string_view{event_id} : "NO EVENTS"_sv
};
}
@ -133,20 +134,21 @@ ircd::m::vm::fini()
{
assert(eval::list.empty());
id::event::buf event_id;
const auto current_sequence
event::id::buf event_id;
const auto retired
{
retired_sequence(event_id)
sequence::get(event_id)
};
log::info
{
log, "HLT '%s' @%lu [%s] %lu:%lu",
log, "HLT '%s' @%lu [%s] %lu:%lu:%lu",
string_view{m::my_node.node_id},
current_sequence,
current_sequence? string_view{event_id} : "NO EVENTS"_sv,
vm::uncommitted_sequence,
vm::current_sequence,
retired,
retired? string_view{event_id} : "NO EVENTS"_sv,
sequence::retired,
sequence::committed,
sequence::uncommitted
};
}
@ -645,18 +647,96 @@ ircd::m::vm::_eval_pdu(eval &eval,
// Obtain sequence number here
if(opts.write)
eval.sequence = ++vm::uncommitted_sequence;
{
const auto *const &top(eval::seqmax());
eval.sequence = top?
std::max(sequence::get(*top) + 1, sequence::committed + 1):
sequence::committed + 1;
log::debug
{
log, "vm seq %lu[%lu]%lu|%lu|%lu:%lu | acquire",
vm::sequence::max(),
sequence::get(eval),
vm::sequence::uncommitted,
vm::sequence::committed,
vm::sequence::retired,
vm::sequence::min(),
};
assert(eval.sequence != 0);
assert(sequence::uncommitted <= sequence::get(eval));
assert(sequence::committed < sequence::get(eval));
assert(sequence::retired < sequence::get(eval));
assert(eval::sequnique(sequence::get(eval)));
sequence::uncommitted = sequence::get(eval);
}
// Evaluation by module hooks
if(opts.eval)
eval_hook(event, eval);
if(opts.write)
{
log::debug
{
log, "vm seq %lu:%lu[%lu]%lu|%lu:%lu | commit",
vm::sequence::max(),
vm::sequence::uncommitted,
sequence::get(eval),
vm::sequence::committed,
vm::sequence::retired,
vm::sequence::min(),
};
sequence::dock.wait([&eval]
{
return eval::seqnext(sequence::committed) == &eval;
});
assert(sequence::committed < sequence::get(eval));
assert(sequence::retired < sequence::get(eval));
sequence::committed = sequence::get(eval);
sequence::dock.notify_all();
log::debug
{
log, "vm seq %lu:%lu[%lu|%lu]%lu:%lu | write",
vm::sequence::max(),
vm::sequence::uncommitted,
vm::sequence::committed,
sequence::get(eval),
vm::sequence::retired,
vm::sequence::min(),
};
_write(eval, event);
sequence::dock.wait([&eval]
{
return eval::seqnext(sequence::retired) == &eval;
});
assert(sequence::retired < sequence::get(eval));
sequence::retired = sequence::get(eval);
sequence::dock.notify_all();
log::debug
{
log, "vm seq %lu:%lu|%lu[%lu|%lu]%lu | retire",
vm::sequence::max(),
vm::sequence::uncommitted,
vm::sequence::committed,
vm::sequence::retired,
sequence::get(eval),
vm::sequence::min(),
};
}
// pre-notify effect hooks
if(opts.post)
post_hook(event, eval);
if(opts.write)
_write(eval, event);
return fault::ACCEPT;
}
@ -753,69 +833,28 @@ ircd::m::vm::_commit(eval &eval)
auto &txn(*eval.txn);
#ifdef RB_DEBUG
const auto db_seq_before(sequence(*m::dbs::events));
const auto db_seq_before(db::sequence(*m::dbs::events));
#endif
txn();
#ifdef RB_DEBUG
const auto db_seq_after(sequence(*m::dbs::events));
const auto db_seq_after(db::sequence(*m::dbs::events));
#endif
if(log_commit_debug || eval.opts->debuglog_accept)
if(log_commit_debug && !eval.opts->debuglog_accept)
log::debug
{
log, "vm seq %lu:%lu:%lu | db seq %lu:%lu %zu cells in %zu bytes to events database ...",
vm::uncommitted_sequence,
vm::current_sequence,
eval.sequence,
log, "vm seq %lu:%lu|%lu[%lu]%lu:%lu | wrote | db seq %lu:%lu %zu cells in %zu bytes to events database ...",
vm::sequence::max(),
vm::sequence::uncommitted,
vm::sequence::committed,
sequence::get(eval),
vm::sequence::retired,
vm::sequence::min(),
db_seq_before,
db_seq_after,
txn.size(),
txn.bytes()
};
++vm::current_sequence;
}
uint64_t
ircd::m::vm::retired_sequence()
{
event::id::buf event_id;
return retired_sequence(event_id);
}
uint64_t
ircd::m::vm::retired_sequence(event::id::buf &event_id)
{
static constexpr auto column_idx
{
json::indexof<event, "event_id"_>()
};
auto &column
{
dbs::event_column.at(column_idx)
};
const auto it
{
column.rbegin()
};
if(!it)
{
// If this iterator is invalid the events db should
// be completely fresh.
assert(db::sequence(*dbs::events) == 0);
return 0;
}
const auto &ret
{
byte_view<uint64_t>(it->first)
};
event_id = it->second;
return ret;
}