Merge branch 'develop' into matrix-org-hotfixes
This commit is contained in:
commit
527f73d902
|
@ -4,18 +4,16 @@ jobs:
|
|||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:${CIRCLE_TAG} -t matrixdotorg/synapse:${CIRCLE_TAG}-py3 .
|
||||
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:${CIRCLE_TAG} .
|
||||
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
|
||||
- run: docker push matrixdotorg/synapse:${CIRCLE_TAG}
|
||||
- run: docker push matrixdotorg/synapse:${CIRCLE_TAG}-py3
|
||||
dockerhubuploadlatest:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:latest -t matrixdotorg/synapse:latest-py3 .
|
||||
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:latest .
|
||||
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
|
||||
- run: docker push matrixdotorg/synapse:latest
|
||||
- run: docker push matrixdotorg/synapse:latest-py3
|
||||
|
||||
workflows:
|
||||
version: 2
|
||||
|
|
4
.github/ISSUE_TEMPLATE/BUG_REPORT.md
vendored
4
.github/ISSUE_TEMPLATE/BUG_REPORT.md
vendored
|
@ -4,12 +4,12 @@ about: Create a report to help us improve
|
|||
|
||||
---
|
||||
|
||||
<!--
|
||||
|
||||
**THIS IS NOT A SUPPORT CHANNEL!**
|
||||
**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**,
|
||||
please ask in **#synapse:matrix.org** (using a matrix.org account if necessary)
|
||||
|
||||
<!--
|
||||
|
||||
If you want to report a security issue, please see https://matrix.org/security-disclosure-policy/
|
||||
|
||||
This is a bug report template. By following the instructions below and
|
||||
|
|
1
changelog.d/7314.misc
Normal file
1
changelog.d/7314.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Allow guest access to the `GET /_matrix/client/r0/rooms/{room_id}/members` endpoint, according to MSC2689. Contributed by Awesome Technologies Innovationslabor GmbH.
|
1
changelog.d/7372.misc
Normal file
1
changelog.d/7372.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Reduce the amount of whitespace in JSON stored and sent in responses. Contributed by David Vo.
|
|
@ -1 +0,0 @@
|
|||
Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).
|
1
changelog.d/7977.bugfix
Normal file
1
changelog.d/7977.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a bug introduced in Synapse v1.7.2 which caused inaccurate membership counts in the room directory.
|
1
changelog.d/7987.misc
Normal file
1
changelog.d/7987.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/7989.misc
Normal file
1
changelog.d/7989.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/7996.bugfix
Normal file
1
changelog.d/7996.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix various comments and minor discrepencies in server notices code.
|
1
changelog.d/7997.misc
Normal file
1
changelog.d/7997.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Implement new experimental push rules for some users.
|
1
changelog.d/7999.bugfix
Normal file
1
changelog.d/7999.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long standing bug where HTTP HEAD requests resulted in a 400 error.
|
1
changelog.d/8000.doc
Normal file
1
changelog.d/8000.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve workers docs.
|
1
changelog.d/8001.misc
Normal file
1
changelog.d/8001.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Remove redundant and unreliable signature check for v1 Identity Service lookup responses.
|
1
changelog.d/8003.misc
Normal file
1
changelog.d/8003.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8008.feature
Normal file
1
changelog.d/8008.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add rate limiting to users joining rooms.
|
1
changelog.d/8009.misc
Normal file
1
changelog.d/8009.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve the performance of the register endpoint.
|
1
changelog.d/8010.doc
Normal file
1
changelog.d/8010.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Add documentation for how to undo a room shutdown.
|
1
changelog.d/8011.bugfix
Normal file
1
changelog.d/8011.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long-standing bug which caused two copies of some log lines to be written when synctl was used along with a MemoryHandler logger.
|
1
changelog.d/8012.bugfix
Normal file
1
changelog.d/8012.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long-standing bug which caused two copies of some log lines to be written when synctl was used along with a MemoryHandler logger.
|
1
changelog.d/8014.misc
Normal file
1
changelog.d/8014.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8016.misc
Normal file
1
changelog.d/8016.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8024.misc
Normal file
1
changelog.d/8024.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Reduce less useful output in the newsfragment CI step. Add a link to the changelog section of the contributing guide on error.
|
1
changelog.d/8027.misc
Normal file
1
changelog.d/8027.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8031.misc
Normal file
1
changelog.d/8031.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8032.misc
Normal file
1
changelog.d/8032.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8033.misc
Normal file
1
changelog.d/8033.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Rename storage layer objects to be more sensible.
|
1
changelog.d/8035.misc
Normal file
1
changelog.d/8035.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8039.misc
Normal file
1
changelog.d/8039.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Revert MSC2654 implementation because of perf issues. Please delete this line when processing the 1.19 changelog.
|
1
changelog.d/8040.misc
Normal file
1
changelog.d/8040.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Change the default log config to reduce disk I/O and storage for new servers.
|
1
changelog.d/8041.misc
Normal file
1
changelog.d/8041.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add an assertion on prev_events in create_new_client_event.
|
1
changelog.d/8042.misc
Normal file
1
changelog.d/8042.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8043.misc
Normal file
1
changelog.d/8043.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add a comment to `ServerContextFactory` about the use of `SSLv23_METHOD`.
|
1
changelog.d/8044.misc
Normal file
1
changelog.d/8044.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8045.misc
Normal file
1
changelog.d/8045.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8048.feature
Normal file
1
changelog.d/8048.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add a `/health` endpoint to every configured HTTP listener that can be used as a health check endpoint by load balancers.
|
1
changelog.d/8049.misc
Normal file
1
changelog.d/8049.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Log `OPTIONS` requests at `DEBUG` rather than `INFO` level to reduce amount logged at `INFO`.
|
1
changelog.d/8050.misc
Normal file
1
changelog.d/8050.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Reduce amount of outbound request logging at INFO level.
|
1
changelog.d/8051.misc
Normal file
1
changelog.d/8051.misc
Normal file
|
@ -0,0 +1 @@
|
|||
It is no longer necessary to explicitly define `filters` in the logging configuration. (Continuing to do so is redundant but harmless.)
|
1
changelog.d/8052.feature
Normal file
1
changelog.d/8052.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Allow login to be blocked based on the values of SAML attributes.
|
1
changelog.d/8056.docker
Normal file
1
changelog.d/8056.docker
Normal file
|
@ -0,0 +1 @@
|
|||
We no longer publish Docker images with the `-py3` tag suffix, as announced at https://github.com/matrix-org/synapse/blob/develop/UPGRADE.rst#upgrading-to-v1180.
|
1
changelog.d/8058.misc
Normal file
1
changelog.d/8058.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add type hints to `Notifier`.
|
1
changelog.d/8060.misc
Normal file
1
changelog.d/8060.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve typing information on `HomeServer` object.
|
1
changelog.d/8061.misc
Normal file
1
changelog.d/8061.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8062.misc
Normal file
1
changelog.d/8062.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8063.misc
Normal file
1
changelog.d/8063.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8064.misc
Normal file
1
changelog.d/8064.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add type hints to `Notifier`.
|
1
changelog.d/8066.misc
Normal file
1
changelog.d/8066.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8067.misc
Normal file
1
changelog.d/8067.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add type hints to `synapse.handlers.message` and `synapse.events.builder`.
|
1
changelog.d/8069.misc
Normal file
1
changelog.d/8069.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
1
changelog.d/8070.misc
Normal file
1
changelog.d/8070.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert various parts of the codebase to async/await.
|
|
@ -4,16 +4,10 @@ formatters:
|
|||
precise:
|
||||
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
formatter: precise
|
||||
filters: [context]
|
||||
|
||||
loggers:
|
||||
synapse.storage.SQL:
|
||||
|
|
|
@ -79,13 +79,20 @@ Response:
|
|||
the structure can and does change without notice.
|
||||
|
||||
First, it's important to understand that a room shutdown is very destructive. Undoing a shutdown is not as simple as pretending it
|
||||
never happened - work has to be done to move forward instead of resetting the past.
|
||||
never happened - work has to be done to move forward instead of resetting the past. In fact, in some cases it might not be possible
|
||||
to recover at all:
|
||||
|
||||
1. For safety reasons, it is recommended to shut down Synapse prior to continuing.
|
||||
* If the room was invite-only, your users will need to be re-invited.
|
||||
* If the room no longer has any members at all, it'll be impossible to rejoin.
|
||||
* The first user to rejoin will have to do so via an alias on a different server.
|
||||
|
||||
With all that being said, if you still want to try and recover the room:
|
||||
|
||||
1. For safety reasons, shut down Synapse.
|
||||
2. In the database, run `DELETE FROM blocked_rooms WHERE room_id = '!example:example.org';`
|
||||
* For caution: it's recommended to run this in a transaction: `BEGIN; DELETE ...;`, verify you got 1 result, then `COMMIT;`.
|
||||
* The room ID is the same one supplied to the shutdown room API, not the Content Violation room.
|
||||
3. Restart Synapse (required).
|
||||
3. Restart Synapse.
|
||||
|
||||
You will have to manually handle, if you so choose, the following:
|
||||
|
||||
|
|
|
@ -139,3 +139,10 @@ client IP addresses are recorded correctly.
|
|||
Having done so, you can then use `https://matrix.example.com` (instead
|
||||
of `https://matrix.example.com:8448`) as the "Custom server" when
|
||||
connecting to Synapse from a client.
|
||||
|
||||
|
||||
## Health check endpoint
|
||||
|
||||
Synapse exposes a health check endpoint for use by reverse proxies.
|
||||
Each configured HTTP listener has a `/health` endpoint which always returns
|
||||
200 OK (and doesn't get logged).
|
||||
|
|
|
@ -746,6 +746,10 @@ log_config: "CONFDIR/SERVERNAME.log.config"
|
|||
# - one for ratelimiting redactions by room admins. If this is not explicitly
|
||||
# set then it uses the same ratelimiting as per rc_message. This is useful
|
||||
# to allow room admins to deal with abuse quickly.
|
||||
# - two for ratelimiting number of rooms a user can join, "local" for when
|
||||
# users are joining rooms the server is already in (this is cheap) vs
|
||||
# "remote" for when users are trying to join rooms not on the server (which
|
||||
# can be more expensive)
|
||||
#
|
||||
# The defaults are as shown below.
|
||||
#
|
||||
|
@ -771,6 +775,14 @@ log_config: "CONFDIR/SERVERNAME.log.config"
|
|||
#rc_admin_redaction:
|
||||
# per_second: 1
|
||||
# burst_count: 50
|
||||
#
|
||||
#rc_joins:
|
||||
# local:
|
||||
# per_second: 0.1
|
||||
# burst_count: 3
|
||||
# remote:
|
||||
# per_second: 0.01
|
||||
# burst_count: 3
|
||||
|
||||
|
||||
# Ratelimiting settings for incoming federation
|
||||
|
@ -1565,6 +1577,17 @@ saml2_config:
|
|||
#
|
||||
#grandfathered_mxid_source_attribute: upn
|
||||
|
||||
# It is possible to configure Synapse to only allow logins if SAML attributes
|
||||
# match particular values. The requirements can be listed under
|
||||
# `attribute_requirements` as shown below. All of the listed attributes must
|
||||
# match for the login to be permitted.
|
||||
#
|
||||
#attribute_requirements:
|
||||
# - attribute: userGroup
|
||||
# value: "staff"
|
||||
# - attribute: department
|
||||
# value: "sales"
|
||||
|
||||
# Directory in which Synapse will try to find the template files below.
|
||||
# If not set, default templates from within the Synapse package will be used.
|
||||
#
|
||||
|
|
|
@ -11,24 +11,33 @@ formatters:
|
|||
precise:
|
||||
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
file:
|
||||
class: logging.handlers.RotatingFileHandler
|
||||
class: logging.handlers.TimedRotatingFileHandler
|
||||
formatter: precise
|
||||
filename: /var/log/matrix-synapse/homeserver.log
|
||||
maxBytes: 104857600
|
||||
backupCount: 10
|
||||
filters: [context]
|
||||
when: midnight
|
||||
backupCount: 3 # Does not include the current log file.
|
||||
encoding: utf8
|
||||
|
||||
# Default to buffering writes to log file for efficiency. This means that
|
||||
# will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
|
||||
# logs will still be flushed immediately.
|
||||
buffer:
|
||||
class: logging.handlers.MemoryHandler
|
||||
target: file
|
||||
# The capacity is the number of log lines that are buffered before
|
||||
# being written to disk. Increasing this will lead to better
|
||||
# performance, at the expensive of it taking longer for log lines to
|
||||
# be written to disk.
|
||||
capacity: 10
|
||||
flushLevel: 30 # Flush for WARNING logs as well
|
||||
|
||||
# A handler that writes logs to stderr. Unused by default, but can be used
|
||||
# instead of "buffer" and "file" in the logger handlers.
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
formatter: precise
|
||||
filters: [context]
|
||||
|
||||
loggers:
|
||||
synapse.storage.SQL:
|
||||
|
@ -36,8 +45,23 @@ loggers:
|
|||
# information such as access tokens.
|
||||
level: INFO
|
||||
|
||||
twisted:
|
||||
# We send the twisted logging directly to the file handler,
|
||||
# to work around https://github.com/matrix-org/synapse/issues/3471
|
||||
# when using "buffer" logger. Use "console" to log to stderr instead.
|
||||
handlers: [file]
|
||||
propagate: false
|
||||
|
||||
root:
|
||||
level: INFO
|
||||
handlers: [file, console]
|
||||
|
||||
# Write logs to the `buffer` handler, which will buffer them together in memory,
|
||||
# then write them to a file.
|
||||
#
|
||||
# Replace "buffer" with "console" to log to stderr instead. (Note that you'll
|
||||
# also need to update the configuation for the `twisted` logger above, in
|
||||
# this case.)
|
||||
#
|
||||
handlers: [buffer]
|
||||
|
||||
disable_existing_loggers: false
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
worker_app: synapse.app.federation_reader
|
||||
worker_name: federation_reader1
|
||||
|
||||
worker_replication_host: 127.0.0.1
|
||||
worker_replication_port: 9092
|
||||
worker_replication_http_port: 9093
|
||||
|
||||
worker_listeners:
|
||||
|
|
|
@ -7,6 +7,6 @@ who are present in a publicly viewable room present on the server.
|
|||
|
||||
The directory info is stored in various tables, which can (typically after
|
||||
DB corruption) get stale or out of sync. If this happens, for now the
|
||||
solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
|
||||
solution to fix it is to execute the SQL [here](../synapse/storage/databases/main/schema/delta/53/user_dir_populate.sql)
|
||||
and then restart synapse. This should then start a background task to
|
||||
flush the current tables and regenerate the directory.
|
||||
|
|
|
@ -23,7 +23,7 @@ The processes communicate with each other via a Synapse-specific protocol called
|
|||
feeds streams of newly written data between processes so they can be kept in
|
||||
sync with the database state.
|
||||
|
||||
When configured to do so, Synapse uses a
|
||||
When configured to do so, Synapse uses a
|
||||
[Redis pub/sub channel](https://redis.io/topics/pubsub) to send the replication
|
||||
stream between all configured Synapse processes. Additionally, processes may
|
||||
make HTTP requests to each other, primarily for operations which need to wait
|
||||
|
@ -66,23 +66,31 @@ https://hub.docker.com/r/matrixdotorg/synapse/.
|
|||
|
||||
To make effective use of the workers, you will need to configure an HTTP
|
||||
reverse-proxy such as nginx or haproxy, which will direct incoming requests to
|
||||
the correct worker, or to the main synapse instance. See
|
||||
the correct worker, or to the main synapse instance. See
|
||||
[reverse_proxy.md](reverse_proxy.md) for information on setting up a reverse
|
||||
proxy.
|
||||
|
||||
To enable workers you should create a configuration file for each worker
|
||||
process. Each worker configuration file inherits the configuration of the shared
|
||||
homeserver configuration file. You can then override configuration specific to
|
||||
that worker, e.g. the HTTP listener that it provides (if any); logging
|
||||
configuration; etc. You should minimise the number of overrides though to
|
||||
maintain a usable config.
|
||||
When using workers, each worker process has its own configuration file which
|
||||
contains settings specific to that worker, such as the HTTP listener that it
|
||||
provides (if any), logging configuration, etc.
|
||||
|
||||
Normally, the worker processes are configured to read from a shared
|
||||
configuration file as well as the worker-specific configuration files. This
|
||||
makes it easier to keep common configuration settings synchronised across all
|
||||
the processes.
|
||||
|
||||
The main process is somewhat special in this respect: it does not normally
|
||||
need its own configuration file and can take all of its configuration from the
|
||||
shared configuration file.
|
||||
|
||||
|
||||
### Shared Configuration
|
||||
### Shared configuration
|
||||
|
||||
Normally, only a couple of changes are needed to make an existing configuration
|
||||
file suitable for use with workers. First, you need to enable an "HTTP replication
|
||||
listener" for the main process; and secondly, you need to enable redis-based
|
||||
replication. For example:
|
||||
|
||||
Next you need to add both a HTTP replication listener, used for HTTP requests
|
||||
between processes, and redis config to the shared Synapse configuration file
|
||||
(`homeserver.yaml`). For example:
|
||||
|
||||
```yaml
|
||||
# extend the existing `listeners` section. This defines the ports that the
|
||||
|
@ -105,7 +113,7 @@ Under **no circumstances** should the replication listener be exposed to the
|
|||
public internet; it has no authentication and is unencrypted.
|
||||
|
||||
|
||||
### Worker Configuration
|
||||
### Worker configuration
|
||||
|
||||
In the config file for each worker, you must specify the type of worker
|
||||
application (`worker_app`), and you should specify a unqiue name for the worker
|
||||
|
@ -145,6 +153,9 @@ plain HTTP endpoint on port 8083 separately serving various endpoints, e.g.
|
|||
Obviously you should configure your reverse-proxy to route the relevant
|
||||
endpoints to the worker (`localhost:8083` in the above example).
|
||||
|
||||
|
||||
### Running Synapse with workers
|
||||
|
||||
Finally, you need to start your worker processes. This can be done with either
|
||||
`synctl` or your distribution's preferred service manager such as `systemd`. We
|
||||
recommend the use of `systemd` where available: for information on setting up
|
||||
|
@ -407,6 +418,23 @@ all these to be folded into the `generic_worker` app and to use config to define
|
|||
which processes handle the various proccessing such as push notifications.
|
||||
|
||||
|
||||
## Migration from old config
|
||||
|
||||
There are two main independent changes that have been made: introducing Redis
|
||||
support and merging apps into `synapse.app.generic_worker`. Both these changes
|
||||
are backwards compatible and so no changes to the config are required, however
|
||||
server admins are encouraged to plan to migrate to Redis as the old style direct
|
||||
TCP replication config is deprecated.
|
||||
|
||||
To migrate to Redis add the `redis` config as above, and optionally remove the
|
||||
TCP `replication` listener from master and `worker_replication_port` from worker
|
||||
config.
|
||||
|
||||
To migrate apps to use `synapse.app.generic_worker` simply update the
|
||||
`worker_app` option in the worker configs, and where worker are started (e.g.
|
||||
in systemd service files, but not required for synctl).
|
||||
|
||||
|
||||
## Architectural diagram
|
||||
|
||||
The following shows an example setup using Redis and a reverse proxy:
|
||||
|
|
3
mypy.ini
3
mypy.ini
|
@ -81,3 +81,6 @@ ignore_missing_imports = True
|
|||
|
||||
[mypy-rust_python_jaeger_reporter.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-nacl.*]
|
||||
ignore_missing_imports = True
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
# A script which checks that an appropriate news file has been added on this
|
||||
# branch.
|
||||
|
||||
echo -e "+++ \033[32mChecking newsfragment\033[m"
|
||||
|
||||
set -e
|
||||
|
||||
# make sure that origin/develop is up to date
|
||||
|
@ -16,6 +18,8 @@ pr="$BUILDKITE_PULL_REQUEST"
|
|||
if ! git diff --quiet FETCH_HEAD... -- debian; then
|
||||
if git diff --quiet FETCH_HEAD... -- debian/changelog; then
|
||||
echo "Updates to debian directory, but no update to the changelog." >&2
|
||||
echo "!! Please see the contributing guide for help writing your changelog entry:" >&2
|
||||
echo "https://github.com/matrix-org/synapse/blob/develop/CONTRIBUTING.md#debian-changelog" >&2
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
@ -26,7 +30,12 @@ if ! git diff --name-only FETCH_HEAD... | grep -qv '^debian/'; then
|
|||
exit 0
|
||||
fi
|
||||
|
||||
tox -qe check-newsfragment
|
||||
# Print a link to the contributing guide if the user makes a mistake
|
||||
CONTRIBUTING_GUIDE_TEXT="!! Please see the contributing guide for help writing your changelog entry:
|
||||
https://github.com/matrix-org/synapse/blob/develop/CONTRIBUTING.md#changelog"
|
||||
|
||||
# If check-newsfragment returns a non-zero exit code, print the contributing guide and exit
|
||||
tox -qe check-newsfragment || (echo -e "$CONTRIBUTING_GUIDE_TEXT" >&2 && exit 1)
|
||||
|
||||
echo
|
||||
echo "--------------------------"
|
||||
|
@ -38,6 +47,7 @@ for f in `git diff --name-only FETCH_HEAD... -- changelog.d`; do
|
|||
lastchar=`tr -d '\n' < $f | tail -c 1`
|
||||
if [ $lastchar != '.' -a $lastchar != '!' ]; then
|
||||
echo -e "\e[31mERROR: newsfragment $f does not end with a '.' or '!'\e[39m" >&2
|
||||
echo -e "$CONTRIBUTING_GUIDE_TEXT" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
@ -47,5 +57,6 @@ done
|
|||
|
||||
if [[ -n "$pr" && "$matched" -eq 0 ]]; then
|
||||
echo -e "\e[31mERROR: Did not find a news fragment with the right number: expected changelog.d/$pr.*.\e[39m" >&2
|
||||
echo -e "$CONTRIBUTING_GUIDE_TEXT" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
|
|
@ -40,7 +40,7 @@ class MockHomeserver(HomeServer):
|
|||
config.server_name, reactor=reactor, config=config, **kwargs
|
||||
)
|
||||
|
||||
self.version_string = "Synapse/"+get_version_string(synapse)
|
||||
self.version_string = "Synapse/" + get_version_string(synapse)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -86,7 +86,7 @@ if __name__ == "__main__":
|
|||
store = hs.get_datastore()
|
||||
|
||||
async def run_background_updates():
|
||||
await store.db.updates.run_background_updates(sleep=False)
|
||||
await store.db_pool.updates.run_background_updates(sleep=False)
|
||||
# Stop the reactor to exit the script once every background update is run.
|
||||
reactor.stop()
|
||||
|
||||
|
|
|
@ -35,31 +35,29 @@ from synapse.logging.context import (
|
|||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.deviceinbox import (
|
||||
DeviceInboxBackgroundUpdateStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.events_bg_updates import (
|
||||
from synapse.storage.database import DatabasePool, make_conn
|
||||
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.events_bg_updates import (
|
||||
EventsBackgroundUpdatesStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.media_repository import (
|
||||
from synapse.storage.databases.main.media_repository import (
|
||||
MediaRepositoryBackgroundUpdateStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.registration import (
|
||||
from synapse.storage.databases.main.registration import (
|
||||
RegistrationBackgroundUpdateStore,
|
||||
find_max_generated_user_id_localpart,
|
||||
)
|
||||
from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.stats import StatsStore
|
||||
from synapse.storage.data_stores.main.user_directory import (
|
||||
from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.stats import StatsStore
|
||||
from synapse.storage.databases.main.user_directory import (
|
||||
UserDirectoryBackgroundUpdateStore,
|
||||
)
|
||||
from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.database import Database, make_conn
|
||||
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
from synapse.util import Clock
|
||||
|
@ -69,7 +67,7 @@ logger = logging.getLogger("synapse_port_db")
|
|||
|
||||
|
||||
BOOLEAN_COLUMNS = {
|
||||
"events": ["processed", "outlier", "contains_url", "count_as_unread"],
|
||||
"events": ["processed", "outlier", "contains_url"],
|
||||
"rooms": ["is_public"],
|
||||
"event_edges": ["is_state"],
|
||||
"presence_list": ["accepted"],
|
||||
|
@ -175,14 +173,14 @@ class Store(
|
|||
StatsStore,
|
||||
):
|
||||
def execute(self, f, *args, **kwargs):
|
||||
return self.db.runInteraction(f.__name__, f, *args, **kwargs)
|
||||
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
|
||||
|
||||
def execute_sql(self, sql, *args):
|
||||
def r(txn):
|
||||
txn.execute(sql, args)
|
||||
return txn.fetchall()
|
||||
|
||||
return self.db.runInteraction("execute_sql", r)
|
||||
return self.db_pool.runInteraction("execute_sql", r)
|
||||
|
||||
def insert_many_txn(self, txn, table, headers, rows):
|
||||
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
|
||||
|
@ -227,7 +225,7 @@ class Porter(object):
|
|||
async def setup_table(self, table):
|
||||
if table in APPEND_ONLY_TABLES:
|
||||
# It's safe to just carry on inserting.
|
||||
row = await self.postgres_store.db.simple_select_one(
|
||||
row = await self.postgres_store.db_pool.simple_select_one(
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
retcols=("forward_rowid", "backward_rowid"),
|
||||
|
@ -244,7 +242,7 @@ class Porter(object):
|
|||
) = await self._setup_sent_transactions()
|
||||
backward_chunk = 0
|
||||
else:
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={
|
||||
"table_name": table,
|
||||
|
@ -274,7 +272,7 @@ class Porter(object):
|
|||
|
||||
await self.postgres_store.execute(delete_all)
|
||||
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
|
||||
)
|
||||
|
@ -318,7 +316,7 @@ class Porter(object):
|
|||
if table == "user_directory_stream_pos":
|
||||
# We need to make sure there is a single row, `(X, null), as that is
|
||||
# what synapse expects to be there.
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table=table, values={"stream_id": None}
|
||||
)
|
||||
self.progress.update(table, table_size) # Mark table as done
|
||||
|
@ -359,7 +357,7 @@ class Porter(object):
|
|||
|
||||
return headers, forward_rows, backward_rows
|
||||
|
||||
headers, frows, brows = await self.sqlite_store.db.runInteraction(
|
||||
headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
|
||||
"select", r
|
||||
)
|
||||
|
||||
|
@ -375,7 +373,7 @@ class Porter(object):
|
|||
def insert(txn):
|
||||
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
|
||||
|
||||
self.postgres_store.db.simple_update_one_txn(
|
||||
self.postgres_store.db_pool.simple_update_one_txn(
|
||||
txn,
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
|
@ -413,7 +411,7 @@ class Porter(object):
|
|||
|
||||
return headers, rows
|
||||
|
||||
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
|
||||
headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
|
||||
|
||||
if rows:
|
||||
forward_chunk = rows[-1][0] + 1
|
||||
|
@ -451,7 +449,7 @@ class Porter(object):
|
|||
],
|
||||
)
|
||||
|
||||
self.postgres_store.db.simple_update_one_txn(
|
||||
self.postgres_store.db_pool.simple_update_one_txn(
|
||||
txn,
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": "event_search"},
|
||||
|
@ -494,7 +492,7 @@ class Porter(object):
|
|||
db_conn, allow_outdated_version=allow_outdated_version
|
||||
)
|
||||
prepare_database(db_conn, engine, config=self.hs_config)
|
||||
store = Store(Database(hs, db_config, engine), db_conn, hs)
|
||||
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
|
||||
db_conn.commit()
|
||||
|
||||
return store
|
||||
|
@ -502,7 +500,7 @@ class Porter(object):
|
|||
async def run_background_updates_on_postgres(self):
|
||||
# Manually apply all background updates on the PostgreSQL database.
|
||||
postgres_ready = (
|
||||
await self.postgres_store.db.updates.has_completed_background_updates()
|
||||
await self.postgres_store.db_pool.updates.has_completed_background_updates()
|
||||
)
|
||||
|
||||
if not postgres_ready:
|
||||
|
@ -511,9 +509,9 @@ class Porter(object):
|
|||
self.progress.set_state("Running background updates on PostgreSQL")
|
||||
|
||||
while not postgres_ready:
|
||||
await self.postgres_store.db.updates.do_next_background_update(100)
|
||||
await self.postgres_store.db_pool.updates.do_next_background_update(100)
|
||||
postgres_ready = await (
|
||||
self.postgres_store.db.updates.has_completed_background_updates()
|
||||
self.postgres_store.db_pool.updates.has_completed_background_updates()
|
||||
)
|
||||
|
||||
async def run(self):
|
||||
|
@ -534,7 +532,7 @@ class Porter(object):
|
|||
|
||||
# Check if all background updates are done, abort if not.
|
||||
updates_complete = (
|
||||
await self.sqlite_store.db.updates.has_completed_background_updates()
|
||||
await self.sqlite_store.db_pool.updates.has_completed_background_updates()
|
||||
)
|
||||
if not updates_complete:
|
||||
end_error = (
|
||||
|
@ -576,22 +574,24 @@ class Porter(object):
|
|||
)
|
||||
|
||||
try:
|
||||
await self.postgres_store.db.runInteraction("alter_table", alter_table)
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"alter_table", alter_table
|
||||
)
|
||||
except Exception:
|
||||
# On Error Resume Next
|
||||
pass
|
||||
|
||||
await self.postgres_store.db.runInteraction(
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"create_port_table", create_port_table
|
||||
)
|
||||
|
||||
# Step 2. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
|
||||
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
|
||||
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
|
||||
)
|
||||
|
||||
postgres_tables = await self.postgres_store.db.simple_select_onecol(
|
||||
postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
|
||||
table="information_schema.tables",
|
||||
keyvalues={},
|
||||
retcol="distinct table_name",
|
||||
|
@ -692,7 +692,7 @@ class Porter(object):
|
|||
|
||||
return headers, [r for r in rows if r[ts_ind] < yesterday]
|
||||
|
||||
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
|
||||
headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
|
||||
|
||||
rows = self._convert_rows("sent_transactions", headers, rows)
|
||||
|
||||
|
@ -725,7 +725,7 @@ class Porter(object):
|
|||
next_chunk = await self.sqlite_store.execute(get_start_id)
|
||||
next_chunk = max(max_inserted_rowid + 1, next_chunk)
|
||||
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={
|
||||
"table_name": "sent_transactions",
|
||||
|
@ -794,14 +794,14 @@ class Porter(object):
|
|||
next_id = curr_id + 1
|
||||
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
|
||||
|
||||
return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
|
||||
return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
|
||||
|
||||
def _setup_user_id_seq(self):
|
||||
def r(txn):
|
||||
next_id = find_max_generated_user_id_localpart(txn) + 1
|
||||
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
|
||||
|
||||
return self.postgres_store.db.runInteraction("setup_user_id_seq", r)
|
||||
return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
|
||||
|
||||
|
||||
##############################################
|
||||
|
|
|
@ -13,12 +13,11 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import Optional
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import pymacaroons
|
||||
from netaddr import IPAddress
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.web.server import Request
|
||||
|
||||
import synapse.types
|
||||
|
@ -80,13 +79,14 @@ class Auth(object):
|
|||
self._track_appservice_user_ips = hs.config.track_appservice_user_ips
|
||||
self._macaroon_secret_key = hs.config.macaroon_secret_key
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_from_context(self, room_version: str, event, context, do_sig_check=True):
|
||||
prev_state_ids = yield defer.ensureDeferred(context.get_prev_state_ids())
|
||||
auth_events_ids = yield self.compute_auth_events(
|
||||
async def check_from_context(
|
||||
self, room_version: str, event, context, do_sig_check=True
|
||||
):
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
auth_events_ids = self.compute_auth_events(
|
||||
event, prev_state_ids, for_verification=True
|
||||
)
|
||||
auth_events = yield self.store.get_events(auth_events_ids)
|
||||
auth_events = await self.store.get_events(auth_events_ids)
|
||||
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
|
||||
|
||||
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
||||
|
@ -94,14 +94,13 @@ class Auth(object):
|
|||
room_version_obj, event, auth_events=auth_events, do_sig_check=do_sig_check
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_user_in_room(
|
||||
async def check_user_in_room(
|
||||
self,
|
||||
room_id: str,
|
||||
user_id: str,
|
||||
current_state: Optional[StateMap[EventBase]] = None,
|
||||
allow_departed_users: bool = False,
|
||||
):
|
||||
) -> EventBase:
|
||||
"""Check if the user is in the room, or was at some point.
|
||||
Args:
|
||||
room_id: The room to check.
|
||||
|
@ -119,37 +118,35 @@ class Auth(object):
|
|||
Raises:
|
||||
AuthError if the user is/was not in the room.
|
||||
Returns:
|
||||
Deferred[Optional[EventBase]]:
|
||||
Membership event for the user if the user was in the
|
||||
room. This will be the join event if they are currently joined to
|
||||
the room. This will be the leave event if they have left the room.
|
||||
Membership event for the user if the user was in the
|
||||
room. This will be the join event if they are currently joined to
|
||||
the room. This will be the leave event if they have left the room.
|
||||
"""
|
||||
if current_state:
|
||||
member = current_state.get((EventTypes.Member, user_id), None)
|
||||
else:
|
||||
member = yield defer.ensureDeferred(
|
||||
self.state.get_current_state(
|
||||
room_id=room_id, event_type=EventTypes.Member, state_key=user_id
|
||||
)
|
||||
member = await self.state.get_current_state(
|
||||
room_id=room_id, event_type=EventTypes.Member, state_key=user_id
|
||||
)
|
||||
membership = member.membership if member else None
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
return member
|
||||
if member:
|
||||
membership = member.membership
|
||||
|
||||
# XXX this looks totally bogus. Why do we not allow users who have been banned,
|
||||
# or those who were members previously and have been re-invited?
|
||||
if allow_departed_users and membership == Membership.LEAVE:
|
||||
forgot = yield self.store.did_forget(user_id, room_id)
|
||||
if not forgot:
|
||||
if membership == Membership.JOIN:
|
||||
return member
|
||||
|
||||
# XXX this looks totally bogus. Why do we not allow users who have been banned,
|
||||
# or those who were members previously and have been re-invited?
|
||||
if allow_departed_users and membership == Membership.LEAVE:
|
||||
forgot = await self.store.did_forget(user_id, room_id)
|
||||
if not forgot:
|
||||
return member
|
||||
|
||||
raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_host_in_room(self, room_id, host):
|
||||
async def check_host_in_room(self, room_id, host):
|
||||
with Measure(self.clock, "check_host_in_room"):
|
||||
latest_event_ids = yield self.store.is_host_joined(room_id, host)
|
||||
latest_event_ids = await self.store.is_host_joined(room_id, host)
|
||||
return latest_event_ids
|
||||
|
||||
def can_federate(self, event, auth_events):
|
||||
|
@ -160,14 +157,13 @@ class Auth(object):
|
|||
def get_public_keys(self, invite_event):
|
||||
return event_auth.get_public_keys(invite_event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_by_req(
|
||||
async def get_user_by_req(
|
||||
self,
|
||||
request: Request,
|
||||
allow_guest: bool = False,
|
||||
rights: str = "access",
|
||||
allow_expired: bool = False,
|
||||
):
|
||||
) -> synapse.types.Requester:
|
||||
""" Get a registered user's ID.
|
||||
|
||||
Args:
|
||||
|
@ -180,7 +176,7 @@ class Auth(object):
|
|||
/login will deliver access tokens regardless of expiration.
|
||||
|
||||
Returns:
|
||||
defer.Deferred: resolves to a `synapse.types.Requester` object
|
||||
Resolves to the requester
|
||||
Raises:
|
||||
InvalidClientCredentialsError if no user by that token exists or the token
|
||||
is invalid.
|
||||
|
@ -194,14 +190,14 @@ class Auth(object):
|
|||
|
||||
access_token = self.get_access_token_from_request(request)
|
||||
|
||||
user_id, app_service = yield self._get_appservice_user_id(request)
|
||||
user_id, app_service = await self._get_appservice_user_id(request)
|
||||
if user_id:
|
||||
request.authenticated_entity = user_id
|
||||
opentracing.set_tag("authenticated_entity", user_id)
|
||||
opentracing.set_tag("appservice_id", app_service.id)
|
||||
|
||||
if ip_addr and self._track_appservice_user_ips:
|
||||
yield self.store.insert_client_ip(
|
||||
await self.store.insert_client_ip(
|
||||
user_id=user_id,
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
|
@ -211,7 +207,7 @@ class Auth(object):
|
|||
|
||||
return synapse.types.create_requester(user_id, app_service=app_service)
|
||||
|
||||
user_info = yield self.get_user_by_access_token(
|
||||
user_info = await self.get_user_by_access_token(
|
||||
access_token, rights, allow_expired=allow_expired
|
||||
)
|
||||
user = user_info["user"]
|
||||
|
@ -221,7 +217,7 @@ class Auth(object):
|
|||
# Deny the request if the user account has expired.
|
||||
if self._account_validity.enabled and not allow_expired:
|
||||
user_id = user.to_string()
|
||||
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
|
||||
expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
|
||||
if (
|
||||
expiration_ts is not None
|
||||
and self.clock.time_msec() >= expiration_ts
|
||||
|
@ -235,7 +231,7 @@ class Auth(object):
|
|||
device_id = user_info.get("device_id")
|
||||
|
||||
if user and access_token and ip_addr:
|
||||
yield self.store.insert_client_ip(
|
||||
await self.store.insert_client_ip(
|
||||
user_id=user.to_string(),
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
|
@ -261,8 +257,7 @@ class Auth(object):
|
|||
except KeyError:
|
||||
raise MissingClientTokenError()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_appservice_user_id(self, request):
|
||||
async def _get_appservice_user_id(self, request):
|
||||
app_service = self.store.get_app_service_by_token(
|
||||
self.get_access_token_from_request(request)
|
||||
)
|
||||
|
@ -283,14 +278,13 @@ class Auth(object):
|
|||
|
||||
if not app_service.is_interested_in_user(user_id):
|
||||
raise AuthError(403, "Application service cannot masquerade as this user.")
|
||||
if not (yield self.store.get_user_by_id(user_id)):
|
||||
if not (await self.store.get_user_by_id(user_id)):
|
||||
raise AuthError(403, "Application service has not registered this user")
|
||||
return user_id, app_service
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_by_access_token(
|
||||
async def get_user_by_access_token(
|
||||
self, token: str, rights: str = "access", allow_expired: bool = False,
|
||||
):
|
||||
) -> dict:
|
||||
""" Validate access token and get user_id from it
|
||||
|
||||
Args:
|
||||
|
@ -300,7 +294,7 @@ class Auth(object):
|
|||
allow_expired: If False, raises an InvalidClientTokenError
|
||||
if the token is expired
|
||||
Returns:
|
||||
Deferred[dict]: dict that includes:
|
||||
dict that includes:
|
||||
`user` (UserID)
|
||||
`is_guest` (bool)
|
||||
`token_id` (int|None): access token id. May be None if guest
|
||||
|
@ -314,7 +308,7 @@ class Auth(object):
|
|||
|
||||
if rights == "access":
|
||||
# first look in the database
|
||||
r = yield self._look_up_user_by_access_token(token)
|
||||
r = await self._look_up_user_by_access_token(token)
|
||||
if r:
|
||||
valid_until_ms = r["valid_until_ms"]
|
||||
if (
|
||||
|
@ -352,7 +346,7 @@ class Auth(object):
|
|||
# It would of course be much easier to store guest access
|
||||
# tokens in the database as well, but that would break existing
|
||||
# guest tokens.
|
||||
stored_user = yield self.store.get_user_by_id(user_id)
|
||||
stored_user = await self.store.get_user_by_id(user_id)
|
||||
if not stored_user:
|
||||
raise InvalidClientTokenError("Unknown user_id %s" % user_id)
|
||||
if not stored_user["is_guest"]:
|
||||
|
@ -482,9 +476,8 @@ class Auth(object):
|
|||
now = self.hs.get_clock().time_msec()
|
||||
return now < expiry
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _look_up_user_by_access_token(self, token):
|
||||
ret = yield self.store.get_user_by_access_token(token)
|
||||
async def _look_up_user_by_access_token(self, token):
|
||||
ret = await self.store.get_user_by_access_token(token)
|
||||
if not ret:
|
||||
return None
|
||||
|
||||
|
@ -507,7 +500,7 @@ class Auth(object):
|
|||
logger.warning("Unrecognised appservice access token.")
|
||||
raise InvalidClientTokenError()
|
||||
request.authenticated_entity = service.sender
|
||||
return defer.succeed(service)
|
||||
return service
|
||||
|
||||
async def is_server_admin(self, user: UserID) -> bool:
|
||||
""" Check if the given user is a local server admin.
|
||||
|
@ -522,7 +515,7 @@ class Auth(object):
|
|||
|
||||
def compute_auth_events(
|
||||
self, event, current_state_ids: StateMap[str], for_verification: bool = False,
|
||||
):
|
||||
) -> List[str]:
|
||||
"""Given an event and current state return the list of event IDs used
|
||||
to auth an event.
|
||||
|
||||
|
@ -530,11 +523,11 @@ class Auth(object):
|
|||
should be added to the event's `auth_events`.
|
||||
|
||||
Returns:
|
||||
defer.Deferred(list[str]): List of event IDs.
|
||||
List of event IDs.
|
||||
"""
|
||||
|
||||
if event.type == EventTypes.Create:
|
||||
return defer.succeed([])
|
||||
return []
|
||||
|
||||
# Currently we ignore the `for_verification` flag even though there are
|
||||
# some situations where we can drop particular auth events when adding
|
||||
|
@ -553,7 +546,7 @@ class Auth(object):
|
|||
if auth_ev_id:
|
||||
auth_ids.append(auth_ev_id)
|
||||
|
||||
return defer.succeed(auth_ids)
|
||||
return auth_ids
|
||||
|
||||
async def check_can_change_room_list(self, room_id: str, user: UserID):
|
||||
"""Determine whether the user is allowed to edit the room's entry in the
|
||||
|
@ -636,10 +629,9 @@ class Auth(object):
|
|||
|
||||
return query_params[0].decode("ascii")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_user_in_room_or_world_readable(
|
||||
async def check_user_in_room_or_world_readable(
|
||||
self, room_id: str, user_id: str, allow_departed_users: bool = False
|
||||
):
|
||||
) -> Tuple[str, Optional[str]]:
|
||||
"""Checks that the user is or was in the room or the room is world
|
||||
readable. If it isn't then an exception is raised.
|
||||
|
||||
|
@ -650,10 +642,9 @@ class Auth(object):
|
|||
members but have now departed
|
||||
|
||||
Returns:
|
||||
Deferred[tuple[str, str|None]]: Resolves to the current membership of
|
||||
the user in the room and the membership event ID of the user. If
|
||||
the user is not in the room and never has been, then
|
||||
`(Membership.JOIN, None)` is returned.
|
||||
Resolves to the current membership of the user in the room and the
|
||||
membership event ID of the user. If the user is not in the room and
|
||||
never has been, then `(Membership.JOIN, None)` is returned.
|
||||
"""
|
||||
|
||||
try:
|
||||
|
@ -662,15 +653,13 @@ class Auth(object):
|
|||
# * The user is a non-guest user, and was ever in the room
|
||||
# * The user is a guest user, and has joined the room
|
||||
# else it will throw.
|
||||
member_event = yield self.check_user_in_room(
|
||||
member_event = await self.check_user_in_room(
|
||||
room_id, user_id, allow_departed_users=allow_departed_users
|
||||
)
|
||||
return member_event.membership, member_event.event_id
|
||||
except AuthError:
|
||||
visibility = yield defer.ensureDeferred(
|
||||
self.state.get_current_state(
|
||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||
)
|
||||
visibility = await self.state.get_current_state(
|
||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||
)
|
||||
if (
|
||||
visibility
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import LimitBlockingTypes, UserTypes
|
||||
from synapse.api.errors import Codes, ResourceLimitError
|
||||
from synapse.config.server import is_threepid_reserved
|
||||
|
@ -36,8 +34,7 @@ class AuthBlocking(object):
|
|||
self._limit_usage_by_mau = hs.config.limit_usage_by_mau
|
||||
self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
|
||||
async def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
|
||||
"""Checks if the user should be rejected for some external reason,
|
||||
such as monthly active user limiting or global disable flag
|
||||
|
||||
|
@ -60,7 +57,7 @@ class AuthBlocking(object):
|
|||
if user_id is not None:
|
||||
if user_id == self._server_notices_mxid:
|
||||
return
|
||||
if (yield self.store.is_support_user(user_id)):
|
||||
if await self.store.is_support_user(user_id):
|
||||
return
|
||||
|
||||
if self._hs_disabled:
|
||||
|
@ -76,11 +73,11 @@ class AuthBlocking(object):
|
|||
|
||||
# If the user is already part of the MAU cohort or a trial user
|
||||
if user_id:
|
||||
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
|
||||
timestamp = await self.store.user_last_seen_monthly_active(user_id)
|
||||
if timestamp:
|
||||
return
|
||||
|
||||
is_trial = yield self.store.is_trial_user(user_id)
|
||||
is_trial = await self.store.is_trial_user(user_id)
|
||||
if is_trial:
|
||||
return
|
||||
elif threepid:
|
||||
|
@ -93,7 +90,7 @@ class AuthBlocking(object):
|
|||
# allow registration. Support users are excluded from MAU checks.
|
||||
return
|
||||
# Else if there is no room in the MAU bucket, bail
|
||||
current_mau = yield self.store.get_monthly_active_count()
|
||||
current_mau = await self.store.get_monthly_active_count()
|
||||
if current_mau >= self._max_mau_value:
|
||||
raise ResourceLimitError(
|
||||
403,
|
||||
|
|
|
@ -238,14 +238,16 @@ class InteractiveAuthIncompleteError(Exception):
|
|||
(This indicates we should return a 401 with 'result' as the body)
|
||||
|
||||
Attributes:
|
||||
session_id: The ID of the ongoing interactive auth session.
|
||||
result: the server response to the request, which should be
|
||||
passed back to the client
|
||||
"""
|
||||
|
||||
def __init__(self, result: "JsonDict"):
|
||||
def __init__(self, session_id: str, result: "JsonDict"):
|
||||
super(InteractiveAuthIncompleteError, self).__init__(
|
||||
"Interactive auth not yet complete"
|
||||
)
|
||||
self.session_id = session_id
|
||||
self.result = result
|
||||
|
||||
|
||||
|
|
|
@ -21,8 +21,6 @@ import jsonschema
|
|||
from canonicaljson import json
|
||||
from jsonschema import FormatChecker
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventContentFields
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
|
@ -137,9 +135,8 @@ class Filtering(object):
|
|||
super(Filtering, self).__init__()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_filter(self, user_localpart, filter_id):
|
||||
result = yield self.store.get_user_filter(user_localpart, filter_id)
|
||||
async def get_user_filter(self, user_localpart, filter_id):
|
||||
result = await self.store.get_user_filter(user_localpart, filter_id)
|
||||
return FilterCollection(result)
|
||||
|
||||
def add_user_filter(self, user_localpart, user_filter):
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import gc
|
||||
import logging
|
||||
import os
|
||||
|
@ -22,7 +21,6 @@ import sys
|
|||
import traceback
|
||||
from typing import Iterable
|
||||
|
||||
from daemonize import Daemonize
|
||||
from typing_extensions import NoReturn
|
||||
|
||||
from twisted.internet import defer, error, reactor
|
||||
|
@ -34,6 +32,7 @@ from synapse.config.server import ListenerConfig
|
|||
from synapse.crypto import context_factory
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.daemonize import daemonize_process
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
@ -129,17 +128,8 @@ def start_reactor(
|
|||
if print_pidfile:
|
||||
print(pid_file)
|
||||
|
||||
daemon = Daemonize(
|
||||
app=appname,
|
||||
pid=pid_file,
|
||||
action=run,
|
||||
auto_close_fds=False,
|
||||
verbose=True,
|
||||
logger=logger,
|
||||
)
|
||||
daemon.start()
|
||||
else:
|
||||
run()
|
||||
daemonize_process(pid_file, logger)
|
||||
run()
|
||||
|
||||
|
||||
def quit_with_error(error_string: str) -> NoReturn:
|
||||
|
@ -278,7 +268,7 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
|
|||
|
||||
# It is now safe to start your Synapse.
|
||||
hs.start_listening(listeners)
|
||||
hs.get_datastore().db.start_profiling()
|
||||
hs.get_datastore().db_pool.start_profiling()
|
||||
hs.get_pusherpool().start()
|
||||
|
||||
setup_sentry(hs)
|
||||
|
|
|
@ -123,17 +123,18 @@ from synapse.rest.client.v2_alpha.account_data import (
|
|||
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
|
||||
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
|
||||
from synapse.rest.client.versions import VersionsRestServlet
|
||||
from synapse.rest.health import HealthResource
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.censor_events import CensorEventsStore
|
||||
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
from synapse.server import HomeServer, cache_in_self
|
||||
from synapse.storage.databases.main.censor_events import CensorEventsStore
|
||||
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
|
||||
from synapse.storage.databases.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.presence import UserPresenceState
|
||||
from synapse.storage.data_stores.main.search import SearchWorkerStore
|
||||
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.storage.databases.main.presence import UserPresenceState
|
||||
from synapse.storage.databases.main.search import SearchWorkerStore
|
||||
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
|
||||
from synapse.storage.databases.main.user_directory import UserDirectoryStore
|
||||
from synapse.types import ReadReceipt
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
|
@ -493,7 +494,10 @@ class GenericWorkerServer(HomeServer):
|
|||
site_tag = listener_config.http_options.tag
|
||||
if site_tag is None:
|
||||
site_tag = port
|
||||
resources = {}
|
||||
|
||||
# We always include a health resource.
|
||||
resources = {"/health": HealthResource()}
|
||||
|
||||
for res in listener_config.http_options.resources:
|
||||
for name in res.names:
|
||||
if name == "metrics":
|
||||
|
@ -631,10 +635,12 @@ class GenericWorkerServer(HomeServer):
|
|||
async def remove_pusher(self, app_id, push_key, user_id):
|
||||
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
|
||||
|
||||
def build_replication_data_handler(self):
|
||||
@cache_in_self
|
||||
def get_replication_data_handler(self):
|
||||
return GenericWorkerReplicationHandler(self)
|
||||
|
||||
def build_presence_handler(self):
|
||||
@cache_in_self
|
||||
def get_presence_handler(self):
|
||||
return GenericWorkerPresence(self)
|
||||
|
||||
|
||||
|
|
|
@ -68,6 +68,7 @@ from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
|
|||
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
|
||||
from synapse.rest import ClientRestResource
|
||||
from synapse.rest.admin import AdminRestResource
|
||||
from synapse.rest.health import HealthResource
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.rest.well_known import WellKnownResource
|
||||
from synapse.server import HomeServer
|
||||
|
@ -98,7 +99,9 @@ class SynapseHomeServer(HomeServer):
|
|||
if site_tag is None:
|
||||
site_tag = port
|
||||
|
||||
resources = {}
|
||||
# We always include a health resource.
|
||||
resources = {"/health": HealthResource()}
|
||||
|
||||
for res in listener_config.http_options.resources:
|
||||
for name in res.names:
|
||||
if name == "openid" and "federation" in res.names:
|
||||
|
@ -380,13 +383,12 @@ def setup(config_options):
|
|||
|
||||
hs.setup_master()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_acme():
|
||||
async def do_acme() -> bool:
|
||||
"""
|
||||
Reprovision an ACME certificate, if it's required.
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: Whether the cert has been updated.
|
||||
Whether the cert has been updated.
|
||||
"""
|
||||
acme = hs.get_acme_handler()
|
||||
|
||||
|
@ -405,7 +407,7 @@ def setup(config_options):
|
|||
provision = True
|
||||
|
||||
if provision:
|
||||
yield acme.provision_certificate()
|
||||
await acme.provision_certificate()
|
||||
|
||||
return provision
|
||||
|
||||
|
@ -415,7 +417,7 @@ def setup(config_options):
|
|||
Provision a certificate from ACME, if required, and reload the TLS
|
||||
certificate if it's renewed.
|
||||
"""
|
||||
reprovisioned = yield do_acme()
|
||||
reprovisioned = yield defer.ensureDeferred(do_acme())
|
||||
if reprovisioned:
|
||||
_base.refresh_certificate(hs)
|
||||
|
||||
|
@ -427,8 +429,8 @@ def setup(config_options):
|
|||
acme = hs.get_acme_handler()
|
||||
# Start up the webservices which we will respond to ACME
|
||||
# challenges with, and then provision.
|
||||
yield acme.start_listening()
|
||||
yield do_acme()
|
||||
yield defer.ensureDeferred(acme.start_listening())
|
||||
yield defer.ensureDeferred(do_acme())
|
||||
|
||||
# Check if it needs to be reprovisioned every day.
|
||||
hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
|
||||
|
@ -442,7 +444,7 @@ def setup(config_options):
|
|||
|
||||
_base.start(hs, config.listeners)
|
||||
|
||||
hs.get_datastore().db.updates.start_doing_background_updates()
|
||||
hs.get_datastore().db_pool.updates.start_doing_background_updates()
|
||||
except Exception:
|
||||
# Print the exception and bail out.
|
||||
print("Error during startup:", file=sys.stderr)
|
||||
|
@ -552,8 +554,8 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process):
|
|||
#
|
||||
|
||||
# This only reports info about the *main* database.
|
||||
stats["database_engine"] = hs.get_datastore().db.engine.module.__name__
|
||||
stats["database_server_version"] = hs.get_datastore().db.engine.server_version
|
||||
stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
|
||||
stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version
|
||||
|
||||
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
|
||||
try:
|
||||
|
|
|
@ -175,7 +175,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
|||
urllib.parse.quote(protocol),
|
||||
)
|
||||
try:
|
||||
info = yield self.get_json(uri, {})
|
||||
info = yield defer.ensureDeferred(self.get_json(uri, {}))
|
||||
|
||||
if not _is_valid_3pe_metadata(info):
|
||||
logger.warning(
|
||||
|
|
49
synapse/config/_util.py
Normal file
49
synapse/config/_util.py
Normal file
|
@ -0,0 +1,49 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Any, List
|
||||
|
||||
import jsonschema
|
||||
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.types import JsonDict
|
||||
|
||||
|
||||
def validate_config(json_schema: JsonDict, config: Any, config_path: List[str]) -> None:
|
||||
"""Validates a config setting against a JsonSchema definition
|
||||
|
||||
This can be used to validate a section of the config file against a schema
|
||||
definition. If the validation fails, a ConfigError is raised with a textual
|
||||
description of the problem.
|
||||
|
||||
Args:
|
||||
json_schema: the schema to validate against
|
||||
config: the configuration value to be validated
|
||||
config_path: the path within the config file. This will be used as a basis
|
||||
for the error message.
|
||||
"""
|
||||
try:
|
||||
jsonschema.validate(config, json_schema)
|
||||
except jsonschema.ValidationError as e:
|
||||
# copy `config_path` before modifying it.
|
||||
path = list(config_path)
|
||||
for p in list(e.path):
|
||||
if isinstance(p, int):
|
||||
path.append("<item %i>" % p)
|
||||
else:
|
||||
path.append(str(p))
|
||||
|
||||
raise ConfigError(
|
||||
"Unable to parse configuration: %s at %s" % (e.message, ".".join(path))
|
||||
)
|
|
@ -100,7 +100,10 @@ class DatabaseConnectionConfig:
|
|||
|
||||
self.name = name
|
||||
self.config = db_config
|
||||
self.data_stores = data_stores
|
||||
|
||||
# The `data_stores` config is actually talking about `databases` (we
|
||||
# changed the name).
|
||||
self.databases = data_stores
|
||||
|
||||
|
||||
class DatabaseConfig(Config):
|
||||
|
|
|
@ -55,24 +55,33 @@ formatters:
|
|||
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - \
|
||||
%(request)s - %(message)s'
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
file:
|
||||
class: logging.handlers.RotatingFileHandler
|
||||
class: logging.handlers.TimedRotatingFileHandler
|
||||
formatter: precise
|
||||
filename: ${log_file}
|
||||
maxBytes: 104857600
|
||||
backupCount: 10
|
||||
filters: [context]
|
||||
when: midnight
|
||||
backupCount: 3 # Does not include the current log file.
|
||||
encoding: utf8
|
||||
|
||||
# Default to buffering writes to log file for efficiency. This means that
|
||||
# will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
|
||||
# logs will still be flushed immediately.
|
||||
buffer:
|
||||
class: logging.handlers.MemoryHandler
|
||||
target: file
|
||||
# The capacity is the number of log lines that are buffered before
|
||||
# being written to disk. Increasing this will lead to better
|
||||
# performance, at the expensive of it taking longer for log lines to
|
||||
# be written to disk.
|
||||
capacity: 10
|
||||
flushLevel: 30 # Flush for WARNING logs as well
|
||||
|
||||
# A handler that writes logs to stderr. Unused by default, but can be used
|
||||
# instead of "buffer" and "file" in the logger handlers.
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
formatter: precise
|
||||
filters: [context]
|
||||
|
||||
loggers:
|
||||
synapse.storage.SQL:
|
||||
|
@ -80,9 +89,24 @@ loggers:
|
|||
# information such as access tokens.
|
||||
level: INFO
|
||||
|
||||
twisted:
|
||||
# We send the twisted logging directly to the file handler,
|
||||
# to work around https://github.com/matrix-org/synapse/issues/3471
|
||||
# when using "buffer" logger. Use "console" to log to stderr instead.
|
||||
handlers: [file]
|
||||
propagate: false
|
||||
|
||||
root:
|
||||
level: INFO
|
||||
handlers: [file, console]
|
||||
|
||||
# Write logs to the `buffer` handler, which will buffer them together in memory,
|
||||
# then write them to a file.
|
||||
#
|
||||
# Replace "buffer" with "console" to log to stderr instead. (Note that you'll
|
||||
# also need to update the configuation for the `twisted` logger above, in
|
||||
# this case.)
|
||||
#
|
||||
handlers: [buffer]
|
||||
|
||||
disable_existing_loggers: false
|
||||
"""
|
||||
|
@ -168,11 +192,26 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
|
|||
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
handler.addFilter(LoggingContextFilter(request=""))
|
||||
logger.addHandler(handler)
|
||||
else:
|
||||
logging.config.dictConfig(log_config)
|
||||
|
||||
# We add a log record factory that runs all messages through the
|
||||
# LoggingContextFilter so that we get the context *at the time we log*
|
||||
# rather than when we write to a handler. This can be done in config using
|
||||
# filter options, but care must when using e.g. MemoryHandler to buffer
|
||||
# writes.
|
||||
|
||||
log_filter = LoggingContextFilter(request="")
|
||||
old_factory = logging.getLogRecordFactory()
|
||||
|
||||
def factory(*args, **kwargs):
|
||||
record = old_factory(*args, **kwargs)
|
||||
log_filter.filter(record)
|
||||
return record
|
||||
|
||||
logging.setLogRecordFactory(factory)
|
||||
|
||||
# Route Twisted's native logging through to the standard library logging
|
||||
# system.
|
||||
observer = STDLibLogObserver()
|
||||
|
|
|
@ -93,6 +93,15 @@ class RatelimitConfig(Config):
|
|||
if rc_admin_redaction:
|
||||
self.rc_admin_redaction = RateLimitConfig(rc_admin_redaction)
|
||||
|
||||
self.rc_joins_local = RateLimitConfig(
|
||||
config.get("rc_joins", {}).get("local", {}),
|
||||
defaults={"per_second": 0.1, "burst_count": 3},
|
||||
)
|
||||
self.rc_joins_remote = RateLimitConfig(
|
||||
config.get("rc_joins", {}).get("remote", {}),
|
||||
defaults={"per_second": 0.01, "burst_count": 3},
|
||||
)
|
||||
|
||||
def generate_config_section(self, **kwargs):
|
||||
return """\
|
||||
## Ratelimiting ##
|
||||
|
@ -118,6 +127,10 @@ class RatelimitConfig(Config):
|
|||
# - one for ratelimiting redactions by room admins. If this is not explicitly
|
||||
# set then it uses the same ratelimiting as per rc_message. This is useful
|
||||
# to allow room admins to deal with abuse quickly.
|
||||
# - two for ratelimiting number of rooms a user can join, "local" for when
|
||||
# users are joining rooms the server is already in (this is cheap) vs
|
||||
# "remote" for when users are trying to join rooms not on the server (which
|
||||
# can be more expensive)
|
||||
#
|
||||
# The defaults are as shown below.
|
||||
#
|
||||
|
@ -143,6 +156,14 @@ class RatelimitConfig(Config):
|
|||
#rc_admin_redaction:
|
||||
# per_second: 1
|
||||
# burst_count: 50
|
||||
#
|
||||
#rc_joins:
|
||||
# local:
|
||||
# per_second: 0.1
|
||||
# burst_count: 3
|
||||
# remote:
|
||||
# per_second: 0.01
|
||||
# burst_count: 3
|
||||
|
||||
|
||||
# Ratelimiting settings for incoming federation
|
||||
|
|
|
@ -15,7 +15,9 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Any, List
|
||||
|
||||
import attr
|
||||
import jinja2
|
||||
import pkg_resources
|
||||
|
||||
|
@ -23,6 +25,7 @@ from synapse.python_dependencies import DependencyException, check_requirements
|
|||
from synapse.util.module_loader import load_module, load_python_module
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
from ._util import validate_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -80,6 +83,11 @@ class SAML2Config(Config):
|
|||
|
||||
self.saml2_enabled = True
|
||||
|
||||
attribute_requirements = saml2_config.get("attribute_requirements") or []
|
||||
self.attribute_requirements = _parse_attribute_requirements_def(
|
||||
attribute_requirements
|
||||
)
|
||||
|
||||
self.saml2_grandfathered_mxid_source_attribute = saml2_config.get(
|
||||
"grandfathered_mxid_source_attribute", "uid"
|
||||
)
|
||||
|
@ -341,6 +349,17 @@ class SAML2Config(Config):
|
|||
#
|
||||
#grandfathered_mxid_source_attribute: upn
|
||||
|
||||
# It is possible to configure Synapse to only allow logins if SAML attributes
|
||||
# match particular values. The requirements can be listed under
|
||||
# `attribute_requirements` as shown below. All of the listed attributes must
|
||||
# match for the login to be permitted.
|
||||
#
|
||||
#attribute_requirements:
|
||||
# - attribute: userGroup
|
||||
# value: "staff"
|
||||
# - attribute: department
|
||||
# value: "sales"
|
||||
|
||||
# Directory in which Synapse will try to find the template files below.
|
||||
# If not set, default templates from within the Synapse package will be used.
|
||||
#
|
||||
|
@ -368,3 +387,34 @@ class SAML2Config(Config):
|
|||
""" % {
|
||||
"config_dir_path": config_dir_path
|
||||
}
|
||||
|
||||
|
||||
@attr.s(frozen=True)
|
||||
class SamlAttributeRequirement:
|
||||
"""Object describing a single requirement for SAML attributes."""
|
||||
|
||||
attribute = attr.ib(type=str)
|
||||
value = attr.ib(type=str)
|
||||
|
||||
JSON_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {"attribute": {"type": "string"}, "value": {"type": "string"}},
|
||||
"required": ["attribute", "value"],
|
||||
}
|
||||
|
||||
|
||||
ATTRIBUTE_REQUIREMENTS_SCHEMA = {
|
||||
"type": "array",
|
||||
"items": SamlAttributeRequirement.JSON_SCHEMA,
|
||||
}
|
||||
|
||||
|
||||
def _parse_attribute_requirements_def(
|
||||
attribute_requirements: Any,
|
||||
) -> List[SamlAttributeRequirement]:
|
||||
validate_config(
|
||||
ATTRIBUTE_REQUIREMENTS_SCHEMA,
|
||||
attribute_requirements,
|
||||
config_path=["saml2_config", "attribute_requirements"],
|
||||
)
|
||||
return [SamlAttributeRequirement(**x) for x in attribute_requirements]
|
||||
|
|
|
@ -530,6 +530,21 @@ class ServerConfig(Config):
|
|||
"request_token_inhibit_3pid_errors", False,
|
||||
)
|
||||
|
||||
# List of users trialing the new experimental default push rules. This setting is
|
||||
# not included in the sample configuration file on purpose as it's a temporary
|
||||
# hack, so that some users can trial the new defaults without impacting every
|
||||
# user on the homeserver.
|
||||
users_new_default_push_rules = (
|
||||
config.get("users_new_default_push_rules") or []
|
||||
) # type: list
|
||||
if not isinstance(users_new_default_push_rules, list):
|
||||
raise ConfigError("'users_new_default_push_rules' must be a list")
|
||||
|
||||
# Turn the list into a set to improve lookup speed.
|
||||
self.users_new_default_push_rules = set(
|
||||
users_new_default_push_rules
|
||||
) # type: set
|
||||
|
||||
def has_tls_listener(self) -> bool:
|
||||
return any(listener.tls for listener in self.listeners)
|
||||
|
||||
|
|
|
@ -48,6 +48,14 @@ class ServerContextFactory(ContextFactory):
|
|||
connections."""
|
||||
|
||||
def __init__(self, config):
|
||||
# TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
|
||||
# switch to those (see https://github.com/pyca/cryptography/issues/5379).
|
||||
#
|
||||
# note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2
|
||||
# or v3, but is a synonym for TLS_METHOD, which allows the client and server
|
||||
# to negotiate an appropriate version of TLS constrained by the version options
|
||||
# set with context.set_options.
|
||||
#
|
||||
self._context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
self.configure_context(self._context, config)
|
||||
|
||||
|
|
|
@ -223,8 +223,7 @@ class Keyring(object):
|
|||
|
||||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_key_lookups(self, verify_requests):
|
||||
async def _start_key_lookups(self, verify_requests):
|
||||
"""Sets off the key fetches for each verify request
|
||||
|
||||
Once each fetch completes, verify_request.key_ready will be resolved.
|
||||
|
@ -245,7 +244,7 @@ class Keyring(object):
|
|||
server_to_request_ids.setdefault(server_name, set()).add(request_id)
|
||||
|
||||
# Wait for any previous lookups to complete before proceeding.
|
||||
yield self.wait_for_previous_lookups(server_to_request_ids.keys())
|
||||
await self.wait_for_previous_lookups(server_to_request_ids.keys())
|
||||
|
||||
# take out a lock on each of the servers by sticking a Deferred in
|
||||
# key_downloads
|
||||
|
@ -283,15 +282,14 @@ class Keyring(object):
|
|||
except Exception:
|
||||
logger.exception("Error starting key lookups")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_previous_lookups(self, server_names):
|
||||
async def wait_for_previous_lookups(self, server_names) -> None:
|
||||
"""Waits for any previous key lookups for the given servers to finish.
|
||||
|
||||
Args:
|
||||
server_names (Iterable[str]): list of servers which we want to look up
|
||||
|
||||
Returns:
|
||||
Deferred[None]: resolves once all key lookups for the given servers have
|
||||
Resolves once all key lookups for the given servers have
|
||||
completed. Follows the synapse rules of logcontext preservation.
|
||||
"""
|
||||
loop_count = 1
|
||||
|
@ -309,7 +307,7 @@ class Keyring(object):
|
|||
loop_count,
|
||||
)
|
||||
with PreserveLoggingContext():
|
||||
yield defer.DeferredList((w[1] for w in wait_on))
|
||||
await defer.DeferredList((w[1] for w in wait_on))
|
||||
|
||||
loop_count += 1
|
||||
|
||||
|
@ -326,44 +324,44 @@ class Keyring(object):
|
|||
|
||||
remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_iterations():
|
||||
with Measure(self.clock, "get_server_verify_keys"):
|
||||
for f in self._key_fetchers:
|
||||
if not remaining_requests:
|
||||
return
|
||||
yield self._attempt_key_fetches_with_fetcher(f, remaining_requests)
|
||||
|
||||
# look for any requests which weren't satisfied
|
||||
with PreserveLoggingContext():
|
||||
for verify_request in remaining_requests:
|
||||
verify_request.key_ready.errback(
|
||||
SynapseError(
|
||||
401,
|
||||
"No key for %s with ids in %s (min_validity %i)"
|
||||
% (
|
||||
verify_request.server_name,
|
||||
verify_request.key_ids,
|
||||
verify_request.minimum_valid_until_ts,
|
||||
),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
async def do_iterations():
|
||||
try:
|
||||
with Measure(self.clock, "get_server_verify_keys"):
|
||||
for f in self._key_fetchers:
|
||||
if not remaining_requests:
|
||||
return
|
||||
await self._attempt_key_fetches_with_fetcher(
|
||||
f, remaining_requests
|
||||
)
|
||||
|
||||
def on_err(err):
|
||||
# we don't really expect to get here, because any errors should already
|
||||
# have been caught and logged. But if we do, let's log the error and make
|
||||
# sure that all of the deferreds are resolved.
|
||||
logger.error("Unexpected error in _get_server_verify_keys: %s", err)
|
||||
with PreserveLoggingContext():
|
||||
for verify_request in remaining_requests:
|
||||
if not verify_request.key_ready.called:
|
||||
verify_request.key_ready.errback(err)
|
||||
# look for any requests which weren't satisfied
|
||||
with PreserveLoggingContext():
|
||||
for verify_request in remaining_requests:
|
||||
verify_request.key_ready.errback(
|
||||
SynapseError(
|
||||
401,
|
||||
"No key for %s with ids in %s (min_validity %i)"
|
||||
% (
|
||||
verify_request.server_name,
|
||||
verify_request.key_ids,
|
||||
verify_request.minimum_valid_until_ts,
|
||||
),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
)
|
||||
except Exception as err:
|
||||
# we don't really expect to get here, because any errors should already
|
||||
# have been caught and logged. But if we do, let's log the error and make
|
||||
# sure that all of the deferreds are resolved.
|
||||
logger.error("Unexpected error in _get_server_verify_keys: %s", err)
|
||||
with PreserveLoggingContext():
|
||||
for verify_request in remaining_requests:
|
||||
if not verify_request.key_ready.called:
|
||||
verify_request.key_ready.errback(err)
|
||||
|
||||
run_in_background(do_iterations).addErrback(on_err)
|
||||
run_in_background(do_iterations)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests):
|
||||
async def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests):
|
||||
"""Use a key fetcher to attempt to satisfy some key requests
|
||||
|
||||
Args:
|
||||
|
@ -390,7 +388,7 @@ class Keyring(object):
|
|||
verify_request.minimum_valid_until_ts,
|
||||
)
|
||||
|
||||
results = yield fetcher.get_keys(missing_keys)
|
||||
results = await fetcher.get_keys(missing_keys)
|
||||
|
||||
completed = []
|
||||
for verify_request in remaining_requests:
|
||||
|
@ -423,7 +421,7 @@ class Keyring(object):
|
|||
|
||||
|
||||
class KeyFetcher(object):
|
||||
def get_keys(self, keys_to_fetch):
|
||||
async def get_keys(self, keys_to_fetch):
|
||||
"""
|
||||
Args:
|
||||
keys_to_fetch (dict[str, dict[str, int]]):
|
||||
|
@ -442,8 +440,7 @@ class StoreKeyFetcher(KeyFetcher):
|
|||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_keys(self, keys_to_fetch):
|
||||
async def get_keys(self, keys_to_fetch):
|
||||
"""see KeyFetcher.get_keys"""
|
||||
|
||||
keys_to_fetch = (
|
||||
|
@ -452,7 +449,7 @@ class StoreKeyFetcher(KeyFetcher):
|
|||
for key_id in keys_for_server.keys()
|
||||
)
|
||||
|
||||
res = yield self.store.get_server_verify_keys(keys_to_fetch)
|
||||
res = await self.store.get_server_verify_keys(keys_to_fetch)
|
||||
keys = {}
|
||||
for (server_name, key_id), key in res.items():
|
||||
keys.setdefault(server_name, {})[key_id] = key
|
||||
|
@ -464,8 +461,7 @@ class BaseV2KeyFetcher(object):
|
|||
self.store = hs.get_datastore()
|
||||
self.config = hs.get_config()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process_v2_response(self, from_server, response_json, time_added_ms):
|
||||
async def process_v2_response(self, from_server, response_json, time_added_ms):
|
||||
"""Parse a 'Server Keys' structure from the result of a /key request
|
||||
|
||||
This is used to parse either the entirety of the response from
|
||||
|
@ -537,7 +533,7 @@ class BaseV2KeyFetcher(object):
|
|||
|
||||
key_json_bytes = encode_canonical_json(response_json)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
|
@ -567,14 +563,12 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
self.client = hs.get_http_client()
|
||||
self.key_servers = self.config.key_servers
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_keys(self, keys_to_fetch):
|
||||
async def get_keys(self, keys_to_fetch):
|
||||
"""see KeyFetcher.get_keys"""
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_key(key_server):
|
||||
async def get_key(key_server):
|
||||
try:
|
||||
result = yield self.get_server_verify_key_v2_indirect(
|
||||
result = await self.get_server_verify_key_v2_indirect(
|
||||
keys_to_fetch, key_server
|
||||
)
|
||||
return result
|
||||
|
@ -592,7 +586,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
|
||||
return {}
|
||||
|
||||
results = yield make_deferred_yieldable(
|
||||
results = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[run_in_background(get_key, server) for server in self.key_servers],
|
||||
consumeErrors=True,
|
||||
|
@ -606,8 +600,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
|
||||
return union_of_keys
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server):
|
||||
async def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server):
|
||||
"""
|
||||
Args:
|
||||
keys_to_fetch (dict[str, dict[str, int]]):
|
||||
|
@ -617,7 +610,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
the keys
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map
|
||||
dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]: map
|
||||
from server_name -> key_id -> FetchKeyResult
|
||||
|
||||
Raises:
|
||||
|
@ -632,20 +625,18 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
)
|
||||
|
||||
try:
|
||||
query_response = yield defer.ensureDeferred(
|
||||
self.client.post_json(
|
||||
destination=perspective_name,
|
||||
path="/_matrix/key/v2/query",
|
||||
data={
|
||||
"server_keys": {
|
||||
server_name: {
|
||||
key_id: {"minimum_valid_until_ts": min_valid_ts}
|
||||
for key_id, min_valid_ts in server_keys.items()
|
||||
}
|
||||
for server_name, server_keys in keys_to_fetch.items()
|
||||
query_response = await self.client.post_json(
|
||||
destination=perspective_name,
|
||||
path="/_matrix/key/v2/query",
|
||||
data={
|
||||
"server_keys": {
|
||||
server_name: {
|
||||
key_id: {"minimum_valid_until_ts": min_valid_ts}
|
||||
for key_id, min_valid_ts in server_keys.items()
|
||||
}
|
||||
},
|
||||
)
|
||||
for server_name, server_keys in keys_to_fetch.items()
|
||||
}
|
||||
},
|
||||
)
|
||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||
# these both have str() representations which we can't really improve upon
|
||||
|
@ -670,7 +661,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
try:
|
||||
self._validate_perspectives_response(key_server, response)
|
||||
|
||||
processed_response = yield self.process_v2_response(
|
||||
processed_response = await self.process_v2_response(
|
||||
perspective_name, response, time_added_ms=time_now_ms
|
||||
)
|
||||
except KeyLookupError as e:
|
||||
|
@ -689,7 +680,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
)
|
||||
keys.setdefault(server_name, {}).update(processed_response)
|
||||
|
||||
yield self.store.store_server_verify_keys(
|
||||
await self.store.store_server_verify_keys(
|
||||
perspective_name, time_now_ms, added_keys
|
||||
)
|
||||
|
||||
|
@ -741,24 +732,23 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
self.clock = hs.get_clock()
|
||||
self.client = hs.get_http_client()
|
||||
|
||||
def get_keys(self, keys_to_fetch):
|
||||
async def get_keys(self, keys_to_fetch):
|
||||
"""
|
||||
Args:
|
||||
keys_to_fetch (dict[str, iterable[str]]):
|
||||
the keys to be fetched. server_name -> key_ids
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
|
||||
dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]:
|
||||
map from server_name -> key_id -> FetchKeyResult
|
||||
"""
|
||||
|
||||
results = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_key(key_to_fetch_item):
|
||||
async def get_key(key_to_fetch_item):
|
||||
server_name, key_ids = key_to_fetch_item
|
||||
try:
|
||||
keys = yield self.get_server_verify_key_v2_direct(server_name, key_ids)
|
||||
keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
|
||||
results[server_name] = keys
|
||||
except KeyLookupError as e:
|
||||
logger.warning(
|
||||
|
@ -767,12 +757,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
except Exception:
|
||||
logger.exception("Error getting keys %s from %s", key_ids, server_name)
|
||||
|
||||
return yieldable_gather_results(get_key, keys_to_fetch.items()).addCallback(
|
||||
lambda _: results
|
||||
)
|
||||
return await yieldable_gather_results(
|
||||
get_key, keys_to_fetch.items()
|
||||
).addCallback(lambda _: results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_server_verify_key_v2_direct(self, server_name, key_ids):
|
||||
async def get_server_verify_key_v2_direct(self, server_name, key_ids):
|
||||
"""
|
||||
|
||||
Args:
|
||||
|
@ -794,25 +783,23 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
|
||||
time_now_ms = self.clock.time_msec()
|
||||
try:
|
||||
response = yield defer.ensureDeferred(
|
||||
self.client.get_json(
|
||||
destination=server_name,
|
||||
path="/_matrix/key/v2/server/"
|
||||
+ urllib.parse.quote(requested_key_id),
|
||||
ignore_backoff=True,
|
||||
# we only give the remote server 10s to respond. It should be an
|
||||
# easy request to handle, so if it doesn't reply within 10s, it's
|
||||
# probably not going to.
|
||||
#
|
||||
# Furthermore, when we are acting as a notary server, we cannot
|
||||
# wait all day for all of the origin servers, as the requesting
|
||||
# server will otherwise time out before we can respond.
|
||||
#
|
||||
# (Note that get_json may make 4 attempts, so this can still take
|
||||
# almost 45 seconds to fetch the headers, plus up to another 60s to
|
||||
# read the response).
|
||||
timeout=10000,
|
||||
)
|
||||
response = await self.client.get_json(
|
||||
destination=server_name,
|
||||
path="/_matrix/key/v2/server/"
|
||||
+ urllib.parse.quote(requested_key_id),
|
||||
ignore_backoff=True,
|
||||
# we only give the remote server 10s to respond. It should be an
|
||||
# easy request to handle, so if it doesn't reply within 10s, it's
|
||||
# probably not going to.
|
||||
#
|
||||
# Furthermore, when we are acting as a notary server, we cannot
|
||||
# wait all day for all of the origin servers, as the requesting
|
||||
# server will otherwise time out before we can respond.
|
||||
#
|
||||
# (Note that get_json may make 4 attempts, so this can still take
|
||||
# almost 45 seconds to fetch the headers, plus up to another 60s to
|
||||
# read the response).
|
||||
timeout=10000,
|
||||
)
|
||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||
# these both have str() representations which we can't really improve
|
||||
|
@ -827,12 +814,12 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
% (server_name, response["server_name"])
|
||||
)
|
||||
|
||||
response_keys = yield self.process_v2_response(
|
||||
response_keys = await self.process_v2_response(
|
||||
from_server=server_name,
|
||||
response_json=response,
|
||||
time_added_ms=time_now_ms,
|
||||
)
|
||||
yield self.store.store_server_verify_keys(
|
||||
await self.store.store_server_verify_keys(
|
||||
server_name,
|
||||
time_now_ms,
|
||||
((server_name, key_id, key) for key_id, key in response_keys.items()),
|
||||
|
@ -842,22 +829,18 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
return keys
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_key_deferred(verify_request):
|
||||
async def _handle_key_deferred(verify_request) -> None:
|
||||
"""Waits for the key to become available, and then performs a verification
|
||||
|
||||
Args:
|
||||
verify_request (VerifyJsonRequest):
|
||||
|
||||
Returns:
|
||||
Deferred[None]
|
||||
|
||||
Raises:
|
||||
SynapseError if there was a problem performing the verification
|
||||
"""
|
||||
server_name = verify_request.server_name
|
||||
with PreserveLoggingContext():
|
||||
_, key_id, verify_key = yield verify_request.key_ready
|
||||
_, key_id, verify_key = await verify_request.key_ready
|
||||
|
||||
json_object = verify_request.json_object
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ from typing import Optional
|
|||
import attr
|
||||
from nacl.signing import SigningKey
|
||||
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.api.constants import MAX_DEPTH
|
||||
from synapse.api.errors import UnsupportedRoomVersionError
|
||||
from synapse.api.room_versions import (
|
||||
|
@ -27,6 +28,8 @@ from synapse.api.room_versions import (
|
|||
)
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.types import EventID, JsonDict
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
@ -42,45 +45,46 @@ class EventBuilder(object):
|
|||
|
||||
Attributes:
|
||||
room_version: Version of the target room
|
||||
room_id (str)
|
||||
type (str)
|
||||
sender (str)
|
||||
content (dict)
|
||||
unsigned (dict)
|
||||
internal_metadata (_EventInternalMetadata)
|
||||
room_id
|
||||
type
|
||||
sender
|
||||
content
|
||||
unsigned
|
||||
internal_metadata
|
||||
|
||||
_state (StateHandler)
|
||||
_auth (synapse.api.Auth)
|
||||
_store (DataStore)
|
||||
_clock (Clock)
|
||||
_hostname (str): The hostname of the server creating the event
|
||||
_state
|
||||
_auth
|
||||
_store
|
||||
_clock
|
||||
_hostname: The hostname of the server creating the event
|
||||
_signing_key: The signing key to use to sign the event as the server
|
||||
"""
|
||||
|
||||
_state = attr.ib()
|
||||
_auth = attr.ib()
|
||||
_store = attr.ib()
|
||||
_clock = attr.ib()
|
||||
_hostname = attr.ib()
|
||||
_signing_key = attr.ib()
|
||||
_state = attr.ib(type=StateHandler)
|
||||
_auth = attr.ib(type=Auth)
|
||||
_store = attr.ib(type=DataStore)
|
||||
_clock = attr.ib(type=Clock)
|
||||
_hostname = attr.ib(type=str)
|
||||
_signing_key = attr.ib(type=SigningKey)
|
||||
|
||||
room_version = attr.ib(type=RoomVersion)
|
||||
|
||||
room_id = attr.ib()
|
||||
type = attr.ib()
|
||||
sender = attr.ib()
|
||||
room_id = attr.ib(type=str)
|
||||
type = attr.ib(type=str)
|
||||
sender = attr.ib(type=str)
|
||||
|
||||
content = attr.ib(default=attr.Factory(dict))
|
||||
unsigned = attr.ib(default=attr.Factory(dict))
|
||||
content = attr.ib(default=attr.Factory(dict), type=JsonDict)
|
||||
unsigned = attr.ib(default=attr.Factory(dict), type=JsonDict)
|
||||
|
||||
# These only exist on a subset of events, so they raise AttributeError if
|
||||
# someone tries to get them when they don't exist.
|
||||
_state_key = attr.ib(default=None)
|
||||
_redacts = attr.ib(default=None)
|
||||
_origin_server_ts = attr.ib(default=None)
|
||||
_state_key = attr.ib(default=None, type=Optional[str])
|
||||
_redacts = attr.ib(default=None, type=Optional[str])
|
||||
_origin_server_ts = attr.ib(default=None, type=Optional[int])
|
||||
|
||||
internal_metadata = attr.ib(
|
||||
default=attr.Factory(lambda: _EventInternalMetadata({}))
|
||||
default=attr.Factory(lambda: _EventInternalMetadata({})),
|
||||
type=_EventInternalMetadata,
|
||||
)
|
||||
|
||||
@property
|
||||
|
@ -106,7 +110,7 @@ class EventBuilder(object):
|
|||
state_ids = await self._state.get_current_state_ids(
|
||||
self.room_id, prev_event_ids
|
||||
)
|
||||
auth_ids = await self._auth.compute_auth_events(self, state_ids)
|
||||
auth_ids = self._auth.compute_auth_events(self, state_ids)
|
||||
|
||||
format_version = self.room_version.event_format
|
||||
if format_version == EventFormatVersions.V1:
|
||||
|
|
|
@ -23,7 +23,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
|
|||
from synapse.types import StateMap
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.storage.data_stores.main import DataStore
|
||||
from synapse.storage.databases.main import DataStore
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
|
@ -58,7 +58,10 @@ class TransactionManager(object):
|
|||
|
||||
@measure_func("_send_new_transaction")
|
||||
async def send_new_transaction(
|
||||
self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
|
||||
self,
|
||||
destination: str,
|
||||
pending_pdus: List[Tuple[EventBase, int]],
|
||||
pending_edus: List[Edu],
|
||||
):
|
||||
|
||||
# Make a transaction-sending opentracing span. This span follows on from
|
||||
|
|
|
@ -17,7 +17,6 @@ import logging
|
|||
|
||||
import twisted
|
||||
import twisted.internet.error
|
||||
from twisted.internet import defer
|
||||
from twisted.web import server, static
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
|
@ -41,8 +40,7 @@ class AcmeHandler(object):
|
|||
self.reactor = hs.get_reactor()
|
||||
self._acme_domain = hs.config.acme_domain
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start_listening(self):
|
||||
async def start_listening(self):
|
||||
from synapse.handlers import acme_issuing_service
|
||||
|
||||
# Configure logging for txacme, if you need to debug
|
||||
|
@ -82,18 +80,17 @@ class AcmeHandler(object):
|
|||
self._issuer._registered = False
|
||||
|
||||
try:
|
||||
yield self._issuer._ensure_registered()
|
||||
await self._issuer._ensure_registered()
|
||||
except Exception:
|
||||
logger.error(ACME_REGISTER_FAIL_ERROR)
|
||||
raise
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def provision_certificate(self):
|
||||
async def provision_certificate(self):
|
||||
|
||||
logger.warning("Reprovisioning %s", self._acme_domain)
|
||||
|
||||
try:
|
||||
yield self._issuer.issue_cert(self._acme_domain)
|
||||
await self._issuer.issue_cert(self._acme_domain)
|
||||
except Exception:
|
||||
logger.exception("Fail!")
|
||||
raise
|
||||
|
|
|
@ -101,7 +101,7 @@ class ApplicationServicesHandler(object):
|
|||
|
||||
async def start_scheduler():
|
||||
try:
|
||||
return self.scheduler.start()
|
||||
return await self.scheduler.start()
|
||||
except Exception:
|
||||
logger.error("Application Services Failure")
|
||||
|
||||
|
|
|
@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
|
|||
request_body: Dict[str, Any],
|
||||
clientip: str,
|
||||
description: str,
|
||||
) -> dict:
|
||||
) -> Tuple[dict, str]:
|
||||
"""
|
||||
Checks that the user is who they claim to be, via a UI auth.
|
||||
|
||||
|
@ -183,9 +183,14 @@ class AuthHandler(BaseHandler):
|
|||
describes the operation happening on their account.
|
||||
|
||||
Returns:
|
||||
The parameters for this request (which may
|
||||
A tuple of (params, session_id).
|
||||
|
||||
'params' contains the parameters for this request (which may
|
||||
have been given only in a previous call).
|
||||
|
||||
'session_id' is the ID of this session, either passed in by the
|
||||
client or assigned by this call
|
||||
|
||||
Raises:
|
||||
InteractiveAuthIncompleteError if the client has not yet completed
|
||||
any of the permitted login flows
|
||||
|
@ -207,7 +212,7 @@ class AuthHandler(BaseHandler):
|
|||
flows = [[login_type] for login_type in self._supported_ui_auth_types]
|
||||
|
||||
try:
|
||||
result, params, _ = await self.check_auth(
|
||||
result, params, session_id = await self.check_ui_auth(
|
||||
flows, request, request_body, clientip, description
|
||||
)
|
||||
except LoginError:
|
||||
|
@ -230,7 +235,7 @@ class AuthHandler(BaseHandler):
|
|||
if user_id != requester.user.to_string():
|
||||
raise AuthError(403, "Invalid auth")
|
||||
|
||||
return params
|
||||
return params, session_id
|
||||
|
||||
def get_enabled_auth_types(self):
|
||||
"""Return the enabled user-interactive authentication types
|
||||
|
@ -240,7 +245,7 @@ class AuthHandler(BaseHandler):
|
|||
"""
|
||||
return self.checkers.keys()
|
||||
|
||||
async def check_auth(
|
||||
async def check_ui_auth(
|
||||
self,
|
||||
flows: List[List[str]],
|
||||
request: SynapseRequest,
|
||||
|
@ -363,7 +368,7 @@ class AuthHandler(BaseHandler):
|
|||
|
||||
if not authdict:
|
||||
raise InteractiveAuthIncompleteError(
|
||||
self._auth_dict_for_flows(flows, session.session_id)
|
||||
session.session_id, self._auth_dict_for_flows(flows, session.session_id)
|
||||
)
|
||||
|
||||
# check auth type currently being presented
|
||||
|
@ -410,7 +415,7 @@ class AuthHandler(BaseHandler):
|
|||
ret = self._auth_dict_for_flows(flows, session.session_id)
|
||||
ret["completed"] = list(creds)
|
||||
ret.update(errordict)
|
||||
raise InteractiveAuthIncompleteError(ret)
|
||||
raise InteractiveAuthIncompleteError(session.session_id, ret)
|
||||
|
||||
async def add_oob_auth(
|
||||
self, stagetype: str, authdict: Dict[str, Any], clientip: str
|
||||
|
|
|
@ -57,13 +57,10 @@ class EventStreamHandler(BaseHandler):
|
|||
timeout=0,
|
||||
as_client_event=True,
|
||||
affect_presence=True,
|
||||
only_keys=None,
|
||||
room_id=None,
|
||||
is_guest=False,
|
||||
):
|
||||
"""Fetches the events stream for a given user.
|
||||
|
||||
If `only_keys` is not None, events from keys will be sent down.
|
||||
"""
|
||||
|
||||
if room_id:
|
||||
|
@ -93,7 +90,6 @@ class EventStreamHandler(BaseHandler):
|
|||
auth_user,
|
||||
pagin_config,
|
||||
timeout,
|
||||
only_keys=only_keys,
|
||||
is_guest=is_guest,
|
||||
explicit_room_id=room_id,
|
||||
)
|
||||
|
|
|
@ -71,7 +71,7 @@ from synapse.replication.http.federation import (
|
|||
)
|
||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.distributor import user_joined_room
|
||||
|
@ -2064,7 +2064,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
if not auth_events:
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
auth_events_ids = await self.auth.compute_auth_events(
|
||||
auth_events_ids = self.auth.compute_auth_events(
|
||||
event, prev_state_ids, for_verification=True
|
||||
)
|
||||
auth_events_x = await self.store.get_events(auth_events_ids)
|
||||
|
|
|
@ -22,14 +22,10 @@ import urllib.parse
|
|||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from canonicaljson import json
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
from signedjson.sign import verify_signed_json
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
||||
from twisted.internet.error import TimeoutError
|
||||
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
CodeMessageException,
|
||||
Codes,
|
||||
HttpResponseException,
|
||||
|
@ -628,9 +624,9 @@ class IdentityHandler(BaseHandler):
|
|||
)
|
||||
|
||||
if "mxid" in data:
|
||||
if "signatures" not in data:
|
||||
raise AuthError(401, "No signatures on 3pid binding")
|
||||
await self._verify_any_signature(data, id_server)
|
||||
# note: we used to verify the identity server's signature here, but no longer
|
||||
# require or validate it. See the following for context:
|
||||
# https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
|
||||
return data["mxid"]
|
||||
except TimeoutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
@ -751,30 +747,6 @@ class IdentityHandler(BaseHandler):
|
|||
mxid = lookup_results["mappings"].get(lookup_value)
|
||||
return mxid
|
||||
|
||||
async def _verify_any_signature(self, data, server_hostname):
|
||||
if server_hostname not in data["signatures"]:
|
||||
raise AuthError(401, "No signature from server %s" % (server_hostname,))
|
||||
for key_name, signature in data["signatures"][server_hostname].items():
|
||||
try:
|
||||
key_data = await self.blacklisting_http_client.get_json(
|
||||
"%s%s/_matrix/identity/api/v1/pubkey/%s"
|
||||
% (id_server_scheme, server_hostname, key_name)
|
||||
)
|
||||
except TimeoutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
if "public_key" not in key_data:
|
||||
raise AuthError(
|
||||
401, "No public key named %s from %s" % (key_name, server_hostname)
|
||||
)
|
||||
verify_signed_json(
|
||||
data,
|
||||
server_hostname,
|
||||
decode_verify_key_bytes(
|
||||
key_name, decode_base64(key_data["public_key"])
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
async def ask_id_server_for_third_party_invite(
|
||||
self,
|
||||
requester: Requester,
|
||||
|
|
|
@ -109,7 +109,7 @@ class InitialSyncHandler(BaseHandler):
|
|||
|
||||
rooms_ret = []
|
||||
|
||||
now_token = await self.hs.get_event_sources().get_current_token()
|
||||
now_token = self.hs.get_event_sources().get_current_token()
|
||||
|
||||
presence_stream = self.hs.get_event_sources().sources["presence"]
|
||||
pagination_config = PaginationConfig(from_token=now_token)
|
||||
|
@ -360,7 +360,7 @@ class InitialSyncHandler(BaseHandler):
|
|||
current_state.values(), time_now
|
||||
)
|
||||
|
||||
now_token = await self.hs.get_event_sources().get_current_token()
|
||||
now_token = self.hs.get_event_sources().get_current_token()
|
||||
|
||||
limit = pagin_config.limit if pagin_config else None
|
||||
if limit is None:
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
|
||||
|
@ -45,7 +45,7 @@ from synapse.events.validator import EventValidator
|
|||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
Collection,
|
||||
|
@ -93,11 +93,11 @@ class MessageHandler(object):
|
|||
|
||||
async def get_room_data(
|
||||
self,
|
||||
user_id: str = None,
|
||||
room_id: str = None,
|
||||
event_type: Optional[str] = None,
|
||||
state_key: str = "",
|
||||
is_guest: bool = False,
|
||||
user_id: str,
|
||||
room_id: str,
|
||||
event_type: str,
|
||||
state_key: str,
|
||||
is_guest: bool,
|
||||
) -> dict:
|
||||
""" Get data from a room.
|
||||
|
||||
|
@ -407,7 +407,7 @@ class EventCreationHandler(object):
|
|||
#
|
||||
# map from room id to time-of-last-attempt.
|
||||
#
|
||||
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int]
|
||||
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int]
|
||||
|
||||
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
|
||||
# config options, but *only* if we have a configuration for which we are
|
||||
|
@ -709,7 +709,7 @@ class EventCreationHandler(object):
|
|||
async def create_and_send_nonmember_event(
|
||||
self,
|
||||
requester: Requester,
|
||||
event_dict: EventBase,
|
||||
event_dict: dict,
|
||||
ratelimit: bool = True,
|
||||
txn_id: Optional[str] = None,
|
||||
) -> Tuple[EventBase, int]:
|
||||
|
@ -770,6 +770,15 @@ class EventCreationHandler(object):
|
|||
else:
|
||||
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
|
||||
|
||||
# we now ought to have some prev_events (unless it's a create event).
|
||||
#
|
||||
# do a quick sanity check here, rather than waiting until we've created the
|
||||
# event and then try to auth it (which fails with a somewhat confusing "No
|
||||
# create event in auth events")
|
||||
assert (
|
||||
builder.type == EventTypes.Create or len(prev_event_ids) > 0
|
||||
), "Attempting to create an event with no prev_events"
|
||||
|
||||
event = await builder.build(prev_event_ids=prev_event_ids)
|
||||
context = await self.state.compute_event_context(event)
|
||||
if requester:
|
||||
|
@ -964,7 +973,7 @@ class EventCreationHandler(object):
|
|||
# Validate a newly added alias or newly added alt_aliases.
|
||||
|
||||
original_alias = None
|
||||
original_alt_aliases = set()
|
||||
original_alt_aliases = [] # type: List[str]
|
||||
|
||||
original_event_id = event.unsigned.get("replaces_state")
|
||||
if original_event_id:
|
||||
|
@ -1012,6 +1021,10 @@ class EventCreationHandler(object):
|
|||
|
||||
current_state_ids = await context.get_current_state_ids()
|
||||
|
||||
# We know this event is not an outlier, so this must be
|
||||
# non-None.
|
||||
assert current_state_ids is not None
|
||||
|
||||
state_to_include_ids = [
|
||||
e_id
|
||||
for k, e_id in current_state_ids.items()
|
||||
|
@ -1063,7 +1076,7 @@ class EventCreationHandler(object):
|
|||
raise SynapseError(400, "Cannot redact event from a different room")
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
auth_events_ids = await self.auth.compute_auth_events(
|
||||
auth_events_ids = self.auth.compute_auth_events(
|
||||
event, prev_state_ids, for_verification=True
|
||||
)
|
||||
auth_events = await self.store.get_events(auth_events_ids)
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Generic, List, Optional, Tuple, TypeVar
|
||||
from typing import TYPE_CHECKING, Dict, Generic, List, Optional, Tuple, TypeVar
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import attr
|
||||
|
@ -39,9 +39,11 @@ from synapse.http.server import respond_with_html
|
|||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.push.mailer import load_jinja2_templates
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import UserID, map_username_to_mxid_localpart
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SESSION_COOKIE_NAME = b"oidc_session"
|
||||
|
@ -91,7 +93,7 @@ class OidcHandler:
|
|||
"""Handles requests related to the OpenID Connect login flow.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: HomeServer):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._callback_url = hs.config.oidc_callback_url # type: str
|
||||
self._scopes = hs.config.oidc_scopes # type: List[str]
|
||||
self._client_auth = ClientAuth(
|
||||
|
|
|
@ -309,7 +309,7 @@ class PaginationHandler(object):
|
|||
room_token = pagin_config.from_token.room_key
|
||||
else:
|
||||
pagin_config.from_token = (
|
||||
await self.hs.get_event_sources().get_current_token_for_pagination()
|
||||
self.hs.get_event_sources().get_current_token_for_pagination()
|
||||
)
|
||||
room_token = pagin_config.from_token.room_key
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ from synapse.logging.utils import log_function
|
|||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage.data_stores.main import DataStore
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
|
@ -319,7 +319,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
is some spurious presence changes that will self-correct.
|
||||
"""
|
||||
# If the DB pool has already terminated, don't try updating
|
||||
if not self.store.db.is_running():
|
||||
if not self.store.db_pool.is_running():
|
||||
return
|
||||
|
||||
logger.info(
|
||||
|
|
|
@ -548,7 +548,7 @@ class RegistrationHandler(BaseHandler):
|
|||
address (str|None): the IP address used to perform the registration.
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
Awaitable
|
||||
"""
|
||||
if self.hs.config.worker_app:
|
||||
return self._register_client(
|
||||
|
|
|
@ -22,7 +22,7 @@ import logging
|
|||
import math
|
||||
import string
|
||||
from collections import OrderedDict
|
||||
from typing import Optional, Tuple
|
||||
from typing import Awaitable, Optional, Tuple
|
||||
|
||||
from synapse.api.constants import (
|
||||
EventTypes,
|
||||
|
@ -1041,7 +1041,7 @@ class RoomEventSource(object):
|
|||
):
|
||||
# We just ignore the key for now.
|
||||
|
||||
to_key = await self.get_current_key()
|
||||
to_key = self.get_current_key()
|
||||
|
||||
from_token = RoomStreamToken.parse(from_key)
|
||||
if from_token.topological:
|
||||
|
@ -1081,10 +1081,10 @@ class RoomEventSource(object):
|
|||
|
||||
return (events, end_key)
|
||||
|
||||
def get_current_key(self):
|
||||
return self.store.get_room_events_max_id()
|
||||
def get_current_key(self) -> str:
|
||||
return "s%d" % (self.store.get_room_max_stream_ordering(),)
|
||||
|
||||
def get_current_key_for_room(self, room_id):
|
||||
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
|
||||
return self.store.get_room_events_max_id(room_id)
|
||||
|
||||
|
||||
|
|
|
@ -16,13 +16,14 @@
|
|||
import abc
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from synapse import types
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.api.room_versions import EventFormatVersions
|
||||
from synapse.crypto.event_signing import compute_event_reference_hash
|
||||
from synapse.events import EventBase
|
||||
|
@ -36,6 +37,10 @@ from synapse.util.distributor import user_joined_room, user_left_room
|
|||
|
||||
from ._base import BaseHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -47,7 +52,7 @@ class RoomMemberHandler(object):
|
|||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
self.auth = hs.get_auth()
|
||||
|
@ -78,6 +83,17 @@ class RoomMemberHandler(object):
|
|||
if self._is_on_event_persistence_instance:
|
||||
self.persist_event_storage = hs.get_storage().persistence
|
||||
|
||||
self._join_rate_limiter_local = Ratelimiter(
|
||||
clock=self.clock,
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
|
||||
)
|
||||
self._join_rate_limiter_remote = Ratelimiter(
|
||||
clock=self.clock,
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
|
||||
)
|
||||
|
||||
# This is only used to get at ratelimit function, and
|
||||
# maybe_kick_guest_users. It's fine there are multiple of these as
|
||||
# it doesn't store state.
|
||||
|
@ -196,7 +212,7 @@ class RoomMemberHandler(object):
|
|||
return duplicate.event_id, stream_id
|
||||
|
||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
||||
requester, event, context, extra_users=[target], ratelimit=ratelimit
|
||||
requester, event, context, extra_users=[target], ratelimit=ratelimit,
|
||||
)
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
|
@ -461,7 +477,28 @@ class RoomMemberHandler(object):
|
|||
# so don't really fit into the general auth process.
|
||||
raise AuthError(403, "Guest access not allowed")
|
||||
|
||||
if not is_host_in_room:
|
||||
if is_host_in_room:
|
||||
time_now_s = self.clock.time()
|
||||
allowed, time_allowed = self._join_rate_limiter_local.can_do_action(
|
||||
requester.user.to_string(),
|
||||
)
|
||||
|
||||
if not allowed:
|
||||
raise LimitExceededError(
|
||||
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
||||
)
|
||||
|
||||
else:
|
||||
time_now_s = self.clock.time()
|
||||
allowed, time_allowed = self._join_rate_limiter_remote.can_do_action(
|
||||
requester.user.to_string(),
|
||||
)
|
||||
|
||||
if not allowed:
|
||||
raise LimitExceededError(
|
||||
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
||||
)
|
||||
|
||||
inviter = await self._get_inviter(target.to_string(), room_id)
|
||||
if inviter and not self.hs.is_mine(inviter):
|
||||
remote_room_hosts.append(inviter.domain)
|
||||
|
@ -987,7 +1024,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
|||
|
||||
check_complexity = self.hs.config.limit_remote_rooms.enabled
|
||||
if check_complexity and self.hs.config.limit_remote_rooms.admins_can_join:
|
||||
check_complexity = not await self.hs.auth.is_server_admin(user)
|
||||
check_complexity = not await self.auth.is_server_admin(user)
|
||||
|
||||
if check_complexity:
|
||||
# Fetch the room complexity
|
||||
|
|
|
@ -14,15 +14,16 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
import re
|
||||
from typing import Callable, Dict, Optional, Set, Tuple
|
||||
from typing import TYPE_CHECKING, Callable, Dict, Optional, Set, Tuple
|
||||
|
||||
import attr
|
||||
import saml2
|
||||
import saml2.response
|
||||
from saml2.client import Saml2Client
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.errors import AuthError, SynapseError
|
||||
from synapse.config import ConfigError
|
||||
from synapse.config.saml2_config import SamlAttributeRequirement
|
||||
from synapse.http.servlet import parse_string
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.module_api import ModuleApi
|
||||
|
@ -34,6 +35,9 @@ from synapse.types import (
|
|||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.iterutils import chunk_seq
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import synapse.server
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -49,7 +53,7 @@ class Saml2SessionData:
|
|||
|
||||
|
||||
class SamlHandler:
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||
self._saml_client = Saml2Client(hs.config.saml2_sp_config)
|
||||
self._auth = hs.get_auth()
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
|
@ -62,6 +66,7 @@ class SamlHandler:
|
|||
self._grandfathered_mxid_source_attribute = (
|
||||
hs.config.saml2_grandfathered_mxid_source_attribute
|
||||
)
|
||||
self._saml2_attribute_requirements = hs.config.saml2.attribute_requirements
|
||||
|
||||
# plugin to do custom mapping from saml response to mxid
|
||||
self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class(
|
||||
|
@ -73,7 +78,7 @@ class SamlHandler:
|
|||
self._auth_provider_id = "saml"
|
||||
|
||||
# a map from saml session id to Saml2SessionData object
|
||||
self._outstanding_requests_dict = {}
|
||||
self._outstanding_requests_dict = {} # type: Dict[str, Saml2SessionData]
|
||||
|
||||
# a lock on the mappings
|
||||
self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock)
|
||||
|
@ -165,11 +170,18 @@ class SamlHandler:
|
|||
saml2.BINDING_HTTP_POST,
|
||||
outstanding=self._outstanding_requests_dict,
|
||||
)
|
||||
except saml2.response.UnsolicitedResponse as e:
|
||||
# the pysaml2 library helpfully logs an ERROR here, but neglects to log
|
||||
# the session ID. I don't really want to put the full text of the exception
|
||||
# in the (user-visible) exception message, so let's log the exception here
|
||||
# so we can track down the session IDs later.
|
||||
logger.warning(str(e))
|
||||
raise SynapseError(400, "Unexpected SAML2 login.")
|
||||
except Exception as e:
|
||||
raise SynapseError(400, "Unable to parse SAML2 response: %s" % (e,))
|
||||
raise SynapseError(400, "Unable to parse SAML2 response: %s." % (e,))
|
||||
|
||||
if saml2_auth.not_signed:
|
||||
raise SynapseError(400, "SAML2 response was not signed")
|
||||
raise SynapseError(400, "SAML2 response was not signed.")
|
||||
|
||||
logger.debug("SAML2 response: %s", saml2_auth.origxml)
|
||||
for assertion in saml2_auth.assertions:
|
||||
|
@ -188,6 +200,9 @@ class SamlHandler:
|
|||
saml2_auth.in_response_to, None
|
||||
)
|
||||
|
||||
for requirement in self._saml2_attribute_requirements:
|
||||
_check_attribute_requirement(saml2_auth.ava, requirement)
|
||||
|
||||
remote_user_id = self._user_mapping_provider.get_remote_user_id(
|
||||
saml2_auth, client_redirect_url
|
||||
)
|
||||
|
@ -294,6 +309,21 @@ class SamlHandler:
|
|||
del self._outstanding_requests_dict[reqid]
|
||||
|
||||
|
||||
def _check_attribute_requirement(ava: dict, req: SamlAttributeRequirement):
|
||||
values = ava.get(req.attribute, [])
|
||||
for v in values:
|
||||
if v == req.value:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"SAML2 attribute %s did not match required value '%s' (was '%s')",
|
||||
req.attribute,
|
||||
req.value,
|
||||
values,
|
||||
)
|
||||
raise AuthError(403, "You are not authorized to log in here.")
|
||||
|
||||
|
||||
DOT_REPLACE_PATTERN = re.compile(
|
||||
("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),))
|
||||
)
|
||||
|
|
|
@ -340,7 +340,7 @@ class SearchHandler(BaseHandler):
|
|||
# If client has asked for "context" for each event (i.e. some surrounding
|
||||
# events and state), fetch that
|
||||
if event_context is not None:
|
||||
now_token = await self.hs.get_event_sources().get_current_token()
|
||||
now_token = self.hs.get_event_sources().get_current_token()
|
||||
|
||||
contexts = {}
|
||||
for event in allowed_events:
|
||||
|
|
|
@ -232,7 +232,7 @@ class StatsHandler:
|
|||
|
||||
if membership == prev_membership:
|
||||
pass # noop
|
||||
if membership == Membership.JOIN:
|
||||
elif membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] += 1
|
||||
elif membership == Membership.INVITE:
|
||||
room_stats_delta["invited_members"] += 1
|
||||
|
|
|
@ -104,7 +104,6 @@ class JoinedSyncResult:
|
|||
account_data = attr.ib(type=List[JsonDict])
|
||||
unread_notifications = attr.ib(type=JsonDict)
|
||||
summary = attr.ib(type=Optional[JsonDict])
|
||||
unread_count = attr.ib(type=int)
|
||||
|
||||
def __nonzero__(self) -> bool:
|
||||
"""Make the result appear empty if there are no updates. This is used
|
||||
|
@ -964,7 +963,7 @@ class SyncHandler(object):
|
|||
# this is due to some of the underlying streams not supporting the ability
|
||||
# to query up to a given point.
|
||||
# Always use the `now_token` in `SyncResultBuilder`
|
||||
now_token = await self.event_sources.get_current_token()
|
||||
now_token = self.event_sources.get_current_token()
|
||||
|
||||
logger.debug(
|
||||
"Calculating sync response for %r between %s and %s",
|
||||
|
@ -1890,10 +1889,6 @@ class SyncHandler(object):
|
|||
|
||||
if room_builder.rtype == "joined":
|
||||
unread_notifications = {} # type: Dict[str, str]
|
||||
|
||||
unread_count = await self.store.get_unread_message_count_for_user(
|
||||
room_id, sync_config.user.to_string(),
|
||||
)
|
||||
room_sync = JoinedSyncResult(
|
||||
room_id=room_id,
|
||||
timeline=batch,
|
||||
|
@ -1902,7 +1897,6 @@ class SyncHandler(object):
|
|||
account_data=account_data_events,
|
||||
unread_notifications=unread_notifications,
|
||||
summary=summary,
|
||||
unread_count=unread_count,
|
||||
)
|
||||
|
||||
if room_sync or always_include:
|
||||
|
|
|
@ -284,8 +284,7 @@ class SimpleHttpClient(object):
|
|||
ip_blacklist=self._ip_blacklist,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def request(self, method, uri, data=None, headers=None):
|
||||
async def request(self, method, uri, data=None, headers=None):
|
||||
"""
|
||||
Args:
|
||||
method (str): HTTP method to use.
|
||||
|
@ -298,7 +297,7 @@ class SimpleHttpClient(object):
|
|||
outgoing_requests_counter.labels(method).inc()
|
||||
|
||||
# log request but strip `access_token` (AS requests for example include this)
|
||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||
logger.debug("Sending request %s %s", method, redact_uri(uri))
|
||||
|
||||
with start_active_span(
|
||||
"outgoing-client-request",
|
||||
|
@ -330,7 +329,7 @@ class SimpleHttpClient(object):
|
|||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(request_deferred)
|
||||
response = await make_deferred_yieldable(request_deferred)
|
||||
|
||||
incoming_responses_counter.labels(method, response.code).inc()
|
||||
logger.info(
|
||||
|
@ -353,8 +352,7 @@ class SimpleHttpClient(object):
|
|||
set_tag("error_reason", e.args[0])
|
||||
raise
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
||||
async def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
||||
"""
|
||||
Args:
|
||||
uri (str):
|
||||
|
@ -363,7 +361,7 @@ class SimpleHttpClient(object):
|
|||
header name to a list of values for that header
|
||||
|
||||
Returns:
|
||||
Deferred[object]: parsed json
|
||||
object: parsed json
|
||||
|
||||
Raises:
|
||||
HttpResponseException: On a non-2xx HTTP response.
|
||||
|
@ -386,11 +384,11 @@ class SimpleHttpClient(object):
|
|||
if headers:
|
||||
actual_headers.update(headers)
|
||||
|
||||
response = yield self.request(
|
||||
response = await self.request(
|
||||
"POST", uri, headers=Headers(actual_headers), data=query_bytes
|
||||
)
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
body = await make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
return json.loads(body.decode("utf-8"))
|
||||
|
@ -399,8 +397,7 @@ class SimpleHttpClient(object):
|
|||
response.code, response.phrase.decode("ascii", errors="replace"), body
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def post_json_get_json(self, uri, post_json, headers=None):
|
||||
async def post_json_get_json(self, uri, post_json, headers=None):
|
||||
"""
|
||||
|
||||
Args:
|
||||
|
@ -410,7 +407,7 @@ class SimpleHttpClient(object):
|
|||
header name to a list of values for that header
|
||||
|
||||
Returns:
|
||||
Deferred[object]: parsed json
|
||||
object: parsed json
|
||||
|
||||
Raises:
|
||||
HttpResponseException: On a non-2xx HTTP response.
|
||||
|
@ -429,11 +426,11 @@ class SimpleHttpClient(object):
|
|||
if headers:
|
||||
actual_headers.update(headers)
|
||||
|
||||
response = yield self.request(
|
||||
response = await self.request(
|
||||
"POST", uri, headers=Headers(actual_headers), data=json_str
|
||||
)
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
body = await make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
return json.loads(body.decode("utf-8"))
|
||||
|
@ -442,8 +439,7 @@ class SimpleHttpClient(object):
|
|||
response.code, response.phrase.decode("ascii", errors="replace"), body
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_json(self, uri, args={}, headers=None):
|
||||
async def get_json(self, uri, args={}, headers=None):
|
||||
""" Gets some json from the given URI.
|
||||
|
||||
Args:
|
||||
|
@ -455,7 +451,7 @@ class SimpleHttpClient(object):
|
|||
headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
|
||||
header name to a list of values for that header
|
||||
Returns:
|
||||
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
|
||||
Succeeds when we get *any* 2xx HTTP response, with the
|
||||
HTTP body as JSON.
|
||||
Raises:
|
||||
HttpResponseException On a non-2xx HTTP response.
|
||||
|
@ -466,11 +462,10 @@ class SimpleHttpClient(object):
|
|||
if headers:
|
||||
actual_headers.update(headers)
|
||||
|
||||
body = yield self.get_raw(uri, args, headers=headers)
|
||||
body = await self.get_raw(uri, args, headers=headers)
|
||||
return json.loads(body.decode("utf-8"))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def put_json(self, uri, json_body, args={}, headers=None):
|
||||
async def put_json(self, uri, json_body, args={}, headers=None):
|
||||
""" Puts some json to the given URI.
|
||||
|
||||
Args:
|
||||
|
@ -483,7 +478,7 @@ class SimpleHttpClient(object):
|
|||
headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
|
||||
header name to a list of values for that header
|
||||
Returns:
|
||||
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
|
||||
Succeeds when we get *any* 2xx HTTP response, with the
|
||||
HTTP body as JSON.
|
||||
Raises:
|
||||
HttpResponseException On a non-2xx HTTP response.
|
||||
|
@ -504,11 +499,11 @@ class SimpleHttpClient(object):
|
|||
if headers:
|
||||
actual_headers.update(headers)
|
||||
|
||||
response = yield self.request(
|
||||
response = await self.request(
|
||||
"PUT", uri, headers=Headers(actual_headers), data=json_str
|
||||
)
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
body = await make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
return json.loads(body.decode("utf-8"))
|
||||
|
@ -517,8 +512,7 @@ class SimpleHttpClient(object):
|
|||
response.code, response.phrase.decode("ascii", errors="replace"), body
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_raw(self, uri, args={}, headers=None):
|
||||
async def get_raw(self, uri, args={}, headers=None):
|
||||
""" Gets raw text from the given URI.
|
||||
|
||||
Args:
|
||||
|
@ -530,7 +524,7 @@ class SimpleHttpClient(object):
|
|||
headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
|
||||
header name to a list of values for that header
|
||||
Returns:
|
||||
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
|
||||
Succeeds when we get *any* 2xx HTTP response, with the
|
||||
HTTP body as bytes.
|
||||
Raises:
|
||||
HttpResponseException on a non-2xx HTTP response.
|
||||
|
@ -543,9 +537,9 @@ class SimpleHttpClient(object):
|
|||
if headers:
|
||||
actual_headers.update(headers)
|
||||
|
||||
response = yield self.request("GET", uri, headers=Headers(actual_headers))
|
||||
response = await self.request("GET", uri, headers=Headers(actual_headers))
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
body = await make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
return body
|
||||
|
@ -557,8 +551,7 @@ class SimpleHttpClient(object):
|
|||
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
|
||||
# The two should be factored out.
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_file(self, url, output_stream, max_size=None, headers=None):
|
||||
async def get_file(self, url, output_stream, max_size=None, headers=None):
|
||||
"""GETs a file from a given URL
|
||||
Args:
|
||||
url (str): The URL to GET
|
||||
|
@ -574,7 +567,7 @@ class SimpleHttpClient(object):
|
|||
if headers:
|
||||
actual_headers.update(headers)
|
||||
|
||||
response = yield self.request("GET", url, headers=Headers(actual_headers))
|
||||
response = await self.request("GET", url, headers=Headers(actual_headers))
|
||||
|
||||
resp_headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
|
@ -598,7 +591,7 @@ class SimpleHttpClient(object):
|
|||
# straight back in again
|
||||
|
||||
try:
|
||||
length = yield make_deferred_yieldable(
|
||||
length = await make_deferred_yieldable(
|
||||
_readBodyToFile(response, output_stream, max_size)
|
||||
)
|
||||
except SynapseError:
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show more
Loading…
Reference in a new issue