Merge branch 'develop' into babolivier/context_filters

This commit is contained in:
Brendan Abolivier 2019-11-26 10:53:48 +00:00 committed by GitHub
commit 4c1b799e1b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
125 changed files with 2317 additions and 1160 deletions

View file

@ -1,3 +1,95 @@
Synapse 1.6.0rc2 (2019-11-25)
=============================
Bugfixes
--------
- Fix a bug which could cause the background database update hander for event labels to get stuck in a loop raising exceptions. ([\#6407](https://github.com/matrix-org/synapse/issues/6407))
Synapse 1.6.0rc1 (2019-11-20)
=============================
Features
--------
- Add federation support for cross-signing. ([\#5727](https://github.com/matrix-org/synapse/issues/5727))
- Increase default room version from 4 to 5, thereby enforcing server key validity period checks. ([\#6220](https://github.com/matrix-org/synapse/issues/6220))
- Add support for outbound http proxying via http_proxy/HTTPS_PROXY env vars. ([\#6238](https://github.com/matrix-org/synapse/issues/6238))
- Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)). ([\#6301](https://github.com/matrix-org/synapse/issues/6301), [\#6310](https://github.com/matrix-org/synapse/issues/6310), [\#6340](https://github.com/matrix-org/synapse/issues/6340))
Bugfixes
--------
- Fix LruCache callback deduplication for Python 3.8. Contributed by @V02460. ([\#6213](https://github.com/matrix-org/synapse/issues/6213))
- Remove a room from a server's public rooms list on room upgrade. ([\#6232](https://github.com/matrix-org/synapse/issues/6232), [\#6235](https://github.com/matrix-org/synapse/issues/6235))
- Delete keys from key backup when deleting backup versions. ([\#6253](https://github.com/matrix-org/synapse/issues/6253))
- Make notification of cross-signing signatures work with workers. ([\#6254](https://github.com/matrix-org/synapse/issues/6254))
- Fix exception when remote servers attempt to join a room that they're not allowed to join. ([\#6278](https://github.com/matrix-org/synapse/issues/6278))
- Prevent errors from appearing on Synapse startup if `git` is not installed. ([\#6284](https://github.com/matrix-org/synapse/issues/6284))
- Appservice requests will no longer contain a double slash prefix when the appservice url provided ends in a slash. ([\#6306](https://github.com/matrix-org/synapse/issues/6306))
- Fix `/purge_room` admin API. ([\#6307](https://github.com/matrix-org/synapse/issues/6307))
- Fix the `hidden` field in the `devices` table for SQLite versions prior to 3.23.0. ([\#6313](https://github.com/matrix-org/synapse/issues/6313))
- Fix bug which casued rejected events to be persisted with the wrong room state. ([\#6320](https://github.com/matrix-org/synapse/issues/6320))
- Fix bug where `rc_login` ratelimiting would prematurely kick in. ([\#6335](https://github.com/matrix-org/synapse/issues/6335))
- Prevent the server taking a long time to start up when guest registration is enabled. ([\#6338](https://github.com/matrix-org/synapse/issues/6338))
- Fix bug where upgrading a guest account to a full user would fail when account validity is enabled. ([\#6359](https://github.com/matrix-org/synapse/issues/6359))
- Fix `to_device` stream ID getting reset every time Synapse restarts, which had the potential to cause unable to decrypt errors. ([\#6363](https://github.com/matrix-org/synapse/issues/6363))
- Fix permission denied error when trying to generate a config file with the docker image. ([\#6389](https://github.com/matrix-org/synapse/issues/6389))
Improved Documentation
----------------------
- Contributor documentation now mentions script to run linters. ([\#6164](https://github.com/matrix-org/synapse/issues/6164))
- Modify CAPTCHA_SETUP.md to update the terms `private key` and `public key` to `secret key` and `site key` respectively. Contributed by Yash Jipkate. ([\#6257](https://github.com/matrix-org/synapse/issues/6257))
- Update `INSTALL.md` Email section to talk about `account_threepid_delegates`. ([\#6272](https://github.com/matrix-org/synapse/issues/6272))
- Fix a small typo in `account_threepid_delegates` configuration option. ([\#6273](https://github.com/matrix-org/synapse/issues/6273))
Internal Changes
----------------
- Add a CI job to test the `synapse_port_db` script. ([\#6140](https://github.com/matrix-org/synapse/issues/6140), [\#6276](https://github.com/matrix-org/synapse/issues/6276))
- Convert EventContext to an attrs. ([\#6218](https://github.com/matrix-org/synapse/issues/6218))
- Move `persist_events` out from main data store. ([\#6240](https://github.com/matrix-org/synapse/issues/6240), [\#6300](https://github.com/matrix-org/synapse/issues/6300))
- Reduce verbosity of user/room stats. ([\#6250](https://github.com/matrix-org/synapse/issues/6250))
- Reduce impact of debug logging. ([\#6251](https://github.com/matrix-org/synapse/issues/6251))
- Expose some homeserver functionality to spam checkers. ([\#6259](https://github.com/matrix-org/synapse/issues/6259))
- Change cache descriptors to always return deferreds. ([\#6263](https://github.com/matrix-org/synapse/issues/6263), [\#6291](https://github.com/matrix-org/synapse/issues/6291))
- Fix incorrect comment regarding the functionality of an `if` statement. ([\#6269](https://github.com/matrix-org/synapse/issues/6269))
- Update CI to run `isort` over the `scripts` and `scripts-dev` directories. ([\#6270](https://github.com/matrix-org/synapse/issues/6270))
- Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated. ([\#6271](https://github.com/matrix-org/synapse/issues/6271), [\#6314](https://github.com/matrix-org/synapse/issues/6314))
- Port replication http server endpoints to async/await. ([\#6274](https://github.com/matrix-org/synapse/issues/6274))
- Port room rest handlers to async/await. ([\#6275](https://github.com/matrix-org/synapse/issues/6275))
- Remove redundant CLI parameters on CI's `flake8` step. ([\#6277](https://github.com/matrix-org/synapse/issues/6277))
- Port `federation_server.py` to async/await. ([\#6279](https://github.com/matrix-org/synapse/issues/6279))
- Port receipt and read markers to async/wait. ([\#6280](https://github.com/matrix-org/synapse/issues/6280))
- Split out state storage into separate data store. ([\#6294](https://github.com/matrix-org/synapse/issues/6294), [\#6295](https://github.com/matrix-org/synapse/issues/6295))
- Refactor EventContext for clarity. ([\#6298](https://github.com/matrix-org/synapse/issues/6298))
- Update the version of black used to 19.10b0. ([\#6304](https://github.com/matrix-org/synapse/issues/6304))
- Add some documentation about worker replication. ([\#6305](https://github.com/matrix-org/synapse/issues/6305))
- Move admin endpoints into separate files. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#6308](https://github.com/matrix-org/synapse/issues/6308))
- Document the use of `lint.sh` for code style enforcement & extend it to run on specified paths only. ([\#6312](https://github.com/matrix-org/synapse/issues/6312))
- Add optional python dependencies and dependant binary libraries to snapcraft packaging. ([\#6317](https://github.com/matrix-org/synapse/issues/6317))
- Remove the dependency on psutil and replace functionality with the stdlib `resource` module. ([\#6318](https://github.com/matrix-org/synapse/issues/6318), [\#6336](https://github.com/matrix-org/synapse/issues/6336))
- Improve documentation for EventContext fields. ([\#6319](https://github.com/matrix-org/synapse/issues/6319))
- Add some checks that we aren't using state from rejected events. ([\#6330](https://github.com/matrix-org/synapse/issues/6330))
- Add continuous integration for python 3.8. ([\#6341](https://github.com/matrix-org/synapse/issues/6341))
- Correct spacing/case of various instances of the word "homeserver". ([\#6357](https://github.com/matrix-org/synapse/issues/6357))
- Temporarily blacklist the failing unit test PurgeRoomTestCase.test_purge_room. ([\#6361](https://github.com/matrix-org/synapse/issues/6361))
Synapse 1.5.1 (2019-11-06)
==========================
Features
--------
- Limit the length of data returned by url previews, to prevent DoS attacks. ([\#6331](https://github.com/matrix-org/synapse/issues/6331), [\#6334](https://github.com/matrix-org/synapse/issues/6334))
Synapse 1.5.0 (2019-10-29) Synapse 1.5.0 (2019-10-29)
========================== ==========================

View file

@ -36,7 +36,7 @@ that your email address is probably `user@example.com` rather than
System requirements: System requirements:
- POSIX-compliant system (tested on Linux & OS X) - POSIX-compliant system (tested on Linux & OS X)
- Python 3.5, 3.6, or 3.7 - Python 3.5, 3.6, 3.7 or 3.8.
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
Synapse is written in Python but some of the libraries it uses are written in Synapse is written in Python but some of the libraries it uses are written in
@ -133,9 +133,9 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
sudo yum groupinstall "Development Tools" sudo yum groupinstall "Development Tools"
``` ```
#### Mac OS X #### macOS
Installing prerequisites on Mac OS X: Installing prerequisites on macOS:
``` ```
xcode-select --install xcode-select --install
@ -144,6 +144,14 @@ sudo pip install virtualenv
brew install pkg-config libffi brew install pkg-config libffi
``` ```
On macOS Catalina (10.15) you may need to explicitly install OpenSSL
via brew and inform `pip` about it so that `psycopg2` builds:
```
brew install openssl@1.1
export LDFLAGS=-L/usr/local/Cellar/openssl\@1.1/1.1.1d/lib/
```
#### OpenSUSE #### OpenSUSE
Installing prerequisites on openSUSE: Installing prerequisites on openSUSE:

View file

@ -1 +0,0 @@
Add federation support for cross-signing.

View file

@ -1 +0,0 @@
Add a CI job to test the `synapse_port_db` script.

View file

@ -1 +0,0 @@
Contributor documentation now mentions script to run linters.

View file

@ -1 +0,0 @@
Convert EventContext to an attrs.

View file

@ -1 +0,0 @@
Remove a room from a server's public rooms list on room upgrade.

View file

@ -1 +0,0 @@
Add support for outbound http proxying via http_proxy/HTTPS_PROXY env vars.

View file

@ -1 +0,0 @@
Move `persist_events` out from main data store.

View file

@ -1 +0,0 @@
Reduce verbosity of user/room stats.

View file

@ -1 +0,0 @@
Reduce impact of debug logging.

View file

@ -1 +0,0 @@
Delete keys from key backup when deleting backup versions.

View file

@ -1 +0,0 @@
Make notification of cross-signing signatures work with workers.

View file

@ -1 +0,0 @@
Modify CAPTCHA_SETUP.md to update the terms `private key` and `public key` to `secret key` and `site key` respectively. Contributed by Yash Jipkate.

View file

@ -1 +0,0 @@
Expose some homeserver functionality to spam checkers.

View file

@ -1 +0,0 @@
Change cache descriptors to always return deferreds.

View file

@ -1 +0,0 @@
Fix incorrect comment regarding the functionality of an `if` statement.

View file

@ -1 +0,0 @@
Update CI to run `isort` over the `scripts` and `scripts-dev` directories.

View file

@ -1 +0,0 @@
Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated.

View file

@ -1 +0,0 @@
Update `INSTALL.md` Email section to talk about `account_threepid_delegates`.

View file

@ -1 +0,0 @@
Fix a small typo in `account_threepid_delegates` configuration option.

View file

@ -1 +0,0 @@
Port replication http server endpoints to async/await.

View file

@ -1 +0,0 @@
Port room rest handlers to async/await.

View file

@ -1 +0,0 @@
Add a CI job to test the `synapse_port_db` script.

View file

@ -1 +0,0 @@
Remove redundant CLI parameters on CI's `flake8` step.

View file

@ -1 +0,0 @@
Fix exception when remote servers attempt to join a room that they're not allowed to join.

View file

@ -1 +0,0 @@
Port `federation_server.py` to async/await.

View file

@ -1 +0,0 @@
Port receipt and read markers to async/wait.

View file

@ -1 +0,0 @@
Prevent errors from appearing on Synapse startup if `git` is not installed.

View file

@ -1 +0,0 @@
Change cache descriptors to always return deferreds.

View file

@ -1 +0,0 @@
Split out state storage into separate data store.

View file

@ -1 +0,0 @@
Refactor EventContext for clarity.

View file

@ -1 +0,0 @@
Move `persist_events` out from main data store.

View file

@ -1 +0,0 @@
Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).

View file

@ -1 +0,0 @@
Update the version of black used to 19.10b0.

View file

@ -1 +0,0 @@
Add some documentation about worker replication.

View file

@ -1 +0,0 @@
Appservice requests will no longer contain a double slash prefix when the appservice url provided ends in a slash.

View file

@ -1 +0,0 @@
Fix `/purge_room` admin API.

View file

@ -1 +0,0 @@
Document the use of `lint.sh` for code style enforcement & extend it to run on specified paths only.

View file

@ -1 +0,0 @@
Fix the `hidden` field in the `devices` table for SQLite versions prior to 3.23.0.

View file

@ -1 +0,0 @@
Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated.

View file

@ -1 +0,0 @@
Remove the dependency on psutil and replace functionality with the stdlib `resource` module.

1
changelog.d/6322.misc Normal file
View file

@ -0,0 +1 @@
Improve the performance of outputting structured logging.

1
changelog.d/6362.misc Normal file
View file

@ -0,0 +1 @@
Clean up some unnecessary quotation marks around the codebase.

1
changelog.d/6388.doc Normal file
View file

@ -0,0 +1 @@
Fix link in the user directory documentation.

1
changelog.d/6390.doc Normal file
View file

@ -0,0 +1 @@
Add build instructions to the docker readme.

1
changelog.d/6392.misc Normal file
View file

@ -0,0 +1 @@
Add a test scenario to make sure room history purges don't break `/messages` in the future.

1
changelog.d/6408.bugfix Normal file
View file

@ -0,0 +1 @@
Fix an intermittent exception when handling read-receipts.

6
debian/changelog vendored
View file

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.5.1) stable; urgency=medium
* New synapse release 1.5.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 06 Nov 2019 10:02:14 +0000
matrix-synapse-py3 (1.5.0) stable; urgency=medium matrix-synapse-py3 (1.5.0) stable; urgency=medium
* New synapse release 1.5.0. * New synapse release 1.5.0.

View file

@ -130,3 +130,15 @@ docker run -it --rm \
This will generate the same configuration file as the legacy mode used, but This will generate the same configuration file as the legacy mode used, but
will store it in `/data/homeserver.yaml` instead of a temporary location. You will store it in `/data/homeserver.yaml` instead of a temporary location. You
can then use it as shown above at [Running synapse](#running-synapse). can then use it as shown above at [Running synapse](#running-synapse).
## Building the image
If you need to build the image from a Synapse checkout, use the following `docker
build` command from the repo's root:
```
docker build -t matrixdotorg/synapse -f docker/Dockerfile .
```
You can choose to build a different docker image by changing the value of the `-f` flag to
point to another Dockerfile.

View file

@ -169,11 +169,11 @@ def run_generate_config(environ, ownership):
# log("running %s" % (args, )) # log("running %s" % (args, ))
if ownership is not None: if ownership is not None:
args = ["su-exec", ownership] + args
os.execv("/sbin/su-exec", args)
# make sure that synapse has perms to write to the data dir. # make sure that synapse has perms to write to the data dir.
subprocess.check_output(["chown", ownership, data_dir]) subprocess.check_output(["chown", ownership, data_dir])
args = ["su-exec", ownership] + args
os.execv("/sbin/su-exec", args)
else: else:
os.execv("/usr/local/bin/python", args) os.execv("/usr/local/bin/python", args)

View file

@ -72,7 +72,7 @@ pid_file: DATADIR/homeserver.pid
# For example, for room version 1, default_room_version should be set # For example, for room version 1, default_room_version should be set
# to "1". # to "1".
# #
#default_room_version: "4" #default_room_version: "5"
# The GC threshold parameters to pass to `gc.set_threshold`, if defined # The GC threshold parameters to pass to `gc.set_threshold`, if defined
# #
@ -287,7 +287,7 @@ listeners:
# Used by phonehome stats to group together related servers. # Used by phonehome stats to group together related servers.
#server_context: context #server_context: context
# Resource-constrained Homeserver Settings # Resource-constrained homeserver Settings
# #
# If limit_remote_rooms.enabled is True, the room complexity will be # If limit_remote_rooms.enabled is True, the room complexity will be
# checked before a user joins a new remote room. If it is above # checked before a user joins a new remote room. If it is above
@ -743,11 +743,11 @@ uploads_path: "DATADIR/uploads"
## Captcha ## ## Captcha ##
# See docs/CAPTCHA_SETUP for full details of configuring this. # See docs/CAPTCHA_SETUP for full details of configuring this.
# This Home Server's ReCAPTCHA public key. # This homeserver's ReCAPTCHA public key.
# #
#recaptcha_public_key: "YOUR_PUBLIC_KEY" #recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key. # This homeserver's ReCAPTCHA private key.
# #
#recaptcha_private_key: "YOUR_PRIVATE_KEY" #recaptcha_private_key: "YOUR_PRIVATE_KEY"
@ -1270,7 +1270,7 @@ password_config:
# smtp_user: "exampleusername" # smtp_user: "exampleusername"
# smtp_pass: "examplepassword" # smtp_pass: "examplepassword"
# require_transport_security: false # require_transport_security: false
# notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>" # notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name: Matrix # app_name: Matrix
# #
# # Enable email notifications by default # # Enable email notifications by default

View file

@ -7,7 +7,6 @@ who are present in a publicly viewable room present on the server.
The directory info is stored in various tables, which can (typically after The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL here solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
and then restart synapse. This should then start a background task to and then restart synapse. This should then start a background task to
flush the current tables and regenerate the directory. flush the current tables and regenerate the directory.

View file

@ -20,11 +20,13 @@ from concurrent.futures import ThreadPoolExecutor
DISTS = ( DISTS = (
"debian:stretch", "debian:stretch",
"debian:buster", "debian:buster",
"debian:bullseye",
"debian:sid", "debian:sid",
"ubuntu:xenial", "ubuntu:xenial",
"ubuntu:bionic", "ubuntu:bionic",
"ubuntu:cosmic", "ubuntu:cosmic",
"ubuntu:disco", "ubuntu:disco",
"ubuntu:eoan",
) )
DESC = '''\ DESC = '''\

View file

@ -20,3 +20,23 @@ parts:
source: . source: .
plugin: python plugin: python
python-version: python3 python-version: python3
python-packages:
- '.[all]'
build-packages:
- libffi-dev
- libturbojpeg0-dev
- libssl-dev
- libxslt1-dev
- libpq-dev
- zlib1g-dev
stage-packages:
- libasn1-8-heimdal
- libgssapi3-heimdal
- libhcrypto4-heimdal
- libheimbase1-heimdal
- libheimntlm0-heimdal
- libhx509-5-heimdal
- libkrb5-26-heimdal
- libldap-2.4-2
- libpq5
- libsasl2-2

View file

@ -36,7 +36,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.5.0" __version__ = "1.6.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View file

@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
def _get_federation_out_pos(self, db_conn): def _get_federation_out_pos(self, db_conn):
sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
sql = self.database_engine.convert_param_style(sql) sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor() txn = db_conn.cursor()

View file

@ -636,7 +636,7 @@ def run(hs):
if hs.config.report_stats: if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals") logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000, hs, stats) clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
# We need to defer this init for the cases that we daemonize # We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process # otherwise the process ID we get is that of the non-daemon process
@ -644,7 +644,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can # We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes # be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home, hs, stats) clock.call_later(5 * 60, start_phone_stats_home)
_base.start_reactor( _base.start_reactor(
"synapse-homeserver", "synapse-homeserver",

View file

@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient):
if not _is_valid_3pe_metadata(info): if not _is_valid_3pe_metadata(info):
logger.warning( logger.warning(
"query_3pe_protocol to %s did not return a" " valid result", uri "query_3pe_protocol to %s did not return a valid result", uri
) )
return None return None

View file

@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename):
for regex_obj in as_info["namespaces"][ns]: for regex_obj in as_info["namespaces"][ns]:
if not isinstance(regex_obj, dict): if not isinstance(regex_obj, dict):
raise ValueError( raise ValueError(
"Expected namespace entry in %s to be an object," " but got %s", "Expected namespace entry in %s to be an object, but got %s",
ns, ns,
regex_obj, regex_obj,
) )

View file

@ -35,11 +35,11 @@ class CaptchaConfig(Config):
## Captcha ## ## Captcha ##
# See docs/CAPTCHA_SETUP for full details of configuring this. # See docs/CAPTCHA_SETUP for full details of configuring this.
# This Home Server's ReCAPTCHA public key. # This homeserver's ReCAPTCHA public key.
# #
#recaptcha_public_key: "YOUR_PUBLIC_KEY" #recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key. # This homeserver's ReCAPTCHA private key.
# #
#recaptcha_private_key: "YOUR_PRIVATE_KEY" #recaptcha_private_key: "YOUR_PRIVATE_KEY"

View file

@ -305,7 +305,7 @@ class EmailConfig(Config):
# smtp_user: "exampleusername" # smtp_user: "exampleusername"
# smtp_pass: "examplepassword" # smtp_pass: "examplepassword"
# require_transport_security: false # require_transport_security: false
# notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>" # notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name: Matrix # app_name: Matrix
# #
# # Enable email notifications by default # # Enable email notifications by default

View file

@ -170,7 +170,7 @@ class _RoomDirectoryRule(object):
self.action = action self.action = action
else: else:
raise ConfigError( raise ConfigError(
"%s rules can only have action of 'allow'" " or 'deny'" % (option_name,) "%s rules can only have action of 'allow' or 'deny'" % (option_name,)
) )
self._alias_matches_all = alias == "*" self._alias_matches_all = alias == "*"

View file

@ -41,7 +41,7 @@ logger = logging.Logger(__name__)
# in the list. # in the list.
DEFAULT_BIND_ADDRESSES = ["::", "0.0.0.0"] DEFAULT_BIND_ADDRESSES = ["::", "0.0.0.0"]
DEFAULT_ROOM_VERSION = "4" DEFAULT_ROOM_VERSION = "5"
ROOM_COMPLEXITY_TOO_GREAT = ( ROOM_COMPLEXITY_TOO_GREAT = (
"Your homeserver is unable to join rooms this large or complex. " "Your homeserver is unable to join rooms this large or complex. "
@ -223,7 +223,7 @@ class ServerConfig(Config):
self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
except Exception as e: except Exception as e:
raise ConfigError( raise ConfigError(
"Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
) )
if self.public_baseurl is not None: if self.public_baseurl is not None:
@ -721,7 +721,7 @@ class ServerConfig(Config):
# Used by phonehome stats to group together related servers. # Used by phonehome stats to group together related servers.
#server_context: context #server_context: context
# Resource-constrained Homeserver Settings # Resource-constrained homeserver Settings
# #
# If limit_remote_rooms.enabled is True, the room complexity will be # If limit_remote_rooms.enabled is True, the room complexity will be
# checked before a user joins a new remote room. If it is above # checked before a user joins a new remote room. If it is above
@ -787,14 +787,14 @@ class ServerConfig(Config):
"--print-pidfile", "--print-pidfile",
action="store_true", action="store_true",
default=None, default=None,
help="Print the path to the pidfile just" " before daemonizing", help="Print the path to the pidfile just before daemonizing",
) )
server_group.add_argument( server_group.add_argument(
"--manhole", "--manhole",
metavar="PORT", metavar="PORT",
dest="manhole", dest="manhole",
type=int, type=int,
help="Turn on the twisted telnet manhole" " service on the given port.", help="Turn on the twisted telnet manhole service on the given port.",
) )

View file

@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Dict, Optional, Tuple, Union
from six import iteritems from six import iteritems
import attr import attr
@ -19,54 +21,113 @@ from frozendict import frozendict
from twisted.internet import defer from twisted.internet import defer
from synapse.appservice import ApplicationService
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
@attr.s(slots=True) @attr.s(slots=True)
class EventContext: class EventContext:
""" """
Holds information relevant to persisting an event
Attributes: Attributes:
state_group (int|None): state group id, if the state has been stored rejected: A rejection reason if the event was rejected, else False
as a state group. This is usually only None if e.g. the event is
an outlier.
rejected (bool|str): A rejection reason if the event was rejected, else
False
prev_group (int): Previously persisted state group. ``None`` for an _state_group: The ID of the state group for this event. Note that state events
outlier. are persisted with a state group which includes the new event, so this is
delta_ids (dict[(str, str), str]): Delta from ``prev_group``. effectively the state *after* the event in question.
(type, state_key) -> event_id. ``None`` for an outlier.
app_service: FIXME For a *rejected* state event, where the state of the rejected event is
ignored, this state_group should never make it into the
event_to_state_groups table. Indeed, inspecting this value for a rejected
state event is almost certainly incorrect.
For an outlier, where we don't have the state at the event, this will be
None.
Note that this is a private attribute: it should be accessed via
the ``state_group`` property.
state_group_before_event: The ID of the state group representing the state
of the room before this event.
If this is a non-state event, this will be the same as ``state_group``. If
it's a state event, it will be the same as ``prev_group``.
If ``state_group`` is None (ie, the event is an outlier),
``state_group_before_event`` will always also be ``None``.
prev_group: If it is known, ``state_group``'s prev_group. Note that this being
None does not necessarily mean that ``state_group`` does not have
a prev_group!
If the event is a state event, this is normally the same as ``prev_group``.
If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
will always also be ``None``.
Note that this *not* (necessarily) the state group associated with
``_prev_state_ids``.
delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
and ``state_group``.
app_service: If this event is being sent by a (local) application service, that
app service.
_current_state_ids: The room state map, including this event - ie, the state
in ``state_group``.
_current_state_ids (dict[(str, str), str]|None):
The current state map including the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id (type, state_key) -> event_id
_prev_state_ids (dict[(str, str), str]|None): FIXME: what is this for an outlier? it seems ill-defined. It seems like
The current state map excluding the current event. None if outlier it could be either {}, or the state we were given by the remote
or we haven't fetched the state from DB yet. server, depending on $THINGS
Note that this is a private attribute: it should be accessed via
``get_current_state_ids``. _AsyncEventContext impl calculates this
on-demand: it will be None until that happens.
_prev_state_ids: The room state map, excluding this event - ie, the state
in ``state_group_before_event``. For a non-state
event, this will be the same as _current_state_events.
Note that it is a completely different thing to prev_group!
(type, state_key) -> event_id (type, state_key) -> event_id
FIXME: again, what is this for an outlier?
As with _current_state_ids, this is a private attribute. It should be
accessed via get_prev_state_ids.
""" """
state_group = attr.ib(default=None) rejected = attr.ib(default=False, type=Union[bool, str])
rejected = attr.ib(default=False) _state_group = attr.ib(default=None, type=Optional[int])
prev_group = attr.ib(default=None) state_group_before_event = attr.ib(default=None, type=Optional[int])
delta_ids = attr.ib(default=None) prev_group = attr.ib(default=None, type=Optional[int])
app_service = attr.ib(default=None) delta_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
app_service = attr.ib(default=None, type=Optional[ApplicationService])
_prev_state_ids = attr.ib(default=None) _current_state_ids = attr.ib(
_current_state_ids = attr.ib(default=None) default=None, type=Optional[Dict[Tuple[str, str], str]]
)
_prev_state_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
@staticmethod @staticmethod
def with_state( def with_state(
state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None state_group,
state_group_before_event,
current_state_ids,
prev_state_ids,
prev_group=None,
delta_ids=None,
): ):
return EventContext( return EventContext(
current_state_ids=current_state_ids, current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids, prev_state_ids=prev_state_ids,
state_group=state_group, state_group=state_group,
state_group_before_event=state_group_before_event,
prev_group=prev_group, prev_group=prev_group,
delta_ids=delta_ids, delta_ids=delta_ids,
) )
@ -97,7 +158,8 @@ class EventContext:
"prev_state_id": prev_state_id, "prev_state_id": prev_state_id,
"event_type": event.type, "event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None, "event_state_key": event.state_key if event.is_state() else None,
"state_group": self.state_group, "state_group": self._state_group,
"state_group_before_event": self.state_group_before_event,
"rejected": self.rejected, "rejected": self.rejected,
"prev_group": self.prev_group, "prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids), "delta_ids": _encode_state_dict(self.delta_ids),
@ -123,6 +185,7 @@ class EventContext:
event_type=input["event_type"], event_type=input["event_type"],
event_state_key=input["event_state_key"], event_state_key=input["event_state_key"],
state_group=input["state_group"], state_group=input["state_group"],
state_group_before_event=input["state_group_before_event"],
prev_group=input["prev_group"], prev_group=input["prev_group"],
delta_ids=_decode_state_dict(input["delta_ids"]), delta_ids=_decode_state_dict(input["delta_ids"]),
rejected=input["rejected"], rejected=input["rejected"],
@ -134,22 +197,52 @@ class EventContext:
return context return context
@property
def state_group(self) -> Optional[int]:
"""The ID of the state group for this event.
Note that state events are persisted with a state group which includes the new
event, so this is effectively the state *after* the event in question.
For an outlier, where we don't have the state at the event, this will be None.
It is an error to access this for a rejected event, since rejected state should
not make it into the room state. Accessing this property will raise an exception
if ``rejected`` is set.
"""
if self.rejected:
raise RuntimeError("Attempt to access state_group of rejected event")
return self._state_group
@defer.inlineCallbacks @defer.inlineCallbacks
def get_current_state_ids(self, store): def get_current_state_ids(self, store):
"""Gets the current state IDs """
Gets the room state map, including this event - ie, the state in ``state_group``
It is an error to access this for a rejected event, since rejected state should
not make it into the room state. This method will raise an exception if
``rejected`` is set.
Returns: Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier. is None, which happens when the associated event is an outlier.
Maps a (type, state_key) to the event ID of the state event matching Maps a (type, state_key) to the event ID of the state event matching
this tuple. this tuple.
""" """
if self.rejected:
raise RuntimeError("Attempt to access state_ids of rejected event")
yield self._ensure_fetched(store) yield self._ensure_fetched(store)
return self._current_state_ids return self._current_state_ids
@defer.inlineCallbacks @defer.inlineCallbacks
def get_prev_state_ids(self, store): def get_prev_state_ids(self, store):
"""Gets the prev state IDs """
Gets the room state map, excluding this event.
For a non-state event, this will be the same as get_current_state_ids().
Returns: Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group Deferred[dict[(str, str), str]|None]: Returns None if state_group
@ -163,11 +256,17 @@ class EventContext:
def get_cached_current_state_ids(self): def get_cached_current_state_ids(self):
"""Gets the current state IDs if we have them already cached. """Gets the current state IDs if we have them already cached.
It is an error to access this for a rejected event, since rejected state should
not make it into the room state. This method will raise an exception if
``rejected`` is set.
Returns: Returns:
dict[(str, str), str]|None: Returns None if we haven't cached the dict[(str, str), str]|None: Returns None if we haven't cached the
state or if state_group is None, which happens when the associated state or if state_group is None, which happens when the associated
event is an outlier. event is an outlier.
""" """
if self.rejected:
raise RuntimeError("Attempt to access state_ids of rejected event")
return self._current_state_ids return self._current_state_ids

View file

@ -44,7 +44,7 @@ class TransactionActions(object):
response code and response body. response code and response body.
""" """
if not transaction.transaction_id: if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id") raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.get_received_txn_response(transaction.transaction_id, origin) return self.store.get_received_txn_response(transaction.transaction_id, origin)
@ -56,7 +56,7 @@ class TransactionActions(object):
Deferred Deferred
""" """
if not transaction.transaction_id: if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id") raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.set_received_txn_response( return self.store.set_received_txn_response(
transaction.transaction_id, origin, code, response transaction.transaction_id, origin, code, response

View file

@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
sent_pdus_destination_dist_total = Counter( sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", "synapse_federation_client_sent_pdu_destinations:total",
"" "Total number of PDUs queued for sending across all destinations", "Total number of PDUs queued for sending across all destinations",
) )

View file

@ -84,7 +84,7 @@ class TransactionManager(object):
txn_id = str(self._next_txn_id) txn_id = str(self._next_txn_id)
logger.debug( logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
destination, destination,
txn_id, txn_id,
len(pdus), len(pdus),
@ -103,7 +103,7 @@ class TransactionManager(object):
self._next_txn_id += 1 self._next_txn_id += 1
logger.info( logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
destination, destination,
txn_id, txn_id,
transaction.transaction_id, transaction.transaction_id,

View file

@ -102,8 +102,9 @@ class AuthHandler(BaseHandler):
login_types.append(t) login_types.append(t)
self._supported_login_types = login_types self._supported_login_types = login_types
self._account_ratelimiter = Ratelimiter() # Ratelimiter for failed auth during UIA. Uses same ratelimit config
self._failed_attempts_ratelimiter = Ratelimiter() # as per `rc_login.failed_attempts`.
self._failed_uia_attempts_ratelimiter = Ratelimiter()
self._clock = self.hs.get_clock() self._clock = self.hs.get_clock()
@ -133,12 +134,38 @@ class AuthHandler(BaseHandler):
AuthError if the client has completed a login flow, and it gives AuthError if the client has completed a login flow, and it gives
a different user to `requester` a different user to `requester`
LimitExceededError if the ratelimiter's failed request count for this
user is too high to proceed
""" """
user_id = requester.user.to_string()
# Check if we should be ratelimited due to too many previous failed attempts
self._failed_uia_attempts_ratelimiter.ratelimit(
user_id,
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=False,
)
# build a list of supported flows # build a list of supported flows
flows = [[login_type] for login_type in self._supported_login_types] flows = [[login_type] for login_type in self._supported_login_types]
try:
result, params, _ = yield self.check_auth(flows, request_body, clientip) result, params, _ = yield self.check_auth(flows, request_body, clientip)
except LoginError:
# Update the ratelimite to say we failed (`can_do_action` doesn't raise).
self._failed_uia_attempts_ratelimiter.can_do_action(
user_id,
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=True,
)
raise
# find the completed login type # find the completed login type
for login_type in self._supported_login_types: for login_type in self._supported_login_types:
@ -501,11 +528,8 @@ class AuthHandler(BaseHandler):
multiple matches multiple matches
Raises: Raises:
LimitExceededError if the ratelimiter's login requests count for this
user is too high too proceed.
UserDeactivatedError if a user is found but is deactivated. UserDeactivatedError if a user is found but is deactivated.
""" """
self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id) res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None: if res is not None:
return res[0] return res[0]
@ -572,8 +596,6 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request SynapseError if there was a problem with the request
LoginError if there was an authentication problem. LoginError if there was an authentication problem.
LimitExceededError if the ratelimiter's login requests count for this
user is too high too proceed.
""" """
if username.startswith("@"): if username.startswith("@"):
@ -581,8 +603,6 @@ class AuthHandler(BaseHandler):
else: else:
qualified_user_id = UserID(username, self.hs.hostname).to_string() qualified_user_id = UserID(username, self.hs.hostname).to_string()
self.ratelimit_login_per_account(qualified_user_id)
login_type = login_submission.get("type") login_type = login_submission.get("type")
known_login_type = False known_login_type = False
@ -650,15 +670,6 @@ class AuthHandler(BaseHandler):
if not known_login_type: if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type) raise SynapseError(400, "Unknown login type %s" % login_type)
# unknown username or invalid password.
self._failed_attempts_ratelimiter.ratelimit(
qualified_user_id.lower(),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=True,
)
# We raise a 403 here, but note that if we're doing user-interactive # We raise a 403 here, but note that if we're doing user-interactive
# login, it turns all LoginErrors into a 401 anyway. # login, it turns all LoginErrors into a 401 anyway.
raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN) raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
@ -710,10 +721,6 @@ class AuthHandler(BaseHandler):
Returns: Returns:
Deferred[unicode] the canonical_user_id, or Deferred[None] if Deferred[unicode] the canonical_user_id, or Deferred[None] if
unknown user/bad password unknown user/bad password
Raises:
LimitExceededError if the ratelimiter's login requests count for this
user is too high too proceed.
""" """
lookupres = yield self._find_user_id_and_pwd_hash(user_id) lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres: if not lookupres:
@ -742,7 +749,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", user_id) auth_api.validate_macaroon(macaroon, "login", user_id)
except Exception: except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN) raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
self.ratelimit_login_per_account(user_id)
yield self.auth.check_auth_blocking(user_id) yield self.auth.check_auth_blocking(user_id)
return user_id return user_id
@ -810,7 +817,7 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def add_threepid(self, user_id, medium, address, validated_at): def add_threepid(self, user_id, medium, address, validated_at):
# 'Canonicalise' email addresses down to lower case. # 'Canonicalise' email addresses down to lower case.
# We've now moving towards the Home Server being the entity that # We've now moving towards the homeserver being the entity that
# is responsible for validating threepids used for resetting passwords # is responsible for validating threepids used for resetting passwords
# on accounts, so in future Synapse will gain knowledge of specific # on accounts, so in future Synapse will gain knowledge of specific
# types (mediums) of threepid. For now, we still use the existing # types (mediums) of threepid. For now, we still use the existing
@ -912,35 +919,6 @@ class AuthHandler(BaseHandler):
else: else:
return defer.succeed(False) return defer.succeed(False)
def ratelimit_login_per_account(self, user_id):
"""Checks whether the process must be stopped because of ratelimiting.
Checks against two ratelimiters: the generic one for login attempts per
account and the one specific to failed attempts.
Args:
user_id (unicode): complete @user:id
Raises:
LimitExceededError if one of the ratelimiters' login requests count
for this user is too high too proceed.
"""
self._failed_attempts_ratelimiter.ratelimit(
user_id.lower(),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=False,
)
self._account_ratelimiter.ratelimit(
user_id.lower(),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_account.per_second,
burst_count=self.hs.config.rc_login_account.burst_count,
update=True,
)
@attr.s @attr.s
class MacaroonGenerator(object): class MacaroonGenerator(object):

View file

@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
if not service.is_interested_in_alias(room_alias.to_string()): if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError( raise SynapseError(
400, 400,
"This application service has not reserved" " this kind of alias.", "This application service has not reserved this kind of alias.",
errcode=Codes.EXCLUSIVE, errcode=Codes.EXCLUSIVE,
) )
else: else:
@ -283,7 +283,7 @@ class DirectoryHandler(BaseHandler):
def on_directory_query(self, args): def on_directory_query(self, args):
room_alias = RoomAlias.from_string(args["room_alias"]) room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias): if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room Alias is not hosted on this Home Server") raise SynapseError(400, "Room Alias is not hosted on this homeserver")
result = yield self.get_association_from_room_alias(room_alias) result = yield self.get_association_from_room_alias(room_alias)

View file

@ -1688,7 +1688,11 @@ class FederationHandler(BaseHandler):
# hack around with a try/finally instead. # hack around with a try/finally instead.
success = False success = False
try: try:
if not event.internal_metadata.is_outlier() and not backfilled: if (
not event.internal_metadata.is_outlier()
and not backfilled
and not context.rejected
):
yield self.action_generator.handle_push_actions_for_event( yield self.action_generator.handle_push_actions_for_event(
event, context event, context
) )
@ -2276,6 +2280,7 @@ class FederationHandler(BaseHandler):
return EventContext.with_state( return EventContext.with_state(
state_group=state_group, state_group=state_group,
state_group_before_event=context.state_group_before_event,
current_state_ids=current_state_ids, current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids, prev_state_ids=prev_state_ids,
prev_group=prev_group, prev_group=prev_group,

View file

@ -127,7 +127,9 @@ class PaginationHandler(object):
self._purges_in_progress_by_room.add(room_id) self._purges_in_progress_by_room.add(room_id)
try: try:
with (yield self.pagination_lock.write(room_id)): with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(room_id, token, delete_local_events) yield self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete") logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception: except Exception:
@ -170,7 +172,7 @@ class PaginationHandler(object):
if joined: if joined:
raise SynapseError(400, "Users are still joined to this room") raise SynapseError(400, "Users are still joined to this room")
await self.store.purge_room(room_id) await self.storage.purge_events.purge_room(room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_messages( def get_messages(

View file

@ -152,7 +152,7 @@ class BaseProfileHandler(BaseHandler):
by_admin (bool): Whether this change was made by an administrator. by_admin (bool): Whether this change was made by an administrator.
""" """
if not self.hs.is_mine(target_user): if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this homeserver")
if not by_admin and target_user != requester.user: if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname") raise AuthError(400, "Cannot set another user's displayname")
@ -207,7 +207,7 @@ class BaseProfileHandler(BaseHandler):
"""target_user is the user whose avatar_url is to be changed; """target_user is the user whose avatar_url is to be changed;
auth_user is the user attempting to make this change.""" auth_user is the user attempting to make this change."""
if not self.hs.is_mine(target_user): if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this homeserver")
if not by_admin and target_user != requester.user: if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url") raise AuthError(400, "Cannot set another user's avatar_url")
@ -231,7 +231,7 @@ class BaseProfileHandler(BaseHandler):
def on_profile_query(self, args): def on_profile_query(self, args):
user = UserID.from_string(args["user_id"]) user = UserID.from_string(args["user_id"])
if not self.hs.is_mine(user): if not self.hs.is_mine(user):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this homeserver")
just_field = args.get("field", None) just_field = args.get("field", None)

View file

@ -24,7 +24,6 @@ from synapse.api.errors import (
AuthError, AuthError,
Codes, Codes,
ConsentNotGivenError, ConsentNotGivenError,
LimitExceededError,
RegistrationError, RegistrationError,
SynapseError, SynapseError,
) )
@ -168,6 +167,7 @@ class RegistrationHandler(BaseHandler):
Raises: Raises:
RegistrationError if there was a problem registering. RegistrationError if there was a problem registering.
""" """
yield self.check_registration_ratelimit(address)
yield self.auth.check_auth_blocking(threepid=threepid) yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None password_hash = None
@ -217,8 +217,13 @@ class RegistrationHandler(BaseHandler):
else: else:
# autogen a sequential user ID # autogen a sequential user ID
fail_count = 0
user = None user = None
while not user: while not user:
# Fail after being unable to find a suitable ID a few times
if fail_count > 10:
raise SynapseError(500, "Unable to find a suitable guest user ID")
localpart = yield self._generate_user_id() localpart = yield self._generate_user_id()
user = UserID(localpart, self.hs.hostname) user = UserID(localpart, self.hs.hostname)
user_id = user.to_string() user_id = user.to_string()
@ -233,10 +238,14 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=default_display_name, create_profile_with_displayname=default_display_name,
address=address, address=address,
) )
# Successfully registered
break
except SynapseError: except SynapseError:
# if user id is taken, just generate another # if user id is taken, just generate another
user = None user = None
user_id = None user_id = None
fail_count += 1
if not self.hs.config.user_consent_at_registration: if not self.hs.config.user_consent_at_registration:
yield self._auto_join_rooms(user_id) yield self._auto_join_rooms(user_id)
@ -414,6 +423,29 @@ class RegistrationHandler(BaseHandler):
ratelimit=False, ratelimit=False,
) )
def check_registration_ratelimit(self, address):
"""A simple helper method to check whether the registration rate limit has been hit
for a given IP address
Args:
address (str|None): the IP address used to perform the registration. If this is
None, no ratelimiting will be performed.
Raises:
LimitExceededError: If the rate limit has been exceeded.
"""
if not address:
return
time_now = self.clock.time()
self.ratelimiter.ratelimit(
address,
time_now_s=time_now,
rate_hz=self.hs.config.rc_registration.per_second,
burst_count=self.hs.config.rc_registration.burst_count,
)
def register_with_store( def register_with_store(
self, self,
user_id, user_id,
@ -446,22 +478,6 @@ class RegistrationHandler(BaseHandler):
Returns: Returns:
Deferred Deferred
""" """
# Don't rate limit for app services
if appservice_id is None and address is not None:
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.can_do_action(
address,
time_now_s=time_now,
rate_hz=self.hs.config.rc_registration.per_second,
burst_count=self.hs.config.rc_registration.burst_count,
)
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now))
)
if self.hs.config.worker_app: if self.hs.config.worker_app:
return self._register_client( return self._register_client(
user_id=user_id, user_id=user_id,

View file

@ -515,6 +515,15 @@ class RoomMemberHandler(object):
yield self.store.set_room_is_public(old_room_id, False) yield self.store.set_room_is_public(old_room_id, False)
yield self.store.set_room_is_public(room_id, True) yield self.store.set_room_is_public(room_id, True)
# Check if any groups we own contain the predecessor room
local_group_ids = yield self.store.get_local_groups_for_room(old_room_id)
for group_id in local_group_ids:
# Add new the new room to those groups
yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
# Remove the old room from those groups
yield self.store.remove_room_from_group(group_id, old_room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids): def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
"""Copy user-specific information when they join a new room when that new room is the """Copy user-specific information when they join a new room when that new room is the

View file

@ -120,7 +120,7 @@ class TypingHandler(object):
auth_user_id = auth_user.to_string() auth_user_id = auth_user.to_string()
if not self.is_mine_id(target_user_id): if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this homeserver")
if target_user_id != auth_user_id: if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state") raise AuthError(400, "Cannot set another user's typing state")
@ -150,7 +150,7 @@ class TypingHandler(object):
auth_user_id = auth_user.to_string() auth_user_id = auth_user.to_string()
if not self.is_mine_id(target_user_id): if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this homeserver")
if target_user_id != auth_user_id: if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state") raise AuthError(400, "Cannot set another user's typing state")

View file

@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False):
return {b"true": True, b"false": False}[args[name][0]] return {b"true": True, b"false": False}[args[name][0]]
except Exception: except Exception:
message = ( message = (
"Boolean query parameter %r must be one of" " ['true', 'false']" "Boolean query parameter %r must be one of ['true', 'false']"
) % (name,) ) % (name,)
raise SynapseError(400, message) raise SynapseError(400, message)
else: else:

View file

@ -261,6 +261,18 @@ def parse_drain_configs(
) )
class StoppableLogPublisher(LogPublisher):
"""
A log publisher that can tell its observers to shut down any external
communications.
"""
def stop(self):
for obs in self._observers:
if hasattr(obs, "stop"):
obs.stop()
def setup_structured_logging( def setup_structured_logging(
hs, hs,
config, config,
@ -336,7 +348,7 @@ def setup_structured_logging(
# We should never get here, but, just in case, throw an error. # We should never get here, but, just in case, throw an error.
raise ConfigError("%s drain type cannot be configured" % (observer.type,)) raise ConfigError("%s drain type cannot be configured" % (observer.type,))
publisher = LogPublisher(*observers) publisher = StoppableLogPublisher(*observers)
log_filter = LogLevelFilterPredicate() log_filter = LogLevelFilterPredicate()
for namespace, namespace_config in log_config.get( for namespace, namespace_config in log_config.get(

View file

@ -17,25 +17,29 @@
Log formatters that output terse JSON. Log formatters that output terse JSON.
""" """
import json
import sys import sys
import traceback
from collections import deque from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor from math import floor
from typing import IO from typing import IO, Optional
import attr import attr
from simplejson import dumps
from zope.interface import implementer from zope.interface import implementer
from twisted.application.internet import ClientService from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import ( from twisted.internet.endpoints import (
HostnameEndpoint, HostnameEndpoint,
TCP4ClientEndpoint, TCP4ClientEndpoint,
TCP6ClientEndpoint, TCP6ClientEndpoint,
) )
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol from twisted.internet.protocol import Factory, Protocol
from twisted.logger import FileLogObserver, ILogObserver, Logger from twisted.logger import FileLogObserver, ILogObserver, Logger
from twisted.python.failure import Failure
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
def flatten_event(event: dict, metadata: dict, include_time: bool = False): def flatten_event(event: dict, metadata: dict, include_time: bool = False):
@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
def formatEvent(_event: dict) -> str: def formatEvent(_event: dict) -> str:
flattened = flatten_event(_event, metadata) flattened = flatten_event(_event, metadata)
return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" return _encoder.encode(flattened) + "\n"
return FileLogObserver(outFile, formatEvent) return FileLogObserver(outFile, formatEvent)
@attr.s
@implementer(IPushProducer)
class LogProducer(object):
"""
An IPushProducer that writes logs from its buffer to its transport when it
is resumed.
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
"""
transport = attr.ib(type=ITransport)
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
def pauseProducing(self):
self._paused = True
def stopProducing(self):
self._paused = True
self._buffer = None
def resumeProducing(self):
self._paused = False
while self._paused is False and (self._buffer and self.transport.connected):
try:
event = self._buffer.popleft()
self.transport.write(_encoder.encode(event).encode("utf8"))
self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
traceback.print_exc(file=sys.__stderr__)
break
@attr.s @attr.s
@implementer(ILogObserver) @implementer(ILogObserver)
class TerseJSONToTCPLogObserver(object): class TerseJSONToTCPLogObserver(object):
@ -153,7 +195,7 @@ class TerseJSONToTCPLogObserver(object):
An IObserver that writes JSON logs to a TCP target. An IObserver that writes JSON logs to a TCP target.
Args: Args:
hs (HomeServer): The Homeserver that is being logged for. hs (HomeServer): The homeserver that is being logged for.
host: The host of the logging target. host: The host of the logging target.
port: The logging target's port. port: The logging target's port.
metadata: Metadata to be added to each log entry. metadata: Metadata to be added to each log entry.
@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
metadata = attr.ib(type=dict) metadata = attr.ib(type=dict)
maximum_buffer = attr.ib(type=int) maximum_buffer = attr.ib(type=int)
_buffer = attr.ib(default=attr.Factory(deque), type=deque) _buffer = attr.ib(default=attr.Factory(deque), type=deque)
_writer = attr.ib(default=None) _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
_logger = attr.ib(default=attr.Factory(Logger)) _logger = attr.ib(default=attr.Factory(Logger))
_producer = attr.ib(default=None, type=Optional[LogProducer])
def start(self) -> None: def start(self) -> None:
@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object):
factory = Factory.forProtocol(Protocol) factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
self._service.startService() self._service.startService()
self._connect()
def _write_loop(self) -> None: def stop(self):
self._service.stopService()
def _connect(self) -> None:
""" """
Implement the write loop. Triggers an attempt to connect then write to the remote if not already writing.
""" """
if self._writer: if self._connection_waiter:
return return
self._writer = self._service.whenConnected() self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
@self._writer.addBoth @self._connection_waiter.addErrback
def writer(r): def fail(r):
if isinstance(r, Failure):
r.printTraceback(file=sys.__stderr__) r.printTraceback(file=sys.__stderr__)
self._writer = None self._connection_waiter = None
self.hs.get_reactor().callLater(1, self._write_loop) self._connect()
@self._connection_waiter.addCallback
def writer(r):
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
self._producer.resumeProducing()
return return
try: # If the producer is still producing, stop it.
for event in self._buffer: if self._producer:
r.transport.write( self._producer.stopProducing()
dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
"utf8"
)
)
r.transport.write(b"\n")
self._buffer.clear()
except Exception as e:
sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
self._writer = False # Make a new producer and start it.
self.hs.get_reactor().callLater(1, self._write_loop) self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
r.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
def _handle_pressure(self) -> None: def _handle_pressure(self) -> None:
""" """
@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object):
self._logger.failure("Failed clearing backpressure") self._logger.failure("Failed clearing backpressure")
# Try and write immediately. # Try and write immediately.
self._write_loop() self._connect()

View file

@ -246,7 +246,7 @@ class HttpPusher(object):
# fixed, we don't suddenly deliver a load # fixed, we don't suddenly deliver a load
# of old notifications. # of old notifications.
logger.warning( logger.warning(
"Giving up on a notification to user %s, " "pushkey %s", "Giving up on a notification to user %s, pushkey %s",
self.user_id, self.user_id,
self.pushkey, self.pushkey,
) )
@ -299,8 +299,7 @@ class HttpPusher(object):
# for sanity, we only remove the pushkey if it # for sanity, we only remove the pushkey if it
# was the one we actually sent... # was the one we actually sent...
logger.warning( logger.warning(
("Ignoring rejected pushkey %s because we" " didn't send it"), ("Ignoring rejected pushkey %s because we didn't send it"), pk,
pk,
) )
else: else:
logger.info("Pushkey %s was rejected: removing", pk) logger.info("Pushkey %s was rejected: removing", pk)

View file

@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
MESSAGE_FROM_PERSON_IN_ROOM = ( MESSAGE_FROM_PERSON_IN_ROOM = (
"You have a message on %(app)s from %(person)s " "in the %(room)s room..." "You have a message on %(app)s from %(person)s in the %(room)s room..."
) )
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
"You have messages on %(app)s from %(person)s and others..." "You have messages on %(app)s from %(person)s and others..."
) )
INVITE_FROM_PERSON_TO_ROOM = ( INVITE_FROM_PERSON_TO_ROOM = (
"%(person)s has invited you to join the " "%(room)s room on %(app)s..." "%(person)s has invited you to join the %(room)s room on %(app)s..."
) )
INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."

View file

@ -75,6 +75,8 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
async def _handle_request(self, request, user_id): async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
self.registration_handler.check_registration_ratelimit(content["address"])
await self.registration_handler.register_with_store( await self.registration_handler.register_with_store(
user_id=user_id, user_id=user_id,
password_hash=content["password_hash"], password_hash=content["password_hash"],

View file

@ -14,62 +14,39 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import hashlib
import hmac
import logging import logging
import platform import platform
import re import re
from six import text_type
from six.moves import http_client
import synapse import synapse
from synapse.api.constants import Membership, UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.servlet import ( from synapse.http.servlet import RestServlet, parse_json_object_from_request
RestServlet,
assert_params_in_dict,
parse_integer,
parse_json_object_from_request,
parse_string,
)
from synapse.rest.admin._base import ( from synapse.rest.admin._base import (
assert_requester_is_admin, assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns, historical_admin_path_patterns,
) )
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.rooms import ShutdownRoomRestServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.users import UserAdminServlet from synapse.rest.admin.users import (
from synapse.types import UserID, create_requester AccountValidityRenewServlet,
from synapse.util.async_helpers import maybe_awaitable DeactivateAccountRestServlet,
GetUsersPaginatedRestServlet,
ResetPasswordRestServlet,
SearchUsersRestServlet,
UserAdminServlet,
UserRegisterServlet,
UsersRestServlet,
WhoisRestServlet,
)
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, user_id):
target_user = UserID.from_string(user_id)
await assert_requester_is_admin(self.auth, request)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
ret = await self.handlers.admin_handler.get_users()
return 200, ret
class VersionServlet(RestServlet): class VersionServlet(RestServlet):
PATTERNS = (re.compile("^/_synapse/admin/v1/server_version$"),) PATTERNS = (re.compile("^/_synapse/admin/v1/server_version$"),)
@ -83,159 +60,6 @@ class VersionServlet(RestServlet):
return 200, self.res return 200, self.res
class UserRegisterServlet(RestServlet):
"""
Attributes:
NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
nonces (dict[str, int]): The nonces that we will accept. A dict of
nonce to the time it was generated, in int seconds.
"""
PATTERNS = historical_admin_path_patterns("/register")
NONCE_TIMEOUT = 60
def __init__(self, hs):
self.handlers = hs.get_handlers()
self.reactor = hs.get_reactor()
self.nonces = {}
self.hs = hs
def _clear_old_nonces(self):
"""
Clear out old nonces that are older than NONCE_TIMEOUT.
"""
now = int(self.reactor.seconds())
for k, v in list(self.nonces.items()):
if now - v > self.NONCE_TIMEOUT:
del self.nonces[k]
def on_GET(self, request):
"""
Generate a new nonce.
"""
self._clear_old_nonces()
nonce = self.hs.get_secrets().token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
return 200, {"nonce": nonce}
async def on_POST(self, request):
self._clear_old_nonces()
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
body = parse_json_object_from_request(request)
if "nonce" not in body:
raise SynapseError(400, "nonce must be specified", errcode=Codes.BAD_JSON)
nonce = body["nonce"]
if nonce not in self.nonces:
raise SynapseError(400, "unrecognised nonce")
# Delete the nonce, so it can't be reused, even if it's invalid
del self.nonces[nonce]
if "username" not in body:
raise SynapseError(
400, "username must be specified", errcode=Codes.BAD_JSON
)
else:
if (
not isinstance(body["username"], text_type)
or len(body["username"]) > 512
):
raise SynapseError(400, "Invalid username")
username = body["username"].encode("utf-8")
if b"\x00" in username:
raise SynapseError(400, "Invalid username")
if "password" not in body:
raise SynapseError(
400, "password must be specified", errcode=Codes.BAD_JSON
)
else:
if (
not isinstance(body["password"], text_type)
or len(body["password"]) > 512
):
raise SynapseError(400, "Invalid password")
password = body["password"].encode("utf-8")
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
admin = body.get("admin", None)
user_type = body.get("user_type", None)
if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
raise SynapseError(400, "Invalid user type")
got_mac = body["mac"]
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce.encode("utf8"))
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
if user_type:
want_mac.update(b"\x00")
want_mac.update(user_type.encode("utf8"))
want_mac = want_mac.hexdigest()
if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
raise SynapseError(403, "HMAC incorrect")
# Reuse the parts of RegisterRestServlet to reduce code duplication
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
register = RegisterRestServlet(self.hs)
user_id = await register.registration_handler.register_user(
localpart=body["username"].lower(),
password=body["password"],
admin=bool(admin),
user_type=user_type,
)
result = await register._create_registration_details(user_id, body)
return 200, result
class WhoisRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/whois/(?P<user_id>[^/]*)")
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, user_id):
target_user = UserID.from_string(user_id)
requester = await self.auth.get_user_by_req(request)
auth_user = requester.user
if target_user != auth_user:
await assert_user_is_admin(self.auth, auth_user)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only whois a local user")
ret = await self.handlers.admin_handler.get_whois(target_user)
return 200, ret
class PurgeHistoryRestServlet(RestServlet): class PurgeHistoryRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns( PATTERNS = historical_admin_path_patterns(
"/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?" "/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@ -342,369 +166,6 @@ class PurgeHistoryStatusRestServlet(RestServlet):
return 200, purge_status.asdict() return 200, purge_status.asdict()
class DeactivateAccountRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/deactivate/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self._deactivate_account_handler = hs.get_deactivate_account_handler()
self.auth = hs.get_auth()
async def on_POST(self, request, target_user_id):
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request, allow_empty_body=True)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
UserID.from_string(target_user_id)
result = await self._deactivate_account_handler.deactivate_account(
target_user_id, erase
)
if result:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
return 200, {"id_server_unbind_result": id_server_unbind_result}
class ShutdownRoomRestServlet(RestServlet):
"""Shuts down a room by removing all local users from the room and blocking
all future invites and joins to the room. Any local aliases will be repointed
to a new room created by `new_room_user_id` and kicked users will be auto
joined to the new room.
"""
PATTERNS = historical_admin_path_patterns("/shutdown_room/(?P<room_id>[^/]+)")
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
" violation will be blocked."
)
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self._room_creation_handler = hs.get_room_creation_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
async def on_POST(self, request, room_id):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
content = parse_json_object_from_request(request)
assert_params_in_dict(content, ["new_room_user_id"])
new_room_user_id = content["new_room_user_id"]
room_creator_requester = create_requester(new_room_user_id)
message = content.get("message", self.DEFAULT_MESSAGE)
room_name = content.get("room_name", "Content Violation Notification")
info = await self._room_creation_handler.create_room(
room_creator_requester,
config={
"preset": "public_chat",
"name": room_name,
"power_level_content_override": {"users_default": -10},
},
ratelimit=False,
)
new_room_id = info["room_id"]
requester_user_id = requester.user.to_string()
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
)
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
users = await self.state.get_current_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
logger.info("Kicking %r from %r...", user_id, room_id)
try:
target_requester = create_requester(user_id)
await self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
action=Membership.LEAVE,
content={},
ratelimit=False,
require_consent=False,
)
await self.room_member_handler.forget(target_requester.user, room_id)
await self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
action=Membership.JOIN,
content={},
ratelimit=False,
require_consent=False,
)
kicked_users.append(user_id)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
failed_to_kick_users.append(user_id)
await self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = await maybe_awaitable(
self.store.get_aliases_for_room(room_id)
)
await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
return (
200,
{
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": aliases_for_room,
"new_room_id": new_room_id,
},
)
class ResetPasswordRestServlet(RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/reset_password/
@user:to_reset_password?access_token=admin_access_token
JsonBodyToSend:
{
"new_password": "secret"
}
Returns:
200 OK with empty object if success otherwise an error.
"""
PATTERNS = historical_admin_path_patterns(
"/reset_password/(?P<target_user_id>[^/]*)"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
self._set_password_handler = hs.get_set_password_handler()
async def on_POST(self, request, target_user_id):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
"""
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
UserID.from_string(target_user_id)
params = parse_json_object_from_request(request)
assert_params_in_dict(params, ["new_password"])
new_password = params["new_password"]
await self._set_password_handler.set_password(
target_user_id, new_password, requester
)
return 200, {}
class GetUsersPaginatedRestServlet(RestServlet):
"""Get request to get specific number of users from Synapse.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/users_paginate/
@admin:user?access_token=admin_access_token&start=0&limit=10
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = historical_admin_path_patterns(
"/users_paginate/(?P<target_user_id>[^/]*)"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, target_user_id):
"""Get request to get specific number of users from Synapse.
This needs user to have administrator access in Synapse.
"""
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(target_user_id)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
order = "name" # order by name in user table
start = parse_integer(request, "start", required=True)
limit = parse_integer(request, "limit", required=True)
logger.info("limit: %s, start: %s", limit, start)
ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
return 200, ret
async def on_POST(self, request, target_user_id):
"""Post request to get specific number of users from Synapse..
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/users_paginate/
@admin:user?access_token=admin_access_token
JsonBodyToSend:
{
"start": "0",
"limit": "10
}
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
await assert_requester_is_admin(self.auth, request)
UserID.from_string(target_user_id)
order = "name" # order by name in user table
params = parse_json_object_from_request(request)
assert_params_in_dict(params, ["limit", "start"])
limit = params["limit"]
start = params["start"]
logger.info("limit: %s, start: %s", limit, start)
ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
return 200, ret
class SearchUsersRestServlet(RestServlet):
"""Get request to search user table for specific users according to
search term.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/search_users/
@admin:user?access_token=admin_access_token&term=alice
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, target_user_id):
"""Get request to search user table for specific users according to
search term.
This needs user to have a administrator access in Synapse.
"""
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(target_user_id)
# To allow all users to get the users list
# if not is_admin and target_user != auth_user:
# raise AuthError(403, "You are not a server admin")
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
term = parse_string(request, "term", required=True)
logger.info("term: %s ", term)
ret = await self.handlers.admin_handler.search_users(term)
return 200, ret
class DeleteGroupAdminRestServlet(RestServlet):
"""Allows deleting of local groups
"""
PATTERNS = historical_admin_path_patterns("/delete_group/(?P<group_id>[^/]*)")
def __init__(self, hs):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
async def on_POST(self, request, group_id):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
if not self.is_mine_id(group_id):
raise SynapseError(400, "Can only delete local groups")
await self.group_server.delete_group(group_id, requester.user.to_string())
return 200, {}
class AccountValidityRenewServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/account_validity/validity$")
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
self.hs = hs
self.account_activity_handler = hs.get_account_validity_handler()
self.auth = hs.get_auth()
async def on_POST(self, request):
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request)
if "user_id" not in body:
raise SynapseError(400, "Missing property 'user_id' in the request body")
expiration_ts = await self.account_activity_handler.renew_account_for_user(
body["user_id"],
body.get("expiration_ts"),
not body.get("enable_renewal_emails", True),
)
res = {"expiration_ts": expiration_ts}
return 200, res
######################################################################################## ########################################################################################
# #
# please don't add more servlets here: this file is already long and unwieldy. Put # please don't add more servlets here: this file is already long and unwieldy. Put

View file

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet
from synapse.rest.admin._base import (
assert_user_is_admin,
historical_admin_path_patterns,
)
logger = logging.getLogger(__name__)
class DeleteGroupAdminRestServlet(RestServlet):
"""Allows deleting of local groups
"""
PATTERNS = historical_admin_path_patterns("/delete_group/(?P<group_id>[^/]*)")
def __init__(self, hs):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
async def on_POST(self, request, group_id):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
if not self.is_mine_id(group_id):
raise SynapseError(400, "Can only delete local groups")
await self.group_server.delete_group(group_id, requester.user.to_string())
return 200, {}

157
synapse/rest/admin/rooms.py Normal file
View file

@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.api.constants import Membership
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.rest.admin._base import (
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.types import create_requester
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
class ShutdownRoomRestServlet(RestServlet):
"""Shuts down a room by removing all local users from the room and blocking
all future invites and joins to the room. Any local aliases will be repointed
to a new room created by `new_room_user_id` and kicked users will be auto
joined to the new room.
"""
PATTERNS = historical_admin_path_patterns("/shutdown_room/(?P<room_id>[^/]+)")
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
" violation will be blocked."
)
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self._room_creation_handler = hs.get_room_creation_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
async def on_POST(self, request, room_id):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
content = parse_json_object_from_request(request)
assert_params_in_dict(content, ["new_room_user_id"])
new_room_user_id = content["new_room_user_id"]
room_creator_requester = create_requester(new_room_user_id)
message = content.get("message", self.DEFAULT_MESSAGE)
room_name = content.get("room_name", "Content Violation Notification")
info = await self._room_creation_handler.create_room(
room_creator_requester,
config={
"preset": "public_chat",
"name": room_name,
"power_level_content_override": {"users_default": -10},
},
ratelimit=False,
)
new_room_id = info["room_id"]
requester_user_id = requester.user.to_string()
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
)
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
users = await self.state.get_current_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
logger.info("Kicking %r from %r...", user_id, room_id)
try:
target_requester = create_requester(user_id)
await self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
action=Membership.LEAVE,
content={},
ratelimit=False,
require_consent=False,
)
await self.room_member_handler.forget(target_requester.user, room_id)
await self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
action=Membership.JOIN,
content={},
ratelimit=False,
require_consent=False,
)
kicked_users.append(user_id)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
failed_to_kick_users.append(user_id)
await self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = await maybe_awaitable(
self.store.get_aliases_for_room(room_id)
)
await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
return (
200,
{
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": aliases_for_room,
"new_room_id": new_room_id,
},
)

View file

@ -12,17 +12,419 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import hashlib
import hmac
import logging
import re import re
from synapse.api.errors import SynapseError from six import text_type
from six.moves import http_client
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import ( from synapse.http.servlet import (
RestServlet, RestServlet,
assert_params_in_dict, assert_params_in_dict,
parse_integer,
parse_json_object_from_request, parse_json_object_from_request,
parse_string,
)
from synapse.rest.admin._base import (
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
) )
from synapse.rest.admin import assert_requester_is_admin, assert_user_is_admin
from synapse.types import UserID from synapse.types import UserID
logger = logging.getLogger(__name__)
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
async def on_GET(self, request, user_id):
target_user = UserID.from_string(user_id)
await assert_requester_is_admin(self.auth, request)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
ret = await self.admin_handler.get_users()
return 200, ret
class GetUsersPaginatedRestServlet(RestServlet):
"""Get request to get specific number of users from Synapse.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/users_paginate/
@admin:user?access_token=admin_access_token&start=0&limit=10
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = historical_admin_path_patterns(
"/users_paginate/(?P<target_user_id>[^/]*)"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, target_user_id):
"""Get request to get specific number of users from Synapse.
This needs user to have administrator access in Synapse.
"""
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(target_user_id)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
order = "name" # order by name in user table
start = parse_integer(request, "start", required=True)
limit = parse_integer(request, "limit", required=True)
logger.info("limit: %s, start: %s", limit, start)
ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
return 200, ret
async def on_POST(self, request, target_user_id):
"""Post request to get specific number of users from Synapse..
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/users_paginate/
@admin:user?access_token=admin_access_token
JsonBodyToSend:
{
"start": "0",
"limit": "10
}
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
await assert_requester_is_admin(self.auth, request)
UserID.from_string(target_user_id)
order = "name" # order by name in user table
params = parse_json_object_from_request(request)
assert_params_in_dict(params, ["limit", "start"])
limit = params["limit"]
start = params["start"]
logger.info("limit: %s, start: %s", limit, start)
ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
return 200, ret
class UserRegisterServlet(RestServlet):
"""
Attributes:
NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
nonces (dict[str, int]): The nonces that we will accept. A dict of
nonce to the time it was generated, in int seconds.
"""
PATTERNS = historical_admin_path_patterns("/register")
NONCE_TIMEOUT = 60
def __init__(self, hs):
self.handlers = hs.get_handlers()
self.reactor = hs.get_reactor()
self.nonces = {}
self.hs = hs
def _clear_old_nonces(self):
"""
Clear out old nonces that are older than NONCE_TIMEOUT.
"""
now = int(self.reactor.seconds())
for k, v in list(self.nonces.items()):
if now - v > self.NONCE_TIMEOUT:
del self.nonces[k]
def on_GET(self, request):
"""
Generate a new nonce.
"""
self._clear_old_nonces()
nonce = self.hs.get_secrets().token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
return 200, {"nonce": nonce}
async def on_POST(self, request):
self._clear_old_nonces()
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
body = parse_json_object_from_request(request)
if "nonce" not in body:
raise SynapseError(400, "nonce must be specified", errcode=Codes.BAD_JSON)
nonce = body["nonce"]
if nonce not in self.nonces:
raise SynapseError(400, "unrecognised nonce")
# Delete the nonce, so it can't be reused, even if it's invalid
del self.nonces[nonce]
if "username" not in body:
raise SynapseError(
400, "username must be specified", errcode=Codes.BAD_JSON
)
else:
if (
not isinstance(body["username"], text_type)
or len(body["username"]) > 512
):
raise SynapseError(400, "Invalid username")
username = body["username"].encode("utf-8")
if b"\x00" in username:
raise SynapseError(400, "Invalid username")
if "password" not in body:
raise SynapseError(
400, "password must be specified", errcode=Codes.BAD_JSON
)
else:
if (
not isinstance(body["password"], text_type)
or len(body["password"]) > 512
):
raise SynapseError(400, "Invalid password")
password = body["password"].encode("utf-8")
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
admin = body.get("admin", None)
user_type = body.get("user_type", None)
if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
raise SynapseError(400, "Invalid user type")
got_mac = body["mac"]
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce.encode("utf8"))
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
if user_type:
want_mac.update(b"\x00")
want_mac.update(user_type.encode("utf8"))
want_mac = want_mac.hexdigest()
if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
raise SynapseError(403, "HMAC incorrect")
# Reuse the parts of RegisterRestServlet to reduce code duplication
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
register = RegisterRestServlet(self.hs)
user_id = await register.registration_handler.register_user(
localpart=body["username"].lower(),
password=body["password"],
admin=bool(admin),
user_type=user_type,
)
result = await register._create_registration_details(user_id, body)
return 200, result
class WhoisRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/whois/(?P<user_id>[^/]*)")
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, user_id):
target_user = UserID.from_string(user_id)
requester = await self.auth.get_user_by_req(request)
auth_user = requester.user
if target_user != auth_user:
await assert_user_is_admin(self.auth, auth_user)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only whois a local user")
ret = await self.handlers.admin_handler.get_whois(target_user)
return 200, ret
class DeactivateAccountRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/deactivate/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self._deactivate_account_handler = hs.get_deactivate_account_handler()
self.auth = hs.get_auth()
async def on_POST(self, request, target_user_id):
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request, allow_empty_body=True)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
UserID.from_string(target_user_id)
result = await self._deactivate_account_handler.deactivate_account(
target_user_id, erase
)
if result:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
return 200, {"id_server_unbind_result": id_server_unbind_result}
class AccountValidityRenewServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/account_validity/validity$")
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
self.hs = hs
self.account_activity_handler = hs.get_account_validity_handler()
self.auth = hs.get_auth()
async def on_POST(self, request):
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request)
if "user_id" not in body:
raise SynapseError(400, "Missing property 'user_id' in the request body")
expiration_ts = await self.account_activity_handler.renew_account_for_user(
body["user_id"],
body.get("expiration_ts"),
not body.get("enable_renewal_emails", True),
)
res = {"expiration_ts": expiration_ts}
return 200, res
class ResetPasswordRestServlet(RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/reset_password/
@user:to_reset_password?access_token=admin_access_token
JsonBodyToSend:
{
"new_password": "secret"
}
Returns:
200 OK with empty object if success otherwise an error.
"""
PATTERNS = historical_admin_path_patterns(
"/reset_password/(?P<target_user_id>[^/]*)"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
self._set_password_handler = hs.get_set_password_handler()
async def on_POST(self, request, target_user_id):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
"""
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
UserID.from_string(target_user_id)
params = parse_json_object_from_request(request)
assert_params_in_dict(params, ["new_password"])
new_password = params["new_password"]
await self._set_password_handler.set_password(
target_user_id, new_password, requester
)
return 200, {}
class SearchUsersRestServlet(RestServlet):
"""Get request to search user table for specific users according to
search term.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/search_users/
@admin:user?access_token=admin_access_token&term=alice
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, target_user_id):
"""Get request to search user table for specific users according to
search term.
This needs user to have a administrator access in Synapse.
"""
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(target_user_id)
# To allow all users to get the users list
# if not is_admin and target_user != auth_user:
# raise AuthError(403, "You are not a server admin")
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
term = parse_string(request, "term", required=True)
logger.info("term: %s ", term)
ret = await self.handlers.admin_handler.search_users(term)
return 200, ret
class UserAdminServlet(RestServlet): class UserAdminServlet(RestServlet):
""" """

View file

@ -92,8 +92,11 @@ class LoginRestServlet(RestServlet):
self.auth_handler = self.hs.get_auth_handler() self.auth_handler = self.hs.get_auth_handler()
self.registration_handler = hs.get_registration_handler() self.registration_handler = hs.get_registration_handler()
self.handlers = hs.get_handlers() self.handlers = hs.get_handlers()
self._clock = hs.get_clock()
self._well_known_builder = WellKnownBuilder(hs) self._well_known_builder = WellKnownBuilder(hs)
self._address_ratelimiter = Ratelimiter() self._address_ratelimiter = Ratelimiter()
self._account_ratelimiter = Ratelimiter()
self._failed_attempts_ratelimiter = Ratelimiter()
def on_GET(self, request): def on_GET(self, request):
flows = [] flows = []
@ -202,6 +205,16 @@ class LoginRestServlet(RestServlet):
# (See add_threepid in synapse/handlers/auth.py) # (See add_threepid in synapse/handlers/auth.py)
address = address.lower() address = address.lower()
# We also apply account rate limiting using the 3PID as a key, as
# otherwise using 3PID bypasses the ratelimiting based on user ID.
self._failed_attempts_ratelimiter.ratelimit(
(medium, address),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=False,
)
# Check for login providers that support 3pid login types # Check for login providers that support 3pid login types
( (
canonical_user_id, canonical_user_id,
@ -211,7 +224,8 @@ class LoginRestServlet(RestServlet):
) )
if canonical_user_id: if canonical_user_id:
# Authentication through password provider and 3pid succeeded # Authentication through password provider and 3pid succeeded
result = yield self._register_device_with_callback(
result = yield self._complete_login(
canonical_user_id, login_submission, callback_3pid canonical_user_id, login_submission, callback_3pid
) )
return result return result
@ -225,6 +239,21 @@ class LoginRestServlet(RestServlet):
logger.warning( logger.warning(
"unknown 3pid identifier medium %s, address %r", medium, address "unknown 3pid identifier medium %s, address %r", medium, address
) )
# We mark that we've failed to log in here, as
# `check_password_provider_3pid` might have returned `None` due
# to an incorrect password, rather than the account not
# existing.
#
# If it returned None but the 3PID was bound then we won't hit
# this code path, which is fine as then the per-user ratelimit
# will kick in below.
self._failed_attempts_ratelimiter.can_do_action(
(medium, address),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=True,
)
raise LoginError(403, "", errcode=Codes.FORBIDDEN) raise LoginError(403, "", errcode=Codes.FORBIDDEN)
identifier = {"type": "m.id.user", "user": user_id} identifier = {"type": "m.id.user", "user": user_id}
@ -236,29 +265,84 @@ class LoginRestServlet(RestServlet):
if "user" not in identifier: if "user" not in identifier:
raise SynapseError(400, "User identifier is missing 'user' key") raise SynapseError(400, "User identifier is missing 'user' key")
if identifier["user"].startswith("@"):
qualified_user_id = identifier["user"]
else:
qualified_user_id = UserID(identifier["user"], self.hs.hostname).to_string()
# Check if we've hit the failed ratelimit (but don't update it)
self._failed_attempts_ratelimiter.ratelimit(
qualified_user_id.lower(),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=False,
)
try:
canonical_user_id, callback = yield self.auth_handler.validate_login( canonical_user_id, callback = yield self.auth_handler.validate_login(
identifier["user"], login_submission identifier["user"], login_submission
) )
except LoginError:
# The user has failed to log in, so we need to update the rate
# limiter. Using `can_do_action` avoids us raising a ratelimit
# exception and masking the LoginError. The actual ratelimiting
# should have happened above.
self._failed_attempts_ratelimiter.can_do_action(
qualified_user_id.lower(),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=True,
)
raise
result = yield self._register_device_with_callback( result = yield self._complete_login(
canonical_user_id, login_submission, callback canonical_user_id, login_submission, callback
) )
return result return result
@defer.inlineCallbacks @defer.inlineCallbacks
def _register_device_with_callback(self, user_id, login_submission, callback=None): def _complete_login(
""" Registers a device with a given user_id. Optionally run a callback self, user_id, login_submission, callback=None, create_non_existant_users=False
function after registration has completed. ):
"""Called when we've successfully authed the user and now need to
actually login them in (e.g. create devices). This gets called on
all succesful logins.
Applies the ratelimiting for succesful login attempts against an
account.
Args: Args:
user_id (str): ID of the user to register. user_id (str): ID of the user to register.
login_submission (dict): Dictionary of login information. login_submission (dict): Dictionary of login information.
callback (func|None): Callback function to run after registration. callback (func|None): Callback function to run after registration.
create_non_existant_users (bool): Whether to create the user if
they don't exist. Defaults to False.
Returns: Returns:
result (Dict[str,str]): Dictionary of account information after result (Dict[str,str]): Dictionary of account information after
successful registration. successful registration.
""" """
# Before we actually log them in we check if they've already logged in
# too often. This happens here rather than before as we don't
# necessarily know the user before now.
self._account_ratelimiter.ratelimit(
user_id.lower(),
time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_account.per_second,
burst_count=self.hs.config.rc_login_account.burst_count,
update=True,
)
if create_non_existant_users:
user_id = yield self.auth_handler.check_user_exists(user_id)
if not user_id:
user_id = yield self.registration_handler.register_user(
localpart=UserID.from_string(user_id).localpart
)
device_id = login_submission.get("device_id") device_id = login_submission.get("device_id")
initial_display_name = login_submission.get("initial_device_display_name") initial_display_name = login_submission.get("initial_device_display_name")
device_id, access_token = yield self.registration_handler.register_device( device_id, access_token = yield self.registration_handler.register_device(
@ -285,7 +369,7 @@ class LoginRestServlet(RestServlet):
token token
) )
result = yield self._register_device_with_callback(user_id, login_submission) result = yield self._complete_login(user_id, login_submission)
return result return result
@defer.inlineCallbacks @defer.inlineCallbacks
@ -313,15 +397,8 @@ class LoginRestServlet(RestServlet):
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED) raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user_id = UserID(user, self.hs.hostname).to_string() user_id = UserID(user, self.hs.hostname).to_string()
result = yield self._complete_login(
registered_user_id = yield self.auth_handler.check_user_exists(user_id) user_id, login_submission, create_non_existant_users=True
if not registered_user_id:
registered_user_id = yield self.registration_handler.register_user(
localpart=user
)
result = yield self._register_device_with_callback(
registered_user_id, login_submission
) )
return result return result

View file

@ -56,6 +56,9 @@ logger = logging.getLogger(__name__)
_charset_match = re.compile(br"<\s*meta[^>]*charset\s*=\s*([a-z0-9-]+)", flags=re.I) _charset_match = re.compile(br"<\s*meta[^>]*charset\s*=\s*([a-z0-9-]+)", flags=re.I)
_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I) _content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
OG_TAG_NAME_MAXLEN = 50
OG_TAG_VALUE_MAXLEN = 1000
class PreviewUrlResource(DirectServeResource): class PreviewUrlResource(DirectServeResource):
isLeaf = True isLeaf = True
@ -119,7 +122,7 @@ class PreviewUrlResource(DirectServeResource):
pattern = entry[attrib] pattern = entry[attrib]
value = getattr(url_tuple, attrib) value = getattr(url_tuple, attrib)
logger.debug( logger.debug(
"Matching attrib '%s' with value '%s' against" " pattern '%s'", "Matching attrib '%s' with value '%s' against pattern '%s'",
attrib, attrib,
value, value,
pattern, pattern,
@ -171,7 +174,7 @@ class PreviewUrlResource(DirectServeResource):
ts (int): ts (int):
Returns: Returns:
Deferred[str]: json-encoded og data Deferred[bytes]: json-encoded og data
""" """
# check the URL cache in the DB (which will also provide us with # check the URL cache in the DB (which will also provide us with
# historical previews, if we have any) # historical previews, if we have any)
@ -272,6 +275,18 @@ class PreviewUrlResource(DirectServeResource):
logger.warning("Failed to find any OG data in %s", url) logger.warning("Failed to find any OG data in %s", url)
og = {} og = {}
# filter out any stupidly long values
keys_to_remove = []
for k, v in og.items():
# values can be numeric as well as strings, hence the cast to str
if len(k) > OG_TAG_NAME_MAXLEN or len(str(v)) > OG_TAG_VALUE_MAXLEN:
logger.warning(
"Pruning overlong tag %s from OG data", k[:OG_TAG_NAME_MAXLEN]
)
keys_to_remove.append(k)
for k in keys_to_remove:
del og[k]
logger.debug("Calculated OG for %s as %s", url, og) logger.debug("Calculated OG for %s as %s", url, og)
jsonog = json.dumps(og) jsonog = json.dumps(og)
@ -506,6 +521,10 @@ def _calc_og(tree, media_uri):
og = {} og = {}
for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"): for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
if "content" in tag.attrib: if "content" in tag.attrib:
# if we've got more than 50 tags, someone is taking the piss
if len(og) >= 50:
logger.warning("Skipping OG for page with too many 'og:' tags")
return {}
og[tag.attrib["property"]] = tag.attrib["content"] og[tag.attrib["property"]] = tag.attrib["content"]
# TODO: grab article: meta tags too, e.g.: # TODO: grab article: meta tags too, e.g.:

View file

@ -54,7 +54,7 @@ class ConsentServerNotices(object):
) )
if "body" not in self._server_notice_content: if "body" not in self._server_notice_content:
raise ConfigError( raise ConfigError(
"user_consent server_notice_consent must contain a 'body' " "key." "user_consent server_notice_consent must contain a 'body' key."
) )
self._consent_uri_builder = ConsentURIBuilder(hs.config) self._consent_uri_builder = ConsentURIBuilder(hs.config)

View file

@ -16,6 +16,7 @@
import logging import logging
from collections import namedtuple from collections import namedtuple
from typing import Iterable, Optional
from six import iteritems, itervalues from six import iteritems, itervalues
@ -27,6 +28,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.logging.utils import log_function from synapse.logging.utils import log_function
from synapse.state import v1, v2 from synapse.state import v1, v2
@ -212,15 +214,17 @@ class StateHandler(object):
return joined_hosts return joined_hosts
@defer.inlineCallbacks @defer.inlineCallbacks
def compute_event_context(self, event, old_state=None): def compute_event_context(
self, event: EventBase, old_state: Optional[Iterable[EventBase]] = None
):
"""Build an EventContext structure for the event. """Build an EventContext structure for the event.
This works out what the current state should be for the event, and This works out what the current state should be for the event, and
generates a new state group if necessary. generates a new state group if necessary.
Args: Args:
event (synapse.events.EventBase): event:
old_state (dict|None): The state at the event if it can't be old_state: The state at the event if it can't be
calculated from existing events. This is normally only specified calculated from existing events. This is normally only specified
when receiving an event from federation where we don't have the when receiving an event from federation where we don't have the
prev events for, e.g. when backfilling. prev events for, e.g. when backfilling.
@ -232,6 +236,9 @@ class StateHandler(object):
# If this is an outlier, then we know it shouldn't have any current # If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and # state. Certainly store.get_current_state won't return any, and
# persisting the event won't store the state group. # persisting the event won't store the state group.
# FIXME: why do we populate current_state_ids? I thought the point was
# that we weren't supposed to have any state for outliers?
if old_state: if old_state:
prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state} prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state}
if event.is_state(): if event.is_state():
@ -248,114 +255,104 @@ class StateHandler(object):
# group for it. # group for it.
context = EventContext.with_state( context = EventContext.with_state(
state_group=None, state_group=None,
state_group_before_event=None,
current_state_ids=current_state_ids, current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids, prev_state_ids=prev_state_ids,
) )
return context return context
#
# first of all, figure out the state before the event
#
if old_state: if old_state:
# We already have the state, so we don't need to calculate it. # if we're given the state before the event, then we use that
# Let's just correctly fill out the context and create a state_ids_before_event = {
# new state group for it. (s.type, s.state_key): s.event_id for s in old_state
}
state_group_before_event = None
state_group_before_event_prev_group = None
deltas_to_state_group_before_event = None
prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state}
if event.is_state():
key = (event.type, event.state_key)
if key in prev_state_ids:
replaces = prev_state_ids[key]
if replaces != event.event_id: # Paranoia check
event.unsigned["replaces_state"] = replaces
current_state_ids = dict(prev_state_ids)
current_state_ids[key] = event.event_id
else: else:
current_state_ids = prev_state_ids # otherwise, we'll need to resolve the state across the prev_events.
state_group = yield self.state_store.store_state_group(
event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
current_state_ids=current_state_ids,
)
context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
)
return context
logger.debug("calling resolve_state_groups from compute_event_context") logger.debug("calling resolve_state_groups from compute_event_context")
entry = yield self.resolve_state_groups_for_events( entry = yield self.resolve_state_groups_for_events(
event.room_id, event.prev_event_ids() event.room_id, event.prev_event_ids()
) )
prev_state_ids = entry.state state_ids_before_event = entry.state
prev_group = None state_group_before_event = entry.state_group
delta_ids = None state_group_before_event_prev_group = entry.prev_group
deltas_to_state_group_before_event = entry.delta_ids
if event.is_state(): #
# If this is a state event then we need to create a new state # make sure that we have a state group at that point. If it's not a state event,
# group for the state after this event. # that will be the state group for the new event. If it *is* a state event,
# it might get rejected (in which case we'll need to persist it with the
# previous state group)
#
if not state_group_before_event:
state_group_before_event = yield self.state_store.store_state_group(
event.event_id,
event.room_id,
prev_group=state_group_before_event_prev_group,
delta_ids=deltas_to_state_group_before_event,
current_state_ids=state_ids_before_event,
)
# XXX: can we update the state cache entry for the new state group? or
# could we set a flag on resolve_state_groups_for_events to tell it to
# always make a state group?
#
# now if it's not a state event, we're done
#
if not event.is_state():
return EventContext.with_state(
state_group_before_event=state_group_before_event,
state_group=state_group_before_event,
current_state_ids=state_ids_before_event,
prev_state_ids=state_ids_before_event,
prev_group=state_group_before_event_prev_group,
delta_ids=deltas_to_state_group_before_event,
)
#
# otherwise, we'll need to create a new state group for after the event
#
key = (event.type, event.state_key) key = (event.type, event.state_key)
if key in prev_state_ids: if key in state_ids_before_event:
replaces = prev_state_ids[key] replaces = state_ids_before_event[key]
if replaces != event.event_id:
event.unsigned["replaces_state"] = replaces event.unsigned["replaces_state"] = replaces
current_state_ids = dict(prev_state_ids) state_ids_after_event = dict(state_ids_before_event)
current_state_ids[key] = event.event_id state_ids_after_event[key] = event.event_id
if entry.state_group:
# If the state at the event has a state group assigned then
# we can use that as the prev group
prev_group = entry.state_group
delta_ids = {key: event.event_id} delta_ids = {key: event.event_id}
elif entry.prev_group:
# If the state at the event only has a prev group, then we can
# use that as a prev group too.
prev_group = entry.prev_group
delta_ids = dict(entry.delta_ids)
delta_ids[key] = event.event_id
state_group = yield self.state_store.store_state_group( state_group_after_event = yield self.state_store.store_state_group(
event.event_id, event.event_id,
event.room_id, event.room_id,
prev_group=prev_group, prev_group=state_group_before_event,
delta_ids=delta_ids, delta_ids=delta_ids,
current_state_ids=current_state_ids, current_state_ids=state_ids_after_event,
) )
else:
current_state_ids = prev_state_ids
prev_group = entry.prev_group
delta_ids = entry.delta_ids
if entry.state_group is None: return EventContext.with_state(
entry.state_group = yield self.state_store.store_state_group( state_group=state_group_after_event,
event.event_id, state_group_before_event=state_group_before_event,
event.room_id, current_state_ids=state_ids_after_event,
prev_group=entry.prev_group, prev_state_ids=state_ids_before_event,
delta_ids=entry.delta_ids, prev_group=state_group_before_event,
current_state_ids=current_state_ids,
)
entry.state_id = entry.state_group
state_group = entry.state_group
context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
prev_group=prev_group,
delta_ids=delta_ids, delta_ids=delta_ids,
) )
return context
@measure_func() @measure_func()
@defer.inlineCallbacks @defer.inlineCallbacks
def resolve_state_groups_for_events(self, room_id, event_ids): def resolve_state_groups_for_events(self, room_id, event_ids):

View file

@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage from synapse.storage.persist_events import EventsPersistenceStorage
from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage from synapse.storage.state import StateGroupStorage
__all__ = ["DataStores", "DataStore"] __all__ = ["DataStores", "DataStore"]
@ -46,6 +47,7 @@ class Storage(object):
self.main = stores.main self.main = stores.main
self.persistence = EventsPersistenceStorage(hs, stores) self.persistence = EventsPersistenceStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores) self.state = StateGroupStorage(hs, stores)

View file

@ -361,14 +361,11 @@ class SQLBaseStore(object):
expiration_ts, expiration_ts,
) )
self._simple_insert_txn( self._simple_upsert_txn(
txn, txn,
"account_validity", "account_validity",
values={ keyvalues={"user_id": user_id},
"user_id": user_id, values={"expiration_ts_ms": expiration_ts, "email_sent": False},
"expiration_ts_ms": expiration_ts,
"email_sent": False,
},
) )
def start_profiling(self): def start_profiling(self):
@ -412,16 +409,15 @@ class SQLBaseStore(object):
i = 0 i = 0
N = 5 N = 5
while True: while True:
try: cursor = LoggingTransaction(
txn = conn.cursor() conn.cursor(),
txn = LoggingTransaction(
txn,
name, name,
self.database_engine, self.database_engine,
after_callbacks, after_callbacks,
exception_callbacks, exception_callbacks,
) )
r = func(txn, *args, **kwargs) try:
r = func(cursor, *args, **kwargs)
conn.commit() conn.commit()
return r return r
except self.database_engine.module.OperationalError as e: except self.database_engine.module.OperationalError as e:
@ -459,6 +455,40 @@ class SQLBaseStore(object):
) )
continue continue
raise raise
finally:
# we're either about to retry with a new cursor, or we're about to
# release the connection. Once we release the connection, it could
# get used for another query, which might do a conn.rollback().
#
# In the latter case, even though that probably wouldn't affect the
# results of this transaction, python's sqlite will reset all
# statements on the connection [1], which will make our cursor
# invalid [2].
#
# In any case, continuing to read rows after commit()ing seems
# dubious from the PoV of ACID transactional semantics
# (sqlite explicitly says that once you commit, you may see rows
# from subsequent updates.)
#
# In psycopg2, cursors are essentially a client-side fabrication -
# all the data is transferred to the client side when the statement
# finishes executing - so in theory we could go on streaming results
# from the cursor, but attempting to do so would make us
# incompatible with sqlite, so let's make sure we're not doing that
# by closing the cursor.
#
# (*named* cursors in psycopg2 are different and are proper server-
# side things, but (a) we don't use them and (b) they are implicitly
# closed by ending the transaction anyway.)
#
# In short, if we haven't finished with the cursor yet, that's a
# problem waiting to bite us.
#
# TL;DR: we're done with the cursor, so we can close it.
#
# [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
# [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
cursor.close()
except Exception as e: except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", name, e) logger.debug("[TXN FAIL] {%s} %s", name, e)
raise raise
@ -854,7 +884,7 @@ class SQLBaseStore(object):
allvalues.update(values) allvalues.update(values)
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values) latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % ( sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
table, table,
", ".join(k for k in allvalues), ", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues), ", ".join("?" for _ in allvalues),

View file

@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
def _add_messages_to_local_device_inbox_txn( def _add_messages_to_local_device_inbox_txn(
self, txn, stream_id, messages_by_user_then_device self, txn, stream_id, messages_by_user_then_device
): ):
sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?" # Compatible method of performing an upsert
txn.execute(sql, (stream_id, stream_id)) sql = "SELECT stream_id FROM device_max_stream_id"
txn.execute(sql)
rows = txn.fetchone()
if rows:
db_stream_id = rows[0]
if db_stream_id < stream_id:
# Insert the new stream_id
sql = "UPDATE device_max_stream_id SET stream_id = ?"
else:
# No rows, perform an insert
sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
txn.execute(sql, (stream_id,))
local_by_user_then_device = {} local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items(): for user_id, messages_by_device in messages_by_user_then_device.items():
@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
devices = list(messages_by_device.keys()) devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*": if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids. # Handle wildcard device_ids.
sql = "SELECT device_id FROM devices" " WHERE user_id = ?" sql = "SELECT device_id FROM devices WHERE user_id = ?"
txn.execute(sql, (user_id,)) txn.execute(sql, (user_id,))
message_json = json.dumps(messages_by_device["*"]) message_json = json.dumps(messages_by_device["*"])
for row in txn: for row in txn:

View file

@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
result.setdefault(user_id, {})[device_id] = None result.setdefault(user_id, {})[device_id] = None
# get signatures on the device # get signatures on the device
signature_sql = ( signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
"SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s" " OR ".join("(" + q + ")" for q in signature_query_clauses)
) % (" OR ".join("(" + q + ")" for q in signature_query_clauses)) )
txn.execute(signature_sql, signature_query_params) txn.execute(signature_sql, signature_query_params)
rows = self.cursor_to_dict(txn) rows = self.cursor_to_dict(txn)

View file

@ -713,9 +713,7 @@ class EventsStore(
metadata_json = encode_json(event.internal_metadata.get_dict()) metadata_json = encode_json(event.internal_metadata.get_dict())
sql = ( sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
"UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
)
txn.execute(sql, (metadata_json, event.event_id)) txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the # Add an entry to the ex_outlier_stream table to replicate the
@ -732,7 +730,7 @@ class EventsStore(
}, },
) )
sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?" sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id)) txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this # Update the event_backward_extremities table now that this
@ -1375,6 +1373,10 @@ class EventsStore(
if True, we will delete local events as well as remote ones if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their (instead of just marking them as outliers and deleting their
state groups). state groups).
Returns:
Deferred[set[int]]: The set of state groups that are referenced by
deleted events.
""" """
return self.runInteraction( return self.runInteraction(
@ -1475,7 +1477,7 @@ class EventsStore(
# We do joins against events_to_purge for e.g. calculating state # We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index. # groups to purge, etc., so lets make an index.
txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)") txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
txn.execute("SELECT event_id, should_delete FROM events_to_purge") txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall() event_rows = txn.fetchall()
@ -1511,11 +1513,10 @@ class EventsStore(
[(room_id, event_id) for event_id, in new_backwards_extrems], [(room_id, event_id) for event_id, in new_backwards_extrems],
) )
logger.info("[purge] finding redundant state groups") logger.info("[purge] finding state groups referenced by deleted events")
# Get all state groups that are referenced by events that are to be # Get all state groups that are referenced by events that are to be
# deleted. We then go and check if they are referenced by other events # deleted.
# or state groups, and if not we delete them.
txn.execute( txn.execute(
""" """
SELECT DISTINCT state_group FROM events_to_purge SELECT DISTINCT state_group FROM events_to_purge
@ -1528,60 +1529,6 @@ class EventsStore(
"[purge] found %i referenced state groups", len(referenced_state_groups) "[purge] found %i referenced state groups", len(referenced_state_groups)
) )
logger.info("[purge] finding state groups that can be deleted")
_ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
state_groups_to_delete, remaining_state_groups = _
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
logger.info("[purge] removing events from event_to_state_groups") logger.info("[purge] removing events from event_to_state_groups")
txn.execute( txn.execute(
"DELETE FROM event_to_state_groups " "DELETE FROM event_to_state_groups "
@ -1668,138 +1615,35 @@ class EventsStore(
logger.info("[purge] done") logger.info("[purge] done")
def _find_unreferenced_groups_during_purge(self, txn, state_groups): return referenced_state_groups
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).
Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
# Set of state groups we've already seen
state_groups_seen = set(state_groups)
# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search
# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE ep.event_id IS NULL AND
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "state_group", current_search
)
txn.execute(sql + clause, list(args))
referenced = set(sg for sg, in txn)
referenced_groups |= referenced
# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group"),
)
prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs
for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]
to_delete = state_groups_seen - referenced_groups
to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)
return to_delete, to_dedelta
def purge_room(self, room_id): def purge_room(self, room_id):
"""Deletes all record of a room """Deletes all record of a room
Args: Args:
room_id (str): room_id (str)
Returns:
Deferred[List[int]]: The list of state groups to delete.
""" """
return self.runInteraction("purge_room", self._purge_room_txn, room_id) return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id): def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states # First we fetch all the state groups that should be deleted, before
logger.info("[purge] removing %s from state_groups_state", room_id) # we delete that information.
txn.execute( txn.execute(
""" """
DELETE FROM state_groups_state WHERE state_group IN ( SELECT DISTINCT state_group FROM events
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) INNER JOIN event_to_state_groups USING(event_id)
WHERE events.room_id = ? WHERE events.room_id = ?
)
""", """,
(room_id,), (room_id,),
) )
# ... and the state group edges state_groups = [row[0] for row in txn]
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute( # Now we delete tables which lack an index on room_id but have one on event_id
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# and then tables which lack an index on room_id but have one on event_id
for table in ( for table in (
"event_auth", "event_auth",
"event_edges", "event_edges",
@ -1887,6 +1731,165 @@ class EventsStore(
logger.info("[purge] done") logger.info("[purge] done")
return state_groups
def purge_unreferenced_state_groups(
self, room_id: str, state_groups_to_delete
) -> defer.Deferred:
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.
Args:
room_id: The room the state groups belong to (must all be in the
same room).
state_groups_to_delete (Collection[int]): Set of all state groups
to delete.
"""
return self.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
state_groups_to_delete,
)
def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=state_groups_to_delete,
keyvalues={},
retcols=("state_group",),
)
remaining_state_groups = set(
row["state_group"]
for row in rows
if row["state_group"] not in state_groups_to_delete
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
@defer.inlineCallbacks
def get_previous_state_groups(self, state_groups):
"""Fetch the previous groups of the given state groups.
Args:
state_groups (Iterable[int])
Returns:
Deferred[dict[int, int]]: mapping from state group to previous
state group.
"""
rows = yield self._simple_select_many_batch(
table="state_group_edges",
column="prev_state_group",
iterable=state_groups,
keyvalues={},
retcols=("prev_state_group", "state_group"),
desc="get_previous_state_groups",
)
return {row["state_group"]: row["prev_state_group"] for row in rows}
def purge_room_state(self, room_id, state_groups_to_delete):
"""Deletes all record of a room from state tables
Args:
room_id (str):
state_groups_to_delete (list[int]): State groups to delete
"""
return self.runInteraction(
"purge_room_state",
self._purge_room_state_txn,
room_id,
state_groups_to_delete,
)
def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
self._simple_delete_many_txn(
txn,
table="state_groups_state",
column="state_group",
iterable=state_groups_to_delete,
keyvalues={},
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
self._simple_delete_many_txn(
txn,
table="state_group_edges",
column="state_group",
iterable=state_groups_to_delete,
keyvalues={},
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
self._simple_delete_many_txn(
txn,
table="state_groups",
column="id",
iterable=state_groups_to_delete,
keyvalues={},
)
async def is_event_after(self, event_id1, event_id2): async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream """Returns True if event_id1 is after event_id2 in the stream
""" """

View file

@ -21,6 +21,7 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventContentFields
from synapse.storage._base import make_in_list_sql_clause from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.background_updates import BackgroundUpdateStore
@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
"event_fix_redactions_bytes", self._event_fix_redactions_bytes "event_fix_redactions_bytes", self._event_fix_redactions_bytes
) )
self.register_background_update_handler(
"event_store_labels", self._event_store_labels
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size): def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"] target_min_stream_id = progress["target_min_stream_id_inclusive"]
@ -503,3 +508,68 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
yield self._end_background_update("event_fix_redactions_bytes") yield self._end_background_update("event_fix_redactions_bytes")
return 1 return 1
@defer.inlineCallbacks
def _event_store_labels(self, progress, batch_size):
"""Background update handler which will store labels for existing events."""
last_event_id = progress.get("last_event_id", "")
def _event_store_labels_txn(txn):
txn.execute(
"""
SELECT event_id, json FROM event_json
LEFT JOIN event_labels USING (event_id)
WHERE event_id > ? AND label IS NULL
ORDER BY event_id LIMIT ?
""",
(last_event_id, batch_size),
)
results = list(txn)
nbrows = 0
last_row_event_id = ""
for (event_id, event_json_raw) in results:
try:
event_json = json.loads(event_json_raw)
self._simple_insert_many_txn(
txn=txn,
table="event_labels",
values=[
{
"event_id": event_id,
"label": label,
"room_id": event_json["room_id"],
"topological_ordering": event_json["depth"],
}
for label in event_json["content"].get(
EventContentFields.LABELS, []
)
if isinstance(label, str)
],
)
except Exception as e:
logger.warning(
"Unable to load event %s (no labels will be imported): %s",
event_id,
e,
)
nbrows += 1
last_row_event_id = event_id
self._background_update_progress_txn(
txn, "event_store_labels", {"last_event_id": last_row_event_id}
)
return nbrows
num_rows = yield self.runInteraction(
desc="event_store_labels", func=_event_store_labels_txn
)
if not num_rows:
yield self._end_background_update("event_store_labels")
return num_rows

View file

@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
if filter_id_response is not None: if filter_id_response is not None:
return filter_id_response[0] return filter_id_response[0]
sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?" sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_localpart,)) txn.execute(sql, (user_localpart,))
max_id = txn.fetchone()[0] max_id = txn.fetchone()[0]
if max_id is None: if max_id is None:

View file

@ -553,6 +553,21 @@ class GroupServerStore(SQLBaseStore):
desc="remove_user_from_summary", desc="remove_user_from_summary",
) )
def get_local_groups_for_room(self, room_id):
"""Get all of the local group that contain a given room
Args:
room_id (str): The ID of a room
Returns:
Deferred[list[str]]: A twisted.Deferred containing a list of group ids
containing this room
"""
return self._simple_select_onecol(
table="group_rooms",
keyvalues={"room_id": room_id},
retcol="group_id",
desc="get_local_groups_for_room",
)
def get_users_for_summary_by_role(self, group_id, include_private=False): def get_users_for_summary_by_role(self, group_id, include_private=False):
"""Get the users and roles that should be included in a summary request """Get the users and roles that should be included in a summary request

View file

@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
if len(media_ids) == 0: if len(media_ids) == 0:
return return
sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?" sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
def _delete_url_cache_txn(txn): def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids]) txn.executemany(sql, [(media_id,) for media_id in media_ids])
@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
return return
def _delete_url_cache_media_txn(txn): def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository" " WHERE media_id = ?" sql = "DELETE FROM local_media_repository WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids]) txn.executemany(sql, [(media_id,) for media_id in media_ids])
sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?" sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids]) txn.executemany(sql, [(media_id,) for media_id in media_ids])

View file

@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
args.append(limit) args.append(limit)
txn.execute(sql, args) txn.execute(sql, args)
return (r[0:5] + (json.loads(r[5]),) for r in txn) return list(r[0:5] + (json.loads(r[5]),) for r in txn)
return self.runInteraction( return self.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn "get_all_updated_receipts", get_all_updated_receipts_txn

Some files were not shown because too many files have changed in this diff Show more