Add an admin API to run background jobs. (#11352)

Instead of having admins poke into the database directly.

Can currently run jobs to populate stats and to populate
the user directory.
This commit is contained in:
Dirk Klimpel 2021-11-19 20:39:46 +01:00 committed by GitHub
parent 7ae559944a
commit ea20937084
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 280 additions and 43 deletions

View file

@ -0,0 +1 @@
Add admin API to run background jobs.

View file

@ -2360,8 +2360,8 @@ user_directory:
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
# manually trigger a rebuild following the instructions at
# https://matrix-org.github.io/synapse/latest/user_directory.html
# manually trigger a rebuild via API following the instructions at
# https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.

View file

@ -42,7 +42,6 @@ For each update:
`average_items_per_ms` how many items are processed per millisecond based on an exponential average.
## Enabled
This API allow pausing background updates.
@ -82,3 +81,29 @@ The API returns the `enabled` param.
```
There is also a `GET` version which returns the `enabled` state.
## Run
This API schedules a specific background update to run. The job starts immediately after calling the API.
The API is:
```
POST /_synapse/admin/v1/background_updates/start_job
```
with the following body:
```json
{
"job_name": "populate_stats_process_rooms"
}
```
The following JSON body parameters are available:
- `job_name` - A string which job to run. Valid values are:
- `populate_stats_process_rooms` - Recalculate the stats for all rooms.
- `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.

View file

@ -6,9 +6,9 @@ on this particular server - i.e. ones which your account shares a room with, or
who are present in a publicly viewable room present on the server.
The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
and then restart synapse. This should then start a background task to
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to use the [admin API](usage/administration/admin_api/background_updates.md#run)
and execute the job `regenerate_directory`. This should then start a background task to
flush the current tables and regenerate the directory.
Data model

View file

@ -53,8 +53,8 @@ class UserDirectoryConfig(Config):
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
# manually trigger a rebuild following the instructions at
# https://matrix-org.github.io/synapse/latest/user_directory.html
# manually trigger a rebuild via API following the instructions at
# https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.

View file

@ -28,6 +28,7 @@ from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
from synapse.rest.admin.background_updates import (
BackgroundUpdateEnabledRestServlet,
BackgroundUpdateRestServlet,
BackgroundUpdateStartJobRestServlet,
)
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
@ -261,6 +262,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SendServerNoticeServlet(hs).register(http_server)
BackgroundUpdateEnabledRestServlet(hs).register(http_server)
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(

View file

@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
from synapse.types import JsonDict
@ -29,37 +34,36 @@ logger = logging.getLogger(__name__)
class BackgroundUpdateEnabledRestServlet(RestServlet):
"""Allows temporarily disabling background updates"""
PATTERNS = admin_patterns("/background_updates/enabled")
PATTERNS = admin_patterns("/background_updates/enabled$")
def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
self.data_stores = hs.get_datastores()
self._auth = hs.get_auth()
self._data_stores = hs.get_datastores()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)
# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
enabled = all(db.updates.enabled for db in self._data_stores.databases)
return 200, {"enabled": enabled}
return HTTPStatus.OK, {"enabled": enabled}
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)
body = parse_json_object_from_request(request)
enabled = body.get("enabled", True)
if not isinstance(enabled, bool):
raise SynapseError(400, "'enabled' parameter must be a boolean")
raise SynapseError(
HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean"
)
for db in self.data_stores.databases:
for db in self._data_stores.databases:
db.updates.enabled = enabled
# If we're re-enabling them ensure that we start the background
@ -67,32 +71,29 @@ class BackgroundUpdateEnabledRestServlet(RestServlet):
if enabled:
db.updates.start_doing_background_updates()
return 200, {"enabled": enabled}
return HTTPStatus.OK, {"enabled": enabled}
class BackgroundUpdateRestServlet(RestServlet):
"""Fetch information about background updates"""
PATTERNS = admin_patterns("/background_updates/status")
PATTERNS = admin_patterns("/background_updates/status$")
def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
self.data_stores = hs.get_datastores()
self._auth = hs.get_auth()
self._data_stores = hs.get_datastores()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)
# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
enabled = all(db.updates.enabled for db in self._data_stores.databases)
current_updates = {}
for db in self.data_stores.databases:
for db in self._data_stores.databases:
update = db.updates.get_current_update()
if not update:
continue
@ -104,4 +105,72 @@ class BackgroundUpdateRestServlet(RestServlet):
"average_items_per_ms": update.average_items_per_ms(),
}
return 200, {"enabled": enabled, "current_updates": current_updates}
return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates}
class BackgroundUpdateStartJobRestServlet(RestServlet):
"""Allows to start specific background updates"""
PATTERNS = admin_patterns("/background_updates/start_job")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastore()
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ["job_name"])
job_name = body["job_name"]
if job_name == "populate_stats_process_rooms":
jobs = [
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
},
]
elif job_name == "regenerate_directory":
jobs = [
{
"update_name": "populate_user_directory_createtables",
"progress_json": "{}",
"depends_on": "",
},
{
"update_name": "populate_user_directory_process_rooms",
"progress_json": "{}",
"depends_on": "populate_user_directory_createtables",
},
{
"update_name": "populate_user_directory_process_users",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_rooms",
},
{
"update_name": "populate_user_directory_cleanup",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_users",
},
]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
try:
await self._store.db_pool.simple_insert_many(
table="background_updates",
values=jobs,
desc=f"admin_api_run_{job_name}",
)
except self._store.db_pool.engine.module.IntegrityError:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Job %s is already in queue of background updates." % (job_name,),
)
self._store.db_pool.updates.start_doing_background_updates()
return HTTPStatus.OK, {}

