mirror of
https://mau.dev/maunium/synapse.git
synced 2025-01-18 23:12:00 +01:00
Sliding Sync: Use Sliding Sync tables for sorting (#17693)
Use Sliding Sync tables for sorting (`bulk_get_last_event_pos_in_room_before_stream_ordering(...)` -> `_bulk_get_max_event_pos(...)`)
This commit is contained in:
parent
e4a1f271b9
commit
16af80b8fb
8 changed files with 103 additions and 105 deletions
1
changelog.d/17693.misc
Normal file
1
changelog.d/17693.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Use Sliding Sync tables as a bulk shortcut for getting the max `event_stream_ordering` of rooms.
|
|
@ -27,6 +27,7 @@ from typing import (
|
||||||
Set,
|
Set,
|
||||||
Tuple,
|
Tuple,
|
||||||
Union,
|
Union,
|
||||||
|
cast,
|
||||||
)
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -355,11 +356,18 @@ class SlidingSyncRoomLists:
|
||||||
if list_config.ranges:
|
if list_config.ranges:
|
||||||
if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
|
if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
|
||||||
# If we are asking for the full range, we don't need to sort the list.
|
# If we are asking for the full range, we don't need to sort the list.
|
||||||
sorted_room_info = list(filtered_sync_room_map.values())
|
sorted_room_info: List[RoomsForUserType] = list(
|
||||||
|
filtered_sync_room_map.values()
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Sort the list
|
# Sort the list
|
||||||
sorted_room_info = await self.sort_rooms_using_tables(
|
sorted_room_info = await self.sort_rooms(
|
||||||
filtered_sync_room_map, to_token
|
# Cast is safe because RoomsForUserSlidingSync is part
|
||||||
|
# of the `RoomsForUserType` union. Why can't it detect this?
|
||||||
|
cast(
|
||||||
|
Dict[str, RoomsForUserType], filtered_sync_room_map
|
||||||
|
),
|
||||||
|
to_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
for range in list_config.ranges:
|
for range in list_config.ranges:
|
||||||
|
@ -1762,63 +1770,6 @@ class SlidingSyncRoomLists:
|
||||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||||
|
|
||||||
@trace
|
|
||||||
async def sort_rooms_using_tables(
|
|
||||||
self,
|
|
||||||
sync_room_map: Mapping[str, RoomsForUserSlidingSync],
|
|
||||||
to_token: StreamToken,
|
|
||||||
) -> List[RoomsForUserSlidingSync]:
|
|
||||||
"""
|
|
||||||
Sort by `stream_ordering` of the last event that the user should see in the
|
|
||||||
room. `stream_ordering` is unique so we get a stable sort.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
sync_room_map: Dictionary of room IDs to sort along with membership
|
|
||||||
information in the room at the time of `to_token`.
|
|
||||||
to_token: We sort based on the events in the room at this token (<= `to_token`)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A sorted list of room IDs by `stream_ordering` along with membership information.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
|
||||||
# user should see in the room (<= `to_token`)
|
|
||||||
last_activity_in_room_map: Dict[str, int] = {}
|
|
||||||
|
|
||||||
for room_id, room_for_user in sync_room_map.items():
|
|
||||||
if room_for_user.membership != Membership.JOIN:
|
|
||||||
# If the user has left/been invited/knocked/been banned from a
|
|
||||||
# room, they shouldn't see anything past that point.
|
|
||||||
#
|
|
||||||
# FIXME: It's possible that people should see beyond this point
|
|
||||||
# in invited/knocked cases if for example the room has
|
|
||||||
# `invite`/`world_readable` history visibility, see
|
|
||||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
|
||||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
|
||||||
|
|
||||||
# For fully-joined rooms, we find the latest activity at/before the
|
|
||||||
# `to_token`.
|
|
||||||
joined_room_positions = (
|
|
||||||
await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
|
|
||||||
[
|
|
||||||
room_id
|
|
||||||
for room_id, room_for_user in sync_room_map.items()
|
|
||||||
if room_for_user.membership == Membership.JOIN
|
|
||||||
],
|
|
||||||
to_token.room_key,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
last_activity_in_room_map.update(joined_room_positions)
|
|
||||||
|
|
||||||
return sorted(
|
|
||||||
sync_room_map.values(),
|
|
||||||
# Sort by the last activity (stream_ordering) in the room
|
|
||||||
key=lambda room_info: last_activity_in_room_map[room_info.room_id],
|
|
||||||
# We want descending order
|
|
||||||
reverse=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def sort_rooms(
|
async def sort_rooms(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -52,6 +52,7 @@ from synapse.storage.types import Cursor
|
||||||
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
|
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
|
||||||
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
|
from synapse.types.storage import _BackgroundUpdates
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
from synapse.util.iterutils import batch_iter
|
from synapse.util.iterutils import batch_iter
|
||||||
|
|
||||||
|
@ -76,34 +77,6 @@ _REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class _BackgroundUpdates:
|
|
||||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
|
||||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
|
||||||
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
|
||||||
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
|
|
||||||
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
|
|
||||||
INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
|
|
||||||
INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
|
|
||||||
INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
|
|
||||||
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
|
|
||||||
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
|
|
||||||
|
|
||||||
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
|
|
||||||
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
|
|
||||||
|
|
||||||
EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
|
|
||||||
|
|
||||||
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
|
|
||||||
|
|
||||||
SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
|
|
||||||
"sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
|
|
||||||
)
|
|
||||||
SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
|
|
||||||
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
|
|
||||||
"sliding_sync_membership_snapshots_bg_update"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
class _CalculateChainCover:
|
class _CalculateChainCover:
|
||||||
"""Return value for _calculate_chain_cover_txn."""
|
"""Return value for _calculate_chain_cover_txn."""
|
||||||
|
|
|
@ -83,6 +83,7 @@ from synapse.storage.util.id_generators import (
|
||||||
from synapse.storage.util.sequence import build_sequence_generator
|
from synapse.storage.util.sequence import build_sequence_generator
|
||||||
from synapse.types import JsonDict, get_domain_from_id
|
from synapse.types import JsonDict, get_domain_from_id
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
|
from synapse.types.storage import _BackgroundUpdates
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import unwrapFirstError
|
||||||
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
|
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
|
||||||
from synapse.util.caches.descriptors import cached, cachedList
|
from synapse.util.caches.descriptors import cached, cachedList
|
||||||
|
@ -2465,3 +2466,14 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.invalidate_get_event_cache_after_txn(txn, event_id)
|
self.invalidate_get_event_cache_after_txn(txn, event_id)
|
||||||
|
|
||||||
|
async def have_finished_sliding_sync_background_jobs(self) -> bool:
|
||||||
|
"""Return if it's safe to use the sliding sync membership tables."""
|
||||||
|
|
||||||
|
return await self.db_pool.updates.have_completed_background_updates(
|
||||||
|
(
|
||||||
|
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
|
||||||
|
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
||||||
|
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
|
@ -51,7 +51,6 @@ from synapse.storage.database import (
|
||||||
LoggingTransaction,
|
LoggingTransaction,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||||
from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates
|
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.engines import Sqlite3Engine
|
from synapse.storage.engines import Sqlite3Engine
|
||||||
from synapse.storage.roommember import (
|
from synapse.storage.roommember import (
|
||||||
|
@ -1449,17 +1448,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||||
get_sliding_sync_rooms_for_user_txn,
|
get_sliding_sync_rooms_for_user_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def have_finished_sliding_sync_background_jobs(self) -> bool:
|
|
||||||
"""Return if it's safe to use the sliding sync membership tables."""
|
|
||||||
|
|
||||||
return await self.db_pool.updates.have_completed_background_updates(
|
|
||||||
(
|
|
||||||
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
|
|
||||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
|
||||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
|
@ -1524,7 +1524,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# majority of rooms will have a latest token from before the min stream
|
# majority of rooms will have a latest token from before the min stream
|
||||||
# pos.
|
# pos.
|
||||||
|
|
||||||
def bulk_get_max_event_pos_txn(
|
def bulk_get_max_event_pos_fallback_txn(
|
||||||
txn: LoggingTransaction, batched_room_ids: StrCollection
|
txn: LoggingTransaction, batched_room_ids: StrCollection
|
||||||
) -> Dict[str, int]:
|
) -> Dict[str, int]:
|
||||||
clause, args = make_in_list_sql_clause(
|
clause, args = make_in_list_sql_clause(
|
||||||
|
@ -1547,11 +1547,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
txn.execute(sql, [max_pos] + args)
|
txn.execute(sql, [max_pos] + args)
|
||||||
return {row[0]: row[1] for row in txn}
|
return {row[0]: row[1] for row in txn}
|
||||||
|
|
||||||
|
# It's easier to look at the `sliding_sync_joined_rooms` table and avoid all of
|
||||||
|
# the joins and sub-queries.
|
||||||
|
def bulk_get_max_event_pos_from_sliding_sync_tables_txn(
|
||||||
|
txn: LoggingTransaction, batched_room_ids: StrCollection
|
||||||
|
) -> Dict[str, int]:
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "room_id", batched_room_ids
|
||||||
|
)
|
||||||
|
sql = f"""
|
||||||
|
SELECT room_id, event_stream_ordering
|
||||||
|
FROM sliding_sync_joined_rooms
|
||||||
|
WHERE {clause}
|
||||||
|
ORDER BY event_stream_ordering DESC
|
||||||
|
"""
|
||||||
|
txn.execute(sql, args)
|
||||||
|
return {row[0]: row[1] for row in txn}
|
||||||
|
|
||||||
recheck_rooms: Set[str] = set()
|
recheck_rooms: Set[str] = set()
|
||||||
for batched in batch_iter(room_ids, 1000):
|
for batched in batch_iter(room_ids, 1000):
|
||||||
batch_results = await self.db_pool.runInteraction(
|
if await self.have_finished_sliding_sync_background_jobs():
|
||||||
"_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
|
batch_results = await self.db_pool.runInteraction(
|
||||||
)
|
"bulk_get_max_event_pos_from_sliding_sync_tables_txn",
|
||||||
|
bulk_get_max_event_pos_from_sliding_sync_tables_txn,
|
||||||
|
batched,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
batch_results = await self.db_pool.runInteraction(
|
||||||
|
"bulk_get_max_event_pos_fallback_txn",
|
||||||
|
bulk_get_max_event_pos_fallback_txn,
|
||||||
|
batched,
|
||||||
|
)
|
||||||
for room_id, stream_ordering in batch_results.items():
|
for room_id, stream_ordering in batch_results.items():
|
||||||
if stream_ordering <= now_token.stream:
|
if stream_ordering <= now_token.stream:
|
||||||
results.update(batch_results)
|
results.update(batch_results)
|
||||||
|
|
47
synapse/types/storage/__init__.py
Normal file
47
synapse/types/storage/__init__.py
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
#
|
||||||
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
#
|
||||||
|
# Copyright (C) 2024 New Vector, Ltd
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# See the GNU Affero General Public License for more details:
|
||||||
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
#
|
||||||
|
# Originally licensed under the Apache License, Version 2.0:
|
||||||
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||||
|
#
|
||||||
|
# [This file includes modifications made by New Vector Limited]
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
|
|
||||||
|
class _BackgroundUpdates:
|
||||||
|
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||||
|
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||||
|
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
||||||
|
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
|
||||||
|
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
|
||||||
|
INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
|
||||||
|
INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
|
||||||
|
INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
|
||||||
|
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
|
||||||
|
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
|
||||||
|
|
||||||
|
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
|
||||||
|
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
|
||||||
|
|
||||||
|
EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
|
||||||
|
|
||||||
|
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
|
||||||
|
|
||||||
|
SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE = (
|
||||||
|
"sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update"
|
||||||
|
)
|
||||||
|
SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
|
||||||
|
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
|
||||||
|
"sliding_sync_membership_snapshots_bg_update"
|
||||||
|
)
|
|
@ -34,11 +34,11 @@ from synapse.rest.client import login, room
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.databases.main.events import DeltaState
|
from synapse.storage.databases.main.events import DeltaState
|
||||||
from synapse.storage.databases.main.events_bg_updates import (
|
from synapse.storage.databases.main.events_bg_updates import (
|
||||||
_BackgroundUpdates,
|
|
||||||
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
|
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
|
||||||
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
|
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
|
||||||
)
|
)
|
||||||
from synapse.types import create_requester
|
from synapse.types import create_requester
|
||||||
|
from synapse.types.storage import _BackgroundUpdates
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.test_utils.event_injection import create_event
|
from tests.test_utils.event_injection import create_event
|
||||||
|
|
Loading…
Add table
Reference in a new issue