mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-16 15:01:23 +01:00
Add Sliding Sync /sync
endpoint (initial implementation) (#17187)
Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync This iteration only focuses on returning the list of room IDs in the sliding window API (without sorting/filtering). Rooms appear in the Sliding sync response based on: - `invite`, `join`, `knock`, `ban` membership events - Kicks (`leave` membership events where `sender` is different from the `user_id`/`state_key`) - `newly_left` (rooms that were left during the given token range, > `from_token` and <= `to_token`) - In order for bans/kicks to not show up, you need to `/forget` those rooms. This doesn't modify the event itself though and only adds the `forgotten` flag to `room_memberships` in Synapse. There isn't a way to tell when a room was forgotten at the moment so we can't factor it into the from/to range. ### Example request `POST http://localhost:8008/_matrix/client/unstable/org.matrix.msc3575/sync` ```json { "lists": { "foo-list": { "ranges": [ [0, 99] ], "sort": [ "by_notification_level", "by_recency", "by_name" ], "required_state": [ ["m.room.join_rules", ""], ["m.room.history_visibility", ""], ["m.space.child", "*"] ], "timeline_limit": 100 } } } ``` Response: ```json { "next_pos": "s58_224_0_13_10_1_1_16_0_1", "lists": { "foo-list": { "count": 1, "ops": [ { "op": "SYNC", "range": [0, 99], "room_ids": [ "!MmgikIyFzsuvtnbvVG:my.synapse.linux.server" ] } ] } }, "rooms": {}, "extensions": {} } ```
This commit is contained in:
parent
ce9385819b
commit
4a7c58642c
11 changed files with 2302 additions and 15 deletions
1
changelog.d/17187.feature
Normal file
1
changelog.d/17187.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
|
@ -50,7 +50,7 @@ class Membership:
|
||||||
KNOCK: Final = "knock"
|
KNOCK: Final = "knock"
|
||||||
LEAVE: Final = "leave"
|
LEAVE: Final = "leave"
|
||||||
BAN: Final = "ban"
|
BAN: Final = "ban"
|
||||||
LIST: Final = (INVITE, JOIN, KNOCK, LEAVE, BAN)
|
LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
|
||||||
|
|
||||||
|
|
||||||
class PresenceState:
|
class PresenceState:
|
||||||
|
|
610
synapse/handlers/sliding_sync.py
Normal file
610
synapse/handlers/sliding_sync.py
Normal file
|
@ -0,0 +1,610 @@
|
||||||
|
#
|
||||||
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
#
|
||||||
|
# Copyright (C) 2024 New Vector, Ltd
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# See the GNU Affero General Public License for more details:
|
||||||
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
#
|
||||||
|
# Originally licensed under the Apache License, Version 2.0:
|
||||||
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||||
|
#
|
||||||
|
# [This file includes modifications made by New Vector Limited]
|
||||||
|
#
|
||||||
|
#
|
||||||
|
import logging
|
||||||
|
from enum import Enum
|
||||||
|
from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Tuple
|
||||||
|
|
||||||
|
import attr
|
||||||
|
from immutabledict import immutabledict
|
||||||
|
|
||||||
|
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||||
|
|
||||||
|
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||||
|
from pydantic.v1 import Extra
|
||||||
|
else:
|
||||||
|
from pydantic import Extra
|
||||||
|
|
||||||
|
from synapse.api.constants import Membership
|
||||||
|
from synapse.events import EventBase
|
||||||
|
from synapse.rest.client.models import SlidingSyncBody
|
||||||
|
from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
|
||||||
|
"""
|
||||||
|
Returns True if the membership event should be included in the sync response,
|
||||||
|
otherwise False.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
membership: The membership state of the user in the room.
|
||||||
|
user_id: The user ID that the membership applies to
|
||||||
|
sender: The person who sent the membership event
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Everything except `Membership.LEAVE` because we want everything that's *still*
|
||||||
|
# relevant to the user. There are few more things to include in the sync response
|
||||||
|
# (newly_left) but those are handled separately.
|
||||||
|
#
|
||||||
|
# This logic includes kicks (leave events where the sender is not the same user) and
|
||||||
|
# can be read as "anything that isn't a leave or a leave with a different sender".
|
||||||
|
return membership != Membership.LEAVE or sender != user_id
|
||||||
|
|
||||||
|
|
||||||
|
class SlidingSyncConfig(SlidingSyncBody):
|
||||||
|
"""
|
||||||
|
Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
|
||||||
|
extra fields that we need in the handler
|
||||||
|
"""
|
||||||
|
|
||||||
|
user: UserID
|
||||||
|
device_id: Optional[str]
|
||||||
|
|
||||||
|
# Pydantic config
|
||||||
|
class Config:
|
||||||
|
# By default, ignore fields that we don't recognise.
|
||||||
|
extra = Extra.ignore
|
||||||
|
# By default, don't allow fields to be reassigned after parsing.
|
||||||
|
allow_mutation = False
|
||||||
|
# Allow custom types like `UserID` to be used in the model
|
||||||
|
arbitrary_types_allowed = True
|
||||||
|
|
||||||
|
|
||||||
|
class OperationType(Enum):
|
||||||
|
"""
|
||||||
|
Represents the operation types in a Sliding Sync window.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about
|
||||||
|
entries in this range.
|
||||||
|
INSERT: Sets a single entry. If the position is not empty then clients MUST move
|
||||||
|
entries to the left or the right depending on where the closest empty space is.
|
||||||
|
DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
|
||||||
|
places.
|
||||||
|
INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
|
||||||
|
offline support, but they should be treated as empty when additional operations
|
||||||
|
which concern indexes in the range arrive from the server.
|
||||||
|
"""
|
||||||
|
|
||||||
|
SYNC: Final = "SYNC"
|
||||||
|
INSERT: Final = "INSERT"
|
||||||
|
DELETE: Final = "DELETE"
|
||||||
|
INVALIDATE: Final = "INVALIDATE"
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class SlidingSyncResult:
|
||||||
|
"""
|
||||||
|
The Sliding Sync result to be serialized to JSON for a response.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
next_pos: The next position token in the sliding window to request (next_batch).
|
||||||
|
lists: Sliding window API. A map of list key to list results.
|
||||||
|
rooms: Room subscription API. A map of room ID to room subscription to room results.
|
||||||
|
extensions: Extensions API. A map of extension key to extension results.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class RoomResult:
|
||||||
|
"""
|
||||||
|
Attributes:
|
||||||
|
name: Room name or calculated room name.
|
||||||
|
avatar: Room avatar
|
||||||
|
heroes: List of stripped membership events (containing `user_id` and optionally
|
||||||
|
`avatar_url` and `displayname`) for the users used to calculate the room name.
|
||||||
|
initial: Flag which is set when this is the first time the server is sending this
|
||||||
|
data on this connection. Clients can use this flag to replace or update
|
||||||
|
their local state. When there is an update, servers MUST omit this flag
|
||||||
|
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
|
||||||
|
absence of this flag means 'false'.
|
||||||
|
required_state: The current state of the room
|
||||||
|
timeline: Latest events in the room. The last event is the most recent
|
||||||
|
is_dm: Flag to specify whether the room is a direct-message room (most likely
|
||||||
|
between two people).
|
||||||
|
invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
|
||||||
|
in sync v2, absent on joined/left rooms
|
||||||
|
prev_batch: A token that can be passed as a start parameter to the
|
||||||
|
`/rooms/<room_id>/messages` API to retrieve earlier messages.
|
||||||
|
limited: True if their are more events than fit between the given position and now.
|
||||||
|
Sync again to get more.
|
||||||
|
joined_count: The number of users with membership of join, including the client's
|
||||||
|
own user ID. (same as sync `v2 m.joined_member_count`)
|
||||||
|
invited_count: The number of users with membership of invite. (same as sync v2
|
||||||
|
`m.invited_member_count`)
|
||||||
|
notification_count: The total number of unread notifications for this room. (same
|
||||||
|
as sync v2)
|
||||||
|
highlight_count: The number of unread notifications for this room with the highlight
|
||||||
|
flag set. (same as sync v2)
|
||||||
|
num_live: The number of timeline events which have just occurred and are not historical.
|
||||||
|
The last N events are 'live' and should be treated as such. This is mostly
|
||||||
|
useful to determine whether a given @mention event should make a noise or not.
|
||||||
|
Clients cannot rely solely on the absence of `initial: true` to determine live
|
||||||
|
events because if a room not in the sliding window bumps into the window because
|
||||||
|
of an @mention it will have `initial: true` yet contain a single live event
|
||||||
|
(with potentially other old events in the timeline).
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
avatar: Optional[str]
|
||||||
|
heroes: Optional[List[EventBase]]
|
||||||
|
initial: bool
|
||||||
|
required_state: List[EventBase]
|
||||||
|
timeline: List[EventBase]
|
||||||
|
is_dm: bool
|
||||||
|
invite_state: List[EventBase]
|
||||||
|
prev_batch: StreamToken
|
||||||
|
limited: bool
|
||||||
|
joined_count: int
|
||||||
|
invited_count: int
|
||||||
|
notification_count: int
|
||||||
|
highlight_count: int
|
||||||
|
num_live: int
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class SlidingWindowList:
|
||||||
|
"""
|
||||||
|
Attributes:
|
||||||
|
count: The total number of entries in the list. Always present if this list
|
||||||
|
is.
|
||||||
|
ops: The sliding list operations to perform.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class Operation:
|
||||||
|
"""
|
||||||
|
Attributes:
|
||||||
|
op: The operation type to perform.
|
||||||
|
range: Which index positions are affected by this operation. These are
|
||||||
|
both inclusive.
|
||||||
|
room_ids: Which room IDs are affected by this operation. These IDs match
|
||||||
|
up to the positions in the `range`, so the last room ID in this list
|
||||||
|
matches the 9th index. The room data is held in a separate object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
op: OperationType
|
||||||
|
range: Tuple[int, int]
|
||||||
|
room_ids: List[str]
|
||||||
|
|
||||||
|
count: int
|
||||||
|
ops: List[Operation]
|
||||||
|
|
||||||
|
next_pos: StreamToken
|
||||||
|
lists: Dict[str, SlidingWindowList]
|
||||||
|
rooms: Dict[str, RoomResult]
|
||||||
|
extensions: JsonMapping
|
||||||
|
|
||||||
|
def __bool__(self) -> bool:
|
||||||
|
"""Make the result appear empty if there are no updates. This is used
|
||||||
|
to tell if the notifier needs to wait for more events when polling for
|
||||||
|
events.
|
||||||
|
"""
|
||||||
|
return bool(self.lists or self.rooms or self.extensions)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def empty(next_pos: StreamToken) -> "SlidingSyncResult":
|
||||||
|
"Return a new empty result"
|
||||||
|
return SlidingSyncResult(
|
||||||
|
next_pos=next_pos,
|
||||||
|
lists={},
|
||||||
|
rooms={},
|
||||||
|
extensions={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SlidingSyncHandler:
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
self.store = hs.get_datastores().main
|
||||||
|
self.auth_blocking = hs.get_auth_blocking()
|
||||||
|
self.notifier = hs.get_notifier()
|
||||||
|
self.event_sources = hs.get_event_sources()
|
||||||
|
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
|
||||||
|
|
||||||
|
async def wait_for_sync_for_user(
|
||||||
|
self,
|
||||||
|
requester: Requester,
|
||||||
|
sync_config: SlidingSyncConfig,
|
||||||
|
from_token: Optional[StreamToken] = None,
|
||||||
|
timeout_ms: int = 0,
|
||||||
|
) -> SlidingSyncResult:
|
||||||
|
"""Get the sync for a client if we have new data for it now. Otherwise
|
||||||
|
wait for new data to arrive on the server. If the timeout expires, then
|
||||||
|
return an empty sync result.
|
||||||
|
"""
|
||||||
|
# If the user is not part of the mau group, then check that limits have
|
||||||
|
# not been exceeded (if not part of the group by this point, almost certain
|
||||||
|
# auth_blocking will occur)
|
||||||
|
await self.auth_blocking.check_auth_blocking(requester=requester)
|
||||||
|
|
||||||
|
# TODO: If the To-Device extension is enabled and we have a `from_token`, delete
|
||||||
|
# any to-device messages before that token (since we now know that the device
|
||||||
|
# has received them). (see sync v2 for how to do this)
|
||||||
|
|
||||||
|
# If we're working with a user-provided token, we need to make sure to wait for
|
||||||
|
# this worker to catch up with the token so we don't skip past any incoming
|
||||||
|
# events or future events if the user is nefariously, manually modifying the
|
||||||
|
# token.
|
||||||
|
if from_token is not None:
|
||||||
|
# We need to make sure this worker has caught up with the token. If
|
||||||
|
# this returns false, it means we timed out waiting, and we should
|
||||||
|
# just return an empty response.
|
||||||
|
before_wait_ts = self.clock.time_msec()
|
||||||
|
if not await self.notifier.wait_for_stream_token(from_token):
|
||||||
|
logger.warning(
|
||||||
|
"Timed out waiting for worker to catch up. Returning empty response"
|
||||||
|
)
|
||||||
|
return SlidingSyncResult.empty(from_token)
|
||||||
|
|
||||||
|
# If we've spent significant time waiting to catch up, take it off
|
||||||
|
# the timeout.
|
||||||
|
after_wait_ts = self.clock.time_msec()
|
||||||
|
if after_wait_ts - before_wait_ts > 1_000:
|
||||||
|
timeout_ms -= after_wait_ts - before_wait_ts
|
||||||
|
timeout_ms = max(timeout_ms, 0)
|
||||||
|
|
||||||
|
# We're going to respond immediately if the timeout is 0 or if this is an
|
||||||
|
# initial sync (without a `from_token`) so we can avoid calling
|
||||||
|
# `notifier.wait_for_events()`.
|
||||||
|
if timeout_ms == 0 or from_token is None:
|
||||||
|
now_token = self.event_sources.get_current_token()
|
||||||
|
result = await self.current_sync_for_user(
|
||||||
|
sync_config,
|
||||||
|
from_token=from_token,
|
||||||
|
to_token=now_token,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Otherwise, we wait for something to happen and report it to the user.
|
||||||
|
async def current_sync_callback(
|
||||||
|
before_token: StreamToken, after_token: StreamToken
|
||||||
|
) -> SlidingSyncResult:
|
||||||
|
return await self.current_sync_for_user(
|
||||||
|
sync_config,
|
||||||
|
from_token=from_token,
|
||||||
|
to_token=after_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await self.notifier.wait_for_events(
|
||||||
|
sync_config.user.to_string(),
|
||||||
|
timeout_ms,
|
||||||
|
current_sync_callback,
|
||||||
|
from_token=from_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def current_sync_for_user(
|
||||||
|
self,
|
||||||
|
sync_config: SlidingSyncConfig,
|
||||||
|
to_token: StreamToken,
|
||||||
|
from_token: Optional[StreamToken] = None,
|
||||||
|
) -> SlidingSyncResult:
|
||||||
|
"""
|
||||||
|
Generates the response body of a Sliding Sync result, represented as a
|
||||||
|
`SlidingSyncResult`.
|
||||||
|
"""
|
||||||
|
user_id = sync_config.user.to_string()
|
||||||
|
app_service = self.store.get_app_service_by_user_id(user_id)
|
||||||
|
if app_service:
|
||||||
|
# We no longer support AS users using /sync directly.
|
||||||
|
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
# Get all of the room IDs that the user should be able to see in the sync
|
||||||
|
# response
|
||||||
|
room_id_set = await self.get_sync_room_ids_for_user(
|
||||||
|
sync_config.user,
|
||||||
|
from_token=from_token,
|
||||||
|
to_token=to_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assemble sliding window lists
|
||||||
|
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
||||||
|
if sync_config.lists:
|
||||||
|
for list_key, list_config in sync_config.lists.items():
|
||||||
|
# TODO: Apply filters
|
||||||
|
#
|
||||||
|
# TODO: Exclude partially stated rooms unless the `required_state` has
|
||||||
|
# `["m.room.member", "$LAZY"]`
|
||||||
|
filtered_room_ids = room_id_set
|
||||||
|
# TODO: Apply sorts
|
||||||
|
sorted_room_ids = sorted(filtered_room_ids)
|
||||||
|
|
||||||
|
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
||||||
|
if list_config.ranges:
|
||||||
|
for range in list_config.ranges:
|
||||||
|
ops.append(
|
||||||
|
SlidingSyncResult.SlidingWindowList.Operation(
|
||||||
|
op=OperationType.SYNC,
|
||||||
|
range=range,
|
||||||
|
room_ids=sorted_room_ids[range[0] : range[1]],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
||||||
|
count=len(sorted_room_ids),
|
||||||
|
ops=ops,
|
||||||
|
)
|
||||||
|
|
||||||
|
return SlidingSyncResult(
|
||||||
|
next_pos=to_token,
|
||||||
|
lists=lists,
|
||||||
|
# TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
|
||||||
|
rooms={},
|
||||||
|
extensions={},
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_sync_room_ids_for_user(
|
||||||
|
self,
|
||||||
|
user: UserID,
|
||||||
|
to_token: StreamToken,
|
||||||
|
from_token: Optional[StreamToken] = None,
|
||||||
|
) -> AbstractSet[str]:
|
||||||
|
"""
|
||||||
|
Fetch room IDs that should be listed for this user in the sync response (the
|
||||||
|
full room list that will be filtered, sorted, and sliced).
|
||||||
|
|
||||||
|
We're looking for rooms where the user has the following state in the token
|
||||||
|
range (> `from_token` and <= `to_token`):
|
||||||
|
|
||||||
|
- `invite`, `join`, `knock`, `ban` membership events
|
||||||
|
- Kicks (`leave` membership events where `sender` is different from the
|
||||||
|
`user_id`/`state_key`)
|
||||||
|
- `newly_left` (rooms that were left during the given token range)
|
||||||
|
- In order for bans/kicks to not show up in sync, you need to `/forget` those
|
||||||
|
rooms. This doesn't modify the event itself though and only adds the
|
||||||
|
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
|
||||||
|
to tell when a room was forgotten at the moment so we can't factor it into the
|
||||||
|
from/to range.
|
||||||
|
"""
|
||||||
|
user_id = user.to_string()
|
||||||
|
|
||||||
|
# First grab a current snapshot rooms for the user
|
||||||
|
# (also handles forgotten rooms)
|
||||||
|
room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
|
||||||
|
user_id=user_id,
|
||||||
|
# We want to fetch any kind of membership (joined and left rooms) in order
|
||||||
|
# to get the `event_pos` of the latest room membership event for the
|
||||||
|
# user.
|
||||||
|
#
|
||||||
|
# We will filter out the rooms that don't belong below (see
|
||||||
|
# `filter_membership_for_sync`)
|
||||||
|
membership_list=Membership.LIST,
|
||||||
|
excluded_rooms=self.rooms_to_exclude_globally,
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the user has never joined any rooms before, we can just return an empty list
|
||||||
|
if not room_for_user_list:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
# Our working list of rooms that can show up in the sync response
|
||||||
|
sync_room_id_set = {
|
||||||
|
room_for_user.room_id
|
||||||
|
for room_for_user in room_for_user_list
|
||||||
|
if filter_membership_for_sync(
|
||||||
|
membership=room_for_user.membership,
|
||||||
|
user_id=user_id,
|
||||||
|
sender=room_for_user.sender,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
|
||||||
|
# our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
|
||||||
|
#
|
||||||
|
# First, we need to get the max stream_ordering of each event persister instance
|
||||||
|
# that we queried events from.
|
||||||
|
instance_to_max_stream_ordering_map: Dict[str, int] = {}
|
||||||
|
for room_for_user in room_for_user_list:
|
||||||
|
instance_name = room_for_user.event_pos.instance_name
|
||||||
|
stream_ordering = room_for_user.event_pos.stream
|
||||||
|
|
||||||
|
current_instance_max_stream_ordering = (
|
||||||
|
instance_to_max_stream_ordering_map.get(instance_name)
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
current_instance_max_stream_ordering is None
|
||||||
|
or stream_ordering > current_instance_max_stream_ordering
|
||||||
|
):
|
||||||
|
instance_to_max_stream_ordering_map[instance_name] = stream_ordering
|
||||||
|
|
||||||
|
# Then assemble the `RoomStreamToken`
|
||||||
|
membership_snapshot_token = RoomStreamToken(
|
||||||
|
# Minimum position in the `instance_map`
|
||||||
|
stream=min(instance_to_max_stream_ordering_map.values()),
|
||||||
|
instance_map=immutabledict(instance_to_max_stream_ordering_map),
|
||||||
|
)
|
||||||
|
|
||||||
|
# If our `to_token` is already the same or ahead of the latest room membership
|
||||||
|
# for the user, we can just straight-up return the room list (nothing has
|
||||||
|
# changed)
|
||||||
|
if membership_snapshot_token.is_before_or_eq(to_token.room_key):
|
||||||
|
return sync_room_id_set
|
||||||
|
|
||||||
|
# Since we fetched the users room list at some point in time after the from/to
|
||||||
|
# tokens, we need to revert/rewind some membership changes to match the point in
|
||||||
|
# time of the `to_token`. In particular, we need to make these fixups:
|
||||||
|
#
|
||||||
|
# - 1a) Remove rooms that the user joined after the `to_token`
|
||||||
|
# - 1b) Add back rooms that the user left after the `to_token`
|
||||||
|
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
|
||||||
|
#
|
||||||
|
# Below, we're doing two separate lookups for membership changes. We could
|
||||||
|
# request everything for both fixups in one range, [`from_token.room_key`,
|
||||||
|
# `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
|
||||||
|
# comparison without `instance_name` (which is flawed). We could refactor
|
||||||
|
# `event.internal_metadata` to include `instance_name` but it might turn out a
|
||||||
|
# little difficult and a bigger, broader Synapse change than we want to make.
|
||||||
|
|
||||||
|
# 1) -----------------------------------------------------
|
||||||
|
|
||||||
|
# 1) Fetch membership changes that fall in the range from `to_token` up to
|
||||||
|
# `membership_snapshot_token`
|
||||||
|
membership_change_events_after_to_token = (
|
||||||
|
await self.store.get_membership_changes_for_user(
|
||||||
|
user_id,
|
||||||
|
from_key=to_token.room_key,
|
||||||
|
to_key=membership_snapshot_token,
|
||||||
|
excluded_rooms=self.rooms_to_exclude_globally,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 1) Assemble a list of the last membership events in some given ranges. Someone
|
||||||
|
# could have left and joined multiple times during the given range but we only
|
||||||
|
# care about end-result so we grab the last one.
|
||||||
|
last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
|
||||||
|
# We also need the first membership event after the `to_token` so we can step
|
||||||
|
# backward to the previous membership that would apply to the from/to range.
|
||||||
|
first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
|
||||||
|
for event in membership_change_events_after_to_token:
|
||||||
|
last_membership_change_by_room_id_after_to_token[event.room_id] = event
|
||||||
|
# Only set if we haven't already set it
|
||||||
|
first_membership_change_by_room_id_after_to_token.setdefault(
|
||||||
|
event.room_id, event
|
||||||
|
)
|
||||||
|
|
||||||
|
# 1) Fixup
|
||||||
|
for (
|
||||||
|
last_membership_change_after_to_token
|
||||||
|
) in last_membership_change_by_room_id_after_to_token.values():
|
||||||
|
room_id = last_membership_change_after_to_token.room_id
|
||||||
|
|
||||||
|
# We want to find the first membership change after the `to_token` then step
|
||||||
|
# backward to know the membership in the from/to range.
|
||||||
|
first_membership_change_after_to_token = (
|
||||||
|
first_membership_change_by_room_id_after_to_token.get(room_id)
|
||||||
|
)
|
||||||
|
assert first_membership_change_after_to_token is not None, (
|
||||||
|
"If there was a `last_membership_change_after_to_token` that we're iterating over, "
|
||||||
|
+ "then there should be corresponding a first change. For example, even if there "
|
||||||
|
+ "is only one event after the `to_token`, the first and last event will be same event. "
|
||||||
|
+ "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
|
||||||
|
+ "/`first_membership_change_by_room_id_after_to_token` dicts above."
|
||||||
|
)
|
||||||
|
# TODO: Instead of reading from `unsigned`, refactor this to use the
|
||||||
|
# `current_state_delta_stream` table in the future. Probably a new
|
||||||
|
# `get_membership_changes_for_user()` function that uses
|
||||||
|
# `current_state_delta_stream` with a join to `room_memberships`. This would
|
||||||
|
# help in state reset scenarios since `prev_content` is looking at the
|
||||||
|
# current branch vs the current room state. This is all just data given to
|
||||||
|
# the client so no real harm to data integrity, but we'd like to be nice to
|
||||||
|
# the client. Since the `current_state_delta_stream` table is new, it
|
||||||
|
# doesn't have all events in it. Since this is Sliding Sync, if we ever need
|
||||||
|
# to, we can signal the client to throw all of their state away by sending
|
||||||
|
# "operation: RESET".
|
||||||
|
prev_content = first_membership_change_after_to_token.unsigned.get(
|
||||||
|
"prev_content", {}
|
||||||
|
)
|
||||||
|
prev_membership = prev_content.get("membership", None)
|
||||||
|
prev_sender = first_membership_change_after_to_token.unsigned.get(
|
||||||
|
"prev_sender", None
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if the previous membership (membership that applies to the from/to
|
||||||
|
# range) should be included in our `sync_room_id_set`
|
||||||
|
should_prev_membership_be_included = (
|
||||||
|
prev_membership is not None
|
||||||
|
and prev_sender is not None
|
||||||
|
and filter_membership_for_sync(
|
||||||
|
membership=prev_membership,
|
||||||
|
user_id=user_id,
|
||||||
|
sender=prev_sender,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if the last membership (membership that applies to our snapshot) was
|
||||||
|
# already included in our `sync_room_id_set`
|
||||||
|
was_last_membership_already_included = filter_membership_for_sync(
|
||||||
|
membership=last_membership_change_after_to_token.membership,
|
||||||
|
user_id=user_id,
|
||||||
|
sender=last_membership_change_after_to_token.sender,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 1a) Add back rooms that the user left after the `to_token`
|
||||||
|
#
|
||||||
|
# For example, if the last membership event after the `to_token` is a leave
|
||||||
|
# event, then the room was excluded from `sync_room_id_set` when we first
|
||||||
|
# crafted it above. We should add these rooms back as long as the user also
|
||||||
|
# was part of the room before the `to_token`.
|
||||||
|
if (
|
||||||
|
not was_last_membership_already_included
|
||||||
|
and should_prev_membership_be_included
|
||||||
|
):
|
||||||
|
sync_room_id_set.add(room_id)
|
||||||
|
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
|
||||||
|
#
|
||||||
|
# For example, if the last membership event after the `to_token` is a "join"
|
||||||
|
# event, then the room was included `sync_room_id_set` when we first crafted
|
||||||
|
# it above. We should remove these rooms as long as the user also wasn't
|
||||||
|
# part of the room before the `to_token`.
|
||||||
|
elif (
|
||||||
|
was_last_membership_already_included
|
||||||
|
and not should_prev_membership_be_included
|
||||||
|
):
|
||||||
|
sync_room_id_set.discard(room_id)
|
||||||
|
|
||||||
|
# 2) -----------------------------------------------------
|
||||||
|
# We fix-up newly_left rooms after the first fixup because it may have removed
|
||||||
|
# some left rooms that we can figure out our newly_left in the following code
|
||||||
|
|
||||||
|
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
|
||||||
|
membership_change_events_in_from_to_range = []
|
||||||
|
if from_token:
|
||||||
|
membership_change_events_in_from_to_range = (
|
||||||
|
await self.store.get_membership_changes_for_user(
|
||||||
|
user_id,
|
||||||
|
from_key=from_token.room_key,
|
||||||
|
to_key=to_token.room_key,
|
||||||
|
excluded_rooms=self.rooms_to_exclude_globally,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2) Assemble a list of the last membership events in some given ranges. Someone
|
||||||
|
# could have left and joined multiple times during the given range but we only
|
||||||
|
# care about end-result so we grab the last one.
|
||||||
|
last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
|
||||||
|
for event in membership_change_events_in_from_to_range:
|
||||||
|
last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
|
||||||
|
|
||||||
|
# 2) Fixup
|
||||||
|
for (
|
||||||
|
last_membership_change_in_from_to_range
|
||||||
|
) in last_membership_change_by_room_id_in_from_to_range.values():
|
||||||
|
room_id = last_membership_change_in_from_to_range.room_id
|
||||||
|
|
||||||
|
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
|
||||||
|
# include newly_left rooms because the last event that the user should see
|
||||||
|
# is their own leave event
|
||||||
|
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
|
||||||
|
sync_room_id_set.add(room_id)
|
||||||
|
|
||||||
|
return sync_room_id_set
|
|
@ -2002,7 +2002,7 @@ class SyncHandler:
|
||||||
"""
|
"""
|
||||||
user_id = sync_config.user.to_string()
|
user_id = sync_config.user.to_string()
|
||||||
|
|
||||||
# Note: we get the users room list *before* we get the current token, this
|
# Note: we get the users room list *before* we get the `now_token`, this
|
||||||
# avoids checking back in history if rooms are joined after the token is fetched.
|
# avoids checking back in history if rooms are joined after the token is fetched.
|
||||||
token_before_rooms = self.event_sources.get_current_token()
|
token_before_rooms = self.event_sources.get_current_token()
|
||||||
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
|
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
|
||||||
|
@ -2014,10 +2014,10 @@ class SyncHandler:
|
||||||
now_token = self.event_sources.get_current_token()
|
now_token = self.event_sources.get_current_token()
|
||||||
log_kv({"now_token": now_token})
|
log_kv({"now_token": now_token})
|
||||||
|
|
||||||
# Since we fetched the users room list before the token, there's a small window
|
# Since we fetched the users room list before calculating the `now_token` (see
|
||||||
# during which membership events may have been persisted, so we fetch these now
|
# above), there's a small window during which membership events may have been
|
||||||
# and modify the joined room list for any changes between the get_rooms_for_user
|
# persisted, so we fetch these now and modify the joined room list for any
|
||||||
# call and the get_current_token call.
|
# changes between the get_rooms_for_user call and the get_current_token call.
|
||||||
membership_change_events = []
|
membership_change_events = []
|
||||||
if since_token:
|
if since_token:
|
||||||
membership_change_events = await self.store.get_membership_changes_for_user(
|
membership_change_events = await self.store.get_membership_changes_for_user(
|
||||||
|
@ -2027,16 +2027,19 @@ class SyncHandler:
|
||||||
self.rooms_to_exclude_globally,
|
self.rooms_to_exclude_globally,
|
||||||
)
|
)
|
||||||
|
|
||||||
mem_last_change_by_room_id: Dict[str, EventBase] = {}
|
last_membership_change_by_room_id: Dict[str, EventBase] = {}
|
||||||
for event in membership_change_events:
|
for event in membership_change_events:
|
||||||
mem_last_change_by_room_id[event.room_id] = event
|
last_membership_change_by_room_id[event.room_id] = event
|
||||||
|
|
||||||
# For the latest membership event in each room found, add/remove the room ID
|
# For the latest membership event in each room found, add/remove the room ID
|
||||||
# from the joined room list accordingly. In this case we only care if the
|
# from the joined room list accordingly. In this case we only care if the
|
||||||
# latest change is JOIN.
|
# latest change is JOIN.
|
||||||
|
|
||||||
for room_id, event in mem_last_change_by_room_id.items():
|
for room_id, event in last_membership_change_by_room_id.items():
|
||||||
assert event.internal_metadata.stream_ordering
|
assert event.internal_metadata.stream_ordering
|
||||||
|
# As a shortcut, skip any events that happened before we got our
|
||||||
|
# `get_rooms_for_user()` snapshot (any changes are already represented
|
||||||
|
# in that list).
|
||||||
if (
|
if (
|
||||||
event.internal_metadata.stream_ordering
|
event.internal_metadata.stream_ordering
|
||||||
< token_before_rooms.room_key.stream
|
< token_before_rooms.room_key.stream
|
||||||
|
|
|
@ -18,14 +18,30 @@
|
||||||
# [This file includes modifications made by New Vector Limited]
|
# [This file includes modifications made by New Vector Limited]
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
from typing import TYPE_CHECKING, Dict, Optional
|
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||||
|
|
||||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||||
from pydantic.v1 import Extra, StrictInt, StrictStr, constr, validator
|
from pydantic.v1 import (
|
||||||
|
Extra,
|
||||||
|
StrictBool,
|
||||||
|
StrictInt,
|
||||||
|
StrictStr,
|
||||||
|
conint,
|
||||||
|
constr,
|
||||||
|
validator,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
from pydantic import Extra, StrictInt, StrictStr, constr, validator
|
from pydantic import (
|
||||||
|
Extra,
|
||||||
|
StrictBool,
|
||||||
|
StrictInt,
|
||||||
|
StrictStr,
|
||||||
|
conint,
|
||||||
|
constr,
|
||||||
|
validator,
|
||||||
|
)
|
||||||
|
|
||||||
from synapse.rest.models import RequestBodyModel
|
from synapse.rest.models import RequestBodyModel
|
||||||
from synapse.util.threepids import validate_email
|
from synapse.util.threepids import validate_email
|
||||||
|
@ -97,3 +113,172 @@ else:
|
||||||
class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
|
class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
|
||||||
country: ISO3116_1_Alpha_2
|
country: ISO3116_1_Alpha_2
|
||||||
phone_number: StrictStr
|
phone_number: StrictStr
|
||||||
|
|
||||||
|
|
||||||
|
class SlidingSyncBody(RequestBodyModel):
|
||||||
|
"""
|
||||||
|
Sliding Sync API request body.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
lists: Sliding window API. A map of list key to list information
|
||||||
|
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
|
||||||
|
arbitrary strings which the client is using to refer to the list. Keep this
|
||||||
|
small as it needs to be sent a lot. Max length: 64 bytes.
|
||||||
|
room_subscriptions: Room subscription API. A map of room ID to room subscription
|
||||||
|
information. Used to subscribe to a specific room. Sometimes clients know
|
||||||
|
exactly which room they want to get information about e.g by following a
|
||||||
|
permalink or by refreshing a webapp currently viewing a specific room. The
|
||||||
|
sliding window API alone is insufficient for this use case because there's
|
||||||
|
no way to say "please track this room explicitly".
|
||||||
|
extensions: Extensions API. A map of extension key to extension config.
|
||||||
|
"""
|
||||||
|
|
||||||
|
class CommonRoomParameters(RequestBodyModel):
|
||||||
|
"""
|
||||||
|
Common parameters shared between the sliding window and room subscription APIs.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
required_state: Required state for each room returned. An array of event
|
||||||
|
type and state key tuples. Elements in this array are ORd together to
|
||||||
|
produce the final set of state events to return. One unique exception is
|
||||||
|
when you request all state events via `["*", "*"]`. When used, all state
|
||||||
|
events are returned by default, and additional entries FILTER OUT the
|
||||||
|
returned set of state events. These additional entries cannot use `*`
|
||||||
|
themselves. For example, `["*", "*"], ["m.room.member",
|
||||||
|
"@alice:example.com"]` will *exclude* every `m.room.member` event
|
||||||
|
*except* for `@alice:example.com`, and include every other state event.
|
||||||
|
In addition, `["*", "*"], ["m.space.child", "*"]` is an error, the
|
||||||
|
`m.space.child` filter is not required as it would have been returned
|
||||||
|
anyway.
|
||||||
|
timeline_limit: The maximum number of timeline events to return per response.
|
||||||
|
(Max 1000 messages)
|
||||||
|
include_old_rooms: Determines if `predecessor` rooms are included in the
|
||||||
|
`rooms` response. The user MUST be joined to old rooms for them to show up
|
||||||
|
in the response.
|
||||||
|
"""
|
||||||
|
|
||||||
|
class IncludeOldRooms(RequestBodyModel):
|
||||||
|
timeline_limit: StrictInt
|
||||||
|
required_state: List[Tuple[StrictStr, StrictStr]]
|
||||||
|
|
||||||
|
required_state: List[Tuple[StrictStr, StrictStr]]
|
||||||
|
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
timeline_limit: int
|
||||||
|
else:
|
||||||
|
timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type]
|
||||||
|
include_old_rooms: Optional[IncludeOldRooms] = None
|
||||||
|
|
||||||
|
class SlidingSyncList(CommonRoomParameters):
|
||||||
|
"""
|
||||||
|
Attributes:
|
||||||
|
ranges: Sliding window ranges. If this field is missing, no sliding window
|
||||||
|
is used and all rooms are returned in this list. Integers are
|
||||||
|
*inclusive*.
|
||||||
|
sort: How the list should be sorted on the server. The first value is
|
||||||
|
applied first, then tiebreaks are performed with each subsequent sort
|
||||||
|
listed.
|
||||||
|
|
||||||
|
FIXME: Furthermore, it's not currently defined how servers should behave
|
||||||
|
if they encounter a filter or sort operation they do not recognise. If
|
||||||
|
the server rejects the request with an HTTP 400 then that will break
|
||||||
|
backwards compatibility with new clients vs old servers. However, the
|
||||||
|
client would be otherwise unaware that only some of the sort/filter
|
||||||
|
operations have taken effect. We may need to include a "warnings"
|
||||||
|
section to indicate which sort/filter operations are unrecognised,
|
||||||
|
allowing for some form of graceful degradation of service.
|
||||||
|
-- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
|
||||||
|
|
||||||
|
slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
|
||||||
|
sliding windows). When true, the `ranges` and `sort` fields are ignored.
|
||||||
|
required_state: Required state for each room returned. An array of event
|
||||||
|
type and state key tuples. Elements in this array are ORd together to
|
||||||
|
produce the final set of state events to return.
|
||||||
|
|
||||||
|
One unique exception is when you request all state events via `["*",
|
||||||
|
"*"]`. When used, all state events are returned by default, and
|
||||||
|
additional entries FILTER OUT the returned set of state events. These
|
||||||
|
additional entries cannot use `*` themselves. For example, `["*", "*"],
|
||||||
|
["m.room.member", "@alice:example.com"]` will *exclude* every
|
||||||
|
`m.room.member` event *except* for `@alice:example.com`, and include
|
||||||
|
every other state event. In addition, `["*", "*"], ["m.space.child",
|
||||||
|
"*"]` is an error, the `m.space.child` filter is not required as it
|
||||||
|
would have been returned anyway.
|
||||||
|
|
||||||
|
Room members can be lazily-loaded by using the special `$LAZY` state key
|
||||||
|
(`["m.room.member", "$LAZY"]`). Typically, when you view a room, you
|
||||||
|
want to retrieve all state events except for m.room.member events which
|
||||||
|
you want to lazily load. To get this behaviour, clients can send the
|
||||||
|
following::
|
||||||
|
|
||||||
|
{
|
||||||
|
"required_state": [
|
||||||
|
// activate lazy loading
|
||||||
|
["m.room.member", "$LAZY"],
|
||||||
|
// request all state events _except_ for m.room.member
|
||||||
|
events which are lazily loaded
|
||||||
|
["*", "*"]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
timeline_limit: The maximum number of timeline events to return per response.
|
||||||
|
include_old_rooms: Determines if `predecessor` rooms are included in the
|
||||||
|
`rooms` response. The user MUST be joined to old rooms for them to show up
|
||||||
|
in the response.
|
||||||
|
include_heroes: Return a stripped variant of membership events (containing
|
||||||
|
`user_id` and optionally `avatar_url` and `displayname`) for the users used
|
||||||
|
to calculate the room name.
|
||||||
|
filters: Filters to apply to the list before sorting.
|
||||||
|
bump_event_types: Allowlist of event types which should be considered recent activity
|
||||||
|
when sorting `by_recency`. By omitting event types from this field,
|
||||||
|
clients can ensure that uninteresting events (e.g. a profile rename) do
|
||||||
|
not cause a room to jump to the top of its list(s). Empty or omitted
|
||||||
|
`bump_event_types` have no effect—all events in a room will be
|
||||||
|
considered recent activity.
|
||||||
|
"""
|
||||||
|
|
||||||
|
class Filters(RequestBodyModel):
|
||||||
|
is_dm: Optional[StrictBool] = None
|
||||||
|
spaces: Optional[List[StrictStr]] = None
|
||||||
|
is_encrypted: Optional[StrictBool] = None
|
||||||
|
is_invite: Optional[StrictBool] = None
|
||||||
|
room_types: Optional[List[Union[StrictStr, None]]] = None
|
||||||
|
not_room_types: Optional[List[StrictStr]] = None
|
||||||
|
room_name_like: Optional[StrictStr] = None
|
||||||
|
tags: Optional[List[StrictStr]] = None
|
||||||
|
not_tags: Optional[List[StrictStr]] = None
|
||||||
|
|
||||||
|
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
ranges: Optional[List[Tuple[int, int]]] = None
|
||||||
|
else:
|
||||||
|
ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type]
|
||||||
|
sort: Optional[List[StrictStr]] = None
|
||||||
|
slow_get_all_rooms: Optional[StrictBool] = False
|
||||||
|
include_heroes: Optional[StrictBool] = False
|
||||||
|
filters: Optional[Filters] = None
|
||||||
|
bump_event_types: Optional[List[StrictStr]] = None
|
||||||
|
|
||||||
|
class RoomSubscription(CommonRoomParameters):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class Extension(RequestBodyModel):
|
||||||
|
enabled: Optional[StrictBool] = False
|
||||||
|
lists: Optional[List[StrictStr]] = None
|
||||||
|
rooms: Optional[List[StrictStr]] = None
|
||||||
|
|
||||||
|
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
lists: Optional[Dict[str, SlidingSyncList]] = None
|
||||||
|
else:
|
||||||
|
lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None # type: ignore[valid-type]
|
||||||
|
room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None
|
||||||
|
extensions: Optional[Dict[StrictStr, Extension]] = None
|
||||||
|
|
||||||
|
@validator("lists")
|
||||||
|
def lists_length_check(
|
||||||
|
cls, value: Optional[Dict[str, SlidingSyncList]]
|
||||||
|
) -> Optional[Dict[str, SlidingSyncList]]:
|
||||||
|
if value is not None:
|
||||||
|
assert len(value) <= 100, f"Max lists: 100 but saw {len(value)}"
|
||||||
|
return value
|
||||||
|
|
|
@ -292,6 +292,9 @@ class RoomStateEventRestServlet(RestServlet):
|
||||||
try:
|
try:
|
||||||
if event_type == EventTypes.Member:
|
if event_type == EventTypes.Member:
|
||||||
membership = content.get("membership", None)
|
membership = content.get("membership", None)
|
||||||
|
if not isinstance(membership, str):
|
||||||
|
raise SynapseError(400, "Invalid membership (must be a string)")
|
||||||
|
|
||||||
event_id, _ = await self.room_member_handler.update_membership(
|
event_id, _ = await self.room_member_handler.update_membership(
|
||||||
requester,
|
requester,
|
||||||
target=UserID.from_string(state_key),
|
target=UserID.from_string(state_key),
|
||||||
|
|
|
@ -33,6 +33,7 @@ from synapse.events.utils import (
|
||||||
format_event_raw,
|
format_event_raw,
|
||||||
)
|
)
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state
|
||||||
|
from synapse.handlers.sliding_sync import SlidingSyncConfig, SlidingSyncResult
|
||||||
from synapse.handlers.sync import (
|
from synapse.handlers.sync import (
|
||||||
ArchivedSyncResult,
|
ArchivedSyncResult,
|
||||||
InvitedSyncResult,
|
InvitedSyncResult,
|
||||||
|
@ -43,9 +44,16 @@ from synapse.handlers.sync import (
|
||||||
SyncVersion,
|
SyncVersion,
|
||||||
)
|
)
|
||||||
from synapse.http.server import HttpServer
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
from synapse.http.servlet import (
|
||||||
|
RestServlet,
|
||||||
|
parse_and_validate_json_object_from_request,
|
||||||
|
parse_boolean,
|
||||||
|
parse_integer,
|
||||||
|
parse_string,
|
||||||
|
)
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging.opentracing import trace_with_opname
|
from synapse.logging.opentracing import trace_with_opname
|
||||||
|
from synapse.rest.client.models import SlidingSyncBody
|
||||||
from synapse.types import JsonDict, Requester, StreamToken
|
from synapse.types import JsonDict, Requester, StreamToken
|
||||||
from synapse.util import json_decoder
|
from synapse.util import json_decoder
|
||||||
from synapse.util.caches.lrucache import LruCache
|
from synapse.util.caches.lrucache import LruCache
|
||||||
|
@ -735,8 +743,228 @@ class SlidingSyncE2eeRestServlet(RestServlet):
|
||||||
return 200, response
|
return 200, response
|
||||||
|
|
||||||
|
|
||||||
|
class SlidingSyncRestServlet(RestServlet):
|
||||||
|
"""
|
||||||
|
API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
|
||||||
|
subset (sliding window) of rooms, state, and timeline events (just what they need)
|
||||||
|
in order to bootstrap quickly and subscribe to only what the client cares about.
|
||||||
|
Because the client can specify what it cares about, we can respond quickly and skip
|
||||||
|
all of the work we would normally have to do with a sync v2 response.
|
||||||
|
|
||||||
|
Request query parameters:
|
||||||
|
timeout: How long to wait for new events in milliseconds.
|
||||||
|
pos: Stream position token when asking for incremental deltas.
|
||||||
|
|
||||||
|
Request body::
|
||||||
|
{
|
||||||
|
// Sliding Window API
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [ [0, 99] ],
|
||||||
|
"sort": [ "by_notification_level", "by_recency", "by_name" ],
|
||||||
|
"required_state": [
|
||||||
|
["m.room.join_rules", ""],
|
||||||
|
["m.room.history_visibility", ""],
|
||||||
|
["m.space.child", "*"]
|
||||||
|
],
|
||||||
|
"timeline_limit": 10,
|
||||||
|
"filters": {
|
||||||
|
"is_dm": true
|
||||||
|
},
|
||||||
|
"bump_event_types": [ "m.room.message", "m.room.encrypted" ],
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Room Subscriptions API
|
||||||
|
"room_subscriptions": {
|
||||||
|
"!sub1:bar": {
|
||||||
|
"required_state": [ ["*","*"] ],
|
||||||
|
"timeline_limit": 10,
|
||||||
|
"include_old_rooms": {
|
||||||
|
"timeline_limit": 1,
|
||||||
|
"required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Extensions API
|
||||||
|
"extensions": {}
|
||||||
|
}
|
||||||
|
|
||||||
|
Response JSON::
|
||||||
|
{
|
||||||
|
"next_pos": "s58_224_0_13_10_1_1_16_0_1",
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"count": 1337,
|
||||||
|
"ops": [{
|
||||||
|
"op": "SYNC",
|
||||||
|
"range": [0, 99],
|
||||||
|
"room_ids": [
|
||||||
|
"!foo:bar",
|
||||||
|
// ... 99 more room IDs
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Aggregated rooms from lists and room subscriptions
|
||||||
|
"rooms": {
|
||||||
|
// Room from room subscription
|
||||||
|
"!sub1:bar": {
|
||||||
|
"name": "Alice and Bob",
|
||||||
|
"avatar": "mxc://...",
|
||||||
|
"initial": true,
|
||||||
|
"required_state": [
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}}
|
||||||
|
],
|
||||||
|
"timeline": [
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
|
||||||
|
],
|
||||||
|
"prev_batch": "t111_222_333",
|
||||||
|
"joined_count": 41,
|
||||||
|
"invited_count": 1,
|
||||||
|
"notification_count": 1,
|
||||||
|
"highlight_count": 0
|
||||||
|
},
|
||||||
|
// rooms from list
|
||||||
|
"!foo:bar": {
|
||||||
|
"name": "The calculated room name",
|
||||||
|
"avatar": "mxc://...",
|
||||||
|
"initial": true,
|
||||||
|
"required_state": [
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.space.child", "state_key":"!foo:example.com", "content":{"via":["example.com"]}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.space.child", "state_key":"!bar:example.com", "content":{"via":["example.com"]}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.space.child", "state_key":"!baz:example.com", "content":{"via":["example.com"]}}
|
||||||
|
],
|
||||||
|
"timeline": [
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"C"}},
|
||||||
|
{"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"D"}},
|
||||||
|
],
|
||||||
|
"prev_batch": "t111_222_333",
|
||||||
|
"joined_count": 4,
|
||||||
|
"invited_count": 0,
|
||||||
|
"notification_count": 54,
|
||||||
|
"highlight_count": 3
|
||||||
|
},
|
||||||
|
// ... 99 more items
|
||||||
|
},
|
||||||
|
"extensions": {}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
PATTERNS = client_patterns(
|
||||||
|
"/org.matrix.msc3575/sync$", releases=[], v1=False, unstable=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
super().__init__()
|
||||||
|
self.auth = hs.get_auth()
|
||||||
|
self.store = hs.get_datastores().main
|
||||||
|
self.filtering = hs.get_filtering()
|
||||||
|
self.sliding_sync_handler = hs.get_sliding_sync_handler()
|
||||||
|
|
||||||
|
# TODO: Update this to `on_GET` once we figure out how we want to handle params
|
||||||
|
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||||
|
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||||
|
user = requester.user
|
||||||
|
device_id = requester.device_id
|
||||||
|
|
||||||
|
timeout = parse_integer(request, "timeout", default=0)
|
||||||
|
# Position in the stream
|
||||||
|
from_token_string = parse_string(request, "pos")
|
||||||
|
|
||||||
|
from_token = None
|
||||||
|
if from_token_string is not None:
|
||||||
|
from_token = await StreamToken.from_string(self.store, from_token_string)
|
||||||
|
|
||||||
|
# TODO: We currently don't know whether we're going to use sticky params or
|
||||||
|
# maybe some filters like sync v2 where they are built up once and referenced
|
||||||
|
# by filter ID. For now, we will just prototype with always passing everything
|
||||||
|
# in.
|
||||||
|
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
|
||||||
|
logger.info("Sliding sync request: %r", body)
|
||||||
|
|
||||||
|
sync_config = SlidingSyncConfig(
|
||||||
|
user=user,
|
||||||
|
device_id=device_id,
|
||||||
|
# FIXME: Currently, we're just manually copying the fields from the
|
||||||
|
# `SlidingSyncBody` into the config. How can we gurantee into the future
|
||||||
|
# that we don't forget any? I would like something more structured like
|
||||||
|
# `copy_attributes(from=body, to=config)`
|
||||||
|
lists=body.lists,
|
||||||
|
room_subscriptions=body.room_subscriptions,
|
||||||
|
extensions=body.extensions,
|
||||||
|
)
|
||||||
|
|
||||||
|
sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
|
||||||
|
requester,
|
||||||
|
sync_config,
|
||||||
|
from_token,
|
||||||
|
timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
# The client may have disconnected by now; don't bother to serialize the
|
||||||
|
# response if so.
|
||||||
|
if request._disconnected:
|
||||||
|
logger.info("Client has disconnected; not serializing response.")
|
||||||
|
return 200, {}
|
||||||
|
|
||||||
|
response_content = await self.encode_response(sliding_sync_results)
|
||||||
|
|
||||||
|
return 200, response_content
|
||||||
|
|
||||||
|
# TODO: Is there a better way to encode things?
|
||||||
|
async def encode_response(
|
||||||
|
self,
|
||||||
|
sliding_sync_result: SlidingSyncResult,
|
||||||
|
) -> JsonDict:
|
||||||
|
response: JsonDict = defaultdict(dict)
|
||||||
|
|
||||||
|
response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
|
||||||
|
serialized_lists = self.encode_lists(sliding_sync_result.lists)
|
||||||
|
if serialized_lists:
|
||||||
|
response["lists"] = serialized_lists
|
||||||
|
response["rooms"] = {} # TODO: sliding_sync_result.rooms
|
||||||
|
response["extensions"] = {} # TODO: sliding_sync_result.extensions
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def encode_lists(
|
||||||
|
self, lists: Dict[str, SlidingSyncResult.SlidingWindowList]
|
||||||
|
) -> JsonDict:
|
||||||
|
def encode_operation(
|
||||||
|
operation: SlidingSyncResult.SlidingWindowList.Operation,
|
||||||
|
) -> JsonDict:
|
||||||
|
return {
|
||||||
|
"op": operation.op.value,
|
||||||
|
"range": operation.range,
|
||||||
|
"room_ids": operation.room_ids,
|
||||||
|
}
|
||||||
|
|
||||||
|
serialized_lists = {}
|
||||||
|
for list_key, list_result in lists.items():
|
||||||
|
serialized_lists[list_key] = {
|
||||||
|
"count": list_result.count,
|
||||||
|
"ops": [encode_operation(op) for op in list_result.ops],
|
||||||
|
}
|
||||||
|
|
||||||
|
return serialized_lists
|
||||||
|
|
||||||
|
|
||||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||||
SyncRestServlet(hs).register(http_server)
|
SyncRestServlet(hs).register(http_server)
|
||||||
|
|
||||||
if hs.config.experimental.msc3575_enabled:
|
if hs.config.experimental.msc3575_enabled:
|
||||||
|
SlidingSyncRestServlet(hs).register(http_server)
|
||||||
SlidingSyncE2eeRestServlet(hs).register(http_server)
|
SlidingSyncE2eeRestServlet(hs).register(http_server)
|
||||||
|
|
|
@ -109,6 +109,7 @@ from synapse.handlers.room_summary import RoomSummaryHandler
|
||||||
from synapse.handlers.search import SearchHandler
|
from synapse.handlers.search import SearchHandler
|
||||||
from synapse.handlers.send_email import SendEmailHandler
|
from synapse.handlers.send_email import SendEmailHandler
|
||||||
from synapse.handlers.set_password import SetPasswordHandler
|
from synapse.handlers.set_password import SetPasswordHandler
|
||||||
|
from synapse.handlers.sliding_sync import SlidingSyncHandler
|
||||||
from synapse.handlers.sso import SsoHandler
|
from synapse.handlers.sso import SsoHandler
|
||||||
from synapse.handlers.stats import StatsHandler
|
from synapse.handlers.stats import StatsHandler
|
||||||
from synapse.handlers.sync import SyncHandler
|
from synapse.handlers.sync import SyncHandler
|
||||||
|
@ -554,6 +555,9 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||||
def get_sync_handler(self) -> SyncHandler:
|
def get_sync_handler(self) -> SyncHandler:
|
||||||
return SyncHandler(self)
|
return SyncHandler(self)
|
||||||
|
|
||||||
|
def get_sliding_sync_handler(self) -> SlidingSyncHandler:
|
||||||
|
return SlidingSyncHandler(self)
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_room_list_handler(self) -> RoomListHandler:
|
def get_room_list_handler(self) -> RoomListHandler:
|
||||||
return RoomListHandler(self)
|
return RoomListHandler(self)
|
||||||
|
|
1118
tests/handlers/test_sliding_sync.py
Normal file
1118
tests/handlers/test_sliding_sync.py
Normal file
File diff suppressed because it is too large
Load diff
|
@ -34,7 +34,7 @@ from synapse.api.constants import (
|
||||||
)
|
)
|
||||||
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
|
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
@ -1204,3 +1204,135 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"])
|
self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"])
|
||||||
self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"])
|
self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"])
|
||||||
|
|
||||||
|
|
||||||
|
class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||||
|
"""
|
||||||
|
Tests regarding MSC3575 Sliding Sync `/sync` endpoint.
|
||||||
|
"""
|
||||||
|
|
||||||
|
servlets = [
|
||||||
|
synapse.rest.admin.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
sync.register_servlets,
|
||||||
|
devices.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def default_config(self) -> JsonDict:
|
||||||
|
config = super().default_config()
|
||||||
|
# Enable sliding sync
|
||||||
|
config["experimental_features"] = {"msc3575_enabled": True}
|
||||||
|
return config
|
||||||
|
|
||||||
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||||
|
self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync"
|
||||||
|
self.store = hs.get_datastores().main
|
||||||
|
self.event_sources = hs.get_event_sources()
|
||||||
|
|
||||||
|
def test_sync_list(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that room IDs show up in the Sliding Sync lists
|
||||||
|
"""
|
||||||
|
alice_user_id = self.register_user("alice", "correcthorse")
|
||||||
|
alice_access_token = self.login(alice_user_id, "correcthorse")
|
||||||
|
|
||||||
|
room_id = self.helper.create_room_as(
|
||||||
|
alice_user_id, tok=alice_access_token, is_public=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make the Sliding Sync request
|
||||||
|
channel = self.make_request(
|
||||||
|
"POST",
|
||||||
|
self.sync_endpoint,
|
||||||
|
{
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 99]],
|
||||||
|
"sort": ["by_notification_level", "by_recency", "by_name"],
|
||||||
|
"required_state": [
|
||||||
|
["m.room.join_rules", ""],
|
||||||
|
["m.room.history_visibility", ""],
|
||||||
|
["m.space.child", "*"],
|
||||||
|
],
|
||||||
|
"timeline_limit": 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
access_token=alice_access_token,
|
||||||
|
)
|
||||||
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
|
||||||
|
# Make sure it has the foo-list we requested
|
||||||
|
self.assertListEqual(
|
||||||
|
list(channel.json_body["lists"].keys()),
|
||||||
|
["foo-list"],
|
||||||
|
channel.json_body["lists"].keys(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure the list includes the room we are joined to
|
||||||
|
self.assertListEqual(
|
||||||
|
list(channel.json_body["lists"]["foo-list"]["ops"]),
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"op": "SYNC",
|
||||||
|
"range": [0, 99],
|
||||||
|
"room_ids": [room_id],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
channel.json_body["lists"]["foo-list"],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_wait_for_sync_token(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that worker will wait until it catches up to the given token
|
||||||
|
"""
|
||||||
|
alice_user_id = self.register_user("alice", "correcthorse")
|
||||||
|
alice_access_token = self.login(alice_user_id, "correcthorse")
|
||||||
|
|
||||||
|
# Create a future token that will cause us to wait. Since we never send a new
|
||||||
|
# event to reach that future stream_ordering, the worker will wait until the
|
||||||
|
# full timeout.
|
||||||
|
current_token = self.event_sources.get_current_token()
|
||||||
|
future_position_token = current_token.copy_and_replace(
|
||||||
|
StreamKeyType.ROOM,
|
||||||
|
RoomStreamToken(stream=current_token.room_key.stream + 1),
|
||||||
|
)
|
||||||
|
|
||||||
|
future_position_token_serialized = self.get_success(
|
||||||
|
future_position_token.to_string(self.store)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make the Sliding Sync request
|
||||||
|
channel = self.make_request(
|
||||||
|
"POST",
|
||||||
|
self.sync_endpoint + f"?pos={future_position_token_serialized}",
|
||||||
|
{
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 99]],
|
||||||
|
"sort": ["by_notification_level", "by_recency", "by_name"],
|
||||||
|
"required_state": [
|
||||||
|
["m.room.join_rules", ""],
|
||||||
|
["m.room.history_visibility", ""],
|
||||||
|
["m.space.child", "*"],
|
||||||
|
],
|
||||||
|
"timeline_limit": 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
access_token=alice_access_token,
|
||||||
|
await_result=False,
|
||||||
|
)
|
||||||
|
# Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)`
|
||||||
|
# timeout
|
||||||
|
with self.assertRaises(TimedOutException):
|
||||||
|
channel.await_result(timeout_ms=9900)
|
||||||
|
channel.await_result(timeout_ms=200)
|
||||||
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
|
||||||
|
# We expect the `next_pos` in the result to be the same as what we requested
|
||||||
|
# with because we weren't able to find anything new yet.
|
||||||
|
self.assertEqual(
|
||||||
|
channel.json_body["next_pos"], future_position_token_serialized
|
||||||
|
)
|
||||||
|
|
|
@ -330,9 +330,12 @@ class RestHelper:
|
||||||
data,
|
data,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert channel.code == expect_code, "Expected: %d, got: %d, resp: %r" % (
|
assert (
|
||||||
|
channel.code == expect_code
|
||||||
|
), "Expected: %d, got: %d, PUT %s -> resp: %r" % (
|
||||||
expect_code,
|
expect_code,
|
||||||
channel.code,
|
channel.code,
|
||||||
|
path,
|
||||||
channel.result["body"],
|
channel.result["body"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue