Compare commits
6 commits
develop
...
erikj/bg_u
Author | SHA1 | Date | |
---|---|---|---|
c7f1498e35 | |||
dccddf15b0 | |||
0ace9f8d85 | |||
0c3ba88496 | |||
957da6f56e | |||
4a1a8321dd |
1
changelog.d/11306.feature
Normal file
1
changelog.d/11306.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add new plugin support for controlling background update timings.
|
4
setup.py
4
setup.py
|
@ -118,7 +118,9 @@ CONDITIONAL_REQUIREMENTS["mypy"] = [
|
|||
# Tests assume that all optional dependencies are installed.
|
||||
#
|
||||
# parameterized_class decorator was introduced in parameterized 0.7.0
|
||||
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0"]
|
||||
#
|
||||
# We use `mock` library as that backports `AsyncMock` to Python 3.6
|
||||
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0", "mock>=4.0.0"]
|
||||
|
||||
CONDITIONAL_REQUIREMENTS["dev"] = (
|
||||
CONDITIONAL_REQUIREMENTS["lint"]
|
||||
|
|
|
@ -48,6 +48,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
|
|||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.client.login import LoginResponse
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.background_updates import BackgroundUpdateController
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main.roommember import ProfileInfo
|
||||
from synapse.storage.state import StateFilter
|
||||
|
@ -92,6 +93,7 @@ __all__ = [
|
|||
"JsonDict",
|
||||
"EventBase",
|
||||
"StateMap",
|
||||
"BackgroundUpdateController",
|
||||
]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -212,6 +214,21 @@ class ModuleApi:
|
|||
"""
|
||||
self._hs.register_module_web_resource(path, resource)
|
||||
|
||||
def register_background_update_controller(
|
||||
self,
|
||||
controller: BackgroundUpdateController,
|
||||
) -> None:
|
||||
"""Registers a background update controller.
|
||||
|
||||
Added in v1.48.0
|
||||
|
||||
Args:
|
||||
controller: The controller to use.
|
||||
"""
|
||||
|
||||
for db in self._hs.get_datastores().databases:
|
||||
db.updates.register_update_controller(controller)
|
||||
|
||||
#########################################################################
|
||||
# The following methods can be called by the module at any point in time.
|
||||
|
||||
|
@ -859,6 +876,11 @@ class ModuleApi:
|
|||
f,
|
||||
)
|
||||
|
||||
async def sleep(self, seconds: float) -> None:
|
||||
"""Sleeps for the given number of seconds."""
|
||||
|
||||
await self._clock.sleep(seconds)
|
||||
|
||||
async def send_mail(
|
||||
self,
|
||||
recipient: str,
|
||||
|
|
|
@ -11,13 +11,24 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import abc
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AsyncContextManager,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
Optional,
|
||||
)
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.types import Connection
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util import Clock, json_encoder
|
||||
|
||||
from . import engines
|
||||
|
||||
|
@ -28,6 +39,136 @@ if TYPE_CHECKING:
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _BackgroundUpdateHandler:
|
||||
"""A handler for a given background update.
|
||||
|
||||
Attributes:
|
||||
callback: The function to call to make progress on the background
|
||||
update.
|
||||
oneshot: Wether the update is likely to happen all in one go, ignoring
|
||||
the supplied target duration, e.g. index creation. This is used by
|
||||
the update controller to help correctly schedule the update.
|
||||
"""
|
||||
|
||||
callback: Callable[[JsonDict, int], Awaitable[int]]
|
||||
oneshot: bool = False
|
||||
|
||||
|
||||
class BackgroundUpdateController(abc.ABC):
|
||||
"""A base class for controlling background update timings."""
|
||||
|
||||
####
|
||||
# NOTE: This is used by modules so changes must be backwards compatible or
|
||||
# be announced appropriately
|
||||
####
|
||||
|
||||
@abc.abstractmethod
|
||||
def run_update(
|
||||
self, update_name: str, database_name: str, oneshot: bool
|
||||
) -> AsyncContextManager[int]:
|
||||
"""Called before we do the next iteration of a background update. The
|
||||
returned async context manager is immediately entered and then exited
|
||||
after this iteration of the background update has finished.
|
||||
|
||||
Implementations will likely want to sleep for a period of time to stop
|
||||
the background update from continuously being run.
|
||||
|
||||
Args:
|
||||
update_name: The name of the update that is to be run
|
||||
database_name: The name of the database the background update is
|
||||
being run on. Really only useful if Synapse is configured with
|
||||
multiple databases.
|
||||
oneshot: Whether the update will complete all in one go, e.g.
|
||||
index creation. In such cases the returned target duration is
|
||||
ignored.
|
||||
|
||||
Returns:
|
||||
The target duration in milliseconds that the background update
|
||||
should run for.
|
||||
|
||||
Note: this is a *target*, and an iteration may take substantially
|
||||
longer or shorter.
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
async def default_batch_size(self, update_name: str, database_name: str) -> int:
|
||||
"""The batch size to use for the first iteration of a new background
|
||||
update.
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
async def min_batch_size(self, update_name: str, database_name: str) -> int:
|
||||
"""A lower bound on the batch size of a new background update.
|
||||
|
||||
Used to ensure that progress is always made. Must be greater than 0.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class _TimeBasedBackgroundUpdateController(BackgroundUpdateController):
|
||||
"""The default controller which aims to spend X ms doing the background
|
||||
update every Y ms.
|
||||
"""
|
||||
|
||||
MINIMUM_BACKGROUND_BATCH_SIZE = 100
|
||||
DEFAULT_BACKGROUND_BATCH_SIZE = 100
|
||||
|
||||
BACKGROUND_UPDATE_INTERVAL_MS = 1000
|
||||
BACKGROUND_UPDATE_DURATION_MS = 100
|
||||
|
||||
def __init__(self, clock: Clock):
|
||||
self._clock = clock
|
||||
|
||||
def run_update(
|
||||
self,
|
||||
update_name: str,
|
||||
database_name: str,
|
||||
oneshot: bool,
|
||||
) -> AsyncContextManager[int]:
|
||||
return self
|
||||
|
||||
async def default_batch_size(self, update_name: str, database_name: str) -> int:
|
||||
return self.DEFAULT_BACKGROUND_BATCH_SIZE
|
||||
|
||||
async def min_batch_size(self, update_name: str, database_name: str) -> int:
|
||||
return self.MINIMUM_BACKGROUND_BATCH_SIZE
|
||||
|
||||
async def __aenter__(self) -> int:
|
||||
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
|
||||
return self.BACKGROUND_UPDATE_DURATION_MS
|
||||
|
||||
async def __aexit__(self, *exc):
|
||||
pass
|
||||
|
||||
|
||||
class _ImmediateBackgroundUpdateController(BackgroundUpdateController):
|
||||
"""A background update controller that doesn't ever wait, effectively
|
||||
running the background updates as quickly as possible"""
|
||||
|
||||
def run_update(
|
||||
self,
|
||||
update_name: str,
|
||||
database_name: str,
|
||||
oneshot: bool,
|
||||
) -> AsyncContextManager[int]:
|
||||
return self
|
||||
|
||||
async def default_batch_size(self, update_name: str, database_name: str) -> int:
|
||||
return 100
|
||||
|
||||
async def min_batch_size(self, update_name: str, database_name: str) -> int:
|
||||
return 100
|
||||
|
||||
async def __aenter__(self) -> int:
|
||||
return 100
|
||||
|
||||
async def __aexit__(self, *exc):
|
||||
pass
|
||||
|
||||
|
||||
class BackgroundUpdatePerformance:
|
||||
"""Tracks the how long a background update is taking to update its items"""
|
||||
|
||||
|
@ -82,22 +223,21 @@ class BackgroundUpdater:
|
|||
process and autotuning the batch size.
|
||||
"""
|
||||
|
||||
MINIMUM_BACKGROUND_BATCH_SIZE = 100
|
||||
DEFAULT_BACKGROUND_BATCH_SIZE = 100
|
||||
BACKGROUND_UPDATE_INTERVAL_MS = 1000
|
||||
BACKGROUND_UPDATE_DURATION_MS = 100
|
||||
|
||||
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
|
||||
self._clock = hs.get_clock()
|
||||
self.db_pool = database
|
||||
|
||||
self._database_name = database.name()
|
||||
|
||||
# if a background update is currently running, its name.
|
||||
self._current_background_update: Optional[str] = None
|
||||
|
||||
self._controller: BackgroundUpdateController = (
|
||||
_TimeBasedBackgroundUpdateController(self._clock)
|
||||
)
|
||||
|
||||
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
|
||||
self._background_update_handlers: Dict[
|
||||
str, Callable[[JsonDict, int], Awaitable[int]]
|
||||
] = {}
|
||||
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
|
||||
self._all_done = False
|
||||
|
||||
# Whether we're currently running updates
|
||||
|
@ -107,6 +247,13 @@ class BackgroundUpdater:
|
|||
# enable/disable background updates via the admin API.
|
||||
self.enabled = True
|
||||
|
||||
def register_update_controller(
|
||||
self, controller: BackgroundUpdateController
|
||||
) -> None:
|
||||
"""Register a new background update controller to use."""
|
||||
|
||||
self._controller = controller
|
||||
|
||||
def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
|
||||
"""Returns the current background update, if any."""
|
||||
|
||||
|
@ -133,13 +280,8 @@ class BackgroundUpdater:
|
|||
try:
|
||||
logger.info("Starting background schema updates")
|
||||
while self.enabled:
|
||||
if sleep:
|
||||
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
|
||||
|
||||
try:
|
||||
result = await self.do_next_background_update(
|
||||
self.BACKGROUND_UPDATE_DURATION_MS
|
||||
)
|
||||
result = await self.do_next_background_update(sleep)
|
||||
except Exception:
|
||||
logger.exception("Error doing update")
|
||||
else:
|
||||
|
@ -201,13 +343,15 @@ class BackgroundUpdater:
|
|||
|
||||
return not update_exists
|
||||
|
||||
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
|
||||
async def do_next_background_update(self, sleep: bool = True) -> bool:
|
||||
"""Does some amount of work on the next queued background update
|
||||
|
||||
Returns once some amount of work is done.
|
||||
|
||||
Args:
|
||||
desired_duration_ms: How long we want to spend updating.
|
||||
sleep: Whether to limit how quickly we run background updates or
|
||||
not.
|
||||
|
||||
Returns:
|
||||
True if we have finished running all the background updates, otherwise False
|
||||
"""
|
||||
|
@ -250,7 +394,25 @@ class BackgroundUpdater:
|
|||
|
||||
self._current_background_update = upd["update_name"]
|
||||
|
||||
# We have a background update to run, otherwise we would have returned
|
||||
# early.
|
||||
assert self._current_background_update is not None
|
||||
update_info = self._background_update_handlers[self._current_background_update]
|
||||
|
||||
if sleep:
|
||||
controller = self._controller
|
||||
else:
|
||||
# If `sleep` is False then we want to run the updates as quickly as
|
||||
# possible.
|
||||
controller = _ImmediateBackgroundUpdateController()
|
||||
|
||||
async with controller.run_update(
|
||||
update_name=self._current_background_update,
|
||||
database_name=self._database_name,
|
||||
oneshot=update_info.oneshot,
|
||||
) as desired_duration_ms:
|
||||
await self._do_background_update(desired_duration_ms)
|
||||
|
||||
return False
|
||||
|
||||
async def _do_background_update(self, desired_duration_ms: float) -> int:
|
||||
|
@ -258,7 +420,7 @@ class BackgroundUpdater:
|
|||
update_name = self._current_background_update
|
||||
logger.info("Starting update batch on background update '%s'", update_name)
|
||||
|
||||
update_handler = self._background_update_handlers[update_name]
|
||||
update_handler = self._background_update_handlers[update_name].callback
|
||||
|
||||
performance = self._background_update_performance.get(update_name)
|
||||
|
||||
|
@ -271,9 +433,14 @@ class BackgroundUpdater:
|
|||
if items_per_ms is not None:
|
||||
batch_size = int(desired_duration_ms * items_per_ms)
|
||||
# Clamp the batch size so that we always make progress
|
||||
batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
|
||||
batch_size = max(
|
||||
batch_size,
|
||||
await self._controller.min_batch_size(update_name, self._database_name),
|
||||
)
|
||||
else:
|
||||
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
|
||||
batch_size = await self._controller.default_batch_size(
|
||||
update_name, self._database_name
|
||||
)
|
||||
|
||||
progress_json = await self.db_pool.simple_select_one_onecol(
|
||||
"background_updates",
|
||||
|
@ -292,6 +459,8 @@ class BackgroundUpdater:
|
|||
|
||||
duration_ms = time_stop - time_start
|
||||
|
||||
performance.update(items_updated, duration_ms)
|
||||
|
||||
logger.info(
|
||||
"Running background update %r. Processed %r items in %rms."
|
||||
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
|
||||
|
@ -304,8 +473,6 @@ class BackgroundUpdater:
|
|||
batch_size,
|
||||
)
|
||||
|
||||
performance.update(items_updated, duration_ms)
|
||||
|
||||
return len(self._background_update_performance)
|
||||
|
||||
def register_background_update_handler(
|
||||
|
@ -329,7 +496,9 @@ class BackgroundUpdater:
|
|||
update_name: The name of the update that this code handles.
|
||||
update_handler: The function that does the update.
|
||||
"""
|
||||
self._background_update_handlers[update_name] = update_handler
|
||||
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||
update_handler
|
||||
)
|
||||
|
||||
def register_noop_background_update(self, update_name: str) -> None:
|
||||
"""Register a noop handler for a background update.
|
||||
|
@ -451,7 +620,9 @@ class BackgroundUpdater:
|
|||
await self._end_background_update(update_name)
|
||||
return 1
|
||||
|
||||
self.register_background_update_handler(update_name, updater)
|
||||
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||
updater, oneshot=True
|
||||
)
|
||||
|
||||
async def _end_background_update(self, update_name: str) -> None:
|
||||
"""Removes a completed background update task from the queue.
|
||||
|
|
|
@ -408,12 +408,8 @@ class EmailPusherTests(HomeserverTestCase):
|
|||
self.hs.get_datastore().db_pool.updates._all_done = False
|
||||
|
||||
# Now let's actually drive the updates to completion
|
||||
while not self.get_success(
|
||||
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
|
||||
):
|
||||
self.get_success(
|
||||
self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
|
||||
by=0.1,
|
||||
self.hs.get_datastore().db_pool.updates.run_background_updates(False)
|
||||
)
|
||||
|
||||
# Check that all pushers with unlinked addresses were deleted
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
from unittest.mock import Mock
|
||||
from mock import AsyncMock, Mock
|
||||
|
||||
from synapse.storage.background_updates import BackgroundUpdater
|
||||
from twisted.internet.defer import Deferred, ensureDeferred
|
||||
|
||||
from synapse.storage.background_updates import (
|
||||
BackgroundUpdateController,
|
||||
BackgroundUpdater,
|
||||
)
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
@ -20,10 +25,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
def test_do_background_update(self):
|
||||
# the time we claim each update takes
|
||||
duration_ms = 42
|
||||
duration_ms = 0.2
|
||||
|
||||
# the target runtime for each bg update
|
||||
target_background_update_duration_ms = 50000
|
||||
target_background_update_duration_ms = 100
|
||||
|
||||
store = self.hs.get_datastore()
|
||||
self.get_success(
|
||||
|
@ -48,17 +53,13 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
|||
self.update_handler.side_effect = update
|
||||
self.update_handler.reset_mock()
|
||||
res = self.get_success(
|
||||
self.updates.do_next_background_update(
|
||||
target_background_update_duration_ms
|
||||
),
|
||||
by=0.1,
|
||||
self.updates.do_next_background_update(False),
|
||||
by=0.01,
|
||||
)
|
||||
self.assertFalse(res)
|
||||
|
||||
# on the first call, we should get run with the default background update size
|
||||
self.update_handler.assert_called_once_with(
|
||||
{"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
|
||||
)
|
||||
self.update_handler.assert_called_once_with({"my_key": 1}, 100)
|
||||
|
||||
# second step: complete the update
|
||||
# we should now get run with a much bigger number of items to update
|
||||
|
@ -74,16 +75,80 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
self.update_handler.side_effect = update
|
||||
self.update_handler.reset_mock()
|
||||
result = self.get_success(
|
||||
self.updates.do_next_background_update(target_background_update_duration_ms)
|
||||
)
|
||||
result = self.get_success(self.updates.do_next_background_update(False))
|
||||
self.assertFalse(result)
|
||||
self.update_handler.assert_called_once()
|
||||
|
||||
# third step: we don't expect to be called any more
|
||||
self.update_handler.reset_mock()
|
||||
result = self.get_success(
|
||||
self.updates.do_next_background_update(target_background_update_duration_ms)
|
||||
)
|
||||
result = self.get_success(self.updates.do_next_background_update(False))
|
||||
self.assertTrue(result)
|
||||
self.assertFalse(self.update_handler.called)
|
||||
|
||||
|
||||
class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
|
||||
def prepare(self, reactor, clock, homeserver):
|
||||
self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates
|
||||
# the base test class should have run the real bg updates for us
|
||||
self.assertTrue(
|
||||
self.get_success(self.updates.has_completed_background_updates())
|
||||
)
|
||||
|
||||
self.update_deferred = Deferred()
|
||||
self.update_handler = Mock(return_value=self.update_deferred)
|
||||
self.updates.register_background_update_handler(
|
||||
"test_update", self.update_handler
|
||||
)
|
||||
|
||||
self._controller_ctx_mgr = AsyncMock(name="_controller_ctx_mgr")
|
||||
self._controller = AsyncMock(BackgroundUpdateController)
|
||||
self._controller.run_update.return_value = self._controller_ctx_mgr
|
||||
|
||||
self.updates.register_update_controller(self._controller)
|
||||
|
||||
def test_controller(self):
|
||||
store = self.hs.get_datastore()
|
||||
self.get_success(
|
||||
store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
values={"update_name": "test_update", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
|
||||
default_batch_size = 100
|
||||
|
||||
# Set up the return values of the controller.
|
||||
enter_defer = Deferred()
|
||||
self._controller_ctx_mgr.__aenter__ = Mock(return_value=enter_defer)
|
||||
self._controller.default_batch_size.return_value = default_batch_size
|
||||
self._controller.min_batch_size.return_value = default_batch_size
|
||||
|
||||
# Start the background update.
|
||||
do_update_d = ensureDeferred(self.updates.do_next_background_update(True))
|
||||
|
||||
self.pump()
|
||||
|
||||
# `run_update` should have been called, but the update handler won't be
|
||||
# called until the `enter_defer` (returned by `__aenter__`) is resolved.
|
||||
self._controller.run_update.assert_called_once_with(
|
||||
update_name="test_update",
|
||||
database_name="master",
|
||||
oneshot=False,
|
||||
)
|
||||
self.assertFalse(do_update_d.called)
|
||||
self.assertFalse(self.update_deferred.called)
|
||||
|
||||
# Resolving the `enter_defer` should call the update handler, which then
|
||||
# blocks.
|
||||
enter_defer.callback(100)
|
||||
self.pump()
|
||||
self.update_handler.assert_called_once_with({}, default_batch_size)
|
||||
self.assertFalse(self.update_deferred.called)
|
||||
self._controller_ctx_mgr.__aexit__.assert_not_awaited()
|
||||
|
||||
# Resolving the update handler deferred should cause the
|
||||
# `do_next_background_update` to finish and return
|
||||
self.update_deferred.callback(100)
|
||||
self.pump()
|
||||
self._controller_ctx_mgr.__aexit__.assert_awaited()
|
||||
self.get_success(do_update_d)
|
||||
|
|
|
@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
|
|||
):
|
||||
iterations += 1
|
||||
self.get_success(
|
||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||
self.store.db_pool.updates.do_next_background_update(False), by=0.1
|
||||
)
|
||||
|
||||
# Ensure that we did actually take multiple iterations to process the
|
||||
|
@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
|
|||
):
|
||||
iterations += 1
|
||||
self.get_success(
|
||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||
self.store.db_pool.updates.do_next_background_update(False), by=0.1
|
||||
)
|
||||
|
||||
# Ensure that we did actually take multiple iterations to process the
|
||||
|
|
|
@ -319,12 +319,7 @@ class HomeserverTestCase(TestCase):
|
|||
|
||||
def wait_for_background_updates(self) -> None:
|
||||
"""Block until all background database updates have completed."""
|
||||
while not self.get_success(
|
||||
self.store.db_pool.updates.has_completed_background_updates()
|
||||
):
|
||||
self.get_success(
|
||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||
)
|
||||
self.get_success(self.store.db_pool.updates.run_background_updates(False))
|
||||
|
||||
def make_homeserver(self, reactor, clock):
|
||||
"""
|
||||
|
@ -484,8 +479,7 @@ class HomeserverTestCase(TestCase):
|
|||
|
||||
async def run_bg_updates():
|
||||
with LoggingContext("run_bg_updates"):
|
||||
while not await stor.db_pool.updates.has_completed_background_updates():
|
||||
await stor.db_pool.updates.do_next_background_update(1)
|
||||
self.get_success(stor.db_pool.updates.run_background_updates(False))
|
||||
|
||||
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
|
||||
stor = hs.get_datastore()
|
||||
|
|
Loading…
Reference in a new issue