0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-12 04:52:26 +01:00

Sliding Sync: Make PerConnectionState immutable (#17600)

This is so that we can cache it.

We also move the sliding sync types to
`synapse/types/handlers/sliding_sync.py`. This is mainly in-prep for

The only change in behaviour is that
`RoomSyncConfig.combine_sync_config(..)` now returns a new room sync
config rather than mutating in-place.

Reviewable commit-by-commit.

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
Erik Johnston 2024-08-29 16:22:57 +01:00 committed by Erik Johnston
parent 8678516e79
commit dab88a7b1f
9 changed files with 444 additions and 445 deletions

1
changelog.d/17600.misc Normal file
View file

@ -0,0 +1 @@
Make the sliding sync `PerConnectionState` class immutable.

View file

@ -38,6 +38,7 @@ from mypy.types import (
NoneType, NoneType,
TupleType, TupleType,
TypeAliasType, TypeAliasType,
TypeVarType,
UninhabitedType, UninhabitedType,
UnionType, UnionType,
) )
@ -233,6 +234,7 @@ IMMUTABLE_CUSTOM_TYPES = {
"synapse.synapse_rust.push.FilteredPushRules", "synapse.synapse_rust.push.FilteredPushRules",
# This is technically not immutable, but close enough. # This is technically not immutable, but close enough.
"signedjson.types.VerifyKey", "signedjson.types.VerifyKey",
"synapse.types.StrCollection",
} }
# Immutable containers only if the values are also immutable. # Immutable containers only if the values are also immutable.
@ -298,7 +300,7 @@ def is_cacheable(
elif rt.type.fullname in MUTABLE_CONTAINER_TYPES: elif rt.type.fullname in MUTABLE_CONTAINER_TYPES:
# Mutable containers are mutable regardless of their underlying type. # Mutable containers are mutable regardless of their underlying type.
return False, None return False, f"container {rt.type.fullname} is mutable"
elif "attrs" in rt.type.metadata: elif "attrs" in rt.type.metadata:
# attrs classes are only cachable iff it is frozen (immutable itself) # attrs classes are only cachable iff it is frozen (immutable itself)
@ -318,6 +320,9 @@ def is_cacheable(
else: else:
return False, "non-frozen attrs class" return False, "non-frozen attrs class"
elif rt.type.is_enum:
# We assume Enum values are immutable
return True, None
else: else:
# Ensure we fail for unknown types, these generally means that the # Ensure we fail for unknown types, these generally means that the
# above code is not complete. # above code is not complete.
@ -326,6 +331,18 @@ def is_cacheable(
f"Don't know how to handle {rt.type.fullname} return type instance", f"Don't know how to handle {rt.type.fullname} return type instance",
) )
elif isinstance(rt, TypeVarType):
# We consider TypeVars immutable if they are bound to a set of immutable
# types.
if rt.values:
for value in rt.values:
ok, note = is_cacheable(value, signature, verbose)
if not ok:
return False, f"TypeVar bound not cacheable {value}"
return True, None
return False, "TypeVar is unbound"
elif isinstance(rt, NoneType): elif isinstance(rt, NoneType):
# None is cachable. # None is cachable.
return True, None return True, None

View file

@ -29,13 +29,6 @@ from synapse.handlers.sliding_sync.room_lists import (
_RoomMembershipForUser, _RoomMembershipForUser,
) )
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomSyncConfig,
StateValues,
)
from synapse.logging.opentracing import ( from synapse.logging.opentracing import (
SynapseTags, SynapseTags,
log_kv, log_kv,
@ -57,7 +50,15 @@ from synapse.types import (
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
) )
from synapse.types.handlers import SlidingSyncConfig, SlidingSyncResult from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomSyncConfig,
SlidingSyncConfig,
SlidingSyncResult,
StateValues,
)
from synapse.types.state import StateFilter from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client

View file

