mirror of
https://mau.dev/maunium/synapse.git
synced 2025-01-22 07:20:15 +01:00
Speed up SS room sorting (#17468)
We do this by bulk fetching the latest stream ordering. --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
parent
1daae43f3a
commit
d225b6b3eb
6 changed files with 159 additions and 29 deletions
1
changelog.d/17468.misc
Normal file
1
changelog.d/17468.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Speed up sorting of the room list in sliding sync.
|
|
@ -1230,34 +1230,33 @@ class SlidingSyncHandler:
|
||||||
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
||||||
# user should see in the room (<= `to_token`)
|
# user should see in the room (<= `to_token`)
|
||||||
last_activity_in_room_map: Dict[str, int] = {}
|
last_activity_in_room_map: Dict[str, int] = {}
|
||||||
|
|
||||||
for room_id, room_for_user in sync_room_map.items():
|
for room_id, room_for_user in sync_room_map.items():
|
||||||
# If they are fully-joined to the room, let's find the latest activity
|
if room_for_user.membership != Membership.JOIN:
|
||||||
# at/before the `to_token`.
|
# If the user has left/been invited/knocked/been banned from a
|
||||||
if room_for_user.membership == Membership.JOIN:
|
# room, they shouldn't see anything past that point.
|
||||||
last_event_result = (
|
|
||||||
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
|
||||||
room_id, to_token.room_key
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# If the room has no events at/before the `to_token`, this is probably a
|
|
||||||
# mistake in the code that generates the `sync_room_map` since that should
|
|
||||||
# only give us rooms that the user had membership in during the token range.
|
|
||||||
assert last_event_result is not None
|
|
||||||
|
|
||||||
_, event_pos = last_event_result
|
|
||||||
|
|
||||||
last_activity_in_room_map[room_id] = event_pos.stream
|
|
||||||
else:
|
|
||||||
# Otherwise, if the user has left/been invited/knocked/been banned from
|
|
||||||
# a room, they shouldn't see anything past that point.
|
|
||||||
#
|
#
|
||||||
# FIXME: It's possible that people should see beyond this point in
|
# FIXME: It's possible that people should see beyond this point
|
||||||
# invited/knocked cases if for example the room has
|
# in invited/knocked cases if for example the room has
|
||||||
# `invite`/`world_readable` history visibility, see
|
# `invite`/`world_readable` history visibility, see
|
||||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
||||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
||||||
|
|
||||||
|
# For fully-joined rooms, we find the latest activity at/before the
|
||||||
|
# `to_token`.
|
||||||
|
joined_room_positions = (
|
||||||
|
await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
|
||||||
|
[
|
||||||
|
room_id
|
||||||
|
for room_id, room_for_user in sync_room_map.items()
|
||||||
|
if room_for_user.membership == Membership.JOIN
|
||||||
|
],
|
||||||
|
to_token.room_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
last_activity_in_room_map.update(joined_room_positions)
|
||||||
|
|
||||||
return sorted(
|
return sorted(
|
||||||
sync_room_map.values(),
|
sync_room_map.values(),
|
||||||
# Sort by the last activity (stream_ordering) in the room
|
# Sort by the last activity (stream_ordering) in the room
|
||||||
|
|
|
@ -1313,6 +1313,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
||||||
# We want to make the cache more effective, so we clamp to the last
|
# We want to make the cache more effective, so we clamp to the last
|
||||||
# change before the given ordering.
|
# change before the given ordering.
|
||||||
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) # type: ignore[attr-defined]
|
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) # type: ignore[attr-defined]
|
||||||
|
if last_change is None:
|
||||||
|
# If the room isn't in the cache we know that the last change was
|
||||||
|
# somewhere before the earliest known position of the cache, so we
|
||||||
|
# can clamp to that.
|
||||||
|
last_change = self._events_stream_cache.get_earliest_known_position() # type: ignore[attr-defined]
|
||||||
|
|
||||||
# We don't always have a full stream_to_exterm_id table, e.g. after
|
# We don't always have a full stream_to_exterm_id table, e.g. after
|
||||||
# the upgrade that introduced it, so we make sure we never ask for a
|
# the upgrade that introduced it, so we make sure we never ask for a
|
||||||
|
|
|
@ -78,10 +78,11 @@ from synapse.storage.database import (
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||||
from synapse.types import PersistedEventPosition, RoomStreamToken
|
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from synapse.util.cancellation import cancellable
|
from synapse.util.cancellation import cancellable
|
||||||
|
from synapse.util.iterutils import batch_iter
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
@ -1293,6 +1294,126 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
get_last_event_pos_in_room_before_stream_ordering_txn,
|
get_last_event_pos_in_room_before_stream_ordering_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def bulk_get_last_event_pos_in_room_before_stream_ordering(
|
||||||
|
self,
|
||||||
|
room_ids: StrCollection,
|
||||||
|
end_token: RoomStreamToken,
|
||||||
|
) -> Dict[str, int]:
|
||||||
|
"""Bulk fetch the stream position of the latest events in the given
|
||||||
|
rooms
|
||||||
|
"""
|
||||||
|
|
||||||
|
min_token = end_token.stream
|
||||||
|
max_token = end_token.get_max_stream_pos()
|
||||||
|
results: Dict[str, int] = {}
|
||||||
|
|
||||||
|
# First, we check for the rooms in the stream change cache to see if we
|
||||||
|
# can just use the latest position from it.
|
||||||
|
missing_room_ids: Set[str] = set()
|
||||||
|
for room_id in room_ids:
|
||||||
|
stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
|
||||||
|
if stream_pos and stream_pos <= min_token:
|
||||||
|
results[room_id] = stream_pos
|
||||||
|
else:
|
||||||
|
missing_room_ids.add(room_id)
|
||||||
|
|
||||||
|
# Next, we query the stream position from the DB. At first we fetch all
|
||||||
|
# positions less than the *max* stream pos in the token, then filter
|
||||||
|
# them down. We do this as a) this is a cheaper query, and b) the vast
|
||||||
|
# majority of rooms will have a latest token from before the min stream
|
||||||
|
# pos.
|
||||||
|
|
||||||
|
def bulk_get_last_event_pos_txn(
|
||||||
|
txn: LoggingTransaction, batch_room_ids: StrCollection
|
||||||
|
) -> Dict[str, int]:
|
||||||
|
# This query fetches the latest stream position in the rooms before
|
||||||
|
# the given max position.
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "room_id", batch_room_ids
|
||||||
|
)
|
||||||
|
sql = f"""
|
||||||
|
SELECT room_id, (
|
||||||
|
SELECT stream_ordering FROM events AS e
|
||||||
|
LEFT JOIN rejections USING (event_id)
|
||||||
|
WHERE e.room_id = r.room_id
|
||||||
|
AND stream_ordering <= ?
|
||||||
|
AND NOT outlier
|
||||||
|
AND rejection_reason IS NULL
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
FROM rooms AS r
|
||||||
|
WHERE {clause}
|
||||||
|
"""
|
||||||
|
txn.execute(sql, [max_token] + args)
|
||||||
|
return {row[0]: row[1] for row in txn}
|
||||||
|
|
||||||
|
recheck_rooms: Set[str] = set()
|
||||||
|
for batched in batch_iter(missing_room_ids, 1000):
|
||||||
|
result = await self.db_pool.runInteraction(
|
||||||
|
"bulk_get_last_event_pos_in_room_before_stream_ordering",
|
||||||
|
bulk_get_last_event_pos_txn,
|
||||||
|
batched,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that the stream position for the rooms are from before the
|
||||||
|
# minimum position of the token. If not then we need to fetch more
|
||||||
|
# rows.
|
||||||
|
for room_id, stream in result.items():
|
||||||
|
if stream <= min_token:
|
||||||
|
results[room_id] = stream
|
||||||
|
else:
|
||||||
|
recheck_rooms.add(room_id)
|
||||||
|
|
||||||
|
if not recheck_rooms:
|
||||||
|
return results
|
||||||
|
|
||||||
|
# For the remaining rooms we need to fetch all rows between the min and
|
||||||
|
# max stream positions in the end token, and filter out the rows that
|
||||||
|
# are after the end token.
|
||||||
|
#
|
||||||
|
# This query should be fast as the range between the min and max should
|
||||||
|
# be small.
|
||||||
|
|
||||||
|
def bulk_get_last_event_pos_recheck_txn(
|
||||||
|
txn: LoggingTransaction, batch_room_ids: StrCollection
|
||||||
|
) -> Dict[str, int]:
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "room_id", batch_room_ids
|
||||||
|
)
|
||||||
|
sql = f"""
|
||||||
|
SELECT room_id, instance_name, stream_ordering
|
||||||
|
FROM events
|
||||||
|
WHERE ? < stream_ordering AND stream_ordering <= ?
|
||||||
|
AND NOT outlier
|
||||||
|
AND rejection_reason IS NULL
|
||||||
|
AND {clause}
|
||||||
|
ORDER BY stream_ordering ASC
|
||||||
|
"""
|
||||||
|
txn.execute(sql, [min_token, max_token] + args)
|
||||||
|
|
||||||
|
# We take the max stream ordering that is less than the token. Since
|
||||||
|
# we ordered by stream ordering we just need to iterate through and
|
||||||
|
# take the last matching stream ordering.
|
||||||
|
txn_results: Dict[str, int] = {}
|
||||||
|
for row in txn:
|
||||||
|
room_id = row[0]
|
||||||
|
event_pos = PersistedEventPosition(row[1], row[2])
|
||||||
|
if not event_pos.persisted_after(end_token):
|
||||||
|
txn_results[room_id] = event_pos.stream
|
||||||
|
|
||||||
|
return txn_results
|
||||||
|
|
||||||
|
for batched in batch_iter(recheck_rooms, 1000):
|
||||||
|
recheck_result = await self.db_pool.runInteraction(
|
||||||
|
"bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
|
||||||
|
bulk_get_last_event_pos_recheck_txn,
|
||||||
|
batched,
|
||||||
|
)
|
||||||
|
results.update(recheck_result)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
async def get_current_room_stream_token_for_room_id(
|
async def get_current_room_stream_token_for_room_id(
|
||||||
self, room_id: str
|
self, room_id: str
|
||||||
) -> RoomStreamToken:
|
) -> RoomStreamToken:
|
||||||
|
|
|
@ -327,7 +327,7 @@ class StreamChangeCache:
|
||||||
for entity in r:
|
for entity in r:
|
||||||
self._entity_to_key.pop(entity, None)
|
self._entity_to_key.pop(entity, None)
|
||||||
|
|
||||||
def get_max_pos_of_last_change(self, entity: EntityType) -> int:
|
def get_max_pos_of_last_change(self, entity: EntityType) -> Optional[int]:
|
||||||
"""Returns an upper bound of the stream id of the last change to an
|
"""Returns an upper bound of the stream id of the last change to an
|
||||||
entity.
|
entity.
|
||||||
|
|
||||||
|
@ -335,7 +335,11 @@ class StreamChangeCache:
|
||||||
entity: The entity to check.
|
entity: The entity to check.
|
||||||
|
|
||||||
Return:
|
Return:
|
||||||
The stream position of the latest change for the given entity or
|
The stream position of the latest change for the given entity, if
|
||||||
the earliest known stream position if the entitiy is unknown.
|
known
|
||||||
"""
|
"""
|
||||||
return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
|
return self._entity_to_key.get(entity)
|
||||||
|
|
||||||
|
def get_earliest_known_position(self) -> int:
|
||||||
|
"""Returns the earliest position in the cache."""
|
||||||
|
return self._earliest_known_stream_pos
|
||||||
|
|
|
@ -249,5 +249,5 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
|
||||||
self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
|
self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
|
||||||
self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
|
self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
|
||||||
|
|
||||||
# Unknown entities will return the stream start position.
|
# Unknown entities will return None
|
||||||
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1)
|
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None)
|
||||||
|
|
Loading…
Add table
Reference in a new issue