diff --git a/changelog.d/17358.misc b/changelog.d/17358.misc new file mode 100644 index 000000000..d3ef0b377 --- /dev/null +++ b/changelog.d/17358.misc @@ -0,0 +1 @@ +Handle device lists notifications for large accounts more efficiently in worker mode. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 5eeca6165..59a035dd6 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -2131,7 +2131,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: str, device_id: str, hosts: Collection[str], - stream_ids: List[int], + stream_id: int, context: Optional[Dict[str, str]], ) -> None: if self._device_list_federation_stream_cache: @@ -2139,11 +2139,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): txn.call_after( self._device_list_federation_stream_cache.entity_has_changed, host, - stream_ids[-1], + stream_id, ) now = self._clock.time_msec() - stream_id_iterator = iter(stream_ids) encoded_context = json_encoder.encode(context) mark_sent = not self.hs.is_mine_id(user_id) @@ -2152,7 +2151,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ( destination, self._instance_name, - next(stream_id_iterator), + stream_id, user_id, device_id, mark_sent, @@ -2337,22 +2336,22 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): return def add_device_list_outbound_pokes_txn( - txn: LoggingTransaction, stream_ids: List[int] + txn: LoggingTransaction, stream_id: int ) -> None: self._add_device_outbound_poke_to_stream_txn( txn, user_id=user_id, device_id=device_id, hosts=hosts, - stream_ids=stream_ids, + stream_id=stream_id, context=context, ) - async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: + async with self._device_list_id_gen.get_next() as stream_id: return await self.db_pool.runInteraction( "add_device_list_outbound_pokes", add_device_list_outbound_pokes_txn, - stream_ids, + stream_id, ) async def add_remote_device_list_to_pending(