0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-19 00:41:55 +01:00

Merge branch 'develop' into rav/url_preview_limit_title

This commit is contained in:
Richard van der Hoff 2019-11-05 17:08:07 +00:00 committed by GitHub
commit 55a7da247a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
231 changed files with 4527 additions and 1822 deletions

View file

@ -0,0 +1,21 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the script to connect to the postgresql database that will be available in the
# CI's Docker setup at the point where this file is considered.
server_name: "test"
signing_key_path: "/src/.buildkite/test.signing.key"
report_stats: false
database:
name: "psycopg2"
args:
user: postgres
host: postgres
password: postgres
database: synapse
# Suppress the key server warning.
trusted_key_servers:
- server_name: "matrix.org"
suppress_key_server_warning: true

View file

@ -0,0 +1,36 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.storage.engines import create_engine
logger = logging.getLogger("create_postgres_db")
if __name__ == "__main__":
# Create a PostgresEngine.
db_engine = create_engine({"name": "psycopg2", "args": {}})
# Connect to postgres to create the base database.
# We use "postgres" as a database because it's bound to exist and the "synapse" one
# doesn't exist yet.
db_conn = db_engine.module.connect(
user="postgres", host="postgres", password="postgres", dbname="postgres"
)
db_conn.autocommit = True
cur = db_conn.cursor()
cur.execute("CREATE DATABASE synapse;")
cur.close()
db_conn.close()

View file

@ -0,0 +1,36 @@
#!/bin/bash
#
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
# with additional dependencies needed for the test (such as coverage or the PostgreSQL
# driver), update the schema of the test SQLite database and run background updates on it,
# create an empty test database in PostgreSQL, then run the 'synapse_port_db' script to
# test porting the SQLite database to the PostgreSQL database (with coverage).
set -xe
cd `dirname $0`/../..
echo "--- Install dependencies"
# Install dependencies for this test.
pip install psycopg2 coverage coverage-enable-subprocess
# Install Synapse itself. This won't update any libraries.
pip install -e .
echo "--- Generate the signing key"
# Generate the server's signing key.
python -m synapse.app.homeserver --generate-keys -c .buildkite/sqlite-config.yaml
echo "--- Prepare the databases"
# Make sure the SQLite3 database is using the latest schema and has no pending background update.
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
# Create the PostgreSQL database.
./.buildkite/scripts/create_postgres_db.py
echo "+++ Run synapse_port_db"
# Run the script
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml

View file

@ -0,0 +1,18 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the 'update_database' script to connect to the test SQLite database to upgrade its
# schema and run background updates on it.
server_name: "test"
signing_key_path: "/src/.buildkite/test.signing.key"
report_stats: false
database:
name: "sqlite3"
args:
database: ".buildkite/test_db.db"
# Suppress the key server warning.
trusted_key_servers:
- server_name: "matrix.org"
suppress_key_server_warning: true

BIN
.buildkite/test_db.db Normal file

Binary file not shown.

View file

@ -5,3 +5,4 @@
* [ ] Pull request is based on the develop branch * [ ] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#changelog) * [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#changelog)
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#sign-off) * [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#sign-off)
* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#code-style))

View file

@ -58,10 +58,29 @@ All Matrix projects have a well-defined code-style - and sometimes we've even
got as far as documenting it... For instance, synapse's code style doc lives got as far as documenting it... For instance, synapse's code style doc lives
at https://github.com/matrix-org/synapse/tree/master/docs/code_style.md. at https://github.com/matrix-org/synapse/tree/master/docs/code_style.md.
To facilitate meeting these criteria you can run ``scripts-dev/lint.sh``
locally. Since this runs the tools listed in the above document, you'll need
python 3.6 and to install each tool. **Note that the script does not just
test/check, but also reformats code, so you may wish to ensure any new code is
committed first**. By default this script checks all files and can take some
time; if you alter only certain files, you might wish to specify paths as
arguments to reduce the run-time.
Please ensure your changes match the cosmetic style of the existing project, Please ensure your changes match the cosmetic style of the existing project,
and **never** mix cosmetic and functional changes in the same commit, as it and **never** mix cosmetic and functional changes in the same commit, as it
makes it horribly hard to review otherwise. makes it horribly hard to review otherwise.
Before doing a commit, ensure the changes you've made don't produce
linting errors. You can do this by running the linters as follows. Ensure to
commit any files that were corrected.
::
# Install the dependencies
pip install -U black flake8 isort
# Run the linter script
./scripts-dev/lint.sh
Changelog Changelog
~~~~~~~~~ ~~~~~~~~~

View file

@ -413,16 +413,18 @@ For a more detailed guide to configuring your server for federation, see
## Email ## Email
It is desirable for Synapse to have the capability to send email. For example, It is desirable for Synapse to have the capability to send email. This allows
this is required to support the 'password reset' feature. Synapse to send password reset emails, send verifications when an email address
is added to a user's account, and send email notifications to users when they
receive new messages.
To configure an SMTP server for Synapse, modify the configuration section To configure an SMTP server for Synapse, modify the configuration section
headed ``email``, and be sure to have at least the ``smtp_host``, ``smtp_port`` headed `email`, and be sure to have at least the `smtp_host`, `smtp_port`
and ``notif_from`` fields filled out. You may also need to set ``smtp_user``, and `notif_from` fields filled out. You may also need to set `smtp_user`,
``smtp_pass``, and ``require_transport_security``. `smtp_pass`, and `require_transport_security`.
If Synapse is not configured with an SMTP server, password reset via email will If email is not configured, password reset, registration and notifications via
be disabled by default. email will be disabled.
## Registering a user ## Registering a user

1
changelog.d/5727.feature Normal file
View file

@ -0,0 +1 @@
Add federation support for cross-signing.

1
changelog.d/6140.misc Normal file
View file

@ -0,0 +1 @@
Add a CI job to test the `synapse_port_db` script.

1
changelog.d/6164.doc Normal file
View file

@ -0,0 +1 @@
Contributor documentation now mentions script to run linters.

1
changelog.d/6218.misc Normal file
View file

@ -0,0 +1 @@
Convert EventContext to an attrs.

1
changelog.d/6232.bugfix Normal file
View file

@ -0,0 +1 @@
Remove a room from a server's public rooms list on room upgrade.

1
changelog.d/6238.feature Normal file
View file

@ -0,0 +1 @@
Add support for outbound http proxying via http_proxy/HTTPS_PROXY env vars.

1
changelog.d/6240.misc Normal file
View file

@ -0,0 +1 @@
Move `persist_events` out from main data store.

1
changelog.d/6250.misc Normal file
View file

@ -0,0 +1 @@
Reduce verbosity of user/room stats.

1
changelog.d/6251.misc Normal file
View file

@ -0,0 +1 @@
Reduce impact of debug logging.

1
changelog.d/6253.bugfix Normal file
View file

@ -0,0 +1 @@
Delete keys from key backup when deleting backup versions.

1
changelog.d/6254.bugfix Normal file
View file

@ -0,0 +1 @@
Make notification of cross-signing signatures work with workers.

1
changelog.d/6257.doc Normal file
View file

@ -0,0 +1 @@
Modify CAPTCHA_SETUP.md to update the terms `private key` and `public key` to `secret key` and `site key` respectively. Contributed by Yash Jipkate.

1
changelog.d/6259.misc Normal file
View file

@ -0,0 +1 @@
Expose some homeserver functionality to spam checkers.

1
changelog.d/6263.misc Normal file
View file

@ -0,0 +1 @@
Change cache descriptors to always return deferreds.

1
changelog.d/6269.misc Normal file
View file

@ -0,0 +1 @@
Fix incorrect comment regarding the functionality of an `if` statement.

1
changelog.d/6270.misc Normal file
View file

@ -0,0 +1 @@
Update CI to run `isort` over the `scripts` and `scripts-dev` directories.

1
changelog.d/6271.misc Normal file
View file

@ -0,0 +1 @@
Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated.

1
changelog.d/6272.doc Normal file
View file

@ -0,0 +1 @@
Update `INSTALL.md` Email section to talk about `account_threepid_delegates`.

1
changelog.d/6273.doc Normal file
View file

@ -0,0 +1 @@
Fix a small typo in `account_threepid_delegates` configuration option.

1
changelog.d/6274.misc Normal file
View file

@ -0,0 +1 @@
Port replication http server endpoints to async/await.

1
changelog.d/6275.misc Normal file
View file

@ -0,0 +1 @@
Port room rest handlers to async/await.

1
changelog.d/6276.misc Normal file
View file

@ -0,0 +1 @@
Add a CI job to test the `synapse_port_db` script.

1
changelog.d/6277.misc Normal file
View file

@ -0,0 +1 @@
Remove redundant CLI parameters on CI's `flake8` step.

1
changelog.d/6278.bugfix Normal file
View file

@ -0,0 +1 @@
Fix exception when remote servers attempt to join a room that they're not allowed to join.

1
changelog.d/6279.misc Normal file
View file

@ -0,0 +1 @@
Port `federation_server.py` to async/await.

1
changelog.d/6280.misc Normal file
View file

@ -0,0 +1 @@
Port receipt and read markers to async/wait.

1
changelog.d/6284.bugfix Normal file
View file

@ -0,0 +1 @@
Prevent errors from appearing on Synapse startup if `git` is not installed.

1
changelog.d/6291.misc Normal file
View file

@ -0,0 +1 @@
Change cache descriptors to always return deferreds.

1
changelog.d/6294.misc Normal file
View file

@ -0,0 +1 @@
Split out state storage into separate data store.

1
changelog.d/6298.misc Normal file
View file

@ -0,0 +1 @@
Refactor EventContext for clarity.

1
changelog.d/6300.misc Normal file
View file

@ -0,0 +1 @@
Move `persist_events` out from main data store.

1
changelog.d/6301.feature Normal file
View file

@ -0,0 +1 @@
Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).

1
changelog.d/6304.misc Normal file
View file

@ -0,0 +1 @@
Update the version of black used to 19.10b0.

1
changelog.d/6305.misc Normal file
View file

@ -0,0 +1 @@
Add some documentation about worker replication.

