0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-12 04:52:26 +01:00

Sliding Sync: Speed up getting receipts for initial rooms (#17592)

Let's only pull out the events we care about. Note that the index isn't
necessary here, as postgres is happy to scan the set of rooms for the
events.
This commit is contained in:
Erik Johnston 2024-08-20 12:57:34 +01:00 committed by GitHub
parent 10428046e4
commit f1e8d2d15a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 108 additions and 29 deletions

1
changelog.d/17592.misc Normal file
View file

@ -0,0 +1 @@
Correctly track read receipts that should be sent down in experimental sliding sync.

View file

@ -3088,38 +3088,17 @@ class SlidingSyncHandler:
# from that room but we only want to include receipts for events # from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response # in the timeline to avoid bloating and blowing up the sync response
# as the number of users in the room increases. (this behavior is part of the spec) # as the number of users in the room increases. (this behavior is part of the spec)
initial_rooms = { initial_rooms_and_event_ids = [
room_id (room_id, event.event_id)
for room_id in initial_rooms for room_id in initial_rooms
if room_id in actual_room_response_map if room_id in actual_room_response_map
} for event in actual_room_response_map[room_id].timeline_events
if initial_rooms: ]
initial_receipts = await self.store.get_linearized_receipts_for_rooms( if initial_rooms_and_event_ids:
room_ids=initial_rooms, initial_receipts = await self.store.get_linearized_receipts_for_events(
to_key=to_token.receipt_key, room_and_event_ids=initial_rooms_and_event_ids,
) )
fetched_receipts.extend(initial_receipts)
for receipt in initial_receipts:
relevant_event_ids = {
event.event_id
for event in actual_room_response_map[
receipt["room_id"]
].timeline_events
}
content = {
event_id: content_value
for event_id, content_value in receipt["content"].items()
if event_id in relevant_event_ids
}
if content:
fetched_receipts.append(
{
"type": receipt["type"],
"room_id": receipt["room_id"],
"content": content,
}
)
fetched_receipts = ReceiptEventSource.filter_out_private_receipts( fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string() fetched_receipts, sync_config.user.to_string()

View file

@ -43,6 +43,7 @@ from synapse.storage.database import (
DatabasePool, DatabasePool,
LoggingDatabaseConnection, LoggingDatabaseConnection,
LoggingTransaction, LoggingTransaction,
make_tuple_in_list_sql_clause,
) )
from synapse.storage.engines._base import IsolationLevel from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.storage.util.id_generators import MultiWriterIdGenerator
@ -481,6 +482,83 @@ class ReceiptsWorkerStore(SQLBaseStore):
} }
return results return results
async def get_linearized_receipts_for_events(
self,
room_and_event_ids: Collection[Tuple[str, str]],
) -> Sequence[JsonMapping]:
"""Get all receipts for the given set of events.
Arguments:
room_and_event_ids: A collection of 2-tuples of room ID and
event IDs to fetch receipts for
Returns:
A list of receipts, one per room.
"""
def get_linearized_receipts_for_events_txn(
txn: LoggingTransaction,
room_id_event_id_tuples: Collection[Tuple[str, str]],
) -> List[Tuple[str, str, str, str, Optional[str], str]]:
clause, args = make_tuple_in_list_sql_clause(
self.database_engine, ("room_id", "event_id"), room_id_event_id_tuples
)
sql = f"""
SELECT room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE {clause}
"""
txn.execute(sql, args)
return txn.fetchall()
# room_id -> event_id -> receipt_type -> user_id -> receipt data
room_to_content: Dict[str, Dict[str, Dict[str, Dict[str, JsonMapping]]]] = {}
for batch in batch_iter(room_and_event_ids, 1000):
batch_results = await self.db_pool.runInteraction(
"get_linearized_receipts_for_events",
get_linearized_receipts_for_events_txn,
batch,
)
for (
room_id,
receipt_type,
user_id,
event_id,
thread_id,
data,
) in batch_results:
content = room_to_content.setdefault(room_id, {})
user_receipts = content.setdefault(event_id, {}).setdefault(
receipt_type, {}
)
receipt_data = db_to_json(data)
if thread_id is not None:
receipt_data["thread_id"] = thread_id
# MSC4102: always replace threaded receipts with unthreaded ones
# if there is a clash. Specifically:
# - if there is no existing receipt, great, set the data.
# - if there is an existing receipt, is it threaded (thread_id
# present)? YES: replace if this receipt has no thread id.
# NO: do not replace. This means we will drop some receipts, but
# MSC4102 is designed to drop semantically meaningless receipts,
# so this is okay. Previously, we would drop meaningful data!
if user_id in user_receipts:
if "thread_id" in user_receipts[user_id] and not thread_id:
user_receipts[user_id] = receipt_data
else:
user_receipts[user_id] = receipt_data
return [
{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}
for room_id, content in room_to_content.items()
]
@cached( @cached(
num_args=2, num_args=2,
) )
@ -996,6 +1074,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME, self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_graph_unique_index, self._background_receipts_graph_unique_index,
) )
self.db_pool.updates.register_background_index_update(
update_name="receipts_room_id_event_id_index",
index_name="receipts_linearized_event_id",
table="receipts_linearized",
columns=("room_id", "event_id"),
)
async def _populate_receipt_event_stream_ordering( async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int self, progress: JsonDict, batch_size: int

View file

@ -0,0 +1,15 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8602, 'receipts_room_id_event_id_index', '{}');