Merge branch 'develop' into disallow_null_byte

This commit is contained in:
H. Shay 2021-11-09 11:15:19 -08:00
commit 8b32da9b9a
144 changed files with 2966 additions and 783 deletions

View file

@ -1,12 +1,13 @@
### Pull Request Checklist
<!-- Please read CONTRIBUTING.md before submitting your pull request -->
<!-- Please read https://matrix-org.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->
* [ ] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog). The entry should:
* [ ] Pull request includes a [changelog file](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
- Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
- Use markdown where necessary, mostly for `code blocks`.
- End with either a period (.) or an exclamation mark (!).
- Start with a capital letter.
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#sign-off)
* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#code-style))
* [ ] Pull request includes a [sign off](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#sign-off)
* [ ] [Code style](https://matrix-org.github.io/synapse/latest/code_style.html) is correct
(run the [linters](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

View file

@ -1,8 +1,17 @@
Synapse 1.46.0rc1 (2021-10-27)
==============================
Synapse 1.46.0 (2021-11-02)
===========================
The cause of the [performance regression affecting Synapse 1.44](https://github.com/matrix-org/synapse/issues/11049) has been identified and fixed. ([\#11177](https://github.com/matrix-org/synapse/issues/11177))
Bugfixes
--------
- Fix a bug introduced in v1.46.0rc1 where URL previews of some XML documents would fail. ([\#11196](https://github.com/matrix-org/synapse/issues/11196))
Synapse 1.46.0rc1 (2021-10-27)
==============================
Features
--------

1
changelog.d/11098.misc Normal file
View file

@ -0,0 +1 @@
Add type hints to `synapse.events`.

View file

@ -0,0 +1 @@
Add search by room ID and room alias to List Room admin API.

1
changelog.d/11137.misc Normal file
View file

@ -0,0 +1 @@
Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code.

1
changelog.d/11157.misc Normal file
View file

@ -0,0 +1 @@
Only allow old Element/Riot Android clients to send read receipts without a request body. All other clients must include a request body as required by the specification. Contributed by @rogersheu.

1
changelog.d/11188.bugfix Normal file
View file

@ -0,0 +1 @@
Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`.

1
changelog.d/11199.bugfix Normal file
View file

@ -0,0 +1 @@
Delete `to_device` messages for hidden devices that will never be read, reducing database size.

1
changelog.d/11200.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a long-standing bug wherein a missing `Content-Type` header when downloading remote media would cause Synapse to throw an error.

1
changelog.d/11207.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper.

View file

@ -0,0 +1 @@
Calculate a default value for `public_baseurl` based on `server_name`.

1
changelog.d/11225.misc Normal file
View file

@ -0,0 +1 @@
Replace outdated links in the pull request checklist with links to the rendered documentation.

View file

@ -0,0 +1 @@
Allow the admin [Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api) to block a room without the need to join it.

1
changelog.d/11229.misc Normal file
View file

@ -0,0 +1 @@
`ObservableDeferred`: run registered observers in order.

1
changelog.d/11231.misc Normal file
View file

@ -0,0 +1 @@
Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`.

1
changelog.d/11233.misc Normal file
View file

@ -0,0 +1 @@
Add `twine` and `towncrier` as dev dependencies, as they're used by the release script.

1
changelog.d/11234.bugfix Normal file
View file

@ -0,0 +1 @@
Fix long-standing bug where cross signing keys were not included in the response to `/r0/keys/query` the first time a remote user was queried.

View file

@ -0,0 +1 @@
Support filtering by relation senders & types per [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).

1
changelog.d/11237.misc Normal file
View file

@ -0,0 +1 @@
Allow `stream_writers.typing` config to be a list of one worker.

1
changelog.d/11239.misc Normal file
View file

@ -0,0 +1 @@
Remove debugging statement in tests.

1
changelog.d/11240.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection.

1
changelog.d/11244.misc Normal file
View file

@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.

1
changelog.d/11246.misc Normal file
View file

@ -0,0 +1 @@
Add an additional test for the `cachedList` method decorator.

1
changelog.d/11247.misc Normal file
View file

@ -0,0 +1 @@
Clean up code relating to to-device messages and sending ephemeral events to application services.

1
changelog.d/11253.misc Normal file
View file

@ -0,0 +1 @@
Make minor correction to the type of `auth_checkers` callbacks.

1
changelog.d/11255.bugfix Normal file
View file

@ -0,0 +1 @@
Fix rolling back Synapse version when using workers.

1
changelog.d/11257.doc Normal file
View file

@ -0,0 +1 @@
Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr.

1
changelog.d/11262.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.

View file

@ -0,0 +1 @@
Add some background update admin APIs.

1
changelog.d/11269.misc Normal file
View file

@ -0,0 +1 @@
Clean up trivial aspects of the Debian package build tooling.

1
changelog.d/11270.misc Normal file
View file

@ -0,0 +1 @@
Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse.

1
changelog.d/11273.misc Normal file
View file

@ -0,0 +1 @@
Clean up trivial aspects of the Debian package build tooling.

1
changelog.d/11276.bugfix Normal file
View file

@ -0,0 +1 @@
Fix rolling back Synapse version when using workers.

1
changelog.d/11278.misc Normal file
View file

@ -0,0 +1 @@
Fix a small typo in the error response when a relation type other than 'm.annotation' is passed to `GET /rooms/{room_id}/aggregations/{event_id}`.

1
changelog.d/11282.misc Normal file
View file

@ -0,0 +1 @@
Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.

1
changelog.d/11285.misc Normal file
View file

@ -0,0 +1 @@
Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.

View file

@ -40,6 +40,7 @@ dh_virtualenv \
--upgrade-pip \
--preinstall="lxml" \
--preinstall="mock" \
--preinstall="wheel" \
--extra-pip-arg="--no-cache-dir" \
--extra-pip-arg="--compile" \
--extras="all,systemd,test"

16
debian/changelog vendored
View file

@ -1,8 +1,22 @@
matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium
* Update scripts to pass Shellcheck lints.
* Remove unused Vagrant scripts from debian/ directory.
* Allow building Debian packages for any architecture, not just amd64.
* Preinstall the "wheel" package when building virtualenvs.
* Do not error if /etc/default/matrix-synapse is missing.
-- root <root@cae79a6e79d7> Fri, 22 Oct 2021 22:20:31 +0000
-- Dan Callahan <danc@element.io> Fri, 22 Oct 2021 22:20:31 +0000
matrix-synapse-py3 (1.46.0) stable; urgency=medium
[ Richard van der Hoff ]
* Compress debs with xz, to fix incompatibility of impish debs with reprepro.
[ Synapse Packaging team ]
* New synapse release 1.46.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 02 Nov 2021 13:22:53 +0000
matrix-synapse-py3 (1.46.0~rc1) stable; urgency=medium

2
debian/control vendored
View file

@ -19,7 +19,7 @@ Standards-Version: 3.9.8
Homepage: https://github.com/matrix-org/synapse
Package: matrix-synapse-py3
Architecture: amd64
Architecture: any
Provides: matrix-synapse
Conflicts:
matrix-synapse (<< 0.34.0.1-0matrix2),

View file

@ -5,7 +5,7 @@ Description=Synapse Matrix homeserver
Type=notify
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
EnvironmentFile=-/etc/default/matrix-synapse
ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
ExecReload=/bin/kill -HUP $MAINPID

6
debian/rules vendored
View file

@ -51,5 +51,11 @@ override_dh_shlibdeps:
override_dh_virtualenv:
./debian/build_virtualenv
override_dh_builddeb:
# force the compression to xzip, to stop dpkg-deb on impish defaulting to zstd
# (which requires reprepro 5.3.0-1.3, which is currently only in 'experimental' in Debian:
# https://metadata.ftp-master.debian.org/changelogs/main/r/reprepro/reprepro_5.3.0-1.3_changelog)
dh_builddeb -- -Zxz
%:
dh $@ --with python-virtualenv

View file

@ -1,2 +0,0 @@
.vagrant
*.log

View file

@ -1,24 +0,0 @@
#!/bin/bash
#
# provisioning script for vagrant boxes for testing the matrix-synapse debs.
#
# Will install the most recent matrix-synapse-py3 deb for this platform from
# the /debs directory.
set -e
apt-get update
apt-get install -y lsb-release
deb=$(find /debs -name "matrix-synapse-py3_*+$(lsb_release -cs)*.deb" | sort | tail -n1)
debconf-set-selections <<EOF
matrix-synapse matrix-synapse/report-stats boolean false
matrix-synapse matrix-synapse/server-name string localhost:18448
EOF
dpkg -i "$deb"
sed -i -e 's/port: 8448$/port: 18448/; s/port: 8008$/port: 18008' /etc/matrix-synapse/homeserver.yaml
echo 'registration_shared_secret: secret' >> /etc/matrix-synapse/homeserver.yaml
systemctl restart matrix-synapse

View file

@ -1,13 +0,0 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :
ver = `cd ../../..; dpkg-parsechangelog -S Version`.strip()
Vagrant.configure("2") do |config|
config.vm.box = "debian/stretch64"
config.vm.synced_folder ".", "/vagrant", disabled: true
config.vm.synced_folder "../../../../debs", "/debs", type: "nfs"
config.vm.provision "shell", path: "../provision.sh"
end

View file

@ -1,10 +0,0 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/xenial64"
config.vm.synced_folder ".", "/vagrant", disabled: true
config.vm.synced_folder "../../../../debs", "/debs"
config.vm.provision "shell", path: "../provision.sh"
end

View file

@ -51,6 +51,7 @@
- [Administration](usage/administration/README.md)
- [Admin API](usage/administration/admin_api/README.md)
- [Account Validity](admin_api/account_validity.md)
- [Background Updates](usage/administration/admin_api/background_updates.md)
- [Delete Group](admin_api/delete_group.md)
- [Event Reports](admin_api/event_reports.md)
- [Media](admin_api/media_admin_api.md)

View file

@ -38,9 +38,14 @@ The following query parameters are available:
- `history_visibility` - Rooms are ordered alphabetically by visibility of history of the room.
- `state_events` - Rooms are ordered by number of state events. Largest to smallest.
* `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.
* `search_term` - Filter rooms by their room name. Search term can be contained in any
part of the room name. Defaults to no filtering.
this value to `b` will reverse the above sort order. Defaults to `f`.
* `search_term` - Filter rooms by their room name, canonical alias and room id.
Specifically, rooms are selected if the search term is contained in
- the room's name,
- the local part of the room's canonical alias, or
- the complete (local and server part) room's id (case sensitive).
Defaults to no filtering.
**Response**
@ -380,7 +385,7 @@ A response body like the following is returned:
# Delete Room API
The Delete Room admin API allows server admins to remove rooms from server
The Delete Room admin API allows server admins to remove rooms from the server
and block these rooms.
Shuts down a room. Moves all local users and room aliases automatically to a
@ -391,13 +396,17 @@ The new room will be created with the user specified by the `new_room_user_id` p
as room administrator and will contain a message explaining what happened. Users invited
to the new room will have power level `-10` by default, and thus be unable to speak.
If `block` is `True` it prevents new joins to the old room.
If `block` is `true`, users will be prevented from joining the old room.
This option can also be used to pre-emptively block a room, even if it's unknown
to this homeserver. In this case, the room will be blocked, and no further action
will be taken. If `block` is `false`, attempting to delete an unknown room is
invalid and will be rejected as a bad request.
This API will remove all trace of the old room from your database after removing
all local users. If `purge` is `true` (the default), all traces of the old room will
be removed from your database after removing all local users. If you do not want
this to happen, set `purge` to `false`.
Depending on the amount of history being purged a call to the API may take
Depending on the amount of history being purged, a call to the API may take
several minutes or longer.
The local server will only have the power to move local user and room aliases to
@ -459,8 +468,9 @@ The following JSON body parameters are available:
`new_room_user_id` in the new room. Ideally this will clearly convey why the
original room was shut down. Defaults to `Sharing illegal content on this server
is not permitted and rooms in violation will be blocked.`
* `block` - Optional. If set to `true`, this room will be added to a blocking list, preventing
future attempts to join the room. Defaults to `false`.
* `block` - Optional. If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Rooms can be blocked
even if they're not yet known to the homeserver. Defaults to `false`.
* `purge` - Optional. If set to `true`, it will remove all traces of the room from your database.
Defaults to `true`.
* `force_purge` - Optional, and ignored unless `purge` is `true`. If set to `true`, it
@ -478,7 +488,8 @@ The following fields are returned in the JSON response body:
* `failed_to_kick_users` - An array of users (`user_id`) that that were not kicked.
* `local_aliases` - An array of strings representing the local aliases that were migrated from
the old room to the new.
* `new_room_id` - A string representing the room ID of the new room.
* `new_room_id` - A string representing the room ID of the new room, or `null` if
no such room was created.
## Undoing room deletions

View file

@ -11,7 +11,7 @@ registered by using the Module API's `register_password_auth_provider_callbacks`
_First introduced in Synapse v1.46.0_
```python
auth_checkers: Dict[Tuple[str,Tuple], Callable]
auth_checkers: Dict[Tuple[str, Tuple[str, ...]], Callable]
```
A dict mapping from tuples of a login type identifier (such as `m.login.password`) and a

View file

@ -22,6 +22,7 @@ such as [Github][github-idp].
[google-idp]: https://developers.google.com/identity/protocols/oauth2/openid-connect
[auth0]: https://auth0.com/
[authentik]: https://goauthentik.io/
[lemonldap]: https://lemonldap-ng.org/
[okta]: https://www.okta.com/
[dex-idp]: https://github.com/dexidp/dex
[keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
@ -243,6 +244,43 @@ oidc_providers:
display_name_template: "{{ user.preferred_username|capitalize }}" # TO BE FILLED: If your users have names in Authentik and you want those in Synapse, this should be replaced with user.name|capitalize.
```
### LemonLDAP
[LemonLDAP::NG][lemonldap] is an open-source IdP solution.
1. Create an OpenID Connect Relying Parties in LemonLDAP::NG
2. The parameters are:
- Client ID under the basic menu of the new Relying Parties (`Options > Basic >
Client ID`)
- Client secret (`Options > Basic > Client secret`)
- JWT Algorithm: RS256 within the security menu of the new Relying Parties
(`Options > Security > ID Token signature algorithm` and `Options > Security >
Access Token signature algorithm`)
- Scopes: OpenID, Email and Profile
- Allowed redirection addresses for login (`Options > Basic > Allowed
redirection addresses for login` ) :
`[synapse public baseurl]/_synapse/client/oidc/callback`
Synapse config:
```yaml
oidc_providers:
- idp_id: lemonldap
idp_name: lemonldap
discover: true
issuer: "https://auth.example.org/" # TO BE FILLED: replace with your domain
client_id: "your client id" # TO BE FILLED
client_secret: "your client secret" # TO BE FILLED
scopes:
- "openid"
- "profile"
- "email"
user_mapping_provider:
config:
localpart_template: "{{ user.preferred_username }}}"
# TO BE FILLED: If your users have names in LemonLDAP::NG and you want those in Synapse, this should be replaced with user.name|capitalize or any valid filter.
display_name_template: "{{ user.preferred_username|capitalize }}"
```
### GitHub
[GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but

View file

@ -91,6 +91,8 @@ pid_file: DATADIR/homeserver.pid
# Otherwise, it should be the URL to reach Synapse's client HTTP listener (see
# 'listeners' below).
#
# Defaults to 'https://<server_name>/'.
#
#public_baseurl: https://example.com/
# Uncomment the following to tell other servers to send federation traffic on
@ -1265,7 +1267,7 @@ oembed:
# in on this server.
#
# (By default, no suggestion is made, so it is left up to the client.
# This setting is ignored unless public_baseurl is also set.)
# This setting is ignored unless public_baseurl is also explicitly set.)
#
#default_identity_server: https://matrix.org
@ -1290,8 +1292,6 @@ oembed:
# by the Matrix Identity Service API specification:
# https://matrix.org/docs/spec/identity_service/latest
#
# If a delegate is specified, the config option public_baseurl must also be filled out.
#
account_threepid_delegates:
#email: https://example.com # Delegate email sending to example.com
#msisdn: http://localhost:8090 # Delegate SMS sending to this local process
@ -1981,11 +1981,10 @@ sso:
# phishing attacks from evil.site. To avoid this, include a slash after the
# hostname: "https://my.client/".
#
# If public_baseurl is set, then the login fallback page (used by clients
# that don't natively support the required login flows) is whitelisted in
# addition to any URLs in this list.
# The login fallback page (used by clients that don't natively support the
# required login flows) is whitelisted in addition to any URLs in this list.
#
# By default, this list is empty.
# By default, this list contains only the login fallback page.
#
#client_whitelist:
# - https://riot.im/develop

View file

@ -15,7 +15,7 @@ Type=notify
NotifyAccess=main
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
EnvironmentFile=-/etc/default/matrix-synapse
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.generic_worker --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml
ExecReload=/bin/kill -HUP $MAINPID
Restart=always

View file

@ -10,7 +10,7 @@ Type=notify
NotifyAccess=main
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
EnvironmentFile=-/etc/default/matrix-synapse
ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
ExecReload=/bin/kill -HUP $MAINPID

View file

@ -0,0 +1,84 @@
# Background Updates API
This API allows a server administrator to manage the background updates being
run against the database.
## Status
This API gets the current status of the background updates.
The API is:
```
GET /_synapse/admin/v1/background_updates/status
```
Returning:
```json
{
"enabled": true,
"current_updates": {
"<db_name>": {
"name": "<background_update_name>",
"total_item_count": 50,
"total_duration_ms": 10000.0,
"average_items_per_ms": 2.2,
},
}
}
```
`enabled` whether the background updates are enabled or disabled.
`db_name` the database name (usually Synapse is configured with a single database named 'master').
For each update:
`name` the name of the update.
`total_item_count` total number of "items" processed (the meaning of 'items' depends on the update in question).
`total_duration_ms` how long the background process has been running, not including time spent sleeping.
`average_items_per_ms` how many items are processed per millisecond based on an exponential average.
## Enabled
This API allow pausing background updates.
Background updates should *not* be paused for significant periods of time, as
this can affect the performance of Synapse.
*Note*: This won't persist over restarts.
*Note*: This won't cancel any update query that is currently running. This is
usually fine since most queries are short lived, except for `CREATE INDEX`
background updates which won't be cancelled once started.
The API is:
```
POST /_synapse/admin/v1/background_updates/enabled
```
with the following body:
```json
{
"enabled": false
}
```
`enabled` sets whether the background updates are enabled or disabled.
The API returns the `enabled` param.
```json
{
"enabled": false
}
```
There is also a `GET` version which returns the `enabled` state.

248
mypy.ini
View file

@ -10,88 +10,173 @@ warn_unreachable = True
local_partial_types = True
no_implicit_optional = True
# To find all folders that pass mypy you run:
#
# find synapse/* -type d -not -name __pycache__ -exec bash -c "mypy '{}' > /dev/null" \; -print
files =
scripts-dev/sign_json,
synapse/__init__.py,
synapse/api,
synapse/appservice,
synapse/config,
synapse/crypto,
synapse/event_auth.py,
synapse/events/builder.py,
synapse/events/presence_router.py,
synapse/events/snapshot.py,
synapse/events/spamcheck.py,
synapse/events/third_party_rules.py,
synapse/events/utils.py,
synapse/events/validator.py,
synapse/federation,
synapse/groups,
synapse/handlers,
synapse/http,
synapse/logging,
synapse/metrics,
synapse/module_api,
synapse/notifier.py,
synapse/push,
synapse/replication,
synapse/rest,
synapse/server.py,
synapse/server_notices,
synapse/spam_checker_api,
synapse/state,
synapse/storage/__init__.py,
synapse/storage/_base.py,
synapse/storage/background_updates.py,
synapse/storage/databases/main/appservice.py,
synapse/storage/databases/main/client_ips.py,
synapse/storage/databases/main/events.py,
synapse/storage/databases/main/keys.py,
synapse/storage/databases/main/pusher.py,
synapse/storage/databases/main/registration.py,
synapse/storage/databases/main/relations.py,
synapse/storage/databases/main/session.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
synapse/storage/databases/state,
synapse/storage/database.py,
synapse/storage/engines,
synapse/storage/keys.py,
synapse/storage/persist_events.py,
synapse/storage/prepare_database.py,
synapse/storage/purge_events.py,
synapse/storage/push_rule.py,
synapse/storage/relations.py,
synapse/storage/roommember.py,
synapse/storage/state.py,
synapse/storage/types.py,
synapse/storage/util,
synapse/streams,
synapse/types.py,
synapse/util,
synapse/visibility.py,
tests/replication,
tests/test_event_auth.py,
tests/test_utils,
tests/handlers/test_password_providers.py,
tests/handlers/test_room.py,
tests/handlers/test_room_summary.py,
tests/handlers/test_send_email.py,
tests/handlers/test_sync.py,
tests/handlers/test_user_directory.py,
tests/rest/client/test_login.py,
tests/rest/client/test_auth.py,
tests/rest/client/test_relations.py,
tests/rest/media/v1/test_filepath.py,
tests/rest/media/v1/test_oembed.py,
tests/storage/test_state.py,
tests/storage/test_user_directory.py,
tests/util/test_itertools.py,
tests/util/test_stream_change_cache.py
setup.py,
synapse/,
tests/
# Note: Better exclusion syntax coming in mypy > 0.910
# https://github.com/python/mypy/pull/11329
#
# For now, set the (?x) flag enable "verbose" regexes
# https://docs.python.org/3/library/re.html#re.X
exclude = (?x)
^(
|synapse/_scripts/register_new_matrix_user.py
|synapse/_scripts/review_recent_signups.py
|synapse/app/__init__.py
|synapse/app/_base.py
|synapse/app/admin_cmd.py
|synapse/app/appservice.py
|synapse/app/client_reader.py
|synapse/app/event_creator.py
|synapse/app/federation_reader.py
|synapse/app/federation_sender.py
|synapse/app/frontend_proxy.py
|synapse/app/generic_worker.py
|synapse/app/homeserver.py
|synapse/app/media_repository.py
|synapse/app/phone_stats_home.py
|synapse/app/pusher.py
|synapse/app/synchrotron.py
|synapse/app/user_dir.py
|synapse/storage/databases/__init__.py
|synapse/storage/databases/main/__init__.py
|synapse/storage/databases/main/account_data.py
|synapse/storage/databases/main/cache.py
|synapse/storage/databases/main/censor_events.py
|synapse/storage/databases/main/deviceinbox.py
|synapse/storage/databases/main/devices.py
|synapse/storage/databases/main/directory.py
|synapse/storage/databases/main/e2e_room_keys.py
|synapse/storage/databases/main/end_to_end_keys.py
|synapse/storage/databases/main/event_federation.py
|synapse/storage/databases/main/event_push_actions.py
|synapse/storage/databases/main/events_bg_updates.py
|synapse/storage/databases/main/events_forward_extremities.py
|synapse/storage/databases/main/events_worker.py
|synapse/storage/databases/main/filtering.py
|synapse/storage/databases/main/group_server.py
|synapse/storage/databases/main/lock.py
|synapse/storage/databases/main/media_repository.py
|synapse/storage/databases/main/metrics.py
|synapse/storage/databases/main/monthly_active_users.py
|synapse/storage/databases/main/openid.py
|synapse/storage/databases/main/presence.py
|synapse/storage/databases/main/profile.py
|synapse/storage/databases/main/purge_events.py
|synapse/storage/databases/main/push_rule.py
|synapse/storage/databases/main/receipts.py
|synapse/storage/databases/main/rejections.py
|synapse/storage/databases/main/room.py
|synapse/storage/databases/main/room_batch.py
|synapse/storage/databases/main/roommember.py
|synapse/storage/databases/main/search.py
|synapse/storage/databases/main/signatures.py
|synapse/storage/databases/main/state.py
|synapse/storage/databases/main/state_deltas.py
|synapse/storage/databases/main/stats.py
|synapse/storage/databases/main/tags.py
|synapse/storage/databases/main/transactions.py
|synapse/storage/databases/main/user_directory.py
|synapse/storage/databases/main/user_erasure_store.py
|synapse/storage/schema/
|tests/api/test_auth.py
|tests/api/test_ratelimiting.py
|tests/app/test_openid_listener.py
|tests/appservice/test_scheduler.py
|tests/config/test_cache.py
|tests/config/test_tls.py
|tests/crypto/test_keyring.py
|tests/events/test_presence_router.py
|tests/events/test_utils.py
|tests/federation/test_federation_catch_up.py
|tests/federation/test_federation_sender.py
|tests/federation/test_federation_server.py
|tests/federation/transport/test_knocking.py
|tests/federation/transport/test_server.py
|tests/handlers/test_cas.py
|tests/handlers/test_directory.py
|tests/handlers/test_e2e_keys.py
|tests/handlers/test_federation.py
|tests/handlers/test_oidc.py
|tests/handlers/test_presence.py
|tests/handlers/test_profile.py
|tests/handlers/test_saml.py
|tests/handlers/test_typing.py
|tests/http/federation/test_matrix_federation_agent.py
|tests/http/federation/test_srv_resolver.py
|tests/http/test_fedclient.py
|tests/http/test_proxyagent.py
|tests/http/test_servlet.py
|tests/http/test_site.py
|tests/logging/__init__.py
|tests/logging/test_terse_json.py
|tests/module_api/test_api.py
|tests/push/test_email.py
|tests/push/test_http.py
|tests/push/test_presentable_names.py
|tests/push/test_push_rule_evaluator.py
|tests/rest/admin/test_admin.py
|tests/rest/admin/test_device.py
|tests/rest/admin/test_media.py
|tests/rest/admin/test_server_notice.py
|tests/rest/admin/test_user.py
|tests/rest/admin/test_username_available.py
|tests/rest/client/test_account.py
|tests/rest/client/test_events.py
|tests/rest/client/test_filter.py
|tests/rest/client/test_groups.py
|tests/rest/client/test_register.py
|tests/rest/client/test_report_event.py
|tests/rest/client/test_rooms.py
|tests/rest/client/test_third_party_rules.py
|tests/rest/client/test_transactions.py
|tests/rest/client/test_typing.py
|tests/rest/client/utils.py
|tests/rest/key/v2/test_remote_key_resource.py
|tests/rest/media/v1/test_base.py
|tests/rest/media/v1/test_media_storage.py
|tests/rest/media/v1/test_url_preview.py
|tests/scripts/test_new_matrix_user.py
|tests/server.py
|tests/server_notices/test_resource_limits_server_notices.py
|tests/state/test_v2.py
|tests/storage/test_account_data.py
|tests/storage/test_appservice.py
|tests/storage/test_background_update.py
|tests/storage/test_base.py
|tests/storage/test_client_ips.py
|tests/storage/test_database.py
|tests/storage/test_event_federation.py
|tests/storage/test_id_generators.py
|tests/storage/test_roommember.py
|tests/test_metrics.py
|tests/test_phone_home.py
|tests/test_server.py
|tests/test_state.py
|tests/test_terms_auth.py
|tests/test_visibility.py
|tests/unittest.py
|tests/util/caches/test_cached_call.py
|tests/util/caches/test_deferred_cache.py
|tests/util/caches/test_descriptors.py
|tests/util/caches/test_response_cache.py
|tests/util/caches/test_ttlcache.py
|tests/util/test_async_helpers.py
|tests/util/test_batching_queue.py
|tests/util/test_dict_cache.py
|tests/util/test_expiring_cache.py
|tests/util/test_file_consumer.py
|tests/util/test_linearizer.py
|tests/util/test_logcontext.py
|tests/util/test_lrucache.py
|tests/util/test_rwlock.py
|tests/util/test_wheel_timer.py
|tests/utils.py
)$
[mypy-synapse.api.*]
disallow_untyped_defs = True
@ -278,6 +363,9 @@ ignore_missing_imports = True
[mypy-opentracing]
ignore_missing_imports = True
[mypy-parameterized.*]
ignore_missing_imports = True
[mypy-phonenumbers.*]
ignore_missing_imports = True

View file

@ -43,6 +43,7 @@ from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackground
from synapse.storage.databases.main.events_bg_updates import (
EventsBackgroundUpdatesStore,
)
from synapse.storage.databases.main.group_server import GroupServerWorkerStore
from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
@ -181,6 +182,7 @@ class Store(
StatsStore,
PusherWorkerStore,
PresenceBackgroundUpdateStore,
GroupServerWorkerStore,
):
def execute(self, f, *args, **kwargs):
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)

View file

@ -17,6 +17,7 @@
# limitations under the License.
import glob
import os
from typing import Any, Dict
from setuptools import Command, find_packages, setup
@ -49,8 +50,6 @@ here = os.path.abspath(os.path.dirname(__file__))
# [1]: http://tox.readthedocs.io/en/2.5.0/example/basic.html#integration-with-setup-py-test-command
# [2]: https://pypi.python.org/pypi/setuptools_trial
class TestCommand(Command):
user_options = []
def initialize_options(self):
pass
@ -75,7 +74,7 @@ def read_file(path_segments):
def exec_file(path_segments):
"""Execute a single python file to get the variables defined in it"""
result = {}
result: Dict[str, Any] = {}
code = read_file(path_segments)
exec(code, result)
return result
@ -132,6 +131,9 @@ CONDITIONAL_REQUIREMENTS["dev"] = (
"GitPython==3.1.14",
"commonmark==0.9.1",
"pygithub==1.55",
# The following are executed as commands by the release script.
"twine",
"towncrier",
]
)

View file

@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.46.0rc1"
__version__ = "1.46.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View file

@ -1,7 +1,7 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -86,6 +86,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
# cf https://github.com/matrix-org/matrix-doc/pull/2326
"org.matrix.labels": {"type": "array", "items": {"type": "string"}},
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
# MSC3440, filtering by event relations.
"io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
"io.element.relation_types": {"type": "array", "items": {"type": "string"}},
},
}
@ -146,14 +149,16 @@ def matrix_user_id_validator(user_id_str: str) -> UserID:
class Filtering:
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.store = hs.get_datastore()
self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {})
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
) -> "FilterCollection":
result = await self.store.get_user_filter(user_localpart, filter_id)
return FilterCollection(result)
return FilterCollection(self._hs, result)
def add_user_filter(
self, user_localpart: str, user_filter: JsonDict
@ -191,21 +196,22 @@ FilterEvent = TypeVar("FilterEvent", EventBase, UserPresenceState, JsonDict)
class FilterCollection:
def __init__(self, filter_json: JsonDict):
def __init__(self, hs: "HomeServer", filter_json: JsonDict):
self._filter_json = filter_json
room_filter_json = self._filter_json.get("room", {})
self._room_filter = Filter(
{k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")}
hs,
{k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")},
)
self._room_timeline_filter = Filter(room_filter_json.get("timeline", {}))
self._room_state_filter = Filter(room_filter_json.get("state", {}))
self._room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {}))
self._room_account_data = Filter(room_filter_json.get("account_data", {}))
self._presence_filter = Filter(filter_json.get("presence", {}))
self._account_data = Filter(filter_json.get("account_data", {}))
self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {}))
self._room_state_filter = Filter(hs, room_filter_json.get("state", {}))
self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {}))
self._room_account_data = Filter(hs, room_filter_json.get("account_data", {}))
self._presence_filter = Filter(hs, filter_json.get("presence", {}))
self._account_data = Filter(hs, filter_json.get("account_data", {}))
self.include_leave = filter_json.get("room", {}).get("include_leave", False)
self.event_fields = filter_json.get("event_fields", [])
@ -232,25 +238,37 @@ class FilterCollection:
def include_redundant_members(self) -> bool:
return self._room_state_filter.include_redundant_members
def filter_presence(
async def filter_presence(
self, events: Iterable[UserPresenceState]
) -> List[UserPresenceState]:
return self._presence_filter.filter(events)
return await self._presence_filter.filter(events)
def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
return self._account_data.filter(events)
async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
return await self._account_data.filter(events)
def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
return self._room_state_filter.filter(self._room_filter.filter(events))
async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
return await self._room_state_filter.filter(
await self._room_filter.filter(events)
)
def filter_room_timeline(self, events: Iterable[EventBase]) -> List[EventBase]:
return self._room_timeline_filter.filter(self._room_filter.filter(events))
async def filter_room_timeline(
self, events: Iterable[EventBase]
) -> List[EventBase]:
return await self._room_timeline_filter.filter(
await self._room_filter.filter(events)
)
def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
return self._room_ephemeral_filter.filter(self._room_filter.filter(events))
async def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
return await self._room_ephemeral_filter.filter(
await self._room_filter.filter(events)
)
def filter_room_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
return self._room_account_data.filter(self._room_filter.filter(events))
async def filter_room_account_data(
self, events: Iterable[JsonDict]
) -> List[JsonDict]:
return await self._room_account_data.filter(
await self._room_filter.filter(events)
)
def blocks_all_presence(self) -> bool:
return (
@ -274,7 +292,9 @@ class FilterCollection:
class Filter:
def __init__(self, filter_json: JsonDict):
def __init__(self, hs: "HomeServer", filter_json: JsonDict):
self._hs = hs
self._store = hs.get_datastore()
self.filter_json = filter_json
self.limit = filter_json.get("limit", 10)
@ -297,6 +317,20 @@ class Filter:
self.labels = filter_json.get("org.matrix.labels", None)
self.not_labels = filter_json.get("org.matrix.not_labels", [])
# Ideally these would be rejected at the endpoint if they were provided
# and not supported, but that would involve modifying the JSON schema
# based on the homeserver configuration.
if hs.config.experimental.msc3440_enabled:
self.relation_senders = self.filter_json.get(
"io.element.relation_senders", None
)
self.relation_types = self.filter_json.get(
"io.element.relation_types", None
)
else:
self.relation_senders = None
self.relation_types = None
def filters_all_types(self) -> bool:
return "*" in self.not_types
@ -306,7 +340,7 @@ class Filter:
def filters_all_rooms(self) -> bool:
return "*" in self.not_rooms
def check(self, event: FilterEvent) -> bool:
def _check(self, event: FilterEvent) -> bool:
"""Checks whether the filter matches the given event.
Args:
@ -420,8 +454,30 @@ class Filter:
return room_ids
def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
return list(filter(self.check, events))
async def _check_event_relations(
self, events: Iterable[FilterEvent]
) -> List[FilterEvent]:
# The event IDs to check, mypy doesn't understand the ifinstance check.
event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined]
event_ids_to_keep = set(
await self._store.events_have_relations(
event_ids, self.relation_senders, self.relation_types
)
)
return [
event
for event in events
if not isinstance(event, EventBase) or event.event_id in event_ids_to_keep
]
async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
result = [event for event in events if self._check(event)]
if self.relation_senders or self.relation_types:
return await self._check_event_relations(result)
return result
def with_room_ids(self, room_ids: Iterable[str]) -> "Filter":
"""Returns a new filter with the given room IDs appended.
@ -433,7 +489,7 @@ class Filter:
filter: A new filter including the given rooms and the old
filter's rooms.
"""
newFilter = Filter(self.filter_json)
newFilter = Filter(self._hs, self.filter_json)
newFilter.rooms += room_ids
return newFilter
@ -444,6 +500,3 @@ def _matches_wildcard(actual_value: Optional[str], filter_value: str) -> bool:
return actual_value.startswith(type_prefix)
else:
return actual_value == filter_value
DEFAULT_FILTER_COLLECTION = FilterCollection({})

View file

@ -38,9 +38,6 @@ class ConsentURIBuilder:
def __init__(self, hs_config: HomeServerConfig):
if hs_config.key.form_secret is None:
raise ConfigError("form_secret not set in config")
if hs_config.server.public_baseurl is None:
raise ConfigError("public_baseurl not set in config")
self._hmac_secret = hs_config.key.form_secret.encode("utf-8")
self._public_baseurl = hs_config.server.public_baseurl

View file

@ -75,10 +75,6 @@ class AccountValidityConfig(Config):
self.account_validity_period * 10.0 / 100.0
)
if self.account_validity_renew_by_email_enabled:
if not self.root.server.public_baseurl:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
# Load account validity templates.
account_validity_template_dir = account_validity_config.get("template_dir")
if account_validity_template_dir is not None:

View file

@ -16,7 +16,7 @@ from typing import Any, List
from synapse.config.sso import SsoAttributeRequirement
from ._base import Config, ConfigError
from ._base import Config
from ._util import validate_config
@ -35,14 +35,10 @@ class CasConfig(Config):
if self.cas_enabled:
self.cas_server_url = cas_config["server_url"]
# The public baseurl is required because it is used by the redirect
# template.
public_baseurl = self.root.server.public_baseurl
if not public_baseurl:
raise ConfigError("cas_config requires a public_baseurl to be set")
# TODO Update this to a _synapse URL.
public_baseurl = self.root.server.public_baseurl
self.cas_service_url = public_baseurl + "_matrix/client/r0/login/cas/ticket"
self.cas_displayname_attribute = cas_config.get("displayname_attribute")
required_attributes = cas_config.get("required_attributes") or {}
self.cas_required_attributes = _parsed_required_attributes_def(

View file

@ -186,11 +186,6 @@ class EmailConfig(Config):
if not self.email_notif_from:
missing.append("email.notif_from")
# public_baseurl is required to build password reset and validation links that
# will be emailed to users
if config.get("public_baseurl") is None:
missing.append("public_baseurl")
if missing:
raise ConfigError(
MISSING_PASSWORD_RESET_CONFIG_ERROR % (", ".join(missing),)
@ -296,9 +291,6 @@ class EmailConfig(Config):
if not self.email_notif_from:
missing.append("email.notif_from")
if config.get("public_baseurl") is None:
missing.append("public_baseurl")
if missing:
raise ConfigError(
"email.enable_notifs is True but required keys are missing: %s"

View file

@ -59,8 +59,6 @@ class OIDCConfig(Config):
)
public_baseurl = self.root.server.public_baseurl
if public_baseurl is None:
raise ConfigError("oidc_config requires a public_baseurl to be set")
self.oidc_callback_url = public_baseurl + "_synapse/client/oidc/callback"
@property

View file

@ -45,17 +45,6 @@ class RegistrationConfig(Config):
account_threepid_delegates = config.get("account_threepid_delegates") or {}
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
if (
self.account_threepid_delegate_msisdn
and not self.root.server.public_baseurl
):
raise ConfigError(
"The configuration option `public_baseurl` is required if "
"`account_threepid_delegate.msisdn` is set, such that "
"clients know where to submit validation tokens to. Please "
"configure `public_baseurl`."
)
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
@ -240,7 +229,7 @@ class RegistrationConfig(Config):
# in on this server.
#
# (By default, no suggestion is made, so it is left up to the client.
# This setting is ignored unless public_baseurl is also set.)
# This setting is ignored unless public_baseurl is also explicitly set.)
#
#default_identity_server: https://matrix.org
@ -265,8 +254,6 @@ class RegistrationConfig(Config):
# by the Matrix Identity Service API specification:
# https://matrix.org/docs/spec/identity_service/latest
#
# If a delegate is specified, the config option public_baseurl must also be filled out.
#
account_threepid_delegates:
#email: https://example.com # Delegate email sending to example.com
#msisdn: http://localhost:8090 # Delegate SMS sending to this local process

View file

@ -199,14 +199,11 @@ class SAML2Config(Config):
"""
import saml2
public_baseurl = self.root.server.public_baseurl
if public_baseurl is None:
raise ConfigError("saml2_config requires a public_baseurl to be set")
if self.saml2_grandfathered_mxid_source_attribute:
optional_attributes.add(self.saml2_grandfathered_mxid_source_attribute)
optional_attributes -= required_attributes
public_baseurl = self.root.server.public_baseurl
metadata_url = public_baseurl + "_synapse/client/saml2/metadata.xml"
response_url = public_baseurl + "_synapse/client/saml2/authn_response"
return {

View file

@ -16,6 +16,7 @@ import itertools
import logging
import os.path
import re
import urllib.parse
from textwrap import indent
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
@ -264,10 +265,44 @@ class ServerConfig(Config):
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.serve_server_wellknown = config.get("serve_server_wellknown", False)
self.public_baseurl = config.get("public_baseurl")
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
# Whether we should serve a "client well-known":
# (a) at .well-known/matrix/client on our client HTTP listener
# (b) in the response to /login
#
# ... which together help ensure that clients use our public_baseurl instead of
# whatever they were told by the user.
#
# For the sake of backwards compatibility with existing installations, this is
# True if public_baseurl is specified explicitly, and otherwise False. (The
# reasoning here is that we have no way of knowing that the default
# public_baseurl is actually correct for existing installations - many things
# will not work correctly, but that's (probably?) better than sending clients
# to a completely broken URL.
self.serve_client_wellknown = False
public_baseurl = config.get("public_baseurl")
if public_baseurl is None:
public_baseurl = f"https://{self.server_name}/"
logger.info("Using default public_baseurl %s", public_baseurl)
else:
self.serve_client_wellknown = True
if public_baseurl[-1] != "/":
public_baseurl += "/"
self.public_baseurl = public_baseurl
# check that public_baseurl is valid
try:
splits = urllib.parse.urlsplit(self.public_baseurl)
except Exception as e:
raise ConfigError(f"Unable to parse URL: {e}", ("public_baseurl",))
if splits.scheme not in ("https", "http"):
raise ConfigError(
f"Invalid scheme '{splits.scheme}': only https and http are supported"
)
if splits.query or splits.fragment:
raise ConfigError(
"public_baseurl cannot contain query parameters or a #-fragment"
)
# Whether to enable user presence.
presence_config = config.get("presence") or {}
@ -773,6 +808,8 @@ class ServerConfig(Config):
# Otherwise, it should be the URL to reach Synapse's client HTTP listener (see
# 'listeners' below).
#
# Defaults to 'https://<server_name>/'.
#
#public_baseurl: https://example.com/
# Uncomment the following to tell other servers to send federation traffic on

View file

@ -101,13 +101,10 @@ class SSOConfig(Config):
# gracefully to the client). This would make it pointless to ask the user for
# confirmation, since the URL the confirmation page would be showing wouldn't be
# the client's.
# public_baseurl is an optional setting, so we only add the fallback's URL to the
# list if it's provided (because we can't figure out what that URL is otherwise).
if self.root.server.public_baseurl:
login_fallback_url = (
self.root.server.public_baseurl + "_matrix/static/client/login"
)
self.sso_client_whitelist.append(login_fallback_url)
login_fallback_url = (
self.root.server.public_baseurl + "_matrix/static/client/login"
)
self.sso_client_whitelist.append(login_fallback_url)
def generate_config_section(self, **kwargs):
return """\
@ -128,11 +125,10 @@ class SSOConfig(Config):
# phishing attacks from evil.site. To avoid this, include a slash after the
# hostname: "https://my.client/".
#
# If public_baseurl is set, then the login fallback page (used by clients
# that don't natively support the required login flows) is whitelisted in
# addition to any URLs in this list.
# The login fallback page (used by clients that don't natively support the
# required login flows) is whitelisted in addition to any URLs in this list.
#
# By default, this list is empty.
# By default, this list contains only the login fallback page.
#
#client_whitelist:
# - https://riot.im/develop

View file

@ -63,7 +63,8 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
typing: The instances that write to the typing stream. Currently
can only be a single instance.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
@ -75,9 +76,15 @@ class WriterLocations:
"""
events = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
typing = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
typing = attr.ib(default="master", type=str)
to_device = attr.ib(
default=["master"],
type=List[str],
@ -217,6 +224,11 @@ class WorkerConfig(Config):
% (instance, stream)
)
if len(self.writers.typing) != 1:
raise ConfigError(
"Must only specify one instance to handle `typing` messages."
)
if len(self.writers.to_device) != 1:
raise ConfigError(
"Must only specify one instance to handle `to_device` messages."

View file

@ -16,8 +16,23 @@
import abc
import os
from typing import Dict, Optional, Tuple, Type
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Iterable,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
overload,
)
from typing_extensions import Literal
from unpaddedbase64 import encode_base64
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
@ -26,6 +41,9 @@ from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
from synapse.util.stringutils import strtobool
if TYPE_CHECKING:
from synapse.events.builder import EventBuilder
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
# bugs where we accidentally share e.g. signature dicts. However, converting a
# dict to frozen_dicts is expensive.
@ -37,7 +55,23 @@ from synapse.util.stringutils import strtobool
USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0"))
class DictProperty:
T = TypeVar("T")
# DictProperty (and DefaultDictProperty) require the classes they're used with to
# have a _dict property to pull properties from.
#
# TODO _DictPropertyInstance should not include EventBuilder but due to
# https://github.com/python/mypy/issues/5570 it thinks the DictProperty and
# DefaultDictProperty get applied to EventBuilder when it is in a Union with
# EventBase. This is the least invasive hack to get mypy to comply.
#
# Note that DictProperty/DefaultDictProperty cannot actually be used with
# EventBuilder as it lacks a _dict property.
_DictPropertyInstance = Union["_EventInternalMetadata", "EventBase", "EventBuilder"]
class DictProperty(Generic[T]):
"""An object property which delegates to the `_dict` within its parent object."""
__slots__ = ["key"]
@ -45,12 +79,33 @@ class DictProperty:
def __init__(self, key: str):
self.key = key
def __get__(self, instance, owner=None):
@overload
def __get__(
self,
instance: Literal[None],
owner: Optional[Type[_DictPropertyInstance]] = None,
) -> "DictProperty":
...
@overload
def __get__(
self,
instance: _DictPropertyInstance,
owner: Optional[Type[_DictPropertyInstance]] = None,
) -> T:
...
def __get__(
self,
instance: Optional[_DictPropertyInstance],
owner: Optional[Type[_DictPropertyInstance]] = None,
) -> Union[T, "DictProperty"]:
# if the property is accessed as a class property rather than an instance
# property, return the property itself rather than the value
if instance is None:
return self
try:
assert isinstance(instance, (EventBase, _EventInternalMetadata))
return instance._dict[self.key]
except KeyError as e1:
# We want this to look like a regular attribute error (mostly so that
@ -65,10 +120,12 @@ class DictProperty:
"'%s' has no '%s' property" % (type(instance), self.key)
) from e1.__context__
def __set__(self, instance, v):
def __set__(self, instance: _DictPropertyInstance, v: T) -> None:
assert isinstance(instance, (EventBase, _EventInternalMetadata))
instance._dict[self.key] = v
def __delete__(self, instance):
def __delete__(self, instance: _DictPropertyInstance) -> None:
assert isinstance(instance, (EventBase, _EventInternalMetadata))
try:
del instance._dict[self.key]
except KeyError as e1:
@ -77,7 +134,7 @@ class DictProperty:
) from e1.__context__
class DefaultDictProperty(DictProperty):
class DefaultDictProperty(DictProperty, Generic[T]):
"""An extension of DictProperty which provides a default if the property is
not present in the parent's _dict.
@ -86,13 +143,34 @@ class DefaultDictProperty(DictProperty):
__slots__ = ["default"]
def __init__(self, key, default):
def __init__(self, key: str, default: T):
super().__init__(key)
self.default = default
def __get__(self, instance, owner=None):
@overload
def __get__(
self,
instance: Literal[None],
owner: Optional[Type[_DictPropertyInstance]] = None,
) -> "DefaultDictProperty":
...
@overload
def __get__(
self,
instance: _DictPropertyInstance,
owner: Optional[Type[_DictPropertyInstance]] = None,
) -> T:
...
def __get__(
self,
instance: Optional[_DictPropertyInstance],
owner: Optional[Type[_DictPropertyInstance]] = None,
) -> Union[T, "DefaultDictProperty"]:
if instance is None:
return self
assert isinstance(instance, (EventBase, _EventInternalMetadata))
return instance._dict.get(self.key, self.default)
@ -111,22 +189,22 @@ class _EventInternalMetadata:
# in the DAG)
self.outlier = False
out_of_band_membership: bool = DictProperty("out_of_band_membership")
send_on_behalf_of: str = DictProperty("send_on_behalf_of")
recheck_redaction: bool = DictProperty("recheck_redaction")
soft_failed: bool = DictProperty("soft_failed")
proactively_send: bool = DictProperty("proactively_send")
redacted: bool = DictProperty("redacted")
txn_id: str = DictProperty("txn_id")
token_id: int = DictProperty("token_id")
historical: bool = DictProperty("historical")
out_of_band_membership: DictProperty[bool] = DictProperty("out_of_band_membership")
send_on_behalf_of: DictProperty[str] = DictProperty("send_on_behalf_of")
recheck_redaction: DictProperty[bool] = DictProperty("recheck_redaction")
soft_failed: DictProperty[bool] = DictProperty("soft_failed")
proactively_send: DictProperty[bool] = DictProperty("proactively_send")
redacted: DictProperty[bool] = DictProperty("redacted")
txn_id: DictProperty[str] = DictProperty("txn_id")
token_id: DictProperty[int] = DictProperty("token_id")
historical: DictProperty[bool] = DictProperty("historical")
# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
# be here
before: RoomStreamToken = DictProperty("before")
after: RoomStreamToken = DictProperty("after")
order: Tuple[int, int] = DictProperty("order")
before: DictProperty[RoomStreamToken] = DictProperty("before")
after: DictProperty[RoomStreamToken] = DictProperty("after")
order: DictProperty[Tuple[int, int]] = DictProperty("order")
def get_dict(self) -> JsonDict:
return dict(self._dict)
@ -162,9 +240,6 @@ class _EventInternalMetadata:
If the sender of the redaction event is allowed to redact any event
due to auth rules, then this will always return false.
Returns:
bool
"""
return self._dict.get("recheck_redaction", False)
@ -176,32 +251,23 @@ class _EventInternalMetadata:
sent to clients.
2. They should not be added to the forward extremities (and
therefore not to current state).
Returns:
bool
"""
return self._dict.get("soft_failed", False)
def should_proactively_send(self):
def should_proactively_send(self) -> bool:
"""Whether the event, if ours, should be sent to other clients and
servers.
This is used for sending dummy events internally. Servers and clients
can still explicitly fetch the event.
Returns:
bool
"""
return self._dict.get("proactively_send", True)
def is_redacted(self):
def is_redacted(self) -> bool:
"""Whether the event has been redacted.
This is used for efficiently checking whether an event has been
marked as redacted without needing to make another database call.
Returns:
bool
"""
return self._dict.get("redacted", False)
@ -241,29 +307,31 @@ class EventBase(metaclass=abc.ABCMeta):
self.internal_metadata = _EventInternalMetadata(internal_metadata_dict)
auth_events = DictProperty("auth_events")
depth = DictProperty("depth")
content = DictProperty("content")
hashes = DictProperty("hashes")
origin = DictProperty("origin")
origin_server_ts = DictProperty("origin_server_ts")
prev_events = DictProperty("prev_events")
redacts = DefaultDictProperty("redacts", None)
room_id = DictProperty("room_id")
sender = DictProperty("sender")
state_key = DictProperty("state_key")
type = DictProperty("type")
user_id = DictProperty("sender")
depth: DictProperty[int] = DictProperty("depth")
content: DictProperty[JsonDict] = DictProperty("content")
hashes: DictProperty[Dict[str, str]] = DictProperty("hashes")
origin: DictProperty[str] = DictProperty("origin")
origin_server_ts: DictProperty[int] = DictProperty("origin_server_ts")
redacts: DefaultDictProperty[Optional[str]] = DefaultDictProperty("redacts", None)
room_id: DictProperty[str] = DictProperty("room_id")
sender: DictProperty[str] = DictProperty("sender")
# TODO state_key should be Optional[str], this is generally asserted in Synapse
# by calling is_state() first (which ensures this), but it is hard (not possible?)
# to properly annotate that calling is_state() asserts that state_key exists
# and is non-None.
state_key: DictProperty[str] = DictProperty("state_key")
type: DictProperty[str] = DictProperty("type")
user_id: DictProperty[str] = DictProperty("sender")
@property
def event_id(self) -> str:
raise NotImplementedError()
@property
def membership(self):
def membership(self) -> str:
return self.content["membership"]
def is_state(self):
def is_state(self) -> bool:
return hasattr(self, "state_key") and self.state_key is not None
def get_dict(self) -> JsonDict:
@ -272,13 +340,13 @@ class EventBase(metaclass=abc.ABCMeta):
return d
def get(self, key, default=None):
def get(self, key: str, default: Optional[Any] = None) -> Any:
return self._dict.get(key, default)
def get_internal_metadata_dict(self):
def get_internal_metadata_dict(self) -> JsonDict:
return self.internal_metadata.get_dict()
def get_pdu_json(self, time_now=None) -> JsonDict:
def get_pdu_json(self, time_now: Optional[int] = None) -> JsonDict:
pdu_json = self.get_dict()
if time_now is not None and "age_ts" in pdu_json["unsigned"]:
@ -305,49 +373,46 @@ class EventBase(metaclass=abc.ABCMeta):
return template_json
def __set__(self, instance, value):
raise AttributeError("Unrecognized attribute %s" % (instance,))
def __getitem__(self, field):
def __getitem__(self, field: str) -> Optional[Any]:
return self._dict[field]
def __contains__(self, field):
def __contains__(self, field: str) -> bool:
return field in self._dict
def items(self):
def items(self) -> List[Tuple[str, Optional[Any]]]:
return list(self._dict.items())
def keys(self):
def keys(self) -> Iterable[str]:
return self._dict.keys()
def prev_event_ids(self):
def prev_event_ids(self) -> Sequence[str]:
"""Returns the list of prev event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's prev_events
The list of event IDs of this event's prev_events
"""
return [e for e, _ in self.prev_events]
return [e for e, _ in self._dict["prev_events"]]
def auth_event_ids(self):
def auth_event_ids(self) -> Sequence[str]:
"""Returns the list of auth event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's auth_events
The list of event IDs of this event's auth_events
"""
return [e for e, _ in self.auth_events]
return [e for e, _ in self._dict["auth_events"]]
def freeze(self):
def freeze(self) -> None:
"""'Freeze' the event dict, so it cannot be modified by accident"""
# this will be a no-op if the event dict is already frozen.
self._dict = freeze(self._dict)
def __str__(self):
def __str__(self) -> str:
return self.__repr__()
def __repr__(self):
def __repr__(self) -> str:
rejection = f"REJECTED={self.rejected_reason}, " if self.rejected_reason else ""
return (
@ -443,7 +508,7 @@ class FrozenEventV2(EventBase):
else:
frozen_dict = event_dict
self._event_id = None
self._event_id: Optional[str] = None
super().__init__(
frozen_dict,
@ -455,7 +520,7 @@ class FrozenEventV2(EventBase):
)
@property
def event_id(self):
def event_id(self) -> str:
# We have to import this here as otherwise we get an import loop which
# is hard to break.
from synapse.crypto.event_signing import compute_event_reference_hash
@ -465,23 +530,23 @@ class FrozenEventV2(EventBase):
self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
return self._event_id
def prev_event_ids(self):
def prev_event_ids(self) -> Sequence[str]:
"""Returns the list of prev event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's prev_events
The list of event IDs of this event's prev_events
"""
return self.prev_events
return self._dict["prev_events"]
def auth_event_ids(self):
def auth_event_ids(self) -> Sequence[str]:
"""Returns the list of auth event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's auth_events
The list of event IDs of this event's auth_events
"""
return self.auth_events
return self._dict["auth_events"]
class FrozenEventV3(FrozenEventV2):
@ -490,7 +555,7 @@ class FrozenEventV3(FrozenEventV2):
format_version = EventFormatVersions.V3 # All events of this type are V3
@property
def event_id(self):
def event_id(self) -> str:
# We have to import this here as otherwise we get an import loop which
# is hard to break.
from synapse.crypto.event_signing import compute_event_reference_hash
@ -503,12 +568,14 @@ class FrozenEventV3(FrozenEventV2):
return self._event_id
def _event_type_from_format_version(format_version: int) -> Type[EventBase]:
def _event_type_from_format_version(
format_version: int,
) -> Type[Union[FrozenEvent, FrozenEventV2, FrozenEventV3]]:
"""Returns the python type to use to construct an Event object for the
given event format version.
Args:
format_version (int): The event format version
format_version: The event format version
Returns:
type: A type that can be initialized as per the initializer of

View file

@ -55,7 +55,7 @@ class EventValidator:
]
for k in required:
if not hasattr(event, k):
if k not in event:
raise SynapseError(400, "Event does not have key %s" % (k,))
# Check that the following keys have string values