1
changelog.d/6306.bugfix Normal file
View file

@ -0,0 +1 @@
Appservice requests will no longer contain a double slash prefix when the appservice url provided ends in a slash.

1
changelog.d/6307.bugfix Normal file
View file

@ -0,0 +1 @@
Fix `/purge_room` admin API.

1
changelog.d/6312.misc Normal file
View file

@ -0,0 +1 @@
Document the use of `lint.sh` for code style enforcement & extend it to run on specified paths only.

1
changelog.d/6313.bugfix Normal file
View file

@ -0,0 +1 @@
Fix the `hidden` field in the `devices` table for SQLite versions prior to 3.23.0.

1
changelog.d/6314.misc Normal file
View file

@ -0,0 +1 @@
Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated.

1
changelog.d/6318.misc Normal file
View file

@ -0,0 +1 @@
Remove the dependency on psutil and replace functionality with the stdlib `resource` module.

1
changelog.d/6319.misc Normal file
View file

@ -0,0 +1 @@
Improve documentation for EventContext fields.

View file

@ -78,7 +78,7 @@ class InputOutput(object):
m = re.match("^join (\S+)$", line) m = re.match("^join (\S+)$", line)
if m: if m:
# The `sender` wants to join a room. # The `sender` wants to join a room.
room_name, = m.groups() (room_name,) = m.groups()
self.print_line("%s joining %s" % (self.user, room_name)) self.print_line("%s joining %s" % (self.user, room_name))
self.server.join_room(room_name, self.user, self.user) self.server.join_room(room_name, self.user, self.user)
# self.print_line("OK.") # self.print_line("OK.")
@ -105,7 +105,7 @@ class InputOutput(object):
m = re.match("^backfill (\S+)$", line) m = re.match("^backfill (\S+)$", line)
if m: if m:
# we want to backfill a room # we want to backfill a room
room_name, = m.groups() (room_name,) = m.groups()
self.print_line("backfill %s" % room_name) self.print_line("backfill %s" % room_name)
self.server.backfill(room_name) self.server.backfill(room_name)
return return

View file

@ -101,7 +101,7 @@ is suitable for local testing, but for any practical use, you will either need
to use a reverse proxy, or configure Synapse to expose an HTTPS port. to use a reverse proxy, or configure Synapse to expose an HTTPS port.
For documentation on using a reverse proxy, see For documentation on using a reverse proxy, see
https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst. https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.md.
For more information on enabling TLS support in synapse itself, see For more information on enabling TLS support in synapse itself, see
https://github.com/matrix-org/synapse/blob/master/INSTALL.md#tls-certificates. Of https://github.com/matrix-org/synapse/blob/master/INSTALL.md#tls-certificates. Of

View file

@ -217,8 +217,9 @@ def main(args, environ):
# backwards-compatibility generate-a-config-on-the-fly mode # backwards-compatibility generate-a-config-on-the-fly mode
if "SYNAPSE_CONFIG_PATH" in environ: if "SYNAPSE_CONFIG_PATH" in environ:
error( error(
"SYNAPSE_SERVER_NAME and SYNAPSE_CONFIG_PATH are mutually exclusive " "SYNAPSE_SERVER_NAME can only be combined with SYNAPSE_CONFIG_PATH "
"except in `generate` or `migrate_config` mode." "in `generate` or `migrate_config` mode. To start synapse using a "
"config file, unset the SYNAPSE_SERVER_NAME environment variable."
) )
config_path = "/compiled/homeserver.yaml" config_path = "/compiled/homeserver.yaml"

View file

@ -4,7 +4,7 @@ The captcha mechanism used is Google's ReCaptcha. This requires API keys from Go
## Getting keys ## Getting keys
Requires a public/private key pair from: Requires a site/secret key pair from:
<https://developers.google.com/recaptcha/> <https://developers.google.com/recaptcha/>
@ -15,8 +15,8 @@ Must be a reCAPTCHA v2 key using the "I'm not a robot" Checkbox option
The keys are a config option on the home server config. If they are not The keys are a config option on the home server config. If they are not
visible, you can generate them via `--generate-config`. Set the following value: visible, you can generate them via `--generate-config`. Set the following value:
recaptcha_public_key: YOUR_PUBLIC_KEY recaptcha_public_key: YOUR_SITE_KEY
recaptcha_private_key: YOUR_PRIVATE_KEY recaptcha_private_key: YOUR_SECRET_KEY
In addition, you MUST enable captchas via: In addition, you MUST enable captchas via:

View file

@ -955,7 +955,7 @@ uploads_path: "DATADIR/uploads"
# If a delegate is specified, the config option public_baseurl must also be filled out. # If a delegate is specified, the config option public_baseurl must also be filled out.
# #
account_threepid_delegates: account_threepid_delegates:
#email: https://example.com # Delegate email sending to example.org #email: https://example.com # Delegate email sending to example.com
#msisdn: http://localhost:8090 # Delegate SMS sending to this local process #msisdn: http://localhost:8090 # Delegate SMS sending to this local process
# Users who register on this homeserver will automatically be joined # Users who register on this homeserver will automatically be joined

View file

@ -199,7 +199,20 @@ client (C):
#### REPLICATE (C) #### REPLICATE (C)
Asks the server to replicate a given stream Asks the server to replicate a given stream. The syntax is:
```
REPLICATE <stream_name> <token>
```
Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.
The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
#### USER_SYNC (C) #### USER_SYNC (C)

View file

@ -1,8 +1,11 @@
[mypy] [mypy]
namespace_packages=True namespace_packages = True
plugins=mypy_zope:plugin plugins = mypy_zope:plugin
follow_imports=skip follow_imports = normal
mypy_path=stubs check_untyped_defs = True
show_error_codes = True
show_traceback = True
mypy_path = stubs
[mypy-zope] [mypy-zope]
ignore_missing_imports = True ignore_missing_imports = True

View file

@ -7,7 +7,15 @@
set -e set -e
isort -y -rc synapse tests scripts-dev scripts if [ $# -ge 1 ]
flake8 synapse tests then
python3 -m black synapse tests scripts-dev scripts files=$*
else
files="synapse tests scripts-dev scripts"
fi
echo "Linting these locations: $files"
isort -y -rc $files
flake8 $files
python3 -m black $files
./scripts-dev/config-lint.sh ./scripts-dev/config-lint.sh

124
scripts-dev/update_database Executable file
View file

@ -0,0 +1,124 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import sys
import yaml
from twisted.internet import defer, reactor
from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
logger = logging.getLogger("update_database")
class MockHomeserver(HomeServer):
DATASTORE_CLASS = DataStore
def __init__(self, config, database_engine, db_conn, **kwargs):
super(MockHomeserver, self).__init__(
config.server_name,
reactor=reactor,
config=config,
database_engine=database_engine,
**kwargs
)
self.database_engine = database_engine
self.db_conn = db_conn
def get_db_conn(self):
return self.db_conn
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=(
"Updates a synapse database to the latest schema and runs background updates"
" on it."
)
)
parser.add_argument("-v", action='store_true')
parser.add_argument(
"--database-config",
type=argparse.FileType('r'),
required=True,
help="A database config file for either a SQLite3 database or a PostgreSQL one.",
)
args = parser.parse_args()
logging_config = {
"level": logging.DEBUG if args.v else logging.INFO,
"format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
}
logging.basicConfig(**logging_config)
# Load, process and sanity-check the config.
hs_config = yaml.safe_load(args.database_config)
if "database" not in hs_config:
sys.stderr.write("The configuration file must have a 'database' section.\n")
sys.exit(4)
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")
# Create the database engine and a connection to it.
database_engine = create_engine(config.database_config)
db_conn = database_engine.module.connect(
**{
k: v
for k, v in config.database_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
# Update the database to the latest schema.
prepare_database(db_conn, database_engine, config=config)
db_conn.commit()
# Instantiate and initialise the homeserver object.
hs = MockHomeserver(
config,
database_engine,
db_conn,
db_config=config.database_config,
)
# setup instantiates the store within the homeserver object.
hs.setup()
store = hs.get_datastore()
@defer.inlineCallbacks
def run_background_updates():
yield store.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()
# Apply all background updates on the database.
reactor.callWhenRunning(lambda: run_as_background_process(
"background_updates", run_background_updates
))
reactor.run()

View file

@ -72,7 +72,7 @@ def move_media(origin_server, file_id, src_paths, dest_paths):
# check that the original exists # check that the original exists
original_file = src_paths.remote_media_filepath(origin_server, file_id) original_file = src_paths.remote_media_filepath(origin_server, file_id)
if not os.path.exists(original_file): if not os.path.exists(original_file):
logger.warn( logger.warning(
"Original for %s/%s (%s) does not exist", "Original for %s/%s (%s) does not exist",
origin_server, origin_server,
file_id, file_id,

View file

@ -157,7 +157,7 @@ class Store(
) )
except self.database_engine.module.DatabaseError as e: except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e): if self.database_engine.is_deadlock(e):
logger.warn("[TXN DEADLOCK] {%s} %d/%d", desc, i, N) logger.warning("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
if i < N: if i < N:
i += 1 i += 1
conn.rollback() conn.rollback()
@ -432,7 +432,7 @@ class Porter(object):
for row in rows: for row in rows:
d = dict(zip(headers, row)) d = dict(zip(headers, row))
if "\0" in d['value']: if "\0" in d['value']:
logger.warn('dropping search row %s', d) logger.warning('dropping search row %s', d)
else: else:
rows_dict.append(d) rows_dict.append(d)
@ -647,7 +647,7 @@ class Porter(object):
if isinstance(col, bytes): if isinstance(col, bytes):
return bytearray(col) return bytearray(col)
elif isinstance(col, string_types) and "\0" in col: elif isinstance(col, string_types) and "\0" in col:
logger.warn( logger.warning(
"DROPPING ROW: NUL value in table %s col %s: %r", "DROPPING ROW: NUL value in table %s col %s: %r",
table, table,
headers[j], headers[j],

View file

@ -497,7 +497,7 @@ class Auth(object):
token = self.get_access_token_from_request(request) token = self.get_access_token_from_request(request)
service = self.store.get_app_service_by_token(token) service = self.store.get_app_service_by_token(token)
if not service: if not service:
logger.warn("Unrecognised appservice access token.") logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError() raise InvalidClientTokenError()
request.authenticated_entity = service.sender request.authenticated_entity = service.sender
return defer.succeed(service) return defer.succeed(service)

View file

@ -138,3 +138,10 @@ class LimitBlockingTypes(object):
MONTHLY_ACTIVE_USER = "monthly_active_user" MONTHLY_ACTIVE_USER = "monthly_active_user"
HS_DISABLED = "hs_disabled" HS_DISABLED = "hs_disabled"
class EventContentFields(object):
"""Fields found in events' content, regardless of type."""
# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS = "org.matrix.labels"

View file

@ -20,6 +20,7 @@ from jsonschema import FormatChecker
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventContentFields
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.storage.presence import UserPresenceState from synapse.storage.presence import UserPresenceState
from synapse.types import RoomID, UserID from synapse.types import RoomID, UserID
@ -66,6 +67,10 @@ ROOM_EVENT_FILTER_SCHEMA = {
"contains_url": {"type": "boolean"}, "contains_url": {"type": "boolean"},
"lazy_load_members": {"type": "boolean"}, "lazy_load_members": {"type": "boolean"},
"include_redundant_members": {"type": "boolean"}, "include_redundant_members": {"type": "boolean"},
# Include or exclude events with the provided labels.
# cf https://github.com/matrix-org/matrix-doc/pull/2326
"org.matrix.labels": {"type": "array", "items": {"type": "string"}},
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
}, },
} }
@ -259,6 +264,9 @@ class Filter(object):
self.contains_url = self.filter_json.get("contains_url", None) self.contains_url = self.filter_json.get("contains_url", None)
self.labels = self.filter_json.get("org.matrix.labels", None)
self.not_labels = self.filter_json.get("org.matrix.not_labels", [])
def filters_all_types(self): def filters_all_types(self):
return "*" in self.not_types return "*" in self.not_types
@ -282,6 +290,7 @@ class Filter(object):
room_id = None room_id = None
ev_type = "m.presence" ev_type = "m.presence"
contains_url = False contains_url = False
labels = []
else: else:
sender = event.get("sender", None) sender = event.get("sender", None)
if not sender: if not sender:
@ -300,10 +309,11 @@ class Filter(object):
content = event.get("content", {}) content = event.get("content", {})
# check if there is a string url field in the content for filtering purposes # check if there is a string url field in the content for filtering purposes
contains_url = isinstance(content.get("url"), text_type) contains_url = isinstance(content.get("url"), text_type)
labels = content.get(EventContentFields.LABELS, [])
return self.check_fields(room_id, sender, ev_type, contains_url) return self.check_fields(room_id, sender, ev_type, labels, contains_url)
def check_fields(self, room_id, sender, event_type, contains_url): def check_fields(self, room_id, sender, event_type, labels, contains_url):
"""Checks whether the filter matches the given event fields. """Checks whether the filter matches the given event fields.
Returns: Returns:
@ -313,6 +323,7 @@ class Filter(object):
"rooms": lambda v: room_id == v, "rooms": lambda v: room_id == v,
"senders": lambda v: sender == v, "senders": lambda v: sender == v,
"types": lambda v: _matches_wildcard(event_type, v), "types": lambda v: _matches_wildcard(event_type, v),
"labels": lambda v: v in labels,
} }
for name, match_func in literal_keys.items(): for name, match_func in literal_keys.items():

