Add a BackgroundUpdateController class.

This controls how often, and for how long, a background update is run.

The idea is to allow the default one to be replaced by a pluggable
module.
This commit is contained in:
Erik Johnston 2021-11-09 11:50:01 +00:00
parent 4a1a8321dd
commit 957da6f56e
5 changed files with 176 additions and 52 deletions

View file

@ -11,15 +11,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
from typing import (
TYPE_CHECKING,
AsyncContextManager,
Awaitable,
Callable,
Dict,
Iterable,
Optional,
)
import attr
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.types import Connection
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util import Clock, json_encoder
from . import engines
@ -46,6 +55,120 @@ class _BackgroundUpdateHandler:
oneshot: bool = False
class BackgroundUpdateController(abc.ABC):
    """Abstract interface for pacing background updates.

    Implementations decide how long each iteration of a background update
    should run for, and how large its batches should be.
    """

    ####
    # NOTE: This is used by modules so changes must be backwards compatible or
    # be announced appropriately
    ####

    @abc.abstractmethod
    def run_update(
        self, update_name: str, database_name: str, oneshot: bool
    ) -> AsyncContextManager[int]:
        """Invoked before each iteration of a background update.

        The returned async context manager is entered immediately and exited
        once that iteration of the background update has finished.
        Implementations will typically sleep here for a period of time to stop
        the background update from running continuously.

        Args:
            update_name: The name of the update about to be run.
            database_name: The database the update runs against. Really only
                useful when Synapse is configured with multiple databases.
            oneshot: True when the update completes in a single pass (e.g.
                index creation); in that case the yielded target duration is
                ignored.

        Returns:
            An async context manager yielding the target duration, in
            milliseconds, that this iteration of the background update should
            run for.

            Note: this is a *target*, and an iteration may take substantially
            longer or shorter.
        """
        ...

    @abc.abstractmethod
    async def default_batch_size(self, update_name: str, database_name: str) -> int:
        """The batch size used for the first iteration of a new background
        update.
        """
        ...

    @abc.abstractmethod
    async def min_batch_size(self, update_name: str, database_name: str) -> int:
        """A lower bound on the batch size of a new background update.

        Ensures that forward progress is always made; must be greater than 0.
        """
        ...
class _TimeBasedBackgroundUpdateController(BackgroundUpdateController):
    """The default controller: aim to spend X ms doing the background update
    out of every Y ms of wall-clock time.
    """

    MINIMUM_BACKGROUND_BATCH_SIZE = 100
    DEFAULT_BACKGROUND_BATCH_SIZE = 100
    BACKGROUND_UPDATE_INTERVAL_MS = 1000
    BACKGROUND_UPDATE_DURATION_MS = 100

    def __init__(self, clock: Clock):
        self._clock = clock

    def run_update(
        self,
        update_name: str,
        database_name: str,
        oneshot: bool,
    ) -> AsyncContextManager[int]:
        # The controller acts as its own async context manager: entering it
        # performs the inter-iteration sleep.
        return self

    async def __aenter__(self) -> int:
        # Wait out the configured interval before the next iteration, then
        # hand back the target duration for the iteration itself.
        await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
        return self.BACKGROUND_UPDATE_DURATION_MS

    async def __aexit__(self, *exc_info):
        pass

    async def default_batch_size(self, update_name: str, database_name: str) -> int:
        return self.DEFAULT_BACKGROUND_BATCH_SIZE

    async def min_batch_size(self, update_name: str, database_name: str) -> int:
        return self.MINIMUM_BACKGROUND_BATCH_SIZE
class _ImmediateBackgroundUpdateController(BackgroundUpdateController):
    """A background update controller that doesn't ever wait, effectively
    running the background updates as quickly as possible.
    """

    # With no sleeping there is no timing feedback to tune batch sizes
    # against, so a single fixed value serves as the default batch size,
    # the minimum batch size, and the (ignored) target duration.
    _BATCH_SIZE = 100

    def run_update(
        self,
        update_name: str,
        database_name: str,
        oneshot: bool,
    ) -> AsyncContextManager[int]:
        # The controller acts as its own (no-op) async context manager.
        return self

    async def default_batch_size(self, update_name: str, database_name: str) -> int:
        return self._BATCH_SIZE

    async def min_batch_size(self, update_name: str, database_name: str) -> int:
        return self._BATCH_SIZE

    async def __aenter__(self) -> int:
        # No sleep: the next iteration starts immediately.
        return self._BATCH_SIZE

    async def __aexit__(self, *exc):
        pass
