0
0
Fork 0
mirror of https://github.com/matrix-construct/construct synced 2024-06-07 12:38:56 +02:00

ircd:Ⓜ️:room::head: Add fetch interface.

This commit is contained in:
Jason Volk 2020-11-05 17:58:17 -08:00
parent 45def842c4
commit 6fe770b4bf
9 changed files with 361 additions and 86 deletions

View file

@ -40,11 +40,10 @@ namespace ircd::m
///
struct ircd::m::room::head
{
struct fetch;
struct generate;
using closure = std::function<bool (const event::idx &, const event::id &)>;
static conf::item<milliseconds> fetch_timeout;
m::room room;
bool for_each(const closure &) const;
@ -56,8 +55,6 @@ struct ircd::m::room::head
:room{room}
{}
static event::id::buf fetch(const id &, const string_view &remote, const id::user &);
static event::id::buf fetch(const id &, const string_view &remote);
static void modify(const event::id &, const db::op &, const bool &);
static size_t rebuild(const head &);
static size_t reset(const head &);

View file

@ -0,0 +1,74 @@
// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
#pragma once
#define HAVE_IRCD_M_ROOM_HEAD_FETCH_H
/// Fetch and determine the latest head information from all servers.
struct ircd::m::room::head::fetch
{
struct opts;
using closure = std::function<bool (const m::event &)>;
static conf::item<milliseconds> timeout;
/// Count of responding servers.
size_t respond {0};
/// Counts of servers reporting depth [behind, equal, ahead] relative to us.
size_t depth[3] {0};
/// Count of servers reporting origin_server_ts [behind, equal, ahead].
size_t ots[3] {0};
/// Total number of heads reported from all servers (including duplicates)
size_t heads {0};
/// Total number of concurrences for non-existent heads
size_t concur {0};
/// Total number of concurrences for existing heads.
size_t exists {0};
/// Running (and final) results when opts.unique=true; otherwise the
/// closure is the only way to receive results.
std::set<event::id::buf, std::less<>> head;
// Primary operation; synchronous construction with results provided to
// closure asynchronously.
fetch(const opts &, const closure & = {});
// Convenience operation
static event::id::buf one(const id &, const string_view &remote, const id::user &);
static event::id::buf one(const id &, const string_view &remote);
};
struct ircd::m::room::head::fetch::opts
{
/// Room apropos.
m::id::room room_id;
/// User for non-public rooms; note if not given one will be determined
/// automatically.
m::id::user user_id;
/// Local reference frame; determined internally if not provided.
std::tuple<m::id::event, int64_t, event::idx> top
{
m::id::event{}, 0L, 0UL
};
/// When true, results are stored in the head set and duplicate results
/// are not provided to the closure. When false, the head set is not used.
bool unique {true};
/// When true, results may include events this server already has executed.
bool existing {false};
};

View file

@ -198,6 +198,7 @@ struct ircd::m::room
#include "content.h"
#include "head.h"
#include "head_generate.h"
#include "head_fetch.h"
#include "auth.h"
#include "power.h"
#include "aliases.h"

View file

@ -107,6 +107,7 @@ libircd_matrix_la_SOURCES += room_bootstrap.cc
libircd_matrix_la_SOURCES += room_create.cc
libircd_matrix_la_SOURCES += room_events.cc
libircd_matrix_la_SOURCES += room_head.cc
libircd_matrix_la_SOURCES += room_head_fetch.cc
libircd_matrix_la_SOURCES += room_join.cc
libircd_matrix_la_SOURCES += room_leave.cc
libircd_matrix_la_SOURCES += room_visible.cc

View file

@ -406,7 +406,7 @@ ircd::m::fed::backfill::backfill(const room::id &room_id,
m::event::id::buf event_id_buf;
if(!opts.event_id)
{
event_id_buf = m::room::head::fetch(room_id, opts.remote);
event_id_buf = m::room::head::fetch::one(room_id, opts.remote);
opts.event_id = event_id_buf;
}

View file

@ -8,86 +8,6 @@
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
decltype(ircd::m::room::head::fetch_timeout)
ircd::m::room::head::fetch_timeout
{
{ "name", "ircd.m.room.head.fetch.timeout" },
{ "default", 30 * 1000L },
};
ircd::m::event::id::buf
ircd::m::room::head::fetch(const id &room_id,
const string_view &remote)
{
const m::room room
{
room_id
};
// When no user_id is supplied and the room exists locally we attempt
// to find the user_id of one of our users with membership in the room.
// This satisfies synapse's requirements for whether we have access
// to the response. If user_id remains blank then make_join will later
// generate a random one from our host as well.
m::user::id::buf user_id
{
any_user(room, my_host(), "join")
};
// Make another attempt to find an invited user because that carries some
// value (this query is not as fast as querying join memberships).
if(!user_id)
user_id = any_user(room, my_host(), "invite");
return fetch(room_id, remote, user_id);
}
ircd::m::event::id::buf
ircd::m::room::head::fetch(const id &room_id,
const string_view &remote,
const user::id &user_id)
{
const unique_buffer<mutable_buffer> buf
{
16_KiB
};
fed::make_join::opts opts;
opts.remote = remote;
opts.dynamic = false;
fed::make_join request
{
room_id, user_id, buf, std::move(opts)
};
const auto code
{
request.get(milliseconds(fetch_timeout))
};
const json::object proto
{
request.in.content
};
const json::object event
{
proto.at("event")
};
const m::event::prev prev
{
event
};
const auto &prev_event_id
{
prev.prev_event(0)
};
return prev_event_id;
}
namespace ircd::m
{
static void append_v1(json::stack::array &, const event::id &);

248
matrix/room_head_fetch.cc Normal file
View file

@ -0,0 +1,248 @@
// The Construct
//
// Copyright (C) The Construct Developers, Authors & Contributors
// Copyright (C) 2016-2020 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
decltype(ircd::m::room::head::fetch::timeout)
ircd::m::room::head::fetch::timeout
{
{ "name", "ircd.m.room.head.fetch.timeout" },
{ "default", 10 * 1000L },
};
ircd::m::event::id::buf
ircd::m::room::head::fetch::one(const id &room_id,
const string_view &remote)
{
const m::room room
{
room_id
};
// When no user_id is supplied and the room exists locally we attempt
// to find the user_id of one of our users with membership in the room.
// This satisfies synapse's requirements for whether we have access
// to the response. If user_id remains blank then make_join will later
// generate a random one from our host as well.
m::user::id::buf user_id
{
any_user(room, my_host(), "join")
};
// Make another attempt to find an invited user because that carries some
// value (this query is not as fast as querying join memberships).
if(!user_id)
user_id = any_user(room, my_host(), "invite");
return one(room_id, remote, user_id);
}
ircd::m::event::id::buf
ircd::m::room::head::fetch::one(const id &room_id,
const string_view &remote,
const user::id &user_id)
{
const unique_buffer<mutable_buffer> buf
{
16_KiB
};
fed::make_join::opts opts;
opts.remote = remote;
opts.dynamic = false;
fed::make_join request
{
room_id, user_id, buf, std::move(opts)
};
const auto code
{
request.get(milliseconds(timeout))
};
const json::object proto
{
request.in.content
};
const json::object event
{
proto.at("event")
};
const m::event::prev prev
{
event
};
const auto &prev_event_id
{
prev.prev_event(0)
};
return prev_event_id;
}
//
// fetch::fetch
//
ircd::m::room::head::fetch::fetch(const opts &opts,
const closure &closure)
{
const m::room room
{
opts.room_id
};
// When the room isn't public we need to supply a user_id of one of our
// users in the room to satisfy matrix protocol requirements upstack.
const m::user::id::buf user_id
{
!opts.user_id?
m::any_user(room, origin(my()), "join"):
opts.user_id
};
std::tuple<id::event::buf, int64_t, event::idx> top;
std::get<2>(top) = std::get<2>(opts.top);
std::get<1>(top) = std::get<1>(opts.top);
if(std::get<0>(opts.top))
std::get<0>(top) = std::get<0>(opts.top);
time_t top_ots {0};
auto &[top_event_id, top_depth, top_idx]
{
top
};
if(!top_event_id && !top_idx)
top = m::top(std::nothrow, room);
if(!top_event_id)
top_event_id = m::head(std::nothrow, room);
if(!top_idx)
top_idx = m::index(std::nothrow, top_event_id);
if(!top_depth)
m::get(std::nothrow, top_idx, "depth", top_depth);
if(!top_ots)
m::get(std::nothrow, top_idx, "origin_server_ts", top_ots);
char tmbuf[48];
log::debug
{
log, "Resynchronizing %s from %s [relative idx:%lu depth:%ld %s] from %zu joined servers...",
string_view{room.room_id},
string_view{top_event_id},
top_idx,
top_depth,
microdate(tmbuf),
room::origins(room).count(),
};
m::event result;
if(likely(closure))
json::get<"room_id"_>(result) = opts.room_id;
feds::opts fopts;
fopts.op = feds::op::head;
fopts.room_id = room.room_id;
fopts.user_id = user_id;
fopts.closure_errors = false; // exceptions wil not propagate feds::execute
fopts.exclude_myself = true;
fopts.timeout = milliseconds(timeout);
feds::execute(fopts, [this, &opts, &top, &top_ots, &closure, &result]
(const auto &response)
{
m::event event
{
response.object.get("event")
};
const event::prev prev
{
event
};
const auto heads
{
prev.prev_events_count()
};
// The depth comes back as one greater than any existing
// depth so we subtract one.
const auto &depth
{
std::max(json::get<"depth"_>(event) - 1L, 0L)
};
const auto &ots
{
json::get<"origin_server_ts"_>(event)
};
const auto &[top_event_id, top_depth, top_idx]
{
top
};
this->respond += 1;
this->heads += heads;
this->ots[0] += ots < top_ots;
this->ots[1] += ots == top_ots;
this->ots[2] += ots > top_ots;
this->depth[0] += depth < top_depth;
this->depth[1] += depth == top_depth;
this->depth[2] += depth > top_depth;
if(likely(closure))
{
json::get<"origin"_>(result) = response.origin;
json::get<"origin_server_ts"_>(result) = ots;
json::get<"depth"_>(result) = depth;
}
return m::for_each(prev, [this, &opts, &closure, &result]
(const event::id &event_id)
{
auto it
{
this->head.lower_bound(event_id)
};
if(likely(opts.unique))
if(it != std::end(this->head) && *it == event_id)
{
++this->concur;
return true;
}
if(likely(!opts.existing))
if(m::exists(event_id))
{
++this->exists;
return true;
}
if(likely(opts.unique))
{
it = this->head.emplace_hint(it, event_id);
result.event_id = *it;
}
else result.event_id = event_id;
if(likely(closure))
return closure(result);
return true;
});
});
}

View file

@ -280,7 +280,7 @@ get__initialsync_remote(client &client,
const auto head
{
m::room::head::fetch(room, remote, request.user_id)
m::room::head::fetch::one(room, remote, request.user_id)
};
m::room room_{room};

View file

@ -9195,6 +9195,40 @@ console_cmd__room__head__reset(opt &out, const string_view &line)
return true;
}
bool
console_cmd__room__head__fetch(opt &out, const string_view &line)
{
const params param{line, " ",
{
"room_id"
}};
const auto room_id
{
m::room_id(param.at("room_id"))
};
const m::room room
{
room_id
};
m::room::head::fetch::opts opts;
opts.room_id = room_id;
m::room::head::fetch
{
opts, [&out](const m::event &result)
{
out
<< pretty_oneline(result)
<< std::endl;
return true;
}
};
return true;
}
bool
console_cmd__room__sounding(opt &out, const string_view &line)
{