construct/matrix/gossip.cc

442 lines
7.0 KiB
C++

// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
// Definition of the static `allocator` member of the instance_list
// specialization for m::gossip. It is value-initialized here and is the
// object handed to the `list` member defined for the same specialization.
template<>
decltype(ircd::util::instance_list<ircd::m::gossip>::allocator)
ircd::util::instance_list<ircd::m::gossip>::allocator
{};
// Definition of the static `list` member of the instance_list specialization
// for m::gossip. It is constructed from the specialization's `allocator`
// member (unqualified lookup here resolves inside the class scope).
template<>
decltype(ircd::util::instance_list<ircd::m::gossip>::list)
ircd::util::instance_list<ircd::m::gossip>::list
{
allocator
};
// Definition of gossip::log, constructed with the name "m.gossip". It is
// passed as the first argument to the log::debug / log::error / log::logf
// calls made by the member functions of m::gossip in this file.
decltype(ircd::m::gossip::log)
ircd::m::gossip::log
{
"m.gossip"
};
/// Construct the gossip operation and run it immediately.
///
/// Initializes the `opts` member from the argument, then invokes
/// gossip_head() up to `opts.rounds` times. Iteration stops early the first
/// time gossip_head() reports that nothing was submitted.
///
/// @param opts Options for this operation; `rounds` bounds the loop below.
ircd::m::gossip::gossip::gossip(const struct opts &opts)
:opts{opts}
{
	// NOTE: `opts` in this body names the constructor parameter, which
	// shadows the member of the same name.
	size_t round(0);
	while(round < opts.rounds && gossip_head())
		++round;
}
/// Drain all outstanding requests before the object goes away.
///
/// Keeps calling handle() until the `requests` container is empty. An
/// interruption or termination of the context raised while draining is
/// absorbed so it does not escape this noexcept destructor.
ircd::m::gossip::~gossip()
noexcept try
{
	while(!requests.empty())
	{
		// handle() returns false when it made no progress within its
		// timeout; the outer loop then simply tries again.
		bool more;
		do
			more = handle();
		while(more);
	}
}
catch(const ctx::interrupted &)
{
	// The explicit return is load-bearing: reaching the end of a handler of
	// a destructor's function-try-block would implicitly rethrow.
	return;
}
catch(const ctx::terminated &)
{
	// Same as above; return explicitly to suppress the implicit rethrow.
	return;
}
bool
ircd::m::gossip::gossip_head()
{
bool ret
{
false
};
if(opts.hint && opts.hint_only && opts.room.event_id)
{
m::event result;
result.event_id = opts.room.event_id;
json::get<"origin"_>(result) = opts.hint;
ret |= handle_head(result);
return ret;
}
if(opts.hint && opts.hint_only)
{
const unique_mutable_buffer buf
{
16_KiB
};
const auto event
{
m::room::head::fetch::one(buf, opts.room, opts.hint)
};
m::for_each(event::prev{event}, [this, &ret]
(const event::id &event_id)
{
m::event result;
json::get<"origin"_>(result) = opts.hint;
result.event_id = event_id;
ret |= handle_head(result);
return true;
});
return ret;
}
m::room::head::fetch::opts hfopts;
hfopts.room_id = opts.room.room_id;
hfopts.top = m::top(opts.room.room_id);
hfopts.existing = true;
m::room::head::fetch
{
hfopts, [this, &ret]
(const m::event &result)
{
ret |= handle_head(result);
return true;
}
};
return ret;
}
/// Handle one head result by submitting its event_id for gossip to the
/// server named in the result's "origin" property.
///
/// @param result Event carrying at least `event_id` and "origin".
/// @return whatever submit() returns for that (event_id, origin) pair.
bool
ircd::m::gossip::handle_head(const m::event &result)
{
	return submit(result.event_id, json::get<"origin"_>(result));
}
/// Submit a gossip attempt for an (event_id, remote) pair, at most once.
///
/// The pair is reduced to a 128-bit key (hash of event_id in the low half,
/// hash of remote in the high half) and recorded in `attempts`. A request is
/// only started when the key was not recorded before and no request for the
/// same pair is currently in flight.
///
/// After a successful start, or whenever the request queue is full,
/// handle() is called repeatedly until it returns false.
///
/// @param event_id Event to gossip about.
/// @param remote   Target server name.
/// @return true only if start() was invoked and returned true.
bool
ircd::m::gossip::submit(const m::event::id &event_id,
                        const string_view &remote)
{
	const auto key
	{
		uint128_t(ircd::hash(event_id))
		| (uint128_t(ircd::hash(remote)) << 64)
	};

	auto pos
	{
		attempts.lower_bound(key)
	};

	const bool seen
	{
		pos != end(attempts) && *pos == key
	};

	// Record the attempt up front so the pair is never retried, regardless
	// of whether the request below actually starts.
	if(!seen)
		pos = attempts.emplace_hint(pos, key);

	bool submitted(false);
	if(!seen && !started(event_id, remote))
		submitted = start(event_id, remote);

	if(submitted || full())
		while(handle())
			;

	return submitted;
}
/// Build a federation transaction for `event_id_` and queue its transmission
/// to `remote_`.
///
/// The events related to `event_id_` by a dbs::ref::NEXT reference, and whose
/// index lies inside the [opts.ref.first, opts.ref.second] window, are
/// collected (at most 48), serialized into the "pdus" array of a transaction
/// and sent with m::fed::send. The pending request is appended to `requests`.
///
/// @param event_id_ Event whose referencing events are gossiped.
/// @param remote_   Target server name.
/// @return true if a request was queued; false if there were no candidate
///         references, none of them could be loaded, or a std::exception
///         was caught (it is logged).
/// @throws ctx::interrupted is propagated to the caller.
bool
ircd::m::gossip::start(const m::event::id &event_id_,
                       const string_view &remote_)