class BackgroundUpdatePerformance:
"""Tracks the how long a background update is taking to update its items"""
@ -100,18 +223,17 @@ class BackgroundUpdater:
process and autotuning the batch size.
"""
MINIMUM_BACKGROUND_BATCH_SIZE = 100
DEFAULT_BACKGROUND_BATCH_SIZE = 100
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
self._clock = hs.get_clock()
self.db_pool = database
self._database_name = database.name()
# if a background update is currently running, its name.
self._current_background_update: Optional[str] = None
self._controller = _TimeBasedBackgroundUpdateController(self._clock)
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
self._all_done = False
@ -149,13 +271,8 @@ class BackgroundUpdater:
try:
logger.info("Starting background schema updates")
while self.enabled:
if sleep:
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
result = await self.do_next_background_update(sleep)
except Exception:
logger.exception("Error doing update")
else:
@ -217,13 +334,15 @@ class BackgroundUpdater:
return not update_exists
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
Args:
desired_duration_ms: How long we want to spend updating.
sleep: Whether to limit how quickly we run background updates or
not.
Returns:
True if we have finished running all the background updates, otherwise False
"""
@ -266,7 +385,25 @@ class BackgroundUpdater:
self._current_background_update = upd["update_name"]
await self._do_background_update(desired_duration_ms)
# We have a background update to run, otherwise we would have returned
# early.
assert self._current_background_update is not None
update_info = self._background_update_handlers[self._current_background_update]
if sleep:
controller = self._controller
else:
# If `sleep` is False then we want to run the updates as quickly as
# possible.
controller = _ImmediateBackgroundUpdateController()
async with controller.run_update(
update_name=self._current_background_update,
database_name=self._database_name,
oneshot=update_info.oneshot,
) as desired_duration_ms:
await self._do_background_update(desired_duration_ms)
return False
async def _do_background_update(self, desired_duration_ms: float) -> int:
@ -287,9 +424,14 @@ class BackgroundUpdater:
if items_per_ms is not None:
batch_size = int(desired_duration_ms * items_per_ms)
# Clamp the batch size so that we always make progress
batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
batch_size = max(
batch_size,
await self._controller.min_batch_size(update_name, self._database_name),
)
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
batch_size = await self._controller.default_batch_size(
update_name, self._database_name
)
progress_json = await self.db_pool.simple_select_one_onecol(
"background_updates",
@ -308,6 +450,8 @@ class BackgroundUpdater:
duration_ms = time_stop - time_start
performance.update(items_updated, duration_ms)
logger.info(
"Running background update %r. Processed %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
@ -320,8 +464,6 @@ class BackgroundUpdater:
batch_size,
)
performance.update(items_updated, duration_ms)
return len(self._background_update_performance)
def register_background_update_handler(

View file

@ -408,13 +408,9 @@ class EmailPusherTests(HomeserverTestCase):
self.hs.get_datastore().db_pool.updates._all_done = False
# Now let's actually drive the updates to completion
while not self.get_success(
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
by=0.1,
)
self.get_success(
self.hs.get_datastore().db_pool.updates.run_background_updates(False)
)
# Check that all pushers with unlinked addresses were deleted
pushers = self.get_success(

View file

@ -20,10 +20,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def test_do_background_update(self):
# the time we claim each update takes
duration_ms = 42
duration_ms = 0.2
# the target runtime for each bg update
target_background_update_duration_ms = 50000
target_background_update_duration_ms = 100
store = self.hs.get_datastore()
self.get_success(
@ -48,17 +48,13 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
self.update_handler.side_effect = update
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
self.updates.do_next_background_update(False),
by=0.01,
)
self.assertFalse(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
{"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
)
self.update_handler.assert_called_once_with({"my_key": 1}, 100)
# second step: complete the update
# we should now get run with a much bigger number of items to update
@ -74,16 +70,12 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
self.update_handler.side_effect = update
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
result = self.get_success(self.updates.do_next_background_update(False))
self.assertFalse(result)
self.update_handler.assert_called_once()
# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
result = self.get_success(self.updates.do_next_background_update(False))
self.assertTrue(result)
self.assertFalse(self.update_handler.called)

View file

@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
):
iterations += 1
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
self.store.db_pool.updates.do_next_background_update(False), by=0.1
)
# Ensure that we did actually take multiple iterations to process the
@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
):
iterations += 1
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
self.store.db_pool.updates.do_next_background_update(False), by=0.1
)
# Ensure that we did actually take multiple iterations to process the

View file

@ -319,12 +319,7 @@ class HomeserverTestCase(TestCase):
def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.get_success(self.store.db_pool.updates.run_background_updates(False))
def make_homeserver(self, reactor, clock):
"""
@ -484,8 +479,7 @@ class HomeserverTestCase(TestCase):
async def run_bg_updates():
with LoggingContext("run_bg_updates"):
while not await stor.db_pool.updates.has_completed_background_updates():
await stor.db_pool.updates.do_next_background_update(1)
self.get_success(stor.db_pool.updates.run_background_updates(False))
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()