mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 02:03:51 +01:00
Speed up sliding sync when there are many active subscriptions (#17789)
Two changes: a) use a batch lookup function instead of a loop, b) check existing data to see if we already have what we need and only fetch what we don't.
This commit is contained in:
parent
e8c8924b81
commit
e2610de208
3 changed files with 63 additions and 7 deletions
1
changelog.d/17789.misc
Normal file
1
changelog.d/17789.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Speed up sliding sync when there are many active subscriptions.
|
|
@ -500,6 +500,16 @@ class SlidingSyncRoomLists:
|
||||||
# depending on the `required_state` requested (see below).
|
# depending on the `required_state` requested (see below).
|
||||||
partial_state_rooms = await self.store.get_partial_rooms()
|
partial_state_rooms = await self.store.get_partial_rooms()
|
||||||
|
|
||||||
|
# Fetch any rooms that we have not already fetched from the database.
|
||||||
|
subscription_sliding_sync_rooms = (
|
||||||
|
await self.store.get_sliding_sync_room_for_user_batch(
|
||||||
|
user_id,
|
||||||
|
sync_config.room_subscriptions.keys()
|
||||||
|
- room_membership_for_user_map.keys(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
room_membership_for_user_map.update(subscription_sliding_sync_rooms)
|
||||||
|
|
||||||
for (
|
for (
|
||||||
room_id,
|
room_id,
|
||||||
room_subscription,
|
room_subscription,
|
||||||
|
@ -507,17 +517,11 @@ class SlidingSyncRoomLists:
|
||||||
# Check if we have a membership for the room, but didn't pull it out
|
# Check if we have a membership for the room, but didn't pull it out
|
||||||
# above. This could be e.g. a leave that we don't pull out by
|
# above. This could be e.g. a leave that we don't pull out by
|
||||||
# default.
|
# default.
|
||||||
current_room_entry = (
|
current_room_entry = room_membership_for_user_map.get(room_id)
|
||||||
await self.store.get_sliding_sync_room_for_user(
|
|
||||||
user_id, room_id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if not current_room_entry:
|
if not current_room_entry:
|
||||||
# TODO: Handle rooms the user isn't in.
|
# TODO: Handle rooms the user isn't in.
|
||||||
continue
|
continue
|
||||||
|
|
||||||
room_membership_for_user_map[room_id] = current_room_entry
|
|
||||||
|
|
||||||
all_rooms.add(room_id)
|
all_rooms.add(room_id)
|
||||||
|
|
||||||
# Take the superset of the `RoomSyncConfig` for each room.
|
# Take the superset of the `RoomSyncConfig` for each room.
|
||||||
|
|
|
@ -1499,6 +1499,57 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||||
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
|
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_sliding_sync_room_for_user_batch(
|
||||||
|
self, user_id: str, room_ids: StrCollection
|
||||||
|
) -> Dict[str, RoomsForUserSlidingSync]:
|
||||||
|
"""Get the sliding sync room entry for the given user and rooms."""
|
||||||
|
|
||||||
|
if not room_ids:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def get_sliding_sync_room_for_user_batch_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> Dict[str, RoomsForUserSlidingSync]:
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "m.room_id", room_ids
|
||||||
|
)
|
||||||
|
sql = f"""
|
||||||
|
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
|
||||||
|
r.room_version,
|
||||||
|
m.event_instance_name, m.event_stream_ordering,
|
||||||
|
m.has_known_state,
|
||||||
|
COALESCE(j.room_type, m.room_type),
|
||||||
|
COALESCE(j.is_encrypted, m.is_encrypted)
|
||||||
|
FROM sliding_sync_membership_snapshots AS m
|
||||||
|
INNER JOIN rooms AS r USING (room_id)
|
||||||
|
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
|
||||||
|
WHERE m.forgotten = 0
|
||||||
|
AND {clause}
|
||||||
|
AND user_id = ?
|
||||||
|
"""
|
||||||
|
args.append(user_id)
|
||||||
|
txn.execute(sql, args)
|
||||||
|
|
||||||
|
return {
|
||||||
|
row[0]: RoomsForUserSlidingSync(
|
||||||
|
room_id=row[0],
|
||||||
|
sender=row[1],
|
||||||
|
membership=row[2],
|
||||||
|
event_id=row[3],
|
||||||
|
room_version_id=row[4],
|
||||||
|
event_pos=PersistedEventPosition(row[5], row[6]),
|
||||||
|
has_known_state=bool(row[7]),
|
||||||
|
room_type=row[8],
|
||||||
|
is_encrypted=row[9],
|
||||||
|
)
|
||||||
|
for row in txn
|
||||||
|
}
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_sliding_sync_room_for_user_batch",
|
||||||
|
get_sliding_sync_room_for_user_batch_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
Loading…
Reference in a new issue