@ -20,11 +20,6 @@ from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, EduTypes from synapse.api.constants import AccountDataTypes, EduTypes
from synapse.handlers.receipts import ReceiptEventSource from synapse.handlers.receipts import ReceiptEventSource
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
)
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.types import ( from synapse.types import (
@ -35,7 +30,14 @@ from synapse.types import (
StrCollection, StrCollection,
StreamToken, StreamToken,
) )
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
OperationType,
PerConnectionState,
SlidingSyncConfig,
SlidingSyncResult,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer

View file

@ -40,11 +40,6 @@ from synapse.api.constants import (
) )
from synapse.events import StrippedStateEvent from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event from synapse.events.utils import parse_stripped_state_event
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
PerConnectionState,
RoomSyncConfig,
)
from synapse.logging.opentracing import start_active_span, trace from synapse.logging.opentracing import start_active_span, trace
from synapse.storage.databases.main.state import ( from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL, ROOM_UNKNOWN_SENTINEL,
@ -61,7 +56,14 @@ from synapse.types import (
StreamToken, StreamToken,
UserID, UserID,
) )
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
OperationType,
PerConnectionState,
RoomSyncConfig,
SlidingSyncConfig,
SlidingSyncResult,
)
from synapse.types.state import StateFilter from synapse.types.state import StateFilter
if TYPE_CHECKING: if TYPE_CHECKING:
@ -279,15 +281,11 @@ class SlidingSyncRoomLists:
room_id room_id
) )
if existing_room_sync_config is not None: if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config( room_sync_config = existing_room_sync_config.combine_room_sync_config(
room_sync_config room_sync_config
) )
else:
# Make a copy so if we modify it later, it doesn't relevant_room_map[room_id] = room_sync_config
# affect all references.
relevant_room_map[room_id] = (
room_sync_config.deep_copy()
)
room_ids_in_list.append(room_id) room_ids_in_list.append(room_id)
@ -351,10 +349,12 @@ class SlidingSyncRoomLists:
# and need to fetch more info about. # and need to fetch more info about.
existing_room_sync_config = relevant_room_map.get(room_id) existing_room_sync_config = relevant_room_map.get(room_id)
if existing_room_sync_config is not None: if existing_room_sync_config is not None:
room_sync_config = (
existing_room_sync_config.combine_room_sync_config( existing_room_sync_config.combine_room_sync_config(
room_sync_config room_sync_config
) )
else: )
relevant_room_map[room_id] = room_sync_config relevant_room_map[room_id] = room_sync_config
# Filtered subset of `relevant_room_map` for rooms that may have updates # Filtered subset of `relevant_room_map` for rooms that may have updates

View file

@ -18,13 +18,13 @@ from typing import TYPE_CHECKING, Dict, Optional, Tuple
import attr import attr
from synapse.api.errors import SlidingSyncUnknownPosition from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.handlers.sliding_sync.types import (
MutablePerConnectionState,
PerConnectionState,
)
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.types import SlidingSyncStreamToken from synapse.types import SlidingSyncStreamToken
from synapse.types.handlers import SlidingSyncConfig from synapse.types.handlers.sliding_sync import (
MutablePerConnectionState,
PerConnectionState,
SlidingSyncConfig,
)
if TYPE_CHECKING: if TYPE_CHECKING:
pass pass

View file

