Compare commits
6 commits
develop
...
erikj/bg_u
Author | SHA1 | Date | |
---|---|---|---|
c7f1498e35 | |||
dccddf15b0 | |||
0ace9f8d85 | |||
0c3ba88496 | |||
957da6f56e | |||
4a1a8321dd |
1
changelog.d/11306.feature
Normal file
1
changelog.d/11306.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add new plugin support for controlling background update timings.
|
4
setup.py
4
setup.py
|
@ -118,7 +118,9 @@ CONDITIONAL_REQUIREMENTS["mypy"] = [
|
||||||
# Tests assume that all optional dependencies are installed.
|
# Tests assume that all optional dependencies are installed.
|
||||||
#
|
#
|
||||||
# parameterized_class decorator was introduced in parameterized 0.7.0
|
# parameterized_class decorator was introduced in parameterized 0.7.0
|
||||||
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0"]
|
#
|
||||||
|
# We use `mock` library as that backports `AsyncMock` to Python 3.6
|
||||||
|
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0", "mock>=4.0.0"]
|
||||||
|
|
||||||
CONDITIONAL_REQUIREMENTS["dev"] = (
|
CONDITIONAL_REQUIREMENTS["dev"] = (
|
||||||
CONDITIONAL_REQUIREMENTS["lint"]
|
CONDITIONAL_REQUIREMENTS["lint"]
|
||||||
|
|
|
@ -48,6 +48,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.rest.client.login import LoginResponse
|
from synapse.rest.client.login import LoginResponse
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
|
from synapse.storage.background_updates import BackgroundUpdateController
|
||||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||||
from synapse.storage.databases.main.roommember import ProfileInfo
|
from synapse.storage.databases.main.roommember import ProfileInfo
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
|
@ -92,6 +93,7 @@ __all__ = [
|
||||||
"JsonDict",
|
"JsonDict",
|
||||||
"EventBase",
|
"EventBase",
|
||||||
"StateMap",
|
"StateMap",
|
||||||
|
"BackgroundUpdateController",
|
||||||
]
|
]
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -212,6 +214,21 @@ class ModuleApi:
|
||||||
"""
|
"""
|
||||||
self._hs.register_module_web_resource(path, resource)
|
self._hs.register_module_web_resource(path, resource)
|
||||||
|
|
||||||
|
def register_background_update_controller(
|
||||||
|
self,
|
||||||
|
controller: BackgroundUpdateController,
|
||||||
|
) -> None:
|
||||||
|
"""Registers a background update controller.
|
||||||
|
|
||||||
|
Added in v1.48.0
|
||||||
|
|
||||||
|
Args:
|
||||||
|
controller: The controller to use.
|
||||||
|
"""
|
||||||
|
|
||||||
|
for db in self._hs.get_datastores().databases:
|
||||||
|
db.updates.register_update_controller(controller)
|
||||||
|
|
||||||
#########################################################################
|
#########################################################################
|
||||||
# The following methods can be called by the module at any point in time.
|
# The following methods can be called by the module at any point in time.
|
||||||
|
|
||||||
|
@ -859,6 +876,11 @@ class ModuleApi:
|
||||||
f,
|
f,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def sleep(self, seconds: float) -> None:
|
||||||
|
"""Sleeps for the given number of seconds."""
|
||||||
|
|
||||||
|
await self._clock.sleep(seconds)
|
||||||
|
|
||||||
async def send_mail(
|
async def send_mail(
|
||||||
self,
|
self,
|
||||||
recipient: str,
|
recipient: str,
|
||||||
|
|
|
@ -11,13 +11,24 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import abc
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
AsyncContextManager,
|
||||||
|
Awaitable,
|
||||||
|
Callable,
|
||||||
|
Dict,
|
||||||
|
Iterable,
|
||||||
|
Optional,
|
||||||
|
)
|
||||||
|
|
||||||
|
import attr
|
||||||
|
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.types import Connection
|
from synapse.storage.types import Connection
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
from synapse.util import json_encoder
|
from synapse.util import Clock, json_encoder
|
||||||
|
|
||||||
from . import engines
|
from . import engines
|
||||||
|
|
||||||
|
@ -28,6 +39,136 @@ if TYPE_CHECKING:
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class _BackgroundUpdateHandler:
|
||||||
|
"""A handler for a given background update.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
callback: The function to call to make progress on the background
|
||||||
|
update.
|
||||||
|
oneshot: Wether the update is likely to happen all in one go, ignoring
|
||||||
|
the supplied target duration, e.g. index creation. This is used by
|
||||||
|
the update controller to help correctly schedule the update.
|
||||||
|
"""
|
||||||
|
|
||||||
|
callback: Callable[[JsonDict, int], Awaitable[int]]
|
||||||
|
oneshot: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class BackgroundUpdateController(abc.ABC):
|
||||||
|
"""A base class for controlling background update timings."""
|
||||||
|
|
||||||
|
####
|
||||||
|
# NOTE: This is used by modules so changes must be backwards compatible or
|
||||||
|
# be announced appropriately
|
||||||
|
####
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def run_update(
|
||||||
|
self, update_name: str, database_name: str, oneshot: bool
|
||||||
|
) -> AsyncContextManager[int]:
|
||||||
|
"""Called before we do the next iteration of a background update. The
|
||||||
|
returned async context manager is immediately entered and then exited
|
||||||
|
after this iteration of the background update has finished.
|
||||||
|
|
||||||
|
Implementations will likely want to sleep for a period of time to stop
|
||||||
|
the background update from continuously being run.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
update_name: The name of the update that is to be run
|
||||||
|
database_name: The name of the database the background update is
|
||||||
|
being run on. Really only useful if Synapse is configured with
|
||||||
|
multiple databases.
|
||||||
|
oneshot: Whether the update will complete all in one go, e.g.
|
||||||
|
index creation. In such cases the returned target duration is
|
||||||
|
ignored.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The target duration in milliseconds that the background update
|
||||||
|
should run for.
|
||||||
|
|
||||||
|
Note: this is a *target*, and an iteration may take substantially
|
||||||
|
longer or shorter.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
async def default_batch_size(self, update_name: str, database_name: str) -> int:
|
||||||
|
"""The batch size to use for the first iteration of a new background
|
||||||
|
update.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
async def min_batch_size(self, update_name: str, database_name: str) -> int:
|
||||||
|
"""A lower bound on the batch size of a new background update.
|
||||||
|
|
||||||
|
Used to ensure that progress is always made. Must be greater than 0.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class _TimeBasedBackgroundUpdateController(BackgroundUpdateController):
|
||||||
|
"""The default controller which aims to spend X ms doing the background
|
||||||
|
update every Y ms.
|
||||||
|
"""
|
||||||
|
|
||||||
|
MINIMUM_BACKGROUND_BATCH_SIZE = 100
|
||||||
|
DEFAULT_BACKGROUND_BATCH_SIZE = 100
|
||||||
|
|
||||||
|
BACKGROUND_UPDATE_INTERVAL_MS = 1000
|
||||||
|
BACKGROUND_UPDATE_DURATION_MS = 100
|
||||||
|
|
||||||
|
def __init__(self, clock: Clock):
|
||||||
|
self._clock = clock
|
||||||
|
|
||||||
|
def run_update(
|
||||||
|
self,
|
||||||
|
update_name: str,
|
||||||
|
database_name: str,
|
||||||
|
oneshot: bool,
|
||||||
|
) -> AsyncContextManager[int]:
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def default_batch_size(self, update_name: str, database_name: str) -> int:
|
||||||
|
return self.DEFAULT_BACKGROUND_BATCH_SIZE
|
||||||
|
|
||||||
|
async def min_batch_size(self, update_name: str, database_name: str) -> int:
|
||||||
|
return self.MINIMUM_BACKGROUND_BATCH_SIZE
|
||||||
|
|
||||||
|
async def __aenter__(self) -> int:
|
||||||
|
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
|
||||||
|
return self.BACKGROUND_UPDATE_DURATION_MS
|
||||||
|
|
||||||
|
async def __aexit__(self, *exc):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class _ImmediateBackgroundUpdateController(BackgroundUpdateController):
|
||||||
|
"""A background update controller that doesn't ever wait, effectively
|
||||||
|
running the background updates as quickly as possible"""
|
||||||
|
|
||||||
|
def run_update(
|
||||||
|
self,
|
||||||
|
update_name: str,
|
||||||
|
database_name: str,
|
||||||
|
oneshot: bool,
|
||||||
|
) -> AsyncContextManager[int]:
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def default_batch_size(self, update_name: str, database_name: str) -> int:
|
||||||
|
return 100
|
||||||
|
|
||||||
|
async def min_batch_size(self, update_name: str, database_name: str) -> int:
|
||||||
|
return 100
|
||||||
|
|
||||||
|
async def __aenter__(self) -> int:
|
||||||
|
return 100
|
||||||
|
|
||||||
|
async def __aexit__(self, *exc):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class BackgroundUpdatePerformance:
|
class BackgroundUpdatePerformance:
|
||||||
"""Tracks the how long a background update is taking to update its items"""
|
"""Tracks the how long a background update is taking to update its items"""
|
||||||
|
|
||||||
|
@ -82,22 +223,21 @@ class BackgroundUpdater:
|
||||||
process and autotuning the batch size.
|
process and autotuning the batch size.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
MINIMUM_BACKGROUND_BATCH_SIZE = 100
|
|
||||||
DEFAULT_BACKGROUND_BATCH_SIZE = 100
|
|
||||||
BACKGROUND_UPDATE_INTERVAL_MS = 1000
|
|
||||||
BACKGROUND_UPDATE_DURATION_MS = 100
|
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
|
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.db_pool = database
|
self.db_pool = database
|
||||||
|
|
||||||
|
self._database_name = database.name()
|
||||||
|
|
||||||
# if a background update is currently running, its name.
|
# if a background update is currently running, its name.
|
||||||
self._current_background_update: Optional[str] = None
|
self._current_background_update: Optional[str] = None
|
||||||
|
|
||||||
|
self._controller: BackgroundUpdateController = (
|
||||||
|
_TimeBasedBackgroundUpdateController(self._clock)
|
||||||
|
)
|
||||||
|
|
||||||
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
|
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
|
||||||
self._background_update_handlers: Dict[
|
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
|
||||||
str, Callable[[JsonDict, int], Awaitable[int]]
|
|
||||||
] = {}
|
|
||||||
self._all_done = False
|
self._all_done = False
|
||||||
|
|
||||||
# Whether we're currently running updates
|
# Whether we're currently running updates
|
||||||
|
@ -107,6 +247,13 @@ class BackgroundUpdater:
|
||||||
# enable/disable background updates via the admin API.
|
# enable/disable background updates via the admin API.
|
||||||
self.enabled = True
|
self.enabled = True
|
||||||
|
|
||||||
|
def register_update_controller(
|
||||||
|
self, controller: BackgroundUpdateController
|
||||||
|
) -> None:
|
||||||
|
"""Register a new background update controller to use."""
|
||||||
|
|
||||||
|
self._controller = controller
|
||||||
|
|
||||||
def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
|
def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
|
||||||
"""Returns the current background update, if any."""
|
"""Returns the current background update, if any."""
|
||||||
|
|
||||||
|
@ -133,13 +280,8 @@ class BackgroundUpdater:
|
||||||
try:
|
try:
|
||||||
logger.info("Starting background schema updates")
|
logger.info("Starting background schema updates")
|
||||||
while self.enabled:
|
while self.enabled:
|
||||||
if sleep:
|
|
||||||
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = await self.do_next_background_update(
|
result = await self.do_next_background_update(sleep)
|
||||||
self.BACKGROUND_UPDATE_DURATION_MS
|
|
||||||
)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error doing update")
|
logger.exception("Error doing update")
|
||||||
else:
|
else:
|
||||||
|
@ -201,13 +343,15 @@ class BackgroundUpdater:
|
||||||
|
|
||||||
return not update_exists
|
return not update_exists
|
||||||
|
|
||||||
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
|
async def do_next_background_update(self, sleep: bool = True) -> bool:
|
||||||
"""Does some amount of work on the next queued background update
|
"""Does some amount of work on the next queued background update
|
||||||
|
|
||||||
Returns once some amount of work is done.
|
Returns once some amount of work is done.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
desired_duration_ms: How long we want to spend updating.
|
sleep: Whether to limit how quickly we run background updates or
|
||||||
|
not.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if we have finished running all the background updates, otherwise False
|
True if we have finished running all the background updates, otherwise False
|
||||||
"""
|
"""
|
||||||
|
@ -250,7 +394,25 @@ class BackgroundUpdater:
|
||||||
|
|
||||||
self._current_background_update = upd["update_name"]
|
self._current_background_update = upd["update_name"]
|
||||||
|
|
||||||
await self._do_background_update(desired_duration_ms)
|
# We have a background update to run, otherwise we would have returned
|
||||||
|
# early.
|
||||||
|
assert self._current_background_update is not None
|
||||||
|
update_info = self._background_update_handlers[self._current_background_update]
|
||||||
|
|
||||||
|
if sleep:
|
||||||
|
controller = self._controller
|
||||||
|
else:
|
||||||
|
# If `sleep` is False then we want to run the updates as quickly as
|
||||||
|
# possible.
|
||||||
|
controller = _ImmediateBackgroundUpdateController()
|
||||||
|
|
||||||
|
async with controller.run_update(
|
||||||
|
update_name=self._current_background_update,
|
||||||
|
database_name=self._database_name,
|
||||||
|
oneshot=update_info.oneshot,
|
||||||
|
) as desired_duration_ms:
|
||||||
|
await self._do_background_update(desired_duration_ms)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def _do_background_update(self, desired_duration_ms: float) -> int:
|
async def _do_background_update(self, desired_duration_ms: float) -> int:
|
||||||
|
@ -258,7 +420,7 @@ class BackgroundUpdater:
|
||||||
update_name = self._current_background_update
|
update_name = self._current_background_update
|
||||||
logger.info("Starting update batch on background update '%s'", update_name)
|
logger.info("Starting update batch on background update '%s'", update_name)
|
||||||
|
|
||||||
update_handler = self._background_update_handlers[update_name]
|
update_handler = self._background_update_handlers[update_name].callback
|
||||||
|
|
||||||
performance = self._background_update_performance.get(update_name)
|
performance = self._background_update_performance.get(update_name)
|
||||||
|
|
||||||
|
@ -271,9 +433,14 @@ class BackgroundUpdater:
|
||||||
if items_per_ms is not None:
|
if items_per_ms is not None:
|
||||||
batch_size = int(desired_duration_ms * items_per_ms)
|
batch_size = int(desired_duration_ms * items_per_ms)
|
||||||
# Clamp the batch size so that we always make progress
|
# Clamp the batch size so that we always make progress
|
||||||
batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
|
batch_size = max(
|
||||||
|
batch_size,
|
||||||
|
await self._controller.min_batch_size(update_name, self._database_name),
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
|
batch_size = await self._controller.default_batch_size(
|
||||||
|
update_name, self._database_name
|
||||||
|
)
|
||||||
|
|
||||||
progress_json = await self.db_pool.simple_select_one_onecol(
|
progress_json = await self.db_pool.simple_select_one_onecol(
|
||||||
"background_updates",
|
"background_updates",
|
||||||
|
@ -292,6 +459,8 @@ class BackgroundUpdater:
|
||||||
|
|
||||||
duration_ms = time_stop - time_start
|
duration_ms = time_stop - time_start
|
||||||
|
|
||||||
|
performance.update(items_updated, duration_ms)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Running background update %r. Processed %r items in %rms."
|
"Running background update %r. Processed %r items in %rms."
|
||||||
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
|
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
|
||||||
|
@ -304,8 +473,6 @@ class BackgroundUpdater:
|
||||||
batch_size,
|
batch_size,
|
||||||
)
|
)
|
||||||
|
|
||||||
performance.update(items_updated, duration_ms)
|
|
||||||
|
|
||||||
return len(self._background_update_performance)
|
return len(self._background_update_performance)
|
||||||
|
|
||||||
def register_background_update_handler(
|
def register_background_update_handler(
|
||||||
|
@ -329,7 +496,9 @@ class BackgroundUpdater:
|
||||||
update_name: The name of the update that this code handles.
|
update_name: The name of the update that this code handles.
|
||||||
update_handler: The function that does the update.
|
update_handler: The function that does the update.
|
||||||
"""
|
"""
|
||||||
self._background_update_handlers[update_name] = update_handler
|
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||||
|
update_handler
|
||||||
|
)
|
||||||
|
|
||||||
def register_noop_background_update(self, update_name: str) -> None:
|
def register_noop_background_update(self, update_name: str) -> None:
|
||||||
"""Register a noop handler for a background update.
|
"""Register a noop handler for a background update.
|
||||||
|
@ -451,7 +620,9 @@ class BackgroundUpdater:
|
||||||
await self._end_background_update(update_name)
|
await self._end_background_update(update_name)
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
self.register_background_update_handler(update_name, updater)
|
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||||
|
updater, oneshot=True
|
||||||
|
)
|
||||||
|
|
||||||
async def _end_background_update(self, update_name: str) -> None:
|
async def _end_background_update(self, update_name: str) -> None:
|
||||||
"""Removes a completed background update task from the queue.
|
"""Removes a completed background update task from the queue.
|
||||||
|
|
|
@ -408,13 +408,9 @@ class EmailPusherTests(HomeserverTestCase):
|
||||||
self.hs.get_datastore().db_pool.updates._all_done = False
|
self.hs.get_datastore().db_pool.updates._all_done = False
|
||||||
|
|
||||||
# Now let's actually drive the updates to completion
|
# Now let's actually drive the updates to completion
|
||||||
while not self.get_success(
|
self.get_success(
|
||||||
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
|
self.hs.get_datastore().db_pool.updates.run_background_updates(False)
|
||||||
):
|
)
|
||||||
self.get_success(
|
|
||||||
self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
|
|
||||||
by=0.1,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check that all pushers with unlinked addresses were deleted
|
# Check that all pushers with unlinked addresses were deleted
|
||||||
pushers = self.get_success(
|
pushers = self.get_success(
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
from unittest.mock import Mock
|
from mock import AsyncMock, Mock
|
||||||
|
|
||||||
from synapse.storage.background_updates import BackgroundUpdater
|
from twisted.internet.defer import Deferred, ensureDeferred
|
||||||
|
|
||||||
|
from synapse.storage.background_updates import (
|
||||||
|
BackgroundUpdateController,
|
||||||
|
BackgroundUpdater,
|
||||||
|
)
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
|
||||||
|
@ -20,10 +25,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
def test_do_background_update(self):
|
def test_do_background_update(self):
|
||||||
# the time we claim each update takes
|
# the time we claim each update takes
|
||||||
duration_ms = 42
|
duration_ms = 0.2
|
||||||
|
|
||||||
# the target runtime for each bg update
|
# the target runtime for each bg update
|
||||||
target_background_update_duration_ms = 50000
|
target_background_update_duration_ms = 100
|
||||||
|
|
||||||
store = self.hs.get_datastore()
|
store = self.hs.get_datastore()
|
||||||
self.get_success(
|
self.get_success(
|
||||||
|
@ -48,17 +53,13 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
self.update_handler.side_effect = update
|
self.update_handler.side_effect = update
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
res = self.get_success(
|
res = self.get_success(
|
||||||
self.updates.do_next_background_update(
|
self.updates.do_next_background_update(False),
|
||||||
target_background_update_duration_ms
|
by=0.01,
|
||||||
),
|
|
||||||
by=0.1,
|
|
||||||
)
|
)
|
||||||
self.assertFalse(res)
|
self.assertFalse(res)
|
||||||
|
|
||||||
# on the first call, we should get run with the default background update size
|
# on the first call, we should get run with the default background update size
|
||||||
self.update_handler.assert_called_once_with(
|
self.update_handler.assert_called_once_with({"my_key": 1}, 100)
|
||||||
{"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
|
|
||||||
)
|
|
||||||
|
|
||||||
# second step: complete the update
|
# second step: complete the update
|
||||||
# we should now get run with a much bigger number of items to update
|
# we should now get run with a much bigger number of items to update
|
||||||
|
@ -74,16 +75,80 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
self.update_handler.side_effect = update
|
self.update_handler.side_effect = update
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
result = self.get_success(
|
result = self.get_success(self.updates.do_next_background_update(False))
|
||||||
self.updates.do_next_background_update(target_background_update_duration_ms)
|
|
||||||
)
|
|
||||||
self.assertFalse(result)
|
self.assertFalse(result)
|
||||||
self.update_handler.assert_called_once()
|
self.update_handler.assert_called_once()
|
||||||
|
|
||||||
# third step: we don't expect to be called any more
|
# third step: we don't expect to be called any more
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
result = self.get_success(
|
result = self.get_success(self.updates.do_next_background_update(False))
|
||||||
self.updates.do_next_background_update(target_background_update_duration_ms)
|
|
||||||
)
|
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
self.assertFalse(self.update_handler.called)
|
self.assertFalse(self.update_handler.called)
|
||||||
|
|
||||||
|
|
||||||
|
class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
|
||||||
|
def prepare(self, reactor, clock, homeserver):
|
||||||
|
self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates
|
||||||
|
# the base test class should have run the real bg updates for us
|
||||||
|
self.assertTrue(
|
||||||
|
self.get_success(self.updates.has_completed_background_updates())
|
||||||
|
)
|
||||||
|
|
||||||
|
self.update_deferred = Deferred()
|
||||||
|
self.update_handler = Mock(return_value=self.update_deferred)
|
||||||
|
self.updates.register_background_update_handler(
|
||||||
|
"test_update", self.update_handler
|
||||||
|
)
|
||||||
|
|
||||||
|
self._controller_ctx_mgr = AsyncMock(name="_controller_ctx_mgr")
|
||||||
|
self._controller = AsyncMock(BackgroundUpdateController)
|
||||||
|
self._controller.run_update.return_value = self._controller_ctx_mgr
|
||||||
|
|
||||||
|
self.updates.register_update_controller(self._controller)
|
||||||
|
|
||||||
|
def test_controller(self):
|
||||||
|
store = self.hs.get_datastore()
|
||||||
|
self.get_success(
|
||||||
|
store.db_pool.simple_insert(
|
||||||
|
"background_updates",
|
||||||
|
values={"update_name": "test_update", "progress_json": "{}"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
default_batch_size = 100
|
||||||
|
|
||||||
|
# Set up the return values of the controller.
|
||||||
|
enter_defer = Deferred()
|
||||||
|
self._controller_ctx_mgr.__aenter__ = Mock(return_value=enter_defer)
|
||||||
|
self._controller.default_batch_size.return_value = default_batch_size
|
||||||
|
self._controller.min_batch_size.return_value = default_batch_size
|
||||||
|
|
||||||
|
# Start the background update.
|
||||||
|
do_update_d = ensureDeferred(self.updates.do_next_background_update(True))
|
||||||
|
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
# `run_update` should have been called, but the update handler won't be
|
||||||
|
# called until the `enter_defer` (returned by `__aenter__`) is resolved.
|
||||||
|
self._controller.run_update.assert_called_once_with(
|
||||||
|
update_name="test_update",
|
||||||
|
database_name="master",
|
||||||
|
oneshot=False,
|
||||||
|
)
|
||||||
|
self.assertFalse(do_update_d.called)
|
||||||
|
self.assertFalse(self.update_deferred.called)
|
||||||
|
|
||||||
|
# Resolving the `enter_defer` should call the update handler, which then
|
||||||
|
# blocks.
|
||||||
|
enter_defer.callback(100)
|
||||||
|
self.pump()
|
||||||
|
self.update_handler.assert_called_once_with({}, default_batch_size)
|
||||||
|
self.assertFalse(self.update_deferred.called)
|
||||||
|
self._controller_ctx_mgr.__aexit__.assert_not_awaited()
|
||||||
|
|
||||||
|
# Resolving the update handler deferred should cause the
|
||||||
|
# `do_next_background_update` to finish and return
|
||||||
|
self.update_deferred.callback(100)
|
||||||
|
self.pump()
|
||||||
|
self._controller_ctx_mgr.__aexit__.assert_awaited()
|
||||||
|
self.get_success(do_update_d)
|
||||||
|
|
|
@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
|
||||||
):
|
):
|
||||||
iterations += 1
|
iterations += 1
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
self.store.db_pool.updates.do_next_background_update(False), by=0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ensure that we did actually take multiple iterations to process the
|
# Ensure that we did actually take multiple iterations to process the
|
||||||
|
@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
|
||||||
):
|
):
|
||||||
iterations += 1
|
iterations += 1
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
self.store.db_pool.updates.do_next_background_update(False), by=0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ensure that we did actually take multiple iterations to process the
|
# Ensure that we did actually take multiple iterations to process the
|
||||||
|
|
|
@ -319,12 +319,7 @@ class HomeserverTestCase(TestCase):
|
||||||
|
|
||||||
def wait_for_background_updates(self) -> None:
|
def wait_for_background_updates(self) -> None:
|
||||||
"""Block until all background database updates have completed."""
|
"""Block until all background database updates have completed."""
|
||||||
while not self.get_success(
|
self.get_success(self.store.db_pool.updates.run_background_updates(False))
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
def make_homeserver(self, reactor, clock):
|
def make_homeserver(self, reactor, clock):
|
||||||
"""
|
"""
|
||||||
|
@ -484,8 +479,7 @@ class HomeserverTestCase(TestCase):
|
||||||
|
|
||||||
async def run_bg_updates():
|
async def run_bg_updates():
|
||||||
with LoggingContext("run_bg_updates"):
|
with LoggingContext("run_bg_updates"):
|
||||||
while not await stor.db_pool.updates.has_completed_background_updates():
|
self.get_success(stor.db_pool.updates.run_background_updates(False))
|
||||||
await stor.db_pool.updates.do_next_background_update(1)
|
|
||||||
|
|
||||||
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
|
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
|
||||||
stor = hs.get_datastore()
|
stor = hs.get_datastore()
|
||||||
|
|
Loading…
Reference in a new issue