Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2020-02-18 15:50:06 +00:00
commit bc936b5657
121 changed files with 1980 additions and 1220 deletions

View file

@ -39,3 +39,5 @@ Server correctly handles incoming m.device_list_update
# this fails reliably with a torture level of 100 due to https://github.com/matrix-org/synapse/issues/6536
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state
Can get rooms/{roomId}/members at a given point

View file

@ -1,11 +1,35 @@
Synapse 1.10.1 (2020-02-17)
===========================
Bugfixes
--------
- Fix a bug introduced in Synapse 1.10.0 which would cause room state to be cleared in the database if Synapse was upgraded direct from 1.2.1 or earlier to 1.10.0. ([\#6924](https://github.com/matrix-org/synapse/issues/6924))
Synapse 1.10.0 (2020-02-12)
===========================
**WARNING to client developers**: As of this release Synapse validates `client_secret` parameters in the Client-Server API as per the spec. See [\#6766](https://github.com/matrix-org/synapse/issues/6766) for details.
Updates to the Docker image
---------------------------
- Update the docker images to Alpine Linux 3.11. ([\#6897](https://github.com/matrix-org/synapse/issues/6897))
Synapse 1.10.0rc5 (2020-02-11)
==============================
Bugfixes
--------
- Fix the filtering introduced in 1.10.0rc3 to also apply to the state blocks returned by `/sync`. ([\#6884](https://github.com/matrix-org/synapse/issues/6884))
Synapse 1.10.0rc4 (2020-02-11)
==============================
Features
--------
- Filter out m.room.aliases from /sync state blocks until a full fix lands. ([\#6884](https://github.com/matrix-org/synapse/issues/6884))
This release candidate was built incorrectly and is superceded by 1.10.0rc5.
Synapse 1.10.0rc3 (2020-02-10)
==============================
@ -13,7 +37,7 @@ Synapse 1.10.0rc3 (2020-02-10)
Features
--------
- Filter out m.room.aliases from the CS API to mitigate abuse while a better solution is specced. ([\#6878](https://github.com/matrix-org/synapse/issues/6878))
- Filter out `m.room.aliases` from the CS API to mitigate abuse while a better solution is specced. ([\#6878](https://github.com/matrix-org/synapse/issues/6878))
Internal Changes
@ -41,9 +65,6 @@ Internal Changes
Synapse 1.10.0rc1 (2020-01-31)
==============================
**WARNING to client developers**: As of this release Synapse validates `client_secret` parameters in the Client-Server API as per the spec. See [\#6766](https://github.com/matrix-org/synapse/issues/6766) for details.
Features
--------

View file

@ -200,6 +200,20 @@ Git allows you to add this signoff automatically when using the `-s`
flag to `git commit`, which uses the name and email set in your
`user.name` and `user.email` git configs.
## Merge Strategy
We use the commit history of develop/master extensively to identify
when regressions were introduced and what changes have been made.
We aim to have a clean merge history, which means we normally squash-merge
changes into develop. For small changes this means there is no need to rebase
to clean up your PR before merging. Larger changes with an organised set of
commits may be merged as-is, if the history is judged to be useful.
This use of squash-merging will mean PRs built on each other will be hard to
merge. We suggest avoiding these where possible, and if required, ensuring
each PR has a tidy set of commits to ease merging.
## Conclusion
That's it! Matrix is a very open and collaborative project as you might expect

View file

@ -388,15 +388,17 @@ Once you have installed synapse as above, you will need to configure it.
## TLS certificates
The default configuration exposes a single HTTP port: http://localhost:8008. It
is suitable for local testing, but for any practical use, you will either need
to enable a reverse proxy, or configure Synapse to expose an HTTPS port.
The default configuration exposes a single HTTP port on the local
interface: `http://localhost:8008`. It is suitable for local testing,
but for any practical use, you will need Synapse's APIs to be served
over HTTPS.
For information on using a reverse proxy, see
The recommended way to do so is to set up a reverse proxy on port
`8448`. You can find documentation on doing so in
[docs/reverse_proxy.md](docs/reverse_proxy.md).
To configure Synapse to expose an HTTPS port, you will need to edit
`homeserver.yaml`, as follows:
Alternatively, you can configure Synapse to expose an HTTPS port. To do
so, you will need to edit `homeserver.yaml`, as follows:
* First, under the `listeners` section, uncomment the configuration for the
TLS-enabled listener. (Remove the hash sign (`#`) at the start of
@ -414,11 +416,15 @@ To configure Synapse to expose an HTTPS port, you will need to edit
point these settings at an existing certificate and key, or you can
enable Synapse's built-in ACME (Let's Encrypt) support. Instructions
for having Synapse automatically provision and renew federation
certificates through ACME can be found at [ACME.md](docs/ACME.md). If you
are using your own certificate, be sure to use a `.pem` file that includes
the full certificate chain including any intermediate certificates (for
instance, if using certbot, use `fullchain.pem` as your certificate, not
`cert.pem`).
certificates through ACME can be found at [ACME.md](docs/ACME.md).
Note that, as pointed out in that document, this feature will not
work with installs set up after November 2020.
If you are using your
own certificate, be sure to use a `.pem` file that includes the full
certificate chain including any intermediate certificates (for
instance, if using certbot, use `fullchain.pem` as your certificate,
not `cert.pem`).
For a more detailed guide to configuring your server for federation, see
[federate.md](docs/federate.md)

View file

@ -272,7 +272,7 @@ to install using pip and a virtualenv::
virtualenv -p python3 env
source env/bin/activate
python -m pip install --no-use-pep517 -e .[all]
python -m pip install --no-use-pep517 -e ".[all]"
This will run a process of downloading and installing all the needed
dependencies into a virtual env.

1
changelog.d/6769.feature Normal file
View file

@ -0,0 +1 @@
Admin API to add or modify threepids of user accounts.

1
changelog.d/6781.bugfix Normal file
View file

@ -0,0 +1 @@
Fixed third party event rules function `on_create_room`'s return value being ignored.

1
changelog.d/6821.misc Normal file
View file

@ -0,0 +1 @@
Add type hints to `SyncHandler`.

1
changelog.d/6823.misc Normal file
View file

@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.

1
changelog.d/6825.bugfix Normal file
View file

@ -0,0 +1 @@
Allow URL-encoded User IDs on `/_synapse/admin/v2/users/<user_id>[/admin]` endpoints. Thanks to @NHAS for reporting.

1
changelog.d/6827.misc Normal file
View file

@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.

1
changelog.d/6833.misc Normal file
View file

@ -0,0 +1 @@
Reducing log level to DEBUG for synapse.storage.TIME.

1
changelog.d/6834.misc Normal file
View file

@ -0,0 +1 @@
Change the default power levels of invites, tombstones and server ACLs for new rooms.

1
changelog.d/6836.misc Normal file
View file

@ -0,0 +1 @@
Fix stacktraces when using `ObservableDeferred` and async/await.

1
changelog.d/6837.misc Normal file
View file

@ -0,0 +1 @@
Port much of `synapse.handlers.federation` to async/await.

1
changelog.d/6840.misc Normal file
View file

@ -0,0 +1 @@
Port much of `synapse.handlers.federation` to async/await.

1
changelog.d/6844.bugfix Normal file
View file

@ -0,0 +1 @@
Fix an issue with cross-signing where device signatures were not sent to remote servers.

1
changelog.d/6846.doc Normal file
View file

@ -0,0 +1 @@
Add details of PR merge strategy to contributing docs.

1
changelog.d/6847.misc Normal file
View file

@ -0,0 +1 @@
Populate `rooms.room_version` database column at startup, rather than in a background update.

1
changelog.d/6849.bugfix Normal file
View file

@ -0,0 +1 @@
Fix Synapse refusing to start if `federation_certificate_verification_whitelist` option is blank.

1
changelog.d/6854.misc Normal file
View file

@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.

1
changelog.d/6855.misc Normal file
View file

@ -0,0 +1 @@
Update pip install directiosn in readme to avoid error when using zsh.

1
changelog.d/6856.misc Normal file
View file

@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.

1
changelog.d/6857.misc Normal file
View file

@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.

1
changelog.d/6858.misc Normal file
View file

@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.

1
changelog.d/6862.misc Normal file
View file

@ -0,0 +1 @@
Reduce amount we log at `INFO` level.

1
changelog.d/6864.misc Normal file
View file

@ -0,0 +1 @@
Limit the number of events that can be requested by the backfill federation API to 100.

1
changelog.d/6869.misc Normal file
View file

@ -0,0 +1 @@
Remove unused `get_room_stats_state` method.

1
changelog.d/6871.misc Normal file
View file

@ -0,0 +1 @@
Add typing to `synapse.federation.sender` and port to async/await.

1
changelog.d/6877.removal Normal file
View file

@ -0,0 +1 @@
Remove `m.lazy_load_members` from `unstable_features` since lazy loading is in the stable Client-Server API version r0.5.0.

1
changelog.d/6882.misc Normal file
View file

@ -0,0 +1 @@
Reject device display names over 100 characters in length.

1
changelog.d/6883.misc Normal file
View file

@ -0,0 +1 @@
Add an additional entry to the SyTest blacklist for worker mode.

1
changelog.d/6887.misc Normal file
View file

@ -0,0 +1 @@
Fix the use of sed in the linting scripts when using BSD sed.

1
changelog.d/6888.feature Normal file
View file

@ -0,0 +1 @@
The result of a user directory search can now be filtered via the spam checker.

1
changelog.d/6891.doc Normal file
View file

@ -0,0 +1 @@
Spell out that the last event sent to a room won't be deleted by a purge.

1
changelog.d/6901.misc Normal file
View file

@ -0,0 +1 @@
Return a 404 instead of 200 for querying information of a non-existant user through the admin API.

1
changelog.d/6904.removal Normal file
View file

@ -0,0 +1 @@
Stop sending alias events during adding / removing aliases. Check alt_aliases in the latest canonical aliases event when deleting an alias.

1
changelog.d/6905.doc Normal file
View file

@ -0,0 +1 @@
Update Synapse's documentation to warn about the deprecation of ACME v1.

1
changelog.d/6906.doc Normal file
View file

@ -0,0 +1 @@
Add documentation for the spam checker.

1
changelog.d/6909.doc Normal file
View file

@ -0,0 +1 @@
Update Synapse's documentation to warn about the deprecation of ACME v1.

1
changelog.d/6915.misc Normal file
View file

@ -0,0 +1 @@
Add type hints to the spam checker module.

1
changelog.d/6918.docker Normal file
View file

@ -0,0 +1 @@
The deprecated "generate-config-on-the-fly" mode is no longer supported.

1
changelog.d/6919.misc Normal file
View file

@ -0,0 +1 @@
Convert the directory handler tests to use HomeserverTestCase.

1
changelog.d/6920.misc Normal file
View file

@ -0,0 +1 @@
Add a warning about indentation to generated configuration files.

1
changelog.d/6921.docker Normal file
View file

@ -0,0 +1 @@
Databases created using the compose file in contrib/docker will now always have correct encoding and locale settings. Contributed by Fridtjof Mund.

1
changelog.d/6937.misc Normal file
View file

@ -0,0 +1 @@
Increase perf of `get_auth_chain_ids` used in state res v2.

1
changelog.d/6938.doc Normal file
View file

@ -0,0 +1 @@
Fix worker docs to point `/publicised_groups` API correctly.

View file

@ -56,6 +56,9 @@ services:
environment:
- POSTGRES_USER=synapse
- POSTGRES_PASSWORD=changeme
# ensure the database gets created correctly
# https://github.com/matrix-org/synapse/blob/master/docs/postgres.md#set-up-database
- POSTGRES_INITDB_ARGS="--encoding=UTF-8 --lc-collate=C --lc-ctype=C"
volumes:
# You may store the database tables in a local folder..
- ./schemas:/var/lib/postgresql/data

12
debian/changelog vendored
View file

@ -1,3 +1,15 @@
matrix-synapse-py3 (1.10.1) stable; urgency=medium
* New synapse release 1.10.1.
-- Synapse Packaging team <packages@matrix.org> Mon, 17 Feb 2020 16:27:28 +0000
matrix-synapse-py3 (1.10.0) stable; urgency=medium
* New synapse release 1.10.0.
-- Synapse Packaging team <packages@matrix.org> Wed, 12 Feb 2020 12:18:54 +0000
matrix-synapse-py3 (1.9.1) stable; urgency=medium
* New synapse release 1.9.1.

View file

@ -16,7 +16,7 @@ ARG PYTHON_VERSION=3.7
###
### Stage 0: builder
###
FROM docker.io/python:${PYTHON_VERSION}-alpine3.10 as builder
FROM docker.io/python:${PYTHON_VERSION}-alpine3.11 as builder
# install the OS build deps

View file

@ -110,12 +110,12 @@ argument to `docker run`.
## Legacy dynamic configuration file support
For backwards-compatibility only, the docker image supports creating a dynamic
configuration file based on environment variables. This is now deprecated, but
is enabled when the `SYNAPSE_SERVER_NAME` variable is set (and `generate` is
not given).
The docker image used to support creating a dynamic configuration file based
on environment variables. This is no longer supported, and an error will be
raised if you try to run synapse without a config file.
To migrate from a dynamic configuration file to a static one, run the docker
It is, however, possible to generate a static configuration file based on
the environment variables that were previously used. To do this, run the docker
container once with the environment variables set, and `migrate_config`
command line option. For example:
@ -127,15 +127,20 @@ docker run -it --rm \
matrixdotorg/synapse:latest migrate_config
```
This will generate the same configuration file as the legacy mode used, but
will store it in `/data/homeserver.yaml` instead of a temporary location. You
can then use it as shown above at [Running synapse](#running-synapse).
This will generate the same configuration file as the legacy mode used, and
will store it in `/data/homeserver.yaml`. You can then use it as shown above at
[Running synapse](#running-synapse).
Note that the defaults used in this configuration file may be different to
those when generating a new config file with `generate`: for example, TLS is
enabled by default in this mode. You are encouraged to inspect the generated
configuration file and edit it to ensure it meets your needs.
## Building the image
If you need to build the image from a Synapse checkout, use the following `docker
build` command from the repo's root:
```
docker build -t matrixdotorg/synapse -f docker/Dockerfile .
```

View file

@ -188,11 +188,6 @@ def main(args, environ):
else:
ownership = "{}:{}".format(desired_uid, desired_gid)
log(
"Container running as UserID %s:%s, ENV (or defaults) requests %s:%s"
% (os.getuid(), os.getgid(), desired_uid, desired_gid)
)
if ownership is None:
log("Will not perform chmod/su-exec as UserID already matches request")
@ -213,38 +208,30 @@ def main(args, environ):
if mode is not None:
error("Unknown execution mode '%s'" % (mode,))
if "SYNAPSE_SERVER_NAME" in environ:
# backwards-compatibility generate-a-config-on-the-fly mode
if "SYNAPSE_CONFIG_PATH" in environ:
error(
"SYNAPSE_SERVER_NAME can only be combined with SYNAPSE_CONFIG_PATH "
"in `generate` or `migrate_config` mode. To start synapse using a "
"config file, unset the SYNAPSE_SERVER_NAME environment variable."
)
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
config_path = "/compiled/homeserver.yaml"
log(
"Generating config file '%s' on-the-fly from environment variables.\n"
"Note that this mode is deprecated. You can migrate to a static config\n"
"file by running with 'migrate_config'. See the README for more details."
% (config_path,)
)
generate_config_from_template("/compiled", config_path, environ, ownership)
else:
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get(
"SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml"
)
if not os.path.exists(config_path):
if not os.path.exists(config_path):
if "SYNAPSE_SERVER_NAME" in environ:
error(
"Config file '%s' does not exist. You should either create a new "
"config file by running with the `generate` argument (and then edit "
"the resulting file before restarting) or specify the path to an "
"existing config file with the SYNAPSE_CONFIG_PATH variable."
"""\
Config file '%s' does not exist.
The synapse docker image no longer supports generating a config file on-the-fly
based on environment variables. You can migrate to a static config file by
running with 'migrate_config'. See the README for more details.
"""
% (config_path,)
)
error(
"Config file '%s' does not exist. You should either create a new "
"config file by running with the `generate` argument (and then edit "
"the resulting file before restarting) or specify the path to an "
"existing config file with the SYNAPSE_CONFIG_PATH variable."
% (config_path,)
)
log("Starting synapse with config file " + config_path)
args = ["python", "-m", synapse_worker, "--config-path", config_path]

View file

@ -1,4 +1,4 @@
# The config is maintained as an up-to-date snapshot of the default
# This file is maintained as an up-to-date snapshot of the default
# homeserver.yaml configuration generated by Synapse.
#
# It is intended to act as a reference for the default configuration,
@ -10,3 +10,5 @@
# homeserver.yaml. Instead, if you are starting from scratch, please generate
# a fresh config using Synapse by following the instructions in INSTALL.md.
################################################################################

View file

@ -1,12 +1,48 @@
# ACME
Synapse v1.0 will require valid TLS certificates for communication between
servers (port `8448` by default) in addition to those that are client-facing
(port `443`). If you do not already have a valid certificate for your domain,
the easiest way to get one is with Synapse's new ACME support, which will use
the ACME protocol to provision a certificate automatically. Synapse v0.99.0+
will provision server-to-server certificates automatically for you for free
through [Let's Encrypt](https://letsencrypt.org/) if you tell it to.
From version 1.0 (June 2019) onwards, Synapse requires valid TLS
certificates for communication between servers (by default on port
`8448`) in addition to those that are client-facing (port `443`). To
help homeserver admins fulfil this new requirement, Synapse v0.99.0
introduced support for automatically provisioning certificates through
[Let's Encrypt](https://letsencrypt.org/) using the ACME protocol.
## Deprecation of ACME v1
In [March 2019](https://community.letsencrypt.org/t/end-of-life-plan-for-acmev1/88430),
Let's Encrypt announced that they were deprecating version 1 of the ACME
protocol, with the plan to disable the use of it for new accounts in
November 2019, and for existing accounts in June 2020.
Synapse doesn't currently support version 2 of the ACME protocol, which
means that:
* for existing installs, Synapse's built-in ACME support will continue
to work until June 2020.
* for new installs, this feature will not work at all.
Either way, it is recommended to move from Synapse's ACME support
feature to an external automated tool such as [certbot](https://github.com/certbot/certbot)
(or browse [this list](https://letsencrypt.org/fr/docs/client-options/)
for an alternative ACME client).
It's also recommended to use a reverse proxy for the server-facing
communications (more documentation about this can be found
[here](/docs/reverse_proxy.md)) as well as the client-facing ones and
have it serve the certificates.
In case you can't do that and need Synapse to serve them itself, make
sure to set the `tls_certificate_path` configuration setting to the path
of the certificate (make sure to use the certificate containing the full
certification chain, e.g. `fullchain.pem` if using certbot) and
`tls_private_key_path` to the path of the matching private key. Note
that in this case you will need to restart Synapse after each
certificate renewal so that Synapse stops using the old certificate.
If you still want to use Synapse's built-in ACME support, the rest of
this document explains how to set it up.
## Initial setup
In the case that your `server_name` config variable is the same as
the hostname that the client connects to, then the same certificate can be
@ -32,11 +68,6 @@ If you already have certificates, you will need to back up or delete them
(files `example.com.tls.crt` and `example.com.tls.key` in Synapse's root
directory), Synapse's ACME implementation will not overwrite them.
You may wish to use alternate methods such as Certbot to obtain a certificate
from Let's Encrypt, depending on your server configuration. Of course, if you
already have a valid certificate for your homeserver's domain, that can be
placed in Synapse's config directory without the need for any ACME setup.
## ACME setup
The main steps for enabling ACME support in short summary are:

View file

@ -8,6 +8,9 @@ Depending on the amount of history being purged a call to the API may take
several minutes or longer. During this period users will not be able to
paginate further back in the room from the point being purged from.
Note that Synapse requires at least one message in each room, so it will never
delete the last message in a room.
The API is:
``POST /_synapse/admin/v1/purge_history/<room_id>[/<event_id>]``

View file

@ -2,7 +2,8 @@ Create or modify Account
========================
This API allows an administrator to create or modify a user account with a
specific ``user_id``.
specific ``user_id``. Be aware that ``user_id`` is fully qualified: for example,
``@user:server.com``.
This api is::
@ -15,6 +16,16 @@ with a body of:
{
"password": "user_password",
"displayname": "User",
"threepids": [
{
"medium": "email",
"address": "<user_mail_1>"
},
{
"medium": "email",
"address": "<user_mail_2>"
}
],
"avatar_url": "<avatar_url>",
"admin": false,
"deactivated": false
@ -23,6 +34,7 @@ with a body of:
including an ``access_token`` of a server admin.
The parameter ``displayname`` is optional and defaults to ``user_id``.
The parameter ``threepids`` is optional.
The parameter ``avatar_url`` is optional.
The parameter ``admin`` is optional and defaults to 'false'.
The parameter ``deactivated`` is optional and defaults to 'false'.

View file

@ -42,6 +42,10 @@ purged according to its room's policy, then the receiving server will
process and store that event until it's picked up by the next purge job,
though it will always hide it from clients.
Synapse requires at least one message in each room, so it will never
delete the last message in a room. It will, however, hide it from
clients.
## Server configuration

View file

@ -1,4 +1,4 @@
# The config is maintained as an up-to-date snapshot of the default
# This file is maintained as an up-to-date snapshot of the default
# homeserver.yaml configuration generated by Synapse.
#
# It is intended to act as a reference for the default configuration,
@ -10,6 +10,16 @@
# homeserver.yaml. Instead, if you are starting from scratch, please generate
# a fresh config using Synapse by following the instructions in INSTALL.md.
################################################################################
# Configuration file for Synapse.
#
# This is a YAML file: see [1] for a quick introduction. Note in particular
# that *indentation is important*: all the elements of a list or dictionary
# should have the same indentation.
#
# [1] https://docs.ansible.com/ansible/latest/reference_appendices/YAMLSyntax.html
## Server ##
# The domain name of the server, with optional explicit port.

88
docs/spam_checker.md Normal file
View file

@ -0,0 +1,88 @@
# Handling spam in Synapse
Synapse has support to customize spam checking behavior. It can plug into a
variety of events and affect how they are presented to users on your homeserver.
The spam checking behavior is implemented as a Python class, which must be
able to be imported by the running Synapse.
## Python spam checker class
The Python class is instantiated with two objects:
* Any configuration (see below).
* An instance of `synapse.spam_checker_api.SpamCheckerApi`.
It then implements methods which return a boolean to alter behavior in Synapse.
There's a generic method for checking every event (`check_event_for_spam`), as
well as some specific methods:
* `user_may_invite`
* `user_may_create_room`
* `user_may_create_room_alias`
* `user_may_publish_room`
The details of the each of these methods (as well as their inputs and outputs)
are documented in the `synapse.events.spamcheck.SpamChecker` class.
The `SpamCheckerApi` class provides a way for the custom spam checker class to
call back into the homeserver internals. It currently implements the following
methods:
* `get_state_events_in_room`
### Example
```python
class ExampleSpamChecker:
def __init__(self, config, api):
self.config = config
self.api = api
def check_event_for_spam(self, foo):
return False # allow all events
def user_may_invite(self, inviter_userid, invitee_userid, room_id):
return True # allow all invites
def user_may_create_room(self, userid):
return True # allow all room creations
def user_may_create_room_alias(self, userid, room_alias):
return True # allow all room aliases
def user_may_publish_room(self, userid, room_id):
return True # allow publishing of all rooms
def check_username_for_spam(self, user_profile):
return False # allow all usernames
```
## Configuration
Modify the `spam_checker` section of your `homeserver.yaml` in the following
manner:
`module` should point to the fully qualified Python class that implements your
custom logic, e.g. `my_module.ExampleSpamChecker`.
`config` is a dictionary that gets passed to the spam checker class.
### Example
This section might look like:
```yaml
spam_checker:
module: my_module.ExampleSpamChecker
config:
# Enable or disable a specific option in ExampleSpamChecker.
my_custom_option: true
```
## Examples
The [Mjolnir](https://github.com/matrix-org/mjolnir) project is a full fledged
example using the Synapse spam checking API, including a bot for dynamic
configuration.

View file

@ -261,7 +261,8 @@ following regular expressions:
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
Additionally, the following REST endpoints can be handled for GET requests:
@ -287,8 +288,8 @@ the following regular expressions:
^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$
When using this worker you must also set `update_user_directory: False` in the
shared configuration file to stop the main synapse running background
When using this worker you must also set `update_user_directory: False` in the
shared configuration file to stop the main synapse running background
jobs related to updating the user directory.
### `synapse.app.frontend_proxy`

View file

@ -3,7 +3,8 @@
# Exits with 0 if there are no problems, or another code otherwise.
# Fix non-lowercase true/false values
sed -i -E "s/: +True/: true/g; s/: +False/: false/g;" docs/sample_config.yaml
sed -i.bak -E "s/: +True/: true/g; s/: +False/: false/g;" docs/sample_config.yaml
rm docs/sample_config.yaml.bak
# Check if anything changed
git diff --exit-code docs/sample_config.yaml

View file

@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.10.0rc4"
__version__ = "1.10.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View file

@ -53,6 +53,18 @@ Missing mandatory `server_name` config option.
"""
CONFIG_FILE_HEADER = """\
# Configuration file for Synapse.
#
# This is a YAML file: see [1] for a quick introduction. Note in particular
# that *indentation is important*: all the elements of a list or dictionary
# should have the same indentation.
#
# [1] https://docs.ansible.com/ansible/latest/reference_appendices/YAMLSyntax.html
"""
def path_exists(file_path):
"""Check if a file exists
@ -344,7 +356,7 @@ class RootConfig(object):
str: the yaml config file
"""
return "\n\n".join(
return CONFIG_FILE_HEADER + "\n\n".join(
dedent(conf)
for conf in self.invoke_all(
"generate_config_section",
@ -574,8 +586,8 @@ class RootConfig(object):
if not path_exists(config_dir_path):
os.makedirs(config_dir_path)
with open(config_path, "w") as config_file:
config_file.write("# vim:ft=yaml\n\n")
config_file.write(config_str)
config_file.write("\n\n# vim:ft=yaml")
config_dict = yaml.safe_load(config_str)
obj.generate_missing_files(config_dict, config_dir_path)

View file

@ -109,6 +109,8 @@ class TlsConfig(Config):
fed_whitelist_entries = config.get(
"federation_certificate_verification_whitelist", []
)
if fed_whitelist_entries is None:
fed_whitelist_entries = []
# Support globs (*) in whitelist values
self.federation_certificate_verification_whitelist = [] # type: List[str]

View file

@ -16,13 +16,13 @@
import os
from distutils.util import strtobool
from typing import Optional, Type
import six
from unpaddedbase64 import encode_base64
from synapse.api.errors import UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@ -189,8 +189,14 @@ class EventBase(object):
redacts = _event_dict_property("redacts", None)
room_id = _event_dict_property("room_id")
sender = _event_dict_property("sender")
state_key = _event_dict_property("state_key")
type = _event_dict_property("type")
user_id = _event_dict_property("sender")
@property
def event_id(self) -> str:
raise NotImplementedError()
@property
def membership(self):
return self.content["membership"]
@ -281,10 +287,7 @@ class FrozenEvent(EventBase):
else:
frozen_dict = event_dict
self.event_id = event_dict["event_id"]
self.type = event_dict["type"]
if "state_key" in event_dict:
self.state_key = event_dict["state_key"]
self._event_id = event_dict["event_id"]
super(FrozenEvent, self).__init__(
frozen_dict,
@ -294,6 +297,10 @@ class FrozenEvent(EventBase):
rejected_reason=rejected_reason,
)
@property
def event_id(self) -> str:
return self._event_id
def __str__(self):
return self.__repr__()
@ -332,9 +339,6 @@ class FrozenEventV2(EventBase):
frozen_dict = event_dict
self._event_id = None
self.type = event_dict["type"]
if "state_key" in event_dict:
self.state_key = event_dict["state_key"]
super(FrozenEventV2, self).__init__(
frozen_dict,
@ -404,28 +408,7 @@ class FrozenEventV3(FrozenEventV2):
return self._event_id
def room_version_to_event_format(room_version):
"""Converts a room version string to the event format
Args:
room_version (str)
Returns:
int
Raises:
UnsupportedRoomVersionError if the room version is unknown
"""
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
# this can happen if support is withdrawn for a room version
raise UnsupportedRoomVersionError()
return v.event_format
def event_type_from_format_version(format_version):
def event_type_from_format_version(format_version: int) -> Type[EventBase]:
"""Returns the python type to use to construct an Event object for the
given event format version.
@ -445,3 +428,14 @@ def event_type_from_format_version(format_version):
return FrozenEventV3
else:
raise Exception("No event format %r" % (format_version,))
def make_event_from_dict(
event_dict: JsonDict,
room_version: RoomVersion = RoomVersions.V1,
internal_metadata_dict: JsonDict = {},
rejected_reason: Optional[str] = None,
) -> EventBase:
"""Construct an EventBase from the given event dict"""
event_type = event_type_from_format_version(room_version.event_format)
return event_type(event_dict, internal_metadata_dict, rejected_reason)

View file

@ -28,11 +28,7 @@ from synapse.api.room_versions import (
RoomVersion,
)
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import (
EventBase,
_EventInternalMetadata,
event_type_from_format_version,
)
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
from synapse.types import EventID, JsonDict
from synapse.util import Clock
from synapse.util.stringutils import random_string
@ -256,8 +252,8 @@ def create_local_event_from_event_dict(
event_dict.setdefault("signatures", {})
add_hashes_and_signatures(room_version, event_dict, hostname, signing_key)
return event_type_from_format_version(format_version)(
event_dict, internal_metadata_dict=internal_metadata_dict
return make_event_from_dict(
event_dict, room_version, internal_metadata_dict=internal_metadata_dict
)

View file

@ -15,12 +15,17 @@
# limitations under the License.
import inspect
from typing import Dict
from synapse.spam_checker_api import SpamCheckerApi
MYPY = False
if MYPY:
import synapse.server
class SpamChecker(object):
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self.spam_checker = None
module = None
@ -40,7 +45,7 @@ class SpamChecker(object):
else:
self.spam_checker = module(config=config)
def check_event_for_spam(self, event):
def check_event_for_spam(self, event: "synapse.events.EventBase") -> bool:
"""Checks if a given event is considered "spammy" by this server.
If the server considers an event spammy, then it will be rejected if
@ -48,26 +53,30 @@ class SpamChecker(object):
users receive a blank event.
Args:
event (synapse.events.EventBase): the event to be checked
event: the event to be checked
Returns:
bool: True if the event is spammy.
True if the event is spammy.
"""
if self.spam_checker is None:
return False
return self.spam_checker.check_event_for_spam(event)
def user_may_invite(self, inviter_userid, invitee_userid, room_id):
def user_may_invite(
self, inviter_userid: str, invitee_userid: str, room_id: str
) -> bool:
"""Checks if a given user may send an invite
If this method returns false, the invite will be rejected.
Args:
userid (string): The sender's user ID
inviter_userid: The user ID of the sender of the invitation
invitee_userid: The user ID targeted in the invitation
room_id: The room ID
Returns:
bool: True if the user may send an invite, otherwise False
True if the user may send an invite, otherwise False
"""
if self.spam_checker is None:
return True
@ -76,52 +85,78 @@ class SpamChecker(object):
inviter_userid, invitee_userid, room_id
)
def user_may_create_room(self, userid):
def user_may_create_room(self, userid: str) -> bool:
"""Checks if a given user may create a room
If this method returns false, the creation request will be rejected.
Args:
userid (string): The sender's user ID
userid: The ID of the user attempting to create a room
Returns:
bool: True if the user may create a room, otherwise False
True if the user may create a room, otherwise False
"""
if self.spam_checker is None:
return True
return self.spam_checker.user_may_create_room(userid)
def user_may_create_room_alias(self, userid, room_alias):
def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool:
"""Checks if a given user may create a room alias
If this method returns false, the association request will be rejected.
Args:
userid (string): The sender's user ID
room_alias (string): The alias to be created
userid: The ID of the user attempting to create a room alias
room_alias: The alias to be created
Returns:
bool: True if the user may create a room alias, otherwise False
True if the user may create a room alias, otherwise False
"""
if self.spam_checker is None:
return True
return self.spam_checker.user_may_create_room_alias(userid, room_alias)
def user_may_publish_room(self, userid, room_id):
def user_may_publish_room(self, userid: str, room_id: str) -> bool:
"""Checks if a given user may publish a room to the directory
If this method returns false, the publish request will be rejected.
Args:
userid (string): The sender's user ID
room_id (string): The ID of the room that would be published
userid: The user ID attempting to publish the room
room_id: The ID of the room that would be published
Returns:
bool: True if the user may publish the room, otherwise False
True if the user may publish the room, otherwise False
"""
if self.spam_checker is None:
return True
return self.spam_checker.user_may_publish_room(userid, room_id)
def check_username_for_spam(self, user_profile: Dict[str, str]) -> bool:
"""Checks if a user ID or display name are considered "spammy" by this server.
If the server considers a username spammy, then it will not be included in
user directory results.
Args:
user_profile: The user information to check, it contains the keys:
* user_id
* display_name
* avatar_url
Returns:
True if the user is spammy.
"""
if self.spam_checker is None:
return False
# For backwards compatibility, if the method does not exist on the spam checker, fallback to not interfering.
checker = getattr(self.spam_checker, "check_username_for_spam", None)
if not checker:
return False
# Make a copy of the user profile object to ensure the spam checker
# cannot modify it.
return checker(user_profile.copy())

View file

@ -74,15 +74,16 @@ class ThirdPartyEventRules(object):
is_requester_admin (bool): If the requester is an admin
Returns:
defer.Deferred
defer.Deferred[bool]: Whether room creation is allowed or denied.
"""
if self.third_party_rules is None:
return
return True
yield self.third_party_rules.on_create_room(
ret = yield self.third_party_rules.on_create_room(
requester, config, is_requester_admin
)
return ret
@defer.inlineCallbacks
def check_threepid_can_be_invited(self, medium, address, room_id):

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -22,9 +23,13 @@ from twisted.internet.defer import DeferredList
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
)
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import event_type_from_format_version
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
@ -33,7 +38,7 @@ from synapse.logging.context import (
make_deferred_yieldable,
preserve_fn,
)
from synapse.types import get_domain_from_id
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
logger = logging.getLogger(__name__)
@ -342,16 +347,15 @@ def _is_invite_via_3pid(event):
)
def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
"""Construct a FrozenEvent from an event json received over federation
def event_from_pdu_json(
pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False
) -> EventBase:
"""Construct an EventBase from an event json received over federation
Args:
pdu_json (object): pdu as received over federation
event_format_version (int): The event format version
outlier (bool): True to mark this event as an outlier
Returns:
FrozenEvent
pdu_json: pdu as received over federation
room_version: The version of the room this event belongs to
outlier: True to mark this event as an outlier
Raises:
SynapseError: if the pdu is missing required fields or is otherwise
@ -370,8 +374,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event = event_type_from_format_version(event_format_version)(pdu_json)
event = make_event_from_dict(pdu_json, room_version)
event.internal_metadata.outlier = outlier
return event

View file

@ -17,7 +17,18 @@
import copy
import itertools
import logging
from typing import Dict, Iterable
from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
Tuple,
TypeVar,
)
from prometheus_client import Counter
@ -35,12 +46,14 @@ from synapse.api.errors import (
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
RoomVersions,
)
from synapse.events import builder, room_version_to_event_format
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.utils import log_function
from synapse.types import JsonDict
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@ -52,6 +65,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t
PDU_RETRY_TIME_MS = 1 * 60 * 1000
T = TypeVar("T")
class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
@ -170,21 +185,17 @@ class FederationClient(FederationBase):
sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(destination, content, timeout)
@defer.inlineCallbacks
@log_function
def backfill(self, dest, room_id, limit, extremities):
"""Requests some more historic PDUs for the given context from the
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
) -> List[EventBase]:
"""Requests some more historic PDUs for the given room from the
given destination server.
Args:
dest (str): The remote homeserver to ask.
room_id (str): The room_id to backfill.
limit (int): The maximum number of PDUs to return.
extremities (list): List of PDU id and origins of the first pdus
we have seen from the context
Returns:
Deferred: Results in the received PDUs.
limit (int): The maximum number of events to return.
extremities (list): our current backwards extremities, to backfill from
"""
logger.debug("backfill extrem=%s", extremities)
@ -192,34 +203,37 @@ class FederationClient(FederationBase):
if not extremities:
return
transaction_data = yield self.transport_layer.backfill(
transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)
logger.debug("backfill transaction_data=%r", transaction_data)
room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)
pdus = [
event_from_pdu_json(p, format_ver, outlier=False)
event_from_pdu_json(p, room_version, outlier=False)
for p in transaction_data["pdus"]
]
# FIXME: We should handle signature failures more gracefully.
pdus[:] = yield make_deferred_yieldable(
pdus[:] = await make_deferred_yieldable(
defer.gatherResults(
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
self._check_sigs_and_hashes(room_version.identifier, pdus),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
return pdus
@defer.inlineCallbacks
@log_function
def get_pdu(
self, destinations, event_id, room_version, outlier=False, timeout=None
):
async def get_pdu(
self,
destinations: Iterable[str],
event_id: str,
room_version: RoomVersion,
outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
servers.
@ -227,18 +241,17 @@ class FederationClient(FederationBase):
one succeeds.
Args:
destinations (list): Which homeservers to query
event_id (str): event to fetch
room_version (str): version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
destinations: Which homeservers to query
event_id: event to fetch
room_version: version of the room
outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
timeout (int): How long to try (in ms) each destination for before
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
Returns:
Deferred: Results in the requested PDU, or None if we were unable to find
it.
The requested PDU, or None if we were unable to find it.
"""
# TODO: Rate limit the number of times we try and get the same event.
@ -249,8 +262,6 @@ class FederationClient(FederationBase):
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
format_ver = room_version_to_event_format(room_version)
signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
@ -259,7 +270,7 @@ class FederationClient(FederationBase):
continue
try:
transaction_data = yield self.transport_layer.get_event(
transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)
@ -271,7 +282,7 @@ class FederationClient(FederationBase):
)
pdu_list = [
event_from_pdu_json(p, format_ver, outlier=outlier)
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
]
@ -279,7 +290,9 @@ class FederationClient(FederationBase):
pdu = pdu_list[0]
# Check signatures are correct.
signed_pdu = yield self._check_sigs_and_hash(room_version, pdu)
signed_pdu = await self._check_sigs_and_hash(
room_version.identifier, pdu
)
break
@ -309,15 +322,16 @@ class FederationClient(FederationBase):
return signed_pdu
@defer.inlineCallbacks
def get_room_state_ids(self, destination: str, room_id: str, event_id: str):
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> Tuple[List[str], List[str]]:
"""Calls the /state_ids endpoint to fetch the state at a particular point
in the room, and the auth events for the given event
Returns:
Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids)
a tuple of (state event_ids, auth event_ids)
"""
result = yield self.transport_layer.get_room_state_ids(
result = await self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id
)
@ -331,37 +345,39 @@ class FederationClient(FederationBase):
return state_event_ids, auth_event_ids
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
async def get_event_auth(self, destination, room_id, event_id):
res = await self.transport_layer.get_event_auth(destination, room_id, event_id)
room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)
auth_chain = [
event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"]
event_from_pdu_json(p, room_version, outlier=True)
for p in res["auth_chain"]
]
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True, room_version=room_version
signed_auth = await self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True, room_version=room_version.identifier
)
signed_auth.sort(key=lambda e: e.depth)
return signed_auth
@defer.inlineCallbacks
def _try_destination_list(self, description, destinations, callback):
async def _try_destination_list(
self,
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
) -> T:
"""Try an operation on a series of servers, until it succeeds
Args:
description (unicode): description of the operation we're doing, for logging
description: description of the operation we're doing, for logging
destinations (Iterable[unicode]): list of server_names to try
destinations: list of server_names to try
callback (callable): Function to run for each server. Passed a single
argument: the server_name to try. May return a deferred.
callback: Function to run for each server. Passed a single
argument: the server_name to try.
If the callback raises a CodeMessageException with a 300/400 code,
attempts to perform the operation stop immediately and the exception is
@ -372,7 +388,7 @@ class FederationClient(FederationBase):
suppressed if the exception is an InvalidResponseError.
Returns:
The [Deferred] result of callback, if it succeeds
The result of callback, if it succeeds
Raises:
SynapseError if the chosen remote server returns a 300/400 code, or
@ -383,7 +399,7 @@ class FederationClient(FederationBase):
continue
try:
res = yield callback(destination)
res = await callback(destination)
return res
except InvalidResponseError as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
@ -402,12 +418,12 @@ class FederationClient(FederationBase):
)
except Exception:
logger.warning(
"Failed to %s via %s", description, destination, exc_info=1
"Failed to %s via %s", description, destination, exc_info=True
)
raise SynapseError(502, "Failed to %s via any server" % (description,))
def make_membership_event(
async def make_membership_event(
self,
destinations: Iterable[str],
room_id: str,
@ -415,7 +431,7 @@ class FederationClient(FederationBase):
membership: str,
content: dict,
params: Dict[str, str],
):
) -> Tuple[str, EventBase, RoomVersion]:
"""
Creates an m.room.member event, with context, without participating in the room.
@ -436,19 +452,19 @@ class FederationClient(FederationBase):
content: Any additional data to put into the content field of the
event.
params: Query parameters to include in the request.
Return:
Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of
Returns:
`(origin, event, room_version)` where origin is the remote
homeserver which generated the event, and room_version is the
version of the room.
Fails with a `UnsupportedRoomVersionError` if remote responds with
a room version we don't understand.
Raises:
UnsupportedRoomVersionError: if remote responds with
a room version we don't understand.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
SynapseError: if the chosen remote server returns a 300/400 code.
Fails with a ``RuntimeError`` if no servers were reachable.
RuntimeError: if no servers were reachable.
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
@ -457,9 +473,8 @@ class FederationClient(FederationBase):
% (membership, ",".join(valid_memberships))
)
@defer.inlineCallbacks
def send_request(destination):
ret = yield self.transport_layer.make_membership_event(
async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]:
ret = await self.transport_layer.make_membership_event(
destination, room_id, user_id, membership, params
)
@ -492,88 +507,83 @@ class FederationClient(FederationBase):
event_dict=pdu_dict,
)
return (destination, ev, room_version)
return destination, ev, room_version
return self._try_destination_list(
return await self._try_destination_list(
"make_" + membership, destinations, send_request
)
def send_join(self, destinations, pdu, event_format_version):
async def send_join(
self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion
) -> Dict[str, Any]:
"""Sends a join event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
and send the event out to the rest of the federation.
Args:
destinations (str): Candidate homeservers which are probably
destinations: Candidate homeservers which are probably
participating in the room.
pdu (BaseEvent): event to be sent
event_format_version (int): The event format version
pdu: event to be sent
room_version: the version of the room (according to the server that
did the make_join)
Return:
Deferred: resolves to a dict with members ``origin`` (a string
giving the serer the event was sent to, ``state`` (?) and
Returns:
a dict with members ``origin`` (a string
giving the server the event was sent to, ``state`` (?) and
``auth_chain``.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
Raises:
SynapseError: if the chosen remote server returns a 300/400 code.
Fails with a ``RuntimeError`` if no servers were reachable.
RuntimeError: if no servers were reachable.
"""
def check_authchain_validity(signed_auth_chain):
for e in signed_auth_chain:
if e.type == EventTypes.Create:
create_event = e
break
else:
raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,))
# the room version should be sane.
room_version = create_event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
# This shouldn't be possible, because the remote server should have
# rejected the join attempt during make_join.
raise InvalidResponseError(
"room appears to have unsupported version %s" % (room_version,)
)
@defer.inlineCallbacks
def send_request(destination):
content = yield self._do_send_join(destination, pdu)
async def send_request(destination) -> Dict[str, Any]:
content = await self._do_send_join(destination, pdu)
logger.debug("Got content: %s", content)
state = [
event_from_pdu_json(p, event_format_version, outlier=True)
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
]
auth_chain = [
event_from_pdu_json(p, event_format_version, outlier=True)
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
]
pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
room_version = None
create_event = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
room_version = e.content.get(
"room_version", RoomVersions.V1.identifier
)
create_event = e
break
if room_version is None:
if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
# the room version should be sane.
create_room_version = create_event.content.get(
"room_version", RoomVersions.V1.identifier
)
if create_room_version != room_version.identifier:
# either the server that fulfilled the make_join, or the server that is
# handling the send_join, is lying.
raise InvalidResponseError(
"Unexpected room version %s in create event"
% (create_room_version,)
)
valid_pdus = await self._check_sigs_and_hash_and_fetch(
destination,
list(pdus.values()),
outlier=True,
room_version=room_version,
room_version=room_version.identifier,
)
valid_pdus_map = {p.event_id: p for p in valid_pdus}
@ -597,7 +607,17 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
check_authchain_validity(signed_auth)
# double-check that the same create event has ended up in the auth chain
auth_chain_create_events = [
e.event_id
for e in signed_auth
if (e.type, e.state_key) == (EventTypes.Create, "")
]
if auth_chain_create_events != [create_event.event_id]:
raise InvalidResponseError(
"Unexpected create event(s) in auth chain"
% (auth_chain_create_events,)
)
return {
"state": signed_state,
@ -605,14 +625,13 @@ class FederationClient(FederationBase):
"origin": destination,
}
return self._try_destination_list("send_join", destinations, send_request)
return await self._try_destination_list("send_join", destinations, send_request)
@defer.inlineCallbacks
def _do_send_join(self, destination, pdu):
async def _do_send_join(self, destination: str, pdu: EventBase):
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_join_v2(
content = await self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@ -634,7 +653,7 @@ class FederationClient(FederationBase):
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
resp = yield self.transport_layer.send_join_v1(
resp = await self.transport_layer.send_join_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@ -645,51 +664,45 @@ class FederationClient(FederationBase):
# content.
return resp[1]
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
room_version = yield self.store.get_room_version_id(room_id)
async def send_invite(
self, destination: str, room_id: str, event_id: str, pdu: EventBase,
) -> EventBase:
room_version = await self.store.get_room_version(room_id)
content = yield self._do_send_invite(destination, pdu, room_version)
content = await self._do_send_invite(destination, pdu, room_version)
pdu_dict = content["event"]
logger.debug("Got response to send_invite: %s", pdu_dict)
room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(pdu_dict, format_ver)
pdu = event_from_pdu_json(pdu_dict, room_version)
# Check signatures are correct.
pdu = yield self._check_sigs_and_hash(room_version, pdu)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
# FIXME: We should handle signature failures more gracefully.
return pdu
@defer.inlineCallbacks
def _do_send_invite(self, destination, pdu, room_version):
async def _do_send_invite(
self, destination: str, pdu: EventBase, room_version: RoomVersion
) -> JsonDict:
"""Actually sends the invite, first trying v2 API and falling back to
v1 API if necessary.
Args:
destination (str): Target server
pdu (FrozenEvent)
room_version (str)
Returns:
dict: The event as a dict as returned by the remote server
The event as a dict as returned by the remote server
"""
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_invite_v2(
content = await self.transport_layer.send_invite_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content={
"event": pdu.get_pdu_json(time_now),
"room_version": room_version,
"room_version": room_version.identifier,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
@ -707,8 +720,7 @@ class FederationClient(FederationBase):
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API. That's ok provided the room uses old-style event
# IDs.
v = KNOWN_ROOM_VERSIONS.get(room_version)
if v.event_format != EventFormatVersions.V1:
if room_version.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
@ -722,7 +734,7 @@ class FederationClient(FederationBase):
# Didn't work, try v1 API.
# Note the v1 API returns a tuple of `(200, content)`
_, content = yield self.transport_layer.send_invite_v1(
_, content = await self.transport_layer.send_invite_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@ -730,7 +742,7 @@ class FederationClient(FederationBase):
)
return content
def send_leave(self, destinations, pdu):
async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None:
"""Sends a leave event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
@ -739,34 +751,29 @@ class FederationClient(FederationBase):
This is mostly useful to reject received invites.
Args:
destinations (str): Candidate homeservers which are probably
destinations: Candidate homeservers which are probably
participating in the room.
pdu (BaseEvent): event to be sent
pdu: event to be sent
Return:
Deferred: resolves to None.
Raises:
SynapseError if the chosen remote server returns a 300/400 code.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
Fails with a ``RuntimeError`` if no servers were reachable.
RuntimeError if no servers were reachable.
"""
@defer.inlineCallbacks
def send_request(destination):
content = yield self._do_send_leave(destination, pdu)
async def send_request(destination: str) -> None:
content = await self._do_send_leave(destination, pdu)
logger.debug("Got content: %s", content)
return None
return self._try_destination_list("send_leave", destinations, send_request)
return await self._try_destination_list(
"send_leave", destinations, send_request
)
@defer.inlineCallbacks
def _do_send_leave(self, destination, pdu):
async def _do_send_leave(self, destination, pdu):
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_leave_v2(
content = await self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@ -788,7 +795,7 @@ class FederationClient(FederationBase):
logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
resp = yield self.transport_layer.send_leave_v1(
resp = await self.transport_layer.send_leave_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
@ -820,34 +827,33 @@ class FederationClient(FederationBase):
third_party_instance_id=third_party_instance_id,
)
@defer.inlineCallbacks
def get_missing_events(
async def get_missing_events(
self,
destination,
room_id,
earliest_events_ids,
latest_events,
limit,
min_depth,
timeout,
):
destination: str,
room_id: str,
earliest_events_ids: Sequence[str],
latest_events: Iterable[EventBase],
limit: int,
min_depth: int,
timeout: int,
) -> List[EventBase]:
"""Tries to fetch events we are missing. This is called when we receive
an event without having received all of its ancestors.
Args:
destination (str)
room_id (str)
earliest_events_ids (list): List of event ids. Effectively the
destination
room_id
earliest_events_ids: List of event ids. Effectively the
events we expected to receive, but haven't. `get_missing_events`
should only return events that didn't happen before these.
latest_events (list): List of events we have received that we don't
latest_events: List of events we have received that we don't
have all previous events for.
limit (int): Maximum number of events to return.
min_depth (int): Minimum depth of events tor return.
timeout (int): Max time to wait in ms
limit: Maximum number of events to return.
min_depth: Minimum depth of events to return.
timeout: Max time to wait in ms
"""
try:
content = yield self.transport_layer.get_missing_events(
content = await self.transport_layer.get_missing_events(
destination=destination,
room_id=room_id,
earliest_events=earliest_events_ids,
@ -857,15 +863,14 @@ class FederationClient(FederationBase):
timeout=timeout,
)
room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)
events = [
event_from_pdu_json(e, format_ver) for e in content.get("events", [])
event_from_pdu_json(e, room_version) for e in content.get("events", [])
]
signed_events = yield self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False, room_version=room_version
signed_events = await self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False, room_version=room_version.identifier
)
except HttpResponseException as e:
if not e.code == 400:

View file

@ -38,7 +38,6 @@ from synapse.api.errors import (
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
@ -54,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@ -236,24 +235,17 @@ class FederationServer(FederationBase):
continue
try:
room_version = await self.store.get_room_version_id(room_id)
room_version = await self.store.get_room_version(room_id)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue
try:
format_ver = room_version_to_event_format(room_version)
except UnsupportedRoomVersionError:
except UnsupportedRoomVersionError as e:
# this can happen if support for a given room version is withdrawn,
# so that we still get events for said room.
logger.info(
"Ignoring PDU for room %s with unknown version %s",
room_id,
room_version,
)
logger.info("Ignoring PDU: %s", e)
continue
event = event_from_pdu_json(p, format_ver)
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
@ -304,7 +296,12 @@ class FederationServer(FederationBase):
async def _process_edu(edu_dict):
received_edus_counter.inc()
edu = Edu(**edu_dict)
edu = Edu(
origin=origin,
destination=self.server_name,
edu_type=edu_dict["edu_type"],
content=edu_dict["content"],
)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
await concurrently_execute(
@ -398,20 +395,21 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
async def on_invite_request(self, origin, content, room_version):
if room_version not in KNOWN_ROOM_VERSIONS:
async def on_invite_request(
self, origin: str, content: JsonDict, room_version_id: str
):
room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
if not room_version:
raise SynapseError(
400,
"Homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
pdu = await self._check_sigs_and_hash(room_version, pdu)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
time_now = self._clock.time_msec()
return {"event": ret_pdu.get_pdu_json(time_now)}
@ -419,16 +417,15 @@ class FederationServer(FederationBase):
async def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content)
room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
room_version = await self.store.get_room_version(room_id)
pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
pdu = await self._check_sigs_and_hash(room_version, pdu)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
@ -450,16 +447,15 @@ class FederationServer(FederationBase):
async def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content)
room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
room_version = await self.store.get_room_version(room_id)
pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
pdu = await self._check_sigs_and_hash(room_version, pdu)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
await self.handler.on_send_leave_request(origin, pdu)
return {}
@ -497,15 +493,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)
auth_chain = [
event_from_pdu_json(e, format_ver) for e in content["auth_chain"]
event_from_pdu_json(e, room_version) for e in content["auth_chain"]
]
signed_auth = await self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True, room_version=room_version
origin, auth_chain, outlier=True, room_version=room_version.identifier
)
ret = await self.handler.on_query_auth(
@ -573,7 +568,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
logger.info(
logger.debug(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
" limit: %d",
earliest_events,
@ -586,11 +581,11 @@ class FederationServer(FederationBase):
)
if len(missing_events) < 5:
logger.info(
logger.debug(
"Returning %d events: %r", len(missing_events), missing_events
)
else:
logger.info("Returning %d events", len(missing_events))
logger.debug("Returning %d events", len(missing_events))
time_now = self._clock.time_msec()

View file

@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set
from six import itervalues
@ -23,6 +24,7 @@ from twisted.internet import defer
import synapse
import synapse.metrics
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
@ -39,6 +41,8 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@ -68,7 +72,7 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: dict[str, PerDestinationQueue]
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@ -84,7 +88,7 @@ class FederationSender(object):
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
self.pending_presence = {} # type: Dict[str, UserPresenceState]
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
@ -116,20 +120,17 @@ class FederationSender(object):
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room = (
{}
) # type: dict[str, set[PerDestinationQueue]]
) # type: Dict[str, Set[PerDestinationQueue]]
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)
def _get_per_destination_queue(self, destination):
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Args:
destination (str): server_name of remote server
Returns:
PerDestinationQueue
destination: server_name of remote server
"""
queue = self._per_destination_queues.get(destination)
if not queue:
@ -137,7 +138,7 @@ class FederationSender(object):
self._per_destination_queues[destination] = queue
return queue
def notify_new_events(self, current_id):
def notify_new_events(self, current_id: int) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
@ -151,8 +152,7 @@ class FederationSender(object):
"process_event_queue_for_federation", self._process_event_queue_loop
)
@defer.inlineCallbacks
def _process_event_queue_loop(self):
async def _process_event_queue_loop(self):
loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
@ -171,8 +171,8 @@ class FederationSender(object):
)
self._transaction_manager.deprioritise_transmission = True
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
)
@ -181,8 +181,7 @@ class FederationSender(object):
if not events and next_token >= self._last_poked_id:
break
@defer.inlineCallbacks
def handle_event(event):
async def handle_event(event: EventBase) -> None:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
@ -199,7 +198,7 @@ class FederationSender(object):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_hosts_in_room_at_events(
destinations = await self.state.get_hosts_in_room_at_events(
event.room_id, event_ids=event.prev_event_ids()
)
except Exception:
@ -221,17 +220,16 @@ class FederationSender(object):
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
for event in events:
yield handle_event(event)
await handle_event(event)
events_by_room = {}
events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield make_deferred_yieldable(
await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
@ -241,11 +239,11 @@ class FederationSender(object):
)
)
yield self.store.update_federation_out_pos("events", next_token)
await self.store.update_federation_out_pos("events", next_token)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
ts = await self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender"
@ -272,7 +270,7 @@ class FederationSender(object):
logger.info("Event queue caught up: re-prioritising transmission")
self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu, destinations):
def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
@ -294,11 +292,11 @@ class FederationSender(object):
self._get_per_destination_queue(destination).send_pdu(pdu, order)
@defer.inlineCallbacks
def send_read_receipt(self, receipt):
def send_read_receipt(self, receipt: ReadReceipt):
"""Send a RR to any other servers in the room
Args:
receipt (synapse.types.ReadReceipt): receipt to be sent
receipt: receipt to be sent
"""
# Some background on the rate-limiting going on here.
@ -361,7 +359,7 @@ class FederationSender(object):
else:
queue.flush_read_receipts_for_room(room_id)
def _schedule_rr_flush_for_room(self, room_id, n_domains):
def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
@ -370,7 +368,7 @@ class FederationSender(object):
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()
def _flush_rrs_for_room(self, room_id):
def _flush_rrs_for_room(self, room_id: str) -> None:
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)
@ -386,14 +384,11 @@ class FederationSender(object):
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
@ -430,11 +425,10 @@ class FederationSender(object):
finally:
self._processing_pending_presence = False
def send_presence_to_destinations(self, states, destinations):
def send_presence_to_destinations(
self, states: List[UserPresenceState], destinations: List[str]
) -> None:
"""Send the given presence states to the given destinations.
Args:
states (list[UserPresenceState])
destinations (list[str])
"""
@ -449,12 +443,9 @@ class FederationSender(object):
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
@ -464,14 +455,20 @@ class FederationSender(object):
continue
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(self, destination, edu_type, content, key=None):
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: dict,
key: Optional[Hashable] = None,
):
"""Construct an Edu object, and queue it for sending
Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
destination: name of server to send to
edu_type: type of EDU to send
content: content of EDU
key: clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
@ -486,12 +483,12 @@ class FederationSender(object):
self.send_edu(edu, key)
def send_edu(self, edu, key):
def send_edu(self, edu: Edu, key: Optional[Hashable]):
"""Queue an EDU for sending
Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
edu: edu to send
key: clobbering key for this edu
"""
queue = self._get_per_destination_queue(edu.destination)
if key:
@ -499,7 +496,7 @@ class FederationSender(object):
else:
queue.send_edu(edu)
def send_device_messages(self, destination):
def send_device_messages(self, destination: str):
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
@ -519,5 +516,5 @@ class FederationSender(object):
self._get_per_destination_queue(destination).attempt_new_transaction()
def get_current_token(self):
def get_current_token(self) -> int:
return 0

View file

@ -16,11 +16,11 @@
import datetime
import logging
import random
from typing import Dict, Hashable, Iterable, List, Tuple
from prometheus_client import Counter
from twisted.internet import defer
import synapse.server
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
@ -32,7 +32,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import StateMap
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
@ -59,13 +59,18 @@ class PerDestinationQueue(object):
Manages the per-destination transmission queues.
Args:
hs (synapse.HomeServer):
transaction_sender (TransactionManager):
destination (str): the server_name of the destination that we are managing
hs
transaction_sender
destination: the server_name of the destination that we are managing
transmission for.
"""
def __init__(self, hs, transaction_manager, destination):
def __init__(
self,
hs: "synapse.server.HomeServer",
transaction_manager: "synapse.federation.sender.TransactionManager",
destination: str,
):
self._server_name = hs.hostname
self._clock = hs.get_clock()
self._store = hs.get_datastore()
@ -75,20 +80,20 @@ class PerDestinationQueue(object):
self.transmission_loop_running = False
# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: list[tuple[EventBase, int]]
self._pending_edus = [] # type: list[Edu]
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
self._pending_edus = [] # type: List[Edu]
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
self._pending_edus_keyed = {} # type: StateMap[Edu]
self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu]
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
self._pending_presence = {} # type: Dict[str, UserPresenceState]
# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs = {}
self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]]
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@ -98,50 +103,50 @@ class PerDestinationQueue(object):
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
def __str__(self):
def __str__(self) -> str:
return "PerDestinationQueue[%s]" % self._destination
def pending_pdu_count(self):
def pending_pdu_count(self) -> int:
return len(self._pending_pdus)
def pending_edu_count(self):
def pending_edu_count(self) -> int:
return (
len(self._pending_edus)
+ len(self._pending_presence)
+ len(self._pending_edus_keyed)
)
def send_pdu(self, pdu, order):
def send_pdu(self, pdu: EventBase, order: int) -> None:
"""Add a PDU to the queue, and start the transmission loop if neccessary
Args:
pdu (EventBase): pdu to send
order (int):
pdu: pdu to send
order
"""
self._pending_pdus.append((pdu, order))
self.attempt_new_transaction()
def send_presence(self, states):
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
"""Add presence updates to the queue. Start the transmission loop if neccessary.
Args:
states (iterable[UserPresenceState]): presence to send
states: presence to send
"""
self._pending_presence.update({state.user_id: state for state in states})
self.attempt_new_transaction()
def queue_read_receipt(self, receipt):
def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
(see flush_read_receipts_for_room)
Args:
receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
receipt: receipt to be queued
"""
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
def flush_read_receipts_for_room(self, room_id):
def flush_read_receipts_for_room(self, room_id: str) -> None:
# if we don't have any read-receipts for this room, it may be that we've already
# sent them out, so we don't need to flush.
if room_id not in self._pending_rrs:
@ -149,15 +154,15 @@ class PerDestinationQueue(object):
self._rrs_pending_flush = True
self.attempt_new_transaction()
def send_keyed_edu(self, edu, key):
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
def send_edu(self, edu):
def send_edu(self, edu) -> None:
self._pending_edus.append(edu)
self.attempt_new_transaction()
def attempt_new_transaction(self):
def attempt_new_transaction(self) -> None:
"""Try to start a new transaction to this destination
If there is already a transaction in progress to this destination,
@ -180,16 +185,15 @@ class PerDestinationQueue(object):
self._transaction_transmission_loop,
)
@defer.inlineCallbacks
def _transaction_transmission_loop(self):
pending_pdus = []
async def _transaction_transmission_loop(self) -> None:
pending_pdus = [] # type: List[Tuple[EventBase, int]]
try:
self.transmission_loop_running = True
# This will throw if we wouldn't retry. We do this here so we fail
# quickly, but we will later check this again in the http client,
# hence why we throw the result away.
yield get_retry_limiter(self._destination, self._clock, self._store)
await get_retry_limiter(self._destination, self._clock, self._store)
pending_pdus = []
while True:
@ -203,12 +207,12 @@ class PerDestinationQueue(object):
logger.info(
"TX [%s]: sleeping for %f seconds", self._destination, sleeptime
)
yield self._clock.sleep(sleeptime)
await self._clock.sleep(sleeptime)
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2
device_update_edus, dev_list_id = yield self._get_device_update_edus(
device_update_edus, dev_list_id = await self._get_device_update_edus(
limit
)
@ -217,7 +221,7 @@ class PerDestinationQueue(object):
(
to_device_edus,
device_stream_id,
) = yield self._get_to_device_message_edus(limit)
) = await self._get_to_device_message_edus(limit)
pending_edus = device_update_edus + to_device_edus
@ -284,7 +288,7 @@ class PerDestinationQueue(object):
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
success = await self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
@ -295,7 +299,7 @@ class PerDestinationQueue(object):
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
@ -304,7 +308,7 @@ class PerDestinationQueue(object):
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
yield self._store.mark_as_sent_devices_by_remote(
await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
@ -349,7 +353,7 @@ class PerDestinationQueue(object):
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
def _get_rr_edus(self, force_flush):
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
if not force_flush and not self._rrs_pending_flush:
@ -366,17 +370,16 @@ class PerDestinationQueue(object):
self._rrs_pending_flush = False
yield edu
def _pop_pending_edus(self, limit):
def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
return pending_edus
@defer.inlineCallbacks
def _get_device_update_edus(self, limit):
async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_list = self._last_device_list_stream_id
# Retrieve list of new device updates to send to the destination
now_stream_id, results = yield self._store.get_device_updates_by_remote(
now_stream_id, results = await self._store.get_device_updates_by_remote(
self._destination, last_device_list, limit=limit
)
edus = [
@ -393,11 +396,10 @@ class PerDestinationQueue(object):
return (edus, now_stream_id)
@defer.inlineCallbacks
def _get_to_device_message_edus(self, limit):
async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
contents, stream_id = await self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit
)
edus = [

View file

@ -13,14 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from canonicaljson import json
from twisted.internet import defer
import synapse.server
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.federation.units import Edu, Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
@ -39,7 +40,7 @@ class TransactionManager(object):
shared between PerDestinationQueue objects
"""
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._store = hs.get_datastore()
@ -54,8 +55,9 @@ class TransactionManager(object):
self.deprioritise_transmission = False
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
async def send_new_transaction(
self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
):
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
@ -131,7 +133,7 @@ class TransactionManager(object):
return data
try:
response = yield self._transport_layer.send_transaction(
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200

View file

@ -158,7 +158,7 @@ class Authenticator(object):
origin, json_request, now, "Incoming request"
)
logger.info("Request from %s", origin)
logger.debug("Request from %s", origin)
request.authenticated_entity = origin
# If we get a valid signed request from the other side, its probably
@ -579,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet):
# state resolution algorithm, and we don't use that for processing
# invites
content = await self.handler.on_invite_request(
origin, content, room_version=RoomVersions.V1.identifier
origin, content, room_version_id=RoomVersions.V1.identifier
)
# V1 federation API is defined to return a content of `[200, {...}]`
@ -606,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet):
event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state
content = await self.handler.on_invite_request(
origin, event, room_version=room_version
origin, event, room_version_id=room_version
)
return 200, content

View file

@ -19,11 +19,15 @@ server protocol.
import logging
import attr
from synapse.types import JsonDict
from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
@attr.s(slots=True)
class Edu(JsonEncodedObject):
""" An Edu represents a piece of data sent from one homeserver to another.
@ -32,11 +36,24 @@ class Edu(JsonEncodedObject):
internal ID or previous references graph.
"""
valid_keys = ["origin", "destination", "edu_type", "content"]
edu_type = attr.ib(type=str)
content = attr.ib(type=dict)
origin = attr.ib(type=str)
destination = attr.ib(type=str)
required_keys = ["edu_type"]
def get_dict(self) -> JsonDict:
return {
"edu_type": self.edu_type,
"content": self.content,
}
internal_keys = ["origin", "destination"]
def get_internal_dict(self) -> JsonDict:
return {
"edu_type": self.edu_type,
"content": self.content,
"origin": self.origin,
"destination": self.destination,
}
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")

View file

@ -58,8 +58,10 @@ class AdminHandler(BaseHandler):
ret = await self.store.get_user_by_id(user.to_string())
if ret:
profile = await self.store.get_profileinfo(user.localpart)
threepids = await self.store.user_get_threepids(user.to_string())
ret["displayname"] = profile.display_name
ret["avatar_url"] = profile.avatar_url
ret["threepids"] = threepids
return ret
async def export_user_data(self, user_id, writer):

View file

@ -816,6 +816,14 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def add_threepid(self, user_id, medium, address, validated_at):
# check if medium has a valid value
if medium not in ["email", "msisdn"]:
raise SynapseError(
code=400,
msg=("'%s' is not a valid value for 'medium'" % (medium,)),
errcode=Codes.INVALID_PARAM,
)
# 'Canonicalise' email addresses down to lower case.
# We've now moving towards the homeserver being the entity that
# is responsible for validating threepids used for resetting passwords

View file

@ -26,6 +26,7 @@ from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
@ -39,6 +40,8 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
MAX_DEVICE_DISPLAY_NAME_LEN = 100
class DeviceWorkerHandler(BaseHandler):
def __init__(self, hs):
@ -404,9 +407,18 @@ class DeviceHandler(DeviceWorkerHandler):
defer.Deferred:
"""
# Reject a new displayname which is too long.
new_display_name = content.get("display_name")
if new_display_name and len(new_display_name) > MAX_DEVICE_DISPLAY_NAME_LEN:
raise SynapseError(
400,
"Device display name is too long (max %i)"
% (MAX_DEVICE_DISPLAY_NAME_LEN,),
)
try:
yield self.store.update_device(
user_id, device_id, new_display_name=content.get("display_name")
user_id, device_id, new_display_name=new_display_name
)
yield self.notify_device_update(user_id, [device_id])
except errors.StoreError as e:

View file

@ -81,13 +81,7 @@ class DirectoryHandler(BaseHandler):
@defer.inlineCallbacks
def create_association(
self,
requester,
room_alias,
room_id,
servers=None,
send_event=True,
check_membership=True,
self, requester, room_alias, room_id, servers=None, check_membership=True,
):
"""Attempt to create a new alias
@ -97,7 +91,6 @@ class DirectoryHandler(BaseHandler):
room_id (str)
servers (list[str]|None): List of servers that others servers
should try and join via
send_event (bool): Whether to send an updated m.room.aliases event
check_membership (bool): Whether to check if the user is in the room
before the alias can be set (if the server's config requires it).
@ -150,16 +143,9 @@ class DirectoryHandler(BaseHandler):
)
yield self._create_association(room_alias, room_id, servers, creator=user_id)
if send_event:
try:
yield self.send_room_alias_update_event(requester, room_id)
except AuthError as e:
# sending the aliases event may fail due to the user not having
# permission in the room; this is permitted.
logger.info("Skipping updating aliases event due to auth error %s", e)
@defer.inlineCallbacks
def delete_association(self, requester, room_alias, send_event=True):
def delete_association(self, requester, room_alias):
"""Remove an alias from the directory
(this is only meant for human users; AS users should call
@ -168,9 +154,6 @@ class DirectoryHandler(BaseHandler):
Args:
requester (Requester):
room_alias (RoomAlias):
send_event (bool): Whether to send an updated m.room.aliases event.
Note that, if we delete the canonical alias, we will always attempt
to send an m.room.canonical_alias event
Returns:
Deferred[unicode]: room id that the alias used to point to
@ -206,9 +189,6 @@ class DirectoryHandler(BaseHandler):
room_id = yield self._delete_association(room_alias)
try:
if send_event:
yield self.send_room_alias_update_event(requester, room_id)
yield self._update_canonical_alias(
requester, requester.user.to_string(), room_id, room_alias
)
@ -319,25 +299,50 @@ class DirectoryHandler(BaseHandler):
@defer.inlineCallbacks
def _update_canonical_alias(self, requester, user_id, room_id, room_alias):
"""
Send an updated canonical alias event if the removed alias was set as
the canonical alias or listed in the alt_aliases field.
"""
alias_event = yield self.state.get_current_state(
room_id, EventTypes.CanonicalAlias, ""
)
alias_str = room_alias.to_string()
if not alias_event or alias_event.content.get("alias", "") != alias_str:
# There is no canonical alias, nothing to do.
if not alias_event:
return
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.CanonicalAlias,
"state_key": "",
"room_id": room_id,
"sender": user_id,
"content": {},
},
ratelimit=False,
)
# Obtain a mutable version of the event content.
content = dict(alias_event.content)
send_update = False
# Remove the alias property if it matches the removed alias.
alias_str = room_alias.to_string()
if alias_event.content.get("alias", "") == alias_str:
send_update = True
content.pop("alias", "")
# Filter alt_aliases for the removed alias.
alt_aliases = content.pop("alt_aliases", None)
# If the aliases are not a list (or not found) do not attempt to modify
# the list.
if isinstance(alt_aliases, list):
send_update = True
alt_aliases = [alias for alias in alt_aliases if alias != alias_str]
if alt_aliases:
content["alt_aliases"] = alt_aliases
if send_update:
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.CanonicalAlias,
"state_key": "",
"room_id": room_id,
"sender": user_id,
"content": content,
},
ratelimit=False,
)
@defer.inlineCallbacks
def get_association_from_room_alias(self, room_alias):

File diff suppressed because it is too large Load diff

View file

@ -932,10 +932,9 @@ class EventCreationHandler(object):
# way? If we have been invited by a remote server, we need
# to get them to sign the event.
returned_invite = yield federation_handler.send_invite(
invitee.domain, event
returned_invite = yield defer.ensureDeferred(
federation_handler.send_invite(invitee.domain, event)
)
event.unsigned.pop("room_state", None)
# TODO: Make sure the signatures actually are correct.

View file

@ -64,18 +64,21 @@ class RoomCreationHandler(BaseHandler):
"history_visibility": "shared",
"original_invitees_have_ops": False,
"guest_can_join": True,
"power_level_content_override": {"invite": 0},
},
RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
"join_rules": JoinRules.INVITE,
"history_visibility": "shared",
"original_invitees_have_ops": True,
"guest_can_join": True,
"power_level_content_override": {"invite": 0},
},
RoomCreationPreset.PUBLIC_CHAT: {
"join_rules": JoinRules.PUBLIC,
"history_visibility": "shared",
"original_invitees_have_ops": False,
"guest_can_join": False,
"power_level_content_override": {},
},
}
@ -259,7 +262,7 @@ class RoomCreationHandler(BaseHandler):
for v in ("invite", "events_default"):
current = int(pl_content.get(v, 0))
if current < restricted_level:
logger.info(
logger.debug(
"Setting level for %s in %s to %i (was %i)",
v,
old_room_id,
@ -269,7 +272,7 @@ class RoomCreationHandler(BaseHandler):
pl_content[v] = restricted_level
updated = True
else:
logger.info("Not setting level for %s (already %i)", v, current)
logger.debug("Not setting level for %s (already %i)", v, current)
if updated:
try:
@ -296,7 +299,7 @@ class RoomCreationHandler(BaseHandler):
EventTypes.Aliases, events_default
)
logger.info("Setting correct PLs in new room to %s", new_pl_content)
logger.debug("Setting correct PLs in new room to %s", new_pl_content)
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
@ -475,9 +478,7 @@ class RoomCreationHandler(BaseHandler):
for alias_str in aliases:
alias = RoomAlias.from_string(alias_str)
try:
yield directory_handler.delete_association(
requester, alias, send_event=False
)
yield directory_handler.delete_association(requester, alias)
removed_aliases.append(alias_str)
except SynapseError as e:
logger.warning("Unable to remove alias %s from old room: %s", alias, e)
@ -508,7 +509,6 @@ class RoomCreationHandler(BaseHandler):
RoomAlias.from_string(alias),
new_room_id,
servers=(self.hs.hostname,),
send_event=False,
check_membership=False,
)
logger.info("Moved alias %s to new room", alias)
@ -579,9 +579,13 @@ class RoomCreationHandler(BaseHandler):
# Check whether the third party rules allows/changes the room create
# request.
yield self.third_party_event_rules.on_create_room(
event_allowed = yield self.third_party_event_rules.on_create_room(
requester, config, is_requester_admin=is_requester_admin
)
if not event_allowed:
raise SynapseError(
403, "You are not permitted to create rooms", Codes.FORBIDDEN
)
if not is_requester_admin and not self.spam_checker.user_may_create_room(
user_id
@ -657,7 +661,6 @@ class RoomCreationHandler(BaseHandler):
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
send_event=False,
check_membership=False,
)
@ -782,7 +785,7 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
logger.info("Sending %s in new room", etype)
logger.debug("Sending %s in new room", etype)
yield self.event_creation_handler.create_and_send_nonmember_event(
creator, event, ratelimit=False
)
@ -796,7 +799,7 @@ class RoomCreationHandler(BaseHandler):
creation_content.update({"creator": creator_id})
yield send(etype=EventTypes.Create, content=creation_content)
logger.info("Sending %s in new room", EventTypes.Member)
logger.debug("Sending %s in new room", EventTypes.Member)
yield self.room_member_handler.update_membership(
creator,
creator.user,
@ -825,19 +828,24 @@ class RoomCreationHandler(BaseHandler):
# This will be reudundant on pre-MSC2260 rooms, since the
# aliases event is special-cased.
EventTypes.Aliases: 0,
EventTypes.Tombstone: 100,
EventTypes.ServerACL: 100,
},
"events_default": 0,
"state_default": 50,
"ban": 50,
"kick": 50,
"redact": 50,
"invite": 0,
"invite": 50,
}
if config["original_invitees_have_ops"]:
for invitee in invite_list:
power_level_content["users"][invitee] = 100
# Power levels overrides are defined per chat preset
power_level_content.update(config["power_level_content_override"])
if power_level_content_override:
power_level_content.update(power_level_content_override)

View file

@ -964,8 +964,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
yield self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
yield defer.ensureDeferred(
self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
)
yield self._user_joined_room(user, room_id)
@ -1002,8 +1004,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
ret = yield defer.ensureDeferred(
fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
)
)
return ret
except Exception as e:

View file

@ -300,7 +300,7 @@ class StatsHandler(StateDeltasHandler):
room_state["guest_access"] = event_content.get("guest_access")
for room_id, state in room_to_state_updates.items():
logger.info("Updating room_stats_state for %s: %s", room_id, state)
logger.debug("Updating room_stats_state for %s: %s", room_id, state)
yield self.store.update_room_state(room_id, state)
return room_to_stats_deltas, user_to_stats_deltas

File diff suppressed because it is too large Load diff

View file

@ -52,6 +52,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
# The current position in the current_state_delta stream
self.pos = None
@ -65,7 +66,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
def search_users(self, user_id, search_term, limit):
async def search_users(self, user_id, search_term, limit):
"""Searches for users in directory
Returns:
@ -82,7 +83,16 @@ class UserDirectoryHandler(StateDeltasHandler):
]
}
"""
return self.store.search_user_dir(user_id, search_term, limit)
results = await self.store.search_user_dir(user_id, search_term, limit)
# Remove any spammy users from the results.
results["results"] = [
user
for user in results["results"]
if not self.spam_checker.check_username_for_spam(user)
]
return results
def notify_new_event(self):
"""Called when there may be more deltas to process
@ -149,7 +159,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self.pos, room_max_stream_ordering
)
logger.info("Handling %d state deltas", len(deltas))
logger.debug("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = max_pos
@ -195,7 +205,7 @@ class UserDirectoryHandler(StateDeltasHandler):
room_id, self.server_name
)
if not is_in_room:
logger.info("Server left room: %r", room_id)
logger.debug("Server left room: %r", room_id)
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not

View file

@ -225,7 +225,7 @@ class SynapseRequest(Request):
self.start_time, name=servlet_name, method=self.get_method()
)
self.site.access_logger.info(
self.site.access_logger.debug(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,

View file

@ -402,7 +402,7 @@ class HttpPusher(object):
Args:
badge (int): number of unread messages
"""
logger.info("Sending updated badge count %d to %s", badge, self.name)
logger.debug("Sending updated badge count %d to %s", badge, self.name)
d = {
"notification": {
"id": "",

View file

@ -21,7 +21,7 @@ from six import text_type
from six.moves import http_client
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
@ -105,7 +105,7 @@ class UsersRestServletV2(RestServlet):
class UserRestServletV2(RestServlet):
PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>@[^/]+)$"),)
PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>[^/]+)$"),)
"""Get request to list user details.
This needs user to have administrator access in Synapse.
@ -136,6 +136,8 @@ class UserRestServletV2(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
self.store = hs.get_datastore()
self.auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.set_password_handler = hs.get_set_password_handler()
self.deactivate_account_handler = hs.get_deactivate_account_handler()
@ -150,6 +152,9 @@ class UserRestServletV2(RestServlet):
ret = await self.admin_handler.get_user(target_user)
if not ret:
raise NotFoundError("User not found")
return 200, ret
async def on_PUT(self, request, user_id):
@ -163,6 +168,7 @@ class UserRestServletV2(RestServlet):
raise SynapseError(400, "This endpoint can only be used with local users")
user = await self.admin_handler.get_user(target_user)
user_id = target_user.to_string()
if user: # modify user
if "displayname" in body:
@ -170,6 +176,29 @@ class UserRestServletV2(RestServlet):
target_user, requester, body["displayname"], True
)
if "threepids" in body:
# check for required parameters for each threepid
for threepid in body["threepids"]:
assert_params_in_dict(threepid, ["medium", "address"])
# remove old threepids from user
threepids = await self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
await self.auth_handler.delete_threepid(
user_id, threepid["medium"], threepid["address"], None
)
except Exception:
logger.exception("Failed to remove threepids")
raise SynapseError(500, "Failed to remove threepids")
# add new threepids to user
current_time = self.hs.get_clock().time_msec()
for threepid in body["threepids"]:
await self.auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], current_time
)
if "avatar_url" in body:
await self.profile_handler.set_avatar_url(
target_user, requester, body["avatar_url"], True
@ -221,6 +250,7 @@ class UserRestServletV2(RestServlet):
admin = body.get("admin", None)
user_type = body.get("user_type", None)
displayname = body.get("displayname", None)
threepids = body.get("threepids", None)
if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
raise SynapseError(400, "Invalid user type")
@ -232,6 +262,18 @@ class UserRestServletV2(RestServlet):
default_display_name=displayname,
user_type=user_type,
)
if "threepids" in body:
# check for required parameters for each threepid
for threepid in body["threepids"]:
assert_params_in_dict(threepid, ["medium", "address"])
current_time = self.hs.get_clock().time_msec()
for threepid in body["threepids"]:
await self.auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], current_time
)
if "avatar_url" in body:
await self.profile_handler.set_avatar_url(
user_id, requester, body["avatar_url"], True
@ -568,7 +610,7 @@ class UserAdminServlet(RestServlet):
{}
"""
PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>[^/]*)/admin$"),)
def __init__(self, hs):
self.hs = hs

View file

@ -52,7 +52,6 @@ class VersionsRestServlet(RestServlet):
],
# as per MSC1497:
"unstable_features": {
"m.lazy_load_members": True,
# as per MSC2190, as amended by MSC2264
# to be removed in r0.6.0
"m.id_access_token": True,

View file

@ -107,3 +107,5 @@ class HomeServer(object):
self,
) -> synapse.replication.tcp.client.ReplicationClientHandler:
pass
def is_mine_id(self, domain_id: str) -> bool:
pass

View file

@ -18,6 +18,10 @@ from twisted.internet import defer
from synapse.storage.state import StateFilter
MYPY = False
if MYPY:
import synapse.server
logger = logging.getLogger(__name__)
@ -26,18 +30,18 @@ class SpamCheckerApi(object):
access to rooms and other relevant information.
"""
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self._store = hs.get_datastore()
@defer.inlineCallbacks
def get_state_events_in_room(self, room_id, types):
def get_state_events_in_room(self, room_id: str, types: tuple) -> defer.Deferred:
"""Gets state events for the given room.
Args:
room_id (string): The room ID to get state events in.
types (tuple): The event type and state key (using None
room_id: The room ID to get state events in.
types: The event type and state key (using None
to represent 'any') of the room state to acquire.
Returns:

View file

@ -26,6 +26,7 @@ from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.signatures import SignatureWorkerStore
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@ -61,6 +62,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
)
def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
if isinstance(self.database_engine, PostgresEngine):
# For efficiency we make the database do this if we can.
sql = """
WITH RECURSIVE auth_chain(event_id) AS (
SELECT auth_id FROM event_auth WHERE event_id = ANY(?)
UNION
SELECT auth_id FROM event_auth
INNER JOIN auth_chain USING (event_id)
)
SELECT event_id FROM auth_chain
"""
txn.execute(sql, (list(event_ids),))
results = set(event_id for event_id, in txn)
if include_given:
results.update(event_ids)
return list(results)
# Database doesn't necessarily support recursive CTE, so we fall
# back to do doing it manually.
if include_given:
results = set(event_ids)
else:

View file

@ -15,5 +15,8 @@
-- Add background update to go and delete current state events for rooms the
-- server is no longer in.
INSERT into background_updates (update_name, progress_json)
VALUES ('delete_old_current_state_events', '{}');
--
-- this relies on the 'membership' column of current_state_events, so make sure
-- that's populated first!
INSERT into background_updates (update_name, progress_json, depends_on)
VALUES ('delete_old_current_state_events', '{}', 'current_state_events_membership');

View file

@ -0,0 +1,35 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- when we first added the room_version column, it was populated via a background
-- update. We now need it to be populated before synapse starts, so we populate
-- any remaining rows with a NULL room version now. For servers which have completed
-- the background update, this will be pretty quick.
-- the following query will set room_version to NULL if no create event is found for
-- the room in current_state_events, and will set it to '1' if a create event with no
-- room_version is found.
UPDATE rooms SET room_version=(
SELECT COALESCE(json::json->'content'->>'room_version','1')
FROM current_state_events cse INNER JOIN event_json ej USING (event_id)
WHERE cse.room_id=rooms.room_id AND cse.type='m.room.create' AND cse.state_key=''
) WHERE rooms.room_version IS NULL;
-- we still allow the background update to complete: it has the useful side-effect of
-- populating `rooms` with any missing rooms (based on the current_state_events table).
-- see also rooms_version_column_2.sql.sqlite which has a copy of the above query, using
-- sqlite syntax for the json extraction.

View file

@ -0,0 +1,22 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- see rooms_version_column_2.sql.postgres for details of what's going on here.
UPDATE rooms SET room_version=(
SELECT COALESCE(json_extract(ej.json, '$.content.room_version'), '1')
FROM current_state_events cse INNER JOIN event_json ej USING (event_id)
WHERE cse.room_id=rooms.room_id AND cse.type='m.room.create' AND cse.state_key=''
) WHERE rooms.room_version IS NULL;

View file

@ -271,31 +271,6 @@ class StatsStore(StateDeltasStore):
return slice_list
def get_room_stats_state(self, room_id):
"""
Returns the current room_stats_state for a room.
Args:
room_id (str): The ID of the room to return state for.
Returns (dict):
Dictionary containing these keys:
"name", "topic", "canonical_alias", "avatar", "join_rules",
"history_visibility"
"""
return self.db.simple_select_one(
"room_stats_state",
{"room_id": room_id},
retcols=(
"name",
"topic",
"canonical_alias",
"avatar",
"join_rules",
"history_visibility",
),
)
@cached()
def get_earliest_token_for_stats(self, stats_type, id):
"""

View file

@ -183,7 +183,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
return 1
logger.info(
logger.debug(
"Processing the next %d rooms of %d remaining"
% (len(rooms_to_work_on), progress["remaining"])
)
@ -308,7 +308,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
return 1
logger.info(
logger.debug(
"Processing the next %d users of %d remaining"
% (len(users_to_work_on), progress["remaining"])
)

View file

@ -343,7 +343,7 @@ class Database(object):
top_three_counters = self._txn_perf_counters.interval(duration, limit=3)
perf_logger.info(
perf_logger.debug(
"Total database time: %.3f%% {%s}", ratio * 100, top_three_counters
)

Some files were not shown because too many files have changed in this diff Show more