
Return some room data in Sliding Sync /sync (#17320)

- Timeline events
 - Stripped `invite_state`

Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync
Eric Eastwood 2024-07-02 11:07:05 -05:00 committed by GitHub
parent 0d2b75cf92
commit fa91655805
14 changed files with 3593 additions and 259 deletions


@ -0,0 +1 @@
Add `rooms` data to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.


@@ -836,3 +836,21 @@ def maybe_upsert_event_field(
            del container[key]

    return upsert_okay

def strip_event(event: EventBase) -> JsonDict:
"""
Used for "stripped state" events which provide a simplified view of the state of a
room intended to help a potential joiner identify the room (relevant when the user
is invited or knocked).
Stripped state events can only have the `sender`, `type`, `state_key` and `content`
properties present.
"""
return {
"type": event.type,
"state_key": event.state_key,
"content": event.content,
"sender": event.sender,
}
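As a quick illustration of what `strip_event` keeps, here is a sketch using a plain dict as a stand-in for `EventBase` (the event values are made up and the real function takes a Synapse event object):

```python
# Minimal sketch of stripping: only type, state_key, content and sender survive.
full_event = {
    "type": "m.room.join_rules",
    "state_key": "",
    "content": {"join_rule": "invite"},
    "sender": "@alice:example.org",
    "event_id": "$abc123",           # dropped by stripping
    "origin_server_ts": 1719900000,  # dropped by stripping
}

stripped = {key: full_event[key] for key in ("type", "state_key", "content", "sender")}
# stripped == {
#     "type": "m.room.join_rules",
#     "state_key": "",
#     "content": {"join_rule": "invite"},
#     "sender": "@alice:example.org",
# }
```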


@@ -18,22 +18,28 @@
#
#
import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

import attr
from immutabledict import immutabledict

-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
-from synapse.storage.roommember import RoomsForUser
+from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.types import (
    JsonDict,
    PersistedEventPosition,
    Requester,
    RoomStreamToken,
    StreamKeyType,
    StreamToken,
    UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
    from synapse.server import HomeServer

@@ -41,28 +41,9 @@ if TYPE_CHECKING:

logger = logging.getLogger(__name__)
def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
    """
    Quick helper to convert an event to a `RoomsForUser` object.
"""
# These fields should be present for all persisted events
assert event.internal_metadata.stream_ordering is not None
assert event.internal_metadata.instance_name is not None
return RoomsForUser(
room_id=event.room_id,
sender=event.sender,
membership=event.membership,
event_id=event.event_id,
event_pos=PersistedEventPosition(
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
room_version_id=event.room_version.identifier,
)
-def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
+def filter_membership_for_sync(
+    *, membership: str, user_id: str, sender: Optional[str]
+) -> bool:
    """
    Returns True if the membership event should be included in the sync response,
    otherwise False.
@@ -79,7 +66,54 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
    #
    # This logic includes kicks (leave events where the sender is not the same user) and
    # can be read as "anything that isn't a leave or a leave with a different sender".
-    return membership != Membership.LEAVE or sender != user_id
+    #
+    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
+    # happened that removed the user from the room, or the user was the last person
+    # locally to leave the room which caused the server to leave the room. In both
+    # cases, we can just remove the rooms since they are no longer relevant to the user.
+    # They could still be added back later if they are `newly_left`.
+    return membership != Membership.LEAVE or sender not in (user_id, None)
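To make the leave/kick distinction concrete, here is a small sanity check of the rule above (assuming `filter_membership_for_sync` as defined in this diff; the user IDs are made up):

```python
# Everything is kept except a self-initiated leave, or a leave with no sender
# (state reset / the server left the room).
from synapse.api.constants import Membership

me = "@me:example.org"

assert filter_membership_for_sync(membership=Membership.JOIN, user_id=me, sender=me)
assert filter_membership_for_sync(
    membership=Membership.INVITE, user_id=me, sender="@friend:example.org"
)
# Kick: a leave event sent by someone else is still shown to the user
assert filter_membership_for_sync(
    membership=Membership.LEAVE, user_id=me, sender="@admin:example.org"
)
# Own leave -> filtered out
assert not filter_membership_for_sync(membership=Membership.LEAVE, user_id=me, sender=me)
# Leave with no sender (state reset / server left the room) -> filtered out
assert not filter_membership_for_sync(membership=Membership.LEAVE, user_id=me, sender=None)
```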
# We can't freeze this class because we want to update it in place with the
# de-duplicated data.
@attr.s(slots=True, auto_attribs=True)
class RoomSyncConfig:
"""
Holds the config for what data we should fetch for a room in the sync response.
Attributes:
timeline_limit: The maximum number of events to return in the timeline.
required_state: The set of state events requested for the room. The
values are close to `StateKey` but actually use a syntax where you can
provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
of the tuple (type, state_key).
"""
timeline_limit: int
required_state: Set[Tuple[str, str]]
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
Attributes:
event_id: The event ID of the membership event
event_pos: The stream position of the membership event
membership: The membership state of the user in the room
sender: The person who sent the membership event
newly_joined: Whether the user newly joined the room during the given token
range
"""
event_id: Optional[str]
event_pos: PersistedEventPosition
membership: str
sender: Optional[str]
newly_joined: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)
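Since `_RoomMembershipForUser` is frozen, updates go through `attr.evolve` (wrapped by `copy_and_replace`). A rough illustration with made-up values of how `newly_joined` gets flipped later in the pipeline:

```python
# Updating a frozen attrs instance via copy_and_replace (which delegates to attr.evolve).
membership = _RoomMembershipForUser(
    event_id="$join_event",
    event_pos=PersistedEventPosition(instance_name="master", stream=42),
    membership=Membership.JOIN,
    sender="@me:example.org",
    newly_joined=False,
)

updated = membership.copy_and_replace(newly_joined=True)
assert updated.newly_joined is True
assert updated.event_id == membership.event_id  # everything else is unchanged
```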
class SlidingSyncHandler:

@@ -90,6 +124,7 @@ class SlidingSyncHandler:
        self.auth_blocking = hs.get_auth_blocking()
        self.notifier = hs.get_notifier()
        self.event_sources = hs.get_event_sources()
        self.relations_handler = hs.get_relations_handler()
        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync

    async def wait_for_sync_for_user(
@@ -201,6 +236,7 @@ class SlidingSyncHandler:
        # Assemble sliding window lists
        lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
        relevant_room_map: Dict[str, RoomSyncConfig] = {}
        if sync_config.lists:
            # Get all of the room IDs that the user should be able to see in the sync
            # response

@@ -225,29 +261,67 @@
                ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                if list_config.ranges:
                    for range in list_config.ranges:
sliced_room_ids = [
room_id
# Both sides of range are inclusive
for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
]
                        ops.append(
                            SlidingSyncResult.SlidingWindowList.Operation(
                                op=OperationType.SYNC,
                                range=range,
-                                room_ids=[
-                                    room_id
-                                    for room_id, _ in sorted_room_info[
-                                        range[0] : range[1]
-                                    ]
-                                ],
+                                room_ids=sliced_room_ids,
                            )
                        )
# Take the superset of the `RoomSyncConfig` for each room
for room_id in sliced_room_ids:
if relevant_room_map.get(room_id) is not None:
# Take the highest timeline limit
if (
relevant_room_map[room_id].timeline_limit
< list_config.timeline_limit
):
relevant_room_map[room_id].timeline_limit = (
list_config.timeline_limit
)
# Union the required state
relevant_room_map[room_id].required_state.update(
list_config.required_state
)
else:
relevant_room_map[room_id] = RoomSyncConfig(
timeline_limit=list_config.timeline_limit,
required_state=set(list_config.required_state),
)
                lists[list_key] = SlidingSyncResult.SlidingWindowList(
                    count=len(sorted_room_info),
                    ops=ops,
                )

        # TODO: if (sync_config.room_subscriptions):

        # Fetch room data
        rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
        for room_id, room_sync_config in relevant_room_map.items():
            room_sync_result = await self.get_room_sync_data(
                user=sync_config.user,
                room_id=room_id,
                room_sync_config=room_sync_config,
                rooms_membership_for_user_at_to_token=sync_room_map[room_id],
                from_token=from_token,
                to_token=to_token,
            )

            rooms[room_id] = room_sync_result

        return SlidingSyncResult(
            next_pos=to_token,
            lists=lists,
-            # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
-            rooms={},
+            rooms=rooms,
            extensions={},
        )
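The "superset of the `RoomSyncConfig`" step above boils down to taking the larger `timeline_limit` and the union of `required_state` when a room appears in more than one list. A standalone sketch of that rule (the configs and state keys are made-up examples):

```python
# A room appearing in two lists gets the larger timeline limit and the union
# of the requested state keys.
config_a = RoomSyncConfig(timeline_limit=1, required_state={("m.room.create", "")})
config_b = RoomSyncConfig(
    timeline_limit=10,
    required_state={("m.room.member", "$LAZY"), ("m.room.name", "")},
)

merged = RoomSyncConfig(
    timeline_limit=max(config_a.timeline_limit, config_b.timeline_limit),
    required_state=config_a.required_state | config_b.required_state,
)
assert merged.timeline_limit == 10
assert ("m.room.create", "") in merged.required_state
assert ("m.room.member", "$LAZY") in merged.required_state
```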
@@ -256,7 +330,7 @@
        user: UserID,
        to_token: StreamToken,
        from_token: Optional[StreamToken] = None,
-    ) -> Dict[str, RoomsForUser]:
+    ) -> Dict[str, _RoomMembershipForUser]:
        """
        Fetch room IDs that should be listed for this user in the sync response (the
        full room list that will be filtered, sorted, and sliced).
@@ -305,13 +379,17 @@
        # Our working list of rooms that can show up in the sync response
        sync_room_id_set = {
-            room_for_user.room_id: room_for_user
-            for room_for_user in room_for_user_list
-            if filter_membership_for_sync(
-                membership=room_for_user.membership,
-                user_id=user_id,
-                sender=room_for_user.sender,
-            )
+            # Note: The `room_for_user` we're assigning here will need to be fixed up
+            # (below) because they are potentially from the current snapshot time
+            # instead of from the time of the `to_token`.
+            room_for_user.room_id: _RoomMembershipForUser(
+                event_id=room_for_user.event_id,
+                event_pos=room_for_user.event_pos,
+                membership=room_for_user.membership,
+                sender=room_for_user.sender,
+                newly_joined=False,
+            )
+            for room_for_user in room_for_user_list
        }
        # Get the `RoomStreamToken` that represents the spot we queried up to when we got

@@ -346,14 +424,9 @@
        #
        # - 1a) Remove rooms that the user joined after the `to_token`
        # - 1b) Add back rooms that the user left after the `to_token`
        # - 1c) Update room membership events to the point in time of the `to_token`
        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        #
-        # Below, we're doing two separate lookups for membership changes. We could
-        # request everything for both fixups in one range, [`from_token.room_key`,
-        # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
-        # comparison without `instance_name` (which is flawed). We could refactor
-        # `event.internal_metadata` to include `instance_name` but it might turn out a
-        # little difficult and a bigger, broader Synapse change than we want to make.
+        # - 3) Figure out which rooms are `newly_joined`

        # 1) -----------------------------------------------------
@@ -363,159 +436,198 @@
        # If our `to_token` is already the same or ahead of the latest room membership
        # for the user, we don't need to do any "2)" fix-ups and can just straight-up
        # use the room list from the snapshot as a base (nothing has changed)
-        membership_change_events_after_to_token = []
+        current_state_delta_membership_changes_after_to_token = []
        if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
-            membership_change_events_after_to_token = (
-                await self.store.get_membership_changes_for_user(
+            current_state_delta_membership_changes_after_to_token = (
+                await self.store.get_current_state_delta_membership_changes_for_user(
                    user_id,
                    from_key=to_token.room_key,
                    to_key=membership_snapshot_token,
-                    excluded_rooms=self.rooms_to_exclude_globally,
+                    excluded_room_ids=self.rooms_to_exclude_globally,
                )
            )
-        # 1) Assemble a list of the last membership events in some given ranges. Someone
-        # could have left and joined multiple times during the given range but we only
-        # care about end-result so we grab the last one.
-        last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
-        # We also need the first membership event after the `to_token` so we can step
-        # backward to the previous membership that would apply to the from/to range.
-        first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
-        for event in membership_change_events_after_to_token:
-            last_membership_change_by_room_id_after_to_token[event.room_id] = event
+        # 1) Assemble a list of the first membership event after the `to_token` so we can
+        # step backward to the previous membership that would apply to the from/to
+        # range.
+        first_membership_change_by_room_id_after_to_token: Dict[
+            str, CurrentStateDeltaMembership
+        ] = {}
+        for membership_change in current_state_delta_membership_changes_after_to_token:
            # Only set if we haven't already set it
            first_membership_change_by_room_id_after_to_token.setdefault(
-                event.room_id, event
+                membership_change.room_id, membership_change
            )
        # 1) Fixup
        #
        # Since we fetched a snapshot of the user's room list at some point in time after
        # the from/to tokens, we need to revert/rewind some membership changes to match
        # the point in time of the `to_token`.
        for (
-            last_membership_change_after_to_token
-        ) in last_membership_change_by_room_id_after_to_token.values():
-            room_id = last_membership_change_after_to_token.room_id
+            room_id,
+            first_membership_change_after_to_token,
+        ) in first_membership_change_by_room_id_after_to_token.items():
# 1a) Remove rooms that the user joined after the `to_token`
if first_membership_change_after_to_token.prev_event_id is None:
sync_room_id_set.pop(room_id, None)
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
# previous membership that would apply to the from/to range.
else:
# We don't expect these fields to be `None` if we have a `prev_event_id`
# but we're being defensive since it's possible that the prev event was
# culled from the database.
if (
first_membership_change_after_to_token.prev_event_pos is not None
and first_membership_change_after_to_token.prev_membership
is not None
):
sync_room_id_set[room_id] = _RoomMembershipForUser(
event_id=first_membership_change_after_to_token.prev_event_id,
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
newly_joined=False,
)
else:
# If we can't find the previous membership event, we shouldn't
# include the room in the sync response since we can't determine the
# exact membership state and shouldn't rely on the current snapshot.
sync_room_id_set.pop(room_id, None)
            # We want to find the first membership change after the `to_token` then step
            # backward to know the membership in the from/to range.
            first_membership_change_after_to_token = (
                first_membership_change_by_room_id_after_to_token.get(room_id)
            )
            assert first_membership_change_after_to_token is not None, (
                "If there was a `last_membership_change_after_to_token` that we're iterating over, "
+ "then there should be corresponding a first change. For example, even if there "
+ "is only one event after the `to_token`, the first and last event will be same event. "
+ "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
+ "/`first_membership_change_by_room_id_after_to_token` dicts above."
)
# TODO: Instead of reading from `unsigned`, refactor this to use the
# `current_state_delta_stream` table in the future. Probably a new
# `get_membership_changes_for_user()` function that uses
# `current_state_delta_stream` with a join to `room_memberships`. This would
# help in state reset scenarios since `prev_content` is looking at the
# current branch vs the current room state. This is all just data given to
# the client so no real harm to data integrity, but we'd like to be nice to
# the client. Since the `current_state_delta_stream` table is new, it
# doesn't have all events in it. Since this is Sliding Sync, if we ever need
# to, we can signal the client to throw all of their state away by sending
# "operation: RESET".
prev_content = first_membership_change_after_to_token.unsigned.get(
"prev_content", {}
)
prev_membership = prev_content.get("membership", None)
prev_sender = first_membership_change_after_to_token.unsigned.get(
"prev_sender", None
)
# Check if the previous membership (membership that applies to the from/to
# range) should be included in our `sync_room_id_set`
should_prev_membership_be_included = (
prev_membership is not None
and prev_sender is not None
and filter_membership_for_sync(
membership=prev_membership,
user_id=user_id,
sender=prev_sender,
)
)
# Check if the last membership (membership that applies to our snapshot) was
# already included in our `sync_room_id_set`
was_last_membership_already_included = filter_membership_for_sync(
membership=last_membership_change_after_to_token.membership,
                user_id=user_id,
                sender=last_membership_change_after_to_token.sender,
            )
# 1a) Add back rooms that the user left after the `to_token`
#
# For example, if the last membership event after the `to_token` is a leave
# event, then the room was excluded from `sync_room_id_set` when we first
# crafted it above. We should add these rooms back as long as the user also
# was part of the room before the `to_token`.
if (
not was_last_membership_already_included
and should_prev_membership_be_included
):
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_after_to_token
)
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# For example, if the last membership event after the `to_token` is a "join"
# event, then the room was included `sync_room_id_set` when we first crafted
# it above. We should remove these rooms as long as the user also wasn't
# part of the room before the `to_token`.
elif (
was_last_membership_already_included
and not should_prev_membership_be_included
):
del sync_room_id_set[room_id]
        # Filter the rooms that we have updated room membership events to the point
        # in time of the `to_token` (from the "1)" fixups)
        filtered_sync_room_id_set = {
            room_id: room_membership_for_user
            for room_id, room_membership_for_user in sync_room_id_set.items()
            if filter_membership_for_sync(
                membership=room_membership_for_user.membership,
                user_id=user_id,
                sender=room_membership_for_user.sender,
            )
        }

        # 2) -----------------------------------------------------
        # We fix-up newly_left rooms after the first fixup because it may have removed
        # some left rooms that we can figure out are newly_left in the following code

        # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
-        membership_change_events_in_from_to_range = []
+        current_state_delta_membership_changes_in_from_to_range = []
        if from_token:
-            membership_change_events_in_from_to_range = (
-                await self.store.get_membership_changes_for_user(
+            current_state_delta_membership_changes_in_from_to_range = (
+                await self.store.get_current_state_delta_membership_changes_for_user(
                    user_id,
                    from_key=from_token.room_key,
                    to_key=to_token.room_key,
-                    excluded_rooms=self.rooms_to_exclude_globally,
+                    excluded_room_ids=self.rooms_to_exclude_globally,
                )
            )

        # 2) Assemble a list of the last membership events in some given ranges. Someone
        # could have left and joined multiple times during the given range but we only
        # care about end-result so we grab the last one.
-        last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
-        for event in membership_change_events_in_from_to_range:
-            last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
+        last_membership_change_by_room_id_in_from_to_range: Dict[
+            str, CurrentStateDeltaMembership
+        ] = {}
# We also want to assemble a list of the first membership events during the token
# range so we can step backward to the previous membership that would apply to
# before the token range to see if we have `newly_joined` the room.
first_membership_change_by_room_id_in_from_to_range: Dict[
str, CurrentStateDeltaMembership
] = {}
# Keep track of whether the room has a non-join event in the token range so we can later
# tell if it was a `newly_joined` room. If the last membership event in the
# token range is a join and there is also some non-join in the range, we know
# they `newly_joined`.
has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
for (
membership_change
) in current_state_delta_membership_changes_in_from_to_range:
room_id = membership_change.room_id
last_membership_change_by_room_id_in_from_to_range[room_id] = (
membership_change
)
# Only set if we haven't already set it
first_membership_change_by_room_id_in_from_to_range.setdefault(
room_id, membership_change
)
if membership_change.membership != Membership.JOIN:
has_non_join_event_by_room_id_in_from_to_range[room_id] = True
        # 2) Fixup
        #
        # 3) We also want to assemble a list of possibly newly joined rooms. Someone
        # could have left and joined multiple times during the given range but we only
        # care about whether they are joined at the end of the token range so we are
        # working with the last membership event in the token range.
        possibly_newly_joined_room_ids = set()
        for (
            last_membership_change_in_from_to_range
        ) in last_membership_change_by_room_id_in_from_to_range.values():
            room_id = last_membership_change_in_from_to_range.room_id

            # 3)
            if last_membership_change_in_from_to_range.membership == Membership.JOIN:
                possibly_newly_joined_room_ids.add(room_id)

            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
            # include newly_left rooms because the last event that the user should see
            # is their own leave event
            if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
-                    last_membership_change_in_from_to_range
+                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
+                    event_id=last_membership_change_in_from_to_range.event_id,
+                    event_pos=last_membership_change_in_from_to_range.event_pos,
+                    membership=last_membership_change_in_from_to_range.membership,
+                    sender=last_membership_change_in_from_to_range.sender,
+                    newly_joined=False,
                )

-        return sync_room_id_set
+        # 3) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
has_non_join_in_from_to_range = (
has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
)
# If the last membership event in the token range is a join and there is
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
].prev_event_id
prev_membership = first_membership_change_by_room_id_in_from_to_range[
room_id
].prev_membership
if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
# Last resort, we need to step back to the previous membership event
# just before the token range to see if we're joined then or not.
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
return filtered_sync_room_id_set
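The three `newly_joined` branches above can be restated compactly against plain values rather than `CurrentStateDeltaMembership` rows. This is only an illustrative restatement of the rules, not code from the change:

```python
from typing import Optional

def is_newly_joined(
    last_membership_in_range: str,
    has_non_join_in_range: bool,
    prev_membership_before_range: Optional[str],
) -> bool:
    # Only rooms whose last membership in the token range is a join can be newly_joined.
    if last_membership_in_range != Membership.JOIN:
        return False
    # Left and re-joined within the range
    if has_non_join_in_range:
        return True
    # First-ever membership event falls inside the range
    if prev_membership_before_range is None:
        return True
    # Joined within the range after not being joined just before it
    return prev_membership_before_range != Membership.JOIN

assert is_newly_joined(Membership.JOIN, False, None)
assert is_newly_joined(Membership.JOIN, False, Membership.LEAVE)
assert is_newly_joined(Membership.JOIN, True, Membership.JOIN)
assert not is_newly_joined(Membership.JOIN, False, Membership.JOIN)
assert not is_newly_joined(Membership.LEAVE, True, Membership.JOIN)
```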
    async def filter_rooms(
        self,
        user: UserID,
-        sync_room_map: Dict[str, RoomsForUser],
+        sync_room_map: Dict[str, _RoomMembershipForUser],
        filters: SlidingSyncConfig.SlidingSyncList.Filters,
        to_token: StreamToken,
-    ) -> Dict[str, RoomsForUser]:
+    ) -> Dict[str, _RoomMembershipForUser]:
        """
        Filter rooms based on the sync request.
@@ -629,9 +741,9 @@
    async def sort_rooms(
        self,
-        sync_room_map: Dict[str, RoomsForUser],
+        sync_room_map: Dict[str, _RoomMembershipForUser],
        to_token: StreamToken,
-    ) -> List[Tuple[str, RoomsForUser]]:
+    ) -> List[Tuple[str, _RoomMembershipForUser]]:
        """
        Sort by `stream_ordering` of the last event that the user should see in the
        room. `stream_ordering` is unique so we get a stable sort.
@@ -678,3 +790,229 @@
            # We want descending order
            reverse=True,
        )
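A simplified sketch of the descending sort: the real method sorts rooms by the stream position of the last event the user should see, but the same ordering can be shown with bare `(room_id, stream_ordering)` pairs (the values below are made up):

```python
# Rooms with more recent activity (higher stream_ordering) come first.
last_activity = {
    "!quiet:example.org": 100,
    "!busy:example.org": 250,
    "!medium:example.org": 180,
}

sorted_room_ids = sorted(last_activity, key=last_activity.get, reverse=True)
assert sorted_room_ids == ["!busy:example.org", "!medium:example.org", "!quiet:example.org"]
```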
async def get_room_sync_data(
self,
user: UserID,
room_id: str,
room_sync_config: RoomSyncConfig,
rooms_membership_for_user_at_to_token: _RoomMembershipForUser,
from_token: Optional[StreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
We fetch data according to the token range (> `from_token` and <= `to_token`).
Args:
user: User to fetch data for
room_id: The room ID to fetch data for
room_sync_config: Config for what data we should fetch for a room in the
sync response.
rooms_membership_for_user_at_to_token: Membership information for the user
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
# Assemble the list of timeline events
#
# It would be nice to make the `rooms` response more uniform regardless of
# membership. Currently, we have to make all of these optional because
# `invite`/`knock` rooms only have `stripped_state`. See
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
timeline_events: Optional[List[EventBase]] = None
bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
limited: Optional[bool] = None
prev_batch_token: Optional[StreamToken] = None
num_live: Optional[int] = None
if (
room_sync_config.timeline_limit > 0
# No timeline for invite/knock rooms (just `stripped_state`)
and rooms_membership_for_user_at_to_token.membership
not in (Membership.INVITE, Membership.KNOCK)
):
limited = False
# We want to start off using the `to_token` (vs `from_token`) because we look
# backwards from the `to_token` up to the `timeline_limit` and we might not
# reach the `from_token` before we hit the limit. We will update the room stream
# position once we've fetched the events to point to the earliest event fetched.
prev_batch_token = to_token
# We're going to paginate backwards from the `to_token`
from_bound = to_token.room_key
# People shouldn't see past their leave/ban event
if rooms_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
from_bound = (
rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
from_token.room_key
if from_token is not None
and not rooms_membership_for_user_at_to_token.newly_joined
else None
)
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_bound,
to_key=to_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
event_filter=None,
)
# We want to return the events in ascending order (the last event is the
# most recent).
timeline_events.reverse()
# Determine our `limited` status based on the timeline. We do this before
# filtering the events so we can accurately determine if there is more to
# paginate even if we filter out some/all events.
if len(timeline_events) > room_sync_config.timeline_limit:
limited = True
# Get rid of that extra "+ 1" event because we only used it to determine
# if we hit the limit or not
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
assert timeline_events[0].internal_metadata.stream_ordering
new_room_key = RoomStreamToken(
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)
# Make sure we don't expose any events that the client shouldn't see
timeline_events = await filter_events_for_client(
self.storage_controllers,
user.to_string(),
timeline_events,
is_peeking=rooms_membership_for_user_at_to_token.membership
!= Membership.JOIN,
filter_send_to_client=True,
)
# TODO: Filter out `EventTypes.CallInvite` in public rooms,
# see https://github.com/element-hq/synapse/issues/17359
# TODO: Handle timeline gaps (`get_timeline_gaps()`)
# Determine how many "live" events we have (events within the given token range).
#
# This is mostly useful to determine whether a given @mention event should
# make a noise or not. Clients cannot rely solely on the absence of
# `initial: true` to determine live events because if a room not in the
# sliding window bumps into the window because of an @mention it will have
# `initial: true` yet contain a single live event (with potentially other
# old events in the timeline)
num_live = 0
if from_token is not None:
for timeline_event in reversed(timeline_events):
# These fields should be present for all persisted events
assert timeline_event.internal_metadata.stream_ordering is not None
assert timeline_event.internal_metadata.instance_name is not None
persisted_position = PersistedEventPosition(
instance_name=timeline_event.internal_metadata.instance_name,
stream=timeline_event.internal_metadata.stream_ordering,
)
if persisted_position.persisted_after(from_token.room_key):
num_live += 1
else:
# Since we're iterating over the timeline events in
# reverse-chronological order, we can break once we hit an event
# that's not live. In the future, we could potentially optimize
# this more with a binary search (bisect).
break
# If the timeline is `limited=True`, the client does not have all events
# necessary to calculate aggregations themselves.
if limited:
bundled_aggregations = (
await self.relations_handler.get_bundled_aggregations(
timeline_events, user.to_string()
)
)
# Update the `prev_batch_token` to point to the position that allows us to
# keep paginating backwards from the oldest event we return in the timeline.
prev_batch_token = prev_batch_token.copy_and_replace(
StreamKeyType.ROOM, new_room_key
)
# Figure out any stripped state events for invite/knocks. This allows the
# potential joiner to identify the room.
stripped_state: List[JsonDict] = []
if rooms_membership_for_user_at_to_token.membership in (
Membership.INVITE,
Membership.KNOCK,
):
# This should never happen. If someone is invited to or has knocked on a room,
# then there should be an event for it.
assert rooms_membership_for_user_at_to_token.event_id is not None
invite_or_knock_event = await self.store.get_event(
rooms_membership_for_user_at_to_token.event_id
)
stripped_state = []
if invite_or_knock_event.membership == Membership.INVITE:
stripped_state.extend(
invite_or_knock_event.unsigned.get("invite_room_state", [])
)
elif invite_or_knock_event.membership == Membership.KNOCK:
stripped_state.extend(
invite_or_knock_event.unsigned.get("knock_room_state", [])
)
stripped_state.append(strip_event(invite_or_knock_event))
# TODO: Handle state resets. For example, if we see
# `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but
# `required_state` doesn't include it, we should indicate to the client that a
# state reset happened. Perhaps we should indicate this by setting `initial:
# True` and empty `required_state`.
return SlidingSyncResult.RoomResult(
# TODO: Dummy value
name=None,
# TODO: Dummy value
avatar=None,
# TODO: Dummy value
heroes=None,
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial=True,
# TODO: Dummy value
required_state=[],
timeline_events=timeline_events,
bundled_aggregations=bundled_aggregations,
# TODO: Dummy value
is_dm=False,
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
# TODO: Dummy values
joined_count=0,
invited_count=0,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
num_live=num_live,
)
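Two details of the timeline handling above are easy to miss: the `timeline_limit + 1` over-fetch that drives `limited`, and the `num_live` count of events that fall inside the token range. A toy sketch of both, using bare stream orderings instead of events (all values are made up):

```python
timeline_limit = 3
from_token_stream = 105  # position of the previous sync

# Pretend the store returned `timeline_limit + 1` events, already reversed
# into ascending (oldest-first) order.
fetched = [101, 103, 106, 108]

limited = len(fetched) > timeline_limit
if limited:
    # Drop the extra event we only fetched to detect the overflow.
    fetched = fetched[-timeline_limit:]

# Count events that are "live", i.e. newer than the from_token.
num_live = 0
for stream_ordering in reversed(fetched):
    if stream_ordering > from_token_stream:
        num_live += 1
    else:
        break  # older events follow, stop counting

assert limited is True
assert fetched == [103, 106, 108]
assert num_live == 2
```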


@@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet):
        "lists": {
            "foo-list": {
                "ranges": [ [0, 99] ],
-                "sort": [ "by_notification_level", "by_recency", "by_name" ],
                "required_state": [
                    ["m.room.join_rules", ""],
                    ["m.room.history_visibility", ""],

@@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet):
                "filters": {
                    "is_dm": true
                },
-                "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
            }
        },
        // Room Subscriptions API

@@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet):
            "!sub1:bar": {
                "required_state": [ ["*","*"] ],
                "timeline_limit": 10,
-                "include_old_rooms": {
-                    "timeline_limit": 1,
-                    "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
-                }
            }
        },
        // Extensions API
@@ -791,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet):
    Response JSON::
        {
-            "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+            "pos": "s58_224_0_13_10_1_1_16_0_1",
            "lists": {
                "foo-list": {
                    "count": 1337,

@@ -830,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet):
                    "joined_count": 41,
                    "invited_count": 1,
                    "notification_count": 1,
-                    "highlight_count": 0
+                    "highlight_count": 0,
+                    "num_live": 2
                },
                // rooms from list
                "!foo:bar": {
@@ -855,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet):
                    "joined_count": 4,
                    "invited_count": 0,
                    "notification_count": 54,
-                    "highlight_count": 3
+                    "highlight_count": 3,
+                    "num_live": 1,
                },
                // ... 99 more items
            },
@@ -871,10 +867,11 @@ class SlidingSyncRestServlet(RestServlet):
        super().__init__()
        self.auth = hs.get_auth()
        self.store = hs.get_datastores().main
        self.clock = hs.get_clock()
        self.filtering = hs.get_filtering()
        self.sliding_sync_handler = hs.get_sliding_sync_handler()
        self.event_serializer = hs.get_event_client_serializer()

-    # TODO: Update this to `on_GET` once we figure out how we want to handle params
    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
        requester = await self.auth.get_user_by_req(request, allow_guest=True)
        user = requester.user
@@ -920,22 +917,25 @@ class SlidingSyncRestServlet(RestServlet):
            logger.info("Client has disconnected; not serializing response.")
            return 200, {}

-        response_content = await self.encode_response(sliding_sync_results)
+        response_content = await self.encode_response(requester, sliding_sync_results)

        return 200, response_content

    # TODO: Is there a better way to encode things?
    async def encode_response(
        self,
        requester: Requester,
        sliding_sync_result: SlidingSyncResult,
    ) -> JsonDict:
        response: JsonDict = defaultdict(dict)

-        response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+        response["pos"] = await sliding_sync_result.next_pos.to_string(self.store)
        serialized_lists = self.encode_lists(sliding_sync_result.lists)
        if serialized_lists:
            response["lists"] = serialized_lists
-        response["rooms"] = {}  # TODO: sliding_sync_result.rooms
+        response["rooms"] = await self.encode_rooms(
+            requester, sliding_sync_result.rooms
+        )
        response["extensions"] = {}  # TODO: sliding_sync_result.extensions

        return response
@@ -961,6 +961,92 @@
        return serialized_lists
async def encode_rooms(
self,
requester: Requester,
rooms: Dict[str, SlidingSyncResult.RoomResult],
) -> JsonDict:
time_now = self.clock.time_msec()
serialize_options = SerializeEventConfig(
event_format=format_event_for_client_v2_without_room_id,
requester=requester,
)
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.name:
serialized_rooms[room_id]["name"] = room_result.name
if room_result.avatar:
serialized_rooms[room_id]["avatar"] = room_result.avatar
if room_result.heroes:
serialized_rooms[room_id]["heroes"] = room_result.heroes
# We should only include the `initial` key if it's `True` to save bandwidth.
# The absence of this flag means `False`.
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.required_state is not None:
serialized_required_state = (
await self.event_serializer.serialize_events(
room_result.required_state,
time_now,
config=serialize_options,
)
)
serialized_rooms[room_id]["required_state"] = serialized_required_state
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.timeline_events is not None:
serialized_timeline = await self.event_serializer.serialize_events(
room_result.timeline_events,
time_now,
config=serialize_options,
bundle_aggregations=room_result.bundled_aggregations,
)
serialized_rooms[room_id]["timeline"] = serialized_timeline
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.limited is not None:
serialized_rooms[room_id]["limited"] = room_result.limited
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.prev_batch is not None:
serialized_rooms[room_id]["prev_batch"] = (
await room_result.prev_batch.to_string(self.store)
)
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.num_live is not None:
serialized_rooms[room_id]["num_live"] = room_result.num_live
# Field should be absent on non-DM rooms
if room_result.is_dm:
serialized_rooms[room_id]["is_dm"] = room_result.is_dm
# Stripped state only applies to invite/knock rooms
if room_result.stripped_state is not None:
# TODO: `knocked_state` but that isn't specced yet.
#
# TODO: Instead of adding `knocked_state`, it would be good to rename
# this to `stripped_state` so it can be shared between invite and knock
# rooms, see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919
serialized_rooms[room_id]["invite_state"] = room_result.stripped_state
return serialized_rooms
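Putting the optional-field rules above together, a joined room and an invited room end up with different shapes in the response. A hand-written illustration (field values are made up, and `"..."` stands for omitted detail):

```python
# Joined rooms carry a timeline and related fields; invite/knock rooms only
# carry stripped state under `invite_state`.
joined_room_entry = {
    "joined_count": 4,
    "invited_count": 0,
    "notification_count": 0,
    "highlight_count": 0,
    "initial": True,
    "required_state": [],
    "timeline": [{"type": "m.room.message", "...": "..."}],
    "limited": False,
    "prev_batch": "s58_224_0_13_10_1_1_16_0_1",
    "num_live": 1,
}

invited_room_entry = {
    "joined_count": 1,
    "invited_count": 1,
    "notification_count": 0,
    "highlight_count": 0,
    "initial": True,
    # No timeline/required_state/prev_batch/limited/num_live for invites.
    "invite_state": [{"type": "m.room.member", "state_key": "@me:example.org", "...": "..."}],
}
```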
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    SyncRestServlet(hs).register(http_server)


@@ -55,7 +55,7 @@ from synapse.api.room_versions import (
)
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
-from synapse.events.utils import prune_event
+from synapse.events.utils import prune_event, strip_event
from synapse.logging.context import (
    PreserveLoggingContext,
    current_context,
@@ -1025,15 +1025,7 @@ class EventsWorkerStore(SQLBaseStore):
        state_to_include = await self.get_events(selected_state_ids.values())

-        return [
-            {
-                "type": e.type,
-                "state_key": e.state_key,
-                "content": e.content,
-                "sender": e.sender,
-            }
-            for e in state_to_include.values()
-        ]
+        return [strip_event(e) for e in state_to_include.values()]
    def _maybe_start_fetch_thread(self) -> None:
        """Starts an event fetch thread if we are not yet at the maximum number."""


@@ -44,6 +44,7 @@ what sort order was used:
import logging
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Collection,
    Dict,

@@ -62,7 +63,7 @@ from typing_extensions import Literal
from twisted.internet import defer

-from synapse.api.constants import Direction
+from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background

@@ -111,6 +112,32 @@ class _EventsAround:
    end: RoomStreamToken
@attr.s(slots=True, frozen=True, auto_attribs=True)
class CurrentStateDeltaMembership:
"""
Attributes:
event_id: The "current" membership event ID in this room.
event_pos: The position of the "current" membership event in the event stream.
prev_event_id: The previous membership event in this room that was replaced by
the "current" one. May be `None` if there was no previous membership event.
room_id: The room ID of the membership event.
membership: The membership state of the user in the room
sender: The person who sent the membership event
"""
room_id: str
# Event
event_id: Optional[str]
event_pos: PersistedEventPosition
membership: str
sender: Optional[str]
# Prev event
prev_event_id: Optional[str]
prev_event_pos: Optional[PersistedEventPosition]
prev_membership: Optional[str]
prev_sender: Optional[str]
def generate_pagination_where_clause(
    direction: Direction,
    column_names: Tuple[str, str],

@@ -390,6 +417,43 @@ def _filter_results(
    return True
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
stream_ordering: int,
) -> bool:
"""
This function only works with "live" tokens with `stream_ordering` only. See
`_filter_results(...)` if you want to work with all tokens.
Returns True if the event persisted by the given instance at the given
stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
"""
if lower_token:
assert lower_token.topological is None
# If these are live tokens we compare the stream ordering against the
# writers stream position.
if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name):
return False
if upper_token:
assert upper_token.topological is None
if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
return False
return True
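For intuition, the bounds check above is exclusive on the lower token and inclusive on the upper one, evaluated against the per-writer stream position. A small sketch with simple single-writer tokens (stream values and the instance name are made up):

```python
from synapse.types import RoomStreamToken

lower = RoomStreamToken(stream=10)
upper = RoomStreamToken(stream=20)

# (lower, upper] semantics: equal to lower is excluded, equal to upper is included.
assert not _filter_results_by_stream(lower, upper, "master", 10)
assert _filter_results_by_stream(lower, upper, "master", 11)
assert _filter_results_by_stream(lower, upper, "master", 20)
assert not _filter_results_by_stream(lower, upper, "master", 21)
```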
def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
    # NB: This may create SQL clauses that don't optimise well (and we don't
    # have indices on all possible clauses). E.g. it may create

@@ -734,6 +798,191 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
        return ret, key
async def get_current_state_delta_membership_changes_for_user(
self,
user_id: str,
from_key: RoomStreamToken,
to_key: RoomStreamToken,
excluded_room_ids: Optional[List[str]] = None,
) -> List[CurrentStateDeltaMembership]:
"""
Fetch membership events (and the previous event that was replaced by that one)
for a given user.
Note: This function only works with "live" tokens with `stream_ordering` only.
We're looking for membership changes in the token range (> `from_key` and <=
`to_key`).
Please be mindful to only use this with `from_key` and `to_key` tokens that are
recent enough to be after when the first local user joined the room. Otherwise,
the results may be incomplete or too greedy. For example, if you use a token
range before the first local user joined the room, you will see 0 events since
`current_state_delta_stream` tracks what the server thinks is the current state
of the room as time goes. It does not track how state progresses from the
beginning of the room. So for example, when you remotely join a room, the first
rows will just be the state when you joined and progress from there.
You can probably reasonably use this with `/sync` because the `to_key` passed in
will be the "current" now token and the range will cover when the user joined
the room.
Args:
user_id: The user ID to fetch membership events for.
from_key: The point in the stream to sync from (fetching events > this point).
to_key: The token to fetch rooms up to (fetching events <= this point).
excluded_room_ids: Optional list of room IDs to exclude from the results.
Returns:
All membership changes to the current state in the token range. Events are
sorted by `stream_ordering` ascending.
"""
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return []
if from_key:
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_key.stream)
)
if not has_changed:
return []
def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos()
args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
# TODO: It would be good to assert that the `from_token`/`to_token` is >=
# the first row in `current_state_delta_stream` for the rooms we're
# interested in. Otherwise, we will end up with empty results and not know
# it.
# We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate
# stream positioning when available but given our usages, we can avoid the
# complexity. Between two (valid) stream tokens, we will still get all of
# the state changes. Since those events are persisted in a batch, valid
# tokens will either be before or after the batch of events.
#
# `stream_ordering` from the `events` table is more accurate when available
# since the `current_state_delta_stream` table only tracks that the current
# state is at this stream position (not what stream position the state event
# was added) and uses the *minimum* stream position for batches of events.
sql = """
SELECT
s.room_id,
e.event_id,
s.instance_name,
s.stream_id,
m.membership,
e.sender,
s.prev_event_id,
e_prev.instance_name AS prev_instance_name,
e_prev.stream_ordering AS prev_stream_ordering,
m_prev.membership AS prev_membership,
e_prev.sender AS prev_sender
FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN events AS e_prev ON e_prev.event_id = s.prev_event_id
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
WHERE s.stream_id > ? AND s.stream_id <= ?
AND s.type = ?
AND s.state_key = ?
ORDER BY s.stream_id ASC
"""
txn.execute(sql, args)
membership_changes: List[CurrentStateDeltaMembership] = []
for (
room_id,
event_id,
instance_name,
stream_ordering,
membership,
sender,
prev_event_id,
prev_instance_name,
prev_stream_ordering,
prev_membership,
prev_sender,
) in txn:
assert room_id is not None
assert instance_name is not None
assert stream_ordering is not None
if _filter_results_by_stream(
from_key,
to_key,
instance_name,
stream_ordering,
):
# When the server leaves a room, it will insert new rows into the
# `current_state_delta_stream` table with `event_id = null` for all
# current state. This means we might already have a row for the
# leave event and then another for the same leave where the
# `event_id=null` but the `prev_event_id` is pointing back at the
# earlier leave event. We don't want to report the leave if we
# already have a leave event.
if event_id is None and prev_membership == Membership.LEAVE:
continue
membership_change = CurrentStateDeltaMembership(
room_id=room_id,
# Event
event_id=event_id,
event_pos=PersistedEventPosition(
instance_name=instance_name,
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get the respective
# `room_membership` but can assume the user has left the room
# because this only happens when the server leaves a room
# (meaning everyone locally left) or a state reset which removed
# the person from the room.
membership=(
membership if membership is not None else Membership.LEAVE
),
sender=sender,
# Prev event
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
instance_name=prev_instance_name,
stream=prev_stream_ordering,
)
if (
prev_instance_name is not None
and prev_stream_ordering is not None
)
else None
),
prev_membership=prev_membership,
prev_sender=prev_sender,
)
membership_changes.append(membership_change)
return membership_changes
membership_changes = await self.db_pool.runInteraction(
"get_current_state_delta_membership_changes_for_user", f
)
room_ids_to_exclude: AbstractSet[str] = set()
if excluded_room_ids is not None:
room_ids_to_exclude = set(excluded_room_ids)
return [
membership_change
for membership_change in membership_changes
if membership_change.room_id not in room_ids_to_exclude
]
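A hypothetical call site for the new store method, mirroring how the sliding sync handler uses it (the user ID is a placeholder and `from_token`/`to_token` stand for real `StreamToken`s):

```python
async def example(store, from_token, to_token):
    # Fetch membership changes to the current state in (> from_key, <= to_key].
    membership_changes = await store.get_current_state_delta_membership_changes_for_user(
        "@me:example.org",
        from_key=from_token.room_key,
        to_key=to_token.room_key,
        excluded_room_ids=None,
    )

    for change in membership_changes:
        # Each row carries the membership at that point and what it replaced,
        # which is what lets the handler rewind membership to the `to_token`.
        print(change.room_id, change.membership, change.prev_membership)
```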
    @cancellable
    async def get_membership_changes_for_user(
        self,

@@ -769,10 +1018,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
            ignore_room_clause = ""
            if excluded_rooms is not None and len(excluded_rooms) > 0:
-                ignore_room_clause = "AND e.room_id NOT IN (%s)" % ",".join(
-                    "?" for _ in excluded_rooms
+                ignore_room_clause, ignore_room_args = make_in_list_sql_clause(
+                    txn.database_engine, "e.room_id", excluded_rooms, negative=True
                )
-                args = args + excluded_rooms
+                ignore_room_clause = f"AND {ignore_room_clause}"
+                args += ignore_room_args

            sql = """
                SELECT m.event_id, instance_name, topological_ordering, stream_ordering
@ -1554,6 +1804,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
    ) -> Tuple[List[EventBase], RoomStreamToken]:
        """Returns list of events before or after a given token.
When Direction.FORWARDS: from_key < x <= to_key
When Direction.BACKWARDS: from_key >= x > to_key
        Args:
            room_id
            from_key: The token used to stream from

@@ -1570,6 +1823,27 @@
            and `to_key`).
        """
# We can bail early if we're looking forwards, and our `to_key` is already
# before our `from_key`.
if (
direction == Direction.FORWARDS
and to_key is not None
and to_key.is_before_or_eq(from_key)
):
# Token selection matches what we do in `_paginate_room_events_txn` if there
# are no rows
return [], to_key if to_key else from_key
# Or vice-versa, if we're looking backwards and our `from_key` is already before
# our `to_key`.
elif (
direction == Direction.BACKWARDS
and to_key is not None
and from_key.is_before_or_eq(to_key)
):
# Token selection matches what we do in `_paginate_room_events_txn` if there
# are no rows
return [], to_key if to_key else from_key
        rows, token = await self.db_pool.runInteraction(
            "paginate_room_events",
            self._paginate_room_events_txn,


@@ -32,7 +32,10 @@
 * limitations under the License.
 */

-- Tracks what the server thinks is the current state of the room as time goes on. It does
-- not track how state progresses from the beginning of the room. So for example, when
-- you remotely join a room, the first rows will just be the state when you joined and
-- progress from there.
CREATE TABLE current_state_delta_stream (
    stream_id BIGINT NOT NULL,
    room_id TEXT NOT NULL,


@ -1096,6 +1096,9 @@ class PersistedPosition:
stream: int stream: int
def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool:
"""
Checks whether this position happened after the token
"""
# i.e. this position was persisted after the token if the token's position for
# this writer instance is strictly less than this position's stream ordering.
return token.get_stream_pos_for_instance(self.instance_name) < self.stream

@ -31,9 +31,12 @@ else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
from synapse.handlers.relations import BundledAggregations
class ShutdownRoomParams(TypedDict):
"""
@ -159,11 +162,16 @@ class SlidingSyncResult:
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
the timeline events above. This allows clients to show accurate reaction
counts (or edits, threads), even if some of the reaction events were skipped
over in a gappy sync.
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
stripped_state: Stripped state events (for rooms where the user is
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
absent on joined/left rooms
prev_batch: A token that can be passed as a start parameter to the
`/rooms/<room_id>/messages` API to retrieve earlier messages.
limited: True if there are more events than fit between the given position and now.
@ -185,21 +193,28 @@ class SlidingSyncResult:
(with potentially other old events in the timeline).
"""
name: Optional[str]
avatar: Optional[str]
heroes: Optional[List[EventBase]]
initial: bool
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
required_state: Optional[List[EventBase]]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
timeline_events: Optional[List[EventBase]]
bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
is_dm: bool
# Optional because it's only relevant to invite/knock rooms
stripped_state: Optional[List[JsonDict]]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
prev_batch: Optional[StreamToken]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
limited: Optional[bool]
joined_count: int
invited_count: int
notification_count: int
highlight_count: int
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
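To make the optional fields above concrete, here is a minimal, runnable sketch (this is not the real Synapse class; `RoomResultSketch` and its field subset are illustrative stand-ins) of how an invite/knock room carries only `stripped_state` while the joined-room-only fields stay `None`:

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass(frozen=True)
class RoomResultSketch:
    # Illustrative stand-in for the room result described above (subset of fields).
    stripped_state: Optional[List[Dict[str, Any]]]
    timeline_events: Optional[List[Dict[str, Any]]]
    prev_batch: Optional[str]
    limited: Optional[bool]
    num_live: Optional[int]
    joined_count: int
    invited_count: int

# An invite room: only `stripped_state` (plus the counts) is populated.
invite_room = RoomResultSketch(
    stripped_state=[
        {
            "type": "m.room.member",
            "state_key": "@user1:test",
            "sender": "@user2:test",
            "content": {"membership": "invite"},
        }
    ],
    timeline_events=None,
    prev_batch=None,
    limited=None,
    num_live=None,
    joined_count=1,
    invited_count=1,
)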
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:

@ -152,22 +152,14 @@ class SlidingSyncBody(RequestBodyModel):
anyway.
timeline_limit: The maximum number of timeline events to return per response.
(Max 1000 messages)
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
"""
class IncludeOldRooms(RequestBodyModel):
timeline_limit: StrictInt
required_state: List[Tuple[StrictStr, StrictStr]]
required_state: List[Tuple[StrictStr, StrictStr]]
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING:
timeline_limit: int
else:
timeline_limit: conint(le=1000, strict=True)  # type: ignore[valid-type]
include_old_rooms: Optional[IncludeOldRooms] = None
class SlidingSyncList(CommonRoomParameters):
"""
@ -208,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
}
timeline_limit: The maximum number of timeline events to return per response.
include_old_rooms: Determines if `predecessor` rooms are included in the
`rooms` response. The user MUST be joined to old rooms for them to show up
in the response.
include_heroes: Return a stripped variant of membership events (containing
`user_id` and optionally `avatar_url` and `displayname`) for the users used
to calculate the room name.
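For orientation, a request against the experimental Sliding Sync endpoint that exercises these list parameters might look roughly like the following (shape per MSC3575; treat the exact field set supported by this experimental endpoint as an assumption rather than something taken from this diff):

# Hypothetical request body for a single sliding window list.
sliding_sync_request_body = {
    "lists": {
        "foo-list": {
            "ranges": [[0, 99]],
            # (event type, state key) pairs, matching `required_state` above
            "required_state": [
                ["m.room.join_rules", ""],
                ["m.room.history_visibility", ""],
            ],
            "timeline_limit": 10,  # capped at 1000 per the model above
        }
    }
}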

File diff suppressed because it is too large
File diff suppressed because it is too large
@ -261,9 +261,9 @@ class RestHelper:
targ: str,
expect_code: int = HTTPStatus.OK,
tok: Optional[str] = None,
) -> JsonDict:
"""A convenience helper: `change_membership` with `membership` preset to "ban"."""
return self.change_membership(
room=room,
src=src,
targ=targ,

@ -21,20 +21,32 @@
import logging
from typing import List, Tuple
from unittest.mock import AsyncMock, patch
from immutabledict import immutabledict
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import Direction, EventTypes, Membership, RelationTypes
from synapse.api.filtering import Filter
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import FrozenEventV3
from synapse.federation.federation_client import SendJoinResult
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.types import (
JsonDict,
PersistedEventPosition,
RoomStreamToken,
UserID,
create_requester,
)
from synapse.util import Clock
from tests.test_utils.event_injection import create_event
from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase
logger = logging.getLogger(__name__)
@ -543,3 +555,859 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
}
),
)
class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
"""
Test `get_current_state_delta_membership_changes_for_user(...)`
"""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.state_handler = self.hs.get_state_handler()
persistence = hs.get_storage_controllers().persistence
assert persistence is not None
self.persistence = persistence
def test_returns_membership_events(self) -> None:
"""
A basic test that a membership event in the token range is returned for the user.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
join_pos = self.get_success(
self.store.get_position_for_event(join_response["event_id"])
)
after_room1_token = self.event_sources.get_current_token()
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=join_response["event_id"],
event_pos=join_pos,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
)
],
)
def test_server_left_room_after_us(self) -> None:
"""
Test that when probing over part of the DAG where the server left the room *after
us*, we still see the join and leave changes.
This is to make sure we play nicely with this behavior: When the server leaves a
room, it will insert new rows with `event_id = null` into the
`current_state_delta_stream` table for all current state.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"power_level_content_override": {
"users": {
user2_id: 100,
# Allow user1 to send state in the room
user1_id: 100,
}
}
},
)
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
join_pos1 = self.get_success(
self.store.get_position_for_event(join_response1["event_id"])
)
# Make sure that random other non-member state that happens to have a `state_key`
# matching the user ID doesn't mess with things.
self.helper.send_state(
room_id1,
event_type="foobarbazdummy",
state_key=user1_id,
body={"foo": "bar"},
tok=user1_tok,
)
# User1 should leave the room first
leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
leave_pos1 = self.get_success(
self.store.get_position_for_event(leave_response1["event_id"])
)
# User2 should also leave the room (everyone has left the room which means the
# server is no longer in the room).
self.helper.leave(room_id1, user2_id, tok=user2_tok)
after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like the
# following. When the server leaves a room, it will insert new rows with
# `event_id = null` for all current state.
#
# | stream_id | room_id | type | state_key | event_id | prev_event_id |
# |-----------|----------|-----------------------------|----------------|----------|---------------|
# | 2 | !x:test | 'm.room.create' | '' | $xxx | None |
# | 3 | !x:test | 'm.room.member' | '@user2:test' | $aaa | None |
# | 4 | !x:test | 'm.room.history_visibility' | '' | $xxx | None |
# | 4 | !x:test | 'm.room.join_rules' | '' | $xxx | None |
# | 4 | !x:test | 'm.room.power_levels' | '' | $xxx | None |
# | 7 | !x:test | 'm.room.member' | '@user1:test' | $ooo | None |
# | 8 | !x:test | 'foobarbazdummy' | '@user1:test' | $xxx | None |
# | 9 | !x:test | 'm.room.member' | '@user1:test' | $ppp | $ooo |
# | 10 | !x:test | 'foobarbazdummy' | '@user1:test' | None | $xxx |
# | 10 | !x:test | 'm.room.create' | '' | None | $xxx |
# | 10 | !x:test | 'm.room.history_visibility' | '' | None | $xxx |
# | 10 | !x:test | 'm.room.join_rules' | '' | None | $xxx |
# | 10 | !x:test | 'm.room.member' | '@user1:test' | None | $ppp |
# | 10 | !x:test | 'm.room.member' | '@user2:test' | None | $aaa |
# | 10 | !x:test | 'm.room.power_levels' | | None | $xxx |
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=join_response1["event_id"],
event_pos=join_pos1,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
),
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=leave_response1["event_id"],
event_pos=leave_pos1,
membership="leave",
sender=user1_id,
prev_event_id=join_response1["event_id"],
prev_event_pos=join_pos1,
prev_membership="join",
prev_sender=user1_id,
),
],
)
def test_server_left_room_after_us_later(self) -> None:
"""
Test that when the user leaves the room, and sometime later everyone else also
leaves (causing the server to leave the room), we shouldn't see any membership
changes.
This is to make sure we play nicely with this behavior: When the server leaves a
room, it will insert new rows with `event_id = null` into the
`current_state_delta_stream` table for all current state.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# User1 should leave the room first
self.helper.leave(room_id1, user1_id, tok=user1_tok)
after_user1_leave_token = self.event_sources.get_current_token()
# User2 should also leave the room (everyone has left the room which means the
# server is no longer in the room).
self.helper.leave(room_id1, user2_id, tok=user2_tok)
after_server_leave_token = self.event_sources.get_current_token()
# Join another room as user1 just to advance the stream_ordering and bust
# `_membership_stream_cache`
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id2, user1_id, tok=user1_tok)
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like the
# following. When the server leaves a room, it will insert new rows with
# `event_id = null` for all current state.
#
# TODO: Add DB rows to better see what's going on.
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=after_user1_leave_token.room_key,
to_key=after_server_leave_token.room_key,
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[],
)
def test_we_cause_server_left_room(self) -> None:
"""
Test that when probing over part of the DAG where the user leaves the room
causing the server to leave the room (because we were the last local user in the
room), we still see the join and leave changes.
This is to make sure we play nicely with this behavior: When the server leaves a
room, it will insert new rows with `event_id = null` into the
`current_state_delta_stream` table for all current state.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"power_level_content_override": {
"users": {
user2_id: 100,
# Allow user1 to send state in the room
user1_id: 100,
}
}
},
)
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
join_pos1 = self.get_success(
self.store.get_position_for_event(join_response1["event_id"])
)
# Make sure that random other non-member state that happens to have a `state_key`
# matching the user ID doesn't mess with things.
self.helper.send_state(
room_id1,
event_type="foobarbazdummy",
state_key=user1_id,
body={"foo": "bar"},
tok=user1_tok,
)
# User2 should leave the room first.
self.helper.leave(room_id1, user2_id, tok=user2_tok)
# User1 (the person we're testing with) should also leave the room (everyone has
# left the room which means the server is no longer in the room).
leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
leave_pos1 = self.get_success(
self.store.get_position_for_event(leave_response1["event_id"])
)
after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like the
# following. When the server leaves a room, it will insert new rows with
# `event_id = null` for all current state.
#
# | stream_id | room_id | type | state_key | event_id | prev_event_id |
# |-----------|-----------|-----------------------------|---------------|----------|---------------|
# | 2 | '!x:test' | 'm.room.create' | '' | '$xxx' | None |
# | 3 | '!x:test' | 'm.room.member' | '@user2:test' | '$aaa' | None |
# | 4 | '!x:test' | 'm.room.history_visibility' | '' | '$xxx' | None |
# | 4 | '!x:test' | 'm.room.join_rules' | '' | '$xxx' | None |
# | 4 | '!x:test' | 'm.room.power_levels' | '' | '$xxx' | None |
# | 7 | '!x:test' | 'm.room.member' | '@user1:test' | '$ooo' | None |
# | 8 | '!x:test' | 'foobarbazdummy' | '@user1:test' | '$xxx' | None |
# | 9 | '!x:test' | 'm.room.member' | '@user2:test' | '$bbb' | '$aaa' |
# | 10 | '!x:test' | 'foobarbazdummy' | '@user1:test' | None | '$xxx' |
# | 10 | '!x:test' | 'm.room.create' | '' | None | '$xxx' |
# | 10 | '!x:test' | 'm.room.history_visibility' | '' | None | '$xxx' |
# | 10 | '!x:test' | 'm.room.join_rules' | '' | None | '$xxx' |
# | 10 | '!x:test' | 'm.room.member' | '@user1:test' | None | '$ooo' |
# | 10 | '!x:test' | 'm.room.member' | '@user2:test' | None | '$bbb' |
# | 10 | '!x:test' | 'm.room.power_levels' | '' | None | '$xxx' |
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=join_response1["event_id"],
event_pos=join_pos1,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
),
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=None, # leave_response1["event_id"],
event_pos=leave_pos1,
membership="leave",
sender=None, # user1_id,
prev_event_id=join_response1["event_id"],
prev_event_pos=join_pos1,
prev_membership="join",
prev_sender=user1_id,
),
],
)
def test_different_user_membership_persisted_in_same_batch(self) -> None:
"""
Test batch of membership events from different users being processed at once.
This will result in all of the memberships being stored in the
`current_state_delta_stream` table with the same `stream_ordering` even though
the individual events have different `stream_ordering`s.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
_user3_tok = self.login(user3_id, "pass")
user4_id = self.register_user("user4", "pass")
_user4_tok = self.login(user4_id, "pass")
before_room1_token = self.event_sources.get_current_token()
# User2 is just the designated person to create the room (we do this across the
# tests to be consistent)
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
# Persist the user1, user3, and user4 join events in the same batch so they all
# end up in the `current_state_delta_stream` table with the same
# stream_ordering.
join_event3, join_event_context3 = self.get_success(
create_event(
self.hs,
sender=user3_id,
type=EventTypes.Member,
state_key=user3_id,
content={"membership": "join"},
room_id=room_id1,
)
)
# We want to put user1 in the middle of the batch. This way, regardless of the
# implementation that inserts rows into `current_state_delta_stream` (whether it
# be minimum/maximum of stream position of the batch), we will still catch bugs.
join_event1, join_event_context1 = self.get_success(
create_event(
self.hs,
sender=user1_id,
type=EventTypes.Member,
state_key=user1_id,
content={"membership": "join"},
room_id=room_id1,
)
)
join_event4, join_event_context4 = self.get_success(
create_event(
self.hs,
sender=user4_id,
type=EventTypes.Member,
state_key=user4_id,
content={"membership": "join"},
room_id=room_id1,
)
)
self.get_success(
self.persistence.persist_events(
[
(join_event3, join_event_context3),
(join_event1, join_event_context1),
(join_event4, join_event_context4),
]
)
)
after_room1_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like (notice
# those three memberships at the end with `stream_id=7` because we persisted
# them in the same batch):
#
# | stream_id | room_id | type | state_key | event_id | prev_event_id |
# |-----------|-----------|----------------------------|------------------|----------|---------------|
# | 2 | '!x:test' | 'm.room.create' | '' | '$xxx' | None |
# | 3 | '!x:test' | 'm.room.member' | '@user2:test' | '$xxx' | None |
# | 4 | '!x:test' | 'm.room.history_visibility'| '' | '$xxx' | None |
# | 4 | '!x:test' | 'm.room.join_rules' | '' | '$xxx' | None |
# | 4 | '!x:test' | 'm.room.power_levels' | '' | '$xxx' | None |
# | 7 | '!x:test' | 'm.room.member' | '@user3:test' | '$xxx' | None |
# | 7 | '!x:test' | 'm.room.member' | '@user1:test' | '$xxx' | None |
# | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
join_pos3 = self.get_success(
self.store.get_position_for_event(join_event3.event_id)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=join_event1.event_id,
# Ideally, this would be `join_pos1` (to match the `event_id`) but
# when events are persisted in a batch, they are all stored in the
# `current_state_delta_stream` table with the minimum
# `stream_ordering` from the batch.
event_pos=join_pos3,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
),
],
)
def test_state_reset(self) -> None:
"""
Test a state reset scenario where the user gets removed from the room (when
there is no corresponding leave event)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
join_pos1 = self.get_success(
self.store.get_position_for_event(join_response1["event_id"])
)
before_reset_token = self.event_sources.get_current_token()
# Send another state event to make a position for the state reset to happen at
dummy_state_response = self.helper.send_state(
room_id1,
event_type="foobarbaz",
state_key="",
body={"foo": "bar"},
tok=user2_tok,
)
dummy_state_pos = self.get_success(
self.store.get_position_for_event(dummy_state_response["event_id"])
)
# Mock a state reset removing the membership for user1 in the current state
self.get_success(
self.store.db_pool.simple_delete(
table="current_state_events",
keyvalues={
"room_id": room_id1,
"type": EventTypes.Member,
"state_key": user1_id,
},
desc="state reset user in current_state_delta_stream",
)
)
self.get_success(
self.store.db_pool.simple_insert(
table="current_state_delta_stream",
values={
"stream_id": dummy_state_pos.stream,
"room_id": room_id1,
"type": EventTypes.Member,
"state_key": user1_id,
"event_id": None,
"prev_event_id": join_response1["event_id"],
"instance_name": dummy_state_pos.instance_name,
},
desc="state reset user in current_state_delta_stream",
)
)
# Manually bust the cache since we're just manually messing with the database
# and not causing an actual state reset.
self.store._membership_stream_cache.entity_has_changed(
user1_id, dummy_state_pos.stream
)
after_reset_token = self.event_sources.get_current_token()
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_reset_token.room_key,
to_key=after_reset_token.room_key,
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=None,
event_pos=dummy_state_pos,
membership="leave",
sender=None, # user1_id,
prev_event_id=join_response1["event_id"],
prev_event_pos=join_pos1,
prev_membership="join",
prev_sender=user1_id,
),
],
)
def test_excluded_room_ids(self) -> None:
"""
Test that the `excluded_room_ids` option excludes changes from the specified rooms.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
join_pos1 = self.get_success(
self.store.get_position_for_event(join_response1["event_id"])
)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
join_pos2 = self.get_success(
self.store.get_position_for_event(join_response2["event_id"])
)
after_room1_token = self.event_sources.get_current_token()
# First test that the room is returned without the `excluded_room_ids` option
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=join_response1["event_id"],
event_pos=join_pos1,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
),
CurrentStateDeltaMembership(
room_id=room_id2,
event_id=join_response2["event_id"],
event_pos=join_pos2,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
),
],
)
# Then test that `excluded_room_ids` excludes room2 as expected
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_room1_token.room_key,
to_key=after_room1_token.room_key,
excluded_room_ids=[room_id2],
)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=room_id1,
event_id=join_response1["event_id"],
event_pos=join_pos1,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
)
],
)
class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
FederatingHomeserverTestCase
):
"""
Test `get_current_state_delta_membership_changes_for_user(...)` when joining remote federated rooms.
"""
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
self.store = self.hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.room_member_handler = hs.get_room_member_handler()
def test_remote_join(self) -> None:
"""
Test remote join where the first rows in `current_state_delta_stream` will just
be the state when you joined the remote room.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
before_join_token = self.event_sources.get_current_token()
intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
# Remotely join a room on another homeserver.
#
# To do this we have to mock the responses from the remote homeserver. We also
# patch out a bunch of event checks on our end.
create_event_source = {
"auth_events": [],
"content": {
"creator": f"@creator:{self.OTHER_SERVER_NAME}",
"room_version": self.hs.config.server.default_room_version.identifier,
},
"depth": 0,
"origin_server_ts": 0,
"prev_events": [],
"room_id": intially_unjoined_room_id,
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
"state_key": "",
"type": EventTypes.Create,
}
self.add_hashes_and_signatures_from_other_server(
create_event_source,
self.hs.config.server.default_room_version,
)
create_event = FrozenEventV3(
create_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
creator_join_event_source = {
"auth_events": [create_event.event_id],
"content": {
"membership": "join",
},
"depth": 1,
"origin_server_ts": 1,
"prev_events": [],
"room_id": intially_unjoined_room_id,
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
"state_key": f"@creator:{self.OTHER_SERVER_NAME}",
"type": EventTypes.Member,
}
self.add_hashes_and_signatures_from_other_server(
creator_join_event_source,
self.hs.config.server.default_room_version,
)
creator_join_event = FrozenEventV3(
creator_join_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
# Our local user is going to remote join the room
join_event_source = {
"auth_events": [create_event.event_id],
"content": {"membership": "join"},
"depth": 1,
"origin_server_ts": 100,
"prev_events": [creator_join_event.event_id],
"sender": user1_id,
"state_key": user1_id,
"room_id": intially_unjoined_room_id,
"type": EventTypes.Member,
}
add_hashes_and_signatures(
self.hs.config.server.default_room_version,
join_event_source,
self.hs.hostname,
self.hs.signing_key,
)
join_event = FrozenEventV3(
join_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
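# At this point we have three hand-signed events (the room create event, the
# remote creator's join, and our local user's join) that will stand in for the
# remote room's state during the mocked join below.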
mock_make_membership_event = AsyncMock(
return_value=(
self.OTHER_SERVER_NAME,
join_event,
self.hs.config.server.default_room_version,
)
)
mock_send_join = AsyncMock(
return_value=SendJoinResult(
join_event,
self.OTHER_SERVER_NAME,
state=[create_event, creator_join_event],
auth_chain=[create_event, creator_join_event],
partial_state=False,
servers_in_room=frozenset(),
)
)
with patch.object(
self.room_member_handler.federation_handler.federation_client,
"make_membership_event",
mock_make_membership_event,
), patch.object(
self.room_member_handler.federation_handler.federation_client,
"send_join",
mock_send_join,
), patch(
"synapse.event_auth._is_membership_change_allowed",
return_value=None,
), patch(
"synapse.handlers.federation_event.check_state_dependent_auth_rules",
return_value=None,
):
self.get_success(
self.room_member_handler.update_membership(
requester=create_requester(user1_id),
target=UserID.from_string(user1_id),
room_id=intially_unjoined_room_id,
action=Membership.JOIN,
remote_room_hosts=[self.OTHER_SERVER_NAME],
)
)
after_join_token = self.event_sources.get_current_token()
# Get the membership changes for the user.
#
# At this point, the `current_state_delta_stream` table should look like the
# following. Notice that all of the events are at the same `stream_id` because
# the current state starts out where we remotely joined:
#
# | stream_id | room_id | type | state_key | event_id | prev_event_id |
# |-----------|------------------------------|-----------------|------------------------------|----------|---------------|
# | 2 | '!example:other.example.com' | 'm.room.member' | '@user1:test' | '$xxx' | None |
# | 2 | '!example:other.example.com' | 'm.room.create' | '' | '$xxx' | None |
# | 2 | '!example:other.example.com' | 'm.room.member' | '@creator:other.example.com' | '$xxx' | None |
membership_changes = self.get_success(
self.store.get_current_state_delta_membership_changes_for_user(
user1_id,
from_key=before_join_token.room_key,
to_key=after_join_token.room_key,
)
)
join_pos = self.get_success(
self.store.get_position_for_event(join_event.event_id)
)
# Let the whole diff show on failure
self.maxDiff = None
self.assertEqual(
membership_changes,
[
CurrentStateDeltaMembership(
room_id=intially_unjoined_room_id,
event_id=join_event.event_id,
event_pos=join_pos,
membership="join",
sender=user1_id,
prev_event_id=None,
prev_event_pos=None,
prev_membership=None,
prev_sender=None,
),
],
)