construct/modules/client/sync/rooms/state.cc

679 lines
15 KiB
C++

// Matrix Construct
//
// Copyright (C) Matrix Construct Developers, Authors & Contributors
// Copyright (C) 2016-2018 Jason Volk <jason@zemos.net>
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice is present in all copies. The
// full license for this software is available in the LICENSE file.
namespace ircd::m::sync
{
static bool room_state_phased_member_events(data &, json::stack::array &);
static bool room_state_phased_events(data &);
static bool room_state_phased_prefetch(data &);
static bool room_state_polylog_events(data &);
static bool room_state_polylog_prefetch(data &);
static bool _room_state_polylog(data &);
static bool room_state_polylog(data &);
static bool room_invite_state_polylog(data &);
static bool room_state_linear_events(data &);
static bool room_invite_state_linear(data &);
static bool room_state_linear(data &);
static const size_t MEMBER_SCAN_MAX {32};
extern conf::item<size_t> member_scan_max;
extern conf::item<bool> lazyload_members_enable;
extern conf::item<bool> crazyload_historical_members;
extern item room_invite_state;
extern item room_state;
}
ircd::mapi::header
IRCD_MODULE
{
"Client Sync :Room State"
};
decltype(ircd::m::sync::room_state)
ircd::m::sync::room_state
{
"rooms.state",
room_state_polylog,
room_state_linear,
{
{ "phased", true },
{ "prefetch", true },
}
};
decltype(ircd::m::sync::room_invite_state)
ircd::m::sync::room_invite_state
{
"rooms.invite_state",
room_invite_state_polylog,
room_invite_state_linear,
{
{ "phased", true },
}
};
decltype(ircd::m::sync::member_scan_max)
ircd::m::sync::member_scan_max
{
{ "name", "ircd.m.sync.rooms.state.members.scan.max" },
{ "default", 12L },
};
decltype(ircd::m::sync::lazyload_members_enable)
ircd::m::sync::lazyload_members_enable
{
{ "name", "ircd.m.sync.rooms.state.members.lazyload" },
{ "default", true },
};
decltype(ircd::m::sync::crazyload_historical_members)
ircd::m::sync::crazyload_historical_members
{
{ "name", "ircd.m.sync.rooms.state.members.historical" },
{ "default", false },
};
bool
ircd::m::sync::room_state_linear(data &data)
{
if(data.membership == "invite")
return false;
return room_state_linear_events(data);
}
bool
ircd::m::sync::room_invite_state_linear(data &data)
{
if(data.membership != "invite")
return false;
return room_state_linear_events(data);
}
bool
ircd::m::sync::room_state_linear_events(data &data)
{
if(!data.event_idx)
return false;
if(!data.room)
return false;
if(!data.membership)
return false;
assert(data.event);
if(!defined(json::get<"state_key"_>(*data.event)))
return false;
const bool is_own_membership
{
json::get<"type"_>(*data.event) == "m.room.member"
&& json::get<"state_key"_>(*data.event) == data.user.user_id
};
const bool is_own_join
{
is_own_membership
&& data.membership == "join"
};
const bool is_own_room_join
{
(is_own_join || m::creation(*data.event, data.user))
&& m::creator(*data.room, data.user)
};
if(is_own_join && !is_own_room_join)
{
// Special case gimmick; this effectively stops the linear-sync at this
// event and has /sync respond with a token containing a flag. When the
// client makes the next request with this flag we treat it as if they
// were using the ?full_state=true query parameter. This will enter the
// polylog handler instead of the linear handler (here) so as to
// efficiently sync the entire room's state to the client; as we cannot
// perform that feat from this handler.
data.reflow_full_state = true;
return false;
}
const ssize_t &viewport_size
{
room::events::viewport_size
};
const auto sounding
{
data.room_depth - json::get<"depth"_>(*data.event)
};
// Figure out whether the event was included in the timeline
const bool viewport_visible
{
viewport_size <= 0
|| data.membership == "invite"
|| sounding <= viewport_size
};
// Query whether this state cell has been overwritten. Unlike the timeline,
// the state field will not be processed sequentially by our client.
const bool stale
{
m::room::state::next(data.event_idx) != 0
};
if(!viewport_visible && stale)
return false;
json::stack::object rooms
{
*data.out, "rooms"
};
json::stack::object membership_
{
*data.out, data.membership
};
json::stack::object room_
{
*data.out, data.room->room_id
};
const auto &state_member_name
{
data.membership == "invite"?
"invite_state": // "invite_state"_sv:
"state"
};
json::stack::object state
{
*data.out, state_member_name
};
json::stack::array array
{
*data.out, "events"
};
bool ret{false};
const auto append
{
[&data, &array, &ret](const event::idx &event_idx)
{
const event::fetch event
{
std::nothrow, event_idx
};
ret |= event.valid && m::event::append
{
array, event,
{
.event_idx = event_idx,
.user_id = data.user.user_id,
.user_room_id = data.user_room.room_id,
.room_depth = data.room_depth,
.query_txnid = false,
.query_prev_state = true,
}
};
return true;
}
};
if(is_own_membership && data.membership == "invite")
{
const m::room::state state{*data.room};
state.get(std::nothrow, "m.room.create", "", append);
state.get(std::nothrow, "m.room.join_rules", "", append);
state.get(std::nothrow, "m.room.power_levels", "", append);
state.get(std::nothrow, "m.room.history_visibility", "", append);
state.get(std::nothrow, "m.room.avatar", "", append);
state.get(std::nothrow, "m.room.name", "", append);
state.get(std::nothrow, "m.room.canonical_alias", "", append);
state.get(std::nothrow, "m.room.aliases", my_host(), append);
}
// Branch for supplying state to the client after its user's invite
// is processed. At this point the client has not received prior room
// state in /sync.
if(is_own_membership && data.membership == "invite")
{
const m::room::state state{*data.room};
const auto &sender
{
json::get<"sender"_>(*data.event)
};
state.get(std::nothrow, "m.room.member", sender, append);
}
ret |= m::event::append
{
array, *data.event,
{
.event_idx = data.event_idx,
.user_id = data.user.user_id,
.user_room_id = data.user_room.room_id,
.room_depth = data.room_depth,
.query_txnid = false,
.query_prev_state = true,
}
};
return ret;
}
bool
ircd::m::sync::room_state_polylog(data &data)
{
if(data.membership == "invite")
return false;
return _room_state_polylog(data);
}
bool
ircd::m::sync::room_invite_state_polylog(data &data)
{
if(data.membership != "invite")
return false;
return _room_state_polylog(data);
}
bool
ircd::m::sync::_room_state_polylog(data &data)
{
assert(data.args);
const bool full_state_all
{
data.args->full_state &&
!has(std::get<2>(data.args->since), 'P')
};
if(likely(!full_state_all))
if(!data.phased && int64_t(data.range.first) > 0)
if(!apropos(data, data.room_head))
return false;
if(data.phased && data.range.first == 0)
{
if(data.prefetch)
return room_state_phased_prefetch(data);
else
return room_state_phased_events(data);
}
if(data.prefetch)
return room_state_polylog_prefetch(data);
else
return room_state_polylog_events(data);
}
bool
ircd::m::sync::room_state_polylog_prefetch(data &data)
{
return false;
}
bool
ircd::m::sync::room_state_polylog_events(data &data)
{
bool ret{false};
ctx::mutex mutex;
json::stack::array array
{
*data.out, "events"
};
static const auto num(64); //TODO: XXX
sync::pool.min(num);
unsigned long long a_mask[2] {0};
allocator::state a(num, a_mask);
std::vector<m::event::fetch> events(num);
ctx::concurrent<event::idx> concurrent
{
sync::pool, [&data, &ret, &mutex, &array, &events, &a](const auto &event_idx)
{
const auto i(a.allocate(1)); const unwind i_{[&a, &i]
{
a.deallocate(i, 1);
}};
assert(i < events.size());
auto &event(events.at(i));
if(!m::seek(std::nothrow, event, event_idx))
{
log::error
{
log, "Failed to fetch event idx:%lu in room %s state.",
event_idx,
string_view{data.room->room_id},
};
assert(!event.valid);
return;
}
assert(event.valid);
const std::lock_guard lock{mutex};
ret |= m::event::append
{
array, event,
{
.event_idx = event_idx,
.user_id = data.user.user_id,
.user_room_id = data.user_room.room_id,
.room_depth = data.room_depth,
.query_txnid = false,
.query_prev_state = false,
}
};
}
};
const auto &room_filter
{
json::get<"room"_>(data.filter)
};
const auto &state_filter
{
json::get<"state"_>(room_filter)
};
const room::state state
{
*data.room
};
const auto &lazyload_members
{
lazyload_members_enable
&& json::get<"lazy_load_members"_>(state_filter)
};
const bool full_state_reflow
{
data.args->full_state
&& has(std::get<2>(data.args->since), 'P')
};
const bool full_state_all
{
data.args->full_state
&& !full_state_reflow
};
state.for_each([&data, &concurrent, &lazyload_members]
(const string_view &type, const string_view &state_key, const event::idx &event_idx)
{
// Conditions to skip state when not forcing full_state
if(likely(!data.args->full_state))
{
// If this event is not in the sync range
if(likely(!apropos(data, event_idx)))
return true;
// Branch for crazy/lazyloading conditions to skip.
if(type == "m.room.member")
{
if(lazyload_members)
return true;
if(!crazyload_historical_members)
if(data.membership == "leave" || data.membership == "ban")
return true;
}
}
this_ctx::interruption_point();
concurrent(event_idx);
return true;
});
const ctx::uninterruptible::nothrow ui;
concurrent.wait();
return ret;
}
bool
ircd::m::sync::room_state_phased_prefetch(data &data)
{
const size_t configured_count
{
std::min(size_t(room::events::viewport_size), size_t(member_scan_max))
};
const size_t member_count
{
std::min(configured_count, MEMBER_SCAN_MAX)
};
const std::pair<string_view, string_view> state_keys[]
{
{ "m.room.create", "" },
{ "m.room.canonical_alias", "" },
{ "m.room.name", "" },
{ "m.room.avatar", "" },
{ "m.room.aliases", data.user.user_id.host() },
{ "m.room.member", data.user.user_id },
};
// Prefetch the state cells
const room::state state
{
*data.room
};
for(const auto &[type, state_key] : state_keys)
state.prefetch(type, state_key);
m::room::events events
{
*data.room
};
// Prefetch the senders of the recent room events
for(size_t i(0); events && i < member_count; --events, ++i)
m::prefetch(events.event_idx(), "sender");
return true;
}
bool
ircd::m::sync::room_state_phased_events(data &data)
{
bool ret{false};
ctx::mutex mutex;
json::stack::array array
{
*data.out, "events"
};
const auto append
{
[&data, &array, &ret, &mutex]
(const m::event::idx &event_idx, const m::event &event)
{
const std::lock_guard lock{mutex};
ret |= m::event::append
{
array, event,
{
.event_idx = event_idx,
.user_id = data.user.user_id,
.user_room_id = data.user_room.room_id,
.room_depth = data.room_depth,
.query_txnid = false,
.query_prev_state = true,
}
};
}
};
const std::pair<string_view, string_view> keys[]
{
{ "m.room.create", "" },
{ "m.room.canonical_alias", "" },
{ "m.room.name", "" },
{ "m.room.avatar", "" },
{ "m.room.aliases", data.user.user_id.host() },
{ "m.room.member", data.user.user_id },
};
const room::state state
{
*data.room
};
// Fetch the state cells and prefetch the event data
size_t i(0);
size_t prev_content_prefetched(0);
static const auto num_keys(sizeof(keys) / sizeof(*keys));
std::array<event::idx, num_keys> event_idx;
for(const auto &[type, state_key] : keys)
{
auto &idx(event_idx.at(i++));
idx = state.get(std::nothrow, type, state_key);
// Prefetch the content of the previous state for event::append()
if(likely(type != "m.room.create"))
{
const auto &prev_idx
{
room::state::prev(idx)
};
prev_content_prefetched += m::prefetch(prev_idx, "content");
}
}
// Fetch the event data and stream to client
m::event::fetch event;
assert(i <= event_idx.size());
for(i = 0; i < event_idx.size(); ++i) try
{
if(!event_idx.at(i))
continue;
seek(event, event_idx.at(i));
append(event_idx.at(i), event);
}
catch(const std::exception &e)
{
const auto &[type, state_key]
{
keys[i]
};
log::error
{
log, "Failed to find event_idx:%lu in room %s state (%s,%s)",
event_idx.at(i),
string_view{data.room->room_id},
type,
state_key,
};
}
if(data.membership == "join")
ret |= room_state_phased_member_events(data, array);
return ret;
}
bool
ircd::m::sync::room_state_phased_member_events(data &data,
json::stack::array &array)
{
// The number of recent room events we'll seek senders for.
const size_t configured_count
{
std::min(size_t(room::events::viewport_size), size_t(member_scan_max))
};
const size_t count
{
std::min(configured_count, MEMBER_SCAN_MAX)
};
m::room::events it
{
*data.room
};
// Prefetch the senders of the recent room events
size_t i(0), prefetched(0);
std::array<event::idx, MEMBER_SCAN_MAX> event_idx;
assert(count <= MEMBER_SCAN_MAX && count <= event_idx.size());
for(; it && i < count; --it, ++i)
{
event_idx[i] = it.event_idx();
prefetched += m::prefetch(event_idx[i], "sender");
}
// Transform the senders into member event::idx's and prefetch events
std::transform(begin(event_idx), begin(event_idx) + i, begin(event_idx), [&data]
(const m::event::idx &event_idx)
{
const event::idx &member_idx
{
m::query(std::nothrow, event_idx, "sender", [&data]
(const string_view &sender)
{
return data.room->get(std::nothrow, "m.room.member", sender);
})
};
m::prefetch(member_idx);
return member_idx;
});
// Eliminate duplicate member event::idx
std::sort(begin(event_idx), begin(event_idx) + i);
const auto end(std::unique(begin(event_idx), begin(event_idx) + i));
assert(std::distance(begin(event_idx), end) > 0 || i == 0);
// Fetch and stream those member events to client
bool ret{false};
m::event::fetch event;
std::for_each(begin(event_idx), end, [&data, &array, &ret, &event]
(const event::idx &sender_idx)
{
if(!seek(std::nothrow, event, sender_idx))
return;
ret |= m::event::append
{
array, event,
{
.event_idx = sender_idx,
.user_id = data.user.user_id,
.user_room_id = data.user_room.room_id,
.room_depth = data.room_depth,
.query_txnid = false,
.query_prev_state = false,
}
};
});
return ret;
}