0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2025-01-14 16:46:50 +01:00

modules/vm: Share transactions between all evals on the same stack; adjust sequence strategy.

This commit is contained in:
Jason Volk 2019-03-21 14:13:45 -07:00
parent a7f6549c27
commit 82378db816
3 changed files with 120 additions and 140 deletions

View file

@ -65,7 +65,8 @@ struct ircd::m::vm::eval
uint64_t id {++id_ctr}; uint64_t id {++id_ctr};
uint64_t sequence {0}; uint64_t sequence {0};
db::txn *txn {nullptr}; uint64_t sequence_shared[2] {0}; // min, max
std::shared_ptr<db::txn> txn;
const json::iov *issue {nullptr}; const json::iov *issue {nullptr};
const event *event_ {nullptr}; const event *event_ {nullptr};

View file

@ -1065,13 +1065,12 @@ ircd::m::vm::loghead(const mutable_buffer &buf,
{ {
return fmt::sprintf return fmt::sprintf
{ {
buf, "vm[%lu:%lu:%lu] eval[%lu] id:%lu txn:%p", buf, "vm[%lu:%lu:%lu] eval[%lu] id:%lu",
sequence::uncommitted, sequence::uncommitted,
sequence::committed, sequence::committed,
sequence::retired, sequence::retired,
sequence::get(eval), sequence::get(eval),
eval.id, eval.id,
(const void *)txn
}; };
} }

View file

@ -10,8 +10,9 @@
namespace ircd::m::vm namespace ircd::m::vm
{ {
static void _commit(eval &); static void write_commit(eval &);
static void _write(eval &, const event &); static void write_append(eval &, const event &);
static void write_prepare(eval &, const event &);
static fault _eval_edu(eval &, const event &); static fault _eval_edu(eval &, const event &);
static fault _eval_pdu(eval &, const event &); static fault _eval_pdu(eval &, const event &);
@ -142,8 +143,6 @@ ircd::m::vm::pool
void void
ircd::m::vm::init() ircd::m::vm::init()
{ {
pool.min(size_t(pool_size));
id::event::buf event_id; id::event::buf event_id;
sequence::retired = sequence::get(event_id); sequence::retired = sequence::get(event_id);
sequence::committed = sequence::retired; sequence::committed = sequence::retired;
@ -156,6 +155,8 @@ ircd::m::vm::init()
sequence::retired, sequence::retired,
sequence::retired? string_view{event_id} : "NO EVENTS"_sv sequence::retired? string_view{event_id} : "NO EVENTS"_sv
}; };
//pool.min(size_t(pool_size));
} }
void void
@ -677,165 +678,148 @@ ircd::m::vm::_eval_pdu(eval &eval,
if(opts.fetch) if(opts.fetch)
fetch_hook(event, eval); fetch_hook(event, eval);
// Obtain sequence number here // Obtain sequence number here.
if(opts.write) const auto *const &top(eval::seqmax());
eval.sequence_shared[0] = 0;
eval.sequence_shared[1] = 0;
eval.sequence =
{ {
const auto *const &top(eval::seqmax()); top?
eval.sequence = top?
std::max(sequence::get(*top) + 1, sequence::committed + 1): std::max(sequence::get(*top) + 1, sequence::committed + 1):
sequence::committed + 1; sequence::committed + 1
};
log::debug log::debug
{ {
log, "vm seq %lu[%lu]%lu|%lu|%lu:%lu | acquire", log, "%s | acquire", loghead(eval)
vm::sequence::max(), };
sequence::get(eval),
vm::sequence::uncommitted,
vm::sequence::committed,
vm::sequence::retired,
vm::sequence::min(),
};
assert(eval.sequence != 0); assert(eval.sequence != 0);
assert(sequence::uncommitted <= sequence::get(eval)); assert(sequence::uncommitted <= sequence::get(eval));
assert(sequence::committed < sequence::get(eval)); assert(sequence::committed < sequence::get(eval));
assert(sequence::retired < sequence::get(eval)); assert(sequence::retired < sequence::get(eval));
assert(eval::sequnique(sequence::get(eval))); assert(eval::sequnique(sequence::get(eval)));
sequence::uncommitted = sequence::get(eval); sequence::uncommitted = sequence::get(eval);
}
// Evaluation by module hooks // Evaluation by module hooks
if(opts.eval) if(opts.eval)
eval_hook(event, eval); eval_hook(event, eval);
if(opts.write) // Wait until this is the lowest sequence number
sequence::dock.wait([&eval]
{ {
log::debug return eval::seqnext(sequence::committed) == &eval;
{ });
log, "vm seq %lu:%lu[%lu]%lu|%lu:%lu | commit",
vm::sequence::max(),
vm::sequence::uncommitted,
sequence::get(eval),
vm::sequence::committed,
vm::sequence::retired,
vm::sequence::min(),
};
sequence::dock.wait([&eval] log::debug
{ {
return eval::seqnext(sequence::committed) == &eval; log, "%s | commit", loghead(eval)
}); };
assert(sequence::committed < sequence::get(eval)); assert(sequence::committed < sequence::get(eval));
assert(sequence::retired < sequence::get(eval)); assert(sequence::retired < sequence::get(eval));
sequence::committed = sequence::get(eval); sequence::committed = sequence::get(eval);
sequence::dock.notify_all(); sequence::dock.notify_all();
log::debug
{
log, "vm seq %lu:%lu[%lu|%lu]%lu:%lu | write",
vm::sequence::max(),
vm::sequence::uncommitted,
vm::sequence::committed,
sequence::get(eval),
vm::sequence::retired,
vm::sequence::min(),
};
_write(eval, event);
}
// pre-notify effect hooks
bool post_hook_complete{false};
if(opts.post)
pool([&event, &eval, &post_hook_complete]
{
const unwind notify{[&post_hook_complete]
{
post_hook_complete = true;
sequence::dock.notify_all();
}};
post_hook(event, eval);
});
if(opts.write) if(opts.write)
write_prepare(eval, event);
if(opts.write)
write_append(eval, event);
// Generate post-eval/pre-notify effects. This function may conduct
// an entire eval of several more events recursively before returning.
if(opts.post)
post_hook(event, eval);
// Commit the transaction to database iff this eval is at the stack base.
if(opts.write && !eval.sequence_shared[0])
write_commit(eval);
// Wait for sequencing only if this is the stack base, otherwise we'll
// never return back to that stack base.
if(!eval.sequence_shared[0])
{ {
sequence::dock.wait([&eval] sequence::dock.wait([&eval]
{ {
return eval::seqnext(sequence::retired) == &eval; return eval::seqnext(sequence::retired) == &eval;
}); });
log::debug
{
log, "%s | retire %lu:%lu",
loghead(eval),
sequence::get(eval),
eval.sequence_shared[1],
};
assert(sequence::retired < sequence::get(eval)); assert(sequence::retired < sequence::get(eval));
sequence::retired = sequence::get(eval); sequence::retired = std::max(eval.sequence_shared[1], sequence::get(eval));
sequence::dock.notify_all(); sequence::dock.notify_all();
log::debug
{
log, "vm seq %lu:%lu|%lu[%lu|%lu]%lu | retire",
vm::sequence::max(),
vm::sequence::uncommitted,
vm::sequence::committed,
vm::sequence::retired,
sequence::get(eval),
vm::sequence::min(),
};
}
if(opts.post)
{
sequence::dock.wait([&post_hook_complete]
{
return post_hook_complete;
});
log::debug
{
log, "vm seq %lu:%lu|%lu[%lu|%lu]%lu | accept",
vm::sequence::max(),
vm::sequence::uncommitted,
vm::sequence::committed,
vm::sequence::retired,
sequence::get(eval),
vm::sequence::min(),
};
} }
return fault::ACCEPT; return fault::ACCEPT;
} }
void void
ircd::m::vm::_write(eval &eval, ircd::m::vm::write_prepare(eval &eval,
const event &event) const event &event)
{ {
assert(eval.opts); assert(eval.opts);
const auto &opts const auto &opts{*eval.opts};
// Share a transaction with any other evals on this stack. This
// should mean the bottom-most/lowest-sequence eval on this ctx.
const auto get_other_txn{[&eval]
(auto &other)
{ {
*eval.opts if(&other != &eval && other.txn)
}; {
other.sequence_shared[1] = std::max(other.sequence_shared[1], sequence::get(eval));
eval.sequence_shared[0] = sequence::get(other);
eval.txn = other.txn;
return false;
}
else return true;
}};
// If we broke from the iteration then this eval is sharing a transaction
// from another eval on this stack.
if(!eval.for_each(eval.ctx, get_other_txn))
return;
const size_t reserve_bytes const size_t reserve_bytes
{ {
opts.reserve_bytes == size_t(-1)? opts.reserve_bytes == size_t(-1)?
size_t(json::serialized(event) * 1.66): size_t(json::serialized(event) * 1.66):
opts.reserve_bytes
opts.reserve_bytes
}; };
db::txn txn eval.txn = std::make_shared<db::txn>
{ (
*dbs::events, db::txn::opts *dbs::events, db::txn::opts
{ {
reserve_bytes + opts.reserve_index, // reserve_bytes reserve_bytes + opts.reserve_index, // reserve_bytes
0, // max_bytes (no max) 0, // max_bytes (no max)
} }
}; );
}
// Expose to eval interface void
const scope_restore eval_txn ircd::m::vm::write_append(eval &eval,
const event &event)
{
assert(eval.opts);
assert(eval.txn);
const auto &opts{*eval.opts};
auto &txn{*eval.txn};
log::debug
{ {
eval.txn, &txn log, "%s | append", loghead(eval)
}; };
// Preliminary write_opts // Preliminary write_opts
@ -851,8 +835,7 @@ ircd::m::vm::_write(eval &eval,
if(at<"type"_>(event) == "m.room.create") if(at<"type"_>(event) == "m.room.create")
{ {
dbs::write(txn, event, wopts); dbs::write(*eval.txn, event, wopts);
_commit(eval);
return; return;
} }
@ -886,14 +869,15 @@ ircd::m::vm::_write(eval &eval,
}; };
wopts.root_in = state.root_id; wopts.root_in = state.root_id;
dbs::write(txn, event, wopts); dbs::write(*eval.txn, event, wopts);
_commit(eval);
} }
void void
ircd::m::vm::_commit(eval &eval) ircd::m::vm::write_commit(eval &eval)
{ {
assert(eval.txn); assert(eval.txn);
assert(eval.txn.use_count() == 1);
assert(eval.sequence_shared[0] == 0);
auto &txn(*eval.txn); auto &txn(*eval.txn);
#ifdef RB_DEBUG #ifdef RB_DEBUG
@ -906,19 +890,15 @@ ircd::m::vm::_commit(eval &eval)
const auto db_seq_after(db::sequence(*m::dbs::events)); const auto db_seq_after(db::sequence(*m::dbs::events));
#endif #endif
if(log_commit_debug && !eval.opts->debuglog_accept) log::debug
log::debug {
{ log, "%s | wrote %lu:%lu | db seq %lu:%lu %zu cells in %zu bytes to events database ...",
log, "vm seq %lu:%lu|%lu[%lu]%lu:%lu | wrote | db seq %lu:%lu %zu cells in %zu bytes to events database ...", loghead(eval),
vm::sequence::max(), sequence::get(eval),
vm::sequence::uncommitted, eval.sequence_shared[1],
vm::sequence::committed, db_seq_before,
sequence::get(eval), db_seq_after,
vm::sequence::retired, txn.size(),
vm::sequence::min(), txn.bytes()
db_seq_before, };
db_seq_after,
txn.size(),
txn.bytes()
};
} }