mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 02:33:47 +01:00
Use autocommit mode for single statement DB functions. (#8542)
Autocommit means that we don't wrap the functions in transactions, and instead get executed directly. Introduced in #8456. This will help: 1. reduce the number of `could not serialize access due to concurrent delete` errors that we see (though there are a few functions that often cause serialization errors that we don't fix here); 2. improve the DB performance, as it no longer needs to deal with the overhead of `REPEATABLE READ` isolation levels; and 3. improve wall clock speed of these functions, as we no longer need to send `BEGIN` and `COMMIT` to the DB. Some notes about the differences between autocommit mode and our default `REPEATABLE READ` transactions: 1. Currently `autocommit` only applies when using PostgreSQL, and is ignored when using SQLite (due to silliness with [Twisted DB classes](https://twistedmatrix.com/trac/ticket/9998)). 2. Autocommit functions may get retried on error, which means they can get applied *twice* (or more) to the DB (since they are not in a transaction the previous call would not get rolled back). This means that the functions need to be idempotent (or otherwise not care about being called multiple times). Read queries, simple deletes, and updates/upserts that replace rows (rather than generating new values from existing rows) are all idempotent. 3. Autocommit functions no longer get executed in [`REPEATABLE READ`](https://www.postgresql.org/docs/current/transaction-iso.html) isolation level, and so data can change queries, which is fine for single statement queries.
This commit is contained in:
parent
618d405a32
commit
19b15d63e8
5 changed files with 155 additions and 69 deletions
1
changelog.d/8542.misc
Normal file
1
changelog.d/8542.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve database performance by executing more queries without starting transactions.
|
|
@ -893,6 +893,12 @@ class DatabasePool:
|
|||
attempts = 0
|
||||
while True:
|
||||
try:
|
||||
# We can autocommit if we are going to use native upserts
|
||||
autocommit = (
|
||||
self.engine.can_native_upsert
|
||||
and table not in self._unsafe_to_upsert_tables
|
||||
)
|
||||
|
||||
return await self.runInteraction(
|
||||
desc,
|
||||
self.simple_upsert_txn,
|
||||
|
@ -901,6 +907,7 @@ class DatabasePool:
|
|||
values,
|
||||
insertion_values,
|
||||
lock=lock,
|
||||
db_autocommit=autocommit,
|
||||
)
|
||||
except self.engine.module.IntegrityError as e:
|
||||
attempts += 1
|
||||
|
@ -1063,6 +1070,43 @@ class DatabasePool:
|
|||
)
|
||||
txn.execute(sql, list(allvalues.values()))
|
||||
|
||||
async def simple_upsert_many(
|
||||
self,
|
||||
table: str,
|
||||
key_names: Collection[str],
|
||||
key_values: Collection[Iterable[Any]],
|
||||
value_names: Collection[str],
|
||||
value_values: Iterable[Iterable[Any]],
|
||||
desc: str,
|
||||
) -> None:
|
||||
"""
|
||||
Upsert, many times.
|
||||
|
||||
Args:
|
||||
table: The table to upsert into
|
||||
key_names: The key column names.
|
||||
key_values: A list of each row's key column values.
|
||||
value_names: The value column names
|
||||
value_values: A list of each row's value column values.
|
||||
Ignored if value_names is empty.
|
||||
"""
|
||||
|
||||
# We can autocommit if we are going to use native upserts
|
||||
autocommit = (
|
||||
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
|
||||
)
|
||||
|
||||
return await self.runInteraction(
|
||||
desc,
|
||||
self.simple_upsert_many_txn,
|
||||
table,
|
||||
key_names,
|
||||
key_values,
|
||||
value_names,
|
||||
value_values,
|
||||
db_autocommit=autocommit,
|
||||
)
|
||||
|
||||
def simple_upsert_many_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
|
@ -1214,7 +1258,13 @@ class DatabasePool:
|
|||
desc: description of the transaction, for logging and metrics
|
||||
"""
|
||||
return await self.runInteraction(
|
||||
desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
|
||||
desc,
|
||||
self.simple_select_one_txn,
|
||||
table,
|
||||
keyvalues,
|
||||
retcols,
|
||||
allow_none,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
@overload
|
||||
|
@ -1265,6 +1315,7 @@ class DatabasePool:
|
|||
keyvalues,
|
||||
retcol,
|
||||
allow_none=allow_none,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
@overload
|
||||
|
@ -1346,7 +1397,12 @@ class DatabasePool:
|
|||
Results in a list
|
||||
"""
|
||||
return await self.runInteraction(
|
||||
desc, self.simple_select_onecol_txn, table, keyvalues, retcol
|
||||
desc,
|
||||
self.simple_select_onecol_txn,
|
||||
table,
|
||||
keyvalues,
|
||||
retcol,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
async def simple_select_list(
|
||||
|
@ -1371,7 +1427,12 @@ class DatabasePool:
|
|||
A list of dictionaries.
|
||||
"""
|
||||
return await self.runInteraction(
|
||||
desc, self.simple_select_list_txn, table, keyvalues, retcols
|
||||
desc,
|
||||
self.simple_select_list_txn,
|
||||
table,
|
||||
keyvalues,
|
||||
retcols,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
@ -1450,6 +1511,7 @@ class DatabasePool:
|
|||
chunk,
|
||||
keyvalues,
|
||||
retcols,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
results.extend(rows)
|
||||
|
@ -1548,7 +1610,12 @@ class DatabasePool:
|
|||
desc: description of the transaction, for logging and metrics
|
||||
"""
|
||||
await self.runInteraction(
|
||||
desc, self.simple_update_one_txn, table, keyvalues, updatevalues
|
||||
desc,
|
||||
self.simple_update_one_txn,
|
||||
table,
|
||||
keyvalues,
|
||||
updatevalues,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
@ -1607,7 +1674,9 @@ class DatabasePool:
|
|||
keyvalues: dict of column names and values to select the row with
|
||||
desc: description of the transaction, for logging and metrics
|
||||
"""
|
||||
await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
|
||||
await self.runInteraction(
|
||||
desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def simple_delete_one_txn(
|
||||
|
@ -1646,7 +1715,9 @@ class DatabasePool:
|
|||
Returns:
|
||||
The number of deleted rows.
|
||||
"""
|
||||
return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
|
||||
return await self.runInteraction(
|
||||
desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def simple_delete_txn(
|
||||
|
@ -1694,7 +1765,13 @@ class DatabasePool:
|
|||
Number rows deleted
|
||||
"""
|
||||
return await self.runInteraction(
|
||||
desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
|
||||
desc,
|
||||
self.simple_delete_many_txn,
|
||||
table,
|
||||
column,
|
||||
iterable,
|
||||
keyvalues,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
@ -1860,7 +1937,13 @@ class DatabasePool:
|
|||
"""
|
||||
|
||||
return await self.runInteraction(
|
||||
desc, self.simple_search_list_txn, table, term, col, retcols
|
||||
desc,
|
||||
self.simple_search_list_txn,
|
||||
table,
|
||||
term,
|
||||
col,
|
||||
retcols,
|
||||
db_autocommit=True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore):
|
|||
# param, which is itself the 2-tuple (server_name, key_id).
|
||||
invalidations.append((server_name, key_id))
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"store_server_verify_keys",
|
||||
self.db_pool.simple_upsert_many_txn,
|
||||
await self.db_pool.simple_upsert_many(
|
||||
table="server_signature_keys",
|
||||
key_names=("server_name", "key_id"),
|
||||
key_values=key_values,
|
||||
|
@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore):
|
|||
"verify_key",
|
||||
),
|
||||
value_values=value_values,
|
||||
desc="store_server_verify_keys",
|
||||
)
|
||||
|
||||
invalidate = self._get_server_verify_key.invalidate
|
||||
|
|
|
@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore):
|
|||
"""
|
||||
|
||||
self._destination_retry_cache.pop(destination, None)
|
||||
return await self.db_pool.runInteraction(
|
||||
"set_destination_retry_timings",
|
||||
self._set_destination_retry_timings,
|
||||
destination,
|
||||
failure_ts,
|
||||
retry_last_ts,
|
||||
retry_interval,
|
||||
)
|
||||
if self.database_engine.can_native_upsert:
|
||||
return await self.db_pool.runInteraction(
|
||||
"set_destination_retry_timings",
|
||||
self._set_destination_retry_timings_native,
|
||||
destination,
|
||||
failure_ts,
|
||||
retry_last_ts,
|
||||
retry_interval,
|
||||
db_autocommit=True, # Safe as its a single upsert
|
||||
)
|
||||
else:
|
||||
return await self.db_pool.runInteraction(
|
||||
"set_destination_retry_timings",
|
||||
self._set_destination_retry_timings_emulated,
|
||||
destination,
|
||||
failure_ts,
|
||||
retry_last_ts,
|
||||
retry_interval,
|
||||
)
|
||||
|
||||
def _set_destination_retry_timings(
|
||||
def _set_destination_retry_timings_native(
|
||||
self, txn, destination, failure_ts, retry_last_ts, retry_interval
|
||||
):
|
||||
assert self.database_engine.can_native_upsert
|
||||
|
||||
if self.database_engine.can_native_upsert:
|
||||
# Upsert retry time interval if retry_interval is zero (i.e. we're
|
||||
# resetting it) or greater than the existing retry interval.
|
||||
# Upsert retry time interval if retry_interval is zero (i.e. we're
|
||||
# resetting it) or greater than the existing retry interval.
|
||||
#
|
||||
# WARNING: This is executed in autocommit, so we shouldn't add any more
|
||||
# SQL calls in here (without being very careful).
|
||||
sql = """
|
||||
INSERT INTO destinations (
|
||||
destination, failure_ts, retry_last_ts, retry_interval
|
||||
)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT (destination) DO UPDATE SET
|
||||
failure_ts = EXCLUDED.failure_ts,
|
||||
retry_last_ts = EXCLUDED.retry_last_ts,
|
||||
retry_interval = EXCLUDED.retry_interval
|
||||
WHERE
|
||||
EXCLUDED.retry_interval = 0
|
||||
OR destinations.retry_interval IS NULL
|
||||
OR destinations.retry_interval < EXCLUDED.retry_interval
|
||||
"""
|
||||
|
||||
sql = """
|
||||
INSERT INTO destinations (
|
||||
destination, failure_ts, retry_last_ts, retry_interval
|
||||
)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT (destination) DO UPDATE SET
|
||||
failure_ts = EXCLUDED.failure_ts,
|
||||
retry_last_ts = EXCLUDED.retry_last_ts,
|
||||
retry_interval = EXCLUDED.retry_interval
|
||||
WHERE
|
||||
EXCLUDED.retry_interval = 0
|
||||
OR destinations.retry_interval IS NULL
|
||||
OR destinations.retry_interval < EXCLUDED.retry_interval
|
||||
"""
|
||||
|
||||
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
|
||||
|
||||
return
|
||||
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
|
||||
|
||||
def _set_destination_retry_timings_emulated(
|
||||
self, txn, destination, failure_ts, retry_last_ts, retry_interval
|
||||
):
|
||||
self.database_engine.lock_table(txn, "destinations")
|
||||
|
||||
# We need to be careful here as the data may have changed from under us
|
||||
|
|
|
@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
|||
user_id_tuples: iterable of 2-tuple of user IDs.
|
||||
"""
|
||||
|
||||
def _add_users_who_share_room_txn(txn):
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="users_who_share_private_rooms",
|
||||
key_names=["user_id", "other_user_id", "room_id"],
|
||||
key_values=[
|
||||
(user_id, other_user_id, room_id)
|
||||
for user_id, other_user_id in user_id_tuples
|
||||
],
|
||||
value_names=(),
|
||||
value_values=None,
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"add_users_who_share_room", _add_users_who_share_room_txn
|
||||
await self.db_pool.simple_upsert_many(
|
||||
table="users_who_share_private_rooms",
|
||||
key_names=["user_id", "other_user_id", "room_id"],
|
||||
key_values=[
|
||||
(user_id, other_user_id, room_id)
|
||||
for user_id, other_user_id in user_id_tuples
|
||||
],
|
||||
value_names=(),
|
||||
value_values=None,
|
||||
desc="add_users_who_share_room",
|
||||
)
|
||||
|
||||
async def add_users_in_public_rooms(
|
||||
|
@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
|||
user_ids
|
||||
"""
|
||||
|
||||
def _add_users_in_public_rooms_txn(txn):
|
||||
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="users_in_public_rooms",
|
||||
key_names=["user_id", "room_id"],
|
||||
key_values=[(user_id, room_id) for user_id in user_ids],
|
||||
value_names=(),
|
||||
value_values=None,
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"add_users_in_public_rooms", _add_users_in_public_rooms_txn
|
||||
await self.db_pool.simple_upsert_many(
|
||||
table="users_in_public_rooms",
|
||||
key_names=["user_id", "room_id"],
|
||||
key_values=[(user_id, room_id) for user_id in user_ids],
|
||||
value_names=(),
|
||||
value_values=None,
|
||||
desc="add_users_in_public_rooms",
|
||||
)
|
||||
|
||||
async def delete_all_from_user_dir(self) -> None:
|
||||
|
|
Loading…
Reference in a new issue