mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-12 04:52:26 +01:00
SS: Reset connection if token is unrecognized (#17529)
This triggers the client to start a new sliding sync connection. If we don't do this and the client asks for the full range of rooms, we end up sending down all rooms and their state from scratch (which can be very slow) This causes things like https://github.com/element-hq/element-x-ios/issues/3115 after we restart the server --------- Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
parent
e3db7b2d81
commit
c270355349
4 changed files with 57 additions and 17 deletions
1
changelog.d/17529.misc
Normal file
1
changelog.d/17529.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Reset the sliding sync connection if we don't recognize the per-connection state position.
|
|
@ -128,6 +128,10 @@ class Codes(str, Enum):
|
|||
# MSC2677
|
||||
DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"
|
||||
|
||||
# MSC3575 we are telling the client they need to reset their sliding sync
|
||||
# connection.
|
||||
UNKNOWN_POS = "M_UNKNOWN_POS"
|
||||
|
||||
|
||||
class CodeMessageException(RuntimeError):
|
||||
"""An exception with integer code, a message string attributes and optional headers.
|
||||
|
@ -847,3 +851,17 @@ class PartialStateConflictError(SynapseError):
|
|||
msg=PartialStateConflictError.message(),
|
||||
errcode=Codes.UNKNOWN,
|
||||
)
|
||||
|
||||
|
||||
class SlidingSyncUnknownPosition(SynapseError):
|
||||
"""An error that Synapse can return to signal to the client to expire their
|
||||
sliding sync connection (i.e. send a new request without a `?since=`
|
||||
param).
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
msg="Unknown position",
|
||||
errcode=Codes.UNKNOWN_POS,
|
||||
)
|
||||
|
|
|
@ -47,6 +47,7 @@ from synapse.api.constants import (
|
|||
EventTypes,
|
||||
Membership,
|
||||
)
|
||||
from synapse.api.errors import SlidingSyncUnknownPosition
|
||||
from synapse.events import EventBase, StrippedStateEvent
|
||||
from synapse.events.utils import parse_stripped_state_event, strip_event
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
|
@ -491,6 +492,22 @@ class SlidingSyncHandler:
|
|||
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||
raise NotImplementedError()
|
||||
|
||||
if from_token:
|
||||
# Check that we recognize the connection position, if not tell the
|
||||
# clients that they need to start again.
|
||||
#
|
||||
# If we don't do this and the client asks for the full range of
|
||||
# rooms, we end up sending down all rooms and their state from
|
||||
# scratch (which can be very slow). By expiring the connection we
|
||||
# allow the client a chance to do an initial request with a smaller
|
||||
# range of rooms to get them some results sooner but will end up
|
||||
# taking the same amount of time (more with round-trips and
|
||||
# re-processing) in the end to get everything again.
|
||||
if not await self.connection_store.is_valid_token(
|
||||
sync_config, from_token.connection_position
|
||||
):
|
||||
raise SlidingSyncUnknownPosition()
|
||||
|
||||
await self.connection_store.mark_token_seen(
|
||||
sync_config=sync_config,
|
||||
from_token=from_token,
|
||||
|
@ -2821,6 +2838,16 @@ class SlidingSyncConnectionStore:
|
|||
attr.Factory(dict)
|
||||
)
|
||||
|
||||
async def is_valid_token(
|
||||
self, sync_config: SlidingSyncConfig, connection_token: int
|
||||
) -> bool:
|
||||
"""Return whether the connection token is valid/recognized"""
|
||||
if connection_token == 0:
|
||||
return True
|
||||
|
||||
conn_key = self._get_connection_key(sync_config)
|
||||
return connection_token in self._connections.get(conn_key, {})
|
||||
|
||||
async def have_sent_room(
|
||||
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
|
||||
) -> HaveSentRoom:
|
||||
|
|
|
@ -161,10 +161,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
|||
self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
|
||||
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
|
||||
|
||||
def test_rooms_required_state_incremental_sync_restart(self) -> None:
|
||||
def test_rooms_incremental_sync_restart(self) -> None:
|
||||
"""
|
||||
Test `rooms.required_state` returns requested state events in the room during an
|
||||
incremental sync, after a restart (and so the in memory caches are reset).
|
||||
Test that after a restart (and so the in memory caches are reset) that
|
||||
we correctly return an `M_UNKNOWN_POS`
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
|
@ -195,22 +195,16 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
|||
self.hs.get_sliding_sync_handler().connection_store._connections.clear()
|
||||
|
||||
# Make the Sliding Sync request
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
# If the cache has been cleared then we do expect the state to come down
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
channel = self.make_request(
|
||||
method="POST",
|
||||
path=self.sync_endpoint + f"?pos={from_token}",
|
||||
content=sync_body,
|
||||
access_token=user1_tok,
|
||||
)
|
||||
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Create, "")],
|
||||
state_map[(EventTypes.RoomHistoryVisibility, "")],
|
||||
},
|
||||
exact=True,
|
||||
self.assertEqual(channel.code, 400, channel.json_body)
|
||||
self.assertEqual(
|
||||
channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body
|
||||
)
|
||||
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
|
||||
|
||||
def test_rooms_required_state_wildcard(self) -> None:
|
||||
"""
|
||||
|
|
Loading…
Reference in a new issue