diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 30818aad65..e5ecfc1f3e 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -11,15 +11,24 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import abc
 import logging
-from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+    Awaitable,
+    Callable,
+    Dict,
+    Iterable,
+    Optional,
+)
 
 import attr
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.types import Connection
 from synapse.types import JsonDict
-from synapse.util import json_encoder
+from synapse.util import Clock, json_encoder
 
 from . import engines
 
@@ -46,6 +55,120 @@ class _BackgroundUpdateHandler:
     oneshot: bool = False
 
 
+class BackgroundUpdateController(abc.ABC):
+    """A base class for controlling background update timings."""
+
+    ####
+    # NOTE: This is used by modules so changes must be backwards compatible or
+    # be announced appropriately
+    ####
+
+    @abc.abstractmethod
+    def run_update(
+        self, update_name: str, database_name: str, oneshot: bool
+    ) -> AsyncContextManager[int]:
+        """Called before we do the next iteration of a background update. The
+        returned async context manager is immediately entered and then exited
+        after this iteration of the background update has finished.
+
+        Implementations will likely want to sleep for a period of time to stop
+        the background update from continuously being run.
+
+        Args:
+            update_name: The name of the update that is to be run
+            database_name: The name of the database the background update is
+                being run on. Really only useful if Synapse is configured with
+                multiple databases.
+            oneshot: Whether the update will complete all in one go, e.g.
+                index creation. In such cases the returned target duration is
+                ignored.
+
+        Returns:
+            The target duration in milliseconds that the background update
+            should run for.
+
+            Note: this is a *target*, and an iteration may take substantially
+            longer or shorter.
+        """
+        ...
+
+    @abc.abstractmethod
+    async def default_batch_size(self, update_name: str, database_name: str) -> int:
+        """The batch size to use for the first iteration of a new background
+        update.
+        """
+        ...
+
+    @abc.abstractmethod
+    async def min_batch_size(self, update_name: str, database_name: str) -> int:
+        """A lower bound on the batch size of a new background update.
+
+        Used to ensure that progress is always made. Must be greater than 0.
+        """
+        ...
+
+
+class _TimeBasedBackgroundUpdateController(BackgroundUpdateController):
+    """The default controller which aims to spend X ms doing the background
+    update every Y ms.
+    """
+
+    MINIMUM_BACKGROUND_BATCH_SIZE = 100
+    DEFAULT_BACKGROUND_BATCH_SIZE = 100
+
+    BACKGROUND_UPDATE_INTERVAL_MS = 1000
+    BACKGROUND_UPDATE_DURATION_MS = 100
+
+    def __init__(self, clock: Clock):
+        self._clock = clock
+
+    def run_update(
+        self,
+        update_name: str,
+        database_name: str,
+        oneshot: bool,
+    ) -> AsyncContextManager[int]:
+        return self
+
+    async def default_batch_size(self, update_name: str, database_name: str) -> int:
+        return self.DEFAULT_BACKGROUND_BATCH_SIZE
+
+    async def min_batch_size(self, update_name: str, database_name: str) -> int:
+        return self.MINIMUM_BACKGROUND_BATCH_SIZE
+
+    async def __aenter__(self) -> int:
+        await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
+        return self.BACKGROUND_UPDATE_DURATION_MS
+
+    async def __aexit__(self, *exc):
+        pass
+
+
+class _ImmediateBackgroundUpdateController(BackgroundUpdateController):
+    """A background update controller that doesn't ever wait, effectively
+    running the background updates as quickly as possible"""
+
+    def run_update(
+        self,
+        update_name: str,
+        database_name: str,
+        oneshot: bool,
+    ) -> AsyncContextManager[int]:
+        return self
+
+    async def default_batch_size(self, update_name: str, database_name: str) -> int:
+        return 100
+
+    async def min_batch_size(self, update_name: str, database_name: str) -> int:
+        return 100
+
+    async def __aenter__(self) -> int:
+        return 100
+
+    async def __aexit__(self, *exc):
+        pass
+
+
 class BackgroundUpdatePerformance:
     """Tracks the how long a background update is taking to update its items"""
 
@@ -100,18 +223,17 @@ class BackgroundUpdater:
         process and autotuning the batch size.
     """
 
-    MINIMUM_BACKGROUND_BATCH_SIZE = 100
-    DEFAULT_BACKGROUND_BATCH_SIZE = 100
-    BACKGROUND_UPDATE_INTERVAL_MS = 1000
-    BACKGROUND_UPDATE_DURATION_MS = 100
-
     def __init__(self, hs: "HomeServer", database: "DatabasePool"):
         self._clock = hs.get_clock()
         self.db_pool = database
 
+        self._database_name = database.name()
+
         # if a background update is currently running, its name.
         self._current_background_update: Optional[str] = None
 
+        self._controller = _TimeBasedBackgroundUpdateController(self._clock)
+
         self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
         self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
         self._all_done = False
@@ -149,13 +271,8 @@ class BackgroundUpdater:
         try:
             logger.info("Starting background schema updates")
             while self.enabled:
-                if sleep:
-                    await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
-
                 try:
-                    result = await self.do_next_background_update(
-                        self.BACKGROUND_UPDATE_DURATION_MS
-                    )
+                    result = await self.do_next_background_update(sleep)
                 except Exception:
                     logger.exception("Error doing update")
                 else:
@@ -217,13 +334,15 @@ class BackgroundUpdater:
 
         return not update_exists
 
-    async def do_next_background_update(self, desired_duration_ms: float) -> bool:
+    async def do_next_background_update(self, sleep: bool = True) -> bool:
        """Does some amount of work on the next queued background update
 
        Returns once some amount of work is done.
 
        Args:
-            desired_duration_ms: How long we want to spend updating.
+            sleep: Whether to limit how quickly we run background updates or
+                not.
+
        Returns:
            True if we have finished running all the background updates, otherwise False
        """
@@ -266,7 +385,25 @@ class BackgroundUpdater:
 
             self._current_background_update = upd["update_name"]
 
-        await self._do_background_update(desired_duration_ms)
+        # We have a background update to run, otherwise we would have returned
+        # early.
+        assert self._current_background_update is not None
+        update_info = self._background_update_handlers[self._current_background_update]
+
+        if sleep:
+            controller = self._controller
+        else:
+            # If `sleep` is False then we want to run the updates as quickly as
+            # possible.
+            controller = _ImmediateBackgroundUpdateController()
+
+        async with controller.run_update(
+            update_name=self._current_background_update,
+            database_name=self._database_name,
+            oneshot=update_info.oneshot,
+        ) as desired_duration_ms:
+            await self._do_background_update(desired_duration_ms)
+
         return False
 
     async def _do_background_update(self, desired_duration_ms: float) -> int:
@@ -287,9 +424,14 @@ class BackgroundUpdater:
         if items_per_ms is not None:
             batch_size = int(desired_duration_ms * items_per_ms)
             # Clamp the batch size so that we always make progress
-            batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
+            batch_size = max(
+                batch_size,
+                await self._controller.min_batch_size(update_name, self._database_name),
+            )
         else:
-            batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
+            batch_size = await self._controller.default_batch_size(
+                update_name, self._database_name
+            )
 
         progress_json = await self.db_pool.simple_select_one_onecol(
             "background_updates",
@@ -308,6 +450,8 @@ class BackgroundUpdater:
 
         duration_ms = time_stop - time_start
 
+        performance.update(items_updated, duration_ms)
+
         logger.info(
             "Running background update %r. Processed %r items in %rms."
             " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
@@ -320,8 +464,6 @@ class BackgroundUpdater:
             batch_size,
         )
 
-        performance.update(items_updated, duration_ms)
-
         return len(self._background_update_performance)
 
     def register_background_update_handler(
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 90f800e564..ad7a2bf334 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -408,13 +408,9 @@ class EmailPusherTests(HomeserverTestCase):
         self.hs.get_datastore().db_pool.updates._all_done = False
 
         # Now let's actually drive the updates to completion
-        while not self.get_success(
-            self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
-        ):
-            self.get_success(
-                self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
-                by=0.1,
-            )
+        self.get_success(
+            self.hs.get_datastore().db_pool.updates.run_background_updates(False)
+        )
 
         # Check that all pushers with unlinked addresses were deleted
         pushers = self.get_success(
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index 0da42b5ac5..6ad783d253 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -20,10 +20,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
 
     def test_do_background_update(self):
         # the time we claim each update takes
-        duration_ms = 42
+        duration_ms = 0.2
 
         # the target runtime for each bg update
-        target_background_update_duration_ms = 50000
+        target_background_update_duration_ms = 100
 
         store = self.hs.get_datastore()
         self.get_success(
@@ -48,17 +48,13 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
         self.update_handler.side_effect = update
         self.update_handler.reset_mock()
         res = self.get_success(
-            self.updates.do_next_background_update(
-                target_background_update_duration_ms
-            ),
-            by=0.1,
+            self.updates.do_next_background_update(False),
+            by=0.01,
         )
         self.assertFalse(res)
 
         # on the first call, we should get run with the default background update size
-        self.update_handler.assert_called_once_with(
-            {"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
-        )
+        self.update_handler.assert_called_once_with({"my_key": 1}, 100)
 
         # second step: complete the update
         # we should now get run with a much bigger number of items to update
@@ -74,16 +70,12 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
 
         self.update_handler.side_effect = update
         self.update_handler.reset_mock()
-        result = self.get_success(
-            self.updates.do_next_background_update(target_background_update_duration_ms)
-        )
+        result = self.get_success(self.updates.do_next_background_update(False))
         self.assertFalse(result)
         self.update_handler.assert_called_once()
 
         # third step: we don't expect to be called any more
         self.update_handler.reset_mock()
-        result = self.get_success(
-            self.updates.do_next_background_update(target_background_update_duration_ms)
-        )
+        result = self.get_success(self.updates.do_next_background_update(False))
         self.assertTrue(result)
         self.assertFalse(self.update_handler.called)
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index b31c5eb5ec..7b7f6c349e 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
         ):
             iterations += 1
             self.get_success(
-                self.store.db_pool.updates.do_next_background_update(100), by=0.1
+                self.store.db_pool.updates.do_next_background_update(False), by=0.1
             )
 
         # Ensure that we did actually take multiple iterations to process the
@@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
         ):
             iterations += 1
             self.get_success(
-                self.store.db_pool.updates.do_next_background_update(100), by=0.1
+                self.store.db_pool.updates.do_next_background_update(False), by=0.1
             )
 
         # Ensure that we did actually take multiple iterations to process the
diff --git a/tests/unittest.py b/tests/unittest.py
index a9b60b7eeb..a5a8ec0593 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -319,12 +319,7 @@ class HomeserverTestCase(TestCase):
 
     def wait_for_background_updates(self) -> None:
         """Block until all background database updates have completed."""
-        while not self.get_success(
-            self.store.db_pool.updates.has_completed_background_updates()
-        ):
-            self.get_success(
-                self.store.db_pool.updates.do_next_background_update(100), by=0.1
-            )
+        self.get_success(self.store.db_pool.updates.run_background_updates(False))
 
     def make_homeserver(self, reactor, clock):
         """
@@ -484,8 +479,7 @@ class HomeserverTestCase(TestCase):
 
         async def run_bg_updates():
             with LoggingContext("run_bg_updates"):
-                while not await stor.db_pool.updates.has_completed_background_updates():
-                    await stor.db_pool.updates.do_next_background_update(1)
+                self.get_success(stor.db_pool.updates.run_background_updates(False))
 
         hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
         stor = hs.get_datastore()
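The new interface can be implemented by a custom controller along these lines. This is a
minimal sketch and not part of the patch: the class name, the 5 second / 200 ms timings and
the asynccontextmanager-based implementation are illustrative assumptions; only the
BackgroundUpdateController methods, their signatures and the Clock import come from the
diff above, and how such a controller would be registered with Synapse is not covered here.

    from contextlib import asynccontextmanager
    from typing import AsyncContextManager, AsyncIterator

    from synapse.storage.background_updates import BackgroundUpdateController
    from synapse.util import Clock


    class ExampleRateLimitedController(BackgroundUpdateController):
        """Sleeps 5s between iterations and targets 200ms of work per iteration."""

        def __init__(self, clock: Clock):
            self._clock = clock

        def run_update(
            self, update_name: str, database_name: str, oneshot: bool
        ) -> AsyncContextManager[int]:
            @asynccontextmanager
            async def _update() -> AsyncIterator[int]:
                # Sleep before the iteration runs so updates are rate limited,
                # mirroring the default controller, which sleeps in __aenter__.
                await self._clock.sleep(5)
                # The yielded value is the target duration for this iteration in
                # milliseconds; it is ignored for oneshot updates such as index
                # creation.
                yield 200

            return _update()

        async def default_batch_size(self, update_name: str, database_name: str) -> int:
            # Batch size used for the first iteration of a new update.
            return 100

        async def min_batch_size(self, update_name: str, database_name: str) -> int:
            # Lower bound on the batch size; must be greater than 0.
            return 1

Note that do_next_background_update(sleep=False) bypasses any such controller by
substituting _ImmediateBackgroundUpdateController, which is why the updated tests pass
False rather than a target duration.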