Compare commits

6 commits

Author SHA1 Message Date
Erik Johnston c7f1498e35 Newsfile 2021-11-11 12:00:15 +00:00
Erik Johnston dccddf15b0 Add tests 2021-11-11 11:53:36 +00:00
Erik Johnston 0ace9f8d85 Expose a sleep(..) func on ModuleApi 2021-11-11 11:53:36 +00:00
Erik Johnston 0c3ba88496 Add a register_background_update_controller 2021-11-11 11:53:36 +00:00
Erik Johnston 957da6f56e Add a BackgroundUpdateController class. 2021-11-11 11:53:36 +00:00
    This controls how often and for how long a background update is run for. The idea is to allow the default one to be replaced by a pluggable module.
Erik Johnston 4a1a8321dd Store whether a BG update is oneshot or not 2021-11-10 11:15:28 +00:00
97 changed files with 601 additions and 446 deletions

CHANGES.md

@ -1,97 +1,3 @@
Synapse 1.47.0rc2 (2021-11-10)
==============================
This fixes an issue with publishing the Debian packages for 1.47.0rc1.
It is otherwise identical to 1.47.0rc1.

Synapse 1.47.0rc1 (2021-11-09)
==============================

Deprecations and Removals
-------------------------
- The `user_may_create_room_with_invites` module callback is now deprecated. Please refer to the [upgrade notes](https://matrix-org.github.io/synapse/develop/upgrade#upgrading-to-v1470) for more information. ([\#11206](https://github.com/matrix-org/synapse/issues/11206))
- Remove deprecated admin API to delete rooms (`POST /_synapse/admin/v1/rooms/<room_id>/delete`). ([\#11213](https://github.com/matrix-org/synapse/issues/11213))

Features
--------
- Advertise support for Client-Server API r0.6.1. ([\#11097](https://github.com/matrix-org/synapse/issues/11097))
- Add search by room ID and room alias to the List Room admin API. ([\#11099](https://github.com/matrix-org/synapse/issues/11099))
- Add an `on_new_event` third-party rules callback to allow Synapse modules to act after an event has been sent into a room. ([\#11126](https://github.com/matrix-org/synapse/issues/11126))
- Add a module API method to update a user's membership in a room. ([\#11147](https://github.com/matrix-org/synapse/issues/11147))
- Add metrics for thread pool usage. ([\#11178](https://github.com/matrix-org/synapse/issues/11178))
- Support the stable room type field for [MSC3288](https://github.com/matrix-org/matrix-doc/pull/3288). ([\#11187](https://github.com/matrix-org/synapse/issues/11187))
- Add a module API method to retrieve the current state of a room. ([\#11204](https://github.com/matrix-org/synapse/issues/11204))
- Calculate a default value for `public_baseurl` based on `server_name`. ([\#11210](https://github.com/matrix-org/synapse/issues/11210))
- Add support for serving `/.well-known/matrix/server` files, to redirect federation traffic to port 443. ([\#11211](https://github.com/matrix-org/synapse/issues/11211))
- Add admin APIs to pause, start and check the status of background updates. ([\#11263](https://github.com/matrix-org/synapse/issues/11263))

Bugfixes
--------
- Fix a long-standing bug which allowed hidden devices to receive to-device messages, resulting in unnecessary database bloat. ([\#10097](https://github.com/matrix-org/synapse/issues/10097))
- Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine. ([\#10969](https://github.com/matrix-org/synapse/issues/10969), [\#11212](https://github.com/matrix-org/synapse/issues/11212))
- Do not accept events if a third-party rule `check_event_allowed` callback raises an exception. ([\#11033](https://github.com/matrix-org/synapse/issues/11033))
- Fix long-standing bug where verification requests could fail in certain cases if a federation whitelist was in place but did not include your own homeserver. ([\#11129](https://github.com/matrix-org/synapse/issues/11129))
- Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`. ([\#11188](https://github.com/matrix-org/synapse/issues/11188))
- Fix a bug introduced in Synapse 1.45.0 which prevented the `synapse_review_recent_signups` script from running. Contributed by @samuel-p. ([\#11191](https://github.com/matrix-org/synapse/issues/11191))
- Delete `to_device` messages for hidden devices that will never be read, reducing database size. ([\#11199](https://github.com/matrix-org/synapse/issues/11199))
- Fix a long-standing bug wherein a missing `Content-Type` header when downloading remote media would cause Synapse to throw an error. ([\#11200](https://github.com/matrix-org/synapse/issues/11200))
- Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. ([\#11207](https://github.com/matrix-org/synapse/issues/11207))
- Fix a bug introduced in Synapse 1.35.0 which made it impossible to join rooms that return a `send_join` response containing floats. ([\#11217](https://github.com/matrix-org/synapse/issues/11217))
- Fix long-standing bug where cross signing keys were not included in the response to `/r0/keys/query` the first time a remote user was queried. ([\#11234](https://github.com/matrix-org/synapse/issues/11234))
- Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection. ([\#11240](https://github.com/matrix-org/synapse/issues/11240))
- Fix a bug preventing Synapse from being rolled back to an earlier version when using workers. ([\#11255](https://github.com/matrix-org/synapse/issues/11255), [\#11276](https://github.com/matrix-org/synapse/issues/11276))
- Fix a bug introduced in Synapse 1.37.1 which caused a remote event being processed by a worker to not get processed on restart if the worker was killed. ([\#11262](https://github.com/matrix-org/synapse/issues/11262))
- Only allow old Element/Riot Android clients to send read receipts without a request body. All other clients must include a request body as required by the specification. Contributed by @rogersheu. ([\#11157](https://github.com/matrix-org/synapse/issues/11157))

Updates to the Docker image
---------------------------
- Avoid changing user ID when started as a non-root user, and no explicit `UID` is set. ([\#11209](https://github.com/matrix-org/synapse/issues/11209))

Improved Documentation
----------------------
- Improve example HAProxy config in the docs to properly handle HTTP `Host` headers with port information. This is required for federation over port 443 to work correctly. ([\#11128](https://github.com/matrix-org/synapse/issues/11128))
- Add documentation for using Authentik as an OpenID Connect Identity Provider. Contributed by @samip5. ([\#11151](https://github.com/matrix-org/synapse/issues/11151))
- Clarify lack of support for Windows. ([\#11198](https://github.com/matrix-org/synapse/issues/11198))
- Improve code formatting and fix a few typos in docs. Contributed by @sumnerevans at Beeper. ([\#11221](https://github.com/matrix-org/synapse/issues/11221))
- Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr. ([\#11257](https://github.com/matrix-org/synapse/issues/11257))

Internal Changes
----------------
- Add type annotations for the `log_function` decorator. ([\#10943](https://github.com/matrix-org/synapse/issues/10943))
- Add type hints to `synapse.events`. ([\#11098](https://github.com/matrix-org/synapse/issues/11098))
- Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code. ([\#11137](https://github.com/matrix-org/synapse/issues/11137))
- Add type hints so that `synapse.http` passes `mypy` checks. ([\#11164](https://github.com/matrix-org/synapse/issues/11164))
- Update scripts to pass Shellcheck lints. ([\#11166](https://github.com/matrix-org/synapse/issues/11166))
- Add knock information in admin export. Contributed by Rafael Gonçalves. ([\#11171](https://github.com/matrix-org/synapse/issues/11171))
- Add tests to check that `ClientIpStore.get_last_client_ip_by_device` and `get_user_ip_and_agents` combine database and in-memory data correctly. ([\#11179](https://github.com/matrix-org/synapse/issues/11179))
- Refactor `Filter` to check different fields depending on the data type. ([\#11194](https://github.com/matrix-org/synapse/issues/11194))
- Improve type hints for the relations datastore. ([\#11205](https://github.com/matrix-org/synapse/issues/11205))
- Replace outdated links in the pull request checklist with links to the rendered documentation. ([\#11225](https://github.com/matrix-org/synapse/issues/11225))
- Fix a bug in unit test `test_block_room_and_not_purge`. ([\#11226](https://github.com/matrix-org/synapse/issues/11226))
- In `ObservableDeferred`, run observers in the order they were registered. ([\#11229](https://github.com/matrix-org/synapse/issues/11229))
- Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`. ([\#11231](https://github.com/matrix-org/synapse/issues/11231))
- Add `twine` and `towncrier` as dev dependencies, as they're used by the release script. ([\#11233](https://github.com/matrix-org/synapse/issues/11233))
- Allow `stream_writers.typing` config to be a list of one worker. ([\#11237](https://github.com/matrix-org/synapse/issues/11237))
- Remove debugging statement in tests. ([\#11239](https://github.com/matrix-org/synapse/issues/11239))
- Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers. ([\#11244](https://github.com/matrix-org/synapse/issues/11244))
- Add an additional test for the `cachedList` method decorator. ([\#11246](https://github.com/matrix-org/synapse/issues/11246))
- Make minor correction to the type of `auth_checkers` callbacks. ([\#11253](https://github.com/matrix-org/synapse/issues/11253))
- Clean up trivial aspects of the Debian package build tooling. ([\#11269](https://github.com/matrix-org/synapse/issues/11269), [\#11273](https://github.com/matrix-org/synapse/issues/11273))
- Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse. ([\#11270](https://github.com/matrix-org/synapse/issues/11270))

Synapse 1.46.0 (2021-11-02)
===========================

changelog.d/10097.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug which allowed hidden devices to receive to-device messages, resulting in unnecessary database bloat.

changelog.d/10943.misc (new file)

@ -0,0 +1 @@
Add type annotations for the `log_function` decorator.

changelog.d/10969.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine.

changelog.d/11033.bugfix (new file)

@ -0,0 +1 @@
Do not accept events if a third-party rule module API callback raises an exception.

@ -0,0 +1 @@
Advertise support for Client-Server API r0.6.1.

changelog.d/11098.misc (new file)

@ -0,0 +1 @@
Add type hints to `synapse.events`.

@ -0,0 +1 @@
Add search by room ID and room alias to List Room admin API.

@ -0,0 +1 @@
Add an `on_new_event` third-party rules callback to allow Synapse modules to act after an event has been sent into a room.

changelog.d/11128.doc (new file)

@ -0,0 +1 @@
Improve example HAProxy config in the docs to properly handle host headers with port information. This is required for federation over port 443 to work correctly.

changelog.d/11129.bugfix (new file)

@ -0,0 +1 @@
Fix long-standing bug where verification requests could fail in certain cases if whitelist was in place but did not include your own homeserver.

changelog.d/11137.misc (new file)

@ -0,0 +1 @@
Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code.

@ -0,0 +1 @@
Add a module API method to update a user's membership in a room.

changelog.d/11151.doc (new file)

@ -0,0 +1 @@
Add documentation for using Authentik as an OpenID Connect Identity Provider. Contributed by @samip5.

changelog.d/11157.misc (new file)

@ -0,0 +1 @@
Only allow old Element/Riot Android clients to send read receipts without a request body. All other clients must include a request body as required by the specification. Contributed by @rogersheu.

changelog.d/11164.misc (new file)

@ -0,0 +1 @@
Add type hints so that `synapse.http` passes `mypy` checks.

changelog.d/11166.misc (new file)

@ -0,0 +1 @@
Update scripts to pass Shellcheck lints.

changelog.d/11171.misc (new file)

@ -0,0 +1 @@
Add knock information in admin export. Contributed by Rafael Gonçalves.

@ -0,0 +1 @@
Add metrics for thread pool usage.

changelog.d/11179.misc (new file)

@ -0,0 +1 @@
Add tests to check that `ClientIpStore.get_last_client_ip_by_device` and `get_user_ip_and_agents` combine database and in-memory data correctly.

@ -0,0 +1 @@
Support the stable room type field for [MSC3288](https://github.com/matrix-org/matrix-doc/pull/3288).

changelog.d/11188.bugfix (new file)

@ -0,0 +1 @@
Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`.

changelog.d/11191.bugfix (new file)

@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.45.0 which prevented the `synapse_review_recent_signups` script from running. Contributed by @samuel-p.

changelog.d/11194.misc (new file)

@ -0,0 +1 @@
Refactor `Filter` to check different fields depending on the data type.

changelog.d/11198.doc (new file)

@ -0,0 +1 @@
Clarify lack of support for Windows.

changelog.d/11199.bugfix (new file)

@ -0,0 +1 @@
Delete `to_device` messages for hidden devices that will never be read, reducing database size.

changelog.d/11200.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug wherein a missing `Content-Type` header when downloading remote media would cause Synapse to throw an error.

@ -0,0 +1 @@
Add a module API method to retrieve the current state of a room.

changelog.d/11205.misc (new file)

@ -0,0 +1 @@
Improve type hints for the relations datastore.

@ -0,0 +1 @@
The `user_may_create_room_with_invites` module callback is now deprecated. Please refer to the [upgrade notes](https://matrix-org.github.io/synapse/develop/upgrade#upgrading-to-v1470) for more information.

changelog.d/11207.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper.

changelog.d/11209.docker (new file)

@ -0,0 +1 @@
Avoid changing userid when started as a non-root user, and no explicit `UID` is set.

@ -0,0 +1 @@
Calculate a default value for `public_baseurl` based on `server_name`.

@ -0,0 +1 @@
Add support for serving `/.well-known/matrix/server` files, to redirect federation traffic to port 443.

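For context, the file served at /.well-known/matrix/server is a small JSON document that tells other homeservers where to route federation traffic. A quick sketch of inspecting one (the hostnames are hypothetical placeholders):

    import json
    import urllib.request

    # A delegated deployment typically answers with {"m.server": "<host>:<port>"},
    # pointing federation at port 443 on the host that actually runs Synapse.
    with urllib.request.urlopen("https://example.com/.well-known/matrix/server") as resp:
        print(json.load(resp))  # e.g. {"m.server": "synapse.example.com:443"}
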
changelog.d/11212.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine.

@ -0,0 +1 @@
Remove deprecated admin API to delete rooms (`POST /_synapse/admin/v1/rooms/<room_id>/delete`).

changelog.d/11217.bugfix (new file)

@ -0,0 +1 @@
Fix a bug introduced in 1.35.0 which made it impossible to join rooms that return a `send_join` response containing floats.

changelog.d/11221.doc (new file)

@ -0,0 +1 @@
Improve code formatting and fix a few typos in docs. Contributed by @sumnerevans at Beeper.

changelog.d/11225.misc (new file)

@ -0,0 +1 @@
Replace outdated links in the pull request checklist with links to the rendered documentation.

changelog.d/11226.misc (new file)

@ -0,0 +1 @@
Fix a bug in unit test `test_block_room_and_not_purge`.

changelog.d/11229.misc (new file)

@ -0,0 +1 @@
`ObservableDeferred`: run registered observers in order.

changelog.d/11231.misc (new file)

@ -0,0 +1 @@
Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`.

changelog.d/11233.misc (new file)

@ -0,0 +1 @@
Add `twine` and `towncrier` as dev dependencies, as they're used by the release script.

changelog.d/11234.bugfix (new file)

@ -0,0 +1 @@
Fix long-standing bug where cross signing keys were not included in the response to `/r0/keys/query` the first time a remote user was queried.

changelog.d/11237.misc (new file)

@ -0,0 +1 @@
Allow `stream_writers.typing` config to be a list of one worker.

changelog.d/11239.misc (new file)

@ -0,0 +1 @@
Remove debugging statement in tests.

changelog.d/11240.bugfix (new file)

@ -0,0 +1 @@
Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection.

changelog.d/11244.misc (new file)

@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.

changelog.d/11246.misc (new file)

@ -0,0 +1 @@
Add an additional test for the `cachedList` method decorator.

changelog.d/11253.misc (new file)

@ -0,0 +1 @@
Make minor correction to the type of `auth_checkers` callbacks.

changelog.d/11255.bugfix (new file)

@ -0,0 +1 @@
Fix rolling back Synapse version when using workers.

changelog.d/11257.doc (new file)

@ -0,0 +1 @@
Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr.

changelog.d/11262.bugfix (new file)

@ -0,0 +1 @@
Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.

@ -0,0 +1 @@
Add some background update admin APIs.

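The entry above covers the new background-update admin endpoints. A rough sketch of driving them, assuming the documented /_synapse/admin/v1/background_updates/status and .../enabled paths and a placeholder admin token:

    import requests

    headers = {"Authorization": "Bearer <admin_access_token>"}
    base = "https://homeserver.example/_synapse/admin/v1/background_updates"

    # Check whether background updates are enabled and what is currently running.
    print(requests.get(base + "/status", headers=headers).json())

    # Pause background updates; POST {"enabled": true} later to resume them.
    requests.post(base + "/enabled", headers=headers, json={"enabled": False})
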
changelog.d/11269.misc (new file)

@ -0,0 +1 @@
Clean up trivial aspects of the Debian package build tooling.

changelog.d/11270.misc (new file)

@ -0,0 +1 @@
Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse.

changelog.d/11273.misc (new file)

@ -0,0 +1 @@
Clean up trivial aspects of the Debian package build tooling.

changelog.d/11276.bugfix (new file)

@ -0,0 +1 @@
Fix rolling back Synapse version when using workers.

@ -1 +0,0 @@
Add missing type hints to `synapse.app`.

@ -1 +0,0 @@
Fix a long-standing bug where uploading extremely thin images (e.g. 1000x1) would fail. Contributed by @Neeeflix.

@ -1 +0,0 @@
Remove unused parameters on `FederationEventHandler._check_event_auth`.

@ -1 +0,0 @@
Add type hints to `synapse._scripts`.

@ -1 +0,0 @@
Add Single Sign-On, SAML and CAS pages to the documentation.

@ -0,0 +1 @@
Add new plugin support for controlling background update timings.

debian/changelog (vendored)

@ -1,16 +1,12 @@
matrix-synapse-py3 (1.47.0~rc2) stable; urgency=medium
matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium
[ Dan Callahan ]
* Update scripts to pass Shellcheck lints.
* Remove unused Vagrant scripts from debian/ directory.
* Allow building Debian packages for any architecture, not just amd64.
* Preinstall the "wheel" package when building virtualenvs.
* Do not error if /etc/default/matrix-synapse is missing.
[ Synapse Packaging team ]
* New synapse release 1.47.0~rc2.
-- Synapse Packaging team <packages@matrix.org> Wed, 10 Nov 2021 09:41:01 +0000
-- Dan Callahan <danc@element.io> Fri, 22 Oct 2021 22:20:31 +0000
matrix-synapse-py3 (1.46.0) stable; urgency=medium

docs/SUMMARY.md

@ -23,10 +23,10 @@
- [Structured Logging](structured_logging.md)
- [Templates](templates.md)
- [User Authentication](usage/configuration/user_authentication/README.md)
- [Single-Sign On](usage/configuration/user_authentication/single_sign_on/README.md)
- [Single-Sign On]()
- [OpenID Connect](openid.md)
- [SAML](usage/configuration/user_authentication/single_sign_on/saml.md)
- [CAS](usage/configuration/user_authentication/single_sign_on/cas.md)
- [SAML]()
- [CAS]()
- [SSO Mapping Providers](sso_mapping_providers.md)
- [Password Auth Providers](password_auth_providers.md)
- [JSON Web Tokens](jwt.md)

docs/usage/configuration/user_authentication/single_sign_on/README.md (deleted)

@ -1,5 +0,0 @@
# Single Sign-On
Synapse supports single sign-on through the SAML, Open ID Connect or CAS protocols.
LDAP and other login methods are supported through first and third-party password
auth provider modules.

docs/usage/configuration/user_authentication/single_sign_on/cas.md (deleted)

@ -1,8 +0,0 @@
# CAS
Synapse supports authenticating users via the [Central Authentication
Service protocol](https://en.wikipedia.org/wiki/Central_Authentication_Service)
(CAS) natively.
Please see the `cas_config` and `sso` sections of the [Synapse configuration
file](../../../configuration/homeserver_sample_config.md) for more details.

docs/usage/configuration/user_authentication/single_sign_on/saml.md (deleted)

@ -1,8 +0,0 @@
# SAML
Synapse supports authenticating users via the [Security Assertion
Markup Language](https://en.wikipedia.org/wiki/Security_Assertion_Markup_Language)
(SAML) protocol natively.
Please see the `saml2_config` and `sso` sections of the [Synapse configuration
file](../../../configuration/homeserver_sample_config.md) for more details.

mypy.ini

@ -23,6 +23,24 @@ files =
# https://docs.python.org/3/library/re.html#re.X
exclude = (?x)
^(
|synapse/_scripts/register_new_matrix_user.py
|synapse/_scripts/review_recent_signups.py
|synapse/app/__init__.py
|synapse/app/_base.py
|synapse/app/admin_cmd.py
|synapse/app/appservice.py
|synapse/app/client_reader.py
|synapse/app/event_creator.py
|synapse/app/federation_reader.py
|synapse/app/federation_sender.py
|synapse/app/frontend_proxy.py
|synapse/app/generic_worker.py
|synapse/app/homeserver.py
|synapse/app/media_repository.py
|synapse/app/phone_stats_home.py
|synapse/app/pusher.py
|synapse/app/synchrotron.py
|synapse/app/user_dir.py
|synapse/storage/databases/__init__.py
|synapse/storage/databases/main/__init__.py
|synapse/storage/databases/main/account_data.py
@ -163,9 +181,6 @@ exclude = (?x)
[mypy-synapse.api.*]
disallow_untyped_defs = True
[mypy-synapse.app.*]
disallow_untyped_defs = True
[mypy-synapse.crypto.*]
disallow_untyped_defs = True

setup.py

@ -110,7 +110,6 @@ CONDITIONAL_REQUIREMENTS["mypy"] = [
"types-Pillow>=8.3.4",
"types-pyOpenSSL>=20.0.7",
"types-PyYAML>=5.4.10",
"types-requests>=2.26.0",
"types-setuptools>=57.4.0",
]
@ -119,7 +118,9 @@ CONDITIONAL_REQUIREMENTS["mypy"] = [
# Tests assume that all optional dependencies are installed.
#
# parameterized_class decorator was introduced in parameterized 0.7.0
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0"]
#
# We use `mock` library as that backports `AsyncMock` to Python 3.6
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0", "mock>=4.0.0"]
CONDITIONAL_REQUIREMENTS["dev"] = (
CONDITIONAL_REQUIREMENTS["lint"]

synapse/__init__.py

@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.47.0rc2"
__version__ = "1.46.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

synapse/_scripts/register_new_matrix_user.py

@ -1,6 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,23 +19,22 @@ import hashlib
import hmac
import logging
import sys
from typing import Callable, Optional
import requests as _requests
import yaml
def request_registration(
user: str,
password: str,
server_location: str,
shared_secret: str,
admin: bool = False,
user_type: Optional[str] = None,
user,
password,
server_location,
shared_secret,
admin=False,
user_type=None,
requests=_requests,
_print: Callable[[str], None] = print,
exit: Callable[[int], None] = sys.exit,
) -> None:
_print=print,
exit=sys.exit,
):
url = "%s/_synapse/admin/v1/register" % (server_location.rstrip("/"),)
@ -67,13 +65,13 @@ def request_registration(
mac.update(b"\x00")
mac.update(user_type.encode("utf8"))
hex_mac = mac.hexdigest()
mac = mac.hexdigest()
data = {
"nonce": nonce,
"username": user,
"password": password,
"mac": hex_mac,
"mac": mac,
"admin": admin,
"user_type": user_type,
}
@ -93,17 +91,10 @@ def request_registration(
_print("Success!")
def register_new_user(
user: str,
password: str,
server_location: str,
shared_secret: str,
admin: Optional[bool],
user_type: Optional[str],
) -> None:
def register_new_user(user, password, server_location, shared_secret, admin, user_type):
if not user:
try:
default_user: Optional[str] = getpass.getuser()
default_user = getpass.getuser()
except Exception:
default_user = None
@ -132,8 +123,8 @@ def register_new_user(
sys.exit(1)
if admin is None:
admin_inp = input("Make admin [no]: ")
if admin_inp in ("y", "yes", "true"):
admin = input("Make admin [no]: ")
if admin in ("y", "yes", "true"):
admin = True
else:
admin = False
@ -143,7 +134,7 @@ def register_new_user(
)
def main() -> None:
def main():
logging.captureWarnings(True)

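For reference, the MAC assembled by request_registration above is a plain HMAC-SHA1 over the registration fields joined by NUL bytes. A minimal standalone sketch of the same computation (field values are placeholders; the optional user_type suffix is omitted):

    import hmac
    from hashlib import sha1

    def registration_mac(shared_secret: str, nonce: str, user: str,
                         password: str, admin: bool = False) -> str:
        # Mirror the update sequence above: each field NUL-separated,
        # finishing with the admin flag.
        mac = hmac.new(shared_secret.encode("utf8"), digestmod=sha1)
        for field in (nonce, user, password):
            mac.update(field.encode("utf8"))
            mac.update(b"\x00")
        mac.update(b"admin" if admin else b"notadmin")
        return mac.hexdigest()
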
synapse/_scripts/review_recent_signups.py

@ -92,7 +92,7 @@ def get_recent_users(txn: LoggingTransaction, since_ms: int) -> List[UserInfo]:
return user_infos
def main() -> None:
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
@ -142,8 +142,7 @@ def main() -> None:
engine = create_engine(database_config.config)
with make_conn(database_config, engine, "review_recent_signups") as db_conn:
# This generates a type of Cursor, not LoggingTransaction.
user_infos = get_recent_users(db_conn.cursor(), since_ms) # type: ignore[arg-type]
user_infos = get_recent_users(db_conn.cursor(), since_ms)
for user_info in user_infos:
if exclude_users_with_email and user_info.emails:

synapse/app/__init__.py

@ -13,7 +13,6 @@
# limitations under the License.
import logging
import sys
from typing import Container
from synapse import python_dependencies # noqa: E402
@ -28,9 +27,7 @@ except python_dependencies.DependencyException as e:
sys.exit(1)
def check_bind_error(
e: Exception, address: str, bind_addresses: Container[str]
) -> None:
def check_bind_error(e, address, bind_addresses):
"""
This method checks an exception occurred while binding on 0.0.0.0.
If :: is specified in the bind addresses a warning is shown.
@ -41,9 +38,9 @@ def check_bind_error(
When binding on 0.0.0.0 after :: this can safely be ignored.
Args:
e: Exception that was caught.
address: Address on which binding was attempted.
bind_addresses: Addresses on which the service listens.
e (Exception): Exception that was caught.
address (str): Address on which binding was attempted.
bind_addresses (list): Addresses on which the service listens.
"""
if address == "0.0.0.0" and "::" in bind_addresses:
logger.warning(

synapse/app/_base.py

@ -22,27 +22,13 @@ import socket
import sys
import traceback
import warnings
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Collection,
Dict,
Iterable,
List,
NoReturn,
Tuple,
cast,
)
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable
from cryptography.utils import CryptographyDeprecationWarning
from typing_extensions import NoReturn
import twisted
from twisted.internet import defer, error, reactor as _reactor
from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP
from twisted.internet.protocol import ServerFactory
from twisted.internet.tcp import Port
from twisted.internet import defer, error, reactor
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool
@ -62,7 +48,6 @@ from synapse.logging.context import PreserveLoggingContext
from synapse.metrics import register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.types import ISynapseReactor
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.gai_resolver import GAIResolver
@ -72,44 +57,33 @@ from synapse.util.versionstring import get_version_string
if TYPE_CHECKING:
from synapse.server import HomeServer
# Twisted injects the global reactor to make it easier to import, this confuses
# mypy which thinks it is a module. Tell it that it a more proper type.
reactor = cast(ISynapseReactor, _reactor)
logger = logging.getLogger(__name__)
# list of tuples of function, args list, kwargs dict
_sighup_callbacks: List[
Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
] = []
_sighup_callbacks = []
def register_sighup(func: Callable[..., None], *args: Any, **kwargs: Any) -> None:
def register_sighup(func, *args, **kwargs):
"""
Register a function to be called when a SIGHUP occurs.
Args:
func: Function to be called when sent a SIGHUP signal.
func (function): Function to be called when sent a SIGHUP signal.
*args, **kwargs: args and kwargs to be passed to the target function.
"""
_sighup_callbacks.append((func, args, kwargs))
def start_worker_reactor(
appname: str,
config: HomeServerConfig,
run_command: Callable[[], None] = reactor.run,
) -> None:
def start_worker_reactor(appname, config, run_command=reactor.run):
"""Run the reactor in the main process
Daemonizes if necessary, and then configures some resources, before starting
the reactor. Pulls configuration from the 'worker' settings in 'config'.
Args:
appname: application name which will be sent to syslog
config: config object
run_command: callable that actually runs the reactor
appname (str): application name which will be sent to syslog
config (synapse.config.Config): config object
run_command (Callable[]): callable that actually runs the reactor
"""
logger = logging.getLogger(config.worker.worker_app)
@ -127,32 +101,32 @@ def start_worker_reactor(
def start_reactor(
appname: str,
soft_file_limit: int,
gc_thresholds: Tuple[int, int, int],
pid_file: str,
daemonize: bool,
print_pidfile: bool,
logger: logging.Logger,
run_command: Callable[[], None] = reactor.run,
) -> None:
appname,
soft_file_limit,
gc_thresholds,
pid_file,
daemonize,
print_pidfile,
logger,
run_command=reactor.run,
):
"""Run the reactor in the main process
Daemonizes if necessary, and then configures some resources, before starting
the reactor
Args:
appname: application name which will be sent to syslog
soft_file_limit:
appname (str): application name which will be sent to syslog
soft_file_limit (int):
gc_thresholds:
pid_file: name of pid file to write to if daemonize is True
daemonize: true to run the reactor in a background process
print_pidfile: whether to print the pid file, if daemonize is True
logger: logger instance to pass to Daemonize
run_command: callable that actually runs the reactor
pid_file (str): name of pid file to write to if daemonize is True
daemonize (bool): true to run the reactor in a background process
print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
run_command (Callable[]): callable that actually runs the reactor
"""
def run() -> None:
def run():
logger.info("Running")
setup_jemalloc_stats()
change_resource_limit(soft_file_limit)
@ -211,7 +185,7 @@ def redirect_stdio_to_logs() -> None:
print("Redirected stdout/stderr to logs")
def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None:
def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
"""Register a callback with the reactor, to be called once it is running
This can be used to initialise parts of the system which require an asynchronous
@ -221,7 +195,7 @@ def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> N
will exit.
"""
async def wrapper() -> None:
async def wrapper():
try:
await cb(*args, **kwargs)
except Exception:
@ -250,7 +224,7 @@ def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> N
reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
def listen_metrics(bind_addresses: Iterable[str], port: int) -> None:
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
"""
@ -262,11 +236,11 @@ def listen_metrics(bind_addresses: Iterable[str], port: int) -> None:
def listen_manhole(
bind_addresses: Collection[str],
bind_addresses: Iterable[str],
port: int,
manhole_settings: ManholeConfig,
manhole_globals: dict,
) -> None:
):
# twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
# warning. It's fixed by https://github.com/twisted/twisted/pull/1522), so
# suppress the warning for now.
@ -285,18 +259,12 @@ def listen_manhole(
)
def listen_tcp(
bind_addresses: Collection[str],
port: int,
factory: ServerFactory,
reactor: IReactorTCP = reactor,
backlog: int = 50,
) -> List[Port]:
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
Returns:
list of twisted.internet.tcp.Port listening for TCP connections
list[twisted.internet.tcp.Port]: listening for TCP connections
"""
r = []
for address in bind_addresses:
@ -305,19 +273,12 @@ def listen_tcp(
except error.CannotListenError as e:
check_bind_error(e, address, bind_addresses)
# IReactorTCP returns an object implementing IListeningPort from listenTCP,
# but we know it will be a Port instance.
return r # type: ignore[return-value]
return r
def listen_ssl(
bind_addresses: Collection[str],
port: int,
factory: ServerFactory,
context_factory: IOpenSSLContextFactory,
reactor: IReactorSSL = reactor,
backlog: int = 50,
) -> List[Port]:
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
"""
Create an TLS-over-TCP socket for a port and several addresses
@ -333,13 +294,10 @@ def listen_ssl(
except error.CannotListenError as e:
check_bind_error(e, address, bind_addresses)
# IReactorSSL incorrectly declares that an int is returned from listenSSL,
# it actually returns an object implementing IListeningPort, but we know it
# will be a Port instance.
return r # type: ignore[return-value]
return r
def refresh_certificate(hs: "HomeServer") -> None:
def refresh_certificate(hs: "HomeServer"):
"""
Refresh the TLS certificates that Synapse is using by re-reading them from
disk and updating the TLS context factories to use them.
@ -371,7 +329,7 @@ def refresh_certificate(hs: "HomeServer") -> None:
logger.info("Context factories updated.")
async def start(hs: "HomeServer") -> None:
async def start(hs: "HomeServer"):
"""
Start a Synapse server or worker.
@ -402,7 +360,7 @@ async def start(hs: "HomeServer") -> None:
if hasattr(signal, "SIGHUP"):
@wrap_as_background_process("sighup")
def handle_sighup(*args: Any, **kwargs: Any) -> None:
def handle_sighup(*args, **kwargs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")
@ -415,7 +373,7 @@ async def start(hs: "HomeServer") -> None:
# We defer running the sighup handlers until next reactor tick. This
# is so that we're in a sane state, e.g. flushing the logs may fail
# if the sighup happens in the middle of writing a log entry.
def run_sighup(*args: Any, **kwargs: Any) -> None:
def run_sighup(*args, **kwargs):
# `callFromThread` should be "signal safe" as well as thread
# safe.
reactor.callFromThread(handle_sighup, *args, **kwargs)
@ -478,8 +436,12 @@ async def start(hs: "HomeServer") -> None:
atexit.register(gc.freeze)
def setup_sentry(hs: "HomeServer") -> None:
"""Enable sentry integration, if enabled in configuration"""
def setup_sentry(hs: "HomeServer"):
"""Enable sentry integration, if enabled in configuration
Args:
hs
"""
if not hs.config.metrics.sentry_enabled:
return
@ -504,7 +466,7 @@ def setup_sentry(hs: "HomeServer") -> None:
scope.set_tag("worker_name", name)
def setup_sdnotify(hs: "HomeServer") -> None:
def setup_sdnotify(hs: "HomeServer"):
"""Adds process state hooks to tell systemd what we are up to."""
# Tell systemd our state, if we're using it. This will silently fail if
@ -519,7 +481,7 @@ def setup_sdnotify(hs: "HomeServer") -> None:
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
def sdnotify(state: bytes) -> None:
def sdnotify(state):
"""
Send a notification to systemd, if the NOTIFY_SOCKET env var is set.
@ -528,7 +490,7 @@ def sdnotify(state: bytes) -> None:
package which many OSes don't include as a matter of principle.
Args:
state: notification to send
state (bytes): notification to send
"""
if not isinstance(state, bytes):
raise TypeError("sdnotify should be called with a bytes")

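The sdnotify helper at the end of this file speaks systemd's notification protocol directly, writing a datagram to the socket named by NOTIFY_SOCKET instead of depending on the systemd-daemon package. A minimal standalone sketch of that mechanism, independent of Synapse:

    import os
    import socket

    def sdnotify(state: bytes) -> None:
        addr = os.environ.get("NOTIFY_SOCKET")
        if addr is None:
            return  # not running under systemd; silently do nothing
        if addr.startswith("@"):
            addr = "\0" + addr[1:]  # abstract-namespace socket
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
            sock.connect(addr)
            sock.sendall(state)  # e.g. b"READY=1" or b"RELOADING=1"
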
synapse/app/admin_cmd.py

@ -17,7 +17,6 @@ import logging
import os
import sys
import tempfile
from typing import List, Optional
from twisted.internet import defer, task
@ -26,7 +25,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.handlers.admin import ExfiltrationWriter
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@ -42,7 +40,6 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.server import HomeServer
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.types import StateMap
from synapse.util.logcontext import LoggingContext
from synapse.util.versionstring import get_version_string
@ -68,11 +65,16 @@ class AdminCmdSlavedStore(
class AdminCmdServer(HomeServer):
DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore
DATASTORE_CLASS = AdminCmdSlavedStore
async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None:
"""Export data for a user."""
async def export_data_command(hs: HomeServer, args):
"""Export data for a user.
Args:
hs
args (argparse.Namespace)
"""
user_id = args.user_id
directory = args.output_directory
@ -90,12 +92,12 @@ class FileExfiltrationWriter(ExfiltrationWriter):
Note: This writes to disk on the main reactor thread.
Args:
user_id: The user whose data is being exfiltrated.
directory: The directory to write the data to, if None then will write
to a temporary directory.
user_id (str): The user whose data is being exfiltrated.
directory (str|None): The directory to write the data to, if None then
will write to a temporary directory.
"""
def __init__(self, user_id: str, directory: Optional[str] = None):
def __init__(self, user_id, directory=None):
self.user_id = user_id
if directory:
@ -109,7 +111,7 @@ class FileExfiltrationWriter(ExfiltrationWriter):
if list(os.listdir(self.base_directory)):
raise Exception("Directory must be empty")
def write_events(self, room_id: str, events: List[EventBase]) -> None:
def write_events(self, room_id, events):
room_directory = os.path.join(self.base_directory, "rooms", room_id)
os.makedirs(room_directory, exist_ok=True)
events_file = os.path.join(room_directory, "events")
@ -118,9 +120,7 @@ class FileExfiltrationWriter(ExfiltrationWriter):
for event in events:
print(json.dumps(event.get_pdu_json()), file=f)
def write_state(
self, room_id: str, event_id: str, state: StateMap[EventBase]
) -> None:
def write_state(self, room_id, event_id, state):
room_directory = os.path.join(self.base_directory, "rooms", room_id)
state_directory = os.path.join(room_directory, "state")
os.makedirs(state_directory, exist_ok=True)
@ -131,9 +131,7 @@ class FileExfiltrationWriter(ExfiltrationWriter):
for event in state.values():
print(json.dumps(event.get_pdu_json()), file=f)
def write_invite(
self, room_id: str, event: EventBase, state: StateMap[EventBase]
) -> None:
def write_invite(self, room_id, event, state):
self.write_events(room_id, [event])
# We write the invite state somewhere else as they aren't full events
@ -147,9 +145,7 @@ class FileExfiltrationWriter(ExfiltrationWriter):
for event in state.values():
print(json.dumps(event), file=f)
def write_knock(
self, room_id: str, event: EventBase, state: StateMap[EventBase]
) -> None:
def write_knock(self, room_id, event, state):
self.write_events(room_id, [event])
# We write the knock state somewhere else as they aren't full events
@ -163,11 +159,11 @@ class FileExfiltrationWriter(ExfiltrationWriter):
for event in state.values():
print(json.dumps(event), file=f)
def finished(self) -> str:
def finished(self):
return self.base_directory
def start(config_options: List[str]) -> None:
def start(config_options):
parser = argparse.ArgumentParser(description="Synapse Admin Command")
HomeServerConfig.add_arguments_to_parser(parser)
@ -235,7 +231,7 @@ def start(config_options: List[str]) -> None:
# We also make sure that `_base.start` gets run before we actually run the
# command.
async def run() -> None:
async def run():
with LoggingContext("command"):
await _base.start(ss)
await args.func(ss, args)

synapse/app/generic_worker.py

@ -14,10 +14,11 @@
# limitations under the License.
import logging
import sys
from typing import Dict, List, Optional, Tuple
from typing import Dict, Optional
from twisted.internet import address
from twisted.web.resource import Resource
from twisted.web.resource import IResource
from twisted.web.server import Request
import synapse
import synapse.events
@ -43,7 +44,7 @@ from synapse.config.server import ListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@ -118,7 +119,6 @@ from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import JsonDict
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.versionstring import get_version_string
@ -143,9 +143,7 @@ class KeyUploadServlet(RestServlet):
self.http_client = hs.get_simple_http_client()
self.main_uri = hs.config.worker.worker_main_http_uri
async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
async def on_POST(self, request: Request, device_id: Optional[str]):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user_id = requester.user.to_string()
body = parse_json_object_from_request(request)
@ -189,8 +187,9 @@ class KeyUploadServlet(RestServlet):
# If the header exists, add to the comma-separated list of the first
# instance of the header. Otherwise, generate a new header.
if x_forwarded_for:
x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
x_forwarded_for.extend(x_forwarded_for[1:])
x_forwarded_for = [
x_forwarded_for[0] + b", " + previous_host
] + x_forwarded_for[1:]
else:
x_forwarded_for = [previous_host]
headers[b"X-Forwarded-For"] = x_forwarded_for
@ -254,16 +253,13 @@ class GenericWorkerSlavedStore(
SessionStore,
BaseSlavedStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
server_name: str
config: HomeServerConfig
pass
class GenericWorkerServer(HomeServer):
DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore
DATASTORE_CLASS = GenericWorkerSlavedStore
def _listen_http(self, listener_config: ListenerConfig) -> None:
def _listen_http(self, listener_config: ListenerConfig):
port = listener_config.port
bind_addresses = listener_config.bind_addresses
@ -271,10 +267,10 @@ class GenericWorkerServer(HomeServer):
site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = str(port)
site_tag = port
# We always include a health resource.
resources: Dict[str, Resource] = {"/health": HealthResource()}
resources: Dict[str, IResource] = {"/health": HealthResource()}
for res in listener_config.http_options.resources:
for name in res.names:
@ -390,7 +386,7 @@ class GenericWorkerServer(HomeServer):
logger.info("Synapse worker now listening on port %d", port)
def start_listening(self) -> None:
def start_listening(self):
for listener in self.config.worker.worker_listeners:
if listener.type == "http":
self._listen_http(listener)
@ -415,7 +411,7 @@ class GenericWorkerServer(HomeServer):
self.get_tcp_replication().start_replication(self)
def start(config_options: List[str]) -> None:
def start(config_options):
try:
config = HomeServerConfig.load_config("Synapse worker", config_options)
except ConfigError as e:

synapse/app/homeserver.py

@ -16,10 +16,10 @@
import logging
import os
import sys
from typing import Dict, Iterable, Iterator, List
from typing import Iterator
from twisted.internet.tcp import Port
from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@ -76,27 +76,23 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.homeserver")
def gz_wrap(r: Resource) -> Resource:
def gz_wrap(r):
return EncodingResourceWrapper(r, [GzipEncoderFactory()])
class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore # type: ignore
DATASTORE_CLASS = DataStore
def _listener_http(
self, config: HomeServerConfig, listener_config: ListenerConfig
) -> Iterable[Port]:
def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
port = listener_config.port
bind_addresses = listener_config.bind_addresses
tls = listener_config.tls
# Must exist since this is an HTTP listener.
assert listener_config.http_options is not None
site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = str(port)
# We always include a health resource.
resources: Dict[str, Resource] = {"/health": HealthResource()}
resources = {"/health": HealthResource()}
for res in listener_config.http_options.resources:
for name in res.names:
@ -115,7 +111,7 @@ class SynapseHomeServer(HomeServer):
("listeners", site_tag, "additional_resources", "<%s>" % (path,)),
)
handler = handler_cls(config, module_api)
if isinstance(handler, Resource):
if IResource.providedBy(handler):
resource = handler
elif hasattr(handler, "handle_request"):
resource = AdditionalResource(self, handler.handle_request)
@ -132,7 +128,7 @@ class SynapseHomeServer(HomeServer):
# try to find something useful to redirect '/' to
if WEB_CLIENT_PREFIX in resources:
root_resource: Resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
elif STATIC_PREFIX in resources:
root_resource = RootOptionsRedirectResource(STATIC_PREFIX)
else:
@ -149,8 +145,6 @@ class SynapseHomeServer(HomeServer):
)
if tls:
# refresh_certificate should have been called before this.
assert self.tls_server_context_factory is not None
ports = listen_ssl(
bind_addresses,
port,
@ -171,21 +165,20 @@ class SynapseHomeServer(HomeServer):
return ports
def _configure_named_resource(
self, name: str, compress: bool = False
) -> Dict[str, Resource]:
def _configure_named_resource(self, name, compress=False):
"""Build a resource map for a named resource
Args:
name: named resource: one of "client", "federation", etc
compress: whether to enable gzip compression for this resource
name (str): named resource: one of "client", "federation", etc
compress (bool): whether to enable gzip compression for this
resource
Returns:
map from path to HTTP resource
dict[str, Resource]: map from path to HTTP resource
"""
resources: Dict[str, Resource] = {}
resources = {}
if name == "client":
client_resource: Resource = ClientRestResource(self)
client_resource = ClientRestResource(self)
if compress:
client_resource = gz_wrap(client_resource)
@ -214,7 +207,7 @@ class SynapseHomeServer(HomeServer):
if name == "consent":
from synapse.rest.consent.consent_resource import ConsentResource
consent_resource: Resource = ConsentResource(self)
consent_resource = ConsentResource(self)
if compress:
consent_resource = gz_wrap(consent_resource)
resources.update({"/_matrix/consent": consent_resource})
@ -284,7 +277,7 @@ class SynapseHomeServer(HomeServer):
return resources
def start_listening(self) -> None:
def start_listening(self):
if self.config.redis.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
@ -310,9 +303,7 @@ class SynapseHomeServer(HomeServer):
ReplicationStreamProtocolFactory(self),
)
for s in services:
self.get_reactor().addSystemEventTrigger(
"before", "shutdown", s.stopListening
)
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
@ -327,13 +318,14 @@ class SynapseHomeServer(HomeServer):
logger.warning("Unrecognized listener type: %s", listener.type)
def setup(config_options: List[str]) -> SynapseHomeServer:
def setup(config_options):
"""
Args:
config_options_options: The options passed to Synapse. Usually `sys.argv[1:]`.
config_options_options: The options passed to Synapse. Usually
`sys.argv[1:]`.
Returns:
A homeserver instance.
HomeServer
"""
try:
config = HomeServerConfig.load_or_generate_config(
@ -372,7 +364,7 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
except Exception as e:
handle_startup_exception(e)
async def start() -> None:
async def start():
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
@ -412,15 +404,39 @@ def format_config_error(e: ConfigError) -> Iterator[str]:
yield ":\n %s" % (e.msg,)
parent_e = e.__cause__
e = e.__cause__
indent = 1
while parent_e:
while e:
indent += 1
yield ":\n%s%s" % (" " * indent, str(parent_e))
parent_e = parent_e.__cause__
yield ":\n%s%s" % (" " * indent, str(e))
e = e.__cause__
def run(hs: HomeServer) -> None:
def run(hs: HomeServer):
PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE:
def profile(func):
from cProfile import Profile
from threading import current_thread
def profiled(*args, **kargs):
profile = Profile()
profile.enable()
func(*args, **kargs)
profile.disable()
ident = current_thread().ident
profile.dump_stats(
"/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident)
)
return profiled
from twisted.python.threadpool import ThreadPool
ThreadPool._worker = profile(ThreadPool._worker)
reactor.run = profile(reactor.run)
_base.start_reactor(
"synapse-homeserver",
soft_file_limit=hs.config.server.soft_file_limit,
@ -432,7 +448,7 @@ def run(hs: HomeServer) -> None:
)
def main() -> None:
def main():
with LoggingContext("main"):
# check base requirements
check_requirements()

synapse/app/phone_stats_home.py

@ -15,12 +15,11 @@ import logging
import math
import resource
import sys
from typing import TYPE_CHECKING, List, Sized, Tuple
from typing import TYPE_CHECKING
from prometheus_client import Gauge
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -29,7 +28,7 @@ logger = logging.getLogger("synapse.app.homeserver")
# Contains the list of processes we will be monitoring
# currently either 0 or 1
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
_stats_process = []
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
@ -46,15 +45,9 @@ registered_reserved_users_mau_gauge = Gauge(
@wrap_as_background_process("phone_stats_home")
async def phone_stats_home(
hs: "HomeServer",
stats: JsonDict,
stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process,
) -> None:
async def phone_stats_home(hs: "HomeServer", stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
# Ensure the homeserver has started.
assert hs.start_time is not None
uptime = int(now - hs.start_time)
if uptime < 0:
uptime = 0
@ -153,15 +146,15 @@ async def phone_stats_home(
logger.warning("Error reporting stats: %s", e)
def start_phone_stats_home(hs: "HomeServer") -> None:
def start_phone_stats_home(hs: "HomeServer"):
"""
Start the background tasks which report phone home stats.
"""
clock = hs.get_clock()
stats: JsonDict = {}
stats = {}
def performance_stats_init() -> None:
def performance_stats_init():
_stats_process.clear()
_stats_process.append(
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
@ -177,10 +170,10 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
hs.get_datastore().reap_monthly_active_users()
@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users() -> None:
async def generate_monthly_active_users():
current_mau_count = 0
current_mau_count_by_service = {}
reserved_users: Sized = ()
reserved_users = ()
store = hs.get_datastore()
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
current_mau_count = await store.get_monthly_active_count()

synapse/handlers/admin.py

@ -234,7 +234,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write_invite(
self, room_id: str, event: EventBase, state: StateMap[EventBase]
self, room_id: str, event: EventBase, state: StateMap[dict]
) -> None:
"""Write an invite for the room, with associated invite state.
@ -248,7 +248,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write_knock(
self, room_id: str, event: EventBase, state: StateMap[EventBase]
self, room_id: str, event: EventBase, state: StateMap[dict]
) -> None:
"""Write a knock for the room, with associated knock state.

synapse/handlers/federation_event.py

@ -981,6 +981,8 @@ class FederationEventHandler:
origin,
event,
context,
state=state,
backfilled=backfilled,
)
except AuthError as e:
# FIXME richvdh 2021/10/07 I don't think this is reachable. Let's log it
@ -1330,6 +1332,8 @@ class FederationEventHandler:
origin: str,
event: EventBase,
context: EventContext,
state: Optional[Iterable[EventBase]] = None,
backfilled: bool = False,
) -> EventContext:
"""
Checks whether an event should be rejected (for failing auth checks).
@ -1340,6 +1344,12 @@ class FederationEventHandler:
context:
The event context.
state:
The state events used to check the event for soft-fail. If this is
not provided the current state events will be used.
backfilled: True if the event was backfilled.
Returns:
The updated context object.

synapse/logging/handlers.py

@ -3,7 +3,7 @@ import time
from logging import Handler, LogRecord
from logging.handlers import MemoryHandler
from threading import Thread
from typing import Optional, cast
from typing import Optional
from twisted.internet.interfaces import IReactorCore
@ -56,7 +56,7 @@ class PeriodicallyFlushingMemoryHandler(MemoryHandler):
if reactor is None:
from twisted.internet import reactor as global_reactor
reactor_to_use = cast(IReactorCore, global_reactor)
reactor_to_use = global_reactor # type: ignore[assignment]
else:
reactor_to_use = reactor

synapse/module_api/__init__.py

@ -31,7 +31,7 @@ import attr
import jinja2
from twisted.internet import defer
from twisted.web.resource import Resource
from twisted.web.resource import IResource
from synapse.api.errors import SynapseError
from synapse.events import EventBase
@ -48,6 +48,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import BackgroundUpdateController
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.storage.state import StateFilter
@@ -92,6 +93,7 @@ __all__ = [
"JsonDict",
"EventBase",
"StateMap",
"BackgroundUpdateController",
]
logger = logging.getLogger(__name__)
@@ -196,7 +198,7 @@ class ModuleApi:
"""
return self._password_auth_provider.register_password_auth_provider_callbacks
def register_web_resource(self, path: str, resource: Resource):
def register_web_resource(self, path: str, resource: IResource):
"""Registers a web resource to be served at the given path.
This function should be called during initialisation of the module.
@@ -212,6 +214,21 @@
"""
self._hs.register_module_web_resource(path, resource)
def register_background_update_controller(
self,
controller: BackgroundUpdateController,
) -> None:
"""Registers a background update controller.
Added in v1.48.0
Args:
controller: The controller to use.
"""
for db in self._hs.get_datastores().databases:
db.updates.register_update_controller(controller)
#########################################################################
# The following methods can be called by the module at any point in time.
@@ -859,6 +876,11 @@ class ModuleApi:
f,
)
async def sleep(self, seconds: float) -> None:
"""Sleeps for the given number of seconds."""
await self._clock.sleep(seconds)
async def send_mail(
self,
recipient: str,

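Taken together, `register_background_update_controller` and `sleep` are enough for a module to take over background update scheduling. A minimal sketch, assuming a hypothetical module (`SnoozeController` and `ExampleModule` are illustrative, not part of Synapse):

import contextlib
from typing import AsyncContextManager

from synapse.module_api import BackgroundUpdateController, ModuleApi


class SnoozeController(BackgroundUpdateController):
    """Sleeps five seconds between iterations, then targets 100ms batches."""

    def __init__(self, api: ModuleApi):
        self._api = api

    def run_update(
        self, update_name: str, database_name: str, oneshot: bool
    ) -> AsyncContextManager[int]:
        @contextlib.asynccontextmanager
        async def ctx():
            await self._api.sleep(5.0)  # throttle between iterations
            yield 100  # target duration for this iteration, in ms

        return ctx()

    async def default_batch_size(self, update_name: str, database_name: str) -> int:
        return 100

    async def min_batch_size(self, update_name: str, database_name: str) -> int:
        return 1  # must be greater than 0 so progress is always made


class ExampleModule:
    def __init__(self, config: dict, api: ModuleApi):
        api.register_background_update_controller(SnoozeController(api))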

@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING
from prometheus_client import Counter
from twisted.internet.protocol import ServerFactory
from twisted.internet.protocol import Factory
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
@@ -38,7 +38,7 @@ stream_updates_counter = Counter(
logger = logging.getLogger(__name__)
class ReplicationStreamProtocolFactory(ServerFactory):
class ReplicationStreamProtocolFactory(Factory):
"""Factory for new replication connections."""
def __init__(self, hs: "HomeServer"):


@@ -101,8 +101,8 @@ class Thumbnailer:
fits within the given rectangle::
(w_in / h_in) = (w_out / h_out)
w_out = max(min(w_max, h_max * (w_in / h_in)), 1)
h_out = max(min(h_max, w_max * (h_in / w_in)), 1)
w_out = min(w_max, h_max * (w_in / h_in))
h_out = min(h_max, w_max * (h_in / w_in))
Args:
max_width: The largest possible width.
@@ -110,9 +110,9 @@
"""
if max_width * self.height < max_height * self.width:
return max_width, max((max_width * self.height) // self.width, 1)
return max_width, (max_width * self.height) // self.width
else:
return max((max_height * self.width) // self.height, 1), max_height
return (max_height * self.width) // self.height, max_height
def _resize(self, width: int, height: int) -> Image.Image:
# 1-bit or 8-bit color palette images need converting to RGB

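A standalone sketch of the clamped variant of this calculation, with the degenerate case that motivates the `max(..., 1)`:

from typing import Tuple


def scale_dims(w_in: int, h_in: int, w_max: int, h_max: int) -> Tuple[int, int]:
    """Scale (w_in, h_in) to fit within (w_max, h_max), preserving aspect ratio."""
    if w_max * h_in < h_max * w_in:
        # Width is the binding constraint; clamp so the height can never be 0.
        return w_max, max((w_max * h_in) // w_in, 1)
    else:
        return max((h_max * w_in) // h_in, 1), h_max


# A 1000x10 source scaled into a 32x32 box: without the clamp the height
# would be (32 * 10) // 1000 == 0, an invalid image size.
assert scale_dims(1000, 10, 32, 32) == (32, 1)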

@@ -33,10 +33,9 @@ from typing import (
cast,
)
from twisted.internet.interfaces import IOpenSSLContextFactory
from twisted.internet.tcp import Port
import twisted.internet.tcp
from twisted.web.iweb import IPolicyForHTTPS
from twisted.web.resource import Resource
from twisted.web.resource import IResource
from synapse.api.auth import Auth
from synapse.api.filtering import Filtering
@@ -207,7 +206,7 @@ class HomeServer(metaclass=abc.ABCMeta):
Attributes:
config (synapse.config.homeserver.HomeserverConfig):
_listening_services (list[Port]): TCP ports that
_listening_services (list[twisted.internet.tcp.Port]): TCP ports that
we are listening on to provide HTTP services.
"""
@@ -226,8 +225,6 @@ class HomeServer(metaclass=abc.ABCMeta):
# instantiated during setup() for future return by get_datastore()
DATASTORE_CLASS = abc.abstractproperty()
tls_server_context_factory: Optional[IOpenSSLContextFactory]
def __init__(
self,
hostname: str,
@@ -250,7 +247,7 @@ class HomeServer(metaclass=abc.ABCMeta):
# the key we use to sign events and requests
self.signing_key = config.key.signing_key[0]
self.config = config
self._listening_services: List[Port] = []
self._listening_services: List[twisted.internet.tcp.Port] = []
self.start_time: Optional[int] = None
self._instance_id = random_string(5)
@@ -260,10 +257,10 @@ class HomeServer(metaclass=abc.ABCMeta):
self.datastores: Optional[Databases] = None
self._module_web_resources: Dict[str, Resource] = {}
self._module_web_resources: Dict[str, IResource] = {}
self._module_web_resources_consumed = False
def register_module_web_resource(self, path: str, resource: Resource):
def register_module_web_resource(self, path: str, resource: IResource):
"""Allows a module to register a web resource to be served at the given path.
If multiple modules register a resource for the same path, the module that


@@ -11,13 +11,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
from typing import (
TYPE_CHECKING,
AsyncContextManager,
Awaitable,
Callable,
Dict,
Iterable,
Optional,
)
import attr
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.types import Connection
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util import Clock, json_encoder
from . import engines
@@ -28,6 +39,136 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _BackgroundUpdateHandler:
"""A handler for a given background update.
Attributes:
callback: The function to call to make progress on the background
update.
oneshot: Whether the update is likely to happen all in one go, ignoring
the supplied target duration, e.g. index creation. This is used by
the update controller to help correctly schedule the update.
"""
callback: Callable[[JsonDict, int], Awaitable[int]]
oneshot: bool = False
class BackgroundUpdateController(abc.ABC):
"""A base class for controlling background update timings."""
####
# NOTE: This is used by modules so changes must be backwards compatible or
# be announced appropriately
####
@abc.abstractmethod
def run_update(
self, update_name: str, database_name: str, oneshot: bool
) -> AsyncContextManager[int]:
"""Called before we do the next iteration of a background update. The
returned async context manager is immediately entered and then exited
after this iteration of the background update has finished.
Implementations will likely want to sleep for a period of time to
prevent the background update from running continuously.
Args:
update_name: The name of the update that is to be run
database_name: The name of the database the background update is
being run on. Really only useful if Synapse is configured with
multiple databases.
oneshot: Whether the update will complete all in one go, e.g.
index creation. In such cases the returned target duration is
ignored.
Returns:
The target duration in milliseconds that the background update
should run for.
Note: this is a *target*, and an iteration may take substantially
more or less time than this.
"""
...
@abc.abstractmethod
async def default_batch_size(self, update_name: str, database_name: str) -> int:
"""The batch size to use for the first iteration of a new background
update.
"""
...
@abc.abstractmethod
async def min_batch_size(self, update_name: str, database_name: str) -> int:
"""A lower bound on the batch size of a new background update.
Used to ensure that progress is always made. Must be greater than 0.
"""
...
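In miniature, the contract is: Synapse enters the returned context manager before each iteration and treats the value it yields as the time budget for that iteration. A sketch of the call site, mirroring `do_next_background_update` later in this diff (the update name is illustrative):

from synapse.storage.background_updates import BackgroundUpdateController


async def run_one_iteration(controller: BackgroundUpdateController) -> None:
    # A throttling controller typically sleeps in __aenter__ before
    # yielding the time budget for the iteration.
    async with controller.run_update(
        update_name="populate_stats",  # illustrative update name
        database_name="master",
        oneshot=False,
    ) as desired_duration_ms:
        ...  # one iteration of the update runs here, aiming for that budget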
class _TimeBasedBackgroundUpdateController(BackgroundUpdateController):
"""The default controller which aims to spend X ms doing the background
update every Y ms.
"""
MINIMUM_BACKGROUND_BATCH_SIZE = 100
DEFAULT_BACKGROUND_BATCH_SIZE = 100
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, clock: Clock):
self._clock = clock
def run_update(
self,
update_name: str,
database_name: str,
oneshot: bool,
) -> AsyncContextManager[int]:
return self
async def default_batch_size(self, update_name: str, database_name: str) -> int:
return self.DEFAULT_BACKGROUND_BATCH_SIZE
async def min_batch_size(self, update_name: str, database_name: str) -> int:
return self.MINIMUM_BACKGROUND_BATCH_SIZE
async def __aenter__(self) -> int:
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
return self.BACKGROUND_UPDATE_DURATION_MS
async def __aexit__(self, *exc):
pass
class _ImmediateBackgroundUpdateController(BackgroundUpdateController):
"""A background update controller that doesn't ever wait, effectively
running the background updates as quickly as possible"""
def run_update(
self,
update_name: str,
database_name: str,
oneshot: bool,
) -> AsyncContextManager[int]:
return self
async def default_batch_size(self, update_name: str, database_name: str) -> int:
return 100
async def min_batch_size(self, update_name: str, database_name: str) -> int:
return 100
async def __aenter__(self) -> int:
return 100
async def __aexit__(self, *exc):
pass
class BackgroundUpdatePerformance:
"""Tracks the how long a background update is taking to update its items"""
@@ -82,22 +223,21 @@ class BackgroundUpdater:
process and autotuning the batch size.
"""
MINIMUM_BACKGROUND_BATCH_SIZE = 100
DEFAULT_BACKGROUND_BATCH_SIZE = 100
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
self._clock = hs.get_clock()
self.db_pool = database
self._database_name = database.name()
# if a background update is currently running, its name.
self._current_background_update: Optional[str] = None
self._controller: BackgroundUpdateController = (
_TimeBasedBackgroundUpdateController(self._clock)
)
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
self._background_update_handlers: Dict[
str, Callable[[JsonDict, int], Awaitable[int]]
] = {}
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
self._all_done = False
# Whether we're currently running updates
@@ -107,6 +247,13 @@ class BackgroundUpdater:
# enable/disable background updates via the admin API.
self.enabled = True
def register_update_controller(
self, controller: BackgroundUpdateController
) -> None:
"""Register a new background update controller to use."""
self._controller = controller
def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
"""Returns the current background update, if any."""
@@ -133,13 +280,8 @@ class BackgroundUpdater:
try:
logger.info("Starting background schema updates")
while self.enabled:
if sleep:
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
result = await self.do_next_background_update(sleep)
except Exception:
logger.exception("Error doing update")
else:
@@ -201,13 +343,15 @@ class BackgroundUpdater:
return not update_exists
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
Args:
desired_duration_ms: How long we want to spend updating.
sleep: Whether to limit how quickly we run background updates.
Returns:
True if we have finished running all the background updates, otherwise False
"""
@@ -250,7 +394,25 @@ class BackgroundUpdater:
self._current_background_update = upd["update_name"]
await self._do_background_update(desired_duration_ms)
# We have a background update to run, otherwise we would have returned
# early.
assert self._current_background_update is not None
update_info = self._background_update_handlers[self._current_background_update]
if sleep:
controller = self._controller
else:
# If `sleep` is False then we want to run the updates as quickly as
# possible.
controller = _ImmediateBackgroundUpdateController()
async with controller.run_update(
update_name=self._current_background_update,
database_name=self._database_name,
oneshot=update_info.oneshot,
) as desired_duration_ms:
await self._do_background_update(desired_duration_ms)
return False
async def _do_background_update(self, desired_duration_ms: float) -> int:
@@ -258,7 +420,7 @@
update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
update_handler = self._background_update_handlers[update_name].callback
performance = self._background_update_performance.get(update_name)
@@ -271,9 +433,14 @@ class BackgroundUpdater:
if items_per_ms is not None:
batch_size = int(desired_duration_ms * items_per_ms)
# Clamp the batch size so that we always make progress
batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
batch_size = max(
batch_size,
await self._controller.min_batch_size(update_name, self._database_name),
)
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
batch_size = await self._controller.default_batch_size(
update_name, self._database_name
)
progress_json = await self.db_pool.simple_select_one_onecol(
"background_updates",
@@ -292,6 +459,8 @@ class BackgroundUpdater:
duration_ms = time_stop - time_start
performance.update(items_updated, duration_ms)
logger.info(
"Running background update %r. Processed %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
@@ -304,8 +473,6 @@ class BackgroundUpdater:
batch_size,
)
performance.update(items_updated, duration_ms)
return len(self._background_update_performance)
def register_background_update_handler(
@@ -329,7 +496,9 @@ class BackgroundUpdater:
update_name: The name of the update that this code handles.
update_handler: The function that does the update.
"""
self._background_update_handlers[update_name] = update_handler
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
update_handler
)
def register_noop_background_update(self, update_name: str) -> None:
"""Register a noop handler for a background update.
@@ -451,7 +620,9 @@ class BackgroundUpdater:
await self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, updater)
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.

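The batch-size autotuning in `_do_background_update` above is simple arithmetic; a worked sketch with illustrative numbers:

# Observed throughput from the previous iteration: 500 items in 250 ms.
items_per_ms = 500 / 250                      # 2.0 items per millisecond
desired_duration_ms = 100                     # budget yielded by the controller

batch_size = int(desired_duration_ms * items_per_ms)  # 200 items this time

# A much slower update (say 0.05 items/ms) would compute a batch of 5; the
# controller's min_batch_size (100 for the default controller) then clamps
# it upwards so that progress is always made:
batch_size = max(batch_size, 100)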

@@ -38,7 +38,6 @@ from zope.interface import Interface
from twisted.internet.interfaces import (
IReactorCore,
IReactorPluggableNameResolver,
IReactorSSL,
IReactorTCP,
IReactorThreads,
IReactorTime,
@@ -67,7 +66,6 @@ JsonDict = Dict[str, Any]
# for mypy-zope to realize it is an interface.
class ISynapseReactor(
IReactorTCP,
IReactorSSL,
IReactorPluggableNameResolver,
IReactorTime,
IReactorCore,


@@ -31,13 +31,13 @@ from typing import (
Set,
TypeVar,
Union,
cast,
)
import attr
from typing_extensions import ContextManager
from twisted.internet import defer
from twisted.internet.base import ReactorBase
from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.python import failure
@@ -271,7 +271,8 @@ class Linearizer:
if not clock:
from twisted.internet import reactor
clock = Clock(cast(IReactorTime, reactor))
assert isinstance(reactor, ReactorBase)
clock = Clock(reactor)
self._clock = clock
self.max_count = max_count


@@ -92,9 +92,9 @@ def _resource_id(resource: Resource, path_seg: bytes) -> str:
the mapping should look like _resource_id(A,C) = B.
Args:
resource: The *parent* Resource.
path_seg: The name of the child Resource to be attached.
resource (Resource): The *parent* Resource.
path_seg (str): The name of the child Resource to be attached.
Returns:
A unique string which can be a key to the child Resource.
str: A unique string which can be a key to the child Resource.
"""
return "%s-%r" % (resource, path_seg)

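To illustrate the mapping: the key is simply the parent's string form joined to the repr of the path segment, so attaching B at C under A stores `mapping[_resource_id(A, C)] = B`. For example (the exact string depends on `Resource.__str__`):

from twisted.web.resource import Resource

parent, child = Resource(), Resource()
mapping = {}

key = "%s-%r" % (parent, b"media")  # same format as _resource_id(parent, b"media")
mapping[key] = child
# key looks something like:
# "<twisted.web.resource.Resource object at 0x7f...>-b'media'"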

@@ -23,7 +23,7 @@ from twisted.conch.manhole import ColoredManhole, ManholeInterpreter
from twisted.conch.ssh.keys import Key
from twisted.cred import checkers, portal
from twisted.internet import defer
from twisted.internet.protocol import ServerFactory
from twisted.internet.protocol import Factory
from synapse.config.server import ManholeConfig
@@ -65,7 +65,7 @@ EddTrx3TNpr1D5m/f+6mnXWrc8u9y1+GNx9yz889xMjIBTBI9KqaaOs=
-----END RSA PRIVATE KEY-----"""
def manhole(settings: ManholeConfig, globals: Dict[str, Any]) -> ServerFactory:
def manhole(settings: ManholeConfig, globals: Dict[str, Any]) -> Factory:
"""Starts a ssh listener with password authentication using
the given username and password. Clients connecting to the ssh
listener will find themselves in a colored python shell with
@@ -105,8 +105,7 @@ def manhole(settings: ManholeConfig, globals: Dict[str, Any]) -> ServerFactory:
factory.privateKeys[b"ssh-rsa"] = priv_key # type: ignore[assignment]
factory.publicKeys[b"ssh-rsa"] = pub_key # type: ignore[assignment]
# ConchFactory is a Factory, not a ServerFactory, but they are identical.
return factory # type: ignore[return-value]
return factory
class SynapseManhole(ColoredManhole):


@@ -408,13 +408,9 @@ class EmailPusherTests(HomeserverTestCase):
self.hs.get_datastore().db_pool.updates._all_done = False
# Now let's actually drive the updates to completion
while not self.get_success(
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
by=0.1,
)
self.get_success(
self.hs.get_datastore().db_pool.updates.run_background_updates(False)
)
# Check that all pushers with unlinked addresses were deleted
pushers = self.get_success(


@@ -1,6 +1,11 @@
from unittest.mock import Mock
from mock import AsyncMock, Mock
from synapse.storage.background_updates import BackgroundUpdater
from twisted.internet.defer import Deferred, ensureDeferred
from synapse.storage.background_updates import (
BackgroundUpdateController,
BackgroundUpdater,
)
from tests import unittest
@@ -20,10 +25,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def test_do_background_update(self):
# the time we claim each update takes
duration_ms = 42
duration_ms = 0.2
# the target runtime for each bg update
target_background_update_duration_ms = 50000
target_background_update_duration_ms = 100
store = self.hs.get_datastore()
self.get_success(
@@ -48,17 +53,13 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
self.update_handler.side_effect = update
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
self.updates.do_next_background_update(False),
by=0.01,
)
self.assertFalse(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
{"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
)
self.update_handler.assert_called_once_with({"my_key": 1}, 100)
# second step: complete the update
# we should now get run with a much bigger number of items to update
@@ -74,16 +75,80 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
self.update_handler.side_effect = update
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
result = self.get_success(self.updates.do_next_background_update(False))
self.assertFalse(result)
self.update_handler.assert_called_once()
# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
result = self.get_success(self.updates.do_next_background_update(False))
self.assertTrue(result)
self.assertFalse(self.update_handler.called)
class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates
# the base test class should have run the real bg updates for us
self.assertTrue(
self.get_success(self.updates.has_completed_background_updates())
)
self.update_deferred = Deferred()
self.update_handler = Mock(return_value=self.update_deferred)
self.updates.register_background_update_handler(
"test_update", self.update_handler
)
self._controller_ctx_mgr = AsyncMock(name="_controller_ctx_mgr")
self._controller = AsyncMock(BackgroundUpdateController)
self._controller.run_update.return_value = self._controller_ctx_mgr
self.updates.register_update_controller(self._controller)
def test_controller(self):
store = self.hs.get_datastore()
self.get_success(
store.db_pool.simple_insert(
"background_updates",
values={"update_name": "test_update", "progress_json": "{}"},
)
)
default_batch_size = 100
# Set up the return values of the controller.
enter_defer = Deferred()
self._controller_ctx_mgr.__aenter__ = Mock(return_value=enter_defer)
self._controller.default_batch_size.return_value = default_batch_size
self._controller.min_batch_size.return_value = default_batch_size
# Start the background update.
do_update_d = ensureDeferred(self.updates.do_next_background_update(True))
self.pump()
# `run_update` should have been called, but the update handler won't be
# called until the `enter_defer` (returned by `__aenter__`) is resolved.
self._controller.run_update.assert_called_once_with(
update_name="test_update",
database_name="master",
oneshot=False,
)
self.assertFalse(do_update_d.called)
self.assertFalse(self.update_deferred.called)
# Resolving the `enter_defer` should call the update handler, which then
# blocks.
enter_defer.callback(100)
self.pump()
self.update_handler.assert_called_once_with({}, default_batch_size)
self.assertFalse(self.update_deferred.called)
self._controller_ctx_mgr.__aexit__.assert_not_awaited()
# Resolving the update handler deferred should cause the
# `do_next_background_update` to finish and return
self.update_deferred.callback(100)
self.pump()
self._controller_ctx_mgr.__aexit__.assert_awaited()
self.get_success(do_update_d)


@@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
):
iterations += 1
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
self.store.db_pool.updates.do_next_background_update(False), by=0.1
)
# Ensure that we did actually take multiple iterations to process the
@@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
):
iterations += 1
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
self.store.db_pool.updates.do_next_background_update(False), by=0.1
)
# Ensure that we did actually take multiple iterations to process the


@@ -81,6 +81,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
origin,
event,
context,
state=None,
backfilled=False,
):
return context


@@ -319,12 +319,7 @@ class HomeserverTestCase(TestCase):
def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.get_success(self.store.db_pool.updates.run_background_updates(False))
def make_homeserver(self, reactor, clock):
"""
@@ -484,8 +479,7 @@ class HomeserverTestCase(TestCase):
async def run_bg_updates():
with LoggingContext("run_bg_updates"):
while not await stor.db_pool.updates.has_completed_background_updates():
await stor.db_pool.updates.do_next_background_update(1)
self.get_success(stor.db_pool.updates.run_background_updates(False))
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()