
Merge remote-tracking branch 'upstream/release-v1.105'

commit 15947bbd71 by Tulir Asokan, 2024-04-11 14:56:20 +03:00
30 changed files with 1105 additions and 362 deletions

CHANGES.md

@@ -1,3 +1,35 @@
+# Synapse 1.105.0rc1 (2024-04-11)
+
+### Features
+
+- Stabilize support for [MSC4010](https://github.com/matrix-org/matrix-spec-proposals/pull/4010) which clarifies the interaction of push rules and account data. Contributed by @clokep. ([\#17022](https://github.com/element-hq/synapse/issues/17022))
+- Stabilize support for [MSC3981](https://github.com/matrix-org/matrix-spec-proposals/pull/3981): `/relations` recursion. Contributed by @clokep. ([\#17023](https://github.com/element-hq/synapse/issues/17023))
+- Add support for moving `/pushrules` off of main process. ([\#17037](https://github.com/element-hq/synapse/issues/17037), [\#17038](https://github.com/element-hq/synapse/issues/17038))
+
+### Bugfixes
+
+- Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. ([\#16930](https://github.com/element-hq/synapse/issues/16930), [\#16932](https://github.com/element-hq/synapse/issues/16932), [\#16942](https://github.com/element-hq/synapse/issues/16942), [\#17064](https://github.com/element-hq/synapse/issues/17064), [\#17065](https://github.com/element-hq/synapse/issues/17065), [\#17066](https://github.com/element-hq/synapse/issues/17066))
+- Fix server notice rooms not always being created as unencrypted rooms, even when `encryption_enabled_by_default_for_room_type` is in use (server notices are always unencrypted). ([\#17033](https://github.com/element-hq/synapse/issues/17033))
+- Fix the `.m.rule.encrypted_room_one_to_one` and `.m.rule.room_one_to_one` default underride push rules being in the wrong order. Contributed by @Sumpy1. ([\#17043](https://github.com/element-hq/synapse/issues/17043))
+
+### Internal Changes
+
+- Refactor auth chain fetching to reduce duplication. ([\#17044](https://github.com/element-hq/synapse/issues/17044))
+- Improve database performance by adding a missing index to `access_tokens.refresh_token_id`. ([\#17045](https://github.com/element-hq/synapse/issues/17045), [\#17054](https://github.com/element-hq/synapse/issues/17054))
+- Improve database performance by reducing number of receipts fetched when sending push notifications. ([\#17049](https://github.com/element-hq/synapse/issues/17049))
+
+### Updates to locked dependencies
+
+* Bump packaging from 23.2 to 24.0. ([\#17027](https://github.com/element-hq/synapse/issues/17027))
+* Bump regex from 1.10.3 to 1.10.4. ([\#17028](https://github.com/element-hq/synapse/issues/17028))
+* Bump ruff from 0.3.2 to 0.3.5. ([\#17060](https://github.com/element-hq/synapse/issues/17060))
+* Bump serde_json from 1.0.114 to 1.0.115. ([\#17041](https://github.com/element-hq/synapse/issues/17041))
+* Bump types-pillow from 10.2.0.20240125 to 10.2.0.20240406. ([\#17061](https://github.com/element-hq/synapse/issues/17061))
+* Bump types-requests from 2.31.0.20240125 to 2.31.0.20240406. ([\#17063](https://github.com/element-hq/synapse/issues/17063))
+* Bump typing-extensions from 4.9.0 to 4.11.0. ([\#17062](https://github.com/element-hq/synapse/issues/17062))
+
+
 # Synapse 1.104.0 (2024-04-02)
 
 ### Bugfixes

Cargo.lock generated

@@ -306,9 +306,9 @@ dependencies = [
 [[package]]
 name = "regex"
-version = "1.10.3"
+version = "1.10.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
+checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -367,9 +367,9 @@ dependencies = [
 [[package]]
 name = "serde_json"
-version = "1.0.114"
+version = "1.0.115"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0"
+checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd"
 dependencies = [
  "itoa",
  "ryu",

debian/changelog vendored

@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.105.0~rc1) stable; urgency=medium
+
+  * New Synapse release 1.105.0rc1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Thu, 11 Apr 2024 12:15:49 +0100
+
 matrix-synapse-py3 (1.104.0) stable; urgency=medium
 
   * New Synapse release 1.104.0.

docker/configure_workers_and_start.py

@@ -310,6 +310,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "shared_extra_conf": {},
         "worker_extra_conf": "",
     },
+    "push_rules": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
 }
 
 # Templates for sections that may be inserted multiple times in config files
@@ -401,6 +408,7 @@ def add_worker_roles_to_shared_config(
         "receipts",
         "to_device",
         "typing",
+        "push_rules",
     ]
 
     # Worker-type specific sharding config. Now a single worker can fulfill multiple
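For context, the entries in `WORKERS_CONFIG` are selected at container start-up via the `SYNAPSE_WORKER_TYPES` environment variable consumed by this script. A minimal sketch of enabling the new worker type; the image tag and volume path are hypothetical and the exact variable syntax should be checked against the script:

```yaml
# docker-compose sketch: have configure_workers_and_start.py generate a
# dedicated push_rules worker alongside the main process.
services:
  synapse:
    image: matrixdotorg/synapse-workers:latest  # hypothetical tag
    environment:
      SYNAPSE_WORKER_TYPES: "push_rules"
    volumes:
      - ./data:/data  # hypothetical path
```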

docs/workers.md

@@ -532,6 +532,13 @@ the stream writer for the `presence` stream:
     ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
 
+##### The `push_rules` stream
+
+The following endpoints should be routed directly to the worker configured as
+the stream writer for the `push` stream:
+
+    ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
+
 #### Restrict outbound federation traffic to a specific set of workers
 
 The
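The new stream writer is configured like the existing ones. A minimal `homeserver.yaml` sketch, assuming a worker named `pusher_worker1` (the worker name and port are hypothetical):

```yaml
stream_writers:
  push_rules: pusher_worker1

instance_map:
  pusher_worker1:
    host: localhost
    port: 8035  # hypothetical replication port
```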

poetry.lock generated

@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
 
 [[package]]
 name = "alabaster"
@@ -1602,13 +1602,13 @@ tests = ["Sphinx", "doubles", "flake8", "flake8-quotes", "gevent", "mock", "pyte
 [[package]]
 name = "packaging"
-version = "23.2"
+version = "24.0"
 description = "Core utilities for Python packages"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "packaging-23.2-py3-none-any.whl", hash = "sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7"},
-    {file = "packaging-23.2.tar.gz", hash = "sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5"},
+    {file = "packaging-24.0-py3-none-any.whl", hash = "sha256:2ddfb553fdf02fb784c234c7ba6ccc288296ceabec964ad2eae3777778130bc5"},
+    {file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"},
 ]
 
 [[package]]
@@ -2444,28 +2444,28 @@ files = [
 [[package]]
 name = "ruff"
-version = "0.3.2"
+version = "0.3.5"
 description = "An extremely fast Python linter and code formatter, written in Rust."
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "ruff-0.3.2-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:77f2612752e25f730da7421ca5e3147b213dca4f9a0f7e0b534e9562c5441f01"},
-    {file = "ruff-0.3.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:9966b964b2dd1107797be9ca7195002b874424d1d5472097701ae8f43eadef5d"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b83d17ff166aa0659d1e1deaf9f2f14cbe387293a906de09bc4860717eb2e2da"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bb875c6cc87b3703aeda85f01c9aebdce3d217aeaca3c2e52e38077383f7268a"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:be75e468a6a86426430373d81c041b7605137a28f7014a72d2fc749e47f572aa"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:967978ac2d4506255e2f52afe70dda023fc602b283e97685c8447d036863a302"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1231eacd4510f73222940727ac927bc5d07667a86b0cbe822024dd00343e77e9"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2c6d613b19e9a8021be2ee1d0e27710208d1603b56f47203d0abbde906929a9b"},
-    {file = "ruff-0.3.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8439338a6303585d27b66b4626cbde89bb3e50fa3cae86ce52c1db7449330a7"},
-    {file = "ruff-0.3.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:de8b480d8379620cbb5ea466a9e53bb467d2fb07c7eca54a4aa8576483c35d36"},
-    {file = "ruff-0.3.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:b74c3de9103bd35df2bb05d8b2899bf2dbe4efda6474ea9681280648ec4d237d"},
-    {file = "ruff-0.3.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f380be9fc15a99765c9cf316b40b9da1f6ad2ab9639e551703e581a5e6da6745"},
-    {file = "ruff-0.3.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:0ac06a3759c3ab9ef86bbeca665d31ad3aa9a4b1c17684aadb7e61c10baa0df4"},
-    {file = "ruff-0.3.2-py3-none-win32.whl", hash = "sha256:9bd640a8f7dd07a0b6901fcebccedadeb1a705a50350fb86b4003b805c81385a"},
-    {file = "ruff-0.3.2-py3-none-win_amd64.whl", hash = "sha256:0c1bdd9920cab5707c26c8b3bf33a064a4ca7842d91a99ec0634fec68f9f4037"},
-    {file = "ruff-0.3.2-py3-none-win_arm64.whl", hash = "sha256:5f65103b1d76e0d600cabd577b04179ff592064eaa451a70a81085930e907d0b"},
-    {file = "ruff-0.3.2.tar.gz", hash = "sha256:fa78ec9418eb1ca3db392811df3376b46471ae93792a81af2d1cbb0e5dcb5142"},
+    {file = "ruff-0.3.5-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:aef5bd3b89e657007e1be6b16553c8813b221ff6d92c7526b7e0227450981eac"},
+    {file = "ruff-0.3.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:89b1e92b3bd9fca249153a97d23f29bed3992cff414b222fcd361d763fc53f12"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5e55771559c89272c3ebab23326dc23e7f813e492052391fe7950c1a5a139d89"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:dabc62195bf54b8a7876add6e789caae0268f34582333cda340497c886111c39"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a05f3793ba25f194f395578579c546ca5d83e0195f992edc32e5907d142bfa3"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:dfd3504e881082959b4160ab02f7a205f0fadc0a9619cc481982b6837b2fd4c0"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:87258e0d4b04046cf1d6cc1c56fadbf7a880cc3de1f7294938e923234cf9e498"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:712e71283fc7d9f95047ed5f793bc019b0b0a29849b14664a60fd66c23b96da1"},
+    {file = "ruff-0.3.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a532a90b4a18d3f722c124c513ffb5e5eaff0cc4f6d3aa4bda38e691b8600c9f"},
+    {file = "ruff-0.3.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:122de171a147c76ada00f76df533b54676f6e321e61bd8656ae54be326c10296"},
+    {file = "ruff-0.3.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d80a6b18a6c3b6ed25b71b05eba183f37d9bc8b16ace9e3d700997f00b74660b"},
+    {file = "ruff-0.3.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:a7b6e63194c68bca8e71f81de30cfa6f58ff70393cf45aab4c20f158227d5936"},
+    {file = "ruff-0.3.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a759d33a20c72f2dfa54dae6e85e1225b8e302e8ac655773aff22e542a300985"},
+    {file = "ruff-0.3.5-py3-none-win32.whl", hash = "sha256:9d8605aa990045517c911726d21293ef4baa64f87265896e491a05461cae078d"},
+    {file = "ruff-0.3.5-py3-none-win_amd64.whl", hash = "sha256:dc56bb16a63c1303bd47563c60482a1512721053d93231cf7e9e1c6954395a0e"},
+    {file = "ruff-0.3.5-py3-none-win_arm64.whl", hash = "sha256:faeeae9905446b975dcf6d4499dc93439b131f1443ee264055c5716dd947af55"},
+    {file = "ruff-0.3.5.tar.gz", hash = "sha256:a067daaeb1dc2baf9b82a32dae67d154d95212080c80435eb052d95da647763d"},
 ]
 
 [[package]]
@@ -3109,13 +3109,13 @@ files = [
 [[package]]
 name = "types-pillow"
-version = "10.2.0.20240125"
+version = "10.2.0.20240406"
 description = "Typing stubs for Pillow"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-Pillow-10.2.0.20240125.tar.gz", hash = "sha256:c449b2c43b9fdbe0494a7b950e6b39a4e50516091213fec24ef3f33c1d017717"},
-    {file = "types_Pillow-10.2.0.20240125-py3-none-any.whl", hash = "sha256:322dbae32b4b7918da5e8a47c50ac0f24b0aa72a804a23857620f2722b03c858"},
+    {file = "types-Pillow-10.2.0.20240406.tar.gz", hash = "sha256:62e0cc1f17caba40e72e7154a483f4c7f3bea0e1c34c0ebba9de3c7745bc306d"},
+    {file = "types_Pillow-10.2.0.20240406-py3-none-any.whl", hash = "sha256:5ac182e8afce53de30abca2fdf9cbec7b2500e549d0be84da035a729a84c7c47"},
 ]
 
 [[package]]
@@ -3156,13 +3156,13 @@ files = [
 [[package]]
 name = "types-requests"
-version = "2.31.0.20240125"
+version = "2.31.0.20240406"
 description = "Typing stubs for requests"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-requests-2.31.0.20240125.tar.gz", hash = "sha256:03a28ce1d7cd54199148e043b2079cdded22d6795d19a2c2a6791a4b2b5e2eb5"},
-    {file = "types_requests-2.31.0.20240125-py3-none-any.whl", hash = "sha256:9592a9a4cb92d6d75d9b491a41477272b710e021011a2a3061157e2fb1f1a5d1"},
+    {file = "types-requests-2.31.0.20240406.tar.gz", hash = "sha256:4428df33c5503945c74b3f42e82b181e86ec7b724620419a2966e2de604ce1a1"},
+    {file = "types_requests-2.31.0.20240406-py3-none-any.whl", hash = "sha256:6216cdac377c6b9a040ac1c0404f7284bd13199c0e1bb235f4324627e8898cf5"},
 ]
 
 [package.dependencies]
@@ -3181,13 +3181,13 @@ files = [
 [[package]]
 name = "typing-extensions"
-version = "4.9.0"
+version = "4.11.0"
 description = "Backported and Experimental Type Hints for Python 3.8+"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "typing_extensions-4.9.0-py3-none-any.whl", hash = "sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd"},
-    {file = "typing_extensions-4.9.0.tar.gz", hash = "sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783"},
+    {file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"},
+    {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"},
 ]
 
 [[package]]
@@ -3451,4 +3451,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8.0"
-content-hash = "b510fa05f4ea33194bec079f5d04ebb3f9ffbb5c1ea96a0341d57ba770ef81e6"
+content-hash = "4abda113a01f162bb3978b0372956d569364533aa39f57863c234363f8449a4f"

pyproject.toml

@@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.104.0"
+version = "1.105.0rc1"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "AGPL-3.0-or-later"
@@ -321,7 +321,7 @@ all = [
 # This helps prevents merge conflicts when running a batch of dependabot updates.
 isort = ">=5.10.1"
 black = ">=22.7.0"
-ruff = "0.3.2"
+ruff = "0.3.5"
 
 # Type checking only works with the pydantic.v1 compat module from pydantic v2
 pydantic = "^2"

@@ -641,9 +641,9 @@ msgpack==1.0.7 ; python_version >= "3.8" and python_full_version < "4.0.0" \
 netaddr==1.2.1 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
     --hash=sha256:6eb8fedf0412c6d294d06885c110de945cf4d22d2b510d0404f4e06950857987 \
     --hash=sha256:bd9e9534b0d46af328cf64f0e5a23a5a43fca292df221c85580b27394793496e
-packaging==23.2 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
-    --hash=sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5 \
-    --hash=sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7
+packaging==24.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
+    --hash=sha256:2ddfb553fdf02fb784c234c7ba6ccc288296ceabec964ad2eae3777778130bc5 \
+    --hash=sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9
 parameterized==0.9.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
     --hash=sha256:4e0758e3d41bea3bbd05ec14fc2c24736723f243b28d702081aef438c9372b1b \
     --hash=sha256:7fc905272cefa4f364c1a3429cbbe9c0f98b793988efb5bf90aac80f08db09b1
@@ -1057,9 +1057,9 @@ twisted[tls]==23.10.0 ; python_full_version >= "3.8.0" and python_full_version <
 txredisapi==1.4.10 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
     --hash=sha256:0a6ea77f27f8cf092f907654f08302a97b48fa35f24e0ad99dfb74115f018161 \
     --hash=sha256:7609a6af6ff4619a3189c0adfb86aeda789afba69eb59fc1e19ac0199e725395
-typing-extensions==4.9.0 ; python_version >= "3.8" and python_full_version < "4.0.0" \
-    --hash=sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783 \
-    --hash=sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd
+typing-extensions==4.11.0 ; python_version >= "3.8" and python_full_version < "4.0.0" \
+    --hash=sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0 \
+    --hash=sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a
 unpaddedbase64==2.1.0 ; python_full_version >= "3.8.0" and python_version < "4.0" \
     --hash=sha256:485eff129c30175d2cd6f0cd8d2310dff51e666f7f36175f738d75dfdbd0b1c6 \
     --hash=sha256:7273c60c089de39d90f5d6d4a7883a79e319dc9d9b1c8924a7fab96178a5f005

rust/src/push/base_rules.rs

@@ -304,12 +304,12 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
         default_enabled: true,
     },
     PushRule {
-        rule_id: Cow::Borrowed("global/underride/.m.rule.room_one_to_one"),
+        rule_id: Cow::Borrowed("global/underride/.m.rule.encrypted_room_one_to_one"),
         priority_class: 1,
         conditions: Cow::Borrowed(&[
             Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
                 key: Cow::Borrowed("type"),
-                pattern: Cow::Borrowed("m.room.message"),
+                pattern: Cow::Borrowed("m.room.encrypted"),
             })),
             Condition::Known(KnownCondition::RoomMemberCount {
                 is: Some(Cow::Borrowed("2")),
@@ -320,12 +320,12 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
         default_enabled: true,
     },
     PushRule {
-        rule_id: Cow::Borrowed("global/underride/.m.rule.encrypted_room_one_to_one"),
+        rule_id: Cow::Borrowed("global/underride/.m.rule.room_one_to_one"),
         priority_class: 1,
         conditions: Cow::Borrowed(&[
             Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
                 key: Cow::Borrowed("type"),
-                pattern: Cow::Borrowed("m.room.encrypted"),
+                pattern: Cow::Borrowed("m.room.message"),
             })),
             Condition::Known(KnownCondition::RoomMemberCount {
                 is: Some(Cow::Borrowed("2")),

synapse/_scripts/synapse_port_db

@@ -60,7 +60,7 @@ from synapse.logging.context import (
 )
 from synapse.notifier import ReplicationNotifier
 from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
-from synapse.storage.databases.main import FilteringWorkerStore, PushRuleStore
+from synapse.storage.databases.main import FilteringWorkerStore
 from synapse.storage.databases.main.account_data import AccountDataWorkerStore
 from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
 from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
@@ -77,10 +77,8 @@ from synapse.storage.databases.main.media_repository import (
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
 from synapse.storage.databases.main.profile import ProfileWorkerStore
-from synapse.storage.databases.main.pusher import (
-    PusherBackgroundUpdatesStore,
-    PusherWorkerStore,
-)
+from synapse.storage.databases.main.push_rule import PusherWorkerStore
+from synapse.storage.databases.main.pusher import PusherBackgroundUpdatesStore
 from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
@@ -245,7 +243,6 @@ class Store(
     AccountDataWorkerStore,
     FilteringWorkerStore,
     ProfileWorkerStore,
-    PushRuleStore,
     PusherWorkerStore,
     PusherBackgroundUpdatesStore,
     PresenceBackgroundUpdateStore,

synapse/config/experimental.py

@@ -393,11 +393,6 @@ class ExperimentalConfig(Config):
         # MSC3967: Do not require UIA when first uploading cross signing keys
         self.msc3967_enabled = experimental.get("msc3967_enabled", False)
 
-        # MSC3981: Recurse relations
-        self.msc3981_recurse_relations = experimental.get(
-            "msc3981_recurse_relations", False
-        )
-
         # MSC3861: Matrix architecture change to delegate authentication via OIDC
         try:
             self.msc3861 = MSC3861(**experimental.get("msc3861", {}))
@@ -409,11 +404,6 @@ class ExperimentalConfig(Config):
         # Check that none of the other config options conflict with MSC3861 when enabled
         self.msc3861.check_config_conflicts(self.root)
 
-        # MSC4010: Do not allow setting m.push_rules account data.
-        self.msc4010_push_rules_account_data = experimental.get(
-            "msc4010_push_rules_account_data", False
-        )
-
         self.msc4028_push_encrypted_events = experimental.get(
             "msc4028_push_encrypted_events", False
         )

synapse/config/workers.py

@@ -156,6 +156,8 @@ class WriterLocations:
             can only be a single instance.
         presence: The instances that write to the presence stream. Currently
             can only be a single instance.
+        push_rules: The instances that write to the push stream. Currently
+            can only be a single instance.
     """
 
     events: List[str] = attr.ib(
@@ -182,6 +184,10 @@ class WriterLocations:
         default=["master"],
         converter=_instance_to_list_converter,
     )
+    push_rules: List[str] = attr.ib(
+        default=["master"],
+        converter=_instance_to_list_converter,
+    )
 
 
 @attr.s(auto_attribs=True)
@@ -341,6 +347,7 @@ class WorkerConfig(Config):
             "account_data",
             "receipts",
             "presence",
+            "push_rules",
         ):
             instances = _instance_to_list_converter(getattr(self.writers, stream))
             for instance in instances:
@@ -378,6 +385,11 @@ class WorkerConfig(Config):
                 "Must only specify one instance to handle `presence` messages."
             )
 
+        if len(self.writers.push_rules) != 1:
+            raise ConfigError(
+                "Must only specify one instance to handle `push` messages."
+            )
+
         self.events_shard_config = RoutableShardedWorkerHandlingConfig(
             self.writers.events
         )
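For illustration, a config fragment that the new check rejects (worker names hypothetical):

```yaml
# Rejected at start-up with:
#   "Must only specify one instance to handle `push` messages."
stream_writers:
  push_rules:
    - pusher_worker1
    - pusher_worker2
```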

synapse/handlers/room.py

@@ -968,6 +968,7 @@ class RoomCreationHandler:
                 room_alias=room_alias,
                 power_level_content_override=power_level_content_override,
                 creator_join_profile=creator_join_profile,
+                ignore_forced_encryption=ignore_forced_encryption,
             )
 
             # we avoid dropping the lock between invites, as otherwise joins can

synapse/handlers/room_member.py

@@ -51,6 +51,7 @@ from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging import opentracing
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http.push import ReplicationCopyPusherRestServlet
 from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.types import (
     JsonDict,
@@ -181,6 +182,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             hs.config.server.forgotten_room_retention_period
         )
 
+        self._is_push_writer = (
+            hs.get_instance_name() in hs.config.worker.writers.push_rules
+        )
+        self._push_writer = hs.config.worker.writers.push_rules[0]
+        self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs)
+
     def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
         """Notify the rate limiter that a room join has occurred.
@@ -1281,9 +1288,17 @@
                 old_room_id, new_room_id, user_id
             )
             # Copy over push rules
-            await self.store.copy_push_rules_from_room_to_room_for_user(
-                old_room_id, new_room_id, user_id
-            )
+            if self._is_push_writer:
+                await self.store.copy_push_rules_from_room_to_room_for_user(
+                    old_room_id, new_room_id, user_id
+                )
+            else:
+                await self._copy_push_client(
+                    instance_name=self._push_writer,
+                    user_id=user_id,
+                    old_room_id=old_room_id,
+                    new_room_id=new_room_id,
+                )
         except Exception:
             logger.exception(
                 "Error copying tags and/or push rules from rooms %s to %s for user %s. "

synapse/handlers/sync.py

@@ -953,7 +953,7 @@ class SyncHandler:
         batch: TimelineBatch,
         sync_config: SyncConfig,
         since_token: Optional[StreamToken],
-        now_token: StreamToken,
+        end_token: StreamToken,
         full_state: bool,
     ) -> MutableStateMap[EventBase]:
         """Works out the difference in state between the end of the previous sync and
@@ -964,7 +964,9 @@
             batch: The timeline batch for the room that will be sent to the user.
             sync_config:
             since_token: Token of the end of the previous batch. May be `None`.
-            now_token: Token of the end of the current batch.
+            end_token: Token of the end of the current batch. Normally this will be
+                the same as the global "now_token", but if the user has left the room,
+                the point just after their leave event.
             full_state: Whether to force returning the full state.
                 `lazy_load_members` still applies when `full_state` is `True`.
@@ -1044,7 +1046,7 @@
                 room_id,
                 sync_config.user,
                 batch,
-                now_token,
+                end_token,
                 members_to_fetch,
                 timeline_state,
             )
@@ -1058,7 +1060,7 @@
                 room_id,
                 batch,
                 since_token,
-                now_token,
+                end_token,
                 members_to_fetch,
                 timeline_state,
             )
@@ -1129,7 +1131,7 @@
         room_id: str,
         syncing_user: UserID,
         batch: TimelineBatch,
-        now_token: StreamToken,
+        end_token: StreamToken,
         members_to_fetch: Optional[Set[str]],
         timeline_state: StateMap[str],
     ) -> StateMap[str]:
@@ -1142,7 +1144,9 @@
             room_id: The room we are calculating for.
             syncing_user: The user that is calling `/sync`.
             batch: The timeline batch for the room that will be sent to the user.
-            now_token: Token of the end of the current batch.
+            end_token: Token of the end of the current batch. Normally this will be
+                the same as the global "now_token", but if the user has left the room,
+                the point just after their leave event.
             members_to_fetch: If lazy-loading is enabled, the memberships needed for
                 events in the timeline.
             timeline_state: The contribution to the room state from state events in
@@ -1182,15 +1186,16 @@
             await_full_state = True
             lazy_load_members = False
 
-        if batch:
-            state_at_timeline_end = (
-                await self._state_storage_controller.get_state_ids_for_event(
-                    batch.events[-1].event_id,
-                    state_filter=state_filter,
-                    await_full_state=await_full_state,
-                )
-            )
+        state_at_timeline_end = await self.get_state_at(
+            room_id,
+            stream_position=end_token,
+            state_filter=state_filter,
+            await_full_state=await_full_state,
+        )
 
+        if batch:
+            # Strictly speaking, this returns the state *after* the first event in the
+            # timeline, but that is good enough here.
             state_at_timeline_start = (
                 await self._state_storage_controller.get_state_ids_for_event(
                     batch.events[0].event_id,
@@ -1199,13 +1204,6 @@
                 )
             )
         else:
-            state_at_timeline_end = await self.get_state_at(
-                room_id,
-                stream_position=now_token,
-                state_filter=state_filter,
-                await_full_state=await_full_state,
-            )
-
             state_at_timeline_start = state_at_timeline_end
 
         state_ids = _calculate_state(
@@ -1222,7 +1220,7 @@
         room_id: str,
         batch: TimelineBatch,
         since_token: StreamToken,
-        now_token: StreamToken,
+        end_token: StreamToken,
         members_to_fetch: Optional[Set[str]],
         timeline_state: StateMap[str],
     ) -> StateMap[str]:
@@ -1238,7 +1236,9 @@
             room_id: The room we are calculating for.
             batch: The timeline batch for the room that will be sent to the user.
             since_token: Token of the end of the previous batch.
-            now_token: Token of the end of the current batch.
+            end_token: Token of the end of the current batch. Normally this will be
+                the same as the global "now_token", but if the user has left the room,
+                the point just after their leave event.
             members_to_fetch: If lazy-loading is enabled, the memberships needed for
                 events in the timeline. Otherwise, `None`.
             timeline_state: The contribution to the room state from state events in
@@ -1258,25 +1258,70 @@
             await_full_state = True
             lazy_load_members = False
 
-        if batch.limited:
-            if batch:
-                state_at_timeline_start = (
-                    await self._state_storage_controller.get_state_ids_for_event(
-                        batch.events[0].event_id,
-                        state_filter=state_filter,
-                        await_full_state=await_full_state,
-                    )
-                )
-            else:
-                # We can get here if the user has ignored the senders of all
-                # the recent events.
-                state_at_timeline_start = await self.get_state_at(
-                    room_id,
-                    stream_position=now_token,
+        # For a non-gappy sync if the events in the timeline are simply a linear
+        # chain (i.e. no merging/branching of the graph), then we know the state
+        # delta between the end of the previous sync and start of the new one is
+        # empty.
+        #
+        # c.f. #16941 for an example of why we can't do this for all non-gappy
+        # syncs.
+        is_linear_timeline = True
+        if batch.events:
+            # We need to make sure the first event in our batch points to the
+            # last event in the previous batch.
+            last_event_id_prev_batch = (
+                await self.store.get_last_event_in_room_before_stream_ordering(
+                    room_id,
+                    end_token=since_token.room_key,
+                )
+            )
+
+            prev_event_id = last_event_id_prev_batch
+            for e in batch.events:
+                if e.prev_event_ids() != [prev_event_id]:
+                    is_linear_timeline = False
+                    break
+                prev_event_id = e.event_id
+
+        if is_linear_timeline and not batch.limited:
+            state_ids: StateMap[str] = {}
+            if lazy_load_members:
+                if members_to_fetch and batch.events:
+                    # We're lazy-loading, so the client might need some more
+                    # member events to understand the events in this timeline.
+                    # So we fish out all the member events corresponding to the
+                    # timeline here. The caller will then dedupe any redundant
+                    # ones.
+                    state_ids = await self._state_storage_controller.get_state_ids_for_event(
+                        batch.events[0].event_id,
+                        # we only want members!
+                        state_filter=StateFilter.from_types(
+                            (EventTypes.Member, member) for member in members_to_fetch
+                        ),
+                        await_full_state=False,
+                    )
+            return state_ids
+
+        if batch:
+            state_at_timeline_start = (
+                await self._state_storage_controller.get_state_ids_for_event(
+                    batch.events[0].event_id,
                     state_filter=state_filter,
                     await_full_state=await_full_state,
                 )
+            )
+        else:
+            # We can get here if the user has ignored the senders of all
+            # the recent events.
+            state_at_timeline_start = await self.get_state_at(
+                room_id,
+                stream_position=end_token,
+                state_filter=state_filter,
+                await_full_state=await_full_state,
+            )
 
+        if batch.limited:
             # for now, we disable LL for gappy syncs - see
             # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
             # N.B. this slows down incr syncs as we are now processing way
@@ -1291,58 +1336,28 @@
             # about them).
             state_filter = StateFilter.all()
 
         state_at_previous_sync = await self.get_state_at(
             room_id,
             stream_position=since_token,
             state_filter=state_filter,
             await_full_state=await_full_state,
         )
 
-        if batch:
-            state_at_timeline_end = (
-                await self._state_storage_controller.get_state_ids_for_event(
-                    batch.events[-1].event_id,
-                    state_filter=state_filter,
-                    await_full_state=await_full_state,
-                )
-            )
-        else:
-            # We can get here if the user has ignored the senders of all
-            # the recent events.
-            state_at_timeline_end = await self.get_state_at(
-                room_id,
-                stream_position=now_token,
-                state_filter=state_filter,
-                await_full_state=await_full_state,
-            )
+        state_at_timeline_end = await self.get_state_at(
+            room_id,
+            stream_position=end_token,
+            state_filter=state_filter,
+            await_full_state=await_full_state,
+        )
 
         state_ids = _calculate_state(
             timeline_contains=timeline_state,
             timeline_start=state_at_timeline_start,
             timeline_end=state_at_timeline_end,
             previous_timeline_end=state_at_previous_sync,
             lazy_load_members=lazy_load_members,
         )
-        else:
-            state_ids = {}
-            if lazy_load_members:
-                if members_to_fetch and batch.events:
-                    # We're returning an incremental sync, with no
-                    # "gap" since the previous sync, so normally there would be
-                    # no state to return.
-                    # But we're lazy-loading, so the client might need some more
-                    # member events to understand the events in this timeline.
-                    # So we fish out all the member events corresponding to the
-                    # timeline here. The caller will then dedupe any redundant ones.
-                    state_ids = await self._state_storage_controller.get_state_ids_for_event(
-                        batch.events[0].event_id,
-                        # we only want members!
-                        state_filter=StateFilter.from_types(
-                            (EventTypes.Member, member) for member in members_to_fetch
-                        ),
-                        await_full_state=False,
-                    )
 
         return state_ids
 
     async def _find_missing_partial_state_memberships(
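To make the new fast path concrete, here is a toy sketch of the linearity check from the hunk above, with event objects mocked (all names here are hypothetical, not Synapse APIs):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class FakeEvent:
    event_id: str
    prevs: List[str] = field(default_factory=list)

    def prev_event_ids(self) -> List[str]:
        return self.prevs

def is_linear_timeline(last_event_id_prev_batch: str, events: List[FakeEvent]) -> bool:
    # Each event must point solely at its predecessor, starting from the last
    # event of the previous batch; any fork or merge breaks the chain.
    prev_event_id = last_event_id_prev_batch
    for e in events:
        if e.prev_event_ids() != [prev_event_id]:
            return False
        prev_event_id = e.event_id
    return True

# E2 -> E1, E3 -> E2: a simple chain, so the empty-state-delta fast path applies.
assert is_linear_timeline("E1", [FakeEvent("E2", ["E1"]), FakeEvent("E3", ["E2"])])
# E3 merges two branches, so the full state calculation is still needed.
assert not is_linear_timeline("E1", [FakeEvent("E2", ["E1"]), FakeEvent("E3", ["E1", "E2"])])
```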
@@ -2343,6 +2358,7 @@
                     full_state=False,
                     since_token=since_token,
                     upto_token=leave_token,
+                    end_token=leave_token,
                     out_of_band=leave_event.internal_metadata.is_out_of_band_membership(),
                 )
             )
@@ -2380,6 +2396,7 @@
                     full_state=False,
                     since_token=None if newly_joined else since_token,
                     upto_token=prev_batch_token,
+                    end_token=now_token,
                 )
             else:
                 entry = RoomSyncResultBuilder(
@@ -2390,6 +2407,7 @@
                     full_state=False,
                     since_token=since_token,
                     upto_token=since_token,
+                    end_token=now_token,
                 )
 
             room_entries.append(entry)
@@ -2448,6 +2466,7 @@
                         full_state=True,
                         since_token=since_token,
                         upto_token=now_token,
+                        end_token=now_token,
                     )
                 )
             elif event.membership == Membership.INVITE:
@@ -2477,6 +2496,7 @@
                         full_state=True,
                         since_token=since_token,
                         upto_token=leave_token,
+                        end_token=leave_token,
                     )
                 )
@@ -2547,6 +2567,7 @@
                 {
                     "since_token": since_token,
                     "upto_token": upto_token,
+                    "end_token": room_builder.end_token,
                 }
             )
@@ -2620,7 +2641,7 @@
                 batch,
                 sync_config,
                 since_token,
-                now_token,
+                room_builder.end_token,
                 full_state=full_state,
             )
         else:
@@ -2780,6 +2801,61 @@ def _calculate_state(
         e for t, e in timeline_start.items() if t[0] == EventTypes.Member
     )
 
+    # Naively, we would just return the difference between the state at the start
+    # of the timeline (`timeline_start_ids`) and that at the end of the previous sync
+    # (`previous_timeline_end_ids`). However, that fails in the presence of forks in
+    # the DAG.
+    #
+    # For example, consider a DAG such as the following:
+    #
+    #       E1
+    #     ↗    ↖
+    #    |      S2
+    #    |      ↑
+    # --|------|----
+    #    |      |
+    #    E3     |
+    #     ↖    /
+    #       E4
+    #
+    # ... and a filter that means we only return 2 events, represented by the dashed
+    # horizontal line. Assuming S2 was *not* included in the previous sync, we need to
+    # include it in the `state` section.
+    #
+    # Note that the state at the start of the timeline (E3) does not include S2. So,
+    # to make sure it gets included in the calculation here, we actually look at
+    # the state at the *end* of the timeline, and subtract any events that are present
+    # in the timeline.
+    #
+    # ----------
+    #
+    # Aside 1: You may then wonder if we need to include `timeline_start` in the
+    # calculation. Consider a linear DAG:
+    #
+    #       E1
+    #       ↑
+    #       S2
+    #       ↑
+    # ----|------
+    #       |
+    #       E3
+    #       ↑
+    #       S4
+    #       ↑
+    #       E5
+    #
+    # ... where S2 and S4 change the same piece of state; and where we have a filter
+    # that returns 3 events (E3, S4, E5). We still need to tell the client about S2,
+    # because it might affect the display of E3. However, the state at the end of the
+    # timeline only tells us about S4; if we don't inspect `timeline_start` we won't
+    # find out about S2.
+    #
+    # (There are yet more complicated cases in which a state event is excluded from the
+    # timeline, but whose effect actually lands in the DAG in the *middle* of the
+    # timeline. We have no way to represent that in the /sync response, and we don't
+    # even try; it is either omitted or plonked into `state` as if it were at the start
+    # of the timeline, depending on what else is in the timeline.)
+
     state_ids = (
         (timeline_end_ids | timeline_start_ids)
         - previous_timeline_end_ids
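A toy illustration of the set arithmetic above, using the E1/S2/E3/E4 fork from the first diagram (the state maps are hypothetical; the real function additionally subtracts events already present in the timeline):

```python
# State maps from (event_type, state_key) to event ID, as in _calculate_state.
timeline_start = {("m.room.member", "@alice:hs"): "E1"}  # state at E3: S2 not visible
timeline_end = {
    ("m.room.member", "@alice:hs"): "E1",
    ("m.room.topic", ""): "S2",  # S2 is reachable from E4, the timeline end
}
previous_timeline_end = {("m.room.member", "@alice:hs"): "E1"}  # previous sync: no S2

timeline_start_ids = set(timeline_start.values())
timeline_end_ids = set(timeline_end.values())
previous_timeline_end_ids = set(previous_timeline_end.values())

# S2 survives the subtraction, so it is reported in the `state` section.
state_ids = (timeline_end_ids | timeline_start_ids) - previous_timeline_end_ids
assert state_ids == {"S2"}
```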
@@ -2882,13 +2958,30 @@ class RoomSyncResultBuilder:
     Attributes:
         room_id
         rtype: One of `"joined"` or `"archived"`
         events: List of events to include in the room (more events may be added
             when generating result).
         newly_joined: If the user has newly joined the room
         full_state: Whether the full state should be sent in result
         since_token: Earliest point to return events from, or None
-        upto_token: Latest point to return events from.
+        upto_token: Latest point to return events from. If `events` is populated,
+            this is set to the token at the start of `events`
+        end_token: The last point in the timeline that the client should see events
+            from. Normally this will be the same as the global `now_token`, but in
+            the case of rooms where the user has left the room, this will be the point
+            just after their leave event.
+
+            This is used in the calculation of the state which is returned in `state`:
+            any state changes *up to* `end_token` (and not beyond!) which are not
+            reflected in the timeline need to be returned in `state`.
+
         out_of_band: whether the events in the room are "out of band" events
             and the server isn't in the room.
     """
@@ -2900,5 +2993,5 @@
     full_state: bool
     since_token: Optional[StreamToken]
     upto_token: StreamToken
+    end_token: StreamToken
     out_of_band: bool = False

synapse/replication/http/push.py

@@ -77,5 +77,46 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
 
         return 200, {}
 
+
+class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
+    """Copies push rules from an old room to new room.
+
+    Request format:
+
+        POST /_synapse/replication/copy_push_rules/:user_id/:old_room_id/:new_room_id
+
+        {}
+
+    """
+
+    NAME = "copy_push_rules"
+    PATH_ARGS = ("user_id", "old_room_id", "new_room_id")
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._store = hs.get_datastores().main
+
+    @staticmethod
+    async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict:  # type: ignore[override]
+        return {}
+
+    async def _handle_request(  # type: ignore[override]
+        self,
+        request: Request,
+        content: JsonDict,
+        user_id: str,
+        old_room_id: str,
+        new_room_id: str,
+    ) -> Tuple[int, JsonDict]:
+
+        await self._store.copy_push_rules_from_room_to_room_for_user(
+            old_room_id, new_room_id, user_id
+        )
+
+        return 200, {}
+
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRemovePusherRestServlet(hs).register(http_server)
+    ReplicationCopyPusherRestServlet(hs).register(http_server)
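For reference, this is how a non-writer instance ends up invoking the endpoint, mirroring the `RoomMemberHandler` change earlier in this commit (`hs` is the `HomeServer`; the wrapper function is hypothetical):

```python
# Sketch: forwarding a push-rule copy to the configured push_rules writer.
copy_push_rules = ReplicationCopyPusherRestServlet.make_client(hs)

async def copy_push_rules_on_upgrade(user_id: str, old_room_id: str, new_room_id: str) -> None:
    # make_client returns an async callable that issues the replication HTTP
    # request to the named instance, which performs the database write.
    await copy_push_rules(
        instance_name=hs.config.worker.writers.push_rules[0],
        user_id=user_id,
        old_room_id=old_room_id,
        new_room_id=new_room_id,
    )
```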

synapse/replication/tcp/handler.py

@@ -66,6 +66,7 @@ from synapse.replication.tcp.streams import (
     FederationStream,
     PresenceFederationStream,
     PresenceStream,
+    PushRulesStream,
     ReceiptsStream,
     Stream,
     ToDeviceStream,
@@ -178,6 +179,12 @@ class ReplicationCommandHandler:
                 continue
 
+            if isinstance(stream, PushRulesStream):
+                if hs.get_instance_name() in hs.config.worker.writers.push_rules:
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
             # Only add any other streams if we're on master.
             if hs.config.worker.worker_app is not None:
                 continue

synapse/rest/client/account_data.py

@@ -81,8 +81,7 @@ class AccountDataServlet(RestServlet):
             raise AuthError(403, "Cannot add account data for other users.")
 
         # Raise an error if the account data type cannot be set directly.
-        if self._hs.config.experimental.msc4010_push_rules_account_data:
-            _check_can_set_account_data_type(account_data_type)
+        _check_can_set_account_data_type(account_data_type)
 
         body = parse_json_object_from_request(request)
 
@@ -108,10 +107,7 @@ class AccountDataServlet(RestServlet):
             raise AuthError(403, "Cannot get account data for other users.")
 
         # Push rules are stored in a separate table and must be queried separately.
-        if (
-            self._hs.config.experimental.msc4010_push_rules_account_data
-            and account_data_type == AccountDataTypes.PUSH_RULES
-        ):
+        if account_data_type == AccountDataTypes.PUSH_RULES:
             account_data: Optional[JsonMapping] = (
                 await self._push_rules_handler.push_rules_for_user(requester.user)
             )
@@ -162,8 +158,7 @@ class UnstableAccountDataServlet(RestServlet):
             raise AuthError(403, "Cannot delete account data for other users.")
 
         # Raise an error if the account data type cannot be set directly.
-        if self._hs.config.experimental.msc4010_push_rules_account_data:
-            _check_can_set_account_data_type(account_data_type)
+        _check_can_set_account_data_type(account_data_type)
 
         await self.handler.remove_account_data_for_user(user_id, account_data_type)
 
@@ -209,15 +204,7 @@ class RoomAccountDataServlet(RestServlet):
             )
 
         # Raise an error if the account data type cannot be set directly.
-        if self._hs.config.experimental.msc4010_push_rules_account_data:
-            _check_can_set_account_data_type(account_data_type)
-        elif account_data_type == ReceiptTypes.FULLY_READ:
-            raise SynapseError(
-                405,
-                "Cannot set m.fully_read through this API."
-                " Use /rooms/!roomId:server.name/read_markers",
-                Codes.BAD_JSON,
-            )
+        _check_can_set_account_data_type(account_data_type)
 
         body = parse_json_object_from_request(request)
 
@@ -256,10 +243,7 @@ class RoomAccountDataServlet(RestServlet):
             )
 
         # Room-specific push rules are not currently supported.
-        if (
-            self._hs.config.experimental.msc4010_push_rules_account_data
-            and account_data_type == AccountDataTypes.PUSH_RULES
-        ):
+        if account_data_type == AccountDataTypes.PUSH_RULES:
             account_data: Optional[JsonMapping] = {}
         else:
             account_data = await self.store.get_account_data_for_room_and_type(
@@ -317,8 +301,7 @@ class UnstableRoomAccountDataServlet(RestServlet):
             )
 
         # Raise an error if the account data type cannot be set directly.
-        if self._hs.config.experimental.msc4010_push_rules_account_data:
-            _check_can_set_account_data_type(account_data_type)
+        _check_can_set_account_data_type(account_data_type)
 
         await self.handler.remove_account_data_for_room(
             user_id, room_id, account_data_type

synapse/rest/client/push_rule.py

@@ -59,12 +59,14 @@ class PushRuleRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
-        self._is_worker = hs.config.worker.worker_app is not None
+        self._is_push_worker = (
+            hs.get_instance_name() in hs.config.worker.writers.push_rules
+        )
         self._push_rules_handler = hs.get_push_rules_handler()
         self._push_rule_linearizer = Linearizer(name="push_rules")
 
     async def on_PUT(self, request: SynapseRequest, path: str) -> Tuple[int, JsonDict]:
-        if self._is_worker:
+        if not self._is_push_worker:
             raise Exception("Cannot handle PUT /push_rules on worker")
 
         requester = await self.auth.get_user_by_req(request)
@@ -137,7 +139,7 @@ class PushRuleRestServlet(RestServlet):
     async def on_DELETE(
         self, request: SynapseRequest, path: str
     ) -> Tuple[int, JsonDict]:
-        if self._is_worker:
+        if not self._is_push_worker:
             raise Exception("Cannot handle DELETE /push_rules on worker")
 
         requester = await self.auth.get_user_by_req(request)

synapse/rest/client/relations.py

@@ -55,7 +55,6 @@ class RelationPaginationServlet(RestServlet):
         self.auth = hs.get_auth()
         self._store = hs.get_datastores().main
         self._relations_handler = hs.get_relations_handler()
-        self._support_recurse = hs.config.experimental.msc3981_recurse_relations
 
     async def on_GET(
         self,
@@ -70,12 +69,9 @@ class RelationPaginationServlet(RestServlet):
         pagination_config = await PaginationConfig.from_request(
             self._store, request, default_limit=5, default_dir=Direction.BACKWARDS
         )
-        if self._support_recurse:
-            recurse = parse_boolean(request, "recurse", default=False) or parse_boolean(
-                request, "org.matrix.msc3981.recurse", default=False
-            )
-        else:
-            recurse = False
+        recurse = parse_boolean(request, "recurse", default=False) or parse_boolean(
+            request, "org.matrix.msc3981.recurse", default=False
+        )
 
         # The unstable version of this API returns an extra field for client
         # compatibility, see https://github.com/matrix-org/synapse/issues/12930.
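With the config gate gone, the `recurse` query parameter is always honoured. A quick client-side sketch of exercising it (homeserver, room, event and token are all hypothetical):

```python
import requests

# Fetch relations recursively via the stable parameter; the unstable
# org.matrix.msc3981.recurse name is still accepted for older clients.
resp = requests.get(
    "https://matrix.example.com/_matrix/client/v1/rooms/!room:example.com"
    "/relations/$event_id",
    params={"recurse": "true"},
    headers={"Authorization": "Bearer <access_token>"},
)
print(resp.json().get("chunk"))
```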

synapse/rest/client/versions.py

@@ -132,7 +132,8 @@ class VersionsRestServlet(RestServlet):
                     # Adds support for relation-based redactions as per MSC3912.
                     "org.matrix.msc3912": self.config.experimental.msc3912_enabled,
                     # Whether recursively provide relations is supported.
-                    "org.matrix.msc3981": self.config.experimental.msc3981_recurse_relations,
+                    # TODO This is no longer needed once unstable MSC3981 does not need to be supported.
+                    "org.matrix.msc3981": True,
                     # Adds support for deleting account data.
                     "org.matrix.msc3391": self.config.experimental.msc3391_enabled,
                     # Allows clients to inhibit profile update propagation.

synapse/storage/databases/main/__init__.py

@@ -63,7 +63,7 @@ from .openid import OpenIdStore
 from .presence import PresenceStore
 from .profile import ProfileStore
 from .purge_events import PurgeEventsStore
-from .push_rule import PushRuleStore
+from .push_rule import PushRulesWorkerStore
 from .pusher import PusherStore
 from .receipts import ReceiptsStore
 from .registration import RegistrationStore
@@ -130,7 +130,6 @@ class DataStore(
     RejectionsStore,
     FilteringWorkerStore,
     PusherStore,
-    PushRuleStore,
     ApplicationServiceTransactionStore,
     EventPushActionsStore,
     ServerMetricsStore,
@@ -140,6 +139,7 @@ class DataStore(
     SearchStore,
     TagsStore,
     AccountDataStore,
+    PushRulesWorkerStore,
     StreamWorkerStore,
     OpenIdStore,
     ClientIpWorkerStore,


@@ -27,6 +27,7 @@ from typing import (
     Collection,
     Dict,
     FrozenSet,
+    Generator,
     Iterable,
     List,
     Optional,
@@ -279,64 +280,16 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # Now we look up all links for the chains we have, adding chains that
         # are reachable from any event.
-        #
-        # This query is structured to first get all chain IDs reachable, and
-        # then pull out all links from those chains. This does pull out more
-        # rows than is strictly necessary, however there isn't a way of
-        # structuring the recursive part of query to pull out the links without
-        # also returning large quantities of redundant data (which can make it a
-        # lot slower).
-        sql = """
-            WITH RECURSIVE links(chain_id) AS (
-                SELECT
-                    DISTINCT origin_chain_id
-                FROM event_auth_chain_links WHERE %s
-                UNION
-                SELECT
-                    target_chain_id
-                FROM event_auth_chain_links
-                INNER JOIN links ON (chain_id = origin_chain_id)
-            )
-            SELECT
-                origin_chain_id, origin_sequence_number,
-                target_chain_id, target_sequence_number
-            FROM links
-            INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id)
-        """

         # A map from chain ID to max sequence number *reachable* from any event ID.
         chains: Dict[int, int] = {}
+        for links in self._get_chain_links(txn, set(event_chains.keys())):
-        # Add all linked chains reachable from initial set of chains.
-        chains_to_fetch = set(event_chains.keys())
-        while chains_to_fetch:
-            batch2 = tuple(itertools.islice(chains_to_fetch, 1000))
-            chains_to_fetch.difference_update(batch2)
-            clause, args = make_in_list_sql_clause(
-                txn.database_engine, "origin_chain_id", batch2
-            )
-            txn.execute(sql % (clause,), args)
-            links: Dict[int, List[Tuple[int, int, int]]] = {}
-            for (
-                origin_chain_id,
-                origin_sequence_number,
-                target_chain_id,
-                target_sequence_number,
-            ) in txn:
-                links.setdefault(origin_chain_id, []).append(
-                    (origin_sequence_number, target_chain_id, target_sequence_number)
-                )
             for chain_id in links:
                 if chain_id not in event_chains:
                     continue

                 _materialize(chain_id, event_chains[chain_id], links, chains)

-            chains_to_fetch.difference_update(chains)
-
         # Add the initial set of chains, excluding the sequence corresponding to
         # initial event.
         for chain_id, seq_no in event_chains.items():
@@ -380,6 +333,68 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         return results

+    @classmethod
+    def _get_chain_links(
+        cls, txn: LoggingTransaction, chains_to_fetch: Set[int]
+    ) -> Generator[Dict[int, List[Tuple[int, int, int]]], None, None]:
+        """Fetch all auth chain links from the given set of chains, and all
+        links from those chains, recursively.
+
+        Note: This may return links that are not reachable from the given
+        chains.
+
+        Returns a generator that produces dicts from origin chain ID to 3-tuple
+        of origin sequence number, target chain ID and target sequence number.
+        """
+
+        # This query is structured to first get all chain IDs reachable, and
+        # then pull out all links from those chains. This does pull out more
+        # rows than is strictly necessary, however there isn't a way of
+        # structuring the recursive part of query to pull out the links without
+        # also returning large quantities of redundant data (which can make it a
+        # lot slower).
+        sql = """
+            WITH RECURSIVE links(chain_id) AS (
+                SELECT
+                    DISTINCT origin_chain_id
+                FROM event_auth_chain_links WHERE %s
+                UNION
+                SELECT
+                    target_chain_id
+                FROM event_auth_chain_links
+                INNER JOIN links ON (chain_id = origin_chain_id)
+            )
+            SELECT
+                origin_chain_id, origin_sequence_number,
+                target_chain_id, target_sequence_number
+            FROM links
+            INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id)
+        """
+
+        while chains_to_fetch:
+            batch2 = tuple(itertools.islice(chains_to_fetch, 1000))
+            chains_to_fetch.difference_update(batch2)
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "origin_chain_id", batch2
+            )
+            txn.execute(sql % (clause,), args)
+
+            links: Dict[int, List[Tuple[int, int, int]]] = {}
+            for (
+                origin_chain_id,
+                origin_sequence_number,
+                target_chain_id,
+                target_sequence_number,
+            ) in txn:
+                links.setdefault(origin_chain_id, []).append(
+                    (origin_sequence_number, target_chain_id, target_sequence_number)
+                )
+
+            chains_to_fetch.difference_update(links)
+            yield links
+
     def _get_auth_chain_ids_txn(
         self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool
     ) -> Set[str]:
@@ -564,53 +579,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # Now we look up all links for the chains we have, adding chains that
         # are reachable from any event.
-        #
-        # This query is structured to first get all chain IDs reachable, and
-        # then pull out all links from those chains. This does pull out more
-        # rows than is strictly necessary, however there isn't a way of
-        # structuring the recursive part of query to pull out the links without
-        # also returning large quantities of redundant data (which can make it a
-        # lot slower).
-        sql = """
-            WITH RECURSIVE links(chain_id) AS (
-                SELECT
-                    DISTINCT origin_chain_id
-                FROM event_auth_chain_links WHERE %s
-                UNION
-                SELECT
-                    target_chain_id
-                FROM event_auth_chain_links
-                INNER JOIN links ON (chain_id = origin_chain_id)
-            )
-            SELECT
-                origin_chain_id, origin_sequence_number,
-                target_chain_id, target_sequence_number
-            FROM links
-            INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id)
-        """
-
-        # (We need to take a copy of `seen_chains` as we want to mutate it in
-        # the loop)
-        chains_to_fetch = set(seen_chains)
-        while chains_to_fetch:
-            batch2 = tuple(itertools.islice(chains_to_fetch, 1000))
-            clause, args = make_in_list_sql_clause(
-                txn.database_engine, "origin_chain_id", batch2
-            )
-            txn.execute(sql % (clause,), args)
-            links: Dict[int, List[Tuple[int, int, int]]] = {}
-            for (
-                origin_chain_id,
-                origin_sequence_number,
-                target_chain_id,
-                target_sequence_number,
-            ) in txn:
-                links.setdefault(origin_chain_id, []).append(
-                    (origin_sequence_number, target_chain_id, target_sequence_number)
-                )
+        # (We need to take a copy of `seen_chains` as the function mutates it)
+        for links in self._get_chain_links(txn, set(seen_chains)):
             for chains in set_to_chain:
                 for chain_id in links:
                     if chain_id not in chains:
@@ -618,7 +589,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                     _materialize(chain_id, chains[chain_id], links, chains)

-            chains_to_fetch.difference_update(chains)
             seen_chains.update(chains)

         # Now for each chain we figure out the maximum sequence number reachable
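For readers unfamiliar with the pattern that `_get_chain_links` now centralises: it walks the chain-link graph by repeatedly running a recursive CTE over batches of up to 1000 chain IDs, skipping any chain it has already seen. A self-contained sketch of the same idea against SQLite follows; the table and column names mirror the Synapse schema above, while the plain `IN (...)` clause standing in for `make_in_list_sql_clause` and the connection handling are simplified for illustration:

```python
import sqlite3
from itertools import islice
from typing import Dict, Iterator, List, Set, Tuple

def get_chain_links(
    conn: sqlite3.Connection, chains_to_fetch: Set[int]
) -> Iterator[Dict[int, List[Tuple[int, int, int]]]]:
    """Yield links reachable from `chains_to_fetch`, batch by batch."""
    sql = """
        WITH RECURSIVE links(chain_id) AS (
            SELECT DISTINCT origin_chain_id
            FROM event_auth_chain_links WHERE origin_chain_id IN (%s)
            UNION
            SELECT target_chain_id
            FROM event_auth_chain_links
            INNER JOIN links ON (chain_id = origin_chain_id)
        )
        SELECT
            origin_chain_id, origin_sequence_number,
            target_chain_id, target_sequence_number
        FROM links
        INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id)
    """
    while chains_to_fetch:
        # Query in batches of at most 1000 chain IDs, as the real code does.
        batch = tuple(islice(chains_to_fetch, 1000))
        chains_to_fetch.difference_update(batch)

        links: Dict[int, List[Tuple[int, int, int]]] = {}
        placeholders = ", ".join("?" * len(batch))
        for origin, origin_seq, target, target_seq in conn.execute(
            sql % placeholders, batch
        ):
            links.setdefault(origin, []).append((origin_seq, target, target_seq))

        # Any chain we just saw links for is already covered: don't refetch it.
        chains_to_fetch.difference_update(links)
        yield links
```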


@@ -106,7 +106,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -859,37 +859,86 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         return await self.db_pool.runInteraction("get_push_action_users_in_range", f)

-    def _get_receipts_by_room_txn(
-        self, txn: LoggingTransaction, user_id: str
+    def _get_receipts_for_room_and_threads_txn(
+        self,
+        txn: LoggingTransaction,
+        user_id: str,
+        room_ids: StrCollection,
+        thread_ids: StrCollection,
     ) -> Dict[str, _RoomReceipt]:
         """
-        Generate a map of room ID to the latest stream ordering that has been
-        read by the given user.
+        Get (private) read receipts for a user in each of the given room IDs
+        and thread IDs.

-        Args:
-            txn:
-            user_id: The user to fetch receipts for.
+        Note: The corresponding room ID for each thread must appear in
+        `room_ids` arg.

         Returns:
             A map including all rooms the user is in with a receipt. It maps
             room IDs to _RoomReceipt instances
         """
-        receipt_types_clause, args = make_in_list_sql_clause(
+        receipt_types_clause, receipts_args = make_in_list_sql_clause(
             self.database_engine,
             "receipt_type",
             (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
         )

+        thread_ids_clause, thread_ids_args = make_in_list_sql_clause(
+            self.database_engine,
+            "thread_id",
+            thread_ids,
+        )
+
+        room_ids_clause, room_ids_args = make_in_list_sql_clause(
+            self.database_engine,
+            "room_id",
+            room_ids,
+        )
+
+        # We use the union of two (almost identical) queries here, the first to
+        # fetch the specific thread receipts and the second to fetch the
+        # unthreaded receipts.
+        #
+        # This SQL is optimized to use the indices we have on
+        # `receipts_linearized`.
+        #
+        # We compare room ID and thread IDs independently due to the above,
+        # which means that this query might return more rows than we need if the
+        # same thread ID appears across different rooms (e.g. 'main' thread ID).
+        # This doesn't cause any logic issues, and isn't a performance concern
+        # given this function generally gets called with only one room and
+        # thread ID.
         sql = f"""
             SELECT room_id, thread_id, MAX(stream_ordering)
             FROM receipts_linearized
             INNER JOIN events USING (room_id, event_id)
             WHERE {receipt_types_clause}
+            AND {thread_ids_clause}
+            AND {room_ids_clause}
+            AND user_id = ?
+            GROUP BY room_id, thread_id
+
+            UNION ALL
+
+            SELECT room_id, thread_id, MAX(stream_ordering)
+            FROM receipts_linearized
+            INNER JOIN events USING (room_id, event_id)
+            WHERE {receipt_types_clause}
+            AND {room_ids_clause}
+            AND thread_id IS NULL
             AND user_id = ?
             GROUP BY room_id, thread_id
         """

-        args.extend((user_id,))
+        args = list(receipts_args)
+        args.extend(thread_ids_args)
+        args.extend(room_ids_args)
+        args.append(user_id)
+        args.extend(receipts_args)
+        args.extend(room_ids_args)
+        args.append(user_id)
+
         txn.execute(sql, args)

         result: Dict[str, _RoomReceipt] = {}
@@ -925,12 +974,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         The list will have between 0~limit entries.
         """

-        receipts_by_room = await self.db_pool.runInteraction(
-            "get_unread_push_actions_for_user_in_range_http_receipts",
-            self._get_receipts_by_room_txn,
-            user_id=user_id,
-        )
-
         def get_push_actions_txn(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, str, int, str, bool]]:
@@ -952,6 +995,27 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
         )

+        room_ids = set()
+        thread_ids = []
+        for (
+            _,
+            room_id,
+            thread_id,
+            _,
+            _,
+            _,
+        ) in push_actions:
+            room_ids.add(room_id)
+            thread_ids.append(thread_id)
+
+        receipts_by_room = await self.db_pool.runInteraction(
+            "get_unread_push_actions_for_user_in_range_http_receipts",
+            self._get_receipts_for_room_and_threads_txn,
+            user_id=user_id,
+            room_ids=room_ids,
+            thread_ids=thread_ids,
+        )
+
         notifs = [
             HttpPushAction(
                 event_id=event_id,
@@ -998,12 +1062,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         The list will have between 0~limit entries.
         """

-        receipts_by_room = await self.db_pool.runInteraction(
-            "get_unread_push_actions_for_user_in_range_email_receipts",
-            self._get_receipts_by_room_txn,
-            user_id=user_id,
-        )
-
         def get_push_actions_txn(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, str, int, str, bool, int]]:
@@ -1026,6 +1084,28 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
         )

+        room_ids = set()
+        thread_ids = []
+        for (
+            _,
+            room_id,
+            thread_id,
+            _,
+            _,
+            _,
+            _,
+        ) in push_actions:
+            room_ids.add(room_id)
+            thread_ids.append(thread_id)
+
+        receipts_by_room = await self.db_pool.runInteraction(
+            "get_unread_push_actions_for_user_in_range_email_receipts",
+            self._get_receipts_for_room_and_threads_txn,
+            user_id=user_id,
+            room_ids=room_ids,
+            thread_ids=thread_ids,
+        )
+
         # Make a list of dicts from the two sets of results.
         notifs = [
             EmailPushAction(
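To make the `UNION ALL` shape above concrete, here is a toy, self-contained SQLite version of the receipts lookup (simplified on several counts: no `events` join, literal placeholders instead of `make_in_list_sql_clause`, and the schema cut down to the columns involved):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE receipts_linearized ("
    " room_id TEXT, thread_id TEXT, user_id TEXT,"
    " receipt_type TEXT, stream_ordering INTEGER)"
)
conn.executemany(
    "INSERT INTO receipts_linearized VALUES (?, ?, ?, ?, ?)",
    [
        ("!room:a", "$thread1", "@u:a", "m.read", 10),  # threaded receipt
        ("!room:a", None, "@u:a", "m.read", 7),  # unthreaded receipt
    ],
)

# One arm matches the requested thread IDs, the other matches unthreaded
# (NULL thread_id) receipts; keeping them separate lets each arm stay
# index-friendly, as the comment in the diff explains.
sql = """
    SELECT room_id, thread_id, MAX(stream_ordering)
    FROM receipts_linearized
    WHERE receipt_type = ? AND thread_id = ? AND room_id = ? AND user_id = ?
    GROUP BY room_id, thread_id

    UNION ALL

    SELECT room_id, thread_id, MAX(stream_ordering)
    FROM receipts_linearized
    WHERE receipt_type = ? AND room_id = ? AND thread_id IS NULL AND user_id = ?
    GROUP BY room_id, thread_id
"""
rows = conn.execute(
    sql, ("m.read", "$thread1", "!room:a", "@u:a", "m.read", "!room:a", "@u:a")
).fetchall()
print(rows)  # both the threaded (10) and the unthreaded (7) receipt rows
```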


@@ -53,11 +53,7 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    IdGenerator,
-    StreamIdGenerator,
-)
+from synapse.storage.util.id_generators import IdGenerator, StreamIdGenerator
 from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
 from synapse.types import JsonDict
 from synapse.util import json_encoder, unwrapFirstError
@@ -130,6 +126,8 @@ class PushRulesWorkerStore(
     `get_max_push_rules_stream_id` which can be called in the initializer.
     """

+    _push_rules_stream_id_gen: StreamIdGenerator
+
     def __init__(
         self,
         database: DatabasePool,
@@ -138,6 +136,10 @@ class PushRulesWorkerStore(
     ):
         super().__init__(database, db_conn, hs)

+        self._is_push_writer = (
+            hs.get_instance_name() in hs.config.worker.writers.push_rules
+        )
+
         # In the worker store this is an ID tracker which we overwrite in the non-worker
         # class below that is used on the main process.
         self._push_rules_stream_id_gen = StreamIdGenerator(
@@ -145,7 +147,7 @@ class PushRulesWorkerStore(
             hs.get_replication_notifier(),
             "push_rules_stream",
             "stream_id",
-            is_writer=hs.config.worker.worker_app is None,
+            is_writer=self._is_push_writer,
         )

         push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
@@ -162,6 +164,9 @@ class PushRulesWorkerStore(
             prefilled_cache=push_rules_prefill,
         )

+        self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
+        self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
+
     def get_max_push_rules_stream_id(self) -> int:
         """Get the position of the push rules stream.
@@ -383,23 +388,6 @@ class PushRulesWorkerStore(
             "get_all_push_rule_updates", get_all_push_rule_updates_txn
         )

-
-class PushRuleStore(PushRulesWorkerStore):
-    # Because we have write access, this will be a StreamIdGenerator
-    # (see PushRulesWorkerStore.__init__)
-    _push_rules_stream_id_gen: AbstractStreamIdGenerator
-
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-        super().__init__(database, db_conn, hs)
-
-        self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
-        self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
-
     async def add_push_rule(
         self,
         user_id: str,
@@ -410,6 +398,9 @@ class PushRuleStore(PushRulesWorkerStore):
         before: Optional[str] = None,
         after: Optional[str] = None,
     ) -> None:
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         conditions_json = json_encoder.encode(conditions)
         actions_json = json_encoder.encode(actions)

         async with self._push_rules_stream_id_gen.get_next() as stream_id:
@@ -455,6 +446,9 @@ class PushRuleStore(PushRulesWorkerStore):
         before: str,
         after: str,
     ) -> None:
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         relative_to_rule = before or after

         sql = """
@@ -524,6 +518,9 @@ class PushRuleStore(PushRulesWorkerStore):
         conditions_json: str,
         actions_json: str,
     ) -> None:
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         if isinstance(self.database_engine, PostgresEngine):
             # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first
             # then re-select the count/max below.
@@ -575,6 +572,9 @@ class PushRuleStore(PushRulesWorkerStore):
         actions_json: str,
         update_stream: bool = True,
     ) -> None:
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         """Specialised version of simple_upsert_txn that picks a push_rule_id
         using the _push_rule_id_gen if it needs to insert the rule. It assumes
         that the "push_rules" table is locked"""
@@ -653,6 +653,8 @@ class PushRuleStore(PushRulesWorkerStore):
             user_id: The matrix ID of the push rule owner
             rule_id: The rule_id of the rule to be deleted
         """
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")

         def delete_push_rule_txn(
             txn: LoggingTransaction,
@@ -704,6 +706,9 @@ class PushRuleStore(PushRulesWorkerStore):
         Raises:
             RuleNotFoundException if the rule does not exist.
         """
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         async with self._push_rules_stream_id_gen.get_next() as stream_id:
             event_stream_ordering = self._stream_id_gen.get_current_token()
             await self.db_pool.runInteraction(
@@ -727,6 +732,9 @@ class PushRuleStore(PushRulesWorkerStore):
         enabled: bool,
         is_default_rule: bool,
     ) -> None:
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         new_id = self._push_rules_enable_id_gen.get_next()

         if not is_default_rule:
@@ -796,6 +804,9 @@ class PushRuleStore(PushRulesWorkerStore):
         Raises:
             RuleNotFoundException if the rule does not exist.
         """
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         actions_json = json_encoder.encode(actions)

         def set_push_rule_actions_txn(
@@ -865,6 +876,9 @@ class PushRuleStore(PushRulesWorkerStore):
         op: str,
         data: Optional[JsonDict] = None,
     ) -> None:
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         values = {
             "stream_id": stream_id,
             "event_stream_ordering": event_stream_ordering,
@@ -882,9 +896,6 @@ class PushRuleStore(PushRulesWorkerStore):
             self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
         )

-    def get_max_push_rules_stream_id(self) -> int:
-        return self._push_rules_stream_id_gen.get_current_token()
-
     async def copy_push_rule_from_room_to_room(
         self, new_room_id: str, user_id: str, rule: PushRule
     ) -> None:
@@ -895,6 +906,9 @@ class PushRuleStore(PushRulesWorkerStore):
             user_id : ID of user the push rule belongs to.
             rule: A push rule.
         """
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         # Create new rule id
         rule_id_scope = "/".join(rule.rule_id.split("/")[:-1])
         new_rule_id = rule_id_scope + "/" + new_room_id
@@ -930,6 +944,9 @@ class PushRuleStore(PushRulesWorkerStore):
             new_room_id: ID of the new room.
             user_id: ID of user to copy push rules for.
         """
+        if not self._is_push_writer:
+            raise Exception("Not a push writer")
+
         # Retrieve push rules for this user
         user_push_rules = await self.get_push_rules_for_user(user_id)
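The net effect of this file's changes: the separate `PushRuleStore` class is gone, `PushRulesWorkerStore` carries all the methods, and every mutating method starts with the same `_is_push_writer` guard so only the configured writer instance performs writes. A condensed sketch of that guard pattern follows; the `writer_only` decorator is hypothetical (the actual commit inlines the checks, as shown above):

```python
from functools import wraps
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

def writer_only(func: F) -> F:
    """Raise unless this store instance is the configured push-rules writer."""

    @wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self._is_push_writer:
            raise Exception("Not a push writer")
        return func(self, *args, **kwargs)

    return wrapper  # type: ignore[return-value]

class PushRulesWorkerStore:
    def __init__(self, is_push_writer: bool) -> None:
        # In Synapse this is derived from the worker config:
        # hs.get_instance_name() in hs.config.worker.writers.push_rules
        self._is_push_writer = is_push_writer

    @writer_only
    def delete_push_rule(self, user_id: str, rule_id: str) -> None:
        ...  # the actual write
```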


@@ -2108,6 +2108,13 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
             unique=False,
         )

+        self.db_pool.updates.register_background_index_update(
+            update_name="access_tokens_refresh_token_id_idx",
+            index_name="access_tokens_refresh_token_id_idx",
+            table="access_tokens",
+            columns=("refresh_token_id",),
+        )
+
     async def _background_update_set_deactivated_flag(
         self, progress: JsonDict, batch_size: int
     ) -> int:


@@ -0,0 +1,15 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2023 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8404, 'access_tokens_refresh_token_id_idx', '{}');
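The two pieces above work as a pair: the schema delta queues a row in `background_updates`, and the matching `register_background_index_update` call (whose `update_name` must equal the delta's `update_name`) tells Synapse how to build the index in the background. The end result is roughly the following, sketched here with SQLite for brevity (on Postgres, the background-update machinery builds the index with `CREATE INDEX CONCURRENTLY`):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE access_tokens"
    " (id INTEGER PRIMARY KEY, refresh_token_id INTEGER)"
)
# Equivalent of what the background update eventually creates:
conn.execute(
    "CREATE INDEX access_tokens_refresh_token_id_idx"
    " ON access_tokens (refresh_token_id)"
)

# Lookups by refresh_token_id can now use the index instead of a full scan.
plan = conn.execute(
    "EXPLAIN QUERY PLAN"
    " SELECT id FROM access_tokens WHERE refresh_token_id = 1"
).fetchall()
print(plan)  # the plan mentions access_tokens_refresh_token_id_idx
```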


@@ -17,14 +17,16 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from typing import Collection, List, Optional
+from typing import Collection, ContextManager, List, Optional
 from unittest.mock import AsyncMock, Mock, patch

+from parameterized import parameterized
+
 from twisted.test.proto_helpers import MemoryReactor

 from synapse.api.constants import EventTypes, JoinRules
 from synapse.api.errors import Codes, ResourceLimitError
-from synapse.api.filtering import Filtering
+from synapse.api.filtering import FilterCollection, Filtering
 from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
@@ -33,7 +35,7 @@ from synapse.handlers.sync import SyncConfig, SyncResult
 from synapse.rest import admin
 from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
-from synapse.types import UserID, create_requester
+from synapse.types import JsonDict, UserID, create_requester
 from synapse.util import Clock

 import tests.unittest
@@ -258,13 +260,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         # Eve tries to join the room. We monkey patch the internal logic which selects
         # the prev_events used when creating the join event, such that the ban does not
         # precede the join.
-        mocked_get_prev_events = patch.object(
-            self.hs.get_datastores().main,
-            "get_prev_events_for_room",
-            new_callable=AsyncMock,
-            return_value=[last_room_creation_event_id],
-        )
-        with mocked_get_prev_events:
+        with self._patch_get_latest_events([last_room_creation_event_id]):
             self.helper.join(room_id, eve, tok=eve_token)

         # Eve makes a second, incremental sync.
@@ -288,6 +284,460 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
         )
         self.assertEqual(eve_initial_sync_after_join.joined, [])
def test_state_includes_changes_on_forks(self) -> None:
"""State changes that happen on a fork of the DAG must be included in `state`
Given the following DAG:
E1
| S2
|
--|------|----
| |
E3 |
/
E4
... and a filter that means we only return 2 events, represented by the dashed
horizontal line: `S2` must be included in the `state` section.
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
# Do an initial sync as Alice to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)
# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
# Send a final event, joining the two branches of the dag
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
# do an incremental sync, with a filter that will ensure we only get two of
# the three new events.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 2}}}
),
),
since_token=initial_sync_result.next_batch,
)
)
# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertTrue(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event, e4_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)
def test_state_includes_changes_on_forks_when_events_excluded(self) -> None:
"""A variation on the previous test, but where one event is filtered
The DAG is the same as the previous test, but E4 is excluded by the filter.
E1
| S2
|
--|------|----
| |
E3 |
/
(E4)
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
# Do an initial sync as Alice to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)
# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
# Send a final event, joining the two branches of the dag
self.helper.send(room_id, "e4", type="not_a_normal_message", tok=alice_tok)[
"event_id"
]
# do an incremental sync, with a filter that will only return E3, excluding S2
# and E4.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs,
{
"room": {
"timeline": {
"limit": 1,
"not_types": ["not_a_normal_message"],
}
}
},
),
),
since_token=initial_sync_result.next_batch,
)
)
# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertTrue(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)
def test_state_includes_changes_on_long_lived_forks(self) -> None:
"""State changes that happen on a fork of the DAG must be included in `state`
Given the following DAG:
E1
| S2
|
--|------|----
E3 |
--|------|----
| E4
| |
... and a filter that means we only return 1 event, represented by the dashed
horizontal lines: `S2` must be included in the `state` section on the second sync.
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
# Do an initial sync as Alice to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)
# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
# Do an incremental sync, this will return E3 but *not* S2 at this
# point.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
since_token=initial_sync_result.next_batch,
)
)
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertTrue(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[],
)
# Now send another event that points to S2, but not E3.
with self._patch_get_latest_events([s2_event]):
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
# Doing an incremental sync should return S2 in state.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
since_token=incremental_sync.next_batch,
)
)
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertFalse(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e4_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)
def test_state_includes_changes_on_ungappy_syncs(self) -> None:
"""Test `state` where the sync is not gappy.
We start with a DAG like this:
E1
| S2
|
--|---
|
E3
... and initialsync with `limit=1`, represented by the horizontal dashed line.
At this point, we do not expect S2 to appear in the response at all (since
it is excluded from the timeline by the `limit`, and the state is based on the
state after the most recent event before the sync token (E3), which doesn't
include S2.
Now more events arrive, and we do an incremental sync:
E1
| S2
|
E3 |
|
--|------|----
| |
E4 |
/
E5
This is the last chance for us to tell the client about S2, so it *must* be
included in the response.
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
# Do an initial sync to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)
# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
# Another initial sync, with limit=1
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
)
)
room_sync = initial_sync_result.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()])
# More events, E4 and E5
with self._patch_get_latest_events([e3_event]):
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"]
# Now incremental sync
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice),
since_token=initial_sync_result.next_batch,
)
)
# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertFalse(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e4_event, e5_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)
@parameterized.expand(
[
(False, False),
(True, False),
(False, True),
(True, True),
]
)
def test_archived_rooms_do_not_include_state_after_leave(
self, initial_sync: bool, empty_timeline: bool
) -> None:
"""If the user leaves the room, state changes that happen after they leave are not returned.
We try with both a zero and a normal timeline limit,
and we try both an initial sync and an incremental sync for both.
"""
if empty_timeline and not initial_sync:
# FIXME synapse doesn't return the room at all in this situation!
self.skipTest("Synapse does not correctly handle this case")
# Alice creates the room, and bob joins.
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
bob = self.register_user("bob", "password")
bob_tok = self.login(bob, "password")
bob_requester = create_requester(bob)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
self.helper.join(room_id, bob, tok=bob_tok)
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
bob_requester, generate_sync_config(bob)
)
)
# Alice sends a message and a state
before_message_event = self.helper.send(room_id, "before", tok=alice_tok)[
"event_id"
]
before_state_event = self.helper.send_state(
room_id, "test_state", {"body": "before"}, tok=alice_tok
)["event_id"]
# Bob leaves
leave_event = self.helper.leave(room_id, bob, tok=bob_tok)["event_id"]
# Alice sends some more stuff
self.helper.send(room_id, "after", tok=alice_tok)["event_id"]
self.helper.send_state(room_id, "test_state", {"body": "after"}, tok=alice_tok)[
"event_id"
]
# And now, Bob resyncs.
filter_dict: JsonDict = {"room": {"include_leave": True}}
if empty_timeline:
filter_dict["room"]["timeline"] = {"limit": 0}
sync_room_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
bob_requester,
generate_sync_config(
bob, filter_collection=FilterCollection(self.hs, filter_dict)
),
since_token=None if initial_sync else initial_sync_result.next_batch,
)
).archived[0]
if empty_timeline:
# The timeline should be empty
self.assertEqual(sync_room_result.timeline.events, [])
# And the state should include the leave event...
self.assertEqual(
sync_room_result.state[("m.room.member", bob)].event_id, leave_event
)
# ... and the state change before he left.
self.assertEqual(
sync_room_result.state[("test_state", "")].event_id, before_state_event
)
else:
# The last three events in the timeline should be those leading up to the
# leave
self.assertEqual(
[e.event_id for e in sync_room_result.timeline.events[-3:]],
[before_message_event, before_state_event, leave_event],
)
# ... And the state should be empty
self.assertEqual(sync_room_result.state, {})
def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager:
"""Monkey-patch `get_prev_events_for_room`
Returns a context manager which will replace the implementation of
`get_prev_events_for_room` with one which returns `latest_events`.
"""
return patch.object(
self.hs.get_datastores().main,
"get_prev_events_for_room",
new_callable=AsyncMock,
return_value=latest_events,
)
def test_call_invite_in_public_room_not_returned(self) -> None:
    user = self.register_user("alice", "password")
    tok = self.login(user, "password")
@@ -401,14 +851,26 @@ _request_key = 0

 def generate_sync_config(
-    user_id: str, device_id: Optional[str] = "device_id"
+    user_id: str,
+    device_id: Optional[str] = "device_id",
+    filter_collection: Optional[FilterCollection] = None,
 ) -> SyncConfig:
-    """Generate a sync config (with a unique request key)."""
+    """Generate a sync config (with a unique request key).
+
+    Args:
+        user_id: user who is syncing.
+        device_id: device that is syncing. Defaults to "device_id".
+        filter_collection: filter to apply. Defaults to the default filter (ie,
+            return everything, with a default limit)
+    """
+    if filter_collection is None:
+        filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION
+
     global _request_key
     _request_key += 1
     return SyncConfig(
         user=UserID.from_string(user_id),
-        filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION,
+        filter_collection=filter_collection,
         is_guest=False,
         request_key=("request_key", _request_key),
         device_id=device_id,


@@ -35,7 +35,6 @@ from synapse.util import Clock
 from tests import unittest
 from tests.server import FakeChannel
 from tests.test_utils.event_injection import inject_event
-from tests.unittest import override_config


 class BaseRelationsTestCase(unittest.HomeserverTestCase):
@@ -957,7 +956,6 @@ class RelationPaginationTestCase(BaseRelationsTestCase):


 class RecursiveRelationTestCase(BaseRelationsTestCase):
-    @override_config({"experimental_features": {"msc3981_recurse_relations": True}})
     def test_recursive_relations(self) -> None:
         """Generate a complex, multi-level relationship tree and query it."""
         # Create a thread with a few messages in it.
@@ -1003,7 +1001,7 @@ class RecursiveRelationTestCase(BaseRelationsTestCase):
         channel = self.make_request(
             "GET",
             f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}"
-            "?dir=f&limit=20&org.matrix.msc3981.recurse=true",
+            "?dir=f&limit=20&recurse=true",
             access_token=self.user_token,
         )
         self.assertEqual(200, channel.code, channel.json_body)
@@ -1024,7 +1022,6 @@ class RecursiveRelationTestCase(BaseRelationsTestCase):
             ],
         )

-    @override_config({"experimental_features": {"msc3981_recurse_relations": True}})
     def test_recursive_relations_with_filter(self) -> None:
         """The event_type and rel_type still apply."""
         # Create a thread with a few messages in it.
@@ -1052,7 +1049,7 @@ class RecursiveRelationTestCase(BaseRelationsTestCase):
         channel = self.make_request(
             "GET",
             f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}/{RelationTypes.ANNOTATION}"
-            "?dir=f&limit=20&org.matrix.msc3981.recurse=true",
+            "?dir=f&limit=20&recurse=true",
             access_token=self.user_token,
         )
         self.assertEqual(200, channel.code, channel.json_body)
@@ -1065,7 +1062,7 @@ class RecursiveRelationTestCase(BaseRelationsTestCase):
         channel = self.make_request(
             "GET",
             f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}/{RelationTypes.ANNOTATION}/m.reaction"
-            "?dir=f&limit=20&org.matrix.msc3981.recurse=true",
+            "?dir=f&limit=20&recurse=true",
             access_token=self.user_token,
         )
         self.assertEqual(200, channel.code, channel.json_body)


@@ -170,8 +170,8 @@ class RestHelper:
         targ: Optional[str] = None,
         expect_code: int = HTTPStatus.OK,
         tok: Optional[str] = None,
-    ) -> None:
-        self.change_membership(
+    ) -> JsonDict:
+        return self.change_membership(
             room=room,
             src=src,
             targ=targ,
@@ -189,8 +189,8 @@ class RestHelper:
         appservice_user_id: Optional[str] = None,
         expect_errcode: Optional[Codes] = None,
         expect_additional_fields: Optional[dict] = None,
-    ) -> None:
-        self.change_membership(
+    ) -> JsonDict:
+        return self.change_membership(
             room=room,
             src=user,
             targ=user,
@@ -242,8 +242,8 @@ class RestHelper:
         user: Optional[str] = None,
         expect_code: int = HTTPStatus.OK,
         tok: Optional[str] = None,
-    ) -> None:
-        self.change_membership(
+    ) -> JsonDict:
+        return self.change_membership(
             room=room,
             src=user,
             targ=user,
@@ -282,7 +282,7 @@ class RestHelper:
         expect_code: int = HTTPStatus.OK,
         expect_errcode: Optional[str] = None,
         expect_additional_fields: Optional[dict] = None,
-    ) -> None:
+    ) -> JsonDict:
         """
         Send a membership state event into a room.
@@ -298,6 +298,9 @@ class RestHelper:
             using an application service access token in `tok`.
             expect_code: The expected HTTP response code
             expect_errcode: The expected Matrix error code
+
+        Returns:
+            The JSON response
         """
         temp_id = self.auth_user_id
         self.auth_user_id = src
@@ -356,6 +359,7 @@ class RestHelper:
         )

         self.auth_user_id = temp_id
+        return channel.json_body

     def send(
         self,