View file

@ -44,6 +44,8 @@ def check_bind_error(e, address, bind_addresses):
bind_addresses (list): Addresses on which the service listens. bind_addresses (list): Addresses on which the service listens.
""" """
if address == "0.0.0.0" and "::" in bind_addresses: if address == "0.0.0.0" and "::" in bind_addresses:
logger.warn("Failed to listen on 0.0.0.0, continuing because listening on [::]") logger.warning(
"Failed to listen on 0.0.0.0, continuing because listening on [::]"
)
else: else:
raise e raise e

View file

@ -94,7 +94,7 @@ class AppserviceServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -103,7 +103,7 @@ class AppserviceServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -153,7 +153,7 @@ class ClientReaderServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -162,7 +162,7 @@ class ClientReaderServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -147,7 +147,7 @@ class EventCreatorServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -156,7 +156,7 @@ class EventCreatorServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -132,7 +132,7 @@ class FederationReaderServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -141,7 +141,7 @@ class FederationReaderServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -123,7 +123,7 @@ class FederationSenderServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -132,7 +132,7 @@ class FederationSenderServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -204,7 +204,7 @@ class FrontendProxyServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -213,7 +213,7 @@ class FrontendProxyServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -19,12 +19,13 @@ from __future__ import print_function
import gc import gc
import logging import logging
import math
import os import os
import resource
import sys import sys
from six import iteritems from six import iteritems
import psutil
from prometheus_client import Gauge from prometheus_client import Gauge
from twisted.application import service from twisted.application import service
@ -282,7 +283,7 @@ class SynapseHomeServer(HomeServer):
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -291,7 +292,7 @@ class SynapseHomeServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
def run_startup_checks(self, db_conn, database_engine): def run_startup_checks(self, db_conn, database_engine):
all_users_native = are_all_users_on_domain( all_users_native = are_all_users_on_domain(
@ -471,6 +472,87 @@ class SynapseService(service.Service):
return self._port.stopListening() return self._port.stopListening()
# Contains the list of processes we will be monitoring
# currently either 0 or 1
_stats_process = []
@defer.inlineCallbacks
def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
uptime = int(now - hs.start_time)
if uptime < 0:
uptime = 0
stats["homeserver"] = hs.config.server_name
stats["server_context"] = hs.config.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR
stats["event_cache_size"] = hs.config.event_cache_size
#
# Performance statistics
#
old = stats_process[0]
new = (now, resource.getrusage(resource.RUSAGE_SELF))
stats_process[0] = new
# Get RSS in bytes
stats["memory_rss"] = new[1].ru_maxrss
# Get CPU time in % of a single core, not % of all cores
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
old[1].ru_utime + old[1].ru_stime
)
if used_cpu_time == 0 or new[0] == old[0]:
stats["cpu_average"] = 0
else:
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
#
# Database version
#
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
try:
yield hs.get_proxied_http_client().put_json(
hs.config.report_stats_endpoint, stats
)
except Exception as e:
logger.warning("Error reporting stats: %s", e)
def run(hs): def run(hs):
PROFILE_SYNAPSE = False PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE: if PROFILE_SYNAPSE:
@ -497,91 +579,19 @@ def run(hs):
reactor.run = profile(reactor.run) reactor.run = profile(reactor.run)
clock = hs.get_clock() clock = hs.get_clock()
start_time = clock.time()
stats = {} stats = {}
# Contains the list of processes we will be monitoring def performance_stats_init():
# currently either 0 or 1 _stats_process.clear()
stats_process = [] _stats_process.append(
(int(hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF)))
)
def start_phone_stats_home(): def start_phone_stats_home():
return run_as_background_process("phone_stats_home", phone_stats_home) return run_as_background_process(
"phone_stats_home", phone_stats_home, hs, stats
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
uptime = int(now - start_time)
if uptime < 0:
uptime = 0
stats["homeserver"] = hs.config.server_name
stats["server_context"] = hs.config.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
) )
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats[
"daily_active_rooms"
] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR
stats["event_cache_size"] = hs.config.event_cache_size
if len(stats_process) > 0:
stats["memory_rss"] = 0
stats["cpu_average"] = 0
for process in stats_process:
stats["memory_rss"] += process.memory_info().rss
stats["cpu_average"] += int(process.cpu_percent(interval=None))
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info(
"Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)
)
try:
yield hs.get_simple_http_client().put_json(
hs.config.report_stats_endpoint, stats
)
except Exception as e:
logger.warn("Error reporting stats: %s", e)
def performance_stats_init():
try:
process = psutil.Process()
# Ensure we can fetch both, and make the initial request for cpu_percent
# so the next request will use this as the initial point.
process.memory_info().rss
process.cpu_percent(interval=None)
logger.info("report_stats can use psutil")
stats_process.append(process)
except (AttributeError):
logger.warning("Unable to read memory/cpu stats. Disabling reporting.")
def generate_user_daily_visit_stats(): def generate_user_daily_visit_stats():
return run_as_background_process( return run_as_background_process(
@ -626,7 +636,7 @@ def run(hs):
if hs.config.report_stats: if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals") logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
# We need to defer this init for the cases that we daemonize # We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process # otherwise the process ID we get is that of the non-daemon process
@ -634,7 +644,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can # We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes # be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home) clock.call_later(5 * 60, start_phone_stats_home, hs, stats)
_base.start_reactor( _base.start_reactor(
"synapse-homeserver", "synapse-homeserver",

View file

@ -120,7 +120,7 @@ class MediaRepositoryServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -129,7 +129,7 @@ class MediaRepositoryServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -114,7 +114,7 @@ class PusherServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -123,7 +123,7 @@ class PusherServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -326,7 +326,7 @@ class SynchrotronServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -335,7 +335,7 @@ class SynchrotronServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
) )
elif listener["type"] == "metrics": elif listener["type"] == "metrics":
if not self.get_config().enable_metrics: if not self.get_config().enable_metrics:
logger.warn( logger.warning(
( (
"Metrics listener configured, but " "Metrics listener configured, but "
"enable_metrics is not True!" "enable_metrics is not True!"
@ -159,7 +159,7 @@ class UserDirectoryServer(HomeServer):
else: else:
_base.listen_metrics(listener["bind_addresses"], listener["port"]) _base.listen_metrics(listener["bind_addresses"], listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warning("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self) self.get_tcp_replication().start_replication(self)

View file

@ -94,7 +94,9 @@ class ApplicationService(object):
ip_range_whitelist=None, ip_range_whitelist=None,
): ):
self.token = token self.token = token
self.url = url self.url = (
url.rstrip("/") if isinstance(url, str) else None
) # url must not end with a slash
self.hs_token = hs_token self.hs_token = hs_token
self.sender = sender self.sender = sender
self.server_name = hostname self.server_name = hostname

View file

@ -125,7 +125,7 @@ class KeyConfig(Config):
# if neither trusted_key_servers nor perspectives are given, use the default. # if neither trusted_key_servers nor perspectives are given, use the default.
if "perspectives" not in config and "trusted_key_servers" not in config: if "perspectives" not in config and "trusted_key_servers" not in config:
logger.warn(TRUSTED_KEY_SERVER_NOT_CONFIGURED_WARN) logger.warning(TRUSTED_KEY_SERVER_NOT_CONFIGURED_WARN)
key_servers = [{"server_name": "matrix.org"}] key_servers = [{"server_name": "matrix.org"}]
else: else:
key_servers = config.get("trusted_key_servers", []) key_servers = config.get("trusted_key_servers", [])
@ -156,7 +156,7 @@ class KeyConfig(Config):
if not self.macaroon_secret_key: if not self.macaroon_secret_key:
# Unfortunately, there are people out there that don't have this # Unfortunately, there are people out there that don't have this
# set. Lets just be "nice" and derive one from their secret key. # set. Lets just be "nice" and derive one from their secret key.
logger.warn("Config is missing macaroon_secret_key") logger.warning("Config is missing macaroon_secret_key")
seed = bytes(self.signing_key[0]) seed = bytes(self.signing_key[0])
self.macaroon_secret_key = hashlib.sha256(seed).digest() self.macaroon_secret_key = hashlib.sha256(seed).digest()

View file

@ -182,7 +182,7 @@ def _reload_stdlib_logging(*args, log_config=None):
logger = logging.getLogger("") logger = logging.getLogger("")
if not log_config: if not log_config:
logger.warn("Reloaded a blank config?") logger.warning("Reloaded a blank config?")
logging.config.dictConfig(log_config) logging.config.dictConfig(log_config)
@ -234,8 +234,8 @@ def setup_logging(
# make sure that the first thing we log is a thing we can grep backwards # make sure that the first thing we log is a thing we can grep backwards
# for # for
logging.warn("***** STARTING SERVER *****") logging.warning("***** STARTING SERVER *****")
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name) logging.info("Server hostname: %s", config.server_name)
return logger return logger

View file

@ -300,7 +300,7 @@ class RegistrationConfig(Config):
# If a delegate is specified, the config option public_baseurl must also be filled out. # If a delegate is specified, the config option public_baseurl must also be filled out.
# #
account_threepid_delegates: account_threepid_delegates:
#email: https://example.com # Delegate email sending to example.org #email: https://example.com # Delegate email sending to example.com
#msisdn: http://localhost:8090 # Delegate SMS sending to this local process #msisdn: http://localhost:8090 # Delegate SMS sending to this local process
# Users who register on this homeserver will automatically be joined # Users who register on this homeserver will automatically be joined

View file

@ -125,9 +125,11 @@ def compute_event_signature(event_dict, signature_name, signing_key):
redact_json = prune_event_dict(event_dict) redact_json = prune_event_dict(event_dict)
redact_json.pop("age_ts", None) redact_json.pop("age_ts", None)
redact_json.pop("unsigned", None) redact_json.pop("unsigned", None)
logger.debug("Signing event: %s", encode_canonical_json(redact_json)) if logger.isEnabledFor(logging.DEBUG):
logger.debug("Signing event: %s", encode_canonical_json(redact_json))
redact_json = sign_json(redact_json, signature_name, signing_key) redact_json = sign_json(redact_json, signature_name, signing_key)
logger.debug("Signed event: %s", encode_canonical_json(redact_json)) if logger.isEnabledFor(logging.DEBUG):
logger.debug("Signed event: %s", encode_canonical_json(redact_json))
return redact_json["signatures"] return redact_json["signatures"]

View file

@ -77,7 +77,7 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
if auth_events is None: if auth_events is None:
# Oh, we don't know what the state of the room was, so we # Oh, we don't know what the state of the room was, so we
# are trusting that this is allowed (at least for now) # are trusting that this is allowed (at least for now)
logger.warn("Trusting event: %s", event.event_id) logger.warning("Trusting event: %s", event.event_id)
return return
if event.type == EventTypes.Create: if event.type == EventTypes.Create:

View file

@ -12,104 +12,103 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Dict, Optional, Tuple, Union
from six import iteritems from six import iteritems
import attr
from frozendict import frozendict from frozendict import frozendict
from twisted.internet import defer from twisted.internet import defer
from synapse.appservice import ApplicationService
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
class EventContext(object): @attr.s(slots=True)
class EventContext:
""" """
Holds information relevant to persisting an event
Attributes: Attributes:
state_group (int|None): state group id, if the state has been stored rejected: A rejection reason if the event was rejected, else False
as a state group. This is usually only None if e.g. the event is
an outlier.
rejected (bool|str): A rejection reason if the event was rejected, else
False
push_actions (list[(str, list[object])]): list of (user_id, actions) state_group: The ID of the state group for this event. Note that state events
tuples are persisted with a state group which includes the new event, so this is
effectively the state *after* the event in question.
prev_group (int): Previously persisted state group. ``None`` for an For a *rejected* state event, where the state of the rejected event is
outlier. ignored, this state_group should never make it into the
delta_ids (dict[(str, str), str]): Delta from ``prev_group``. event_to_state_groups table. Indeed, inspecting this value for a rejected
(type, state_key) -> event_id. ``None`` for an outlier. state event is almost certainly incorrect.
prev_state_events (?): XXX: is this ever set to anything other than For an outlier, where we don't have the state at the event, this will be
the empty list? None.
prev_group: If it is known, ``state_group``'s prev_group. Note that this being
None does not necessarily mean that ``state_group`` does not have
a prev_group!
If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
will always also be ``None``.
Note that this *not* (necessarily) the state group associated with
``_prev_state_ids``.
delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
and ``state_group``.
app_service: If this event is being sent by a (local) application service, that
app service.
_current_state_ids: The room state map, including this event - ie, the state
in ``state_group``.
_current_state_ids (dict[(str, str), str]|None):
The current state map including the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id (type, state_key) -> event_id
_prev_state_ids (dict[(str, str), str]|None): FIXME: what is this for an outlier? it seems ill-defined. It seems like
The current state map excluding the current event. None if outlier it could be either {}, or the state we were given by the remote
or we haven't fetched the state from DB yet. server, depending on $THINGS
Note that this is a private attribute: it should be accessed via
``get_current_state_ids``. _AsyncEventContext impl calculates this
on-demand: it will be None until that happens.
_prev_state_ids: The room state map, excluding this event. For a non-state
event, this will be the same as _current_state_events.
Note that it is a completely different thing to prev_group!
(type, state_key) -> event_id (type, state_key) -> event_id
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have FIXME: again, what is this for an outlier?
been calculated. None if we haven't started calculating yet
_event_type (str): The type of the event the context is associated with. As with _current_state_ids, this is a private attribute. It should be
Only set when state has not been fetched yet. accessed via get_prev_state_ids.
_event_state_key (str|None): The state_key of the event the context is
associated with. Only set when state has not been fetched yet.
_prev_state_id (str|None): If the event associated with the context is
a state event, then `_prev_state_id` is the event_id of the state
that was replaced.
Only set when state has not been fetched yet.
""" """
__slots__ = [ rejected = attr.ib(default=False, type=Union[bool, str])
"state_group", state_group = attr.ib(default=None, type=Optional[int])
"rejected", prev_group = attr.ib(default=None, type=Optional[int])
"prev_group", delta_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
"delta_ids", app_service = attr.ib(default=None, type=Optional[ApplicationService])
"prev_state_events",
"app_service",
"_current_state_ids",
"_prev_state_ids",
"_prev_state_id",
"_event_type",
"_event_state_key",
"_fetching_state_deferred",
]
def __init__(self): _current_state_ids = attr.ib(
self.prev_state_events = [] default=None, type=Optional[Dict[Tuple[str, str], str]]
self.rejected = False )
self.app_service = None _prev_state_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
@staticmethod @staticmethod
def with_state( def with_state(
state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None
): ):
context = EventContext() return EventContext(
current_state_ids=current_state_ids,
# The current state including the current event prev_state_ids=prev_state_ids,
context._current_state_ids = current_state_ids state_group=state_group,
# The current state excluding the current event prev_group=prev_group,
context._prev_state_ids = prev_state_ids delta_ids=delta_ids,
context.state_group = state_group )
context._prev_state_id = None
context._event_type = None
context._event_state_key = None
context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that
# and this state.
context.prev_group = prev_group
context.delta_ids = delta_ids
return context
@defer.inlineCallbacks @defer.inlineCallbacks
def serialize(self, event, store): def serialize(self, event, store):
@ -141,7 +140,6 @@ class EventContext(object):
"rejected": self.rejected, "rejected": self.rejected,
"prev_group": self.prev_group, "prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids), "delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None, "app_service_id": self.app_service.id if self.app_service else None,
} }
@ -157,24 +155,17 @@ class EventContext(object):
Returns: Returns:
EventContext EventContext
""" """
context = EventContext() context = _AsyncEventContextImpl(
# We use the state_group and prev_state_id stuff to pull the
# We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids.
# current_state_ids out of the DB and construct prev_state_ids. prev_state_id=input["prev_state_id"],
context._prev_state_id = input["prev_state_id"] event_type=input["event_type"],
context._event_type = input["event_type"] event_state_key=input["event_state_key"],
context._event_state_key = input["event_state_key"] state_group=input["state_group"],
prev_group=input["prev_group"],
context._current_state_ids = None delta_ids=_decode_state_dict(input["delta_ids"]),
context._prev_state_ids = None rejected=input["rejected"],
context._fetching_state_deferred = None )
context.state_group = input["state_group"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
app_service_id = input["app_service_id"] app_service_id = input["app_service_id"]
if app_service_id: if app_service_id:
@ -192,14 +183,7 @@ class EventContext(object):
Maps a (type, state_key) to the event ID of the state event matching Maps a (type, state_key) to the event ID of the state event matching
this tuple. this tuple.
""" """
yield self._ensure_fetched(store)
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store
)
yield make_deferred_yieldable(self._fetching_state_deferred)
return self._current_state_ids return self._current_state_ids
@defer.inlineCallbacks @defer.inlineCallbacks
@ -212,14 +196,7 @@ class EventContext(object):
Maps a (type, state_key) to the event ID of the state event matching Maps a (type, state_key) to the event ID of the state event matching
this tuple. this tuple.
""" """
yield self._ensure_fetched(store)
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store
)
yield make_deferred_yieldable(self._fetching_state_deferred)
return self._prev_state_ids return self._prev_state_ids
def get_cached_current_state_ids(self): def get_cached_current_state_ids(self):
@ -233,6 +210,44 @@ class EventContext(object):
return self._current_state_ids return self._current_state_ids
def _ensure_fetched(self, store):
return defer.succeed(None)
@attr.s(slots=True)
class _AsyncEventContextImpl(EventContext):
"""
An implementation of EventContext which fetches _current_state_ids and
_prev_state_ids from the database on demand.
Attributes:
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
been calculated. None if we haven't started calculating yet
_event_type (str): The type of the event the context is associated with.
_event_state_key (str): The state_key of the event the context is
associated with.
_prev_state_id (str|None): If the event associated with the context is
a state event, then `_prev_state_id` is the event_id of the state
that was replaced.
"""
_prev_state_id = attr.ib(default=None)
_event_type = attr.ib(default=None)
_event_state_key = attr.ib(default=None)
_fetching_state_deferred = attr.ib(default=None)
def _ensure_fetched(self, store):
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store
)
return make_deferred_yieldable(self._fetching_state_deferred)
@defer.inlineCallbacks @defer.inlineCallbacks
def _fill_out_state(self, store): def _fill_out_state(self, store):
"""Called to populate the _current_state_ids and _prev_state_ids """Called to populate the _current_state_ids and _prev_state_ids
@ -250,27 +265,6 @@ class EventContext(object):
else: else:
self._prev_state_ids = self._current_state_ids self._prev_state_ids = self._current_state_ids
@defer.inlineCallbacks
def update_state(
self, state_group, prev_state_ids, current_state_ids, prev_group, delta_ids
):
"""Replace the state in the context
"""
# We need to make sure we wait for any ongoing fetching of state
# to complete so that the updated state doesn't get clobbered
if self._fetching_state_deferred:
yield make_deferred_yieldable(self._fetching_state_deferred)
self.state_group = state_group
self._prev_state_ids = prev_state_ids
self.prev_group = prev_group
self._current_state_ids = current_state_ids
self.delta_ids = delta_ids
# We need to ensure that that we've marked as having fetched the state
self._fetching_state_deferred = defer.succeed(None)
def _encode_state_dict(state_dict): def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in """Since dicts of (type, state_key) -> event_id cannot be serialized in

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd # Copyright 2017 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -13,6 +14,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import inspect
from synapse.spam_checker_api import SpamCheckerApi
class SpamChecker(object): class SpamChecker(object):
def __init__(self, hs): def __init__(self, hs):
@ -26,7 +31,14 @@ class SpamChecker(object):
pass pass
if module is not None: if module is not None:
self.spam_checker = module(config=config) # Older spam checkers don't accept the `api` argument, so we
# try and detect support.
spam_args = inspect.getfullargspec(module)
if "api" in spam_args.args:
api = SpamCheckerApi(hs)
self.spam_checker = module(config=config, api=api)
else:
self.spam_checker = module(config=config)
def check_event_for_spam(self, event): def check_event_for_spam(self, event):
"""Checks if a given event is considered "spammy" by this server. """Checks if a given event is considered "spammy" by this server.

View file

@ -102,7 +102,7 @@ class FederationBase(object):
pass pass
if not res: if not res:
logger.warn( logger.warning(
"Failed to find copy of %s with valid signature", pdu.event_id "Failed to find copy of %s with valid signature", pdu.event_id
) )
@ -173,7 +173,7 @@ class FederationBase(object):
return redacted_event return redacted_event
if self.spam_checker.check_event_for_spam(pdu): if self.spam_checker.check_event_for_spam(pdu):
logger.warn( logger.warning(
"Event contains spam, redacting %s: %s", "Event contains spam, redacting %s: %s",
pdu.event_id, pdu.event_id,
pdu.get_pdu_json(), pdu.get_pdu_json(),
@ -185,7 +185,7 @@ class FederationBase(object):
def errback(failure, pdu): def errback(failure, pdu):
failure.trap(SynapseError) failure.trap(SynapseError)
with PreserveLoggingContext(ctx): with PreserveLoggingContext(ctx):
logger.warn( logger.warning(
"Signature check failed for %s: %s", "Signature check failed for %s: %s",
pdu.event_id, pdu.event_id,
failure.getErrorMessage(), failure.getErrorMessage(),

View file

@ -196,7 +196,7 @@ class FederationClient(FederationBase):
dest, room_id, extremities, limit dest, room_id, extremities, limit
) )
logger.debug("backfill transaction_data=%s", repr(transaction_data)) logger.debug("backfill transaction_data=%r", transaction_data)
room_version = yield self.store.get_room_version(room_id) room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version) format_ver = room_version_to_event_format(room_version)
@ -522,12 +522,12 @@ class FederationClient(FederationBase):
res = yield callback(destination) res = yield callback(destination)
return res return res
except InvalidResponseError as e: except InvalidResponseError as e:
logger.warn("Failed to %s via %s: %s", description, destination, e) logger.warning("Failed to %s via %s: %s", description, destination, e)
except HttpResponseException as e: except HttpResponseException as e:
if not 500 <= e.code < 600: if not 500 <= e.code < 600:
raise e.to_synapse_error() raise e.to_synapse_error()
else: else:
logger.warn( logger.warning(
"Failed to %s via %s: %i %s", "Failed to %s via %s: %i %s",
description, description,
destination, destination,
@ -535,7 +535,9 @@ class FederationClient(FederationBase):
e.args[0], e.args[0],
) )
except Exception: except Exception:
logger.warn("Failed to %s via %s", description, destination, exc_info=1) logger.warning(
"Failed to %s via %s", description, destination, exc_info=1
)
raise SynapseError(502, "Failed to %s via any server" % (description,)) raise SynapseError(502, "Failed to %s via any server" % (description,))
@ -553,7 +555,7 @@ class FederationClient(FederationBase):
Note that this does not append any events to any graphs. Note that this does not append any events to any graphs.
Args: Args:
destinations (str): Candidate homeservers which are probably destinations (Iterable[str]): Candidate homeservers which are probably
participating in the room. participating in the room.
room_id (str): The room in which the event will happen. room_id (str): The room in which the event will happen.
user_id (str): The user whose membership is being evented. user_id (str): The user whose membership is being evented.

View file

@ -21,7 +21,6 @@ from six import iteritems
from canonicaljson import json from canonicaljson import json
from prometheus_client import Counter from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress from twisted.internet.abstract import isIPAddress
from twisted.python import failure from twisted.python import failure
@ -86,14 +85,12 @@ class FederationServer(FederationBase):
# come in waves. # come in waves.
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
@defer.inlineCallbacks async def on_backfill_request(self, origin, room_id, versions, limit):
@log_function with (await self._server_linearizer.queue((origin, room_id))):
def on_backfill_request(self, origin, room_id, versions, limit):
with (yield self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
pdus = yield self.handler.on_backfill_request( pdus = await self.handler.on_backfill_request(
origin, room_id, versions, limit origin, room_id, versions, limit
) )
@ -101,9 +98,7 @@ class FederationServer(FederationBase):
return 200, res return 200, res
@defer.inlineCallbacks async def on_incoming_transaction(self, origin, transaction_data):
@log_function
def on_incoming_transaction(self, origin, transaction_data):
# keep this as early as possible to make the calculated origin ts as # keep this as early as possible to make the calculated origin ts as
# accurate as possible. # accurate as possible.
request_time = self._clock.time_msec() request_time = self._clock.time_msec()
@ -118,18 +113,17 @@ class FederationServer(FederationBase):
# use a linearizer to ensure that we don't process the same transaction # use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel. # multiple times in parallel.
with ( with (
yield self._transaction_linearizer.queue( await self._transaction_linearizer.queue(
(origin, transaction.transaction_id) (origin, transaction.transaction_id)
) )
): ):
result = yield self._handle_incoming_transaction( result = await self._handle_incoming_transaction(
origin, transaction, request_time origin, transaction, request_time
) )
return result return result
@defer.inlineCallbacks async def _handle_incoming_transaction(self, origin, transaction, request_time):
def _handle_incoming_transaction(self, origin, transaction, request_time):
""" Process an incoming transaction and return the HTTP response """ Process an incoming transaction and return the HTTP response
Args: Args:
@ -140,7 +134,7 @@ class FederationServer(FederationBase):
Returns: Returns:
Deferred[(int, object)]: http response code and body Deferred[(int, object)]: http response code and body
""" """
response = yield self.transaction_actions.have_responded(origin, transaction) response = await self.transaction_actions.have_responded(origin, transaction)
if response: if response:
logger.debug( logger.debug(
@ -151,7 +145,7 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id) logger.debug("[%s] Transaction is new", transaction.transaction_id)
# Reject if PDU count > 50 and EDU count > 100 # Reject if PDU count > 50 or EDU count > 100
if len(transaction.pdus) > 50 or ( if len(transaction.pdus) > 50 or (
hasattr(transaction, "edus") and len(transaction.edus) > 100 hasattr(transaction, "edus") and len(transaction.edus) > 100
): ):
@ -159,7 +153,7 @@ class FederationServer(FederationBase):
logger.info("Transaction PDU or EDU count too large. Returning 400") logger.info("Transaction PDU or EDU count too large. Returning 400")
response = {} response = {}
yield self.transaction_actions.set_response( await self.transaction_actions.set_response(
origin, transaction, 400, response origin, transaction, 400, response
) )
return 400, response return 400, response
@ -195,7 +189,7 @@ class FederationServer(FederationBase):
continue continue
try: try:
room_version = yield self.store.get_room_version(room_id) room_version = await self.store.get_room_version(room_id)
except NotFoundError: except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id) logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue continue
@ -221,13 +215,12 @@ class FederationServer(FederationBase):
# require callouts to other servers to fetch missing events), but # require callouts to other servers to fetch missing events), but
# impose a limit to avoid going too crazy with ram/cpu. # impose a limit to avoid going too crazy with ram/cpu.
@defer.inlineCallbacks async def process_pdus_for_room(room_id):
def process_pdus_for_room(room_id):
logger.debug("Processing PDUs for %s", room_id) logger.debug("Processing PDUs for %s", room_id)
try: try:
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
except AuthError as e: except AuthError as e:
logger.warn("Ignoring PDUs for room %s from banned server", room_id) logger.warning("Ignoring PDUs for room %s from banned server", room_id)
for pdu in pdus_by_room[room_id]: for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id event_id = pdu.event_id
pdu_results[event_id] = e.error_dict() pdu_results[event_id] = e.error_dict()
@ -237,10 +230,10 @@ class FederationServer(FederationBase):
event_id = pdu.event_id event_id = pdu.event_id
with nested_logging_context(event_id): with nested_logging_context(event_id):
try: try:
yield self._handle_received_pdu(origin, pdu) await self._handle_received_pdu(origin, pdu)
pdu_results[event_id] = {} pdu_results[event_id] = {}
except FederationError as e: except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e) logger.warning("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)} pdu_results[event_id] = {"error": str(e)}
except Exception as e: except Exception as e:
f = failure.Failure() f = failure.Failure()
@ -251,36 +244,33 @@ class FederationServer(FederationBase):
exc_info=(f.type, f.value, f.getTracebackObject()), exc_info=(f.type, f.value, f.getTracebackObject()),
) )
yield concurrently_execute( await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
) )
if hasattr(transaction, "edus"): if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus): for edu in (Edu(**x) for x in transaction.edus):
yield self.received_edu(origin, edu.edu_type, edu.content) await self.received_edu(origin, edu.edu_type, edu.content)
response = {"pdus": pdu_results} response = {"pdus": pdu_results}
logger.debug("Returning: %s", str(response)) logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response(origin, transaction, 200, response) await self.transaction_actions.set_response(origin, transaction, 200, response)
return 200, response return 200, response
@defer.inlineCallbacks async def received_edu(self, origin, edu_type, content):
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc() received_edus_counter.inc()
yield self.registry.on_edu(edu_type, origin, content) await self.registry.on_edu(edu_type, origin, content)
@defer.inlineCallbacks async def on_context_state_request(self, origin, room_id, event_id):
@log_function
def on_context_state_request(self, origin, room_id, event_id):
if not event_id: if not event_id:
raise NotImplementedError("Specify an event") raise NotImplementedError("Specify an event")
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
in_room = yield self.auth.check_host_in_room(room_id, origin) in_room = await self.auth.check_host_in_room(room_id, origin)
if not in_room: if not in_room:
raise AuthError(403, "Host not in room.") raise AuthError(403, "Host not in room.")
@ -289,8 +279,8 @@ class FederationServer(FederationBase):
# in the cache so we could return it without waiting for the linearizer # in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats # - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer. # the point of the linearizer.
with (yield self._server_linearizer.queue((origin, room_id))): with (await self._server_linearizer.queue((origin, room_id))):
resp = yield self._state_resp_cache.wrap( resp = await self._state_resp_cache.wrap(
(room_id, event_id), (room_id, event_id),
self._on_context_state_request_compute, self._on_context_state_request_compute,
room_id, room_id,
@ -299,65 +289,60 @@ class FederationServer(FederationBase):
return 200, resp return 200, resp
@defer.inlineCallbacks async def on_state_ids_request(self, origin, room_id, event_id):
def on_state_ids_request(self, origin, room_id, event_id):
if not event_id: if not event_id:
raise NotImplementedError("Specify an event") raise NotImplementedError("Specify an event")
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
in_room = yield self.auth.check_host_in_room(room_id, origin) in_room = await self.auth.check_host_in_room(room_id, origin)
if not in_room: if not in_room:
raise AuthError(403, "Host not in room.") raise AuthError(403, "Host not in room.")
state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id) state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids) auth_chain_ids = await self.store.get_auth_chain_ids(state_ids)
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
@defer.inlineCallbacks async def _on_context_state_request_compute(self, room_id, event_id):
def _on_context_state_request_compute(self, room_id, event_id): pdus = await self.handler.get_state_for_pdu(room_id, event_id)
pdus = yield self.handler.get_state_for_pdu(room_id, event_id) auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
auth_chain = yield self.store.get_auth_chain([pdu.event_id for pdu in pdus])
return { return {
"pdus": [pdu.get_pdu_json() for pdu in pdus], "pdus": [pdu.get_pdu_json() for pdu in pdus],
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
} }
@defer.inlineCallbacks async def on_pdu_request(self, origin, event_id):
@log_function pdu = await self.handler.get_persisted_pdu(origin, event_id)
def on_pdu_request(self, origin, event_id):
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
if pdu: if pdu:
return 200, self._transaction_from_pdus([pdu]).get_dict() return 200, self._transaction_from_pdus([pdu]).get_dict()
else: else:
return 404, "" return 404, ""
@defer.inlineCallbacks async def on_query_request(self, query_type, args):
def on_query_request(self, query_type, args):
received_queries_counter.labels(query_type).inc() received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args) resp = await self.registry.on_query(query_type, args)
return 200, resp return 200, resp
@defer.inlineCallbacks async def on_make_join_request(self, origin, room_id, user_id, supported_versions):
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
room_version = yield self.store.get_room_version(room_id) room_version = await self.store.get_room_version(room_id)
if room_version not in supported_versions: if room_version not in supported_versions:
logger.warn("Room version %s not in %s", room_version, supported_versions) logger.warning(
"Room version %s not in %s", room_version, supported_versions
)
raise IncompatibleRoomVersionError(room_version=room_version) raise IncompatibleRoomVersionError(room_version=room_version)
pdu = yield self.handler.on_make_join_request(origin, room_id, user_id) pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
@defer.inlineCallbacks async def on_invite_request(self, origin, content, room_version):
def on_invite_request(self, origin, content, room_version):
if room_version not in KNOWN_ROOM_VERSIONS: if room_version not in KNOWN_ROOM_VERSIONS:
raise SynapseError( raise SynapseError(
400, 400,
@ -369,28 +354,27 @@ class FederationServer(FederationBase):
pdu = event_from_pdu_json(content, format_ver) pdu = event_from_pdu_json(content, format_ver)
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, pdu.room_id) await self.check_server_matches_acl(origin_host, pdu.room_id)
pdu = yield self._check_sigs_and_hash(room_version, pdu) pdu = await self._check_sigs_and_hash(room_version, pdu)
ret_pdu = yield self.handler.on_invite_request(origin, pdu) ret_pdu = await self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
return {"event": ret_pdu.get_pdu_json(time_now)} return {"event": ret_pdu.get_pdu_json(time_now)}
@defer.inlineCallbacks async def on_send_join_request(self, origin, content, room_id):
def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content) logger.debug("on_send_join_request: content: %s", content)
room_version = yield self.store.get_room_version(room_id) room_version = await self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version) format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver) pdu = event_from_pdu_json(content, format_ver)
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, pdu.room_id) await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
pdu = yield self._check_sigs_and_hash(room_version, pdu) pdu = await self._check_sigs_and_hash(room_version, pdu)
res_pdus = yield self.handler.on_send_join_request(origin, pdu) res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
return ( return (
200, 200,
@ -402,48 +386,44 @@ class FederationServer(FederationBase):
}, },
) )
@defer.inlineCallbacks async def on_make_leave_request(self, origin, room_id, user_id):
def on_make_leave_request(self, origin, room_id, user_id):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
pdu = yield self.handler.on_make_leave_request(origin, room_id, user_id) pdu = await self.handler.on_make_leave_request(origin, room_id, user_id)
room_version = yield self.store.get_room_version(room_id) room_version = await self.store.get_room_version(room_id)
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
@defer.inlineCallbacks async def on_send_leave_request(self, origin, content, room_id):
def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content) logger.debug("on_send_leave_request: content: %s", content)
room_version = yield self.store.get_room_version(room_id) room_version = await self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version) format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver) pdu = event_from_pdu_json(content, format_ver)
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, pdu.room_id) await self.check_server_matches_acl(origin_host, pdu.room_id)
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
pdu = yield self._check_sigs_and_hash(room_version, pdu) pdu = await self._check_sigs_and_hash(room_version, pdu)
yield self.handler.on_send_leave_request(origin, pdu) await self.handler.on_send_leave_request(origin, pdu)
return 200, {} return 200, {}
@defer.inlineCallbacks async def on_event_auth(self, origin, room_id, event_id):
def on_event_auth(self, origin, room_id, event_id): with (await self._server_linearizer.queue((origin, room_id))):
with (yield self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id) auth_pdus = await self.handler.on_event_auth(event_id)
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]} res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
return 200, res return 200, res
@defer.inlineCallbacks async def on_query_auth_request(self, origin, content, room_id, event_id):
def on_query_auth_request(self, origin, content, room_id, event_id):
""" """
Content is a dict with keys:: Content is a dict with keys::
auth_chain (list): A list of events that give the auth chain. auth_chain (list): A list of events that give the auth chain.
@ -462,22 +442,22 @@ class FederationServer(FederationBase):
Returns: Returns:
Deferred: Results in `dict` with the same format as `content` Deferred: Results in `dict` with the same format as `content`
""" """
with (yield self._server_linearizer.queue((origin, room_id))): with (await self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
room_version = yield self.store.get_room_version(room_id) room_version = await self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version) format_ver = room_version_to_event_format(room_version)
auth_chain = [ auth_chain = [
event_from_pdu_json(e, format_ver) for e in content["auth_chain"] event_from_pdu_json(e, format_ver) for e in content["auth_chain"]
] ]
signed_auth = yield self._check_sigs_and_hash_and_fetch( signed_auth = await self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True, room_version=room_version origin, auth_chain, outlier=True, room_version=room_version
) )
ret = yield self.handler.on_query_auth( ret = await self.handler.on_query_auth(
origin, origin,
event_id, event_id,
room_id, room_id,
@ -503,16 +483,14 @@ class FederationServer(FederationBase):
return self.on_query_request("user_devices", user_id) return self.on_query_request("user_devices", user_id)
@trace @trace
@defer.inlineCallbacks async def on_claim_client_keys(self, origin, content):
@log_function
def on_claim_client_keys(self, origin, content):
query = [] query = []
for user_id, device_keys in content.get("one_time_keys", {}).items(): for user_id, device_keys in content.get("one_time_keys", {}).items():
for device_id, algorithm in device_keys.items(): for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm)) query.append((user_id, device_id, algorithm))
log_kv({"message": "Claiming one time keys.", "user, device pairs": query}) log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query) results = await self.store.claim_e2e_one_time_keys(query)
json_result = {} json_result = {}
for user_id, device_keys in results.items(): for user_id, device_keys in results.items():
@ -536,14 +514,12 @@ class FederationServer(FederationBase):
return {"one_time_keys": json_result} return {"one_time_keys": json_result}
@defer.inlineCallbacks async def on_get_missing_events(
@log_function
def on_get_missing_events(
self, origin, room_id, earliest_events, latest_events, limit self, origin, room_id, earliest_events, latest_events, limit
): ):
with (yield self._server_linearizer.queue((origin, room_id))): with (await self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) await self.check_server_matches_acl(origin_host, room_id)
logger.info( logger.info(
"on_get_missing_events: earliest_events: %r, latest_events: %r," "on_get_missing_events: earliest_events: %r, latest_events: %r,"
@ -553,7 +529,7 @@ class FederationServer(FederationBase):
limit, limit,
) )
missing_events = yield self.handler.on_get_missing_events( missing_events = await self.handler.on_get_missing_events(
origin, room_id, earliest_events, latest_events, limit origin, room_id, earliest_events, latest_events, limit
) )
@ -586,8 +562,7 @@ class FederationServer(FederationBase):
destination=None, destination=None,
) )
@defer.inlineCallbacks async def _handle_received_pdu(self, origin, pdu):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction. """ Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError. If the event is invalid, then this method throws a FederationError.
@ -640,37 +615,34 @@ class FederationServer(FederationBase):
logger.info("Accepting join PDU %s from %s", pdu.event_id, origin) logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
# We've already checked that we know the room version by this point # We've already checked that we know the room version by this point
room_version = yield self.store.get_room_version(pdu.room_id) room_version = await self.store.get_room_version(pdu.room_id)
# Check signature. # Check signature.
try: try:
pdu = yield self._check_sigs_and_hash(room_version, pdu) pdu = await self._check_sigs_and_hash(room_version, pdu)
except SynapseError as e: except SynapseError as e:
raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id) raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)
yield self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True) await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
def __str__(self): def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name return "<ReplicationLayer(%s)>" % self.server_name
@defer.inlineCallbacks async def exchange_third_party_invite(
def exchange_third_party_invite(
self, sender_user_id, target_user_id, room_id, signed self, sender_user_id, target_user_id, room_id, signed
): ):
ret = yield self.handler.exchange_third_party_invite( ret = await self.handler.exchange_third_party_invite(
sender_user_id, target_user_id, room_id, signed sender_user_id, target_user_id, room_id, signed
) )
return ret return ret
@defer.inlineCallbacks async def on_exchange_third_party_invite_request(self, room_id, event_dict):
def on_exchange_third_party_invite_request(self, room_id, event_dict): ret = await self.handler.on_exchange_third_party_invite_request(
ret = yield self.handler.on_exchange_third_party_invite_request(
room_id, event_dict room_id, event_dict
) )
return ret return ret
@defer.inlineCallbacks async def check_server_matches_acl(self, server_name, room_id):
def check_server_matches_acl(self, server_name, room_id):
"""Check if the given server is allowed by the server ACLs in the room """Check if the given server is allowed by the server ACLs in the room
Args: Args:
@ -680,13 +652,13 @@ class FederationServer(FederationBase):
Raises: Raises:
AuthError if the server does not match the ACL AuthError if the server does not match the ACL
""" """
state_ids = yield self.store.get_current_state_ids(room_id) state_ids = await self.store.get_current_state_ids(room_id)
acl_event_id = state_ids.get((EventTypes.ServerACL, "")) acl_event_id = state_ids.get((EventTypes.ServerACL, ""))
if not acl_event_id: if not acl_event_id:
return return
acl_event = yield self.store.get_event(acl_event_id) acl_event = await self.store.get_event(acl_event_id)
if server_matches_acl_event(server_name, acl_event): if server_matches_acl_event(server_name, acl_event):
return return
@ -709,7 +681,7 @@ def server_matches_acl_event(server_name, acl_event):
# server name is a literal IP # server name is a literal IP
allow_ip_literals = acl_event.content.get("allow_ip_literals", True) allow_ip_literals = acl_event.content.get("allow_ip_literals", True)
if not isinstance(allow_ip_literals, bool): if not isinstance(allow_ip_literals, bool):
logger.warn("Ignorning non-bool allow_ip_literals flag") logger.warning("Ignorning non-bool allow_ip_literals flag")
allow_ip_literals = True allow_ip_literals = True
if not allow_ip_literals: if not allow_ip_literals:
# check for ipv6 literals. These start with '['. # check for ipv6 literals. These start with '['.
@ -723,7 +695,7 @@ def server_matches_acl_event(server_name, acl_event):
# next, check the deny list # next, check the deny list
deny = acl_event.content.get("deny", []) deny = acl_event.content.get("deny", [])
if not isinstance(deny, (list, tuple)): if not isinstance(deny, (list, tuple)):
logger.warn("Ignorning non-list deny ACL %s", deny) logger.warning("Ignorning non-list deny ACL %s", deny)
deny = [] deny = []
for e in deny: for e in deny:
if _acl_entry_matches(server_name, e): if _acl_entry_matches(server_name, e):
@ -733,7 +705,7 @@ def server_matches_acl_event(server_name, acl_event):
# then the allow list. # then the allow list.
allow = acl_event.content.get("allow", []) allow = acl_event.content.get("allow", [])
if not isinstance(allow, (list, tuple)): if not isinstance(allow, (list, tuple)):
logger.warn("Ignorning non-list allow ACL %s", allow) logger.warning("Ignorning non-list allow ACL %s", allow)
allow = [] allow = []
for e in allow: for e in allow:
if _acl_entry_matches(server_name, e): if _acl_entry_matches(server_name, e):
@ -747,7 +719,7 @@ def server_matches_acl_event(server_name, acl_event):
def _acl_entry_matches(server_name, acl_entry): def _acl_entry_matches(server_name, acl_entry):
if not isinstance(acl_entry, six.string_types): if not isinstance(acl_entry, six.string_types):
logger.warn( logger.warning(
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry) "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
) )
return False return False
@ -799,15 +771,14 @@ class FederationHandlerRegistry(object):
self.query_handlers[query_type] = handler self.query_handlers[query_type] = handler
@defer.inlineCallbacks async def on_edu(self, edu_type, origin, content):
def on_edu(self, edu_type, origin, content):
handler = self.edu_handlers.get(edu_type) handler = self.edu_handlers.get(edu_type)
if not handler: if not handler:
logger.warn("No handler registered for EDU type %s", edu_type) logger.warning("No handler registered for EDU type %s", edu_type)
with start_active_span_from_edu(content, "handle_edu"): with start_active_span_from_edu(content, "handle_edu"):
try: try:
yield handler(origin, content) await handler(origin, content)
except SynapseError as e: except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e) logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception: except Exception:
@ -816,7 +787,7 @@ class FederationHandlerRegistry(object):
def on_query(self, query_type, args): def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type) handler = self.query_handlers.get(query_type)
if not handler: if not handler:
logger.warn("No handler registered for query type %s", query_type) logger.warning("No handler registered for query type %s", query_type)
raise NotFoundError("No handler for Query type '%s'" % (query_type,)) raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args) return handler(args)
@ -840,7 +811,7 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
super(ReplicationFederationHandlerRegistry, self).__init__() super(ReplicationFederationHandlerRegistry, self).__init__()
def on_edu(self, edu_type, origin, content): async def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry """Overrides FederationHandlerRegistry
""" """
if not self.config.use_presence and edu_type == "m.presence": if not self.config.use_presence and edu_type == "m.presence":
@ -848,17 +819,17 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
handler = self.edu_handlers.get(edu_type) handler = self.edu_handlers.get(edu_type)
if handler: if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu( return await super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content edu_type, origin, content
) )
return self._send_edu(edu_type=edu_type, origin=origin, content=content) return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
def on_query(self, query_type, args): async def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry """Overrides FederationHandlerRegistry
""" """
handler = self.query_handlers.get(query_type) handler = self.query_handlers.get(query_type)
if handler: if handler:
return handler(args) return await handler(args)
return self._get_query_client(query_type=query_type, args=args) return await self._get_query_client(query_type=query_type, args=args)

