Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2021-05-27 17:11:57 +01:00
commit d2fcfef679
85 changed files with 519 additions and 350 deletions

View file

@ -41,7 +41,7 @@ workflows:
- dockerhubuploadlatest:
filters:
branches:
only: master
only: [ master, main ]
commands:
docker_prepare:

View file

@ -34,7 +34,13 @@ jobs:
if: ${{ github.base_ref == 'develop' || contains(github.base_ref, 'release-') }}
runs-on: ubuntu-latest
steps:
# Note: This and the script can be simplified once we drop Buildkite. See:
# https://github.com/actions/checkout/issues/266#issuecomment-638346893
# https://github.com/actions/checkout/issues/416
- uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0
- uses: actions/setup-python@v2
- run: pip install tox
- name: Patch Buildkite-specific test script

View file

@ -1,3 +1,76 @@
Synapse 1.35.0rc2 (2021-05-27)
==============================
Bugfixes
--------
- Fix a bug introduced in v1.35.0rc1 when calling the spaces summary API via a GET request. ([\#10079](https://github.com/matrix-org/synapse/issues/10079))
Synapse 1.35.0rc1 (2021-05-25)
==============================
Features
--------
- Add experimental support to allow a user who could join a restricted room to view it in the spaces summary. ([\#9922](https://github.com/matrix-org/synapse/issues/9922), [\#10007](https://github.com/matrix-org/synapse/issues/10007), [\#10038](https://github.com/matrix-org/synapse/issues/10038))
- Reduce memory usage when joining very large rooms over federation. ([\#9958](https://github.com/matrix-org/synapse/issues/9958))
- Add a configuration option which allows enabling opentracing by user id. ([\#9978](https://github.com/matrix-org/synapse/issues/9978))
- Enable experimental support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) (spaces summary API) and [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) (restricted join rules) by default. ([\#10011](https://github.com/matrix-org/synapse/issues/10011))
Bugfixes
--------
- Fix a bug introduced in v1.26.0 which meant that `synapse_port_db` would not correctly initialise some postgres sequences, requiring manual updates afterwards. ([\#9991](https://github.com/matrix-org/synapse/issues/9991))
- Fix `synctl`'s `--no-daemonize` parameter to work correctly with worker processes. ([\#9995](https://github.com/matrix-org/synapse/issues/9995))
- Fix a validation bug introduced in v1.34.0 in the ordering of spaces in the space summary API. ([\#10002](https://github.com/matrix-org/synapse/issues/10002))
- Fixed deletion of new presence stream states from database. ([\#10014](https://github.com/matrix-org/synapse/issues/10014), [\#10033](https://github.com/matrix-org/synapse/issues/10033))
- Fixed a bug with very high resolution image uploads throwing internal server errors. ([\#10029](https://github.com/matrix-org/synapse/issues/10029))
Updates to the Docker image
---------------------------
- Fix bug introduced in Synapse 1.33.0 which caused a `Permission denied: '/homeserver.log'` error when starting Synapse with the generated log configuration. Contributed by Sergio Miguéns Iglesias. ([\#10045](https://github.com/matrix-org/synapse/issues/10045))
Improved Documentation
----------------------
- Add hardened systemd files as proposed in [#9760](https://github.com/matrix-org/synapse/issues/9760) and added them to `contrib/`. Change the docs to reflect the presence of these files. ([\#9803](https://github.com/matrix-org/synapse/issues/9803))
- Clarify documentation around SSO mapping providers generating unique IDs and localparts. ([\#9980](https://github.com/matrix-org/synapse/issues/9980))
- Updates to the PostgreSQL documentation (`postgres.md`). ([\#9988](https://github.com/matrix-org/synapse/issues/9988), [\#9989](https://github.com/matrix-org/synapse/issues/9989))
- Fix broken link in user directory documentation. Contributed by @junquera. ([\#10016](https://github.com/matrix-org/synapse/issues/10016))
- Add missing room state entry to the table of contents of room admin API. ([\#10043](https://github.com/matrix-org/synapse/issues/10043))
Deprecations and Removals
-------------------------
- Removed support for the deprecated `tls_fingerprints` configuration setting. Contributed by Jerin J Titus. ([\#9280](https://github.com/matrix-org/synapse/issues/9280))
Internal Changes
----------------
- Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`. ([\#9823](https://github.com/matrix-org/synapse/issues/9823))
- Update comments in the space summary handler. ([\#9974](https://github.com/matrix-org/synapse/issues/9974))
- Minor enhancements to the `@cachedList` descriptor. ([\#9975](https://github.com/matrix-org/synapse/issues/9975))
- Split multipart email sending into a dedicated handler. ([\#9977](https://github.com/matrix-org/synapse/issues/9977))
- Run `black` on files in the `scripts` directory. ([\#9981](https://github.com/matrix-org/synapse/issues/9981))
- Add missing type hints to `synapse.util` module. ([\#9982](https://github.com/matrix-org/synapse/issues/9982))
- Simplify a few helper functions. ([\#9984](https://github.com/matrix-org/synapse/issues/9984), [\#9985](https://github.com/matrix-org/synapse/issues/9985), [\#9986](https://github.com/matrix-org/synapse/issues/9986))
- Remove unnecessary property from SQLBaseStore. ([\#9987](https://github.com/matrix-org/synapse/issues/9987))
- Remove `keylen` param on `LruCache`. ([\#9993](https://github.com/matrix-org/synapse/issues/9993))
- Update the Grafana dashboard in `contrib/`. ([\#10001](https://github.com/matrix-org/synapse/issues/10001))
- Add a batching queue implementation. ([\#10017](https://github.com/matrix-org/synapse/issues/10017))
- Reduce memory usage when verifying signatures on large numbers of events at once. ([\#10018](https://github.com/matrix-org/synapse/issues/10018))
- Properly invalidate caches for destination retry timings every (instead of expiring entries every 5 minutes). ([\#10036](https://github.com/matrix-org/synapse/issues/10036))
- Fix running complement tests with Synapse workers. ([\#10039](https://github.com/matrix-org/synapse/issues/10039))
- Fix typo in `get_state_ids_for_event` docstring where the return type was incorrect. ([\#10050](https://github.com/matrix-org/synapse/issues/10050))
Synapse 1.34.0 (2021-05-17)
===========================

View file

@ -399,11 +399,9 @@ Once you have installed synapse as above, you will need to configure it.
### Using PostgreSQL
By default Synapse uses [SQLite](https://sqlite.org/) and in doing so trades performance for convenience.
SQLite is only recommended in Synapse for testing purposes or for servers with
very light workloads.
Almost all installations should opt to use [PostgreSQL](https://www.postgresql.org). Advantages include:
By default Synapse uses an [SQLite](https://sqlite.org/) database and in doing so trades
performance for convenience. Almost all installations should opt to use [PostgreSQL](https://www.postgresql.org)
instead. Advantages include:
- significant performance improvements due to the superior threading and
caching model, smarter query optimiser
@ -412,6 +410,10 @@ Almost all installations should opt to use [PostgreSQL](https://www.postgresql.o
For information on how to install and use PostgreSQL in Synapse, please see
[docs/postgres.md](docs/postgres.md)
SQLite is only acceptable for testing purposes. SQLite should not be used in
a production server. Synapse will perform poorly when using
SQLite, especially when participating in large rooms.
### TLS certificates
The default configuration exposes a single HTTP port on the local

View file

@ -149,21 +149,45 @@ For details on having Synapse manage your federation TLS certificates
automatically, please see `<docs/ACME.md>`_.
Security Note
Security note
=============
Matrix serves raw user generated data in some APIs - specifically the `content
repository endpoints <https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
Matrix serves raw, user-supplied data in some APIs -- specifically the `content
repository endpoints`_.
Whilst we have tried to mitigate against possible XSS attacks (e.g.
https://github.com/matrix-org/synapse/pull/1021) we recommend running
matrix homeservers on a dedicated domain name, to limit any malicious user generated
content served to web browsers a matrix API from being able to attack webapps hosted
on the same domain. This is particularly true of sharing a matrix webclient and
server on the same domain.
.. _content repository endpoints: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid
See https://github.com/vector-im/riot-web/issues/1977 and
https://developer.github.com/changes/2014-04-25-user-content-security for more details.
Whilst we make a reasonable effort to mitigate against XSS attacks (for
instance, by using `CSP`_), a Matrix homeserver should not be hosted on a
domain hosting other web applications. This especially applies to sharing
the domain with Matrix web clients and other sensitive applications like
webmail. See
https://developer.github.com/changes/2014-04-25-user-content-security for more
information.
.. _CSP: https://github.com/matrix-org/synapse/pull/1021
Ideally, the homeserver should not simply be on a different subdomain, but on
a completely different `registered domain`_ (also known as top-level site or
eTLD+1). This is because `some attacks`_ are still possible as long as the two
applications share the same registered domain.
.. _registered domain: https://tools.ietf.org/html/draft-ietf-httpbis-rfc6265bis-03#section-2.3
.. _some attacks: https://en.wikipedia.org/wiki/Session_fixation#Attacks_using_cross-subdomain_cookie
To illustrate this with an example, if your Element Web or other sensitive web
application is hosted on ``A.example1.com``, you should ideally host Synapse on
``example2.com``. Some amount of protection is offered by hosting on
``B.example1.com`` instead, so this is also acceptable in some scenarios.
However, you should *not* host your Synapse on ``A.example1.com``.
Note that all of the above refers exclusively to the domain used in Synapse's
``public_baseurl`` setting. In particular, it has no bearing on the domain
mentioned in MXIDs hosted on that server.
Following this advice ensures that even if an XSS is found in Synapse, the
impact to other applications will be minimal.
Upgrading an existing Synapse

View file

@ -1 +0,0 @@
Update the Grafana dashboard in `contrib/`.

View file

@ -1 +0,0 @@
Fix a validation bug introduced in v1.34.0 in the ordering of spaces in the space summary API.

View file

@ -1 +0,0 @@
Experimental support to allow a user who could join a restricted room to view it in the spaces summary.

View file

@ -1 +0,0 @@
Enable experimental support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) (spaces summary API) and [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) (restricted join rules) by default.

View file

@ -1 +0,0 @@
Fixed deletion of new presence stream states from database.

View file

@ -1 +0,0 @@
Fix broken link in user directory documentation. Contributed by @junquera.

View file

@ -1 +0,0 @@
Add a batching queue implementation.

View file

@ -1 +0,0 @@
Reduce memory usage when verifying signatures on large numbers of events at once.

View file

@ -1 +0,0 @@
Fixed a bug with very high resolution image uploads throwing internal server errors.

View file

@ -1 +0,0 @@
Fixed deletion of new presence stream states from database.

View file

@ -1 +0,0 @@
Properly invalidate caches for destination retry timings every (instead of expiring entries every 5 minutes).

View file

@ -1 +0,0 @@
Experimental support to allow a user who could join a restricted room to view it in the spaces summary.

View file

@ -1 +0,0 @@
Fix running complement tests with Synapse workers.

View file

@ -0,0 +1 @@
Add an admin API for unprotecting local media from quarantine. Contributed by @dklimpel.

View file

@ -1 +0,0 @@
Add missing room state entry to the table of contents of room admin API.

View file

@ -1 +0,0 @@
Fix bug introduced in Synapse 1.33.0 which caused a `Permission denied: '/homeserver.log'` error when starting Synapse with the generated log configuration. Contributed by Sergio Miguéns Iglesias.

1
changelog.d/10046.doc Normal file
View file

@ -0,0 +1 @@
Update CAPTCHA documentation to mention turning off the verify origin feature. Contributed by @aaronraimist.

View file

@ -1 +0,0 @@
Fix typo in `get_state_ids_for_event` docstring where the return type was incorrect.

1
changelog.d/10054.misc Normal file
View file

@ -0,0 +1 @@
Remove some dead code regarding TLS certificate handling.

1
changelog.d/10055.misc Normal file
View file

@ -0,0 +1 @@
Remove redundant, unmaintained `convert_server_keys` script.

1
changelog.d/10057.doc Normal file
View file

@ -0,0 +1 @@
Tweak wording of database recommendation in `INSTALL.md`. Contributed by @aaronraimist.

1
changelog.d/10059.misc Normal file
View file

@ -0,0 +1 @@
Improve the error message printed by synctl when synapse fails to start.

View file

@ -0,0 +1 @@
Remove the experimental `spaces_enabled` flag. The spaces features are always available now.

1
changelog.d/10069.misc Normal file
View file

@ -0,0 +1 @@
Fix GitHub Actions lint for newsfragments.

1
changelog.d/10078.misc Normal file
View file

@ -0,0 +1 @@
Fix up `BatchingQueue` implementation.

1
changelog.d/10082.bugfix Normal file
View file

@ -0,0 +1 @@
Fixed a bug causing replication requests to fail when receiving a lot of events via federation.

1
changelog.d/9221.doc Normal file
View file

@ -0,0 +1 @@
Clarify security note regarding hosting Synapse on the same domain as other web applications.

View file

@ -1 +0,0 @@
Removed support for the deprecated `tls_fingerprints` configuration setting. Contributed by Jerin J Titus.

View file

@ -1 +0,0 @@
Add hardened systemd files as proposed in [#9760](https://github.com/matrix-org/synapse/issues/9760) and added them to `contrib/`. Change the docs to reflect the presence of these files.

View file

@ -1 +0,0 @@
Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`.

1
changelog.d/9906.misc Normal file
View file

@ -0,0 +1 @@
Tell CircleCI to build Docker images from `main` branch.

View file

@ -1 +0,0 @@
Experimental support to allow a user who could join a restricted room to view it in the spaces summary.

View file

@ -1 +0,0 @@
Reduce memory usage when joining very large rooms over federation.

1
changelog.d/9973.misc Normal file
View file

@ -0,0 +1 @@
Make `LruCache.invalidate` support tree invalidation, and remove `invalidate_many`.

View file

@ -1 +0,0 @@
Update comments in the space summary handler.

View file

@ -1 +0,0 @@
Minor enhancements to the `@cachedList` descriptor.

View file

@ -1 +0,0 @@
Split multipart email sending into a dedicated handler.

View file

@ -1 +0,0 @@
Add a configuration option which allows enabling opentracing by user id.

View file

@ -1 +0,0 @@
Clarify documentation around SSO mapping providers generating unique IDs and localparts.

View file

@ -1 +0,0 @@
Run `black` on files in the `scripts` directory.

View file

@ -1 +0,0 @@
Add missing type hints to `synapse.util` module.

View file

@ -1 +0,0 @@
Simplify a few helper functions.

View file

@ -1 +0,0 @@
Simplify a few helper functions.

View file

@ -1 +0,0 @@
Simplify a few helper functions.

View file

@ -1 +0,0 @@
Remove unnecessary property from SQLBaseStore.

View file

@ -1 +0,0 @@
Updates to the PostgreSQL documentation (`postgres.md`).

View file

@ -1 +0,0 @@
Updates to the PostgreSQL documentation (`postgres.md`).

View file

@ -1 +0,0 @@
Fix a bug introduced in v1.26.0 which meant that `synapse_port_db` would not correctly initialise some postgres sequences, requiring manual updates afterwards.

View file

@ -1 +0,0 @@
Remove `keylen` param on `LruCache`.

View file

@ -1 +0,0 @@
Fix `synctl`'s `--no-daemonize` parameter to work correctly with worker processes.

View file

@ -1,31 +1,37 @@
# Overview
Captcha can be enabled for this home server. This file explains how to do that.
The captcha mechanism used is Google's ReCaptcha. This requires API keys from Google.
A captcha can be enabled on your homeserver to help prevent bots from registering
accounts. Synapse currently uses Google's reCAPTCHA service which requires API keys
from Google.
## Getting keys
Requires a site/secret key pair from:
<https://developers.google.com/recaptcha/>
Must be a reCAPTCHA v2 key using the "I'm not a robot" Checkbox option
## Setting ReCaptcha Keys
The keys are a config option on the home server config. If they are not
visible, you can generate them via `--generate-config`. Set the following value:
## Getting API keys
1. Create a new site at <https://www.google.com/recaptcha/admin/create>
1. Set the label to anything you want
1. Set the type to reCAPTCHA v2 using the "I'm not a robot" Checkbox option.
This is the only type of captcha that works with Synapse.
1. Add the public hostname for your server, as set in `public_baseurl`
in `homeserver.yaml`, to the list of authorized domains. If you have not set
`public_baseurl`, use `server_name`.
1. Agree to the terms of service and submit.
1. Copy your site key and secret key and add them to your `homeserver.yaml`
configuration file
```
recaptcha_public_key: YOUR_SITE_KEY
recaptcha_private_key: YOUR_SECRET_KEY
In addition, you MUST enable captchas via:
```
1. Enable the CAPTCHA for new registrations
```
enable_registration_captcha: true
```
1. Go to the settings page for the CAPTCHA you just created
1. Uncheck the "Verify the origin of reCAPTCHA solutions" checkbox so that the
captcha can be displayed in any client. If you do not disable this option then you
must specify the domains of every client that is allowed to display the CAPTCHA.
## Configuring IP used for auth
The ReCaptcha API requires that the IP address of the user who solved the
captcha is sent. If the client is connecting through a proxy or load balancer,
The reCAPTCHA API requires that the IP address of the user who solved the
CAPTCHA is sent. If the client is connecting through a proxy or load balancer,
it may be required to use the `X-Forwarded-For` (XFF) header instead of the origin
IP address. This can be configured using the `x_forwarded` directive in the
listeners section of the homeserver.yaml configuration file.
listeners section of the `homeserver.yaml` configuration file.

View file

@ -7,6 +7,7 @@
* [Quarantining media in a room](#quarantining-media-in-a-room)
* [Quarantining all media of a user](#quarantining-all-media-of-a-user)
* [Protecting media from being quarantined](#protecting-media-from-being-quarantined)
* [Unprotecting media from being quarantined](#unprotecting-media-from-being-quarantined)
- [Delete local media](#delete-local-media)
* [Delete a specific local media](#delete-a-specific-local-media)
* [Delete local media by date or size](#delete-local-media-by-date-or-size)
@ -159,6 +160,26 @@ Response:
{}
```
## Unprotecting media from being quarantined
This API reverts the protection of a media.
Request:
```
POST /_synapse/admin/v1/media/unprotect/<media_id>
{}
```
Where `media_id` is in the form of `abcdefg12345...`.
Response:
```json
{}
```
# Delete local media
This API deletes the *local* media from the disk of your own server.
This includes any local thumbnails and copies of media downloaded from

View file

@ -2916,18 +2916,3 @@ redis:
# Optional password if configured on the Redis instance
#
#password: <secret_password>
# Enable experimental features in Synapse.
#
# Experimental features might break or be removed without a deprecation
# period.
#
experimental_features:
# Support for Spaces (MSC1772), it enables the following:
#
# * The Spaces Summary API (MSC2946).
# * Restricting room membership based on space membership (MSC3083).
#
# Uncomment to disable support for Spaces.
#spaces_enabled: false

View file

@ -1,108 +0,0 @@
import json
import sys
import time
import psycopg2
import yaml
from canonicaljson import encode_canonical_json
from signedjson.key import read_signing_keys
from signedjson.sign import sign_json
from unpaddedbase64 import encode_base64
db_binary_type = memoryview
def select_v1_keys(connection):
cursor = connection.cursor()
cursor.execute("SELECT server_name, key_id, verify_key FROM server_signature_keys")
rows = cursor.fetchall()
cursor.close()
results = {}
for server_name, key_id, verify_key in rows:
results.setdefault(server_name, {})[key_id] = encode_base64(verify_key)
return results
def select_v1_certs(connection):
cursor = connection.cursor()
cursor.execute("SELECT server_name, tls_certificate FROM server_tls_certificates")
rows = cursor.fetchall()
cursor.close()
results = {}
for server_name, tls_certificate in rows:
results[server_name] = tls_certificate
return results
def select_v2_json(connection):
cursor = connection.cursor()
cursor.execute("SELECT server_name, key_id, key_json FROM server_keys_json")
rows = cursor.fetchall()
cursor.close()
results = {}
for server_name, key_id, key_json in rows:
results.setdefault(server_name, {})[key_id] = json.loads(
str(key_json).decode("utf-8")
)
return results
def convert_v1_to_v2(server_name, valid_until, keys, certificate):
return {
"old_verify_keys": {},
"server_name": server_name,
"verify_keys": {key_id: {"key": key} for key_id, key in keys.items()},
"valid_until_ts": valid_until,
}
def rows_v2(server, json):
valid_until = json["valid_until_ts"]
key_json = encode_canonical_json(json)
for key_id in json["verify_keys"]:
yield (server, key_id, "-", valid_until, valid_until, db_binary_type(key_json))
def main():
config = yaml.safe_load(open(sys.argv[1]))
valid_until = int(time.time() / (3600 * 24)) * 1000 * 3600 * 24
server_name = config["server_name"]
signing_key = read_signing_keys(open(config["signing_key_path"]))[0]
database = config["database"]
assert database["name"] == "psycopg2", "Can only convert for postgresql"
args = database["args"]
args.pop("cp_max")
args.pop("cp_min")
connection = psycopg2.connect(**args)
keys = select_v1_keys(connection)
certificates = select_v1_certs(connection)
json = select_v2_json(connection)
result = {}
for server in keys:
if server not in json:
v2_json = convert_v1_to_v2(
server, valid_until, keys[server], certificates[server]
)
v2_json = sign_json(v2_json, server_name, signing_key)
result[server] = v2_json
yaml.safe_dump(result, sys.stdout, default_flow_style=False)
rows = [row for server, json in result.items() for row in rows_v2(server, json)]
cursor = connection.cursor()
cursor.executemany(
"INSERT INTO server_keys_json ("
" server_name, key_id, from_server,"
" ts_added_ms, ts_valid_until_ms, key_json"
") VALUES (%s, %s, %s, %s, %s, %s)",
rows,
)
connection.commit()
if __name__ == "__main__":
main()

View file

@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.34.0"
__version__ = "1.35.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View file

@ -181,6 +181,6 @@ KNOWN_ROOM_VERSIONS = {
RoomVersions.V5,
RoomVersions.V6,
RoomVersions.MSC2176,
RoomVersions.MSC3083,
)
# Note that we do not include MSC3083 here unless it is enabled in the config.
} # type: Dict[str, RoomVersion]

View file

@ -261,13 +261,10 @@ def refresh_certificate(hs):
Refresh the TLS certificates that Synapse is using by re-reading them from
disk and updating the TLS context factories to use them.
"""
if not hs.config.has_tls_listener():
# attempt to reload the certs for the good of the tls_fingerprints
hs.config.read_certificate_from_disk(require_cert_and_key=False)
return
hs.config.read_certificate_from_disk(require_cert_and_key=True)
hs.config.read_certificate_from_disk()
hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)
if hs._listening_services:

View file

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.config._base import Config
from synapse.types import JsonDict
@ -28,27 +27,5 @@ class ExperimentalConfig(Config):
# MSC2858 (multiple SSO identity providers)
self.msc2858_enabled = experimental.get("msc2858_enabled", False) # type: bool
# Spaces (MSC1772, MSC2946, MSC3083, etc)
self.spaces_enabled = experimental.get("spaces_enabled", True) # type: bool
if self.spaces_enabled:
KNOWN_ROOM_VERSIONS[RoomVersions.MSC3083.identifier] = RoomVersions.MSC3083
# MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool
def generate_config_section(self, **kwargs):
return """\
# Enable experimental features in Synapse.
#
# Experimental features might break or be removed without a deprecation
# period.
#
experimental_features:
# Support for Spaces (MSC1772), it enables the following:
#
# * The Spaces Summary API (MSC2946).
# * Restricting room membership based on space membership (MSC3083).
#
# Uncomment to disable support for Spaces.
#spaces_enabled: false
"""

View file

@ -215,28 +215,12 @@ class TlsConfig(Config):
days_remaining = (expires_on - now).days
return days_remaining
def read_certificate_from_disk(self, require_cert_and_key: bool):
def read_certificate_from_disk(self):
"""
Read the certificates and private key from disk.
Args:
require_cert_and_key: set to True to throw an error if the certificate
and key file are not given
"""
if require_cert_and_key:
self.tls_private_key = self.read_tls_private_key()
self.tls_certificate = self.read_tls_certificate()
elif self.tls_certificate_file:
# we only need the certificate for the tls_fingerprints. Reload it if we
# can, but it's not a fatal error if we can't.
try:
self.tls_certificate = self.read_tls_certificate()
except Exception as e:
logger.info(
"Unable to read TLS certificate (%s). Ignoring as no "
"tls listeners enabled.",
e,
)
self.tls_private_key = self.read_tls_private_key()
self.tls_certificate = self.read_tls_certificate()
def generate_config_section(
self,

View file

@ -1398,7 +1398,7 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
)
return 200, await self.handler.federation_space_summary(
room_id, suggested_only, max_rooms_per_space, exclude_rooms
origin, room_id, suggested_only, max_rooms_per_space, exclude_rooms
)
# TODO When switching to the stable endpoint, remove the POST handler.
@ -1562,13 +1562,12 @@ def register_servlets(
server_name=hs.hostname,
).register(resource)
if hs.config.experimental.spaces_enabled:
FederationSpaceSummaryServlet(
handler=hs.get_space_summary_handler(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
).register(resource)
FederationSpaceSummaryServlet(
handler=hs.get_space_summary_handler(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
).register(resource)
if "openid" in servlet_groups:
for servletclass in OPENID_SERVLET_CLASSES:

View file

@ -91,6 +91,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler):
"""
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
result = await self._send_events(
instance_name=instance,
store=self.store,
room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
# Limit the number of events sent over federation.
for batch in batch_iter(event_and_contexts, 1000):
result = await self._send_events(
instance_name=instance,
store=self.store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
)
return result["max_stream_id"]
else:
assert self.storage.persistence

View file

@ -68,7 +68,7 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
if row.entity.startswith("@"):
self._device_list_stream_cache.entity_has_changed(row.entity, token)
self.get_cached_devices_for_user.invalidate((row.entity,))
self._get_cached_user_device.invalidate_many((row.entity,))
self._get_cached_user_device.invalidate((row.entity,))
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
else:

View file

@ -137,8 +137,31 @@ class ProtectMediaByID(RestServlet):
logging.info("Protecting local media by ID: %s", media_id)
# Quarantine this media id
await self.store.mark_local_media_as_safe(media_id)
# Protect this media id
await self.store.mark_local_media_as_safe(media_id, safe=True)
return 200, {}
class UnprotectMediaByID(RestServlet):
"""Unprotect local media from being quarantined."""
PATTERNS = admin_patterns("/media/unprotect/(?P<media_id>[^/]+)")
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(
self, request: SynapseRequest, media_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Unprotecting local media by ID: %s", media_id)
# Unprotect this media id
await self.store.mark_local_media_as_safe(media_id, safe=False)
return 200, {}
@ -269,6 +292,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server):
QuarantineMediaByID(hs).register(http_server)
QuarantineMediaByUser(hs).register(http_server)
ProtectMediaByID(hs).register(http_server)
UnprotectMediaByID(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
DeleteMediaByID(hs).register(http_server)
DeleteMediaByDateSize(hs).register(http_server)

View file

@ -1060,9 +1060,7 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
if hs.config.experimental.spaces_enabled:
RoomSpaceSummaryRestServlet(hs).register(http_server)
RoomSpaceSummaryRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process.
if not is_worker:

View file

@ -171,7 +171,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_latest_event_ids_in_room.invalidate((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@ -184,8 +184,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_invited_rooms_for_local_user.invalidate((state_key,))
if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
self.get_relations_for_event.invalidate((relates_to,))
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):

View file

@ -1282,7 +1282,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
)

View file

@ -860,7 +860,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

View file

@ -1748,9 +1748,9 @@ class PersistEventsStore:
},
)
txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,))
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
self.store.get_aggregation_groups_for_event.invalidate, (parent_id,)
)
if rel_type == RelationTypes.REPLACE:
@ -1903,7 +1903,7 @@ class PersistEventsStore:
for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)
@ -1917,7 +1917,7 @@ class PersistEventsStore:
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
txn.execute(

View file

@ -143,6 +143,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"created_ts",
"quarantined_by",
"url_cache",
"safe_from_quarantine",
),
allow_none=True,
desc="get_local_media",
@ -296,12 +297,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
async def mark_local_media_as_safe(self, media_id: str) -> None:
"""Mark a local media as safe from quarantining."""
async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
"""Mark a local media as safe or unsafe from quarantining."""
await self.db_pool.simple_update_one(
table="local_media_repository",
keyvalues={"media_id": media_id},
updatevalues={"safe_from_quarantine": True},
updatevalues={"safe_from_quarantine": safe},
desc="mark_local_media_as_safe",
)

View file

@ -460,7 +460,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self._get_linearized_receipts_for_room.invalidate((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
@ -659,9 +659,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
)
txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(
self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
)
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
self.db_pool.simple_delete_txn(
txn,

View file

@ -25,10 +25,11 @@ from typing import (
TypeVar,
)
from prometheus_client import Gauge
from twisted.internet import defer
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
@ -38,6 +39,24 @@ logger = logging.getLogger(__name__)
V = TypeVar("V")
R = TypeVar("R")
number_queued = Gauge(
"synapse_util_batching_queue_number_queued",
"The number of items waiting in the queue across all keys",
labelnames=("name",),
)
number_in_flight = Gauge(
"synapse_util_batching_queue_number_pending",
"The number of items across all keys either being processed or waiting in a queue",
labelnames=("name",),
)
number_of_keys = Gauge(
"synapse_util_batching_queue_number_of_keys",
"The number of distinct keys that have items queued",
labelnames=("name",),
)
class BatchingQueue(Generic[V, R]):
"""A queue that batches up work, calling the provided processing function
@ -48,10 +67,20 @@ class BatchingQueue(Generic[V, R]):
called, and will keep being called until the queue has been drained (for the
given key).
If the processing function raises an exception then the exception is proxied
through to the callers waiting on that batch of work.
Note that the return value of `add_to_queue` will be the return value of the
processing function that processed the given item. This means that the
returned value will likely include data for other items that were in the
batch.
Args:
name: A name for the queue, used for logging contexts and metrics.
This must be unique, otherwise the metrics will be wrong.
clock: The clock to use to schedule work.
process_batch_callback: The callback to to be run to process a batch of
work.
"""
def __init__(
@ -73,19 +102,15 @@ class BatchingQueue(Generic[V, R]):
# The function to call with batches of values.
self._process_batch_callback = process_batch_callback
LaterGauge(
"synapse_util_batching_queue_number_queued",
"The number of items waiting in the queue across all keys",
labels=("name",),
caller=lambda: sum(len(v) for v in self._next_values.values()),
number_queued.labels(self._name).set_function(
lambda: sum(len(q) for q in self._next_values.values())
)
LaterGauge(
"synapse_util_batching_queue_number_of_keys",
"The number of distinct keys that have items queued",
labels=("name",),
caller=lambda: len(self._next_values),
)
number_of_keys.labels(self._name).set_function(lambda: len(self._next_values))
self._number_in_flight_metric = number_in_flight.labels(
self._name
) # type: Gauge
async def add_to_queue(self, value: V, key: Hashable = ()) -> R:
"""Adds the value to the queue with the given key, returning the result
@ -107,17 +132,18 @@ class BatchingQueue(Generic[V, R]):
if key not in self._processing_keys:
run_as_background_process(self._name, self._process_queue, key)
return await make_deferred_yieldable(d)
with self._number_in_flight_metric.track_inprogress():
return await make_deferred_yieldable(d)
async def _process_queue(self, key: Hashable) -> None:
"""A background task to repeatedly pull things off the queue for the
given key and call the `self._process_batch_callback` with the values.
"""
try:
if key in self._processing_keys:
return
if key in self._processing_keys:
return
try:
self._processing_keys.add(key)
while True:
@ -137,16 +163,16 @@ class BatchingQueue(Generic[V, R]):
values = [value for value, _ in next_values]
results = await self._process_batch_callback(values)
for _, deferred in next_values:
with PreserveLoggingContext():
with PreserveLoggingContext():
for _, deferred in next_values:
deferred.callback(results)
except Exception as e:
for _, deferred in next_values:
if deferred.called:
continue
with PreserveLoggingContext():
for _, deferred in next_values:
if deferred.called:
continue
with PreserveLoggingContext():
deferred.errback(e)
finally:

View file

@ -16,16 +16,7 @@
import enum
import threading
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union
from prometheus_client import Gauge
@ -91,7 +82,7 @@ class DeferredCache(Generic[KT, VT]):
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
self._pending_deferred_cache = (
cache_type()
) # type: MutableMapping[KT, CacheEntry]
) # type: Union[TreeCache, MutableMapping[KT, CacheEntry]]
def metrics_cb():
cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
@ -287,8 +278,17 @@ class DeferredCache(Generic[KT, VT]):
self.cache.set(key, value, callbacks=callbacks)
def invalidate(self, key):
"""Delete a key, or tree of entries
If the cache is backed by a regular dict, then "key" must be of
the right type for this cache
If the cache is backed by a TreeCache, then "key" must be a tuple, but
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
self.check_thread()
self.cache.pop(key, None)
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned
@ -299,20 +299,10 @@ class DeferredCache(Generic[KT, VT]):
# run the invalidation callbacks now, rather than waiting for the
# deferred to resolve.
if entry:
entry.invalidate()
def invalidate_many(self, key: KT):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
key = cast(KT, key)
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
# _pending_deferred_cache.pop should either return a CacheEntry, or, in the
# case of a TreeCache, a dict of keys to cache entries. Either way calling
# iterate_tree_cache_entry on it will do the right thing.
for entry in iterate_tree_cache_entry(entry):
entry.invalidate()
def invalidate_all(self):

View file

@ -48,7 +48,6 @@ F = TypeVar("F", bound=Callable[..., Any])
class _CachedFunction(Generic[F]):
invalidate = None # type: Any
invalidate_all = None # type: Any
invalidate_many = None # type: Any
prefill = None # type: Any
cache = None # type: Any
num_args = None # type: Any
@ -262,6 +261,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
):
super().__init__(orig, num_args=num_args, cache_context=cache_context)
if tree and self.num_args < 2:
raise RuntimeError(
"tree=True is nonsensical for cached functions with a single parameter"
)
self.max_entries = max_entries
self.tree = tree
self.iterable = iterable
@ -302,11 +306,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
wrapped = cast(_CachedFunction, _wrapped)
if self.num_args == 1:
assert not self.tree
wrapped.invalidate = lambda key: cache.invalidate(key[0])
wrapped.prefill = lambda key, val: cache.prefill(key[0], val)
else:
wrapped.invalidate = cache.invalidate
wrapped.invalidate_many = cache.invalidate_many
wrapped.prefill = cache.prefill
wrapped.invalidate_all = cache.invalidate_all

View file

@ -152,7 +152,6 @@ class LruCache(Generic[KT, VT]):
"""
Least-recently-used cache, supporting prometheus metrics and invalidation callbacks.
Supports del_multi only if cache_type=TreeCache
If cache_type=TreeCache, all keys must be tuples.
"""
@ -393,10 +392,16 @@ class LruCache(Generic[KT, VT]):
@synchronized
def cache_del_multi(key: KT) -> None:
"""Delete an entry, or tree of entries
If the LruCache is backed by a regular dict, then "key" must be of
the right type for this cache
If the LruCache is backed by a TreeCache, then "key" must be a tuple, but
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
This will only work if constructed with cache_type=TreeCache
"""
popped = cache.pop(key)
popped = cache.pop(key, None)
if popped is None:
return
# for each deleted node, we now need to remove it from the linked list
@ -430,11 +435,10 @@ class LruCache(Generic[KT, VT]):
self.set = cache_set
self.setdefault = cache_set_default
self.pop = cache_pop
self.del_multi = cache_del_multi
# `invalidate` is exposed for consistency with DeferredCache, so that it can be
# invalidated by the cache invalidation replication stream.
self.invalidate = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
self.invalidate = cache_del_multi
self.len = synchronized(cache_len)
self.contains = cache_contains
self.clear = cache_clear

View file

@ -89,6 +89,9 @@ class TreeCache:
value. If the key is partial, the TreeCacheNode corresponding to the part
of the tree that was removed.
"""
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
# a list of the nodes we have touched on the way down the tree
nodes = []

12
synctl
View file

@ -97,11 +97,15 @@ def start(pidfile: str, app: str, config_files: Iterable[str], daemonize: bool)
write("started %s(%s)" % (app, ",".join(config_files)), colour=GREEN)
return True
except subprocess.CalledProcessError as e:
write(
"error starting %s(%s) (exit code: %d); see above for logs"
% (app, ",".join(config_files), e.returncode),
colour=RED,
err = "%s(%s) failed to start (exit code: %d). Check the Synapse logfile" % (
app,
",".join(config_files),
e.returncode,
)
if daemonize:
err += ", or run synctl with --no-daemonize"
err += "."
write(err, colour=RED, stream=sys.stderr)
return False

View file

@ -74,12 +74,11 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg=
config = {
"tls_certificate_path": os.path.join(config_dir, "cert.pem"),
"tls_fingerprints": [],
}
t = TestConfig()
t.read_config(config, config_dir_path="", data_dir_path="")
t.read_certificate_from_disk(require_cert_and_key=False)
t.read_tls_certificate()
warnings = self.flushWarnings()
self.assertEqual(len(warnings), 1)

View file

@ -16,6 +16,8 @@ import json
import os
from binascii import unhexlify
from parameterized import parameterized
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client.v1 import login, profile, room
@ -562,3 +564,100 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
)
# Test that the file is deleted
self.assertFalse(os.path.exists(local_path))
class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
synapse.rest.admin.register_servlets_for_media_repo,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
media_repo = hs.get_media_repository_resource()
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
# Create media
upload_resource = media_repo.children[b"upload"]
# file size is 67 Byte
image_data = unhexlify(
b"89504e470d0a1a0a0000000d4948445200000001000000010806"
b"0000001f15c4890000000a49444154789c63000100000500010d"
b"0a2db40000000049454e44ae426082"
)
# Upload some media into the room
response = self.helper.upload_media(
upload_resource, image_data, tok=self.admin_user_tok, expect_code=200
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
self.media_id = server_and_media_id.split("/")[1]
self.url = "/_synapse/admin/v1/media/%s/%s"
@parameterized.expand(["protect", "unprotect"])
def test_no_auth(self, action: str):
"""
Try to protect media without authentication.
"""
channel = self.make_request("POST", self.url % (action, self.media_id), b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["protect", "unprotect"])
def test_requester_is_no_admin(self, action: str):
"""
If the user is not a server admin, an error is returned.
"""
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
channel = self.make_request(
"POST",
self.url % (action, self.media_id),
access_token=self.other_user_token,
)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_protect_media(self):
"""
Tests that protect and unprotect a media is successfully
"""
media_info = self.get_success(self.store.get_local_media(self.media_id))
self.assertFalse(media_info["safe_from_quarantine"])
# protect
channel = self.make_request(
"POST",
self.url % ("protect", self.media_id),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
media_info = self.get_success(self.store.get_local_media(self.media_id))
self.assertTrue(media_info["safe_from_quarantine"])
# unprotect
channel = self.make_request(
"POST",
self.url % ("unprotect", self.media_id),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
media_info = self.get_success(self.store.get_local_media(self.media_id))
self.assertFalse(media_info["safe_from_quarantine"])

View file

@ -622,17 +622,17 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
self.assertEquals(callcount2[0], 1)
a.func2.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 1)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 1)
yield a.func2("foo")
a.func2.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 2)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 2)
self.assertEquals(callcount[0], 1)
self.assertEquals(callcount2[0], 2)
a.func.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 3)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 3)
yield a.func("foo")
self.assertEquals(callcount[0], 2)

View file

@ -14,7 +14,12 @@
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable
from synapse.util.batching_queue import BatchingQueue
from synapse.util.batching_queue import (
BatchingQueue,
number_in_flight,
number_of_keys,
number_queued,
)
from tests.server import get_clock
from tests.unittest import TestCase
@ -24,6 +29,14 @@ class BatchingQueueTestCase(TestCase):
def setUp(self):
self.clock, hs_clock = get_clock()
# We ensure that we remove any existing metrics for "test_queue".
try:
number_queued.remove("test_queue")
number_of_keys.remove("test_queue")
number_in_flight.remove("test_queue")
except KeyError:
pass
self._pending_calls = []
self.queue = BatchingQueue("test_queue", hs_clock, self._process_queue)
@ -32,6 +45,41 @@ class BatchingQueueTestCase(TestCase):
self._pending_calls.append((values, d))
return await make_deferred_yieldable(d)
def _assert_metrics(self, queued, keys, in_flight):
"""Assert that the metrics are correct"""
self.assertEqual(len(number_queued.collect()), 1)
self.assertEqual(len(number_queued.collect()[0].samples), 1)
self.assertEqual(
number_queued.collect()[0].samples[0].labels,
{"name": self.queue._name},
)
self.assertEqual(
number_queued.collect()[0].samples[0].value,
queued,
"number_queued",
)
self.assertEqual(len(number_of_keys.collect()), 1)
self.assertEqual(len(number_of_keys.collect()[0].samples), 1)
self.assertEqual(
number_queued.collect()[0].samples[0].labels, {"name": self.queue._name}
)
self.assertEqual(
number_of_keys.collect()[0].samples[0].value, keys, "number_of_keys"
)
self.assertEqual(len(number_in_flight.collect()), 1)
self.assertEqual(len(number_in_flight.collect()[0].samples), 1)
self.assertEqual(
number_queued.collect()[0].samples[0].labels, {"name": self.queue._name}
)
self.assertEqual(
number_in_flight.collect()[0].samples[0].value,
in_flight,
"number_in_flight",
)
def test_simple(self):
"""Tests the basic case of calling `add_to_queue` once and having
`_process_queue` return.
@ -41,6 +89,8 @@ class BatchingQueueTestCase(TestCase):
queue_d = defer.ensureDeferred(self.queue.add_to_queue("foo"))
self._assert_metrics(queued=1, keys=1, in_flight=1)
# The queue should wait a reactor tick before calling the processing
# function.
self.assertFalse(self._pending_calls)
@ -52,12 +102,15 @@ class BatchingQueueTestCase(TestCase):
self.assertEqual(len(self._pending_calls), 1)
self.assertEqual(self._pending_calls[0][0], ["foo"])
self.assertFalse(queue_d.called)
self._assert_metrics(queued=0, keys=0, in_flight=1)
# Return value of the `_process_queue` should be propagated back.
self._pending_calls.pop()[1].callback("bar")
self.assertEqual(self.successResultOf(queue_d), "bar")
self._assert_metrics(queued=0, keys=0, in_flight=0)
def test_batching(self):
"""Test that multiple calls at the same time get batched up into one
call to `_process_queue`.
@ -68,6 +121,8 @@ class BatchingQueueTestCase(TestCase):
queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1"))
queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2"))
self._assert_metrics(queued=2, keys=1, in_flight=2)
self.clock.pump([0])
# We should see only *one* call to `_process_queue`
@ -75,12 +130,14 @@ class BatchingQueueTestCase(TestCase):
self.assertEqual(self._pending_calls[0][0], ["foo1", "foo2"])
self.assertFalse(queue_d1.called)
self.assertFalse(queue_d2.called)
self._assert_metrics(queued=0, keys=0, in_flight=2)
# Return value of the `_process_queue` should be propagated back to both.
self._pending_calls.pop()[1].callback("bar")
self.assertEqual(self.successResultOf(queue_d1), "bar")
self.assertEqual(self.successResultOf(queue_d2), "bar")
self._assert_metrics(queued=0, keys=0, in_flight=0)
def test_queuing(self):
"""Test that we queue up requests while a `_process_queue` is being
@ -92,13 +149,20 @@ class BatchingQueueTestCase(TestCase):
queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1"))
self.clock.pump([0])
self.assertEqual(len(self._pending_calls), 1)
# We queue up work after the process function has been called, testing
# that they get correctly queued up.
queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2"))
queue_d3 = defer.ensureDeferred(self.queue.add_to_queue("foo3"))
# We should see only *one* call to `_process_queue`
self.assertEqual(len(self._pending_calls), 1)
self.assertEqual(self._pending_calls[0][0], ["foo1"])
self.assertFalse(queue_d1.called)
self.assertFalse(queue_d2.called)
self.assertFalse(queue_d3.called)
self._assert_metrics(queued=2, keys=1, in_flight=3)
# Return value of the `_process_queue` should be propagated back to the
# first.
@ -106,18 +170,24 @@ class BatchingQueueTestCase(TestCase):
self.assertEqual(self.successResultOf(queue_d1), "bar1")
self.assertFalse(queue_d2.called)
self.assertFalse(queue_d3.called)
self._assert_metrics(queued=2, keys=1, in_flight=2)
# We should now see a second call to `_process_queue`
self.clock.pump([0])
self.assertEqual(len(self._pending_calls), 1)
self.assertEqual(self._pending_calls[0][0], ["foo2"])
self.assertEqual(self._pending_calls[0][0], ["foo2", "foo3"])
self.assertFalse(queue_d2.called)
self.assertFalse(queue_d3.called)
self._assert_metrics(queued=0, keys=0, in_flight=2)
# Return value of the `_process_queue` should be propagated back to the
# second.
self._pending_calls.pop()[1].callback("bar2")
self.assertEqual(self.successResultOf(queue_d2), "bar2")
self.assertEqual(self.successResultOf(queue_d3), "bar2")
self._assert_metrics(queued=0, keys=0, in_flight=0)
def test_different_keys(self):
"""Test that calls to different keys get processed in parallel."""
@ -140,6 +210,7 @@ class BatchingQueueTestCase(TestCase):
self.assertFalse(queue_d1.called)
self.assertFalse(queue_d2.called)
self.assertFalse(queue_d3.called)
self._assert_metrics(queued=1, keys=1, in_flight=3)
# Return value of the `_process_queue` should be propagated back to the
# first.
@ -148,6 +219,7 @@ class BatchingQueueTestCase(TestCase):
self.assertEqual(self.successResultOf(queue_d1), "bar1")
self.assertFalse(queue_d2.called)
self.assertFalse(queue_d3.called)
self._assert_metrics(queued=1, keys=1, in_flight=2)
# Return value of the `_process_queue` should be propagated back to the
# second.
@ -161,9 +233,11 @@ class BatchingQueueTestCase(TestCase):
self.assertEqual(len(self._pending_calls), 1)
self.assertEqual(self._pending_calls[0][0], ["foo3"])
self.assertFalse(queue_d3.called)
self._assert_metrics(queued=0, keys=0, in_flight=1)
# Return value of the `_process_queue` should be propagated back to the
# third deferred.
self._pending_calls.pop()[1].callback("bar4")
self.assertEqual(self.successResultOf(queue_d3), "bar4")
self._assert_metrics(queued=0, keys=0, in_flight=0)