Compare commits

...

6 commits

Author SHA1 Message Date
Erik Johnston c7f1498e35 Newsfile 2021-11-11 12:00:15 +00:00
Erik Johnston dccddf15b0 Add tests 2021-11-11 11:53:36 +00:00
Erik Johnston 0ace9f8d85 Expose a sleep(..) func on ModuleApi 2021-11-11 11:53:36 +00:00
Erik Johnston 0c3ba88496 Add a register_background_update_controller 2021-11-11 11:53:36 +00:00
Erik Johnston 957da6f56e Add a BackgroundUpdateController class.
This controls how often and for how long a background update is run.

The idea is to allow the default one to be replaced by a pluggable
module.
2021-11-11 11:53:36 +00:00
Erik Johnston 4a1a8321dd Store whether a BG update is oneshot or not 2021-11-10 11:15:28 +00:00
8 changed files with 312 additions and 61 deletions

View file

@@ -0,0 +1 @@
Add new plugin support for controlling background update timings.

View file

@@ -118,7 +118,9 @@ CONDITIONAL_REQUIREMENTS["mypy"] = [
# Tests assume that all optional dependencies are installed.
#
# parameterized_class decorator was introduced in parameterized 0.7.0
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0"]
#
# We use the `mock` library, as it backports `AsyncMock` to Python 3.6
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0", "mock>=4.0.0"]
CONDITIONAL_REQUIREMENTS["dev"] = (
CONDITIONAL_REQUIREMENTS["lint"]

View file

@@ -48,6 +48,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import BackgroundUpdateController
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.storage.state import StateFilter
@@ -92,6 +93,7 @@ __all__ = [
"JsonDict",
"EventBase",
"StateMap",
"BackgroundUpdateController",
]
logger = logging.getLogger(__name__)
@@ -212,6 +214,21 @@ class ModuleApi:
"""
self._hs.register_module_web_resource(path, resource)
def register_background_update_controller(
self,
controller: BackgroundUpdateController,
) -> None:
"""Registers a background update controller.
Added in v1.48.0
Args:
controller: The controller to use.
"""
for db in self._hs.get_datastores().databases:
db.updates.register_update_controller(controller)
#########################################################################
# The following methods can be called by the module at any point in time.
@@ -859,6 +876,11 @@ class ModuleApi:
f,
)
async def sleep(self, seconds: float) -> None:
"""Sleeps for the given number of seconds."""
await self._clock.sleep(seconds)
async def send_mail(
self,
recipient: str,
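
For module authors, the two additions above are register_background_update_controller(...) and sleep(...). A minimal, hypothetical sketch of the sleep(...) helper in use (the module class and its polling task are invented for illustration; only the ModuleApi.sleep call comes from this changeset — registering a controller is sketched after the storage changes below):

from synapse.module_api import ModuleApi

class ExamplePollingModule:
    """Hypothetical module using the new ModuleApi.sleep helper."""

    def __init__(self, config: dict, api: ModuleApi):
        self._api = api

    async def _poll_loop(self) -> None:
        while True:
            await self._do_work()
            # api.sleep() waits on the homeserver's clock, so test reactors
            # can advance time without incurring real delays.
            await self._api.sleep(60)

    async def _do_work(self) -> None:
        ...  # hypothetical work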

View file

@@ -11,13 +11,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
from typing import (
TYPE_CHECKING,
AsyncContextManager,
Awaitable,
Callable,
Dict,
Iterable,
Optional,
)
import attr
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.types import Connection
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util import Clock, json_encoder
from . import engines
@@ -28,6 +39,136 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _BackgroundUpdateHandler:
"""A handler for a given background update.
Attributes:
callback: The function to call to make progress on the background
update.
oneshot: Whether the update is likely to happen all in one go, ignoring
the supplied target duration, e.g. index creation. This is used by
the update controller to help correctly schedule the update.
"""
callback: Callable[[JsonDict, int], Awaitable[int]]
oneshot: bool = False
class BackgroundUpdateController(abc.ABC):
"""A base class for controlling background update timings."""
####
# NOTE: This is used by modules so changes must be backwards compatible or
# be announced appropriately
####
@abc.abstractmethod
def run_update(
self, update_name: str, database_name: str, oneshot: bool
) -> AsyncContextManager[int]:
"""Called before we do the next iteration of a background update. The
returned async context manager is immediately entered and then exited
after this iteration of the background update has finished.
Implementations will likely want to sleep for a period of time to stop
the background update from continuously being run.
Args:
update_name: The name of the update that is to be run
database_name: The name of the database the background update is
being run on. Really only useful if Synapse is configured with
multiple databases.
oneshot: Whether the update will complete all in one go, e.g.
index creation. In such cases the returned target duration is
ignored.
Returns:
The target duration in milliseconds that the background update
should run for.
Note: this is a *target*, and an iteration may take substantially
longer or shorter.
"""
...
@abc.abstractmethod
async def default_batch_size(self, update_name: str, database_name: str) -> int:
"""The batch size to use for the first iteration of a new background
update.
"""
...
@abc.abstractmethod
async def min_batch_size(self, update_name: str, database_name: str) -> int:
"""A lower bound on the batch size of a new background update.
Used to ensure that progress is always made. Must be greater than 0.
"""
...
class _TimeBasedBackgroundUpdateController(BackgroundUpdateController):
"""The default controller which aims to spend X ms doing the background
update every Y ms.
"""
MINIMUM_BACKGROUND_BATCH_SIZE = 100
DEFAULT_BACKGROUND_BATCH_SIZE = 100
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, clock: Clock):
self._clock = clock
def run_update(
self,
update_name: str,
database_name: str,
oneshot: bool,
) -> AsyncContextManager[int]:
return self
async def default_batch_size(self, update_name: str, database_name: str) -> int:
return self.DEFAULT_BACKGROUND_BATCH_SIZE
async def min_batch_size(self, update_name: str, database_name: str) -> int:
return self.MINIMUM_BACKGROUND_BATCH_SIZE
async def __aenter__(self) -> int:
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
return self.BACKGROUND_UPDATE_DURATION_MS
async def __aexit__(self, *exc):
pass
class _ImmediateBackgroundUpdateController(BackgroundUpdateController):
"""A background update controller that doesn't ever wait, effectively
running the background updates as quickly as possible"""
def run_update(
self,
update_name: str,
database_name: str,
oneshot: bool,
) -> AsyncContextManager[int]:
return self
async def default_batch_size(self, update_name: str, database_name: str) -> int:
return 100
async def min_batch_size(self, update_name: str, database_name: str) -> int:
return 100
async def __aenter__(self) -> int:
return 100
async def __aexit__(self, *exc):
pass
class BackgroundUpdatePerformance:
"""Tracks the how long a background update is taking to update its items"""
@@ -82,22 +223,21 @@ class BackgroundUpdater:
process and autotuning the batch size.
"""
MINIMUM_BACKGROUND_BATCH_SIZE = 100
DEFAULT_BACKGROUND_BATCH_SIZE = 100
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
self._clock = hs.get_clock()
self.db_pool = database
self._database_name = database.name()
# if a background update is currently running, its name.
self._current_background_update: Optional[str] = None
self._controller: BackgroundUpdateController = (
_TimeBasedBackgroundUpdateController(self._clock)
)
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
self._background_update_handlers: Dict[
str, Callable[[JsonDict, int], Awaitable[int]]
] = {}
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
self._all_done = False
# Whether we're currently running updates
@@ -107,6 +247,13 @@ class BackgroundUpdater:
# enable/disable background updates via the admin API.
self.enabled = True
def register_update_controller(
self, controller: BackgroundUpdateController
) -> None:
"""Register a new background update controller to use."""
self._controller = controller
def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
"""Returns the current background update, if any."""
@@ -133,13 +280,8 @@ class BackgroundUpdater:
try:
logger.info("Starting background schema updates")
while self.enabled:
if sleep:
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
result = await self.do_next_background_update(sleep)
except Exception:
logger.exception("Error doing update")
else:
@@ -201,13 +343,15 @@ class BackgroundUpdater:
return not update_exists
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
Args:
desired_duration_ms: How long we want to spend updating.
sleep: Whether to limit how quickly we run background updates or
not.
Returns:
True if we have finished running all the background updates, otherwise False
"""
@@ -250,7 +394,25 @@ class BackgroundUpdater:
self._current_background_update = upd["update_name"]
await self._do_background_update(desired_duration_ms)
# We have a background update to run, otherwise we would have returned
# early.
assert self._current_background_update is not None
update_info = self._background_update_handlers[self._current_background_update]
if sleep:
controller = self._controller
else:
# If `sleep` is False then we want to run the updates as quickly as
# possible.
controller = _ImmediateBackgroundUpdateController()
async with controller.run_update(
update_name=self._current_background_update,
database_name=self._database_name,
oneshot=update_info.oneshot,
) as desired_duration_ms:
await self._do_background_update(desired_duration_ms)
return False
async def _do_background_update(self, desired_duration_ms: float) -> int:
@@ -258,7 +420,7 @@ class BackgroundUpdater:
update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
update_handler = self._background_update_handlers[update_name].callback
performance = self._background_update_performance.get(update_name)
@@ -271,9 +433,14 @@ class BackgroundUpdater:
if items_per_ms is not None:
batch_size = int(desired_duration_ms * items_per_ms)
# Clamp the batch size so that we always make progress
batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
batch_size = max(
batch_size,
await self._controller.min_batch_size(update_name, self._database_name),
)
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
batch_size = await self._controller.default_batch_size(
update_name, self._database_name
)
progress_json = await self.db_pool.simple_select_one_onecol(
"background_updates",
@@ -292,6 +459,8 @@ class BackgroundUpdater:
duration_ms = time_stop - time_start
performance.update(items_updated, duration_ms)
logger.info(
"Running background update %r. Processed %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
@@ -304,8 +473,6 @@ class BackgroundUpdater:
batch_size,
)
performance.update(items_updated, duration_ms)
return len(self._background_update_performance)
def register_background_update_handler(
@@ -329,7 +496,9 @@ class BackgroundUpdater:
update_name: The name of the update that this code handles.
update_handler: The function that does the update.
"""
self._background_update_handlers[update_name] = update_handler
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
update_handler
)
def register_noop_background_update(self, update_name: str) -> None:
"""Register a noop handler for a background update.
@@ -451,7 +620,9 @@ class BackgroundUpdater:
await self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, updater)
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.

View file

@@ -408,13 +408,9 @@ class EmailPusherTests(HomeserverTestCase):
self.hs.get_datastore().db_pool.updates._all_done = False
# Now let's actually drive the updates to completion
while not self.get_success(
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
by=0.1,
)
self.get_success(
self.hs.get_datastore().db_pool.updates.run_background_updates(False)
)
# Check that all pushers with unlinked addresses were deleted
pushers = self.get_success(

View file

@@ -1,6 +1,11 @@
from unittest.mock import Mock
from mock import AsyncMock, Mock
from synapse.storage.background_updates import BackgroundUpdater
from twisted.internet.defer import Deferred, ensureDeferred
from synapse.storage.background_updates import (
BackgroundUpdateController,
BackgroundUpdater,
)
from tests import unittest
@@ -20,10 +25,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def test_do_background_update(self):
# the time we claim each update takes
duration_ms = 42
duration_ms = 0.2
# the target runtime for each bg update
target_background_update_duration_ms = 50000
target_background_update_duration_ms = 100
store = self.hs.get_datastore()
self.get_success(
@@ -48,17 +53,13 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
self.update_handler.side_effect = update
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
self.updates.do_next_background_update(False),
by=0.01,
)
self.assertFalse(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
{"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
)
self.update_handler.assert_called_once_with({"my_key": 1}, 100)
# second step: complete the update
# we should now get run with a much bigger number of items to update
@@ -74,16 +75,80 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
self.update_handler.side_effect = update
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
result = self.get_success(self.updates.do_next_background_update(False))
self.assertFalse(result)
self.update_handler.assert_called_once()
# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
result = self.get_success(self.updates.do_next_background_update(False))
self.assertTrue(result)
self.assertFalse(self.update_handler.called)
class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates
# the base test class should have run the real bg updates for us
self.assertTrue(
self.get_success(self.updates.has_completed_background_updates())
)
self.update_deferred = Deferred()
self.update_handler = Mock(return_value=self.update_deferred)
self.updates.register_background_update_handler(
"test_update", self.update_handler
)
self._controller_ctx_mgr = AsyncMock(name="_controller_ctx_mgr")
self._controller = AsyncMock(BackgroundUpdateController)
self._controller.run_update.return_value = self._controller_ctx_mgr
self.updates.register_update_controller(self._controller)
def test_controller(self):
store = self.hs.get_datastore()
self.get_success(
store.db_pool.simple_insert(
"background_updates",
values={"update_name": "test_update", "progress_json": "{}"},
)
)
default_batch_size = 100
# Set up the return values of the controller.
enter_defer = Deferred()
self._controller_ctx_mgr.__aenter__ = Mock(return_value=enter_defer)
self._controller.default_batch_size.return_value = default_batch_size
self._controller.min_batch_size.return_value = default_batch_size
# Start the background update.
do_update_d = ensureDeferred(self.updates.do_next_background_update(True))
self.pump()
# `run_update` should have been called, but the update handler won't be
# called until the `enter_defer` (returned by `__aenter__`) is resolved.
self._controller.run_update.assert_called_once_with(
update_name="test_update",
database_name="master",
oneshot=False,
)
self.assertFalse(do_update_d.called)
self.assertFalse(self.update_deferred.called)
# Resolving the `enter_defer` should call the update handler, which then
# blocks.
enter_defer.callback(100)
self.pump()
self.update_handler.assert_called_once_with({}, default_batch_size)
self.assertFalse(self.update_deferred.called)
self._controller_ctx_mgr.__aexit__.assert_not_awaited()
# Resolving the update handler deferred should cause the
# `do_next_background_update` to finish and return
self.update_deferred.callback(100)
self.pump()
self._controller_ctx_mgr.__aexit__.assert_awaited()
self.get_success(do_update_d)

View file

@@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
):
iterations += 1
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
self.store.db_pool.updates.do_next_background_update(False), by=0.1
)
# Ensure that we did actually take multiple iterations to process the
@@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
):
iterations += 1
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
self.store.db_pool.updates.do_next_background_update(False), by=0.1
)
# Ensure that we did actually take multiple iterations to process the

View file

@@ -319,12 +319,7 @@ class HomeserverTestCase(TestCase):
def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.get_success(self.store.db_pool.updates.run_background_updates(False))
def make_homeserver(self, reactor, clock):
"""
@@ -484,8 +479,7 @@ class HomeserverTestCase(TestCase):
async def run_bg_updates():
with LoggingContext("run_bg_updates"):
while not await stor.db_pool.updates.has_completed_background_updates():
await stor.db_pool.updates.do_next_background_update(1)
self.get_success(stor.db_pool.updates.run_background_updates(False))
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()