View file

@ -213,6 +213,11 @@ class FederationServer(FederationBase):
self._started_handling_of_staged_events = True
self._handle_old_staged_events()
# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
# without dropping its locks.
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
@ -1232,10 +1237,6 @@ class FederationHandlerRegistry:
self.query_handlers[query_type] = handler
def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
"""Register that the EDU handler is on a different instance than master."""
self._edu_type_to_instance[edu_type] = [instance_name]
def register_instances_for_edu(
self, edu_type: str, instance_names: List[str]
) -> None:

View file

@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@ -58,6 +59,10 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
self._ephemeral_events_linearizer = Linearizer(
name="appservice_ephemeral_events"
)
def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
@ -182,8 +187,8 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Optional[int],
users: Optional[Collection[Union[str, UserID]]] = None,
new_token: Union[int, RoomStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
"""
This is called by the notifier in the background when an ephemeral event is handled
@ -198,32 +203,55 @@ class ApplicationServicesHandler:
value for `stream_key` will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
them.
receiving them by setting `push_ephemeral` to true in their registration
file. Note that while MSC2409 is experimental, this option is called
`de.sorunome.msc2409.push_ephemeral`.
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
new_token: The latest stream token.
new_token: The stream token of the event.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
return
# Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
# Assert that new_token is an integer (and not a RoomStreamToken).
# All of the supported streams that this function handles use an
# integer to track progress (rather than a RoomStreamToken - a
# vector clock implementation) as they don't support multiple
# stream writers.
#
# As a result, we simply assert that new_token is an integer.
# If we do end up needing to pass a RoomStreamToken down here
# in the future, using RoomStreamToken.stream (the minimum stream
# position) to convert to an ascending integer value should work.
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
# Note that whether these events are actually relevant to these appservices
# is decided later on.
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services:
# Bail out early if none of the target appservices have explicitly registered
# to receive these ephemeral events.
return
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
services, stream_key, new_token, users or []
services, stream_key, new_token, users
)
@wrap_as_background_process("notify_interested_services_ephemeral")
@ -231,14 +259,13 @@ class ApplicationServicesHandler:
self,
services: List[ApplicationService],
stream_key: str,
new_token: Optional[int],
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
if stream_key == "typing_key":
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
@ -248,26 +275,37 @@ class ApplicationServicesHandler:
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
continue
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
# Since we read/update the stream position for this AS/stream
with (
await self._ephemeral_events_linearizer.queue(
(service.id, stream_key)
)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
async def _handle_typing(
self, service: ApplicationService, new_token: int
@ -304,7 +342,9 @@ class ApplicationServicesHandler:
)
return typing
async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
@ -315,6 +355,9 @@ class ApplicationServicesHandler:
Args:
service: The application service to check for which events it should receive.
new_token: A receipts event stream token. Purely used to double-check that the
from_token we pull from the database isn't greater than or equal to this
token. Prevents accidentally duplicating work.
Returns:
A list of JSON dictionaries containing data derived from the read receipts that
@ -323,6 +366,12 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
@ -330,7 +379,10 @@ class ApplicationServicesHandler:
return receipts
async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
new_token: Optional[int],
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
@ -343,6 +395,9 @@ class ApplicationServicesHandler:
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
new_token: A presence update stream token. Purely used to double-check that the
from_token we pull from the database isn't greater than or equal to this
token. Prevents accidentally duplicating work.
Returns:
A list of json dictionaries containing data derived from the presence events
@ -353,6 +408,12 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
if new_token is not None and new_token <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []
for user in users:
if isinstance(user, str):
user = UserID.from_string(user)

View file

@ -1989,7 +1989,9 @@ class PasswordAuthProvider:
self,
check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None,
auth_checkers: Optional[
Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
] = None,
) -> None:
# Register check_3pid_auth callback
if check_3pid_auth is not None:

View file

@ -89,6 +89,13 @@ class DeviceMessageHandler:
)
async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
"""
Handle receiving to-device messages from remote homeservers.
Args:
origin: The remote homeserver.
content: The JSON dictionary containing the to-device messages.
"""
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
@ -135,12 +142,16 @@ class DeviceMessageHandler:
message_type, sender_user_id, by_device
)
stream_id = await self.store.add_messages_from_remote_to_device_inbox(
# Add messages to the database.
# Retrieve the stream id of the last-processed to-device message.
last_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", stream_id, users=local_messages.keys()
"to_device_key", last_stream_id, users=local_messages.keys()
)
async def _check_for_unknown_devices(
@ -195,6 +206,14 @@ class DeviceMessageHandler:
message_type: str,
messages: Dict[str, Dict[str, JsonDict]],
) -> None:
"""
Handle a request from a user to send to-device message(s).
Args:
requester: The user that is sending the to-device messages.
message_type: The type of to-device messages that are being sent.
messages: A dictionary containing recipients mapped to messages intended for them.
"""
sender_user_id = requester.user.to_string()
message_id = random_string(16)
@ -257,12 +276,16 @@ class DeviceMessageHandler:
"org.matrix.opentracing_context": json_encoder.encode(context),
}
stream_id = await self.store.add_messages_to_device_inbox(
# Add messages to the database.
# Retrieve the stream id of the last-processed to-device message.
last_stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", stream_id, users=local_messages.keys()
"to_device_key", last_stream_id, users=local_messages.keys()
)
if self.federation_sender:

View file

@ -201,95 +201,19 @@ class E2eKeysHandler:
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
@trace
async def do_remote_query(destination: str) -> None:
"""This is called when we are querying the device list of a user on
a remote homeserver and their device list is not in the device list
cache. If we share a room with this user and we're not querying for
specific user we will update the cache with their device list.
"""
destination_query = remote_queries_not_in_cache[destination]
# We first consider whether we wish to update the device list cache with
# the users device list. We want to track a user's devices when the
# authenticated user shares a room with the queried user and the query
# has not specified a particular device.
# If we update the cache for the queried user we remove them from further
# queries. We use the more efficient batched query_client_keys for all
# remaining users
user_ids_updated = []
for (user_id, device_list) in destination_query.items():
if user_id in user_ids_updated:
continue
if device_list:
continue
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
continue
# We've decided we're sharing a room with this user and should
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
if self._is_master:
user_devices = await self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
user_devices = await self._user_device_resync_client(
user_id=user_id
)
user_devices = user_devices["devices"]
user_results = results.setdefault(user_id, {})
for device in user_devices:
user_results[device["device_id"]] = device["keys"]
user_ids_updated.append(user_id)
except Exception as e:
failures[destination] = _exception_to_failure(e)
if len(destination_query) == len(user_ids_updated):
# We've updated all the users in the query and we do not need to
# make any further remote calls.
return
# Remove all the users from the query which we have updated
for user_id in user_ids_updated:
destination_query.pop(user_id)
try:
remote_result = await self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
)
for user_id, keys in remote_result["device_keys"].items():
if user_id in destination_query:
results[user_id] = keys
if "master_keys" in remote_result:
for user_id, key in remote_result["master_keys"].items():
if user_id in destination_query:
cross_signing_keys["master_keys"][user_id] = key
if "self_signing_keys" in remote_result:
for user_id, key in remote_result["self_signing_keys"].items():
if user_id in destination_query:
cross_signing_keys["self_signing_keys"][user_id] = key
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)
await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache
run_in_background(
self._query_devices_for_destination,
results,
cross_signing_keys,
failures,
destination,
queries,
timeout,
)
for destination, queries in remote_queries_not_in_cache.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
@ -301,6 +225,121 @@ class E2eKeysHandler:
return ret
@trace
async def _query_devices_for_destination(
self,
results: JsonDict,
cross_signing_keys: JsonDict,
failures: Dict[str, JsonDict],
destination: str,
destination_query: Dict[str, Iterable[str]],
timeout: int,
) -> None:
"""This is called when we are querying the device list of a user on
a remote homeserver and their device list is not in the device list
cache. If we share a room with this user and we're not querying for
specific user we will update the cache with their device list.
Args:
results: A map from user ID to their device keys, which gets
updated with the newly fetched keys.
cross_signing_keys: Map from user ID to their cross signing keys,
which gets updated with the newly fetched keys.
failures: Map of destinations to failures that have occurred while
attempting to fetch keys.
destination: The remote server to query
destination_query: The query dict of devices to query the remote
server for.
timeout: The timeout for remote HTTP requests.
"""
# We first consider whether we wish to update the device list cache with
# the users device list. We want to track a user's devices when the
# authenticated user shares a room with the queried user and the query
# has not specified a particular device.
# If we update the cache for the queried user we remove them from further
# queries. We use the more efficient batched query_client_keys for all
# remaining users
user_ids_updated = []
for (user_id, device_list) in destination_query.items():
if user_id in user_ids_updated:
continue
if device_list:
continue
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
continue
# We've decided we're sharing a room with this user and should
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
if self._is_master:
resync_results = await self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
resync_results = await self._user_device_resync_client(
user_id=user_id
)
# Add the device keys to the results.
user_devices = resync_results["devices"]
user_results = results.setdefault(user_id, {})
for device in user_devices:
user_results[device["device_id"]] = device["keys"]
user_ids_updated.append(user_id)
# Add any cross signing keys to the results.
master_key = resync_results.get("master_key")
self_signing_key = resync_results.get("self_signing_key")
if master_key:
cross_signing_keys["master_keys"][user_id] = master_key
if self_signing_key:
cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
except Exception as e:
failures[destination] = _exception_to_failure(e)
if len(destination_query) == len(user_ids_updated):
# We've updated all the users in the query and we do not need to
# make any further remote calls.
return
# Remove all the users from the query which we have updated
for user_id in user_ids_updated:
destination_query.pop(user_id)
try:
remote_result = await self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
)
for user_id, keys in remote_result["device_keys"].items():
if user_id in destination_query:
results[user_id] = keys
if "master_keys" in remote_result:
for user_id, key in remote_result["master_keys"].items():
if user_id in destination_query:
cross_signing_keys["master_keys"][user_id] = key
if "self_signing_keys" in remote_result:
for user_id, key in remote_result["self_signing_keys"].items():
if user_id in destination_query:
cross_signing_keys["self_signing_keys"][user_id] = key
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)
return
async def get_cross_signing_keys_from_cache(
self, query: Iterable[str], from_user_id: Optional[str]
) -> Dict[str, Dict[str, dict]]:

View file

@ -1643,7 +1643,7 @@ class FederationEventHandler:
event: the event whose auth_events we want
Returns:
all of the events in `event.auth_events`, after deduplication
all of the events listed in `event.auth_events_ids`, after deduplication
Raises:
AuthError if we were unable to fetch the auth_events for any reason.

View file

@ -537,10 +537,6 @@ class IdentityHandler:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
# It is already checked that public_baseurl is configured since this code
# should only be used if account_threepid_delegate_msisdn is true.
assert self.hs.config.server.public_baseurl
# we need to tell the client to send the token back to us, since it doesn't
# otherwise know where to send it, so add submit_url response parameter
# (see also MSC2078)

View file

@ -1318,6 +1318,8 @@ class EventCreationHandler:
# user is actually admin or not).
is_admin_redaction = False
if event.type == EventTypes.Redaction:
assert event.redacts is not None
original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
@ -1413,6 +1415,8 @@ class EventCreationHandler:
)
if event.type == EventTypes.Redaction:
assert event.redacts is not None
original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
@ -1500,11 +1504,13 @@ class EventCreationHandler:
next_batch_id = event.content.get(
EventContentFields.MSC2716_NEXT_BATCH_ID
)
conflicting_insertion_event_id = (
await self.store.get_insertion_event_by_batch_id(
event.room_id, next_batch_id
conflicting_insertion_event_id = None
if next_batch_id:
conflicting_insertion_event_id = (
await self.store.get_insertion_event_id_by_batch_id(
event.room_id, next_batch_id
)
)
)
if conflicting_insertion_event_id is not None:
# The current insertion event that we're processing is invalid
# because an insertion event already exists in the room with the

View file

@ -424,7 +424,7 @@ class PaginationHandler:
if events:
if event_filter:
events = event_filter.filter(events)
events = await event_filter.filter(events)
events = await filter_events_for_client(
self.storage, user_id, events, is_peeking=(member_event_id is None)

View file

@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains functions for performing events on rooms."""
"""Contains functions for performing actions on rooms."""
import itertools
import logging
import math
@ -31,6 +30,8 @@ from typing import (
Tuple,
)
from typing_extensions import TypedDict
from synapse.api.constants import (
EventContentFields,
EventTypes,
@ -525,7 +526,7 @@ class RoomCreationHandler:
):
await self.room_member_handler.update_membership(
requester,
UserID.from_string(old_event["state_key"]),
UserID.from_string(old_event.state_key),
new_room_id,
"ban",
ratelimit=False,
@ -1158,8 +1159,10 @@ class RoomContextHandler:
)
if event_filter:
results["events_before"] = event_filter.filter(results["events_before"])
results["events_after"] = event_filter.filter(results["events_after"])
results["events_before"] = await event_filter.filter(
results["events_before"]
)
results["events_after"] = await event_filter.filter(results["events_after"])
results["events_before"] = await filter_evts(results["events_before"])
results["events_after"] = await filter_evts(results["events_after"])
@ -1195,7 +1198,7 @@ class RoomContextHandler:
state_events = list(state[last_event_id].values())
if event_filter:
state_events = event_filter.filter(state_events)
state_events = await event_filter.filter(state_events)
results["state"] = await filter_evts(state_events)
@ -1275,6 +1278,13 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_room_events_max_id(room_id)
class ShutdownRoomResponse(TypedDict):
kicked_users: List[str]
failed_to_kick_users: List[str]
local_aliases: List[str]
new_room_id: Optional[str]
class RoomShutdownHandler:
DEFAULT_MESSAGE = (
@ -1300,7 +1310,7 @@ class RoomShutdownHandler:
new_room_name: Optional[str] = None,
message: Optional[str] = None,
block: bool = False,
) -> dict:
) -> ShutdownRoomResponse:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@ -1334,8 +1344,13 @@ class RoomShutdownHandler:
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
If set to `True`, users will be prevented from joining the old
room. This option can also be used to pre-emptively block a room,
even if it's unknown to this homeserver. In this case, the room
will be blocked, and no further action will be taken. If `False`,
attempting to delete an unknown room is invalid.
Defaults to `False`.
Returns: a dict containing the following keys:
kicked_users: An array of users (`user_id`) that were kicked.
@ -1344,7 +1359,9 @@ class RoomShutdownHandler:
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
new_room_id: A string representing the room ID of the new room.
new_room_id:
A string representing the room ID of the new room, or None if
no such room was created.
"""
if not new_room_name:
@ -1355,14 +1372,28 @@ class RoomShutdownHandler:
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
if not await self.store.get_room(room_id):
raise NotFoundError("Unknown room id %s" % (room_id,))
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
# Action the block first (even if the room doesn't exist yet)
if block:
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
if not await self.store.get_room(room_id):
if block:
# We allow you to block an unknown room.
return {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
else:
# But if you don't want to preventatively block another room,
# this function can't do anything useful.
raise NotFoundError(
"Cannot shut down room: unknown room id %s" % (room_id,)
)
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(

View file

@ -355,7 +355,7 @@ class RoomBatchHandler:
for (event, context) in reversed(events_to_persist):
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event["sender"], app_service_requester.app_service
event.sender, app_service_requester.app_service
),
event=event,
context=context,

View file

@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler):
#
# the prev_events consist solely of the previous membership event.
prev_event_ids = [previous_membership_event.event_id]
auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids
auth_event_ids = (
list(previous_membership_event.auth_event_ids()) + prev_event_ids
)
event, context = await self.event_creation_handler.create_event(
requester,

View file

@ -180,7 +180,7 @@ class SearchHandler:
% (set(group_keys) - {"room_id", "sender"},),
)
search_filter = Filter(filter_dict)
search_filter = Filter(self.hs, filter_dict)
# TODO: Search through left rooms too
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
@ -242,7 +242,7 @@ class SearchHandler:
rank_map.update({r["event"].event_id: r["rank"] for r in results})
filtered_events = search_filter.filter([r["event"] for r in results])
filtered_events = await search_filter.filter([r["event"] for r in results])
events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
@ -292,7 +292,9 @@ class SearchHandler:
rank_map.update({r["event"].event_id: r["rank"] for r in results})
filtered_events = search_filter.filter([r["event"] for r in results])
filtered_events = await search_filter.filter(
[r["event"] for r in results]
)
events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events

View file

@ -510,7 +510,7 @@ class SyncHandler:
log_kv({"limited": limited})
if potential_recents:
recents = sync_config.filter_collection.filter_room_timeline(
recents = await sync_config.filter_collection.filter_room_timeline(
potential_recents
)
log_kv({"recents_after_sync_filtering": len(recents)})
@ -575,8 +575,8 @@ class SyncHandler:
log_kv({"loaded_recents": len(events)})
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
loaded_recents = (
await sync_config.filter_collection.filter_room_timeline(events)
)
log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)})
@ -1015,7 +1015,7 @@ class SyncHandler:
return {
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(
for e in await sync_config.filter_collection.filter_room_state(
list(state.values())
)
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
@ -1383,7 +1383,7 @@ class SyncHandler:
sync_config.user
)
account_data_for_user = sync_config.filter_collection.filter_account_data(
account_data_for_user = await sync_config.filter_collection.filter_account_data(
[
{"type": account_data_type, "content": content}
for account_data_type, content in account_data.items()
@ -1448,7 +1448,7 @@ class SyncHandler:
# Deduplicate the presence entries so that there's at most one per user
presence = list({p.user_id: p for p in presence}.values())
presence = sync_config.filter_collection.filter_presence(presence)
presence = await sync_config.filter_collection.filter_presence(presence)
sync_result_builder.presence = presence
@ -2021,12 +2021,14 @@ class SyncHandler:
)
account_data_events = (
sync_config.filter_collection.filter_room_account_data(
await sync_config.filter_collection.filter_room_account_data(
account_data_events
)
)
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
ephemeral = await sync_config.filter_collection.filter_room_ephemeral(
ephemeral
)
if not (
always_include

View file

@ -62,8 +62,8 @@ class FollowerTypingHandler:
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
if hs.config.worker.writers.typing != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(
if hs.get_instance_name() not in hs.config.worker.writers.typing:
hs.get_federation_registry().register_instances_for_edu(
"m.typing",
hs.config.worker.writers.typing,
)
@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
assert hs.config.worker.writers.typing == hs.get_instance_name()
assert hs.get_instance_name() in hs.config.worker.writers.typing
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()

View file

@ -383,29 +383,6 @@ class Notifier:
except Exception:
logger.exception("Error notifying application services of event")
def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""Notify application services of ephemeral event activity.
Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users or []
)
except Exception:
logger.exception("Error notifying application services of event")
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
self._pusher_pool.on_new_notifications(max_room_stream_token)
@ -467,12 +444,15 @@ class Notifier:
self.notify_replication()
# Notify appservices
self._notify_app_services_ephemeral(
stream_key,
new_token,
users,
)
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened

View file

@ -232,6 +232,8 @@ class BulkPushRuleEvaluator:
# that user, as they might not be already joined.
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None
if count_as_unread:
# Add an element for the current user if the event needs to be marked as
@ -268,7 +270,7 @@ def _condition_checker(
evaluator: PushRuleEvaluatorForEvent,
conditions: List[dict],
uid: str,
display_name: str,
display_name: Optional[str],
cache: Dict[str, bool],
) -> bool:
for cond in conditions:

View file

@ -18,7 +18,7 @@ import re
from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
from synapse.events import EventBase
from synapse.types import UserID
from synapse.types import JsonDict, UserID
from synapse.util import glob_to_regex, re_word_boundary
from synapse.util.caches.lrucache import LruCache
@ -129,7 +129,7 @@ class PushRuleEvaluatorForEvent:
self._value_cache = _flatten_dict(event)
def matches(
self, condition: Dict[str, Any], user_id: str, display_name: str
self, condition: Dict[str, Any], user_id: str, display_name: Optional[str]
) -> bool:
if condition["kind"] == "event_match":
return self._event_match(condition, user_id)
@ -172,7 +172,7 @@ class PushRuleEvaluatorForEvent:
return _glob_matches(pattern, haystack)
def _contains_display_name(self, display_name: str) -> bool:
def _contains_display_name(self, display_name: Optional[str]) -> bool:
if not display_name:
return False
@ -222,7 +222,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
def _flatten_dict(
d: Union[EventBase, dict],
d: Union[EventBase, JsonDict],
prefix: Optional[List[str]] = None,
result: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
@ -233,7 +233,7 @@ def _flatten_dict(
for key, value in d.items():
if isinstance(value, str):
result[".".join(prefix + [key])] = value.lower()
elif hasattr(value, "items"):
elif isinstance(value, dict):
_flatten_dict(value, prefix=(prefix + [key]), result=result)
return result

View file

@ -138,7 +138,7 @@ class ReplicationCommandHandler:
if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
if hs.config.worker.writers.typing == hs.get_instance_name():
if hs.get_instance_name() in hs.config.worker.writers.typing:
self._streams_to_replicate.append(stream)
continue

View file

@ -328,8 +328,7 @@ class TypingStream(Stream):
ROW_TYPE = TypingStreamRow
def __init__(self, hs: "HomeServer"):
writer_instance = hs.config.worker.writers.typing
if writer_instance == hs.get_instance_name():
if hs.get_instance_name() in hs.config.worker.writers.typing:
# On the writer, query the typing handler
typing_writer_handler = hs.get_typing_writer_handler()
update_function: Callable[

View file

@ -25,6 +25,10 @@ from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
from synapse.rest.admin.background_updates import (
BackgroundUpdateEnabledRestServlet,
BackgroundUpdateRestServlet,
)
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
DeviceRestServlet,
@ -247,6 +251,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
# Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None:
SendServerNoticeServlet(hs).register(http_server)
BackgroundUpdateEnabledRestServlet(hs).register(http_server)
BackgroundUpdateRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(

View file

@ -0,0 +1,107 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Tuple
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class BackgroundUpdateEnabledRestServlet(RestServlet):
"""Allows temporarily disabling background updates"""
PATTERNS = admin_patterns("/background_updates/enabled")
def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
self.data_stores = hs.get_datastores()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
return 200, {"enabled": enabled}
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
body = parse_json_object_from_request(request)
enabled = body.get("enabled", True)
if not isinstance(enabled, bool):
raise SynapseError(400, "'enabled' parameter must be a boolean")
for db in self.data_stores.databases:
db.updates.enabled = enabled
# If we're re-enabling them ensure that we start the background
# process again.
if enabled:
db.updates.start_doing_background_updates()
return 200, {"enabled": enabled}
class BackgroundUpdateRestServlet(RestServlet):
"""Fetch information about background updates"""
PATTERNS = admin_patterns("/background_updates/status")
def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
self.data_stores = hs.get_datastores()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
current_updates = {}
for db in self.data_stores.databases:
update = db.updates.get_current_update()
if not update:
continue
current_updates[db.name()] = {
"name": update.name,
"total_item_count": update.total_item_count,
"total_duration_ms": update.total_duration_ms,
"average_items_per_ms": update.average_items_per_ms(),
}
return 200, {"enabled": enabled, "current_updates": current_updates}

View file

@ -13,7 +13,7 @@
# limitations under the License.
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from urllib import parse as urlparse
from synapse.api.constants import EventTypes, JoinRules, Membership
@ -239,9 +239,22 @@ class RoomRestServlet(RestServlet):
# Purge room
if purge:
await pagination_handler.purge_room(room_id, force=force_purge)
try:
await pagination_handler.purge_room(room_id, force=force_purge)
except NotFoundError:
if block:
# We can block unknown rooms with this endpoint, in which case
# a failed purge is expected.
pass
else:
# But otherwise, we expect this purge to have succeeded.
raise
return 200, ret
# Cast safety: cast away the knowledge that this is a TypedDict.
# See https://github.com/python/mypy/issues/4976#issuecomment-579883622
# for some discussion on why this is necessary. Either way,
# `ret` is an opaque dictionary blob as far as the rest of the app cares.
return 200, cast(JsonDict, ret)
class RoomMembersRestServlet(RestServlet):
@ -583,6 +596,7 @@ class RoomEventContextServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()
@ -600,7 +614,9 @@ class RoomEventContextServlet(RestServlet):
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
event_filter: Optional[Filter] = Filter(
self._hs, json_decoder.decode(filter_json)
)
else:
event_filter = None

View file

@ -13,10 +13,12 @@
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING, Tuple
from synapse.api.constants import ReadReceiptEventFields
from synapse.api.errors import Codes, SynapseError
from synapse.http import get_request_user_agent
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
@ -24,6 +26,8 @@ from synapse.types import JsonDict
from ._base import client_patterns
pattern = re.compile(r"(?:Element|SchildiChat)/1\.[012]\.")
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -52,7 +56,13 @@ class ReceiptRestServlet(RestServlet):
if receipt_type != "m.read":
raise SynapseError(400, "Receipt type must be 'm.read'")
body = parse_json_object_from_request(request, allow_empty_body=True)
# Do not allow older SchildiChat and Element Android clients (prior to Element/1.[012].x) to send an empty body.
user_agent = get_request_user_agent(request)
allow_empty_body = False
if "Android" in user_agent:
if pattern.match(user_agent) or "Riot" in user_agent:
allow_empty_body = True
body = parse_json_object_from_request(request, allow_empty_body)
hidden = body.get(ReadReceiptEventFields.MSC2285_HIDDEN, False)
if not isinstance(hidden, bool):

View file

@ -298,7 +298,9 @@ class RelationAggregationPaginationServlet(RestServlet):
raise SynapseError(404, "Unknown parent event.")
if relation_type not in (RelationTypes.ANNOTATION, None):
raise SynapseError(400, "Relation type must be 'annotation'")
raise SynapseError(
400, f"Relation type must be '{RelationTypes.ANNOTATION}'"
)
limit = parse_integer(request, "limit", default=5)
from_token_str = parse_string(request, "from")

View file

@ -550,6 +550,7 @@ class RoomMessageListRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()
@ -567,7 +568,9 @@ class RoomMessageListRestServlet(RestServlet):
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
event_filter: Optional[Filter] = Filter(
self._hs, json_decoder.decode(filter_json)
)
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
@ -672,6 +675,7 @@ class RoomEventContextServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()
@ -688,7 +692,9 @@ class RoomEventContextServlet(RestServlet):
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
event_filter: Optional[Filter] = Filter(
self._hs, json_decoder.decode(filter_json)
)
else:
event_filter = None
@ -914,7 +920,7 @@ class RoomTypingRestServlet(RestServlet):
# If we're not on the typing writer instance we should scream if we get
# requests.
self._is_typing_writer = (
hs.config.worker.writers.typing == hs.get_instance_name()
hs.get_instance_name() in hs.config.worker.writers.typing
)
async def on_PUT(

View file

@ -112,7 +112,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
# and have the batch connected.
if batch_id_from_query:
corresponding_insertion_event_id = (
await self.store.get_insertion_event_by_batch_id(
await self.store.get_insertion_event_id_by_batch_id(
room_id, batch_id_from_query
)
)
@ -131,20 +131,22 @@ class RoomBatchSendEventRestServlet(RestServlet):
prev_event_ids_from_query
)
state_event_ids_at_start = []
# Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member
# state events used to auth the upcoming historical messages.
state_event_ids_at_start = (
await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
room_id=room_id,
initial_auth_event_ids=auth_event_ids,
app_service_requester=requester,
if body["state_events_at_start"]:
state_event_ids_at_start = (
await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
room_id=room_id,
initial_auth_event_ids=auth_event_ids,
app_service_requester=requester,
)
)
)
# Update our ongoing auth event ID list with all of the new state we
# just created
auth_event_ids.extend(state_event_ids_at_start)
# Update our ongoing auth event ID list with all of the new state we
# just created
auth_event_ids.extend(state_event_ids_at_start)
inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
prev_event_ids_from_query
@ -191,14 +193,17 @@ class RoomBatchSendEventRestServlet(RestServlet):
depth=inherited_depth,
)
batch_id_to_connect_to = base_insertion_event["content"][
batch_id_to_connect_to = base_insertion_event.content[
EventContentFields.MSC2716_NEXT_BATCH_ID
]
# Also connect the historical event chain to the end of the floating
# state chain, which causes the HS to ask for the state at the start of
# the batch later.
prev_event_ids = [state_event_ids_at_start[-1]]
# the batch later. If there is no state chain to connect to, just make
# the insertion event float itself.
prev_event_ids = []
if len(state_event_ids_at_start):
prev_event_ids = [state_event_ids_at_start[-1]]
# Create and persist all of the historical events as well as insertion
# and batch meta events to make the batch navigable in the DAG.

View file

@ -29,7 +29,7 @@ from typing import (
from synapse.api.constants import Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.events.utils import (
@ -150,7 +150,7 @@ class SyncRestServlet(RestServlet):
request_key = (user, timeout, since, filter_id, full_state, device_id)
if filter_id is None:
filter_collection = DEFAULT_FILTER_COLLECTION
filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION
elif filter_id.startswith("{"):
try:
filter_object = json_decoder.decode(filter_id)
@ -160,7 +160,7 @@ class SyncRestServlet(RestServlet):
except Exception:
raise SynapseError(400, "Invalid filter JSON")
self.filtering.check_valid_filter(filter_object)
filter_collection = FilterCollection(filter_object)
filter_collection = FilterCollection(self.hs, filter_object)
else:
try:
filter_collection = await self.filtering.get_user_filter(

View file

@ -215,6 +215,8 @@ class MediaRepository:
self.mark_recently_accessed(None, media_id)
media_type = media_info["media_type"]
if not media_type:
media_type = "application/octet-stream"
media_length = media_info["media_length"]
upload_name = name if name else media_info["upload_name"]
url_cache = media_info["url_cache"]
@ -333,6 +335,9 @@ class MediaRepository:
logger.info("Media is quarantined")
raise NotFoundError()
if not media_info["media_type"]:
media_info["media_type"] = "application/octet-stream"
responder = await self.media_storage.fetch_media(file_info)
if responder:
return responder, media_info
@ -354,6 +359,8 @@ class MediaRepository:
raise e
file_id = media_info["filesystem_id"]
if not media_info["media_type"]:
media_info["media_type"] = "application/octet-stream"
file_info = FileInfo(server_name, file_id)
# We generate thumbnails even if another process downloaded the media
@ -445,7 +452,10 @@ class MediaRepository:
await finish()
media_type = headers[b"Content-Type"][0].decode("ascii")
if b"Content-Type" in headers:
media_type = headers[b"Content-Type"][0].decode("ascii")
else:
media_type = "application/octet-stream"
upload_name = get_filename_from_headers(headers)
time_now_ms = self.clock.time_msec()

View file

@ -718,9 +718,12 @@ def decode_body(
if not body:
return None
# The idea here is that multiple encodings are tried until one works.
# Unfortunately the result is never used and then LXML will decode the string
# again with the found encoding.
for encoding in get_html_media_encodings(body, content_type):
try:
body_str = body.decode(encoding)
body.decode(encoding)
except Exception:
pass
else:
@ -732,11 +735,11 @@ def decode_body(
from lxml import etree
# Create an HTML parser.
parser = etree.HTMLParser(recover=True, encoding="utf-8")
parser = etree.HTMLParser(recover=True, encoding=encoding)
# Attempt to parse the body. Returns None if the body was successfully
# parsed, but no tree was found.
return etree.fromstring(body_str, parser)
return etree.fromstring(body, parser)
def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:

Some files were not shown because too many files have changed in this diff Show more