diff --git a/changelog.d/17592.misc b/changelog.d/17592.misc new file mode 100644 index 000000000..1b4a53ee1 --- /dev/null +++ b/changelog.d/17592.misc @@ -0,0 +1 @@ +Correctly track read receipts that should be sent down in experimental sliding sync. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 14d0ecbe1..af8d7ab96 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -3088,38 +3088,17 @@ class SlidingSyncHandler: # from that room but we only want to include receipts for events # in the timeline to avoid bloating and blowing up the sync response # as the number of users in the room increases. (this behavior is part of the spec) - initial_rooms = { - room_id + initial_rooms_and_event_ids = [ + (room_id, event.event_id) for room_id in initial_rooms if room_id in actual_room_response_map - } - if initial_rooms: - initial_receipts = await self.store.get_linearized_receipts_for_rooms( - room_ids=initial_rooms, - to_key=to_token.receipt_key, + for event in actual_room_response_map[room_id].timeline_events + ] + if initial_rooms_and_event_ids: + initial_receipts = await self.store.get_linearized_receipts_for_events( + room_and_event_ids=initial_rooms_and_event_ids, ) - - for receipt in initial_receipts: - relevant_event_ids = { - event.event_id - for event in actual_room_response_map[ - receipt["room_id"] - ].timeline_events - } - - content = { - event_id: content_value - for event_id, content_value in receipt["content"].items() - if event_id in relevant_event_ids - } - if content: - fetched_receipts.append( - { - "type": receipt["type"], - "room_id": receipt["room_id"], - "content": content, - } - ) + fetched_receipts.extend(initial_receipts) fetched_receipts = ReceiptEventSource.filter_out_private_receipts( fetched_receipts, sync_config.user.to_string() diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e266cc2a2..0a20f5db4 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -43,6 +43,7 @@ from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + make_tuple_in_list_sql_clause, ) from synapse.storage.engines._base import IsolationLevel from synapse.storage.util.id_generators import MultiWriterIdGenerator @@ -481,6 +482,83 @@ class ReceiptsWorkerStore(SQLBaseStore): } return results + async def get_linearized_receipts_for_events( + self, + room_and_event_ids: Collection[Tuple[str, str]], + ) -> Sequence[JsonMapping]: + """Get all receipts for the given set of events. + + Arguments: + room_and_event_ids: A collection of 2-tuples of room ID and + event IDs to fetch receipts for + + Returns: + A list of receipts, one per room. + """ + + def get_linearized_receipts_for_events_txn( + txn: LoggingTransaction, + room_id_event_id_tuples: Collection[Tuple[str, str]], + ) -> List[Tuple[str, str, str, str, Optional[str], str]]: + clause, args = make_tuple_in_list_sql_clause( + self.database_engine, ("room_id", "event_id"), room_id_event_id_tuples + ) + + sql = f""" + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized + WHERE {clause} + """ + + txn.execute(sql, args) + + return txn.fetchall() + + # room_id -> event_id -> receipt_type -> user_id -> receipt data + room_to_content: Dict[str, Dict[str, Dict[str, Dict[str, JsonMapping]]]] = {} + for batch in batch_iter(room_and_event_ids, 1000): + batch_results = await self.db_pool.runInteraction( + "get_linearized_receipts_for_events", + get_linearized_receipts_for_events_txn, + batch, + ) + + for ( + room_id, + receipt_type, + user_id, + event_id, + thread_id, + data, + ) in batch_results: + content = room_to_content.setdefault(room_id, {}) + user_receipts = content.setdefault(event_id, {}).setdefault( + receipt_type, {} + ) + + receipt_data = db_to_json(data) + if thread_id is not None: + receipt_data["thread_id"] = thread_id + + # MSC4102: always replace threaded receipts with unthreaded ones + # if there is a clash. Specifically: + # - if there is no existing receipt, great, set the data. + # - if there is an existing receipt, is it threaded (thread_id + # present)? YES: replace if this receipt has no thread id. + # NO: do not replace. This means we will drop some receipts, but + # MSC4102 is designed to drop semantically meaningless receipts, + # so this is okay. Previously, we would drop meaningful data! + if user_id in user_receipts: + if "thread_id" in user_receipts[user_id] and not thread_id: + user_receipts[user_id] = receipt_data + else: + user_receipts[user_id] = receipt_data + + return [ + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": content} + for room_id, content in room_to_content.items() + ] + @cached( num_args=2, ) @@ -996,6 +1074,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME, self._background_receipts_graph_unique_index, ) + self.db_pool.updates.register_background_index_update( + update_name="receipts_room_id_event_id_index", + index_name="receipts_linearized_event_id", + table="receipts_linearized", + columns=("room_id", "event_id"), + ) async def _populate_receipt_event_stream_ordering( self, progress: JsonDict, batch_size: int diff --git a/synapse/storage/schema/main/delta/86/02_receipts_event_id_index.sql b/synapse/storage/schema/main/delta/86/02_receipts_event_id_index.sql new file mode 100644 index 000000000..e6db91e5b --- /dev/null +++ b/synapse/storage/schema/main/delta/86/02_receipts_event_id_index.sql @@ -0,0 +1,15 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8602, 'receipts_room_id_event_id_index', '{}');