try
{
	// Upper bound on the number of events carried by one transaction.
	static const size_t max
	{
		48
	};

	const auto event_idx_
	{
		m::index(std::nothrow, event_id_)
	};

	const m::event::refs refs
	{
		event_idx_
	};

	// num: candidate indexes collected; sent: PDUs actually appended.
	size_t num{0}, sent{0};
	std::array<event::idx, max> next_idx;
	refs.for_each(dbs::ref::NEXT, [this, &next_idx, &num]
	(const event::idx &event_idx, const auto &ref_type)
	{
		assert(ref_type == dbs::ref::NEXT);

		// Skip references outside of the configured index window.
		if(event_idx < opts.ref.first || event_idx > opts.ref.second)
			return true;

		// Returns false once the array is full.
		next_idx.at(num) = event_idx;
		return ++num < next_idx.size();
	});

	if(!num)
		return false;

	// One buffer backs everything the request needs. Consumed front to back:
	// the transaction JSON, the txn id, copies of remote and event_id; the
	// remaining tail is handed to m::fed::send.
	unique_mutable_buffer _buf
	{
		(event::MAX_SIZE * num) + 16_KiB
	};

	mutable_buffer buf{_buf};
	json::stack out{buf};
	{
		json::stack::object top
		{
			out
		};

		json::stack::member
		{
			top, "origin", m::my_host()
		};

		json::stack::member
		{
			top, "origin_server_ts", json::value
			{
				ircd::time<milliseconds>()
			}
		};

		json::stack::array pdus
		{
			top, "pdus"
		};

		m::event::fetch event;
		for(size_t i(0); i < num; ++i)
		{
			// Events which cannot be loaded are left out of the txn.
			if(!seek(std::nothrow, event, next_idx.at(i)))
				continue;

			pdus.append(event.source);
			++sent;
			log::debug
			{
				log, "Gossip %zu/%zu in %s for %s to '%s' %s idx:%lu",
				i,
				num,
				string_view{opts.room.room_id},
				string_view{event_id_},
				remote_,
				string_view{event.event_id},
				event.event_idx,
			};
		}
	}

	// Nothing was appended: don't send a transaction with empty pdus. This
	// must count appended PDUs; the loop index always equals num (>= 1) after
	// the loop and so cannot detect this case.
	if(!sent)
		return false;

	const string_view txn
	{
		out.completed()
	};

	consume(buf, size(txn));
	const string_view txnid
	{
		m::txn::create_id(buf, txn)
	};

	consume(buf, size(txnid));
	const string_view remote
	{
		strlcpy{buf, remote_}
	};

	consume(buf, size(remote));
	const string_view event_id
	{
		strlcpy{buf, event_id_}
	};

	consume(buf, size(event_id));
	assert(!empty(buf));

	char pbuf[48];
	log::debug
	{
		log, "Gossip %zu/%zu in %s for %s to '%s' txn[%s] %s",
		sent,
		num,
		string_view{opts.room.room_id},
		string_view{event_id_},
		remote_,
		txnid,
		pretty(pbuf, iec(size(txn))),
	};

	m::fed::send::opts fedopts;
	fedopts.remote = remote;

	// The string_views stored in the result point into _buf, whose ownership
	// is moved into the same result.
	requests.emplace_back(result
	{
		std::move(_buf),
		txn,
		txnid,
		remote,
		event_id,
		m::fed::send
		{
			txnid,
			txn,
			buf,
			std::move(fedopts)
		}
	});

	return true;
}
catch(const ctx::interrupted &e)
{
	// Interruption is not an error of this request; let the caller see it.
	throw;
}
catch(const std::exception &e)
{
	log::error
	{
		log, "Gossip %s in %s from '%s' :%s",
		string_view{event_id_},
		string_view{opts.room.room_id},
		remote_,
		e.what(),
	};

	return false;
}
bool
ircd::m::gossip::handle()
{
if(requests.empty())
return false;
auto next
{
ctx::when_any(std::begin(requests), std::end(requests), []
(auto &it) -> ctx::future<http::code> &
{
return it->request;
})
};
const milliseconds timeout
{
full()? 5000: 50
};
ctx::interruption_point();
if(!next.wait(timeout, std::nothrow))
return full();
const unique_iterator it
{
requests, next.get()
};
assert(it.it != std::end(requests));
return handle(*it.it);
}
/// Process the response of one completed request.
///
/// Obtains the request's result (which throws if the request failed), then
/// logs one line per PDU found in the response body: at DEBUG level when
/// the PDU has no errors, at DERROR level with the errors otherwise.
///
/// @param result The completed request entry.
/// @return always true, including when a std::exception was caught and
///         logged.
/// @throws ctx::interrupted is propagated to the caller.
bool
ircd::m::gossip::handle(result &result)
try
{
	// The value itself is unused; get() is called so that a failed request
	// surfaces as an exception here.
	auto code
	{
		result.request.get()
	};

	const json::object body
	{
		result.request
	};

	const auto each_pdu{[this, &result]
	(const event::id &event_id, const json::object &errors)
	{
		const bool ok
		{
			empty(errors)
		};

		const auto level
		{
			ok? log::level::DEBUG: log::level::DERROR
		};

		log::logf
		{
			log, level,
			"Gossip %s in %s to '%s'%s%s",
			string_view{event_id},
			string_view{opts.room.room_id},
			string_view{result.remote},
			ok? " "_sv: " :"_sv,
			string_view{errors},
		};
	}};

	fed::send::response{body}.for_each_pdu(each_pdu);
	return true;
}
catch(const ctx::interrupted &e)
{
	throw;
}
catch(const std::exception &e)
{
	// A failed request is logged and otherwise treated as handled.
	log::logf
	{
		log, log::level::DERROR,
		"Gossip %s in %s to '%s' :%s",
		string_view{result.event_id},
		string_view{opts.room.room_id},
		string_view{result.remote},
		e.what(),
	};

	return true;
}
/// Test whether a request for the given (event_id, remote) pair is
/// currently pending in `requests`.
///
/// @param event_id Event the request was started for.
/// @param remote   Target server name of the request.
/// @return true if a matching entry exists.
bool
ircd::m::gossip::started(const event::id &event_id,
                         const string_view &remote)
const
{
	return std::any_of(std::begin(requests), std::end(requests), [&event_id, &remote]
	(const auto &pending)
	{
		if(!(pending.event_id == event_id))
			return false;

		return pending.remote == remote;
	});
}
/// Whether the number of pending requests has reached the configured
/// `opts.width`.
bool
ircd::m::gossip::full()
const noexcept
{
	const auto pending(requests.size());
	return pending >= opts.width;
}