View file

@ -36,6 +36,8 @@ from six import iteritems
from sortedcontainers import SortedDict from sortedcontainers import SortedDict
from twisted.internet import defer
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -212,7 +214,7 @@ class FederationRemoteSendQueue(object):
receipt (synapse.types.ReadReceipt): receipt (synapse.types.ReadReceipt):
""" """
# nothing to do here: the replication listener will handle it. # nothing to do here: the replication listener will handle it.
pass return defer.succeed(None)
def send_presence(self, states): def send_presence(self, states):
"""As per FederationSender """As per FederationSender

View file

@ -192,15 +192,16 @@ class PerDestinationQueue(object):
# We have to keep 2 free slots for presence and rr_edus # We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2 limit = MAX_EDUS_PER_TRANSACTION - 2
device_update_edus, dev_list_id = ( device_update_edus, dev_list_id = yield self._get_device_update_edus(
yield self._get_device_update_edus(limit) limit
) )
limit -= len(device_update_edus) limit -= len(device_update_edus)
to_device_edus, device_stream_id = ( (
yield self._get_to_device_message_edus(limit) to_device_edus,
) device_stream_id,
) = yield self._get_to_device_message_edus(limit)
pending_edus = device_update_edus + to_device_edus pending_edus = device_update_edus + to_device_edus
@ -359,20 +360,20 @@ class PerDestinationQueue(object):
last_device_list = self._last_device_list_stream_id last_device_list = self._last_device_list_stream_id
# Retrieve list of new device updates to send to the destination # Retrieve list of new device updates to send to the destination
now_stream_id, results = yield self._store.get_devices_by_remote( now_stream_id, results = yield self._store.get_device_updates_by_remote(
self._destination, last_device_list, limit=limit self._destination, last_device_list, limit=limit
) )
edus = [ edus = [
Edu( Edu(
origin=self._server_name, origin=self._server_name,
destination=self._destination, destination=self._destination,
edu_type="m.device_list_update", edu_type=edu_type,
content=content, content=content,
) )
for content in results for (edu_type, content) in results
] ]
assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" assert len(edus) <= limit, "get_device_updates_by_remote returned too many EDUs"
return (edus, now_stream_id) return (edus, now_stream_id)

