mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-18 07:33:49 +01:00
Split up txn for fetching device keys (#15215)
We look up keys in batches, but we should do that outside of the transaction to avoid starving the database pool.
This commit is contained in:
parent
41f127e068
commit
c69aae94cd
3 changed files with 26 additions and 9 deletions
1
changelog.d/15215.misc
Normal file
1
changelog.d/15215.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Refactor database transaction for query users' devices to reduce database pool contention.
|
|
@ -672,7 +672,15 @@ class DatabasePool:
|
||||||
f = cast(types.FunctionType, func) # type: ignore[redundant-cast]
|
f = cast(types.FunctionType, func) # type: ignore[redundant-cast]
|
||||||
if f.__closure__:
|
if f.__closure__:
|
||||||
for i, cell in enumerate(f.__closure__):
|
for i, cell in enumerate(f.__closure__):
|
||||||
if inspect.isgenerator(cell.cell_contents):
|
try:
|
||||||
|
contents = cell.cell_contents
|
||||||
|
except ValueError:
|
||||||
|
# cell.cell_contents can raise if the "cell" is empty,
|
||||||
|
# which indicates that the variable is currently
|
||||||
|
# unbound.
|
||||||
|
continue
|
||||||
|
|
||||||
|
if inspect.isgenerator(contents):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Programming error: function %s references generator %s "
|
"Programming error: function %s references generator %s "
|
||||||
"via its closure",
|
"via its closure",
|
||||||
|
|
|
@ -244,9 +244,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
set_tag("include_all_devices", include_all_devices)
|
set_tag("include_all_devices", include_all_devices)
|
||||||
set_tag("include_deleted_devices", include_deleted_devices)
|
set_tag("include_deleted_devices", include_deleted_devices)
|
||||||
|
|
||||||
result = await self.db_pool.runInteraction(
|
result = await self._get_e2e_device_keys(
|
||||||
"get_e2e_device_keys",
|
|
||||||
self._get_e2e_device_keys_txn,
|
|
||||||
query_list,
|
query_list,
|
||||||
include_all_devices,
|
include_all_devices,
|
||||||
include_deleted_devices,
|
include_deleted_devices,
|
||||||
|
@ -285,9 +283,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
log_kv(result)
|
log_kv(result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def _get_e2e_device_keys_txn(
|
async def _get_e2e_device_keys(
|
||||||
self,
|
self,
|
||||||
txn: LoggingTransaction,
|
|
||||||
query_list: Collection[Tuple[str, Optional[str]]],
|
query_list: Collection[Tuple[str, Optional[str]]],
|
||||||
include_all_devices: bool = False,
|
include_all_devices: bool = False,
|
||||||
include_deleted_devices: bool = False,
|
include_deleted_devices: bool = False,
|
||||||
|
@ -319,7 +316,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
|
|
||||||
if user_list:
|
if user_list:
|
||||||
user_id_in_list_clause, user_args = make_in_list_sql_clause(
|
user_id_in_list_clause, user_args = make_in_list_sql_clause(
|
||||||
txn.database_engine, "user_id", user_list
|
self.database_engine, "user_id", user_list
|
||||||
)
|
)
|
||||||
query_clauses.append(user_id_in_list_clause)
|
query_clauses.append(user_id_in_list_clause)
|
||||||
query_params_list.append(user_args)
|
query_params_list.append(user_args)
|
||||||
|
@ -332,13 +329,16 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
user_device_id_in_list_clause,
|
user_device_id_in_list_clause,
|
||||||
user_device_args,
|
user_device_args,
|
||||||
) = make_tuple_in_list_sql_clause(
|
) = make_tuple_in_list_sql_clause(
|
||||||
txn.database_engine, ("user_id", "device_id"), user_device_batch
|
self.database_engine, ("user_id", "device_id"), user_device_batch
|
||||||
)
|
)
|
||||||
query_clauses.append(user_device_id_in_list_clause)
|
query_clauses.append(user_device_id_in_list_clause)
|
||||||
query_params_list.append(user_device_args)
|
query_params_list.append(user_device_args)
|
||||||
|
|
||||||
result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
|
result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
|
||||||
for query_clause, query_params in zip(query_clauses, query_params_list):
|
|
||||||
|
def get_e2e_device_keys_txn(
|
||||||
|
txn: LoggingTransaction, query_clause: str, query_params: list
|
||||||
|
) -> None:
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT user_id, device_id, "
|
"SELECT user_id, device_id, "
|
||||||
" d.display_name, "
|
" d.display_name, "
|
||||||
|
@ -361,6 +361,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||||
display_name, db_to_json(key_json) if key_json else None
|
display_name, db_to_json(key_json) if key_json else None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for query_clause, query_params in zip(query_clauses, query_params_list):
|
||||||
|
await self.db_pool.runInteraction(
|
||||||
|
"_get_e2e_device_keys",
|
||||||
|
get_e2e_device_keys_txn,
|
||||||
|
query_clause,
|
||||||
|
query_params,
|
||||||
|
)
|
||||||
|
|
||||||
if include_deleted_devices:
|
if include_deleted_devices:
|
||||||
for user_id, device_id in deleted_devices:
|
for user_id, device_id in deleted_devices:
|
||||||
if device_id is None:
|
if device_id is None:
|
||||||
|
|
Loading…
Reference in a new issue