mirror of
https://github.com/matrix-construct/construct
synced 2025-01-13 16:33:53 +01:00
287 lines
6 KiB
C++
287 lines
6 KiB
C++
// The Construct
|
|
//
|
|
// Copyright (C) The Construct Developers, Authors & Contributors
|
|
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
|
|
//
|
|
// Permission to use, copy, modify, and/or distribute this software for any
|
|
// purpose with or without fee is hereby granted, provided that the above
|
|
// copyright notice and this permission notice is present in all copies. The
|
|
// full license for this software is available in the LICENSE file.
|
|
|
|
decltype(ircd::m::room::head::fetch::timeout)
|
|
ircd::m::room::head::fetch::timeout
|
|
{
|
|
{ "name", "ircd.m.room.head.fetch.timeout" },
|
|
{ "default", 10 * 1000L },
|
|
};
|
|
|
|
/// Convenience overload of one(): supplies a temporary 16 KiB buffer to the
/// buffer-taking overload and returns the first prev_events reference of the
/// event obtained from `remote` for `room_id`.
///
/// @param room_id  Room being queried.
/// @param remote   Server the request is directed to.
/// @param user_id  User on whose behalf the request is made; forwarded
///                 unchanged to the buffer-taking overload.
/// @return         The result of prev_event(0) on the fetched event.
ircd::m::event::id::buf
ircd::m::room::head::fetch::one(const id &room_id,
                                const string_view &remote,
                                const user::id &user_id)
{
	// Scratch space handed to the buffer-taking overload.
	const unique_mutable_buffer scratch{16_KiB};

	// Event returned by the remote for this room.
	const m::event fetched{one(scratch, room_id, remote, user_id)};

	// View over the references the fetched event makes to prior events.
	const m::event::prev refs{fetched};

	return refs.prev_event(0);
}
|
|
|
|
/// Issues a fed::make_join request for `room_id` to `remote` and returns the
/// "event" member of the response content, copied to the front of `out`.
///
/// @param out       Destination for the returned event's JSON. When it is
///                  smaller than 16 KiB a separate 16 KiB buffer is allocated
///                  for the request and only the resulting event is moved
///                  into `out`.
/// @param room_id   Room being queried.
/// @param remote    Server the request is directed to (opts.remote).
/// @param user_id_  User for the request; when empty a local user is looked
///                  up via any_user() (see below).
/// @return          An event constructed over the bytes written to `out`.
ircd::m::event
ircd::m::room::head::fetch::one(const mutable_buffer &out,
                                const id &room_id,
                                const string_view &remote,
                                const user::id &user_id_)
{
	// Buffer size the request is made with; also asserted below.
	const auto required{16_KiB};

	const m::room room{room_id};

	// When no user_id is supplied and the room exists locally we attempt
	// to find the user_id of one of our users with membership in the room.
	// This satisfies synapse's requirements for whether we have access
	// to the response. If user_id remains blank then make_join will later
	// generate a random one from our host as well.
	m::user::id::buf user_id
	{
		!user_id_?
			any_user(room, my_host(), "join"):
			user_id_
	};

	// Make another attempt to find an invited user because that carries some
	// value (this query is not as fast as querying join memberships).
	if(!user_id)
		user_id = any_user(room, my_host(), "invite");

	// Decide whether the caller's buffer is large enough to work in; if it
	// is not, a private buffer of the required size is allocated instead
	// (otherwise the private buffer is constructed with size zero).
	const bool need_scratch{size(out) < required};
	const unique_mutable_buffer scratch
	{
		need_scratch? required: 0_KiB
	};

	const mutable_buffer work
	{
		need_scratch? scratch: out
	};

	assert(size(work) >= required);

	// Request options: target server, and dynamic turned off.
	fed::make_join::opts mj_opts;
	mj_opts.remote = remote;
	mj_opts.dynamic = false;

	fed::make_join request
	{
		room_id, user_id, work, std::move(mj_opts)
	};

	// Wait for the response, bounded by the configured timeout.
	const auto code
	{
		request.get(milliseconds(timeout))
	};

	// The response content is an object whose "event" member is the result.
	const json::object content{request.in.content};
	const json::object proto_event{content["event"]};
	const string_view src{proto_event};

	// Relocate the event JSON to the start of the caller's buffer.
	const size_t moved{move(out, src)};
	assert(moved == size(src));

	return json::object
	{
		string_view{data(out), moved}
	};
}
|
|
|
|
//
|
|
// fetch::fetch
|
|
//
|
|
|
|
/// Queries other servers (feds::op::head, excluding ourselves) for their
/// view of the head of opts.room_id, accumulates statistics about the
/// responses in this object, and optionally reports each newly seen head
/// event id through `closure`.
///
/// Statistics updated per response:
///  - respond: number of responses received.
///  - heads:   sum of prev_events counts over all responses.
///  - ots[0..2] / depth[0..2]: number of responses whose origin_server_ts /
///    depth is less than, equal to, or greater than our local reference.
/// Statistics updated per referenced event id:
///  - concur:  id was already present in this->head (when opts.unique).
///  - exists:  id passes m::exists() (when !opts.existing).
///
/// @param opts     Room, optional user_id, optional reference point
///                 (opts.top) and result limits.
/// @param closure  When set, invoked with a result event carrying room_id,
///                 origin, origin_server_ts, depth and event_id for each id
///                 that was neither skipped nor filtered; its return value
///                 is passed back to m::for_each().
ircd::m::room::head::fetch::fetch(const opts &opts,
                                  const closure &closure)
{
	const m::room room
	{
		opts.room_id
	};

	// When the room isn't public we need to supply a user_id of one of our
	// users in the room to satisfy matrix protocol requirements upstack.
	const m::user::id::buf user_id
	{
		!opts.user_id?
			m::any_user(room, origin(my()), "join"):
			opts.user_id
	};

	// Local reference point (event_id, depth, idx) that responses are compared
	// against. Seeded from opts.top; any missing pieces are filled in below.
	std::tuple<id::event::buf, int64_t, event::idx> top;
	std::get<2>(top) = std::get<2>(opts.top);
	std::get<1>(top) = std::get<1>(opts.top);
	if(std::get<0>(opts.top))
		std::get<0>(top) = std::get<0>(opts.top);

	time_t top_ots {0};
	auto &[top_event_id, top_depth, top_idx]
	{
		top
	};

	// Nothing usable supplied by the caller: take the room's current top.
	if(!top_event_id && !top_idx)
		top = m::top(std::nothrow, room);

	if(!top_event_id)
		top_event_id = m::head(std::nothrow, room);

	if(!top_idx)
		top_idx = m::index(std::nothrow, top_event_id);

	if(!top_depth)
		m::get(std::nothrow, top_idx, "depth", top_depth);

	if(!top_ots)
		m::get(std::nothrow, top_idx, "origin_server_ts", top_ots);

	char tmbuf[48];
	log::debug
	{
		log, "Resynchronizing %s from %s [relative idx:%lu depth:%ld %s] from %zu joined servers...",
		string_view{room.room_id},
		string_view{top_event_id},
		top_idx,
		top_depth,
		microdate(tmbuf),
		room::origins(room).count(),
	};

	// Single result object reused for every closure invocation; room_id is
	// constant so it is set once up front.
	m::event result;
	if(likely(closure))
		json::get<"room_id"_>(result) = opts.room_id;

	feds::opts fopts;
	fopts.op = feds::op::head;
	fopts.room_id = room.room_id;
	fopts.user_id = user_id;
	fopts.closure_errors = false; // exceptions will not propagate feds::execute
	fopts.exclude_myself = true;
	fopts.timeout = milliseconds(timeout);
	feds::execute(fopts, [this, &opts, &top, &top_ots, &closure, &result]
	(const auto &response)
	{
		m::event event
		{
			response.object.get("event")
		};

		const event::prev prev
		{
			event
		};

		const auto heads
		{
			prev.prev_events_count()
		};

		// The depth comes back as one greater than any existing
		// depth so we subtract one. The value is clamped to at least 1 in
		// signed arithmetic *before* subtracting so that a zero or negative
		// depth yields 0 instead of wrapping around or going negative.
		const int64_t depth
		{
			std::max<int64_t>(json::get<"depth"_>(event), 1) - 1
		};

		const auto &ots
		{
			json::get<"origin_server_ts"_>(event)
		};

		const auto &[top_event_id, top_depth, top_idx]
		{
			top
		};

		// Tally how this response compares with our local reference point.
		this->respond += 1;
		this->heads += heads;
		this->ots[0] += ots < top_ots;
		this->ots[1] += ots == top_ots;
		this->ots[2] += ots > top_ots;
		this->depth[0] += depth < top_depth;
		this->depth[1] += depth == top_depth;
		this->depth[2] += depth > top_depth;

		if(likely(closure))
		{
			json::get<"origin"_>(result) = response.origin;
			json::get<"origin_server_ts"_>(result) = ots;
			json::get<"depth"_>(result) = depth;
		}

		size_t i(0);
		return m::for_each(prev, [this, &opts, &closure, &result, &i]
		(const event::id &event_id)
		{
			// Per-server limit: ids beyond the first max_results_per_server
			// are skipped (iteration continues so the counter stays simple).
			if(unlikely(i++ >= opts.max_results_per_server))
				return true;

			// Global limit reached: returning false is handed back to
			// m::for_each() and from there to feds::execute().
			if(unlikely(this->head.size() >= opts.max_results))
				return false;

			auto it
			{
				this->head.lower_bound(event_id)
			};

			// Already reported by another server.
			if(likely(opts.unique))
				if(it != std::end(this->head) && *it == event_id)
				{
					++this->concur;
					return true;
				}

			// Unless existing events were asked for, skip ids that pass
			// m::exists().
			if(likely(!opts.existing))
				if(m::exists(event_id))
				{
					++this->exists;
					return true;
				}

			// Record the id (reusing the lower_bound position as the hint)
			// and point the result at the stored copy.
			if(likely(opts.unique))
			{
				it = this->head.emplace_hint(it, event_id);
				result.event_id = *it;
			}
			else result.event_id = event_id;

			if(likely(closure))
				return closure(result);

			return true;
		});
	});
}
|