0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-12-25 23:14:13 +01:00

ircd:Ⓜ️:init::backfill: Split gossip routine into separate interface.

This commit is contained in:
Jason Volk 2020-04-23 02:04:07 -07:00
parent 05b24b1ef3
commit 87f873ad45
5 changed files with 201 additions and 128 deletions

39
include/ircd/m/gossip.h Normal file
View file

@ -0,0 +1,39 @@
// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_M_GOSSIP_H
namespace ircd::m::gossip
{
struct opts;
struct gossip;
extern log::log log;
};
struct ircd::m::gossip::gossip
{
gossip(const room::id &, const opts &);
};
struct ircd::m::gossip::opts
{
/// The remote server to gossip with. May be empty to gossip with every
/// server in the room.
string_view remote;
/// An event to gossip about. May be empty to determine which event must
/// be gossiped about.
m::event::id event_id;
/// Coarse timeout for various network interactions.
milliseconds timeout {7500ms};
};

View file

@ -94,6 +94,7 @@ namespace ircd
#include "breadcrumb_rooms.h"
#include "media.h"
#include "search.h"
#include "gossip.h"
#include "resource.h"
#include "homeserver.h"

View file

@ -134,6 +134,7 @@ libircd_matrix_la_SOURCES += events.cc
libircd_matrix_la_SOURCES += fed.cc
libircd_matrix_la_SOURCES += feds.cc
libircd_matrix_la_SOURCES += fetch.cc
libircd_matrix_la_SOURCES += gossip.cc
libircd_matrix_la_SOURCES += request.cc
libircd_matrix_la_SOURCES += keys.cc
libircd_matrix_la_SOURCES += node.cc

139
matrix/gossip.cc Normal file
View file

@ -0,0 +1,139 @@
// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
decltype(ircd::m::gossip::log)
ircd::m::gossip::log
{
"m.gossip"
};
/// Initial gossip protocol works by sending the remote server some events which
/// reference an event contained in the remote's head which we just obtained.
/// This is part of a family of active measures taken to reduce forward
/// extremities on other servers but without polluting the chain with
/// permanent data for this purpose such as with org.matrix.dummy_event.
ircd::m::gossip::gossip::gossip(const room::id &room_id,
const opts &opts)
{
assert(opts.event_id);
const auto &event_id
{
opts.event_id
};
const m::event::refs refs
{
m::index(std::nothrow, event_id)
};
static const size_t max{48};
const size_t count
{
std::min(refs.count(dbs::ref::NEXT), max)
};
if(!count)
return;
const unique_mutable_buffer buf[]
{
{ event::MAX_SIZE * (count + 1) },
{ 16_KiB },
};
size_t i{0};
std::array<event::idx, max> next_idx;
refs.for_each(dbs::ref::NEXT, [&next_idx, &i]
(const event::idx &event_idx, const auto &ref_type)
{
assert(ref_type == dbs::ref::NEXT);
next_idx.at(i) = event_idx;
return ++i < next_idx.size();
});
size_t ret{0};
json::stack out{buf[0]};
{
json::stack::object top
{
out
};
json::stack::member
{
top, "origin", m::my_host()
};
json::stack::member
{
top, "origin_server_ts", json::value
{
long(ircd::time<milliseconds>())
}
};
json::stack::array pdus
{
top, "pdus"
};
m::event::fetch event;
for(assert(ret == 0); ret < i; ++ret)
if(seek(std::nothrow, event, next_idx.at(ret)))
pdus.append(event.source);
}
const string_view txn
{
out.completed()
};
char idbuf[64];
const string_view txnid
{
m::txn::create_id(idbuf, txn)
};
m::fed::send::opts fedopts;
fedopts.remote = opts.remote;
m::fed::send request
{
txnid, txn, buf[1], std::move(fedopts)
};
http::code code{0};
std::exception_ptr eptr;
if(request.wait(opts.timeout, std::nothrow)) try
{
code = request.get();
ret += code == http::OK;
}
catch(...)
{
eptr = std::current_exception();
}
log::logf
{
log, code == http::OK? log::DEBUG : log::DERROR,
"gossip %zu:%zu to %s reference to %s in %s :%s %s",
ret,
count,
opts.remote,
string_view{event_id},
string_view{room_id},
code?
status(code):
"failed",
eptr?
what(eptr):
string_view{},
};
}

