0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-25 05:18:23 +02:00

ircd:Ⓜ️:gossip: Implement multi-round loop; add console cmds.

This commit is contained in:
Jason Volk 2020-12-09 02:15:41 -08:00
parent 2351cc071f
commit 4b1c8956a6
3 changed files with 208 additions and 32 deletions

View file

@ -31,6 +31,7 @@ struct ircd::m::gossip
const struct opts &opts;
std::list<result> requests;
std::set<uint128_t> attempts;
private:
bool full() const noexcept;
@ -41,7 +42,7 @@ struct ircd::m::gossip
bool start(const event::id &, const string_view &);
bool submit(const event::id &, const string_view &);
bool handle_head(const m::event &);
void gossip_head();
bool gossip_head();
public:
gossip(const struct opts &);

View file

@ -29,31 +29,83 @@ ircd::m::gossip::log
ircd::m::gossip::gossip::gossip(const struct opts &opts)
:opts{opts}
{
gossip_head();
for(size_t i(0); i < opts.rounds; ++i)
if(!gossip_head())
break;
}
ircd::m::gossip::~gossip()
noexcept
noexcept try
{
while(!requests.empty())
while(handle());
}
catch(const ctx::interrupted &)
{
return;
}
catch(const ctx::terminated &)
{
return;
}
void
bool
ircd::m::gossip::gossip_head()
{
bool ret
{
false
};
if(opts.hint && opts.hint_only && opts.room.event_id)
{
m::event result;
result.event_id = opts.room.event_id;
json::get<"origin"_>(result) = opts.hint;
ret |= handle_head(result);
return ret;
}
if(opts.hint && opts.hint_only)
{
const unique_mutable_buffer buf
{
16_KiB
};
const auto event
{
m::room::head::fetch::one(buf, opts.room, opts.hint)
};
m::for_each(event::prev{event}, [this, &ret]
(const event::id &event_id)
{
m::event result;
json::get<"origin"_>(result) = opts.hint;
result.event_id = event_id;
ret |= handle_head(result);
return true;
});
return ret;
}
m::room::head::fetch::opts hfopts;
hfopts.room_id = opts.room.room_id;
hfopts.top = m::top(opts.room.room_id);
hfopts.existing = true;
m::room::head::fetch
{
hfopts, [this](const m::event &result)
hfopts, [this, &ret]
(const m::event &result)
{
// Bail if interrupted
if(ctx::interruption_requested())
return false;
return handle_head(result);
ret |= handle_head(result);
return true;
}
};
return ret;
}
bool
@ -69,34 +121,43 @@ ircd::m::gossip::handle_head(const m::event &result)
submit(result.event_id, remote)
};
log::debug
{
log, "Gossip %s in %s to '%s' submit:%b requests:%zu",
string_view{result.event_id},
string_view{opts.room.room_id},
remote,
submitted,
requests.size(),
};
return true;
return submitted;
}
bool
ircd::m::gossip::submit(const m::event::id &event_id,
const string_view &remote)
{
const bool ret
const auto hash
{
!started(event_id, remote)?
(uint128_t(ircd::hash(remote)) << 64) |
(uint128_t(ircd::hash(event_id)) >> 64)
};
auto it
{
attempts.lower_bound(hash)
};
const bool exists
{
it != end(attempts) && *it == hash
};
if(!exists)
it = attempts.emplace_hint(it, hash);
const bool submitted
{
!exists && !started(event_id, remote)?
start(event_id, remote):
false
};
if(ret || full())
if(submitted || full())
while(handle());
return ret;
return submitted;
}
bool
@ -109,17 +170,25 @@ try
48
};
const m::event::refs refs
const auto event_idx_
{
m::index(std::nothrow, event_id_)
};
size_t num{0};
const m::event::refs refs
{
event_idx_
};
size_t num{0}, i{0};
std::array<event::idx, max> next_idx;
refs.for_each(dbs::ref::NEXT, [&next_idx, &num]
refs.for_each(dbs::ref::NEXT, [this, &next_idx, &num]
(const event::idx &event_idx, const auto &ref_type)
{
assert(ref_type == dbs::ref::NEXT);
if(event_idx < opts.ref.first || event_idx > opts.ref.second)
return true;
next_idx.at(num) = event_idx;
return ++num < next_idx.size();
});
@ -159,11 +228,29 @@ try
};
m::event::fetch event;
for(size_t i(0); i < num; ++i)
if(seek(std::nothrow, event, next_idx.at(i)))
pdus.append(event.source);
for(i = 0; i < num; ++i)
{
if(!seek(std::nothrow, event, next_idx.at(i)))
continue;
pdus.append(event.source);
log::debug
{
log, "Gossip %zu/%zu in %s for %s to '%s' %s idx:%lu",
i,
num,
string_view{opts.room.room_id},
string_view{event_id_},
remote_,
string_view{event.event_id},
event.event_idx,
};
}
}
if(!i)
return false;
const string_view txn
{
out.completed()
@ -190,6 +277,19 @@ try
consume(buf, size(event_id));
assert(!empty(buf));
char pbuf[48];
log::debug
{
log, "Gossip %zu/%zu in %s for %s to '%s' txn[%s] %s",
i,
num,
string_view{opts.room.room_id},
string_view{event_id_},
remote_,
txnid,
pretty(pbuf, iec(size(txn))),
};
m::fed::send::opts fedopts;
fedopts.remote = remote;
requests.emplace_back(result
@ -290,7 +390,7 @@ try
string_view{event_id},
string_view{opts.room.room_id},
string_view{result.remote},
!ok? " :"_sv: ""_sv,
!ok? " :"_sv: " "_sv,
string_view{errors},
};
});

View file

@ -11158,6 +11158,81 @@ console_cmd__room__acquire(opt &out, const string_view &line)
return true;
}
bool
console_cmd__room__gossip__list(opt &out, const string_view &line)
{
size_t i(0);
for(const auto *const &a : m::gossip::list)
{
size_t j(0);
for(const auto &result : a->requests)
out
<< std::left << std::setw(4) << i
<< " "
<< std::left << std::setw(4) << j++
<< " "
<< std::left << std::setw(50) << trunc(a->opts.room.room_id, 40)
<< " ["
<< std::right << std::setw(7) << a->opts.depth.first
<< " "
<< std::right << std::setw(7) << a->opts.depth.second
<< " | "
<< std::right << std::setw(8) << a->opts.ref.first
<< " "
<< std::right << std::setw(8) << long(a->opts.ref.second)
<< "] "
<< std::endl;
i++;
}
return true;
}
bool
console_cmd__room__gossip(opt &out, const string_view &line)
{
const params param{line, " ",
{
"room_id", "remote", "rounds"
}};
if(!param["room_id"])
return console_cmd__room__gossip__list(out, line);
const auto &room_id
{
m::room_id(param.at("room_id"))
};
const auto remote
{
param["remote"]
};
const auto rounds
{
param.at("rounds", -1UL)
};
const m::room room
{
room_id
};
struct m::gossip::opts opts;
opts.room = room;
opts.hint = remote != "*"? remote: string_view{};
opts.hint_only = !opts.hint.empty();
opts.rounds = rounds;
m::gossip gossip
{
opts
};
return true;
}
bool
console_cmd__room__messages(opt &out, const string_view &line)
{