@ -17,33 +17,9 @@
# [This file includes modifications made by New Vector Limited] # [This file includes modifications made by New Vector Limited]
# #
# #
from enum import Enum
from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple
import attr
from typing_extensions import TypedDict
from synapse._pydantic_compat import HAS_PYDANTIC_V2 from typing import List, Optional, TypedDict
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import Extra
else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
Requester,
SlidingSyncStreamToken,
StreamToken,
UserID,
)
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
from synapse.handlers.relations import BundledAggregations
class ShutdownRoomParams(TypedDict): class ShutdownRoomParams(TypedDict):
@ -101,335 +77,3 @@ class ShutdownRoomResponse(TypedDict):
failed_to_kick_users: List[str] failed_to_kick_users: List[str]
local_aliases: List[str] local_aliases: List[str]
new_room_id: Optional[str] new_room_id: Optional[str]
class SlidingSyncConfig(SlidingSyncBody):
"""
Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
extra fields that we need in the handler
"""
user: UserID
requester: Requester
# Pydantic config
class Config:
# By default, ignore fields that we don't recognise.
extra = Extra.ignore
# By default, don't allow fields to be reassigned after parsing.
allow_mutation = False
# Allow custom types like `UserID` to be used in the model
arbitrary_types_allowed = True
class OperationType(Enum):
"""
Represents the operation types in a Sliding Sync window.
Attributes:
SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about
entries in this range.
INSERT: Sets a single entry. If the position is not empty then clients MUST move
entries to the left or the right depending on where the closest empty space is.
DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
places.
INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
offline support, but they should be treated as empty when additional operations
which concern indexes in the range arrive from the server.
"""
SYNC: Final = "SYNC"
INSERT: Final = "INSERT"
DELETE: Final = "DELETE"
INVALIDATE: Final = "INVALIDATE"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncResult:
"""
The Sliding Sync result to be serialized to JSON for a response.
Attributes:
next_pos: The next position token in the sliding window to request (next_batch).
lists: Sliding window API. A map of list key to list results.
rooms: Room subscription API. A map of room ID to room results.
extensions: Extensions API. A map of extension key to extension results.
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomResult:
"""
Attributes:
name: Room name or calculated room name.
avatar: Room avatar
heroes: List of stripped membership events (containing `user_id` and optionally
`avatar_url` and `displayname`) for the users used to calculate the room name.
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
initial: Flag which is set when this is the first time the server is sending this
data on this connection. Clients can use this flag to replace or update
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment ing `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
the timeline events above. This allows clients to show accurate reaction
counts (or edits, threads), even if some of the reaction events were skipped
over in a gappy sync.
stripped_state: Stripped state events (for rooms where the usre is
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
absent on joined/left rooms
prev_batch: A token that can be passed as a start parameter to the
`/rooms/<room_id>/messages` API to retrieve earlier messages.
limited: True if there are more events than `timeline_limit` looking
backwards from the `response.pos` to the `request.pos`.
num_live: The number of timeline events which have just occurred and are not historical.
The last N events are 'live' and should be treated as such. This is mostly
useful to determine whether a given @mention event should make a noise or not.
Clients cannot rely solely on the absence of `initial: true` to determine live
events because if a room not in the sliding window bumps into the window because
of an @mention it will have `initial: true` yet contain a single live event
(with potentially other old events in the timeline).
bump_stamp: The `stream_ordering` of the last event according to the
`bump_event_types`. This helps clients sort more readily without them
needing to pull in a bunch of the timeline to determine the last activity.
`bump_event_types` is a thing because for example, we don't want display
name changes to mark the room as unread and bump it to the top. For
encrypted rooms, we just have to consider any activity as a bump because we
can't see the content and the client has to figure it out for themselves.
joined_count: The number of users with membership of join, including the client's
own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
`m.invited_member_count`)
notification_count: The total number of unread notifications for this room. (same
as sync v2)
highlight_count: The number of unread notifications for this room with the highlight
flag set. (same as sync v2)
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class StrippedHero:
user_id: str
display_name: Optional[str]
avatar_url: Optional[str]
name: Optional[str]
avatar: Optional[str]
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`
timeline_events: List[EventBase]
bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
# Optional because it's only relevant to invite/knock rooms
stripped_state: List[JsonDict]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
prev_batch: Optional[StreamToken]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
limited: Optional[bool]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: int
joined_count: int
invited_count: int
notification_count: int
highlight_count: int
def __bool__(self) -> bool:
return (
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
"""
Attributes:
count: The total number of entries in the list. Always present if this list
is.
ops: The sliding list operations to perform.
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class Operation:
"""
Attributes:
op: The operation type to perform.
range: Which index positions are affected by this operation. These are
both inclusive.
room_ids: Which room IDs are affected by this operation. These IDs match
up to the positions in the `range`, so the last room ID in this list
matches the 9th index. The room data is held in a separate object.
"""
op: OperationType
range: Tuple[int, int]
room_ids: List[str]
count: int
ops: List[Operation]
@attr.s(slots=True, frozen=True, auto_attribs=True)
class Extensions:
"""Responses for extensions
Attributes:
to_device: The to-device extension (MSC3885)
e2ee: The E2EE device extension (MSC3884)
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ToDeviceExtension:
"""The to-device extension (MSC3885)
Attributes:
next_batch: The to-device stream token the client should use
to get more results
events: A list of to-device messages for the client
"""
next_batch: str
events: Sequence[JsonMapping]
def __bool__(self) -> bool:
return bool(self.events)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeExtension:
"""The E2EE device extension (MSC3884)
Attributes:
device_list_updates: List of user_ids whose devices have changed or left (only
present on incremental syncs).
device_one_time_keys_count: Map from key algorithm to the number of
unclaimed one-time keys currently held on the server for this device. If
an algorithm is unlisted, the count for that algorithm is assumed to be
zero. If this entire parameter is missing, the count for all algorithms
is assumed to be zero.
device_unused_fallback_key_types: List of unused fallback key algorithms
for this device.
"""
# Only present on incremental syncs
device_list_updates: Optional[DeviceListUpdates]
device_one_time_keys_count: Mapping[str, int]
device_unused_fallback_key_types: Sequence[str]
def __bool__(self) -> bool:
# Note that "signed_curve25519" is always returned in key count responses
# regardless of whether we uploaded any keys for it. This is necessary until
# https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
#
# Also related:
# https://github.com/element-hq/element-android/issues/3725 and
# https://github.com/matrix-org/synapse/issues/10456
default_otk = self.device_one_time_keys_count.get("signed_curve25519")
more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
default_otk is not None and default_otk > 0
)
return bool(
more_than_default_otk
or self.device_list_updates
or self.device_unused_fallback_key_types
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class AccountDataExtension:
"""The Account Data extension (MSC3959)
Attributes:
global_account_data_map: Mapping from `type` to `content` of global account
data events.
account_data_by_room_map: Mapping from room_id to mapping of `type` to
`content` of room account data events.
"""
global_account_data_map: Mapping[str, JsonMapping]
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
def __bool__(self) -> bool:
return bool(
self.global_account_data_map or self.account_data_by_room_map
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ReceiptsExtension:
"""The Receipts extension (MSC3960)
Attributes:
room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
event (type, content)
"""
room_id_to_receipt_map: Mapping[str, JsonMapping]
def __bool__(self) -> bool:
return bool(self.room_id_to_receipt_map)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingExtension:
"""The Typing Notification extension (MSC3961)
Attributes:
room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
event (type, content)
"""
room_id_to_typing_map: Mapping[str, JsonMapping]
def __bool__(self) -> bool:
return bool(self.room_id_to_typing_map)
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None
def __bool__(self) -> bool:
return bool(
self.to_device
or self.e2ee
or self.account_data
or self.receipts
or self.typing
)
next_pos: SlidingSyncStreamToken
lists: Mapping[str, SlidingWindowList]
rooms: Dict[str, RoomResult]
extensions: Extensions
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if the notifier needs to wait for more events when polling for
events.
"""
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
# the latest activity, anything that would cause the order to change would end
# up in `self.rooms` and cause us to send down the change.
return bool(self.rooms or self.extensions)
@staticmethod
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
"Return a new empty result"
return SlidingSyncResult(
next_pos=next_pos,
lists={},
rooms={},
extensions=SlidingSyncResult.Extensions(),
)

View file

@ -18,30 +18,382 @@ from collections import ChainMap
from enum import Enum from enum import Enum
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
AbstractSet,
Callable, Callable,
Dict, Dict,
Final, Final,
Generic, Generic,
List,
Mapping, Mapping,
MutableMapping, MutableMapping,
Optional, Optional,
Sequence,
Set, Set,
Tuple,
TypeVar, TypeVar,
cast, cast,
) )
import attr import attr
from synapse._pydantic_compat import HAS_PYDANTIC_V2
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID
from synapse.types.handlers import SlidingSyncConfig
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import Extra
else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
Requester,
SlidingSyncStreamToken,
StreamToken,
)
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING: if TYPE_CHECKING:
pass from synapse.handlers.relations import BundledAggregations
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SlidingSyncConfig(SlidingSyncBody):
"""
Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
extra fields that we need in the handler
"""
user: UserID
requester: Requester
# Pydantic config
class Config:
# By default, ignore fields that we don't recognise.
extra = Extra.ignore
# By default, don't allow fields to be reassigned after parsing.
allow_mutation = False
# Allow custom types like `UserID` to be used in the model
arbitrary_types_allowed = True
class OperationType(Enum):
"""
Represents the operation types in a Sliding Sync window.
Attributes:
SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about
entries in this range.
INSERT: Sets a single entry. If the position is not empty then clients MUST move
entries to the left or the right depending on where the closest empty space is.
DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
places.
INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
offline support, but they should be treated as empty when additional operations
which concern indexes in the range arrive from the server.
"""
SYNC: Final = "SYNC"
INSERT: Final = "INSERT"
DELETE: Final = "DELETE"
INVALIDATE: Final = "INVALIDATE"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncResult:
"""
The Sliding Sync result to be serialized to JSON for a response.
Attributes:
next_pos: The next position token in the sliding window to request (next_batch).
lists: Sliding window API. A map of list key to list results.
rooms: Room subscription API. A map of room ID to room results.
extensions: Extensions API. A map of extension key to extension results.
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomResult:
"""
Attributes:
name: Room name or calculated room name.
avatar: Room avatar
heroes: List of stripped membership events (containing `user_id` and optionally
`avatar_url` and `displayname`) for the users used to calculate the room name.
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
initial: Flag which is set when this is the first time the server is sending this
data on this connection. Clients can use this flag to replace or update
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment ing `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
the timeline events above. This allows clients to show accurate reaction
counts (or edits, threads), even if some of the reaction events were skipped
over in a gappy sync.
stripped_state: Stripped state events (for rooms where the usre is
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
absent on joined/left rooms
prev_batch: A token that can be passed as a start parameter to the
`/rooms/<room_id>/messages` API to retrieve earlier messages.
limited: True if there are more events than `timeline_limit` looking
backwards from the `response.pos` to the `request.pos`.
num_live: The number of timeline events which have just occurred and are not historical.
The last N events are 'live' and should be treated as such. This is mostly
useful to determine whether a given @mention event should make a noise or not.
Clients cannot rely solely on the absence of `initial: true` to determine live
events because if a room not in the sliding window bumps into the window because
of an @mention it will have `initial: true` yet contain a single live event
(with potentially other old events in the timeline).
bump_stamp: The `stream_ordering` of the last event according to the
`bump_event_types`. This helps clients sort more readily without them
needing to pull in a bunch of the timeline to determine the last activity.
`bump_event_types` is a thing because for example, we don't want display
name changes to mark the room as unread and bump it to the top. For
encrypted rooms, we just have to consider any activity as a bump because we
can't see the content and the client has to figure it out for themselves.
joined_count: The number of users with membership of join, including the client's
own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
`m.invited_member_count`)
notification_count: The total number of unread notifications for this room. (same
as sync v2)
highlight_count: The number of unread notifications for this room with the highlight
flag set. (same as sync v2)
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class StrippedHero:
user_id: str
display_name: Optional[str]
avatar_url: Optional[str]
name: Optional[str]
avatar: Optional[str]
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`
timeline_events: List[EventBase]
bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
# Optional because it's only relevant to invite/knock rooms
stripped_state: List[JsonDict]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
prev_batch: Optional[StreamToken]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
limited: Optional[bool]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: int
joined_count: int
invited_count: int
notification_count: int
highlight_count: int
def __bool__(self) -> bool:
return (
# If this is the first time the client is seeing the room, we should not filter it out
# under any circumstance.
self.initial
# We need to let the client know if there are any new events
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
"""
Attributes:
count: The total number of entries in the list. Always present if this list
is.
ops: The sliding list operations to perform.
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class Operation:
"""
Attributes:
op: The operation type to perform.
range: Which index positions are affected by this operation. These are
both inclusive.
room_ids: Which room IDs are affected by this operation. These IDs match
up to the positions in the `range`, so the last room ID in this list
matches the 9th index. The room data is held in a separate object.
"""
op: OperationType
range: Tuple[int, int]
room_ids: List[str]
count: int
ops: List[Operation]
@attr.s(slots=True, frozen=True, auto_attribs=True)
class Extensions:
"""Responses for extensions
Attributes:
to_device: The to-device extension (MSC3885)
e2ee: The E2EE device extension (MSC3884)
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ToDeviceExtension:
"""The to-device extension (MSC3885)
Attributes:
next_batch: The to-device stream token the client should use
to get more results
events: A list of to-device messages for the client
"""
next_batch: str
events: Sequence[JsonMapping]
def __bool__(self) -> bool:
return bool(self.events)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeExtension:
"""The E2EE device extension (MSC3884)
Attributes:
device_list_updates: List of user_ids whose devices have changed or left (only
present on incremental syncs).
device_one_time_keys_count: Map from key algorithm to the number of
unclaimed one-time keys currently held on the server for this device. If
an algorithm is unlisted, the count for that algorithm is assumed to be
zero. If this entire parameter is missing, the count for all algorithms
is assumed to be zero.
device_unused_fallback_key_types: List of unused fallback key algorithms
for this device.
"""
# Only present on incremental syncs
device_list_updates: Optional[DeviceListUpdates]
device_one_time_keys_count: Mapping[str, int]
device_unused_fallback_key_types: Sequence[str]
def __bool__(self) -> bool:
# Note that "signed_curve25519" is always returned in key count responses
# regardless of whether we uploaded any keys for it. This is necessary until
# https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
#
# Also related:
# https://github.com/element-hq/element-android/issues/3725 and
# https://github.com/matrix-org/synapse/issues/10456
default_otk = self.device_one_time_keys_count.get("signed_curve25519")
more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
default_otk is not None and default_otk > 0
)
return bool(
more_than_default_otk
or self.device_list_updates
or self.device_unused_fallback_key_types
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class AccountDataExtension:
"""The Account Data extension (MSC3959)
Attributes:
global_account_data_map: Mapping from `type` to `content` of global account
data events.
account_data_by_room_map: Mapping from room_id to mapping of `type` to
`content` of room account data events.
"""
global_account_data_map: Mapping[str, JsonMapping]
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
def __bool__(self) -> bool:
return bool(
self.global_account_data_map or self.account_data_by_room_map
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ReceiptsExtension:
"""The Receipts extension (MSC3960)
Attributes:
room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
event (type, content)
"""
room_id_to_receipt_map: Mapping[str, JsonMapping]
def __bool__(self) -> bool:
return bool(self.room_id_to_receipt_map)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingExtension:
"""The Typing Notification extension (MSC3961)
Attributes:
room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
event (type, content)
"""
room_id_to_typing_map: Mapping[str, JsonMapping]
def __bool__(self) -> bool:
return bool(self.room_id_to_typing_map)
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None
def __bool__(self) -> bool:
return bool(
self.to_device
or self.e2ee
or self.account_data
or self.receipts
or self.typing
)
next_pos: SlidingSyncStreamToken
lists: Mapping[str, SlidingWindowList]
rooms: Dict[str, RoomResult]
extensions: Extensions
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if the notifier needs to wait for more events when polling for
events.
"""
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
# the latest activity, anything that would cause the order to change would end
# up in `self.rooms` and cause us to send down the change.
return bool(self.rooms or self.extensions)
@staticmethod
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
"Return a new empty result"
return SlidingSyncResult(
next_pos=next_pos,
lists={},
rooms={},
extensions=SlidingSyncResult.Extensions(),
)
class StateValues: class StateValues:
""" """
Understood values of the (type, state_key) tuple in `required_state`. Understood values of the (type, state_key) tuple in `required_state`.
@ -60,7 +412,7 @@ class StateValues:
# We can't freeze this class because we want to update it in place with the # We can't freeze this class because we want to update it in place with the
# de-duplicated data. # de-duplicated data.
@attr.s(slots=True, auto_attribs=True) @attr.s(slots=True, auto_attribs=True, frozen=True)
class RoomSyncConfig: class RoomSyncConfig:
""" """
Holds the config for what data we should fetch for a room in the sync response. Holds the config for what data we should fetch for a room in the sync response.
@ -74,7 +426,7 @@ class RoomSyncConfig:
""" """
timeline_limit: int timeline_limit: int
required_state_map: Dict[str, Set[str]] required_state_map: Mapping[str, AbstractSet[str]]
@classmethod @classmethod
def from_room_config( def from_room_config(
@ -146,27 +498,22 @@ class RoomSyncConfig:
required_state_map=required_state_map, required_state_map=required_state_map,
) )
def deep_copy(self) -> "RoomSyncConfig":
required_state_map: Dict[str, Set[str]] = {
state_type: state_key_set.copy()
for state_type, state_key_set in self.required_state_map.items()
}
return RoomSyncConfig(
timeline_limit=self.timeline_limit,
required_state_map=required_state_map,
)
def combine_room_sync_config( def combine_room_sync_config(
self, other_room_sync_config: "RoomSyncConfig" self, other_room_sync_config: "RoomSyncConfig"
) -> None: ) -> "RoomSyncConfig":
""" """
Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the Combine this `RoomSyncConfig` with another `RoomSyncConfig` and return the
superset union of the two. superset union of the two.
""" """
timeline_limit = self.timeline_limit
required_state_map = {
event_type: set(state_keys)
for event_type, state_keys in self.required_state_map.items()
}
# Take the highest timeline limit # Take the highest timeline limit
if self.timeline_limit < other_room_sync_config.timeline_limit: if timeline_limit < other_room_sync_config.timeline_limit:
self.timeline_limit = other_room_sync_config.timeline_limit timeline_limit = other_room_sync_config.timeline_limit
# Union the required state # Union the required state
for ( for (
@ -175,14 +522,14 @@ class RoomSyncConfig:
) in other_room_sync_config.required_state_map.items(): ) in other_room_sync_config.required_state_map.items():
# If we already have a wildcard for everything, we don't need to add # If we already have a wildcard for everything, we don't need to add
# anything else # anything else
if StateValues.WILDCARD in self.required_state_map.get( if StateValues.WILDCARD in required_state_map.get(
StateValues.WILDCARD, set() StateValues.WILDCARD, set()
): ):
break break
# If we already have a wildcard `state_key` for this `state_type`, we don't need # If we already have a wildcard `state_key` for this `state_type`, we don't need
# to add anything else # to add anything else
if StateValues.WILDCARD in self.required_state_map.get(state_type, set()): if StateValues.WILDCARD in required_state_map.get(state_type, set()):
continue continue
# If we're getting wildcards for the `state_type` and `state_key`, that's # If we're getting wildcards for the `state_type` and `state_key`, that's
@ -191,16 +538,14 @@ class RoomSyncConfig:
state_type == StateValues.WILDCARD state_type == StateValues.WILDCARD
and StateValues.WILDCARD in state_key_set and StateValues.WILDCARD in state_key_set
): ):
self.required_state_map = {state_type: {StateValues.WILDCARD}} required_state_map = {state_type: {StateValues.WILDCARD}}
# We can break, since we don't need to add anything else # We can break, since we don't need to add anything else
break break
for state_key in state_key_set: for state_key in state_key_set:
# If we already have a wildcard for this specific `state_key`, we don't need # If we already have a wildcard for this specific `state_key`, we don't need
# to add it since the wildcard already covers it. # to add it since the wildcard already covers it.
if state_key in self.required_state_map.get( if state_key in required_state_map.get(StateValues.WILDCARD, set()):
StateValues.WILDCARD, set()
):
continue continue
# If we're getting a wildcard for the `state_type`, get rid of any other # If we're getting a wildcard for the `state_type`, get rid of any other
@ -211,7 +556,7 @@ class RoomSyncConfig:
# Make a copy so we don't run into an error: `dictionary changed size # Make a copy so we don't run into an error: `dictionary changed size
# during iteration`, when we remove items # during iteration`, when we remove items
for existing_state_type, existing_state_key_set in list( for existing_state_type, existing_state_key_set in list(
self.required_state_map.items() required_state_map.items()
): ):
# Make a copy so we don't run into an error: `Set changed size during # Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items # iteration`, when we filter out and remove items
@ -221,19 +566,21 @@ class RoomSyncConfig:
# If we've the left the `set()` empty, remove it from the map # If we've the left the `set()` empty, remove it from the map
if existing_state_key_set == set(): if existing_state_key_set == set():
self.required_state_map.pop(existing_state_type, None) required_state_map.pop(existing_state_type, None)
# If we're getting a wildcard `state_key`, get rid of any other state_keys # If we're getting a wildcard `state_key`, get rid of any other state_keys
# for this `state_type` since the wildcard will cover it already. # for this `state_type` since the wildcard will cover it already.
if state_key == StateValues.WILDCARD: if state_key == StateValues.WILDCARD:
self.required_state_map[state_type] = {state_key} required_state_map[state_type] = {state_key}
break break
# Otherwise, just add it to the set # Otherwise, just add it to the set
else: else:
if self.required_state_map.get(state_type) is None: if required_state_map.get(state_type) is None:
self.required_state_map[state_type] = {state_key} required_state_map[state_type] = {state_key}
else: else:
self.required_state_map[state_type].add(state_key) required_state_map[state_type].add(state_key)
return RoomSyncConfig(timeline_limit, required_state_map)
def must_await_full_state( def must_await_full_state(
self, self,
@ -324,7 +671,7 @@ class HaveSentRoomFlag(Enum):
LIVE = "live" LIVE = "live"
T = TypeVar("T") T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
@attr.s(auto_attribs=True, slots=True, frozen=True) @attr.s(auto_attribs=True, slots=True, frozen=True)
@ -439,7 +786,7 @@ class MutableRoomStatusMap(RoomStatusMap[T]):
self._statuses[room_id] = HaveSentRoom.previously(from_token) self._statuses[room_id] = HaveSentRoom.previously(from_token)
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True, frozen=True)
class PerConnectionState: class PerConnectionState:
"""The per-connection state. A snapshot of what we've sent down the """The per-connection state. A snapshot of what we've sent down the
connection before. connection before.

View file

@ -18,7 +18,6 @@
# #
# #
import logging import logging
from copy import deepcopy
from typing import Dict, List, Optional from typing import Dict, List, Optional
from unittest.mock import patch from unittest.mock import patch
@ -47,7 +46,7 @@ from synapse.rest.client import knock, login, room
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, StreamToken, UserID from synapse.types import JsonDict, StreamToken, UserID
from synapse.types.handlers import SlidingSyncConfig from synapse.types.handlers.sliding_sync import SlidingSyncConfig
from synapse.util import Clock from synapse.util import Clock
from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.replication._base import BaseMultiWorkerStreamTestCase
@ -566,23 +565,11 @@ class RoomSyncConfigTestCase(TestCase):
""" """
Combine A into B and B into A to make sure we get the same result. Combine A into B and B into A to make sure we get the same result.
""" """
# Since we're mutating these in place, make a copy for each of our trials combined_config = a.combine_room_sync_config(b)
room_sync_config_a = deepcopy(a) self._assert_room_config_equal(combined_config, expected, "B into A")
room_sync_config_b = deepcopy(b)
# Combine B into A combined_config = a.combine_room_sync_config(b)
room_sync_config_a.combine_room_sync_config(room_sync_config_b) self._assert_room_config_equal(combined_config, expected, "A into B")
self._assert_room_config_equal(room_sync_config_a, expected, "B into A")
# Since we're mutating these in place, make a copy for each of our trials
room_sync_config_a = deepcopy(a)
room_sync_config_b = deepcopy(b)
# Combine A into B
room_sync_config_b.combine_room_sync_config(room_sync_config_a)
self._assert_room_config_equal(room_sync_config_b, expected, "A into B")
class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase): class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):