forked from MirrorHub/synapse
Convert runInteraction to async/await (#8156)
This commit is contained in:
parent
112266eafd
commit
912e024913
2 changed files with 15 additions and 15 deletions
1
changelog.d/8156.misc
Normal file
1
changelog.d/8156.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Convert various parts of the codebase to async/await.
|
|
@ -28,6 +28,7 @@ from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Tuple,
|
Tuple,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
|
cast,
|
||||||
overload,
|
overload,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -35,7 +36,6 @@ from prometheus_client import Histogram
|
||||||
from typing_extensions import Literal
|
from typing_extensions import Literal
|
||||||
|
|
||||||
from twisted.enterprise import adbapi
|
from twisted.enterprise import adbapi
|
||||||
from twisted.internet import defer
|
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.config.database import DatabaseConnectionConfig
|
from synapse.config.database import DatabaseConnectionConfig
|
||||||
|
@ -507,8 +507,9 @@ class DatabasePool(object):
|
||||||
self._txn_perf_counters.update(desc, duration)
|
self._txn_perf_counters.update(desc, duration)
|
||||||
sql_txn_timer.labels(desc).observe(duration)
|
sql_txn_timer.labels(desc).observe(duration)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
async def runInteraction(
|
||||||
def runInteraction(self, desc: str, func: Callable, *args: Any, **kwargs: Any):
|
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
|
||||||
|
) -> R:
|
||||||
"""Starts a transaction on the database and runs a given function
|
"""Starts a transaction on the database and runs a given function
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
|
@ -521,7 +522,7 @@ class DatabasePool(object):
|
||||||
kwargs: named args to pass to `func`
|
kwargs: named args to pass to `func`
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: The result of func
|
The result of func
|
||||||
"""
|
"""
|
||||||
after_callbacks = [] # type: List[_CallbackListEntry]
|
after_callbacks = [] # type: List[_CallbackListEntry]
|
||||||
exception_callbacks = [] # type: List[_CallbackListEntry]
|
exception_callbacks = [] # type: List[_CallbackListEntry]
|
||||||
|
@ -530,8 +531,7 @@ class DatabasePool(object):
|
||||||
logger.warning("Starting db txn '%s' from sentinel context", desc)
|
logger.warning("Starting db txn '%s' from sentinel context", desc)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = yield defer.ensureDeferred(
|
result = await self.runWithConnection(
|
||||||
self.runWithConnection(
|
|
||||||
self.new_transaction,
|
self.new_transaction,
|
||||||
desc,
|
desc,
|
||||||
after_callbacks,
|
after_callbacks,
|
||||||
|
@ -540,7 +540,6 @@ class DatabasePool(object):
|
||||||
*args,
|
*args,
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
for after_callback, after_args, after_kwargs in after_callbacks:
|
for after_callback, after_args, after_kwargs in after_callbacks:
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
|
@ -549,7 +548,7 @@ class DatabasePool(object):
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
return result
|
return cast(R, result)
|
||||||
|
|
||||||
async def runWithConnection(
|
async def runWithConnection(
|
||||||
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
|
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
|
||||||
|
|
Loading…
Reference in a new issue