View file

@ -122,6 +122,8 @@ class BackgroundUpdater:
def start_doing_background_updates(self) -> None:
if self.enabled:
# if we start a new background update, not all updates are done.
self._all_done = False
run_as_background_process("background_updates", self.run_background_updates)
async def run_background_updates(self, sleep: bool = True) -> None:

View file

@ -11,8 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus
from typing import Collection
from parameterized import parameterized
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
@ -30,6 +35,60 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@parameterized.expand(
[
("GET", "/_synapse/admin/v1/background_updates/enabled"),
("POST", "/_synapse/admin/v1/background_updates/enabled"),
("GET", "/_synapse/admin/v1/background_updates/status"),
("POST", "/_synapse/admin/v1/background_updates/start_job"),
]
)
def test_requester_is_no_admin(self, method: str, url: str):
"""
If the user is not a server admin, an error 403 is returned.
"""
self.register_user("user", "pass", admin=False)
other_user_tok = self.login("user", "pass")
channel = self.make_request(
method,
url,
content={},
access_token=other_user_tok,
)
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_invalid_parameter(self):
"""
If parameters are invalid, an error is returned.
"""
url = "/_synapse/admin/v1/background_updates/start_job"
# empty content
channel = self.make_request(
"POST",
url,
content={},
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
# job_name invalid
channel = self.make_request(
"POST",
url,
content={"job_name": "unknown"},
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
def _register_bg_update(self):
"Adds a bg update but doesn't start it"
@ -60,7 +119,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Background updates should be enabled, but none should be running.
self.assertDictEqual(
@ -82,7 +141,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Background updates should be enabled, and one should be running.
self.assertDictEqual(
@ -114,7 +173,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/enabled",
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(channel.json_body, {"enabled": True})
# Disable the BG updates
@ -124,7 +183,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
content={"enabled": False},
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(channel.json_body, {"enabled": False})
# Advance a bit and get the current status, note this will finish the in
@ -137,7 +196,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(
channel.json_body,
{
@ -162,7 +221,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# There should be no change from the previous /status response.
self.assertDictEqual(
@ -188,7 +247,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
content={"enabled": True},
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(channel.json_body, {"enabled": True})
@ -199,7 +258,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Background updates should be enabled and making progress.
self.assertDictEqual(
@ -216,3 +275,82 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"enabled": True,
},
)
@parameterized.expand(
[
("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
(
"regenerate_directory",
[
"populate_user_directory_createtables",
"populate_user_directory_process_rooms",
"populate_user_directory_process_users",
"populate_user_directory_cleanup",
],
),
]
)
def test_start_backround_job(self, job_name: str, updates: Collection[str]):
"""
Test that background updates add to database and be processed.
Args:
job_name: name of the job to call with API
updates: collection of background updates to be started
"""
# no background update is waiting
self.assertTrue(
self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
)
)
channel = self.make_request(
"POST",
"/_synapse/admin/v1/background_updates/start_job",
content={"job_name": job_name},
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# test that each background update is waiting now
for update in updates:
self.assertFalse(
self.get_success(
self.store.db_pool.updates.has_completed_background_update(update)
)
)
self.wait_for_background_updates()
# background updates are done
self.assertTrue(
self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
)
)
def test_start_backround_job_twice(self):
"""Test that add a background update twice return an error."""
# add job to database
self.get_success(
self.store.db_pool.simple_insert(
table="background_updates",
values={
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
},
)
)
channel = self.make_request(
"POST",
"/_synapse/admin/v1/background_updates/start_job",
content={"job_name": "populate_stats_process_rooms"},
access_token=self.admin_user_tok,
)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)