mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-12 04:52:26 +01:00
SSS: Implement PREVIOUSLY room tracking (#17535)
Implement tracking of rooms that have had updates that have not been sent down to clients. Simplified Sliding Sync (SSS)
This commit is contained in:
parent
11db575218
commit
44ac2aa3b6
3 changed files with 53 additions and 88 deletions
1
changelog.d/17535.bugfix
Normal file
1
changelog.d/17535.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately.
|
|
@ -543,6 +543,9 @@ class SlidingSyncHandler:
|
||||||
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
||||||
# Keep track of the rooms that we can display and need to fetch more info about
|
# Keep track of the rooms that we can display and need to fetch more info about
|
||||||
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
||||||
|
# The set of room IDs of all rooms that could appear in any list. These
|
||||||
|
# include rooms that are outside the list ranges.
|
||||||
|
all_rooms: Set[str] = set()
|
||||||
if has_lists and sync_config.lists is not None:
|
if has_lists and sync_config.lists is not None:
|
||||||
with start_active_span("assemble_sliding_window_lists"):
|
with start_active_span("assemble_sliding_window_lists"):
|
||||||
sync_room_map = await self.filter_rooms_relevant_for_sync(
|
sync_room_map = await self.filter_rooms_relevant_for_sync(
|
||||||
|
@ -561,11 +564,6 @@ class SlidingSyncHandler:
|
||||||
to_token,
|
to_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Sort the list
|
|
||||||
sorted_room_info = await self.sort_rooms(
|
|
||||||
filtered_sync_room_map, to_token
|
|
||||||
)
|
|
||||||
|
|
||||||
# Find which rooms are partially stated and may need to be filtered out
|
# Find which rooms are partially stated and may need to be filtered out
|
||||||
# depending on the `required_state` requested (see below).
|
# depending on the `required_state` requested (see below).
|
||||||
partial_state_room_map = (
|
partial_state_room_map = (
|
||||||
|
@ -586,6 +584,23 @@ class SlidingSyncHandler:
|
||||||
and StateValues.LAZY in membership_state_keys
|
and StateValues.LAZY in membership_state_keys
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not lazy_loading:
|
||||||
|
# Exclude partially-stated rooms unless the `required_state`
|
||||||
|
# only has `["m.room.member", "$LAZY"]` for membership
|
||||||
|
# (lazy-loading room members).
|
||||||
|
filtered_sync_room_map = {
|
||||||
|
room_id: room
|
||||||
|
for room_id, room in filtered_sync_room_map.items()
|
||||||
|
if not partial_state_room_map.get(room_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
all_rooms.update(filtered_sync_room_map)
|
||||||
|
|
||||||
|
# Sort the list
|
||||||
|
sorted_room_info = await self.sort_rooms(
|
||||||
|
filtered_sync_room_map, to_token
|
||||||
|
)
|
||||||
|
|
||||||
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
||||||
if list_config.ranges:
|
if list_config.ranges:
|
||||||
for range in list_config.ranges:
|
for range in list_config.ranges:
|
||||||
|
@ -603,15 +618,6 @@ class SlidingSyncHandler:
|
||||||
if len(room_ids_in_list) >= max_num_rooms:
|
if len(room_ids_in_list) >= max_num_rooms:
|
||||||
break
|
break
|
||||||
|
|
||||||
# Exclude partially-stated rooms unless the `required_state`
|
|
||||||
# only has `["m.room.member", "$LAZY"]` for membership
|
|
||||||
# (lazy-loading room members).
|
|
||||||
if (
|
|
||||||
partial_state_room_map.get(room_id)
|
|
||||||
and not lazy_loading
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Take the superset of the `RoomSyncConfig` for each room.
|
# Take the superset of the `RoomSyncConfig` for each room.
|
||||||
#
|
#
|
||||||
# Update our `relevant_room_map` with the room we're going
|
# Update our `relevant_room_map` with the room we're going
|
||||||
|
@ -664,6 +670,8 @@ class SlidingSyncHandler:
|
||||||
if not room_membership_for_user_at_to_token:
|
if not room_membership_for_user_at_to_token:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
all_rooms.add(room_id)
|
||||||
|
|
||||||
room_membership_for_user_map[room_id] = (
|
room_membership_for_user_map[room_id] = (
|
||||||
room_membership_for_user_at_to_token
|
room_membership_for_user_at_to_token
|
||||||
)
|
)
|
||||||
|
@ -771,12 +779,40 @@ class SlidingSyncHandler:
|
||||||
)
|
)
|
||||||
|
|
||||||
if has_lists or has_room_subscriptions:
|
if has_lists or has_room_subscriptions:
|
||||||
|
# We now calculate if any rooms outside the range have had updates,
|
||||||
|
# which we are not sending down.
|
||||||
|
#
|
||||||
|
# We *must* record rooms that have had updates, but it is also fine
|
||||||
|
# to record rooms as having updates even if there might not actually
|
||||||
|
# be anything new for the user (e.g. due to event filters, events
|
||||||
|
# having happened after the user left, etc).
|
||||||
|
unsent_room_ids = []
|
||||||
|
if from_token:
|
||||||
|
# The set of rooms that the client (may) care about, but aren't
|
||||||
|
# in any list range (or subscribed to).
|
||||||
|
missing_rooms = all_rooms - relevant_room_map.keys()
|
||||||
|
|
||||||
|
# We now just go and try fetching any events in the above rooms
|
||||||
|
# to see if anything has happened since the `from_token`.
|
||||||
|
#
|
||||||
|
# TODO: Replace this with something faster. When we land the
|
||||||
|
# sliding sync tables that record the most recent event
|
||||||
|
# positions we can use that.
|
||||||
|
missing_event_map_by_room = (
|
||||||
|
await self.store.get_room_events_stream_for_rooms(
|
||||||
|
room_ids=missing_rooms,
|
||||||
|
from_key=to_token.room_key,
|
||||||
|
to_key=from_token.stream_token.room_key,
|
||||||
|
limit=1,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
unsent_room_ids = list(missing_event_map_by_room)
|
||||||
|
|
||||||
connection_position = await self.connection_store.record_rooms(
|
connection_position = await self.connection_store.record_rooms(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
from_token=from_token,
|
from_token=from_token,
|
||||||
sent_room_ids=relevant_rooms_to_send_map.keys(),
|
sent_room_ids=relevant_rooms_to_send_map.keys(),
|
||||||
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
|
unsent_room_ids=unsent_room_ids,
|
||||||
unsent_room_ids=[],
|
|
||||||
)
|
)
|
||||||
elif from_token:
|
elif from_token:
|
||||||
connection_position = from_token.connection_position
|
connection_position = from_token.connection_position
|
||||||
|
|
|
@ -21,8 +21,6 @@ import synapse.rest.admin
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.rest.client import login, room, sync
|
from synapse.rest.client import login, room, sync
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import SlidingSyncStreamToken
|
|
||||||
from synapse.types.handlers import SlidingSyncConfig
|
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
|
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
|
||||||
|
@ -130,7 +128,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
|
||||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||||
|
|
||||||
timeline_limit = 5
|
timeline_limit = 5
|
||||||
conn_id = "conn_id"
|
|
||||||
sync_body = {
|
sync_body = {
|
||||||
"lists": {
|
"lists": {
|
||||||
"foo-list": {
|
"foo-list": {
|
||||||
|
@ -170,40 +167,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
|
||||||
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
|
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
|
||||||
)
|
)
|
||||||
|
|
||||||
# FIXME: This is a hack to record that the first room wasn't sent down
|
|
||||||
# sync, as we don't implement that currently.
|
|
||||||
sliding_sync_handler = self.hs.get_sliding_sync_handler()
|
|
||||||
requester = self.get_success(
|
|
||||||
self.hs.get_auth().get_user_by_access_token(user1_tok)
|
|
||||||
)
|
|
||||||
sync_config = SlidingSyncConfig(
|
|
||||||
user=requester.user,
|
|
||||||
requester=requester,
|
|
||||||
conn_id=conn_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
parsed_initial_from_token = self.get_success(
|
|
||||||
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
|
|
||||||
)
|
|
||||||
connection_position = self.get_success(
|
|
||||||
sliding_sync_handler.connection_store.record_rooms(
|
|
||||||
sync_config,
|
|
||||||
parsed_initial_from_token,
|
|
||||||
sent_room_ids=[],
|
|
||||||
unsent_room_ids=[room_id1],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# FIXME: Now fix up `from_token` with new connect position above.
|
|
||||||
parsed_from_token = self.get_success(
|
|
||||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
|
||||||
)
|
|
||||||
parsed_from_token = SlidingSyncStreamToken(
|
|
||||||
stream_token=parsed_from_token.stream_token,
|
|
||||||
connection_position=connection_position,
|
|
||||||
)
|
|
||||||
from_token = self.get_success(parsed_from_token.to_string(self.store))
|
|
||||||
|
|
||||||
# We now send another event to room1, so we should sync all the missing events.
|
# We now send another event to room1, so we should sync all the missing events.
|
||||||
resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
|
resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
|
||||||
expected_events.append(resp["event_id"])
|
expected_events.append(resp["event_id"])
|
||||||
|
@ -238,7 +201,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
|
||||||
|
|
||||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||||
|
|
||||||
conn_id = "conn_id"
|
|
||||||
sync_body = {
|
sync_body = {
|
||||||
"lists": {
|
"lists": {
|
||||||
"foo-list": {
|
"foo-list": {
|
||||||
|
@ -279,40 +241,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
|
||||||
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
|
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
|
||||||
)
|
)
|
||||||
|
|
||||||
# FIXME: This is a hack to record that the first room wasn't sent down
|
|
||||||
# sync, as we don't implement that currently.
|
|
||||||
sliding_sync_handler = self.hs.get_sliding_sync_handler()
|
|
||||||
requester = self.get_success(
|
|
||||||
self.hs.get_auth().get_user_by_access_token(user1_tok)
|
|
||||||
)
|
|
||||||
sync_config = SlidingSyncConfig(
|
|
||||||
user=requester.user,
|
|
||||||
requester=requester,
|
|
||||||
conn_id=conn_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
parsed_initial_from_token = self.get_success(
|
|
||||||
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
|
|
||||||
)
|
|
||||||
connection_position = self.get_success(
|
|
||||||
sliding_sync_handler.connection_store.record_rooms(
|
|
||||||
sync_config,
|
|
||||||
parsed_initial_from_token,
|
|
||||||
sent_room_ids=[],
|
|
||||||
unsent_room_ids=[room_id1],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# FIXME: Now fix up `from_token` with new connect position above.
|
|
||||||
parsed_from_token = self.get_success(
|
|
||||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
|
||||||
)
|
|
||||||
parsed_from_token = SlidingSyncStreamToken(
|
|
||||||
stream_token=parsed_from_token.stream_token,
|
|
||||||
connection_position=connection_position,
|
|
||||||
)
|
|
||||||
from_token = self.get_success(parsed_from_token.to_string(self.store))
|
|
||||||
|
|
||||||
# We now send another event to room1, so we should sync all the missing state.
|
# We now send another event to room1, so we should sync all the missing state.
|
||||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue