From 2a0f86f88fdb3d450212541ba7db57df6a184ae3 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Fri, 15 Sep 2023 03:16:45 -0500 Subject: [PATCH] Convert `_insert_graph_receipts_txn` to `simple_upsert` (#16299) --- changelog.d/16299.misc | 1 + synapse/storage/database.py | 3 +++ synapse/storage/databases/main/receipts.py | 23 +++++++++------------- 3 files changed, 13 insertions(+), 14 deletions(-) create mode 100644 changelog.d/16299.misc diff --git a/changelog.d/16299.misc b/changelog.d/16299.misc new file mode 100644 index 000000000..d45466915 --- /dev/null +++ b/changelog.d/16299.misc @@ -0,0 +1 @@ +Refactor `receipts_graph` Postgres transactions to stop error messages. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 6c5fcdcec..697bc5651 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1193,6 +1193,7 @@ class DatabasePool: keyvalues: Dict[str, Any], values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, + where_clause: Optional[str] = None, desc: str = "simple_upsert", ) -> bool: """Insert a row with values + insertion_values; on conflict, update with values. @@ -1243,6 +1244,7 @@ class DatabasePool: keyvalues: The unique key columns and their new values values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting + where_clause: An index predicate to apply to the upsert. desc: description of the transaction, for logging and metrics Returns: Returns True if a row was inserted or updated (i.e. if `values` is @@ -1263,6 +1265,7 @@ class DatabasePool: keyvalues, values, insertion_values, + where_clause, db_autocommit=autocommit, ) except self.engine.module.IntegrityError as e: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e4d10ff25..a074c4398 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -795,9 +795,7 @@ class ReceiptsWorkerStore(SQLBaseStore): now - event_ts, ) - await self.db_pool.runInteraction( - "insert_graph_receipt", - self._insert_graph_receipt_txn, + await self._insert_graph_receipt( room_id, receipt_type, user_id, @@ -810,9 +808,8 @@ class ReceiptsWorkerStore(SQLBaseStore): return stream_id, max_persisted_id - def _insert_graph_receipt_txn( + async def _insert_graph_receipt( self, - txn: LoggingTransaction, room_id: str, receipt_type: str, user_id: str, @@ -822,13 +819,6 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -> None: assert self._can_write_to_receipts - txn.call_after( - self._get_receipts_for_user_with_orderings.invalidate, - (user_id, receipt_type), - ) - # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) - keyvalues = { "room_id": room_id, "receipt_type": receipt_type, @@ -840,8 +830,8 @@ class ReceiptsWorkerStore(SQLBaseStore): else: keyvalues["thread_id"] = thread_id - self.db_pool.simple_upsert_txn( - txn, + await self.db_pool.simple_upsert( + desc="insert_graph_receipt", table="receipts_graph", keyvalues=keyvalues, values={ @@ -851,6 +841,11 @@ class ReceiptsWorkerStore(SQLBaseStore): where_clause=where_clause, ) + self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) + + # FIXME: This shouldn't invalidate the whole cache + self._get_linearized_receipts_for_room.invalidate((room_id,)) + class ReceiptsBackgroundUpdateStore(SQLBaseStore): POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"