mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 16:03:51 +01:00
Convert _insert_graph_receipts_txn
to simple_upsert
(#16299)
This commit is contained in:
parent
edec0b93ca
commit
2a0f86f88f
3 changed files with 13 additions and 14 deletions
1
changelog.d/16299.misc
Normal file
1
changelog.d/16299.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Refactor `receipts_graph` Postgres transactions to stop error messages.
|
|
@ -1193,6 +1193,7 @@ class DatabasePool:
|
||||||
keyvalues: Dict[str, Any],
|
keyvalues: Dict[str, Any],
|
||||||
values: Dict[str, Any],
|
values: Dict[str, Any],
|
||||||
insertion_values: Optional[Dict[str, Any]] = None,
|
insertion_values: Optional[Dict[str, Any]] = None,
|
||||||
|
where_clause: Optional[str] = None,
|
||||||
desc: str = "simple_upsert",
|
desc: str = "simple_upsert",
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Insert a row with values + insertion_values; on conflict, update with values.
|
"""Insert a row with values + insertion_values; on conflict, update with values.
|
||||||
|
@ -1243,6 +1244,7 @@ class DatabasePool:
|
||||||
keyvalues: The unique key columns and their new values
|
keyvalues: The unique key columns and their new values
|
||||||
values: The nonunique columns and their new values
|
values: The nonunique columns and their new values
|
||||||
insertion_values: additional key/values to use only when inserting
|
insertion_values: additional key/values to use only when inserting
|
||||||
|
where_clause: An index predicate to apply to the upsert.
|
||||||
desc: description of the transaction, for logging and metrics
|
desc: description of the transaction, for logging and metrics
|
||||||
Returns:
|
Returns:
|
||||||
Returns True if a row was inserted or updated (i.e. if `values` is
|
Returns True if a row was inserted or updated (i.e. if `values` is
|
||||||
|
@ -1263,6 +1265,7 @@ class DatabasePool:
|
||||||
keyvalues,
|
keyvalues,
|
||||||
values,
|
values,
|
||||||
insertion_values,
|
insertion_values,
|
||||||
|
where_clause,
|
||||||
db_autocommit=autocommit,
|
db_autocommit=autocommit,
|
||||||
)
|
)
|
||||||
except self.engine.module.IntegrityError as e:
|
except self.engine.module.IntegrityError as e:
|
||||||
|
|
|
@ -795,9 +795,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
now - event_ts,
|
now - event_ts,
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
await self._insert_graph_receipt(
|
||||||
"insert_graph_receipt",
|
|
||||||
self._insert_graph_receipt_txn,
|
|
||||||
room_id,
|
room_id,
|
||||||
receipt_type,
|
receipt_type,
|
||||||
user_id,
|
user_id,
|
||||||
|
@ -810,9 +808,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
return stream_id, max_persisted_id
|
return stream_id, max_persisted_id
|
||||||
|
|
||||||
def _insert_graph_receipt_txn(
|
async def _insert_graph_receipt(
|
||||||
self,
|
self,
|
||||||
txn: LoggingTransaction,
|
|
||||||
room_id: str,
|
room_id: str,
|
||||||
receipt_type: str,
|
receipt_type: str,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
|
@ -822,13 +819,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
) -> None:
|
) -> None:
|
||||||
assert self._can_write_to_receipts
|
assert self._can_write_to_receipts
|
||||||
|
|
||||||
txn.call_after(
|
|
||||||
self._get_receipts_for_user_with_orderings.invalidate,
|
|
||||||
(user_id, receipt_type),
|
|
||||||
)
|
|
||||||
# FIXME: This shouldn't invalidate the whole cache
|
|
||||||
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
|
|
||||||
|
|
||||||
keyvalues = {
|
keyvalues = {
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
"receipt_type": receipt_type,
|
"receipt_type": receipt_type,
|
||||||
|
@ -840,8 +830,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
else:
|
else:
|
||||||
keyvalues["thread_id"] = thread_id
|
keyvalues["thread_id"] = thread_id
|
||||||
|
|
||||||
self.db_pool.simple_upsert_txn(
|
await self.db_pool.simple_upsert(
|
||||||
txn,
|
desc="insert_graph_receipt",
|
||||||
table="receipts_graph",
|
table="receipts_graph",
|
||||||
keyvalues=keyvalues,
|
keyvalues=keyvalues,
|
||||||
values={
|
values={
|
||||||
|
@ -851,6 +841,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
where_clause=where_clause,
|
where_clause=where_clause,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
|
||||||
|
|
||||||
|
# FIXME: This shouldn't invalidate the whole cache
|
||||||
|
self._get_linearized_receipts_for_room.invalidate((room_id,))
|
||||||
|
|
||||||
|
|
||||||
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
||||||
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
|
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
|
||||||
|
|
Loading…
Reference in a new issue