0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-11-29 18:22:50 +01:00

ircd:Ⓜ️:gossip: Refactor interface; zero-worker implementation.

This commit is contained in:
Jason Volk 2020-12-04 21:38:56 -08:00
parent d85a4bec99
commit 1622ef89af
3 changed files with 359 additions and 107 deletions

View file

@ -11,29 +11,83 @@
#pragma once #pragma once
#define HAVE_IRCD_M_GOSSIP_H #define HAVE_IRCD_M_GOSSIP_H
namespace ircd::m::gossip namespace ircd::m
{
struct gossip;
}
/// Matrix Gossip is a mechanism that proactively resolves the head (forward-
/// extremities) of a room on remote servers by sending the events they are
/// missing if we have them. Gossip may be performed multiple times, checking
/// for changes in the remote head and repeating based on options or until
/// completion.
struct ircd::m::gossip
:instance_list<ircd::m::gossip>
{ {
struct opts; struct opts;
struct gossip; struct result;
extern log::log log; static log::log log;
};
struct ircd::m::gossip::gossip const struct opts &opts;
{ std::list<result> requests;
gossip(const room::id &, const opts &);
private:
bool full() const noexcept;
bool started(const event::id &, const string_view &) const;
bool handle(result &);
bool handle();
bool start(const event::id &, const string_view &);
bool submit(const event::id &, const string_view &);
bool handle_head(const m::event &);
void gossip_head();
public:
gossip(const struct opts &);
gossip(const gossip &) = delete;
gossip &operator=(const gossip &) = delete;
~gossip() noexcept;
}; };
struct ircd::m::gossip::opts struct ircd::m::gossip::opts
{ {
/// The remote server to gossip with. May be empty to gossip with every /// Room apropos; when room.event_id is true, only that event will be
/// server in the room. /// the subject of gossip and that is only if the remote's head requires
string_view remote; /// it. room.event_id should not be given in most cases.
m::room room;
/// An event to gossip about. May be empty to determine which event must /// When hint_only=true this string is used to conduct gossip with the
/// be gossiped about. /// single remote given.
m::event::id event_id; string_view hint;
/// Coarse timeout for various network interactions. /// Forces remote operations to the remote given in the hint only.
milliseconds timeout {7500ms}; bool hint_only {false};
/// Depthwise window of gossip: no gossip for events outside of a given
/// depth window. Ignored if !depth.second.
pair<int64_t> depth {0, 0};
/// Indexwise window of gossip: no gossip for events with a value outside
/// of the window.
pair<event::idx> ref {0, -1UL};
/// The number of rounds the algorithm runs for.
size_t rounds {-1UL};
/// Total event limit over all operations.
size_t max {-1UL};
/// Limit the number of gossips in flight at any given time.
size_t width {128};
};
struct ircd::m::gossip::result
{
unique_mutable_buffer buf;
string_view txn;
string_view txnid;
string_view remote;
event::id event_id;
m::fed::send request;
}; };

View file

