mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-12 13:01:34 +01:00
Sliding Sync: Pre-populate room data for quick filtering/sorting (#17512)
Pre-populate room data for quick filtering/sorting in the Sliding Sync API Spawning from https://github.com/element-hq/synapse/pull/17450#discussion_r1697335578 This PR is acting as the Synapse version `N+1` step in the gradual migration being tracked by https://github.com/element-hq/synapse/issues/17623 Adding two new database tables: - `sliding_sync_joined_rooms`: A table for storing room meta data that the local server is still participating in. The info here can be shared across all `Membership.JOIN`. Keyed on `(room_id)` and updated when the relevant room current state changes or a new event is sent in the room. - `sliding_sync_membership_snapshots`: A table for storing a snapshot of room meta data at the time of the local user's membership. Keyed on `(room_id, user_id)` and only updated when a user's membership in a room changes. Also adds background updates to populate these tables with all of the existing data. We want to have the guarantee that if a row exists in the sliding sync tables, we are able to rely on it (accurate data). And if a row doesn't exist, we use a fallback to get the same info until the background updates fill in the rows or a new event comes in triggering it to be fully inserted. This means we need a couple extra things in place until we bump `SCHEMA_COMPAT_VERSION` and run the foreground update in the `N+2` part of the gradual migration. For context on why we can't rely on the tables without these things see [1]. 1. On start-up, block until we clear out any rows for the rooms that have had events since the max-`stream_ordering` of the `sliding_sync_joined_rooms` table (compare to max-`stream_ordering` of the `events` table). For `sliding_sync_membership_snapshots`, we can compare to the max-`stream_ordering` of `local_current_membership` - This accounts for when someone downgrades their Synapse version and then upgrades it again. This will ensure that we don't have any stale/out-of-date data in the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables since any new events sent in rooms would have also needed to be written to the sliding sync tables. For example a new event needs to bump `event_stream_ordering` in `sliding_sync_joined_rooms` table or some state in the room changing (like the room name). Or another example of someone's membership changing in a room affecting `sliding_sync_membership_snapshots`. 1. Add another background update that will catch-up with any rows that were just deleted from the sliding sync tables (based on the activity in the `events`/`local_current_membership`). The rooms that need recalculating are added to the `sliding_sync_joined_rooms_to_recalculate` table. 1. Making sure rows are fully inserted. Instead of partially inserting, we need to check if the row already exists and fully insert all data if not. All of this extra functionality can be removed once the `SCHEMA_COMPAT_VERSION` is bumped with support for the new sliding sync tables so people can no longer downgrade (the `N+2` part of the gradual migration). <details> <summary><sup>[1]</sup></summary> For `sliding_sync_joined_rooms`, since we partially insert rows as state comes in, we can't rely on the existence of the row for a given `room_id`. We can't even rely on looking at whether the background update has finished. There could still be partial rows from when someone reverted their Synapse version after the background update finished, had some state changes (or new rooms), then upgraded again and more state changes happen leaving a partial row. For `sliding_sync_membership_snapshots`, we insert items as a whole except for the `forgotten` column ~~so we can rely on rows existing and just need to always use a fallback for the `forgotten` data. We can't use the `forgotten` column in the table for the same reasons above about `sliding_sync_joined_rooms`.~~ We could have an out-of-date membership from when someone reverted their Synapse version. (same problems as outlined for `sliding_sync_joined_rooms` above) Discussed in an [internal meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.dz5x6ef4mxz7) </details> ### TODO - [x] Update `stream_ordering`/`bump_stamp` - [x] Handle remote invites - [x] Handle state resets - [x] Consider adding `sender` so we can filter `LEAVE` memberships and distinguish from kicks. - [x] We should add it to be able to tell leaves from kicks - [x] Consider adding `tombstone` state to help address https://github.com/element-hq/synapse/issues/17540 - [x] We should add it `tombstone_successor_room_id` - [x] Consider adding `forgotten` status to avoid extra lookup/table-join on `room_memberships` - [x] We should add it - [x] Background update to fill in values for all joined rooms and non-join membership - [x] Clean-up tables when room is deleted - [ ] Make sure tables are useful to our use case - First explored in https://github.com/element-hq/synapse/compare/erikj/ss_use_new_tables - Also explored in76b5a576eb
- [x] Plan for how can we use this with a fallback - See plan discussed above in main area of the issue description - Discussed in an [internal meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.dz5x6ef4mxz7) - [x] Plan for how we can rely on this new table without a fallback - Synapse version `N+1`: (this PR) Bump `SCHEMA_VERSION` to `87`. Add new tables and background update to backfill all rows. Since this is a new table, we don't have to add any `NOT VALID` constraints and validate them when the background update completes. Read from new tables with a fallback in cases where the rows aren't filled in yet. - Synapse version `N+2`: Bump `SCHEMA_VERSION` to `88` and bump `SCHEMA_COMPAT_VERSION` to `87` because we don't want people to downgrade and miss writes while they are on an older version. Add a foreground update to finish off the backfill so we can read from new tables without the fallback. Application code can now rely on the new tables being populated. - Discussed in an [internal meeting](https://docs.google.com/document/d/1MnuvPkaCkT_wviSQZ6YKBjiWciCBFMd-7hxyCO-OCbQ/edit#bookmark=id.hh7shg4cxdhj) ### Dev notes ``` SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.test_events.SlidingSyncPrePopulatedTablesTestCase SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.test_events.SlidingSyncPrePopulatedTablesTestCase ``` ``` SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.FilterRoomsTestCase ``` Reference: - [Development docs on background updates and worked examples of gradual migrations ](1dfa59b238/docs/development/database_schema.md (background-updates)
) - A real example of a gradual migration: https://github.com/matrix-org/synapse/pull/15649#discussion_r1213779514 - Adding `rooms.creator` field that needed a background update to backfill data, https://github.com/matrix-org/synapse/pull/10697 - Adding `rooms.room_version` that needed a background update to backfill data, https://github.com/matrix-org/synapse/pull/6729 - Adding `room_stats_state.room_type` that needed a background update to backfill data, https://github.com/matrix-org/synapse/pull/13031 - Tables from MSC2716: `insertion_events`, `insertion_event_edges`, `insertion_event_extremities`, `batch_events` - `current_state_events` updated in `synapse/storage/databases/main/events.py` --- ``` persist_event (adds to queue) _persist_event_batch _persist_events_and_state_updates (assigns `stream_ordering` to events) _persist_events_txn _store_event_txn _update_metadata_tables_txn _store_room_members_txn _update_current_state_txn ``` --- > Concatenated Indexes [...] (also known as multi-column, composite or combined index) > > [...] key consists of multiple columns. > > We can take advantage of the fact that the first index column is always usable for searching > > *-- https://use-the-index-luke.com/sql/where-clause/the-equals-operator/concatenated-keys* --- Dealing with `portdb` (`synapse/_scripts/synapse_port_db.py`), https://github.com/element-hq/synapse/pull/17512#discussion_r1725998219 --- <details> <summary>SQL queries:</summary> Both of these are equivalent and work in SQLite and Postgres Options 1: ```sql WITH data_table (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)}) AS ( VALUES ( ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?), (SELECT stream_ordering FROM events WHERE event_id = ?), {", ".join("?" for _ in insert_values)} ) ) INSERT INTO sliding_sync_non_join_memberships (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)}) SELECT * FROM data_table WHERE membership != ? ON CONFLICT (room_id, user_id) DO UPDATE SET membership_event_id = EXCLUDED.membership_event_id, membership = EXCLUDED.membership, event_stream_ordering = EXCLUDED.event_stream_ordering, {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} ``` Option 2: ```sql INSERT INTO sliding_sync_non_join_memberships (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)}) SELECT column1 as room_id, column2 as user_id, column3 as membership_event_id, column4 as membership, column5 as event_stream_ordering, {", ".join("column" + str(i) for i in range(6, 6 + len(insert_keys)))} FROM ( VALUES ( ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?), (SELECT stream_ordering FROM events WHERE event_id = ?), {", ".join("?" for _ in insert_values)} ) ) as v WHERE membership != ? ON CONFLICT (room_id, user_id) DO UPDATE SET membership_event_id = EXCLUDED.membership_event_id, membership = EXCLUDED.membership, event_stream_ordering = EXCLUDED.event_stream_ordering, {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} ``` If we don't need the `membership` condition, we could use: ```sql INSERT INTO sliding_sync_non_join_memberships (room_id, membership_event_id, user_id, membership, event_stream_ordering, {", ".join(insert_keys)}) VALUES ( ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?), (SELECT stream_ordering FROM events WHERE event_id = ?), {", ".join("?" for _ in insert_values)} ) ON CONFLICT (room_id, user_id) DO UPDATE SET membership_event_id = EXCLUDED.membership_event_id, membership = EXCLUDED.membership, event_stream_ordering = EXCLUDED.event_stream_ordering, {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} ``` </details> ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Erik Johnston <erik@matrix.org>
This commit is contained in:
parent
594cd5f9fd
commit
1a6b718f8c
22 changed files with 7413 additions and 114 deletions
1
changelog.d/17512.misc
Normal file
1
changelog.d/17512.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
|
@ -129,6 +129,11 @@ BOOLEAN_COLUMNS = {
|
|||
"remote_media_cache": ["authenticated"],
|
||||
"room_stats_state": ["is_federatable"],
|
||||
"rooms": ["is_public", "has_auth_chain_index"],
|
||||
"sliding_sync_joined_rooms": ["is_encrypted"],
|
||||
"sliding_sync_membership_snapshots": [
|
||||
"has_known_state",
|
||||
"is_encrypted",
|
||||
],
|
||||
"users": ["shadow_banned", "approved", "locked", "suspended"],
|
||||
"un_partial_stated_event_stream": ["rejection_status_changed"],
|
||||
"users_who_share_rooms": ["share_private"],
|
||||
|
|
|
@ -245,6 +245,8 @@ class EventContentFields:
|
|||
# `m.room.encryption`` algorithm field
|
||||
ENCRYPTION_ALGORITHM: Final = "algorithm"
|
||||
|
||||
TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
|
||||
|
||||
|
||||
class EventUnsignedContentFields:
|
||||
"""Fields found inside the 'unsigned' data on events"""
|
||||
|
|
|
@ -57,7 +57,11 @@ from synapse.types import (
|
|||
StreamKeyType,
|
||||
StreamToken,
|
||||
)
|
||||
from synapse.types.handlers import SlidingSyncConfig, SlidingSyncResult
|
||||
from synapse.types.handlers import (
|
||||
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
@ -75,18 +79,6 @@ sync_processing_time = Histogram(
|
|||
)
|
||||
|
||||
|
||||
# The event types that clients should consider as new activity.
|
||||
DEFAULT_BUMP_EVENT_TYPES = {
|
||||
EventTypes.Create,
|
||||
EventTypes.Message,
|
||||
EventTypes.Encrypted,
|
||||
EventTypes.Sticker,
|
||||
EventTypes.CallInvite,
|
||||
EventTypes.PollStart,
|
||||
EventTypes.LiveLocationShareStart,
|
||||
}
|
||||
|
||||
|
||||
class SlidingSyncHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
|
@ -986,7 +978,9 @@ class SlidingSyncHandler:
|
|||
# Figure out the last bump event in the room
|
||||
last_bump_event_result = (
|
||||
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
||||
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
|
||||
room_id,
|
||||
to_token.room_key,
|
||||
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -502,8 +502,15 @@ class EventsPersistenceStorageController:
|
|||
"""
|
||||
state = await self._calculate_current_state(room_id)
|
||||
delta = await self._calculate_state_delta(room_id, state)
|
||||
sliding_sync_table_changes = (
|
||||
await self.persist_events_store._calculate_sliding_sync_table_changes(
|
||||
room_id, [], delta
|
||||
)
|
||||
)
|
||||
|
||||
await self.persist_events_store.update_current_state(room_id, delta)
|
||||
await self.persist_events_store.update_current_state(
|
||||
room_id, delta, sliding_sync_table_changes
|
||||
)
|
||||
|
||||
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
|
||||
"""Calculate the current state of a room, based on the forward extremities
|
||||
|
|
|
@ -35,6 +35,7 @@ from typing import (
|
|||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
|
@ -1254,9 +1255,9 @@ class DatabasePool:
|
|||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
|
@ -1299,9 +1300,9 @@ class DatabasePool:
|
|||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
lock: bool = True,
|
||||
) -> bool:
|
||||
|
@ -1322,7 +1323,7 @@ class DatabasePool:
|
|||
|
||||
if lock:
|
||||
# We need to lock the table :(
|
||||
self.engine.lock_table(txn, table)
|
||||
txn.database_engine.lock_table(txn, table)
|
||||
|
||||
def _getwhere(key: str) -> str:
|
||||
# If the value we're passing in is None (aka NULL), we need to use
|
||||
|
@ -1376,13 +1377,13 @@ class DatabasePool:
|
|||
# successfully inserted
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def simple_upsert_txn_native_upsert(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
|
@ -1535,8 +1536,8 @@ class DatabasePool:
|
|||
|
||||
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
|
||||
|
||||
@staticmethod
|
||||
def simple_upsert_many_txn_native_upsert(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
key_names: Collection[str],
|
||||
|
@ -1966,8 +1967,8 @@ class DatabasePool:
|
|||
def simple_update_txn(
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
updatevalues: Dict[str, Any],
|
||||
keyvalues: Mapping[str, Any],
|
||||
updatevalues: Mapping[str, Any],
|
||||
) -> int:
|
||||
"""
|
||||
Update rows in the given database table.
|
||||
|
|
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
@ -457,6 +457,8 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
) -> Optional[EventBase]:
|
||||
"""Get an event from the database by event_id.
|
||||
|
||||
Events for unknown room versions will also be filtered out.
|
||||
|
||||
Args:
|
||||
event_id: The event_id of the event to fetch
|
||||
|
||||
|
@ -511,6 +513,10 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
) -> Dict[str, EventBase]:
|
||||
"""Get events from the database
|
||||
|
||||
Unknown events will be omitted from the response.
|
||||
|
||||
Events for unknown room versions will also be filtered out.
|
||||
|
||||
Args:
|
||||
event_ids: The event_ids of the events to fetch
|
||||
|
||||
|
@ -553,6 +559,8 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
Unknown events will be omitted from the response.
|
||||
|
||||
Events for unknown room versions will also be filtered out.
|
||||
|
||||
Args:
|
||||
event_ids: The event_ids of the events to fetch
|
||||
|
||||
|
|
|
@ -454,6 +454,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
# so must be deleted first.
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
# Note: the sliding_sync_ tables have foreign keys to the `events` table
|
||||
# so must be deleted first.
|
||||
"sliding_sync_joined_rooms",
|
||||
"sliding_sync_membership_snapshots",
|
||||
"events",
|
||||
"federation_inbound_events_staging",
|
||||
"receipts_graph",
|
||||
|
|
|
@ -1337,6 +1337,12 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||
updatevalues={"forgotten": 1},
|
||||
)
|
||||
self.db_pool.simple_update_txn(
|
||||
txn,
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||
updatevalues={"forgotten": 1},
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
||||
self._invalidate_cache_and_stream(
|
||||
|
|
|
@ -161,45 +161,80 @@ class StateDeltasStore(SQLBaseStore):
|
|||
self._get_max_stream_id_in_current_state_deltas_txn,
|
||||
)
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
*,
|
||||
from_token: Optional[RoomStreamToken],
|
||||
to_token: Optional[RoomStreamToken],
|
||||
) -> List[StateDelta]:
|
||||
"""
|
||||
Get the state deltas between two tokens.
|
||||
|
||||
(> `from_token` and <= `to_token`)
|
||||
"""
|
||||
from_clause = ""
|
||||
from_args = []
|
||||
if from_token is not None:
|
||||
from_clause = "AND ? < stream_id"
|
||||
from_args = [from_token.stream]
|
||||
|
||||
to_clause = ""
|
||||
to_args = []
|
||||
if to_token is not None:
|
||||
to_clause = "AND stream_id <= ?"
|
||||
to_args = [to_token.get_max_stream_pos()]
|
||||
|
||||
sql = f"""
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? {from_clause} {to_clause}
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(sql, [room_id] + from_args + to_args)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
@trace
|
||||
async def get_current_state_deltas_for_room(
|
||||
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
||||
self,
|
||||
room_id: str,
|
||||
*,
|
||||
from_token: Optional[RoomStreamToken],
|
||||
to_token: Optional[RoomStreamToken],
|
||||
) -> List[StateDelta]:
|
||||
"""Get the state deltas between two tokens."""
|
||||
"""
|
||||
Get the state deltas between two tokens.
|
||||
|
||||
if not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
(> `from_token` and <= `to_token`)
|
||||
"""
|
||||
|
||||
if (
|
||||
from_token is not None
|
||||
and not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
)
|
||||
):
|
||||
return []
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[StateDelta]:
|
||||
sql = """
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
|
||||
)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||
"get_current_state_deltas_for_room",
|
||||
self.get_current_state_deltas_for_room_txn,
|
||||
room_id,
|
||||
from_token=from_token,
|
||||
to_token=to_token,
|
||||
)
|
||||
|
||||
@trace
|
||||
|
|
|
@ -1264,12 +1264,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
return None
|
||||
|
||||
async def get_last_event_pos_in_room(
|
||||
self,
|
||||
room_id: str,
|
||||
event_types: Optional[StrCollection] = None,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
"""
|
||||
Returns the ID and event position of the last event in a room.
|
||||
|
||||
Based on `get_last_event_pos_in_room_before_stream_ordering(...)`
|
||||
|
||||
Args:
|
||||
room_id
|
||||
event_types: Optional allowlist of event types to filter by
|
||||
|
||||
Returns:
|
||||
The ID of the most recent event and it's position, or None if there are no
|
||||
events in the room that match the given event types.
|
||||
"""
|
||||
|
||||
def _get_last_event_pos_in_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
event_type_clause = ""
|
||||
event_type_args: List[str] = []
|
||||
if event_types is not None and len(event_types) > 0:
|
||||
event_type_clause, event_type_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "type", event_types
|
||||
)
|
||||
event_type_clause = f"AND {event_type_clause}"
|
||||
|
||||
sql = f"""
|
||||
SELECT event_id, stream_ordering, instance_name
|
||||
FROM events
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE room_id = ?
|
||||
{event_type_clause}
|
||||
AND NOT outlier
|
||||
AND rejections.event_id IS NULL
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
[room_id] + event_type_args,
|
||||
)
|
||||
|
||||
row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
|
||||
if row is not None:
|
||||
event_id, stream_ordering, instance_name = row
|
||||
|
||||
return event_id, PersistedEventPosition(
|
||||
# If instance_name is null we default to "master"
|
||||
instance_name or "master",
|
||||
stream_ordering,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_last_event_pos_in_room",
|
||||
_get_last_event_pos_in_room_txn,
|
||||
)
|
||||
|
||||
@trace
|
||||
async def get_last_event_pos_in_room_before_stream_ordering(
|
||||
self,
|
||||
room_id: str,
|
||||
end_token: RoomStreamToken,
|
||||
event_types: Optional[Collection[str]] = None,
|
||||
event_types: Optional[StrCollection] = None,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
"""
|
||||
Returns the ID and event position of the last event in a room at or before a
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#
|
||||
#
|
||||
|
||||
SCHEMA_VERSION = 86 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 87 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
|
@ -142,6 +142,10 @@ Changes in SCHEMA_VERSION = 85
|
|||
|
||||
Changes in SCHEMA_VERSION = 86
|
||||
- Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
|
||||
|
||||
Changes in SCHEMA_VERSION = 87
|
||||
- Add tables to store Sliding Sync data for quick filtering/sorting
|
||||
(`sliding_sync_joined_rooms`, `sliding_sync_membership_snapshots`)
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2024 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- This table is a list/queue used to keep track of which rooms need to be inserted into
|
||||
-- `sliding_sync_joined_rooms`. We do this to avoid reading from `current_state_events`
|
||||
-- during the background update to populate `sliding_sync_joined_rooms` which works but
|
||||
-- it takes a lot of work for the database to grab `DISTINCT` room_ids given how many
|
||||
-- state events there are for each room.
|
||||
--
|
||||
-- This table is prefilled with every room in the `rooms` table (see the
|
||||
-- `sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update` background
|
||||
-- update). This table is also updated whenever we come across stale data so that we can
|
||||
-- catch-up with all of the new data if Synapse was downgraded (see
|
||||
-- `_resolve_stale_data_in_sliding_sync_tables`).
|
||||
--
|
||||
-- FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
||||
-- foreground update for
|
||||
-- `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
||||
-- https://github.com/element-hq/synapse/issues/17623)
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms_to_recalculate(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
PRIMARY KEY (room_id)
|
||||
);
|
||||
|
||||
-- A table for storing room meta data (current state relevant to sliding sync) that the
|
||||
-- local server is still participating in (someone local is joined to the room).
|
||||
--
|
||||
-- We store the joined rooms in separate table from `sliding_sync_membership_snapshots`
|
||||
-- because we need up-to-date information for joined rooms and it can be shared across
|
||||
-- everyone who is joined.
|
||||
--
|
||||
-- This table is kept in sync with `current_state_events` which means if the server is
|
||||
-- no longer participating in a room, the row will be deleted.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
-- The `stream_ordering` of the most-recent/latest event in the room
|
||||
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
|
||||
-- The `stream_ordering` of the last event according to the `bump_event_types`
|
||||
bump_stamp BIGINT,
|
||||
-- `m.room.create` -> `content.type` (current state)
|
||||
--
|
||||
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
|
||||
room_type TEXT,
|
||||
-- `m.room.name` -> `content.name` (current state)
|
||||
--
|
||||
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
|
||||
room_name TEXT,
|
||||
-- `m.room.encryption` -> `content.algorithm` (current state)
|
||||
--
|
||||
-- Useful for the `is_encrypted` filter in the Sliding Sync API
|
||||
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
|
||||
tombstone_successor_room_id TEXT,
|
||||
PRIMARY KEY (room_id)
|
||||
);
|
||||
|
||||
-- So we can purge rooms easily.
|
||||
--
|
||||
-- The primary key is already `room_id`
|
||||
|
||||
-- So we can sort by `stream_ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_event_stream_ordering ON sliding_sync_joined_rooms(event_stream_ordering);
|
||||
|
||||
-- A table for storing a snapshot of room meta data (historical current state relevant
|
||||
-- for sliding sync) at the time of a local user's membership. Only has rows for the
|
||||
-- latest membership event for a given local user in a room which matches
|
||||
-- `local_current_membership` .
|
||||
--
|
||||
-- We store all memberships including joins. This makes it easy to reference this table
|
||||
-- to find all membership for a given user and shares the same semantics as
|
||||
-- `local_current_membership`. And we get to avoid some table maintenance; if we only
|
||||
-- stored non-joins, we would have to delete the row for the user when the user joins
|
||||
-- the room. Stripped state doesn't include the `m.room.tombstone` event, so we just
|
||||
-- assume that the room doesn't have a tombstone.
|
||||
--
|
||||
-- For remote invite/knocks where the server is not participating in the room, we will
|
||||
-- use stripped state events to populate this table. We assume that if any stripped
|
||||
-- state is given, it will include all possible stripped state events types. For
|
||||
-- example, if stripped state is given but `m.room.encryption` isn't included, we will
|
||||
-- assume that the room is not encrypted.
|
||||
--
|
||||
-- We don't include `bump_stamp` here because we can just use the `stream_ordering` from
|
||||
-- the membership event itself as the `bump_stamp`.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_membership_snapshots(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
user_id TEXT NOT NULL,
|
||||
-- Useful to be able to tell leaves from kicks (where the `user_id` is different from the `sender`)
|
||||
sender TEXT NOT NULL,
|
||||
membership_event_id TEXT NOT NULL REFERENCES events(event_id),
|
||||
membership TEXT NOT NULL,
|
||||
-- This is an integer just to match `room_memberships` and also means we don't need
|
||||
-- to do any casting.
|
||||
forgotten INTEGER DEFAULT 0 NOT NULL,
|
||||
-- `stream_ordering` of the `membership_event_id`
|
||||
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
|
||||
-- `instance_name` of the worker that persisted the `membership_event_id`.
|
||||
-- Useful for crafting `PersistedEventPosition(...)`
|
||||
event_instance_name TEXT NOT NULL,
|
||||
-- For remote invites/knocks that don't include any stripped state, we want to be
|
||||
-- able to distinguish between a room with `None` as valid value for some state and
|
||||
-- room where the state is completely unknown. Basically, this should be True unless
|
||||
-- no stripped state was provided for a remote invite/knock (False).
|
||||
has_known_state BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.create` -> `content.type` (according to the current state at the time of
|
||||
-- the membership).
|
||||
--
|
||||
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
|
||||
room_type TEXT,
|
||||
-- `m.room.name` -> `content.name` (according to the current state at the time of
|
||||
-- the membership).
|
||||
--
|
||||
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
|
||||
room_name TEXT,
|
||||
-- `m.room.encryption` -> `content.algorithm` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `is_encrypted` filter in the Sliding Sync API
|
||||
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
|
||||
tombstone_successor_room_id TEXT,
|
||||
PRIMARY KEY (room_id, user_id)
|
||||
);
|
||||
|
||||
-- So we can purge rooms easily.
|
||||
--
|
||||
-- Since we're using a multi-column index as the primary key (room_id, user_id), the
|
||||
-- first index column (room_id) is always usable for searching so we don't need to
|
||||
-- create a separate index for it.
|
||||
--
|
||||
-- CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_room_id ON sliding_sync_membership_snapshots(room_id);
|
||||
|
||||
-- So we can fetch all rooms for a given user
|
||||
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id);
|
||||
-- So we can sort by `stream_ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream_ordering ON sliding_sync_membership_snapshots(event_stream_ordering);
|
||||
|
||||
|
||||
-- Add a series of background updates to populate the new `sliding_sync_joined_rooms` table:
|
||||
--
|
||||
-- 1. Add a background update to prefill `sliding_sync_joined_rooms_to_recalculate`.
|
||||
-- We do a one-shot bulk insert from the `rooms` table to prefill.
|
||||
-- 2. Add a background update to populate the new `sliding_sync_joined_rooms` table
|
||||
-- based on the rooms listed in the `sliding_sync_joined_rooms_to_recalculate`
|
||||
-- table.
|
||||
--
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(8701, 'sliding_sync_joined_rooms_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
|
||||
|
||||
-- Add a background updates to populate the new `sliding_sync_membership_snapshots` table
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'sliding_sync_membership_snapshots_bg_update', '{}');
|
|
@ -30,6 +30,7 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
|||
else:
|
||||
from pydantic import Extra
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
|
@ -45,6 +46,18 @@ from synapse.types.rest.client import SlidingSyncBody
|
|||
if TYPE_CHECKING:
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
|
||||
# Sliding Sync: The event types that clients should consider as new activity and affect
|
||||
# the `bump_stamp`
|
||||
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
|
||||
EventTypes.Create,
|
||||
EventTypes.Message,
|
||||
EventTypes.Encrypted,
|
||||
EventTypes.Sticker,
|
||||
EventTypes.CallInvite,
|
||||
EventTypes.PollStart,
|
||||
EventTypes.LiveLocationShareStart,
|
||||
}
|
||||
|
||||
|
||||
class ShutdownRoomParams(TypedDict):
|
||||
"""
|
||||
|
|
|
@ -16,7 +16,7 @@ import logging
|
|||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
|
@ -44,6 +44,10 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
self.state_handler = self.hs.get_state_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
def test_rooms_meta_when_joined(self) -> None:
|
||||
"""
|
||||
|
@ -600,16 +604,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
Test that `bump_stamp` ignores backfilled events, i.e. events with a
|
||||
negative stream ordering.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
creator = "@user:other"
|
||||
room_id = "!foo:other"
|
||||
room_version = RoomVersions.V10
|
||||
shared_kwargs = {
|
||||
"room_id": room_id,
|
||||
"room_version": "10",
|
||||
"room_version": room_version.identifier,
|
||||
}
|
||||
|
||||
create_tuple = self.get_success(
|
||||
|
@ -618,6 +622,12 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
prev_event_ids=[],
|
||||
type=EventTypes.Create,
|
||||
state_key="",
|
||||
content={
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: creator,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
|
@ -667,22 +677,29 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
]
|
||||
|
||||
# Ensure the local HS knows the room version
|
||||
self.get_success(
|
||||
self.store.store_room(room_id, creator, False, RoomVersions.V10)
|
||||
)
|
||||
self.get_success(self.store.store_room(room_id, creator, False, room_version))
|
||||
|
||||
# Persist these events as backfilled events.
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(persistence.persist_event(event, context, backfilled=True))
|
||||
self.get_success(
|
||||
self.persistence.persist_event(event, context, backfilled=True)
|
||||
)
|
||||
|
||||
# Now we join the local user to the room
|
||||
join_tuple = self.get_success(
|
||||
# Now we join the local user to the room. We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `persist_event(...)` shortcut but it would be nice to use the real remote join
|
||||
# process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[invite_tuple[0].event_id],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
|
@ -691,7 +708,22 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
|||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
self.get_success(persistence.persist_event(*join_tuple))
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id
|
||||
for e in [create_tuple[0], invite_tuple[0]]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# Doing an SS request should return a positive `bump_stamp`, even though
|
||||
# the only event that matches the bump types has as negative stream
|
||||
|
|
|
@ -112,6 +112,24 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
|
|||
{(1, "user1", "hello"), (2, "user2", "bleb")},
|
||||
)
|
||||
|
||||
self.get_success(
|
||||
self.storage.db_pool.runInteraction(
|
||||
"test",
|
||||
self.storage.db_pool.simple_upsert_many_txn,
|
||||
self.table_name,
|
||||
key_names=key_names,
|
||||
key_values=[[2, "user2"]],
|
||||
value_names=[],
|
||||
value_values=[],
|
||||
)
|
||||
)
|
||||
|
||||
# Check results are what we expect
|
||||
self.assertEqual(
|
||||
set(self._dump_table_to_tuple()),
|
||||
{(1, "user1", "hello"), (2, "user2", "bleb")},
|
||||
)
|
||||
|
||||
def test_simple_update_many(self) -> None:
|
||||
"""
|
||||
simple_update_many performs many updates at once.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
@ -35,6 +36,8 @@ from synapse.util import Clock
|
|||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExtremPruneTestCase(HomeserverTestCase):
|
||||
servlets = [
|
||||
|
|
|
@ -24,7 +24,7 @@ from typing import List, Optional, Tuple, cast
|
|||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.admin import register_servlets_for_client_rest_resource
|
||||
|
@ -38,6 +38,7 @@ from synapse.util import Clock
|
|||
from tests import unittest
|
||||
from tests.server import TestHomeServer
|
||||
from tests.test_utils import event_injection
|
||||
from tests.test_utils.event_injection import create_event
|
||||
from tests.unittest import skip_unless
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -54,6 +55,10 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
|
|||
# We can't test the RoomMemberStore on its own without the other event
|
||||
# storage logic
|
||||
self.store = hs.get_datastores().main
|
||||
self.state_handler = self.hs.get_state_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
self.u_alice = self.register_user("alice", "pass")
|
||||
self.t_alice = self.login("alice", "pass")
|
||||
|
@ -220,31 +225,166 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
|
|||
)
|
||||
|
||||
def test_join_locally_forgotten_room(self) -> None:
|
||||
"""Tests if a user joins a forgotten room the room is not forgotten anymore."""
|
||||
self.room = self.helper.create_room_as(self.u_alice, tok=self.t_alice)
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
"""
|
||||
Tests if a user joins a forgotten room, the room is not forgotten anymore.
|
||||
|
||||
# after leaving and forget the room, it is forgotten
|
||||
self.get_success(
|
||||
event_injection.inject_member_event(
|
||||
self.hs, self.room, self.u_alice, "leave"
|
||||
Since a room can't be re-joined if everyone has left. This can only happen with
|
||||
a room with remote users in it.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
creator = "@user:other"
|
||||
room_id = "!foo:other"
|
||||
room_version = RoomVersions.V10
|
||||
shared_kwargs = {
|
||||
"room_id": room_id,
|
||||
"room_version": room_version.identifier,
|
||||
}
|
||||
|
||||
create_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[],
|
||||
type=EventTypes.Create,
|
||||
state_key="",
|
||||
content={
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: creator,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
self.get_success(self.store.forget(self.u_alice, self.room))
|
||||
self.assertTrue(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
|
||||
# after rejoin the room is not forgotten anymore
|
||||
self.get_success(
|
||||
event_injection.inject_member_event(
|
||||
self.hs, self.room, self.u_alice, "join"
|
||||
creator_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[create_tuple[0].event_id],
|
||||
auth_event_ids=[create_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=creator,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
remote_events_and_contexts = [
|
||||
create_tuple,
|
||||
creator_tuple,
|
||||
]
|
||||
|
||||
# Ensure the local HS knows the room version
|
||||
self.get_success(self.store.store_room(room_id, creator, False, room_version))
|
||||
|
||||
# Persist these events as backfilled events.
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(
|
||||
self.persistence.persist_event(event, context, backfilled=True)
|
||||
)
|
||||
|
||||
# Now we join the local user to the room. We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `persist_event(...)` shortcut but it would be nice to use the real remote join
|
||||
# process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[creator_tuple[0].event_id],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[create_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=user1_id,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id for e in [create_tuple[0]]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# The room shouldn't be forgotten because the local user just joined
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
self.get_success(self.store.is_locally_forgotten_room(room_id))
|
||||
)
|
||||
|
||||
# After all of the local users (there is only user1) leave and forgetting the
|
||||
# room, it is forgotten
|
||||
user1_leave_response = self.helper.leave(room_id, user1_id, tok=user1_tok)
|
||||
user1_leave_event = self.get_success(
|
||||
self.store.get_event(user1_leave_response["event_id"])
|
||||
)
|
||||
self.get_success(self.store.forget(user1_id, room_id))
|
||||
self.assertTrue(self.get_success(self.store.is_locally_forgotten_room(room_id)))
|
||||
|
||||
# Join the local user to the room (again). We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `event_injection.inject_member_event(...)` shortcut but it would be nice to
|
||||
# use the real remote join process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[user1_leave_response["event_id"]],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[
|
||||
create_tuple[0].event_id,
|
||||
user1_leave_response["event_id"],
|
||||
],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=user1_id,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id
|
||||
for e in [create_tuple[0], user1_leave_event]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# After the local user rejoins the remote room, it isn't forgotten anymore
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(room_id))
|
||||
)
|
||||
|
||||
|
||||
|
|
4830
tests/storage/test_sliding_sync_tables.py
Normal file
4830
tests/storage/test_sliding_sync_tables.py
Normal file
File diff suppressed because it is too large
Load diff
|
@ -272,8 +272,8 @@ class TestCase(unittest.TestCase):
|
|||
|
||||
def assertIncludes(
|
||||
self,
|
||||
actual_items: AbstractSet[str],
|
||||
expected_items: AbstractSet[str],
|
||||
actual_items: AbstractSet[TV],
|
||||
expected_items: AbstractSet[TV],
|
||||
exact: bool = False,
|
||||
message: Optional[str] = None,
|
||||
) -> None:
|
||||
|
|
Loading…
Reference in a new issue