View file

@ -12,7 +12,7 @@ namespace ircd::m::init::backfill
{
extern conf::item<bool> gossip_enable;
extern conf::item<seconds> gossip_timeout;
size_t gossip(const room::id &, const event::id &, const string_view &remote);
void gossip(const room::id &, const event::id &, const string_view &remote);
bool handle_event(const room::id &, const event::id &, const string_view &hint, const bool &ask_one);
void handle_missing(const room::id &);
@ -53,6 +53,20 @@ ircd::m::init::backfill::local_joined_only
{ "default", true },
};
decltype(ircd::m::init::backfill::gossip_enable)
ircd::m::init::backfill::gossip_enable
{
{ "name", "ircd.m.init.backfill.gossip.enable" },
{ "default", true },
};
decltype(ircd::m::init::backfill::gossip_timeout)
ircd::m::init::backfill::gossip_timeout
{
{ "name", "ircd.m.init.backfill.gossip.timeout" },
{ "default", 5L },
};
decltype(ircd::m::init::backfill::worker_context)
ircd::m::init::backfill::worker_context;
@ -568,138 +582,17 @@ catch(const std::exception &e)
return false;
}
decltype(ircd::m::init::backfill::gossip_enable)
ircd::m::init::backfill::gossip_enable
{
{ "name", "ircd.m.init.backfill.gossip.enable" },
{ "default", true },
};
decltype(ircd::m::init::backfill::gossip_timeout)
ircd::m::init::backfill::gossip_timeout
{
{ "name", "ircd.m.init.backfill.gossip.timeout" },
{ "default", 5L },
};
/// Initial gossip protocol works by sending the remote server some events which
/// reference an event contained in the remote's head which we just obtained.
/// This is part of a family of active measures taken to reduce forward
/// extremities on other servers but without polluting the chain with
/// permanent data for this purpose such as with org.matrix.dummy_event.
size_t
void
ircd::m::init::backfill::gossip(const room::id &room_id,
const event::id &event_id,
const string_view &remote)
{
size_t ret{0};
const m::event::refs refs
{
m::index(std::nothrow, event_id)
};
static const size_t max{48};
const size_t count
{
std::min(refs.count(dbs::ref::NEXT), max)
};
if(!count)
return ret;
const unique_mutable_buffer buf[]
{
{ event::MAX_SIZE * (count + 1) },
{ 16_KiB },
};
size_t i{0};
std::array<event::idx, max> next_idx;
refs.for_each(dbs::ref::NEXT, [&next_idx, &i]
(const event::idx &event_idx, const auto &ref_type)
{
assert(ref_type == dbs::ref::NEXT);
next_idx.at(i) = event_idx;
return ++i < next_idx.size();
});
json::stack out{buf[0]};
{
json::stack::object top
{
out
};
json::stack::member
{
top, "origin", m::my_host()
};
json::stack::member
{
top, "origin_server_ts", json::value
{
long(ircd::time<milliseconds>())
}
};
json::stack::array pdus
{
top, "pdus"
};
m::event::fetch event;
for(assert(ret == 0); ret < i; ++ret)
if(seek(std::nothrow, event, next_idx.at(ret)))
pdus.append(event.source);
}
const string_view txn
{
out.completed()
};
char idbuf[64];
const string_view txnid
{
m::txn::create_id(idbuf, txn)
};
m::fed::send::opts opts;
m::gossip::opts opts;
opts.timeout = seconds(gossip_timeout);
opts.remote = remote;
m::fed::send request
opts.event_id = event_id;
gossip::gossip
{
txnid, txn, buf[1], std::move(opts)
room_id, opts
};
http::code code{0};
std::exception_ptr eptr;
if(request.wait(seconds(gossip_timeout), std::nothrow)) try
{
code = request.get();
ret += code == http::OK;
}
catch(...)
{
eptr = std::current_exception();
}
log::logf
{
log, code == http::OK? log::DEBUG : log::DERROR,
"gossip %zu:%zu to %s reference to %s in %s :%s %s",
ret,
count,
remote,
string_view{event_id},
string_view{room_id},
code?
status(code):
"failed",
eptr?
what(eptr):
string_view{},
};
return ret;
}