mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 23:33:53 +01:00
Speed up /keys/changes
(#17548)
Follow on from #17537. This is just adding a batched lookup function (you might want to hide whitespace in the diff).
This commit is contained in:
parent
9ce489be5e
commit
f162c92f2a
3 changed files with 77 additions and 20 deletions
1
changelog.d/17548.misc
Normal file
1
changelog.d/17548.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix performance of device lists in `/key/changes` and sliding sync.
|
|
@ -267,27 +267,23 @@ class DeviceWorkerHandler:
|
||||||
newly_left_rooms.add(change.room_id)
|
newly_left_rooms.add(change.room_id)
|
||||||
|
|
||||||
# We now work out if any other users have since joined or left the rooms
|
# We now work out if any other users have since joined or left the rooms
|
||||||
# the user is currently in. First we filter out rooms that we know
|
# the user is currently in.
|
||||||
# haven't changed recently.
|
|
||||||
rooms_changed = self.store.get_rooms_that_changed(
|
|
||||||
joined_room_ids, from_token.room_key
|
|
||||||
)
|
|
||||||
|
|
||||||
# List of membership changes per room
|
# List of membership changes per room
|
||||||
room_to_deltas: Dict[str, List[StateDelta]] = {}
|
room_to_deltas: Dict[str, List[StateDelta]] = {}
|
||||||
# The set of event IDs of membership events (so we can fetch their
|
# The set of event IDs of membership events (so we can fetch their
|
||||||
# associated membership).
|
# associated membership).
|
||||||
memberships_to_fetch: Set[str] = set()
|
memberships_to_fetch: Set[str] = set()
|
||||||
for room_id in rooms_changed:
|
|
||||||
# TODO: Only pull out membership events?
|
# TODO: Only pull out membership events?
|
||||||
state_changes = await self.store.get_current_state_deltas_for_room(
|
state_changes = await self.store.get_current_state_deltas_for_rooms(
|
||||||
room_id, from_token=from_token.room_key, to_token=now_token.room_key
|
joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key
|
||||||
)
|
)
|
||||||
for delta in state_changes:
|
for delta in state_changes:
|
||||||
if delta.event_type != EventTypes.Member:
|
if delta.event_type != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
room_to_deltas.setdefault(room_id, []).append(delta)
|
room_to_deltas.setdefault(delta.room_id, []).append(delta)
|
||||||
if delta.event_id:
|
if delta.event_id:
|
||||||
memberships_to_fetch.add(delta.event_id)
|
memberships_to_fetch.add(delta.event_id)
|
||||||
if delta.prev_event_id:
|
if delta.prev_event_id:
|
||||||
|
|
|
@ -26,10 +26,11 @@ import attr
|
||||||
|
|
||||||
from synapse.logging.opentracing import trace
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import LoggingTransaction
|
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
|
||||||
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken, StrCollection
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
from synapse.util.iterutils import batch_iter
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -200,3 +201,62 @@ class StateDeltasStore(SQLBaseStore):
|
||||||
return await self.db_pool.runInteraction(
|
return await self.db_pool.runInteraction(
|
||||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
async def get_current_state_deltas_for_rooms(
|
||||||
|
self,
|
||||||
|
room_ids: StrCollection,
|
||||||
|
from_token: RoomStreamToken,
|
||||||
|
to_token: RoomStreamToken,
|
||||||
|
) -> List[StateDelta]:
|
||||||
|
"""Get the state deltas between two tokens for the set of rooms."""
|
||||||
|
|
||||||
|
room_ids = self._curr_state_delta_stream_cache.get_entities_changed(
|
||||||
|
room_ids, from_token.stream
|
||||||
|
)
|
||||||
|
if not room_ids:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_current_state_deltas_for_rooms_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
room_ids: StrCollection,
|
||||||
|
) -> List[StateDelta]:
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "room_id", room_ids
|
||||||
|
)
|
||||||
|
|
||||||
|
sql = f"""
|
||||||
|
SELECT instance_name, stream_id, room_id, type, state_key, event_id, prev_event_id
|
||||||
|
FROM current_state_delta_stream
|
||||||
|
WHERE {clause} AND ? < stream_id AND stream_id <= ?
|
||||||
|
ORDER BY stream_id ASC
|
||||||
|
"""
|
||||||
|
args.append(from_token.stream)
|
||||||
|
args.append(to_token.get_max_stream_pos())
|
||||||
|
|
||||||
|
txn.execute(sql, args)
|
||||||
|
|
||||||
|
return [
|
||||||
|
StateDelta(
|
||||||
|
stream_id=row[1],
|
||||||
|
room_id=row[2],
|
||||||
|
event_type=row[3],
|
||||||
|
state_key=row[4],
|
||||||
|
event_id=row[5],
|
||||||
|
prev_event_id=row[6],
|
||||||
|
)
|
||||||
|
for row in txn
|
||||||
|
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||||
|
]
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for batch in batch_iter(room_ids, 1000):
|
||||||
|
deltas = await self.db_pool.runInteraction(
|
||||||
|
"get_current_state_deltas_for_rooms",
|
||||||
|
get_current_state_deltas_for_rooms_txn,
|
||||||
|
batch,
|
||||||
|
)
|
||||||
|
|
||||||
|
results.extend(deltas)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
Loading…
Reference in a new issue