View file

@ -146,7 +146,7 @@ class TransactionManager(object):
if code == 200: if code == 200:
for e_id, r in response.get("pdus", {}).items(): for e_id, r in response.get("pdus", {}).items():
if "error" in r: if "error" in r:
logger.warn( logger.warning(
"TX [%s] {%s} Remote returned error for %s: %s", "TX [%s] {%s} Remote returned error for %s: %s",
destination, destination,
txn_id, txn_id,
@ -155,7 +155,7 @@ class TransactionManager(object):
) )
else: else:
for p in pdus: for p in pdus:
logger.warn( logger.warning(
"TX [%s] {%s} Failed to send event %s", "TX [%s] {%s} Failed to send event %s",
destination, destination,
txn_id, txn_id,

View file

@ -122,10 +122,10 @@ class TransportLayerClient(object):
Deferred: Results in a dict received from the remote homeserver. Deferred: Results in a dict received from the remote homeserver.
""" """
logger.debug( logger.debug(
"backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
destination, destination,
room_id, room_id,
repr(event_tuples), event_tuples,
str(limit), str(limit),
) )

View file

@ -202,7 +202,7 @@ def _parse_auth_header(header_bytes):
sig = strip_quotes(param_dict["sig"]) sig = strip_quotes(param_dict["sig"])
return origin, key, sig return origin, key, sig
except Exception as e: except Exception as e:
logger.warn( logger.warning(
"Error parsing auth header '%s': %s", "Error parsing auth header '%s': %s",
header_bytes.decode("ascii", "replace"), header_bytes.decode("ascii", "replace"),
e, e,
@ -287,10 +287,12 @@ class BaseFederationServlet(object):
except NoAuthenticationError: except NoAuthenticationError:
origin = None origin = None
if self.REQUIRE_AUTH: if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication") logger.warning(
"authenticate_request failed: missing authentication"
)
raise raise
except Exception as e: except Exception as e:
logger.warn("authenticate_request failed: %s", e) logger.warning("authenticate_request failed: %s", e)
raise raise
request_tags = { request_tags = {

View file

@ -181,7 +181,7 @@ class GroupAttestionRenewer(object):
elif not self.is_mine_id(user_id): elif not self.is_mine_id(user_id):
destination = get_domain_from_id(user_id) destination = get_domain_from_id(user_id)
else: else:
logger.warn( logger.warning(
"Incorrectly trying to do attestations for user: %r in %r", "Incorrectly trying to do attestations for user: %r in %r",
user_id, user_id,
group_id, group_id,

View file

@ -488,7 +488,7 @@ class GroupsServerHandler(object):
profile = yield self.profile_handler.get_profile_from_cache(user_id) profile = yield self.profile_handler.get_profile_from_cache(user_id)
user_profile.update(profile) user_profile.update(profile)
except Exception as e: except Exception as e:
logger.warn("Error getting profile for %s: %s", user_id, e) logger.warning("Error getting profile for %s: %s", user_id, e)
user_profiles.append(user_profile) user_profiles.append(user_profile)
return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}

View file

@ -38,9 +38,10 @@ class AccountDataEventSource(object):
{"type": "m.tag", "content": {"tags": room_tags}, "room_id": room_id} {"type": "m.tag", "content": {"tags": room_tags}, "room_id": room_id}
) )
account_data, room_account_data = ( (
yield self.store.get_updated_account_data_for_user(user_id, last_stream_id) account_data,
) room_account_data,
) = yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
for account_data_type, content in account_data.items(): for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content}) results.append({"type": account_data_type, "content": content})

View file

@ -30,6 +30,9 @@ class AdminHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(AdminHandler, self).__init__(hs) super(AdminHandler, self).__init__(hs)
self.storage = hs.get_storage()
self.state_store = self.storage.state
@defer.inlineCallbacks @defer.inlineCallbacks
def get_whois(self, user): def get_whois(self, user):
connections = [] connections = []
@ -205,7 +208,7 @@ class AdminHandler(BaseHandler):
from_key = events[-1].internal_metadata.after from_key = events[-1].internal_metadata.after
events = yield filter_events_for_client(self.store, user_id, events) events = yield filter_events_for_client(self.storage, user_id, events)
writer.write_events(room_id, events) writer.write_events(room_id, events)
@ -241,7 +244,7 @@ class AdminHandler(BaseHandler):
for event_id in extremities: for event_id in extremities:
if not event_to_unseen_prevs[event_id]: if not event_to_unseen_prevs[event_id]:
continue continue
state = yield self.store.get_state_for_event(event_id) state = yield self.state_store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state) writer.write_state(room_id, event_id, state)
return writer.finished() return writer.finished()

View file

@ -73,7 +73,10 @@ class ApplicationServicesHandler(object):
try: try:
limit = 100 limit = 100
while True: while True:
upper_bound, events = yield self.store.get_new_events_for_appservice( (
upper_bound,
events,
) = yield self.store.get_new_events_for_appservice(
self.current_max, limit self.current_max, limit
) )

View file

@ -525,7 +525,7 @@ class AuthHandler(BaseHandler):
result = None result = None
if not user_infos: if not user_infos:
logger.warn("Attempted to login as %s but they do not exist", user_id) logger.warning("Attempted to login as %s but they do not exist", user_id)
elif len(user_infos) == 1: elif len(user_infos) == 1:
# a single match (possibly not exact) # a single match (possibly not exact)
result = user_infos.popitem() result = user_infos.popitem()
@ -534,7 +534,7 @@ class AuthHandler(BaseHandler):
result = (user_id, user_infos[user_id]) result = (user_id, user_infos[user_id])
else: else:
# multiple matches, none of them exact # multiple matches, none of them exact
logger.warn( logger.warning(
"Attempted to login as %s but it matches more than one user " "Attempted to login as %s but it matches more than one user "
"inexactly: %r", "inexactly: %r",
user_id, user_id,
@ -728,7 +728,7 @@ class AuthHandler(BaseHandler):
result = yield self.validate_hash(password, password_hash) result = yield self.validate_hash(password, password_hash)
if not result: if not result:
logger.warn("Failed password login for user %s", user_id) logger.warning("Failed password login for user %s", user_id)
return None return None
return user_id return user_id

View file

@ -46,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.hs = hs self.hs = hs
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.state_store = hs.get_storage().state
self._auth_handler = hs.get_auth_handler() self._auth_handler = hs.get_auth_handler()
@trace @trace
@ -178,7 +179,7 @@ class DeviceWorkerHandler(BaseHandler):
continue continue
# mapping from event_id -> state_dict # mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids)
# Check if we've joined the room? If so we just blindly add all the users to # Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users. # the "possibly changed" users.
@ -458,7 +459,18 @@ class DeviceHandler(DeviceWorkerHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id): def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
return {"user_id": user_id, "stream_id": stream_id, "devices": devices} master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
self_signing_key = yield self.store.get_e2e_cross_signing_key(
user_id, "self_signing"
)
return {
"user_id": user_id,
"stream_id": stream_id,
"devices": devices,
"master_key": master_key,
"self_signing_key": self_signing_key,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def user_left_room(self, user, room_id): def user_left_room(self, user, room_id):
@ -656,7 +668,7 @@ class DeviceListUpdater(object):
except (NotRetryingDestination, RequestSendFailed, HttpResponseException): except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
# TODO: Remember that we are now out of sync and try again # TODO: Remember that we are now out of sync and try again
# later # later
logger.warn("Failed to handle device list update for %s", user_id) logger.warning("Failed to handle device list update for %s", user_id)
# We abort on exceptions rather than accepting the update # We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list # as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync # is out of date. If we bail then we will retry the resync
@ -694,7 +706,7 @@ class DeviceListUpdater(object):
# up on storing the total list of devices and only handle the # up on storing the total list of devices and only handle the
# delta instead. # delta instead.
if len(devices) > 1000: if len(devices) > 1000:
logger.warn( logger.warning(
"Ignoring device list snapshot for %s as it has >1K devs (%d)", "Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id, user_id,
len(devices), len(devices),

View file

@ -52,7 +52,7 @@ class DeviceMessageHandler(object):
local_messages = {} local_messages = {}
sender_user_id = content["sender"] sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id): if origin != get_domain_from_id(sender_user_id):
logger.warn( logger.warning(
"Dropping device message from %r with spoofed sender %r", "Dropping device message from %r with spoofed sender %r",
origin, origin,
sender_user_id, sender_user_id,

View file

@ -250,7 +250,7 @@ class DirectoryHandler(BaseHandler):
ignore_backoff=True, ignore_backoff=True,
) )
except CodeMessageException as e: except CodeMessageException as e:
logging.warn("Error retrieving alias") logging.warning("Error retrieving alias")
if e.code == 404: if e.code == 404:
result = None result = None
else: else:

Some files were not shown because too many files have changed in this diff Show more