construct/matrix/room_head_fetch.cc

288 lines
6.0 KiB
C++

// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
decltype(ircd::m::room::head::fetch::timeout)
ircd::m::room::head::fetch::timeout
{
{ "name", "ircd.m.room.head.fetch.timeout" },
{ "default", 10 * 1000L },
};
ircd::m::event::id::buf
ircd::m::room::head::fetch::one(const id &room_id,
const string_view &remote,
const user::id &user_id)
{
const unique_mutable_buffer buf
{
16_KiB
};
const m::event event
{
one(buf, room_id, remote, user_id)
};
const m::event::prev prev
{
event
};
return prev.prev_event(0);
}
ircd::m::event
ircd::m::room::head::fetch::one(const mutable_buffer &out,
const id &room_id,
const string_view &remote,
const user::id &user_id_)
{
const m::room room
{
room_id
};
// When no user_id is supplied and the room exists locally we attempt
// to find the user_id of one of our users with membership in the room.
// This satisfies synapse's requirements for whether we have access
// to the response. If user_id remains blank then make_join will later
// generate a random one from our host as well.
m::user::id::buf user_id
{
!user_id_?
any_user(room, my_host(), "join"):
user_id_
};
// Make another attempt to find an invited user because that carries some
// value (this query is not as fast as querying join memberships).
if(!user_id)
user_id = any_user(room, my_host(), "invite");
const bool internal_alloc
{
size(out) < 16_KiB
};
const unique_mutable_buffer internal_buf
{
internal_alloc? 16_KiB: 0_KiB
};
const mutable_buffer buf
{
internal_alloc? internal_buf: out
};
assert(size(buf) >= 16_KiB);
fed::make_join::opts opts;
opts.remote = remote;
opts.dynamic = false;
fed::make_join request
{
room_id, user_id, buf, std::move(opts)
};
const auto code
{
request.get(milliseconds(timeout))
};
const json::object proto
{
request.in.content
};
const json::object event
{
proto["event"]
};
const size_t moved
{
move(out, string_view(event))
};
assert(moved == size(string_view(event)));
return json::object
{
string_view
{
data(out), moved
}
};
}
//
// fetch::fetch
//
ircd::m::room::head::fetch::fetch(const opts &opts,
const closure &closure)
{
const m::room room
{
opts.room_id
};
// When the room isn't public we need to supply a user_id of one of our
// users in the room to satisfy matrix protocol requirements upstack.
const m::user::id::buf user_id
{
!opts.user_id?
m::any_user(room, origin(my()), "join"):
opts.user_id
};
std::tuple<id::event::buf, int64_t, event::idx> top;
std::get<2>(top) = std::get<2>(opts.top);
std::get<1>(top) = std::get<1>(opts.top);
if(std::get<0>(opts.top))
std::get<0>(top) = std::get<0>(opts.top);
time_t top_ots {0};
auto &[top_event_id, top_depth, top_idx]
{
top
};
if(!top_event_id && !top_idx)
top = m::top(std::nothrow, room);
if(!top_event_id)
top_event_id = m::head(std::nothrow, room);
if(!top_idx)
top_idx = m::index(std::nothrow, top_event_id);
if(!top_depth)
m::get(std::nothrow, top_idx, "depth", top_depth);
if(!top_ots)
m::get(std::nothrow, top_idx, "origin_server_ts", top_ots);
char tmbuf[48];
log::debug
{
log, "Resynchronizing %s from %s [relative idx:%lu depth:%ld %s] from %zu joined servers...",
string_view{room.room_id},
string_view{top_event_id},
top_idx,
top_depth,
microdate(tmbuf),
room::origins(room).count(),
};
m::event result;
if(likely(closure))
json::get<"room_id"_>(result) = opts.room_id;
feds::opts fopts;
fopts.op = feds::op::head;
fopts.room_id = room.room_id;
fopts.user_id = user_id;
fopts.closure_errors = false; // exceptions wil not propagate feds::execute
fopts.exclude_myself = true;
fopts.timeout = milliseconds(timeout);
feds::execute(fopts, [this, &opts, &top, &top_ots, &closure, &result]
(const auto &response)
{
m::event event
{
response.object.get("event")
};
const event::prev prev
{
event
};
const auto heads
{
prev.prev_events_count()
};
// The depth comes back as one greater than any existing
// depth so we subtract one.
const auto depth
{
int64_t(std::max(uint64_t(json::get<"depth"_>(event)) - 1, 0UL))
};
const auto &ots
{
json::get<"origin_server_ts"_>(event)
};
const auto &[top_event_id, top_depth, top_idx]
{
top
};
this->respond += 1;
this->heads += heads;
this->ots[0] += ots < top_ots;
this->ots[1] += ots == top_ots;
this->ots[2] += ots > top_ots;
this->depth[0] += depth < top_depth;
this->depth[1] += depth == top_depth;
this->depth[2] += depth > top_depth;
if(likely(closure))
{
json::get<"origin"_>(result) = response.origin;
json::get<"origin_server_ts"_>(result) = ots;
json::get<"depth"_>(result) = depth;
}
size_t i(0);
return m::for_each(prev, [this, &opts, &closure, &result, &i]
(const event::id &event_id)
{
if(unlikely(i++ > opts.max_results_per_server))
return true;
if(unlikely(this->head.size() >= opts.max_results))
return false;
auto it
{
this->head.lower_bound(event_id)
};
if(likely(opts.unique))
if(it != std::end(this->head) && *it == event_id)
{
++this->concur;
return true;
}
if(likely(!opts.existing))
if(m::exists(event_id))
{
++this->exists;
return true;
}
if(likely(opts.unique))
{
it = this->head.emplace_hint(it, event_id);
result.event_id = *it;
}
else result.event_id = event_id;
if(likely(closure))
return closure(result);
return true;
});
});
}