mirror of
https://github.com/matrix-construct/construct
synced 2024-11-17 23:40:57 +01:00
441 lines
7 KiB
C++
441 lines
7 KiB
C++
// The Construct
|
|
//
|
|
// Copyright (C) The Construct Developers, Authors & Contributors
|
|
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
|
|
//
|
|
// Permission to use, copy, modify, and/or distribute this software for any
|
|
// purpose with or without fee is hereby granted, provided that the above
|
|
// copyright notice and this permission notice is present in all copies. The
|
|
// full license for this software is available in the LICENSE file.
|
|
|
|
template<>
|
|
decltype(ircd::util::instance_list<ircd::m::gossip>::allocator)
|
|
ircd::util::instance_list<ircd::m::gossip>::allocator
|
|
{};
|
|
|
|
template<>
|
|
decltype(ircd::util::instance_list<ircd::m::gossip>::list)
|
|
ircd::util::instance_list<ircd::m::gossip>::list
|
|
{
|
|
allocator
|
|
};
|
|
|
|
decltype(ircd::m::gossip::log)
|
|
ircd::m::gossip::log
|
|
{
|
|
"m.gossip"
|
|
};
|
|
|
|
ircd::m::gossip::gossip::gossip(const struct opts &opts)
|
|
:opts{opts}
|
|
{
|
|
for(size_t i(0); i < opts.rounds; ++i)
|
|
if(!gossip_head())
|
|
break;
|
|
}
|
|
|
|
ircd::m::gossip::~gossip()
|
|
noexcept try
|
|
{
|
|
while(!requests.empty())
|
|
while(handle());
|
|
}
|
|
catch(const ctx::interrupted &)
|
|
{
|
|
return;
|
|
}
|
|
catch(const ctx::terminated &)
|
|
{
|
|
return;
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::gossip_head()
|
|
{
|
|
bool ret
|
|
{
|
|
false
|
|
};
|
|
|
|
if(opts.hint && opts.hint_only && opts.room.event_id)
|
|
{
|
|
m::event result;
|
|
result.event_id = opts.room.event_id;
|
|
json::get<"origin"_>(result) = opts.hint;
|
|
ret |= handle_head(result);
|
|
return ret;
|
|
}
|
|
|
|
if(opts.hint && opts.hint_only)
|
|
{
|
|
const unique_mutable_buffer buf
|
|
{
|
|
16_KiB
|
|
};
|
|
|
|
const auto event
|
|
{
|
|
m::room::head::fetch::one(buf, opts.room, opts.hint)
|
|
};
|
|
|
|
m::for_each(event::prev{event}, [this, &ret]
|
|
(const event::id &event_id)
|
|
{
|
|
m::event result;
|
|
json::get<"origin"_>(result) = opts.hint;
|
|
result.event_id = event_id;
|
|
ret |= handle_head(result);
|
|
return true;
|
|
});
|
|
|
|
return ret;
|
|
}
|
|
|
|
m::room::head::fetch::opts hfopts;
|
|
hfopts.room_id = opts.room.room_id;
|
|
hfopts.top = m::top(opts.room.room_id);
|
|
hfopts.existing = true;
|
|
m::room::head::fetch
|
|
{
|
|
hfopts, [this, &ret]
|
|
(const m::event &result)
|
|
{
|
|
ret |= handle_head(result);
|
|
return true;
|
|
}
|
|
};
|
|
|
|
return ret;
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::handle_head(const m::event &result)
|
|
{
|
|
const auto &remote
|
|
{
|
|
json::get<"origin"_>(result)
|
|
};
|
|
|
|
const bool submitted
|
|
{
|
|
submit(result.event_id, remote)
|
|
};
|
|
|
|
return submitted;
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::submit(const m::event::id &event_id,
|
|
const string_view &remote)
|
|
{
|
|
const auto hash
|
|
{
|
|
(uint128_t(ircd::hash(event_id)))
|
|
| (uint128_t(ircd::hash(remote)) << 64)
|
|
};
|
|
|
|
auto it
|
|
{
|
|
attempts.lower_bound(hash)
|
|
};
|
|
|
|
const bool exists
|
|
{
|
|
it != end(attempts) && *it == hash
|
|
};
|
|
|
|
if(!exists)
|
|
it = attempts.emplace_hint(it, hash);
|
|
|
|
const bool submitted
|
|
{
|
|
!exists && !started(event_id, remote)?
|
|
start(event_id, remote):
|
|
false
|
|
};
|
|
|
|
if(submitted || full())
|
|
while(handle());
|
|
|
|
return submitted;
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::start(const m::event::id &event_id_,
|
|
const string_view &remote_)
|
|
try
|
|
{
|
|
static const size_t max
|
|
{
|
|
48
|
|
};
|
|
|
|
const auto event_idx_
|
|
{
|
|
m::index(std::nothrow, event_id_)
|
|
};
|
|
|
|
const m::event::refs refs
|
|
{
|
|
event_idx_
|
|
};
|
|
|
|
size_t num{0}, i{0};
|
|
std::array<event::idx, max> next_idx;
|
|
refs.for_each(dbs::ref::NEXT, [this, &next_idx, &num]
|
|
(const event::idx &event_idx, const auto &ref_type)
|
|
{
|
|
assert(ref_type == dbs::ref::NEXT);
|
|
if(event_idx < opts.ref.first || event_idx > opts.ref.second)
|
|
return true;
|
|
|
|
next_idx.at(num) = event_idx;
|
|
return ++num < next_idx.size();
|
|
});
|
|
|
|
if(!num)
|
|
return false;
|
|
|
|
unique_mutable_buffer _buf
|
|
{
|
|
(event::MAX_SIZE * num) + 16_KiB
|
|
};
|
|
|
|
mutable_buffer buf{_buf};
|
|
json::stack out{buf};
|
|
{
|
|
json::stack::object top
|
|
{
|
|
out
|
|
};
|
|
|
|
json::stack::member
|
|
{
|
|
top, "origin", m::my_host()
|
|
};
|
|
|
|
json::stack::member
|
|
{
|
|
top, "origin_server_ts", json::value
|
|
{
|
|
ircd::time<milliseconds>()
|
|
}
|
|
};
|
|
|
|
json::stack::array pdus
|
|
{
|
|
top, "pdus"
|
|
};
|
|
|
|
m::event::fetch event;
|
|
for(i = 0; i < num; ++i)
|
|
{
|
|
if(!seek(std::nothrow, event, next_idx.at(i)))
|
|
continue;
|
|
|
|
pdus.append(event.source);
|
|
log::debug
|
|
{
|
|
log, "Gossip %zu/%zu in %s for %s to '%s' %s idx:%lu",
|
|
i,
|
|
num,
|
|
string_view{opts.room.room_id},
|
|
string_view{event_id_},
|
|
remote_,
|
|
string_view{event.event_id},
|
|
event.event_idx,
|
|
};
|
|
}
|
|
}
|
|
|
|
if(!i)
|
|
return false;
|
|
|
|
const string_view txn
|
|
{
|
|
out.completed()
|
|
};
|
|
|
|
consume(buf, size(txn));
|
|
const string_view txnid
|
|
{
|
|
m::txn::create_id(buf, txn)
|
|
};
|
|
|
|
consume(buf, size(txnid));
|
|
const string_view remote
|
|
{
|
|
strlcpy{buf, remote_}
|
|
};
|
|
|
|
consume(buf, size(remote));
|
|
const string_view event_id
|
|
{
|
|
strlcpy{buf, event_id_}
|
|
};
|
|
|
|
consume(buf, size(event_id));
|
|
assert(!empty(buf));
|
|
|
|
char pbuf[48];
|
|
log::debug
|
|
{
|
|
log, "Gossip %zu/%zu in %s for %s to '%s' txn[%s] %s",
|
|
i,
|
|
num,
|
|
string_view{opts.room.room_id},
|
|
string_view{event_id_},
|
|
remote_,
|
|
txnid,
|
|
pretty(pbuf, iec(size(txn))),
|
|
};
|
|
|
|
m::fed::send::opts fedopts;
|
|
fedopts.remote = remote;
|
|
requests.emplace_back(result
|
|
{
|
|
std::move(_buf),
|
|
txn,
|
|
txnid,
|
|
remote,
|
|
event_id,
|
|
m::fed::send
|
|
{
|
|
txnid,
|
|
txn,
|
|
buf,
|
|
std::move(fedopts)
|
|
}
|
|
});
|
|
|
|
return true;
|
|
}
|
|
catch(const ctx::interrupted &e)
|
|
{
|
|
throw;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::error
|
|
{
|
|
log, "Gossip %s in %s from '%s' :%s",
|
|
string_view{event_id_},
|
|
string_view{opts.room.room_id},
|
|
remote_,
|
|
e.what(),
|
|
};
|
|
|
|
return false;
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::handle()
|
|
{
|
|
if(requests.empty())
|
|
return false;
|
|
|
|
auto next
|
|
{
|
|
ctx::when_any(std::begin(requests), std::end(requests), []
|
|
(auto &it) -> ctx::future<http::code> &
|
|
{
|
|
return it->request;
|
|
})
|
|
};
|
|
|
|
const milliseconds timeout
|
|
{
|
|
full()? 5000: 50
|
|
};
|
|
|
|
ctx::interruption_point();
|
|
if(!next.wait(timeout, std::nothrow))
|
|
return full();
|
|
|
|
const unique_iterator it
|
|
{
|
|
requests, next.get()
|
|
};
|
|
|
|
assert(it.it != std::end(requests));
|
|
return handle(*it.it);
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::handle(result &result)
|
|
try
|
|
{
|
|
auto response
|
|
{
|
|
result.request.get()
|
|
};
|
|
|
|
const json::object body
|
|
{
|
|
result.request
|
|
};
|
|
|
|
fed::send::response{body}.for_each_pdu([&]
|
|
(const event::id &event_id, const json::object &errors)
|
|
{
|
|
const bool ok
|
|
{
|
|
empty(errors)
|
|
};
|
|
|
|
log::logf
|
|
{
|
|
log, ok? log::level::DEBUG: log::level::DERROR,
|
|
"Gossip %s in %s to '%s'%s%s",
|
|
string_view{event_id},
|
|
string_view{opts.room.room_id},
|
|
string_view{result.remote},
|
|
!ok? " :"_sv: " "_sv,
|
|
string_view{errors},
|
|
};
|
|
});
|
|
|
|
return true;
|
|
}
|
|
catch(const ctx::interrupted &e)
|
|
{
|
|
throw;
|
|
}
|
|
catch(const std::exception &e)
|
|
{
|
|
log::logf
|
|
{
|
|
log, log::level::DERROR,
|
|
"Gossip %s in %s to '%s' :%s",
|
|
string_view{result.event_id},
|
|
string_view{opts.room.room_id},
|
|
string_view{result.remote},
|
|
e.what(),
|
|
};
|
|
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::started(const event::id &event_id,
|
|
const string_view &remote)
|
|
const
|
|
{
|
|
const auto it
|
|
{
|
|
std::find_if(std::begin(requests), std::end(requests), [&]
|
|
(const auto &result)
|
|
{
|
|
return result.event_id == event_id && result.remote == remote;
|
|
})
|
|
};
|
|
|
|
return it != std::end(requests);
|
|
}
|
|
|
|
bool
|
|
ircd::m::gossip::full()
|
|
const noexcept
|
|
{
|
|
return requests.size() >= opts.width;
|
|
}
|