@ -8,58 +8,132 @@
// copyright notice and this permission notice is present in all copies. The // copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file. // full license for this software is available in the LICENSE file.
template<>
decltype(ircd::util::instance_list<ircd::m::gossip>::allocator)
ircd::util::instance_list<ircd::m::gossip>::allocator
{};
template<>
decltype(ircd::util::instance_list<ircd::m::gossip>::list)
ircd::util::instance_list<ircd::m::gossip>::list
{
allocator
};
decltype(ircd::m::gossip::log) decltype(ircd::m::gossip::log)
ircd::m::gossip::log ircd::m::gossip::log
{ {
"m.gossip" "m.gossip"
}; };
/// Initial gossip protocol works by sending the remote server some events which ircd::m::gossip::gossip::gossip(const struct opts &opts)
/// reference an event contained in the remote's head which we just obtained. :opts{opts}
/// This is part of a family of active measures taken to reduce forward
/// extremities on other servers but without polluting the chain with
/// permanent data for this purpose such as with org.matrix.dummy_event.
ircd::m::gossip::gossip::gossip(const room::id &room_id,
const opts &opts)
{ {
assert(opts.event_id); gossip_head();
const auto &event_id }
ircd::m::gossip::~gossip()
noexcept
{
}
void
ircd::m::gossip::gossip_head()
{
m::room::head::fetch::opts hfopts;
hfopts.room_id = opts.room.room_id;
hfopts.top = m::top(opts.room.room_id);
m::room::head::fetch
{ {
opts.event_id hfopts, [this](const m::event &result)
{
// Bail if interrupted
if(ctx::interruption_requested())
return false;
return handle_head(result);
}
};
}
bool
ircd::m::gossip::handle_head(const m::event &result)
{
const auto &remote
{
json::get<"origin"_>(result)
};
const bool submitted
{
submit(result.event_id, remote)
};
log::debug
{
log, "Gossip %s in %s to '%s' submit:%b requests:%zu",
string_view{result.event_id},
string_view{opts.room.room_id},
remote,
submitted,
requests.size(),
};
return true;
}
bool
ircd::m::gossip::submit(const m::event::id &event_id,
const string_view &remote)
{
const bool ret
{
!started(event_id, remote)?
start(event_id, remote):
false
};
if(ret || full())
while(handle());
return ret;
}
bool
ircd::m::gossip::start(const m::event::id &event_id_,
const string_view &remote_)
try
{
static const size_t max
{
48
}; };
const m::event::refs refs const m::event::refs refs
{ {
m::index(std::nothrow, event_id) m::index(std::nothrow, event_id_)
}; };
static const size_t max{48}; size_t num{0};
const size_t count
{
std::min(refs.count(dbs::ref::NEXT), max)
};
if(!count)
return;
const unique_mutable_buffer buf[]
{
{ event::MAX_SIZE * (count + 1) },
{ 16_KiB },
};
size_t i{0};
std::array<event::idx, max> next_idx; std::array<event::idx, max> next_idx;
refs.for_each(dbs::ref::NEXT, [&next_idx, &i] refs.for_each(dbs::ref::NEXT, [&next_idx, &num]
(const event::idx &event_idx, const auto &ref_type) (const event::idx &event_idx, const auto &ref_type)
{ {
assert(ref_type == dbs::ref::NEXT); assert(ref_type == dbs::ref::NEXT);
next_idx.at(i) = event_idx; next_idx.at(num) = event_idx;
return ++i < next_idx.size(); return ++num < next_idx.size();
}); });
size_t ret{0}; if(!num)
json::stack out{buf[0]}; return false;
unique_mutable_buffer _buf
{
(event::MAX_SIZE * num) + 16_KiB
};
mutable_buffer buf{_buf};
json::stack out{buf};
{ {
json::stack::object top json::stack::object top
{ {
@ -75,7 +149,7 @@ ircd::m::gossip::gossip::gossip(const room::id &room_id,
{ {
top, "origin_server_ts", json::value top, "origin_server_ts", json::value
{ {
long(ircd::time<milliseconds>()) ircd::time<milliseconds>()
} }
}; };
@ -85,8 +159,8 @@ ircd::m::gossip::gossip::gossip(const room::id &room_id,
}; };
m::event::fetch event; m::event::fetch event;
for(assert(ret == 0); ret < i; ++ret) for(size_t i(0); i < num; ++i)
if(seek(std::nothrow, event, next_idx.at(ret))) if(seek(std::nothrow, event, next_idx.at(i)))
pdus.append(event.source); pdus.append(event.source);
} }
@ -95,45 +169,173 @@ ircd::m::gossip::gossip::gossip(const room::id &room_id,
out.completed() out.completed()
}; };
char idbuf[64]; consume(buf, size(txn));
const string_view txnid const string_view txnid
{ {
m::txn::create_id(idbuf, txn) m::txn::create_id(buf, txn)
}; };
consume(buf, size(txnid));
const string_view remote
{
strlcpy{buf, remote_}
};
consume(buf, size(remote));
const string_view event_id
{
strlcpy{buf, event_id_}
};
consume(buf, size(event_id));
assert(!empty(buf));
m::fed::send::opts fedopts; m::fed::send::opts fedopts;
fedopts.remote = opts.remote; fedopts.remote = remote;
m::fed::send request requests.emplace_back(result
{ {
txnid, txn, buf[1], std::move(fedopts) std::move(_buf),
txn,
txnid,
remote,
event_id,
m::fed::send
{
txnid,
txn,
buf,
std::move(fedopts)
}
});
return true;
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
log::error
{
log, "Gossip %s in %s from '%s' :%s",
string_view{event_id_},
string_view{opts.room.room_id},
remote_,
e.what(),
}; };
http::code code{0}; return false;
std::exception_ptr eptr; }
if(request.wait(opts.timeout, std::nothrow)) try
{
code = request.get();
ret += code == http::OK;
}
catch(...)
{
eptr = std::current_exception();
}
bool
ircd::m::gossip::handle()
{
if(requests.empty())
return false;
auto next
{
ctx::when_any(std::begin(requests), std::end(requests), []
(auto &it) -> ctx::future<http::code> &
{
return it->request;
})
};
const milliseconds timeout
{
full()? 5000: 50
};
ctx::interruption_point();
if(!next.wait(timeout, std::nothrow))
return full();
const unique_iterator it
{
requests, next.get()
};
assert(it.it != std::end(requests));
return handle(*it.it);
}
bool
ircd::m::gossip::handle(result &result)
try
{
auto response
{
result.request.get()
};
const json::object body
{
result.request
};
fed::send::response{body}.for_each_pdu([&]
(const event::id &event_id, const json::object &errors)
{
const bool ok
{
empty(errors)
};
log::logf
{
log, ok? log::level::DEBUG: log::level::DERROR,
"Gossip %s in %s to '%s'%s%s",
string_view{event_id},
string_view{opts.room.room_id},
string_view{result.remote},
!ok? " :"_sv: ""_sv,
string_view{errors},
};
});
return true;
}
catch(const ctx::interrupted &e)
{
throw;
}
catch(const std::exception &e)
{
log::logf log::logf
{ {
log, code == http::OK? log::DEBUG : log::DERROR, log, log::level::DERROR,
"gossip %zu:%zu to %s reference to %s in %s :%s %s", "Gossip %s in %s to '%s' :%s",
ret, string_view{result.event_id},
count, string_view{opts.room.room_id},
opts.remote, string_view{result.remote},
string_view{event_id}, e.what(),
string_view{room_id},
code?
status(code):
"failed",
eptr?
what(eptr):
string_view{},
}; };
return true;
}
bool
ircd::m::gossip::started(const event::id &event_id,
const string_view &remote)
const
{
const auto it
{
std::find_if(std::begin(requests), std::end(requests), [&]
(const auto &result)
{
return result.event_id == event_id && result.remote == remote;
})
};
return it != std::end(requests);
}
bool
ircd::m::gossip::full()
const noexcept
{
return requests.size() >= opts.width;
} }

View file

@ -10,7 +10,6 @@
namespace ircd::m::init::backfill namespace ircd::m::init::backfill
{ {
void gossip(const room::id &, const event::id &, const string_view &remote);
void handle_room(const room::id &); void handle_room(const room::id &);
void worker(); void worker();
@ -311,35 +310,32 @@ catch(const std::exception &e)
void void
ircd::m::init::backfill::handle_room(const room::id &room_id) ircd::m::init::backfill::handle_room(const room::id &room_id)
{ {
struct m::acquire::opts opts;
opts.room = room_id;
opts.viewport_size = ssize_t(m::room::events::viewport_size);
opts.viewport_size *= size_t(viewports);
opts.vmopts.infolog_accept = true;
opts.vmopts.warnlog &= ~vm::fault::EXISTS;
opts.attempt_max = size_t(attempt_max);
m::acquire
{ {
opts struct m::acquire::opts opts;
}; opts.room = room_id;
opts.viewport_size = ssize_t(m::room::events::viewport_size);
opts.viewport_size *= size_t(viewports);
opts.vmopts.infolog_accept = true;
opts.vmopts.warnlog &= ~vm::fault::EXISTS;
opts.attempt_max = size_t(attempt_max);
m::acquire
{
opts
};
const size_t num_reset const size_t num_reset
{ {
m::room::head::reset(opts.room) m::room::head::reset(opts.room)
}; };
} }
void if((false))
ircd::m::init::backfill::gossip(const room::id &room_id,
const event::id &event_id,
const string_view &remote)
{
m::gossip::opts opts;
opts.timeout = seconds(gossip_timeout);
opts.remote = remote;
opts.event_id = event_id;
gossip::gossip
{ {
room_id, opts struct m::gossip::opts opts;
}; opts.room = room_id;
m::gossip
{
opts
};
}
} }