mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 12:43:50 +01:00
Only send rooms with updates down sliding sync (#17479)
Rather than always including all rooms in range. Also adds a pre-filter to rooms that checks the stream change cache to see if anything might have happened. Based on #17447 --------- Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
parent
be4a16ff44
commit
34306be5aa
5 changed files with 138 additions and 30 deletions
1
changelog.d/17479.misc
Normal file
1
changelog.d/17479.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Do not send down empty room entries down experimental sliding sync endpoint.
|
|
@ -619,6 +619,51 @@ class SlidingSyncHandler:
|
||||||
# Fetch room data
|
# Fetch room data
|
||||||
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
||||||
|
|
||||||
|
# Filter out rooms that haven't received updates and we've sent down
|
||||||
|
# previously.
|
||||||
|
if from_token:
|
||||||
|
rooms_should_send = set()
|
||||||
|
|
||||||
|
# First we check if there are rooms that match a list/room
|
||||||
|
# subscription and have updates we need to send (i.e. either because
|
||||||
|
# we haven't sent the room down, or we have but there are missing
|
||||||
|
# updates).
|
||||||
|
for room_id in relevant_room_map:
|
||||||
|
status = await self.connection_store.have_sent_room(
|
||||||
|
sync_config,
|
||||||
|
from_token.connection_position,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
# The room was never sent down before so the client needs to know
|
||||||
|
# about it regardless of any updates.
|
||||||
|
status.status == HaveSentRoomFlag.NEVER
|
||||||
|
# `PREVIOUSLY` literally means the "room was sent down before *AND*
|
||||||
|
# there are updates we haven't sent down" so we already know this
|
||||||
|
# room has updates.
|
||||||
|
or status.status == HaveSentRoomFlag.PREVIOUSLY
|
||||||
|
):
|
||||||
|
rooms_should_send.add(room_id)
|
||||||
|
elif status.status == HaveSentRoomFlag.LIVE:
|
||||||
|
# We know that we've sent all updates up until `from_token`,
|
||||||
|
# so we just need to check if there have been updates since
|
||||||
|
# then.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
assert_never(status.status)
|
||||||
|
|
||||||
|
# We only need to check for new events since any state changes
|
||||||
|
# will also come down as new events.
|
||||||
|
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
|
||||||
|
relevant_room_map.keys(), from_token.stream_token.room_key
|
||||||
|
)
|
||||||
|
rooms_should_send.update(rooms_that_have_updates)
|
||||||
|
relevant_room_map = {
|
||||||
|
room_id: room_sync_config
|
||||||
|
for room_id, room_sync_config in relevant_room_map.items()
|
||||||
|
if room_id in rooms_should_send
|
||||||
|
}
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
@tag_args
|
@tag_args
|
||||||
async def handle_room(room_id: str) -> None:
|
async def handle_room(room_id: str) -> None:
|
||||||
|
@ -633,7 +678,9 @@ class SlidingSyncHandler:
|
||||||
to_token=to_token,
|
to_token=to_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
rooms[room_id] = room_sync_result
|
# Filter out empty room results during incremental sync
|
||||||
|
if room_sync_result or not from_token:
|
||||||
|
rooms[room_id] = room_sync_result
|
||||||
|
|
||||||
with start_active_span("sliding_sync.generate_room_entries"):
|
with start_active_span("sliding_sync.generate_room_entries"):
|
||||||
await concurrently_execute(handle_room, relevant_room_map, 10)
|
await concurrently_execute(handle_room, relevant_room_map, 10)
|
||||||
|
@ -2198,7 +2245,7 @@ class SlidingSyncConnectionStore:
|
||||||
a connection position of 5 might have totally different states on worker A and
|
a connection position of 5 might have totally different states on worker A and
|
||||||
worker B.
|
worker B.
|
||||||
|
|
||||||
One complication that we need to deal with here is needing to handle requests being
|
One complication that we need to deal with here is needing to handle requests being
|
||||||
resent, i.e. if we sent down a room in a response that the client received, we must
|
resent, i.e. if we sent down a room in a response that the client received, we must
|
||||||
consider the room *not* sent when we get the request again.
|
consider the room *not* sent when we get the request again.
|
||||||
|
|
||||||
|
|
|
@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
return RoomStreamToken(stream=last_position.stream - 1)
|
return RoomStreamToken(stream=last_position.stream - 1)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def get_rooms_that_might_have_updates(
|
||||||
|
self, room_ids: StrCollection, from_token: RoomStreamToken
|
||||||
|
) -> StrCollection:
|
||||||
|
"""Filters given room IDs down to those that might have updates, i.e.
|
||||||
|
removes rooms that definitely do not have updates.
|
||||||
|
"""
|
||||||
|
return self._events_stream_cache.get_entities_changed(
|
||||||
|
room_ids, from_token.stream
|
||||||
|
)
|
||||||
|
|
|
@ -238,6 +238,17 @@ class SlidingSyncResult:
|
||||||
notification_count: int
|
notification_count: int
|
||||||
highlight_count: int
|
highlight_count: int
|
||||||
|
|
||||||
|
def __bool__(self) -> bool:
|
||||||
|
return (
|
||||||
|
# If this is the first time the client is seeing the room, we should not filter it out
|
||||||
|
# under any circumstance.
|
||||||
|
self.initial
|
||||||
|
# We need to let the client know if there are any new events
|
||||||
|
or bool(self.required_state)
|
||||||
|
or bool(self.timeline_events)
|
||||||
|
or bool(self.stripped_state)
|
||||||
|
)
|
||||||
|
|
||||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
class SlidingWindowList:
|
class SlidingWindowList:
|
||||||
"""
|
"""
|
||||||
|
@ -367,7 +378,11 @@ class SlidingSyncResult:
|
||||||
to tell if the notifier needs to wait for more events when polling for
|
to tell if the notifier needs to wait for more events when polling for
|
||||||
events.
|
events.
|
||||||
"""
|
"""
|
||||||
return bool(self.lists or self.rooms or self.extensions)
|
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
|
||||||
|
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
|
||||||
|
# the latest activity, anything that would cause the order to change would end
|
||||||
|
# up in `self.rooms` and cause us to send down the change.
|
||||||
|
return bool(self.rooms or self.extensions)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
|
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
|
||||||
|
|
|
@ -69,7 +69,6 @@ from tests.federation.transport.test_knocking import (
|
||||||
)
|
)
|
||||||
from tests.server import TimedOutException
|
from tests.server import TimedOutException
|
||||||
from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
|
from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
|
||||||
from tests.unittest import skip_unless
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -1656,12 +1655,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||||
channel.json_body["rooms"][room_id]["timeline"],
|
channel.json_body["rooms"][room_id]["timeline"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
|
|
||||||
# check if there are any updates since the `from_token`.
|
|
||||||
@skip_unless(
|
|
||||||
False,
|
|
||||||
"Once we remove ops from the Sliding Sync response, this test should pass",
|
|
||||||
)
|
|
||||||
def test_wait_for_new_data_timeout(self) -> None:
|
def test_wait_for_new_data_timeout(self) -> None:
|
||||||
"""
|
"""
|
||||||
Test to make sure that the Sliding Sync request waits for new data to arrive but
|
Test to make sure that the Sliding Sync request waits for new data to arrive but
|
||||||
|
@ -1711,12 +1704,8 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||||
channel.await_result(timeout_ms=1200)
|
channel.await_result(timeout_ms=1200)
|
||||||
self.assertEqual(channel.code, 200, channel.json_body)
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
|
||||||
# We still see rooms because that's how Sliding Sync lists work but we reached
|
# There should be no room sent down.
|
||||||
# the timeout before seeing them
|
self.assertFalse(channel.json_body["rooms"])
|
||||||
self.assertEqual(
|
|
||||||
[event["event_id"] for event in channel.json_body["rooms"].keys()],
|
|
||||||
[room_id],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_filter_list(self) -> None:
|
def test_filter_list(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -3556,19 +3545,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
|
||||||
# Nothing to see for this banned user in the room in the token range
|
# Nothing to see for this banned user in the room in the token range
|
||||||
self.assertIsNone(response_body["rooms"][room_id1].get("timeline"))
|
self.assertIsNone(response_body["rooms"].get(room_id1))
|
||||||
# No events returned in the timeline so nothing is "live"
|
|
||||||
self.assertEqual(
|
|
||||||
response_body["rooms"][room_id1]["num_live"],
|
|
||||||
0,
|
|
||||||
response_body["rooms"][room_id1],
|
|
||||||
)
|
|
||||||
# There aren't anymore events to paginate to in this range
|
|
||||||
self.assertEqual(
|
|
||||||
response_body["rooms"][room_id1]["limited"],
|
|
||||||
False,
|
|
||||||
response_body["rooms"][room_id1],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_rooms_no_required_state(self) -> None:
|
def test_rooms_no_required_state(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -3668,12 +3645,15 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||||
# This one doesn't exist in the room
|
# This one doesn't exist in the room
|
||||||
[EventTypes.Tombstone, ""],
|
[EventTypes.Tombstone, ""],
|
||||||
],
|
],
|
||||||
"timeline_limit": 0,
|
"timeline_limit": 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||||
|
|
||||||
|
# Send a message so the room comes down sync.
|
||||||
|
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||||
|
|
||||||
# Make the incremental Sliding Sync request
|
# Make the incremental Sliding Sync request
|
||||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
|
||||||
|
@ -4880,6 +4860,61 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||||
self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
|
self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
|
||||||
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
|
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
|
||||||
|
|
||||||
|
def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that rooms with no updates are returned in subsequent incremental
|
||||||
|
syncs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
|
|
||||||
|
sync_body = {
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 1]],
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||||
|
|
||||||
|
# Make the incremental Sliding Sync request
|
||||||
|
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||||
|
|
||||||
|
# Nothing has happened in the room, so the room should not come down
|
||||||
|
# /sync.
|
||||||
|
self.assertIsNone(response_body["rooms"].get(room_id1))
|
||||||
|
|
||||||
|
def test_empty_initial_room_comes_down_sync(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that rooms come down /sync even with empty required state and
|
||||||
|
timeline limit in initial sync.
|
||||||
|
"""
|
||||||
|
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||||
|
|
||||||
|
sync_body = {
|
||||||
|
"lists": {
|
||||||
|
"foo-list": {
|
||||||
|
"ranges": [[0, 1]],
|
||||||
|
"required_state": [],
|
||||||
|
"timeline_limit": 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Make the Sliding Sync request
|
||||||
|
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
|
||||||
|
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
|
||||||
|
|
||||||
|
|
||||||
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
|
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
|
||||||
"""Tests for the to-device sliding sync extension"""
|
"""Tests for the to-device sliding sync extension"""
|
||||||
|
|
Loading…
Reference in a new issue