0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-05-18 11:33:45 +02:00

Handle remote device list updates during partial join (#13913)

c.f. #12993 (comment), point 3

This stores all device list updates that we receive while partial joins are ongoing, and processes them once we have the full state.

Note: We don't actually process the device lists in the same ways as if we weren't partially joined. Instead of updating the device list remote cache, we simply notify local users that a change in the remote user's devices has happened. I think this is safe as if the local user requests the keys for the remote user and we don't have them we'll simply fetch them as normal.
This commit is contained in:
Erik Johnston 2022-09-28 14:42:43 +01:00 committed by GitHub
parent 6caa303083
commit 4b17a5ace8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 170 additions and 0 deletions

1
changelog.d/13913.misc Normal file
View file

@ -0,0 +1 @@
Faster remote room joins: correctly handle remote device list updates during a partial join.

View file

@ -309,6 +309,17 @@ class DeviceWorkerHandler:
"self_signing_key": self_signing_key,
}
async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""
# TODO(faster_joins): worker mode support
# https://github.com/matrix-org/synapse/issues/12994
logger.error(
"Trying handling device list state for partial join: not supported on workers."
)
class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"):
@ -746,6 +757,15 @@ class DeviceHandler(DeviceWorkerHandler):
finally:
self._handle_new_device_update_is_processing = False
async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""
# We defer to the device list updater implementation as we're on the
# right worker.
await self.device_list_updater.handle_room_un_partial_stated(room_id)
def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
@ -836,6 +856,16 @@ class DeviceListUpdater:
)
return
# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
partial_rooms = await self.store.get_partial_state_rooms_and_servers()
if partial_rooms:
await self.store.add_remote_device_list_to_pending(
user_id,
device_id,
)
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
@ -1175,3 +1205,35 @@ class DeviceListUpdater:
device_ids.append(verify_key.version)
return device_ids
async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""
pending_updates = (
await self.store.get_pending_remote_device_list_updates_for_room(room_id)
)
for user_id, device_id in pending_updates:
logger.info(
"Got pending device list update in room %s: %s / %s",
room_id,
user_id,
device_id,
)
position = await self.store.add_device_change_to_streams(
user_id,
[device_id],
room_ids=[room_id],
)
if not position:
# This should only happen if there are no updates, which
# shouldn't happen when we've passed in a non-empty set of
# device IDs.
continue
self.device_handler.notifier.on_new_event(
StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
)

View file

@ -149,6 +149,7 @@ class FederationHandler:
self.http_client = hs.get_proxied_blacklisted_http_client()
self._replication = hs.get_replication_data_handler()
self._federation_event_handler = hs.get_federation_event_handler()
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
@ -1631,6 +1632,9 @@ class FederationHandler:
# https://github.com/matrix-org/synapse/issues/12994
await self.state_handler.update_current_state(room_id)
logger.info("Handling any pending device list updates")
await self._device_handler.handle_room_un_partial_stated(room_id)
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:

View file

@ -1995,3 +1995,58 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
add_device_list_outbound_pokes_txn,
stream_ids,
)
async def add_remote_device_list_to_pending(
self, user_id: str, device_id: str
) -> None:
"""Add a device list update to the table tracking remote device list
updates during partial joins.
"""
async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
await self.db_pool.simple_upsert(
table="device_lists_remote_pending",
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={"stream_id": stream_id},
desc="add_remote_device_list_to_pending",
)
async def get_pending_remote_device_list_updates_for_room(
self, room_id: str
) -> Collection[Tuple[str, str]]:
"""Get the set of remote device list updates from the pending table for
the room.
"""
min_device_stream_id = await self.db_pool.simple_select_one_onecol(
table="partial_state_rooms",
keyvalues={
"room_id": room_id,
},
retcol="device_lists_stream_id",
desc="get_pending_remote_device_list_updates_for_room_device",
)
sql = """
SELECT user_id, device_id FROM device_lists_remote_pending AS d
INNER JOIN current_state_events AS c ON
type = 'm.room.member'
AND state_key = user_id
AND membership = 'join'
WHERE
room_id = ? AND stream_id > ?
"""
def get_pending_remote_device_list_updates_for_room_txn(
txn: LoggingTransaction,
) -> Collection[Tuple[str, str]]:
txn.execute(sql, (room_id, min_device_stream_id))
return cast(Collection[Tuple[str, str]], txn.fetchall())
return await self.db_pool.runInteraction(
"get_pending_remote_device_list_updates_for_room",
get_pending_remote_device_list_updates_for_room_txn,
)

View file

@ -1217,6 +1217,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms being currently partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))
@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.

View file

@ -0,0 +1,28 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Stores remote device lists we have received for remote users while a partial
-- join is in progress.
--
-- This allows us to replay any device list updates if it turns out the remote
-- user was in the partially joined room
CREATE TABLE device_lists_remote_pending(
stream_id BIGINT PRIMARY KEY,
user_id TEXT NOT NULL,
device_id TEXT NOT NULL
);
-- We only keep the most recent update for a given user/device pair.
CREATE UNIQUE INDEX device_lists_remote_pending_user_device_id ON device_lists_remote_pending(user_id, device_id);