forked from MirrorHub/synapse
Merge remote-tracking branch 'origin/develop' into shellcheck
Fixes a merge conflict with debian/changelog Signed-off-by: Dan Callahan <danc@element.io>
This commit is contained in:
commit
0dffa9d0e0
99 changed files with 1335 additions and 323 deletions
68
CHANGES.md
68
CHANGES.md
|
@ -1,3 +1,71 @@
|
|||
Synapse 1.46.0rc1 (2021-10-27)
|
||||
==============================
|
||||
|
||||
The cause of the [performance regression affecting Synapse 1.44](https://github.com/matrix-org/synapse/issues/11049) has been identified and fixed. ([\#11177](https://github.com/matrix-org/synapse/issues/11177))
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Add support for Ubuntu 21.10 "Impish Indri". ([\#11024](https://github.com/matrix-org/synapse/issues/11024))
|
||||
- Port the Password Auth Providers module interface to the new generic interface. ([\#10548](https://github.com/matrix-org/synapse/issues/10548), [\#11180](https://github.com/matrix-org/synapse/issues/11180))
|
||||
- Experimental support for the thread relation defined in [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). ([\#11088](https://github.com/matrix-org/synapse/issues/11088), [\#11181](https://github.com/matrix-org/synapse/issues/11181), [\#11192](https://github.com/matrix-org/synapse/issues/11192))
|
||||
- Users admin API can now also modify user type in addition to allowing it to be set on user creation. ([\#11174](https://github.com/matrix-org/synapse/issues/11174))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Newly-created public rooms are now only assigned an alias if the room's creation has not been blocked by permission settings. Contributed by @AndrewFerr. ([\#10930](https://github.com/matrix-org/synapse/issues/10930))
|
||||
- Fix a long-standing bug which meant that events received over federation were sometimes incorrectly accepted into the room state. ([\#11001](https://github.com/matrix-org/synapse/issues/11001), [\#11009](https://github.com/matrix-org/synapse/issues/11009), [\#11012](https://github.com/matrix-org/synapse/issues/11012))
|
||||
- Fix 500 error on `/messages` when the server accumulates more than 5 backwards extremities at a given depth for a room. ([\#11027](https://github.com/matrix-org/synapse/issues/11027))
|
||||
- Fix a bug where setting a user's `external_id` via the admin API returns 500 and deletes user's existing external mappings if that external ID is already mapped. ([\#11051](https://github.com/matrix-org/synapse/issues/11051))
|
||||
- Fix a long-standing bug where users excluded from the user directory were added into the directory if they belonged to a room which became public or private. ([\#11075](https://github.com/matrix-org/synapse/issues/11075))
|
||||
- Fix a long-standing bug when attempting to preview URLs which are in the `windows-1252` character encoding. ([\#11077](https://github.com/matrix-org/synapse/issues/11077), [\#11089](https://github.com/matrix-org/synapse/issues/11089))
|
||||
- Fix broken export-data admin command and add test script checking the command to CI. ([\#11078](https://github.com/matrix-org/synapse/issues/11078))
|
||||
- Show an error when timestamp in seconds is provided to the `/purge_media_cache` Admin API. ([\#11101](https://github.com/matrix-org/synapse/issues/11101))
|
||||
- Fix local users who left all their rooms being removed from the user directory, even if the `search_all_users` config option was enabled. ([\#11103](https://github.com/matrix-org/synapse/issues/11103))
|
||||
- Fix a bug which caused the module API's `get_user_ip_and_agents` function to always fail on workers. `get_user_ip_and_agents` was introduced in 1.44.0 and did not function correctly on worker processes at the time. ([\#11112](https://github.com/matrix-org/synapse/issues/11112))
|
||||
- Identity server connection is no longer ignoring `ip_range_whitelist`. ([\#11120](https://github.com/matrix-org/synapse/issues/11120))
|
||||
- Fix a bug introduced in Synapse 1.45.0 breaking the configuration file parsing script. ([\#11145](https://github.com/matrix-org/synapse/issues/11145))
|
||||
- Fix a performance regression introduced in 1.44.0 which could cause client requests to time out when making large numbers of outbound requests. ([\#11177](https://github.com/matrix-org/synapse/issues/11177), [\#11190](https://github.com/matrix-org/synapse/issues/11190))
|
||||
- Resolve and share `state_groups` for all [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical events in batch. ([\#10975](https://github.com/matrix-org/synapse/issues/10975))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Fix broken links relating to module API deprecation in the upgrade notes. ([\#11069](https://github.com/matrix-org/synapse/issues/11069))
|
||||
- Add more information about what happens when a user is deactivated. ([\#11083](https://github.com/matrix-org/synapse/issues/11083))
|
||||
- Clarify the the sample log config can be copied from the documentation without issue. ([\#11092](https://github.com/matrix-org/synapse/issues/11092))
|
||||
- Update the admin API documentation with an updated list of the characters allowed in registration tokens. ([\#11093](https://github.com/matrix-org/synapse/issues/11093))
|
||||
- Document Synapse's behaviour when dealing with multiple modules registering the same callbacks and/or handlers for the same HTTP endpoints. ([\#11096](https://github.com/matrix-org/synapse/issues/11096))
|
||||
- Fix instances of `[example]{.title-ref}` in the upgrade documentation as a result of prior RST to Markdown conversion. ([\#11118](https://github.com/matrix-org/synapse/issues/11118))
|
||||
- Document the version of Synapse each module callback was introduced in. ([\#11132](https://github.com/matrix-org/synapse/issues/11132))
|
||||
- Document the version of Synapse that introduced each module API method. ([\#11183](https://github.com/matrix-org/synapse/issues/11183))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
- Fix spurious warnings about losing the logging context on the `ReplicationCommandHandler` when losing the replication connection. ([\#10984](https://github.com/matrix-org/synapse/issues/10984))
|
||||
- Include rejected status when we log events. ([\#11008](https://github.com/matrix-org/synapse/issues/11008))
|
||||
- Add some extra logging to the event persistence code. ([\#11014](https://github.com/matrix-org/synapse/issues/11014))
|
||||
- Rearrange the internal workings of the incremental user directory updates. ([\#11035](https://github.com/matrix-org/synapse/issues/11035))
|
||||
- Fix a long-standing bug where users excluded from the directory could still be added to the `users_who_share_private_rooms` table after a regular user joins a private room. ([\#11143](https://github.com/matrix-org/synapse/issues/11143))
|
||||
- Add and improve type hints. ([\#10972](https://github.com/matrix-org/synapse/issues/10972), [\#11055](https://github.com/matrix-org/synapse/issues/11055), [\#11066](https://github.com/matrix-org/synapse/issues/11066), [\#11076](https://github.com/matrix-org/synapse/issues/11076), [\#11095](https://github.com/matrix-org/synapse/issues/11095), [\#11109](https://github.com/matrix-org/synapse/issues/11109), [\#11121](https://github.com/matrix-org/synapse/issues/11121), [\#11146](https://github.com/matrix-org/synapse/issues/11146))
|
||||
- Mark the Synapse package as containing type annotations and fix export declarations so that Synapse pluggable modules may be type checked against Synapse. ([\#11054](https://github.com/matrix-org/synapse/issues/11054))
|
||||
- Remove dead code from `MediaFilePaths`. ([\#11056](https://github.com/matrix-org/synapse/issues/11056))
|
||||
- Be more lenient when parsing oEmbed response versions. ([\#11065](https://github.com/matrix-org/synapse/issues/11065))
|
||||
- Create a separate module for the retention configuration. ([\#11070](https://github.com/matrix-org/synapse/issues/11070))
|
||||
- Clean up some of the federation event authentication code for clarity. ([\#11115](https://github.com/matrix-org/synapse/issues/11115), [\#11116](https://github.com/matrix-org/synapse/issues/11116), [\#11122](https://github.com/matrix-org/synapse/issues/11122))
|
||||
- Add docstrings and comments to the application service ephemeral event sending code. ([\#11138](https://github.com/matrix-org/synapse/issues/11138))
|
||||
- Update the `sign_json` script to support inline configuration of the signing key. ([\#11139](https://github.com/matrix-org/synapse/issues/11139))
|
||||
- Fix broken link in the docker image README. ([\#11144](https://github.com/matrix-org/synapse/issues/11144))
|
||||
- Always dump logs from unit tests during CI runs. ([\#11068](https://github.com/matrix-org/synapse/issues/11068))
|
||||
- Add tests for `MediaFilePaths` class. ([\#11057](https://github.com/matrix-org/synapse/issues/11057))
|
||||
- Simplify the user admin API tests. ([\#11048](https://github.com/matrix-org/synapse/issues/11048))
|
||||
- Add a test for the workaround introduced in [\#11042](https://github.com/matrix-org/synapse/pull/11042) concerning the behaviour of third-party rule modules and `SynapseError`s. ([\#11071](https://github.com/matrix-org/synapse/issues/11071))
|
||||
|
||||
|
||||
Synapse 1.45.1 (2021-10-20)
|
||||
===========================
|
||||
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
Port the Password Auth Providers module interface to the new generic interface.
|
1
changelog.d/10943.misc
Normal file
1
changelog.d/10943.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add type annotations for the `log_function` decorator.
|
1
changelog.d/10969.bugfix
Normal file
1
changelog.d/10969.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long-standing bug where messages in the `device_inbox` table for deleted devices would persist indefinitely. Contributed by @dklimpel and @JohannesKleine.
|
|
@ -1 +0,0 @@
|
|||
Add type hints to `synapse.storage.databases.main.client_ips`.
|
|
@ -1 +0,0 @@
|
|||
Resolve and share `state_groups` for all [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical events in batch.
|
|
@ -1 +0,0 @@
|
|||
Fix spurious warnings about losing the logging context on the `ReplicationCommandHandler` when losing the replication connection.
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug which meant that events received over federation were sometimes incorrectly accepted into the room state.
|
|
@ -1 +0,0 @@
|
|||
Include rejected status when we log events.
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug which meant that events received over federation were sometimes incorrectly accepted into the room state.
|
|
@ -1 +0,0 @@
|
|||
Add some extra logging to the event persistence code.
|
|
@ -1 +0,0 @@
|
|||
Add support for Ubuntu 21.10 "Impish Indri".
|
|
@ -1 +0,0 @@
|
|||
Fix 500 error on `/messages` when the server accumulates more than 5 backwards extremities at a given depth for a room.
|
|
@ -1 +0,0 @@
|
|||
Rearrange the internal workings of the incremental user directory updates.
|
|
@ -1 +0,0 @@
|
|||
Simplify the user admin API tests.
|
|
@ -1 +0,0 @@
|
|||
Fix a bug where setting a user's external_id via the admin API returns 500 and deletes users existing external mappings if that external ID is already mapped.
|
|
@ -1 +0,0 @@
|
|||
Mark the Synapse package as containing type annotations and fix export declarations so that Synapse pluggable modules may be type checked against Synapse.
|
|
@ -1 +0,0 @@
|
|||
Improve type hints for `_wrap_in_base_path` decorator used by `MediaFilePaths`.
|
|
@ -1 +0,0 @@
|
|||
Remove dead code from `MediaFilePaths`.
|
|
@ -1 +0,0 @@
|
|||
Add tests for `MediaFilePaths` class.
|
|
@ -1 +0,0 @@
|
|||
Be more lenient when parsing oEmbed response versions.
|
|
@ -1 +0,0 @@
|
|||
Add type hints to `synapse.events`.
|
|
@ -1 +0,0 @@
|
|||
Always dump logs from unit tests during CI runs.
|
|
@ -1 +0,0 @@
|
|||
Fix broken links relating to module API deprecation in the upgrade notes.
|
|
@ -1 +0,0 @@
|
|||
Create a separate module for the retention configuration.
|
|
@ -1 +0,0 @@
|
|||
Add a test for the workaround introduced in [\#11042](https://github.com/matrix-org/synapse/pull/11042) concerning the behaviour of third-party rule modules and `SynapseError`s.
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug where users excluded from the user directory were added into the directory if they belonged to a room which became public or private.
|
|
@ -1 +0,0 @@
|
|||
Fix type hints in the relations tests.
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug when attempting to preview URLs which are in the `windows-1252` character encoding.
|
|
@ -1 +0,0 @@
|
|||
Fix broken export-data admin command and add test script checking the command to CI.
|
|
@ -1 +0,0 @@
|
|||
Add more information about what happens when a user is deactivated.
|
|
@ -1 +0,0 @@
|
|||
Experimental support for the thread relation defined in [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug when attempting to preview URLs which are in the `windows-1252` character encoding.
|
|
@ -1 +0,0 @@
|
|||
Clarify the the sample log config can be copied from the documentation without issue.
|
|
@ -1 +0,0 @@
|
|||
Update the admin API documentation with an updated list of the characters allowed in registration tokens.
|
|
@ -1 +0,0 @@
|
|||
Add type hints to most `HomeServer` parameters.
|
|
@ -1 +0,0 @@
|
|||
Document Synapse's behaviour when dealing with multiple modules registering the same callbacks and/or handlers for the same HTTP endpoints.
|
|
@ -1 +0,0 @@
|
|||
Show an error when timestamp in seconds is provided to the `/purge_media_cache` Admin API.
|
|
@ -1 +0,0 @@
|
|||
Fix local users who left all their rooms being removed from the user directory, even if the "search_all_users" config option was enabled.
|
|
@ -1 +0,0 @@
|
|||
Add missing type hints to `synapse.api` module.
|
|
@ -1 +0,0 @@
|
|||
Clean up some of the federation event authentication code for clarity.
|
|
@ -1 +0,0 @@
|
|||
Clean up some of the federation event authentication code for clarity.
|
|
@ -1 +0,0 @@
|
|||
Fix instances of `[example]{.title-ref}` in the upgrade documentation as a result of prior RST to Markdown conversion.
|
|
@ -1 +0,0 @@
|
|||
Identity server connection is no longer ignoring `ip_range_whitelist`.
|
|
@ -1 +0,0 @@
|
|||
Add type hints for event fetching.
|
|
@ -1 +0,0 @@
|
|||
Clean up some of the federation event authentication code for clarity.
|
1
changelog.d/11126.feature
Normal file
1
changelog.d/11126.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add an `on_new_event` third-party rules callback to allow Synapse modules to act after an event has been sent into a room.
|
|
@ -1 +0,0 @@
|
|||
Document the version of Synapse each module callback was introduced in.
|
|
@ -1 +0,0 @@
|
|||
Add docstrings and comments to the application service ephemeral event sending code.
|
|
@ -1 +0,0 @@
|
|||
Update the `sign_json` script to support inline configuration of the signing key.
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug where users excluded from the directory could still be added to the `users_who_share_private_rooms` table after a regular user joins a private room.
|
|
@ -1 +0,0 @@
|
|||
Fix broken link in the docker image README.
|
|
@ -1 +0,0 @@
|
|||
Fix a bug introduced in Synapse v1.45.0 breaking the configuration file parsing script.
|
|
@ -1 +0,0 @@
|
|||
Add missing type hints to `synapse.crypto`.
|
1
changelog.d/11187.feature
Normal file
1
changelog.d/11187.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Support the stable room type field for [MSC3288](https://github.com/matrix-org/matrix-doc/pull/3288).
|
1
changelog.d/11191.bugfix
Normal file
1
changelog.d/11191.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a bug introduced in Synapse 1.45.0 which prevented the `synapse_review_recent_signups` script from running. Contributed by @samuel-p.
|
1
changelog.d/11194.misc
Normal file
1
changelog.d/11194.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Refactor `Filter` to check different fields depending on the data type.
|
8
debian/changelog
vendored
8
debian/changelog
vendored
|
@ -1,9 +1,15 @@
|
|||
matrix-synapse-py3 (1.45.1+nmu1) UNRELEASED; urgency=medium
|
||||
matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium
|
||||
|
||||
* Update scripts to pass Shellcheck lints.
|
||||
|
||||
-- root <root@cae79a6e79d7> Fri, 22 Oct 2021 22:20:31 +0000
|
||||
|
||||
matrix-synapse-py3 (1.46.0~rc1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.46.0~rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 26 Oct 2021 14:04:04 +0100
|
||||
|
||||
matrix-synapse-py3 (1.45.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.45.1.
|
||||
|
|
|
@ -50,7 +50,8 @@ It returns a JSON body like the following:
|
|||
"auth_provider": "<provider2>",
|
||||
"external_id": "<user_id_provider_2>"
|
||||
}
|
||||
]
|
||||
],
|
||||
"user_type": null
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -97,7 +98,8 @@ with a body of:
|
|||
],
|
||||
"avatar_url": "<avatar_url>",
|
||||
"admin": false,
|
||||
"deactivated": false
|
||||
"deactivated": false,
|
||||
"user_type": null
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -135,6 +137,9 @@ Body parameters:
|
|||
unchanged on existing accounts and set to `false` for new accounts.
|
||||
A user cannot be erased by deactivating with this API. For details on
|
||||
deactivating users see [Deactivate Account](#deactivate-account).
|
||||
- `user_type` - string or null, optional. If provided, the user type will be
|
||||
adjusted. If `null` given, the user type will be cleared. Other
|
||||
allowed options are: `bot` and `support`.
|
||||
|
||||
If the user already exists then optional parameters default to the current value.
|
||||
|
||||
|
|
|
@ -119,6 +119,27 @@ callback returns `True`, Synapse falls through to the next one. The value of the
|
|||
callback that does not return `True` will be used. If this happens, Synapse will not call
|
||||
any of the subsequent implementations of this callback.
|
||||
|
||||
### `on_new_event`
|
||||
|
||||
_First introduced in Synapse v1.47.0_
|
||||
|
||||
```python
|
||||
async def on_new_event(
|
||||
event: "synapse.events.EventBase",
|
||||
state_events: "synapse.types.StateMap",
|
||||
) -> None:
|
||||
```
|
||||
|
||||
Called after sending an event into a room. The module is passed the event, as well
|
||||
as the state of the room _after_ the event. This means that if the event is a state event,
|
||||
it will be included in this state.
|
||||
|
||||
Note that this callback is called when the event has already been processed and stored
|
||||
into the room, which means this callback cannot be used to deny persisting the event. To
|
||||
deny an incoming event, see [`check_event_for_spam`](spam_checker_callbacks.md#check_event_for_spam) instead.
|
||||
|
||||
If multiple modules implement this callback, Synapse runs them all in order.
|
||||
|
||||
## Example
|
||||
|
||||
The example below is a module that implements the third-party rules callback
|
||||
|
|
|
@ -47,7 +47,7 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.45.1"
|
||||
__version__ = "1.46.0rc1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
|
|
@ -20,7 +20,12 @@ from typing import List
|
|||
|
||||
import attr
|
||||
|
||||
from synapse.config._base import RootConfig, find_config_files, read_config_files
|
||||
from synapse.config._base import (
|
||||
Config,
|
||||
RootConfig,
|
||||
find_config_files,
|
||||
read_config_files,
|
||||
)
|
||||
from synapse.config.database import DatabaseConfig
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
|
||||
from synapse.storage.engines import create_engine
|
||||
|
@ -126,7 +131,7 @@ def main():
|
|||
config_dict,
|
||||
)
|
||||
|
||||
since_ms = time.time() * 1000 - config.parse_duration(config_args.since)
|
||||
since_ms = time.time() * 1000 - Config.parse_duration(config_args.since)
|
||||
exclude_users_with_email = config_args.exclude_emails
|
||||
include_context = not config_args.only_users
|
||||
|
||||
|
|
|
@ -18,7 +18,8 @@ import json
|
|||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Awaitable,
|
||||
Container,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
|
@ -217,19 +218,19 @@ class FilterCollection:
|
|||
return self._filter_json
|
||||
|
||||
def timeline_limit(self) -> int:
|
||||
return self._room_timeline_filter.limit()
|
||||
return self._room_timeline_filter.limit
|
||||
|
||||
def presence_limit(self) -> int:
|
||||
return self._presence_filter.limit()
|
||||
return self._presence_filter.limit
|
||||
|
||||
def ephemeral_limit(self) -> int:
|
||||
return self._room_ephemeral_filter.limit()
|
||||
return self._room_ephemeral_filter.limit
|
||||
|
||||
def lazy_load_members(self) -> bool:
|
||||
return self._room_state_filter.lazy_load_members()
|
||||
return self._room_state_filter.lazy_load_members
|
||||
|
||||
def include_redundant_members(self) -> bool:
|
||||
return self._room_state_filter.include_redundant_members()
|
||||
return self._room_state_filter.include_redundant_members
|
||||
|
||||
def filter_presence(
|
||||
self, events: Iterable[UserPresenceState]
|
||||
|
@ -276,19 +277,25 @@ class Filter:
|
|||
def __init__(self, filter_json: JsonDict):
|
||||
self.filter_json = filter_json
|
||||
|
||||
self.types = self.filter_json.get("types", None)
|
||||
self.not_types = self.filter_json.get("not_types", [])
|
||||
self.limit = filter_json.get("limit", 10)
|
||||
self.lazy_load_members = filter_json.get("lazy_load_members", False)
|
||||
self.include_redundant_members = filter_json.get(
|
||||
"include_redundant_members", False
|
||||
)
|
||||
|
||||
self.rooms = self.filter_json.get("rooms", None)
|
||||
self.not_rooms = self.filter_json.get("not_rooms", [])
|
||||
self.types = filter_json.get("types", None)
|
||||
self.not_types = filter_json.get("not_types", [])
|
||||
|
||||
self.senders = self.filter_json.get("senders", None)
|
||||
self.not_senders = self.filter_json.get("not_senders", [])
|
||||
self.rooms = filter_json.get("rooms", None)
|
||||
self.not_rooms = filter_json.get("not_rooms", [])
|
||||
|
||||
self.contains_url = self.filter_json.get("contains_url", None)
|
||||
self.senders = filter_json.get("senders", None)
|
||||
self.not_senders = filter_json.get("not_senders", [])
|
||||
|
||||
self.labels = self.filter_json.get("org.matrix.labels", None)
|
||||
self.not_labels = self.filter_json.get("org.matrix.not_labels", [])
|
||||
self.contains_url = filter_json.get("contains_url", None)
|
||||
|
||||
self.labels = filter_json.get("org.matrix.labels", None)
|
||||
self.not_labels = filter_json.get("org.matrix.not_labels", [])
|
||||
|
||||
def filters_all_types(self) -> bool:
|
||||
return "*" in self.not_types
|
||||
|
@ -302,76 +309,95 @@ class Filter:
|
|||
def check(self, event: FilterEvent) -> bool:
|
||||
"""Checks whether the filter matches the given event.
|
||||
|
||||
Args:
|
||||
event: The event, account data, or presence to check against this
|
||||
filter.
|
||||
|
||||
Returns:
|
||||
True if the event matches
|
||||
True if the event matches the filter.
|
||||
"""
|
||||
# We usually get the full "events" as dictionaries coming through,
|
||||
# except for presence which actually gets passed around as its own
|
||||
# namedtuple type.
|
||||
if isinstance(event, UserPresenceState):
|
||||
sender: Optional[str] = event.user_id
|
||||
room_id = None
|
||||
ev_type = "m.presence"
|
||||
contains_url = False
|
||||
labels: List[str] = []
|
||||
user_id = event.user_id
|
||||
field_matchers = {
|
||||
"senders": lambda v: user_id == v,
|
||||
"types": lambda v: "m.presence" == v,
|
||||
}
|
||||
return self._check_fields(field_matchers)
|
||||
else:
|
||||
content = event.get("content")
|
||||
# Content is assumed to be a dict below, so ensure it is. This should
|
||||
# always be true for events, but account_data has been allowed to
|
||||
# have non-dict content.
|
||||
if not isinstance(content, dict):
|
||||
content = {}
|
||||
|
||||
sender = event.get("sender", None)
|
||||
if not sender:
|
||||
# Presence events had their 'sender' in content.user_id, but are
|
||||
# now handled above. We don't know if anything else uses this
|
||||
# form. TODO: Check this and probably remove it.
|
||||
content = event.get("content")
|
||||
# account_data has been allowed to have non-dict content, so
|
||||
# check type first
|
||||
if isinstance(content, dict):
|
||||
sender = content.get("user_id")
|
||||
sender = content.get("user_id")
|
||||
|
||||
room_id = event.get("room_id", None)
|
||||
ev_type = event.get("type", None)
|
||||
|
||||
content = event.get("content") or {}
|
||||
# check if there is a string url field in the content for filtering purposes
|
||||
contains_url = isinstance(content.get("url"), str)
|
||||
labels = content.get(EventContentFields.LABELS, [])
|
||||
|
||||
return self.check_fields(room_id, sender, ev_type, labels, contains_url)
|
||||
field_matchers = {
|
||||
"rooms": lambda v: room_id == v,
|
||||
"senders": lambda v: sender == v,
|
||||
"types": lambda v: _matches_wildcard(ev_type, v),
|
||||
"labels": lambda v: v in labels,
|
||||
}
|
||||
|
||||
def check_fields(
|
||||
self,
|
||||
room_id: Optional[str],
|
||||
sender: Optional[str],
|
||||
event_type: Optional[str],
|
||||
labels: Container[str],
|
||||
contains_url: bool,
|
||||
) -> bool:
|
||||
result = self._check_fields(field_matchers)
|
||||
if not result:
|
||||
return result
|
||||
|
||||
contains_url_filter = self.contains_url
|
||||
if contains_url_filter is not None:
|
||||
contains_url = isinstance(content.get("url"), str)
|
||||
if contains_url_filter != contains_url:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _check_fields(self, field_matchers: Dict[str, Callable[[str], bool]]) -> bool:
|
||||
"""Checks whether the filter matches the given event fields.
|
||||
|
||||
Args:
|
||||
field_matchers: A map of attribute name to callable to use for checking
|
||||
particular fields.
|
||||
|
||||
The attribute name and an inverse (not_<attribute name>) must
|
||||
exist on the Filter.
|
||||
|
||||
The callable should return true if the event's value matches the
|
||||
filter's value.
|
||||
|
||||
Returns:
|
||||
True if the event fields match
|
||||
"""
|
||||
literal_keys = {
|
||||
"rooms": lambda v: room_id == v,
|
||||
"senders": lambda v: sender == v,
|
||||
"types": lambda v: _matches_wildcard(event_type, v),
|
||||
"labels": lambda v: v in labels,
|
||||
}
|
||||
|
||||
for name, match_func in literal_keys.items():
|
||||
for name, match_func in field_matchers.items():
|
||||
# If the event matches one of the disallowed values, reject it.
|
||||
not_name = "not_%s" % (name,)
|
||||
disallowed_values = getattr(self, not_name)
|
||||
if any(map(match_func, disallowed_values)):
|
||||
return False
|
||||
|
||||
# Other the event does not match at least one of the allowed values,
|
||||
# reject it.
|
||||
allowed_values = getattr(self, name)
|
||||
if allowed_values is not None:
|
||||
if not any(map(match_func, allowed_values)):
|
||||
return False
|
||||
|
||||
contains_url_filter = self.filter_json.get("contains_url")
|
||||
if contains_url_filter is not None:
|
||||
if contains_url_filter != contains_url:
|
||||
return False
|
||||
|
||||
# Otherwise, accept it.
|
||||
return True
|
||||
|
||||
def filter_rooms(self, room_ids: Iterable[str]) -> Set[str]:
|
||||
|
@ -385,10 +411,10 @@ class Filter:
|
|||
"""
|
||||
room_ids = set(room_ids)
|
||||
|
||||
disallowed_rooms = set(self.filter_json.get("not_rooms", []))
|
||||
disallowed_rooms = set(self.not_rooms)
|
||||
room_ids -= disallowed_rooms
|
||||
|
||||
allowed_rooms = self.filter_json.get("rooms", None)
|
||||
allowed_rooms = self.rooms
|
||||
if allowed_rooms is not None:
|
||||
room_ids &= set(allowed_rooms)
|
||||
|
||||
|
@ -397,15 +423,6 @@ class Filter:
|
|||
def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
|
||||
return list(filter(self.check, events))
|
||||
|
||||
def limit(self) -> int:
|
||||
return self.filter_json.get("limit", 10)
|
||||
|
||||
def lazy_load_members(self) -> bool:
|
||||
return self.filter_json.get("lazy_load_members", False)
|
||||
|
||||
def include_redundant_members(self) -> bool:
|
||||
return self.filter_json.get("include_redundant_members", False)
|
||||
|
||||
def with_room_ids(self, room_ids: Iterable[str]) -> "Filter":
|
||||
"""Returns a new filter with the given room IDs appended.
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import twisted
|
|||
from twisted.internet import defer, error, reactor
|
||||
from twisted.logger import LoggingFile, LogLevel
|
||||
from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||
from twisted.python.threadpool import ThreadPool
|
||||
|
||||
import synapse
|
||||
from synapse.api.constants import MAX_PDU_SIZE
|
||||
|
@ -48,6 +49,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
|
|||
from synapse.metrics.jemalloc import setup_jemalloc_stats
|
||||
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
|
||||
from synapse.util.daemonize import daemonize_process
|
||||
from synapse.util.gai_resolver import GAIResolver
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
@ -338,9 +340,19 @@ async def start(hs: "HomeServer"):
|
|||
Args:
|
||||
hs: homeserver instance
|
||||
"""
|
||||
reactor = hs.get_reactor()
|
||||
|
||||
# We want to use a separate thread pool for the resolver so that large
|
||||
# numbers of DNS requests don't starve out other users of the threadpool.
|
||||
resolver_threadpool = ThreadPool(name="gai_resolver")
|
||||
resolver_threadpool.start()
|
||||
reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)
|
||||
reactor.installNameResolver(
|
||||
GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
|
||||
)
|
||||
|
||||
# Set up the SIGHUP machinery.
|
||||
if hasattr(signal, "SIGHUP"):
|
||||
reactor = hs.get_reactor()
|
||||
|
||||
@wrap_as_background_process("sighup")
|
||||
def handle_sighup(*args, **kwargs):
|
||||
|
|
|
@ -36,6 +36,7 @@ CHECK_THREEPID_CAN_BE_INVITED_CALLBACK = Callable[
|
|||
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK = Callable[
|
||||
[str, StateMap[EventBase], str], Awaitable[bool]
|
||||
]
|
||||
ON_NEW_EVENT_CALLBACK = Callable[[EventBase, StateMap[EventBase]], Awaitable]
|
||||
|
||||
|
||||
def load_legacy_third_party_event_rules(hs: "HomeServer") -> None:
|
||||
|
@ -152,6 +153,7 @@ class ThirdPartyEventRules:
|
|||
self._check_visibility_can_be_modified_callbacks: List[
|
||||
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK
|
||||
] = []
|
||||
self._on_new_event_callbacks: List[ON_NEW_EVENT_CALLBACK] = []
|
||||
|
||||
def register_third_party_rules_callbacks(
|
||||
self,
|
||||
|
@ -163,6 +165,7 @@ class ThirdPartyEventRules:
|
|||
check_visibility_can_be_modified: Optional[
|
||||
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK
|
||||
] = None,
|
||||
on_new_event: Optional[ON_NEW_EVENT_CALLBACK] = None,
|
||||
) -> None:
|
||||
"""Register callbacks from modules for each hook."""
|
||||
if check_event_allowed is not None:
|
||||
|
@ -181,6 +184,9 @@ class ThirdPartyEventRules:
|
|||
check_visibility_can_be_modified,
|
||||
)
|
||||
|
||||
if on_new_event is not None:
|
||||
self._on_new_event_callbacks.append(on_new_event)
|
||||
|
||||
async def check_event_allowed(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> Tuple[bool, Optional[dict]]:
|
||||
|
@ -321,6 +327,31 @@ class ThirdPartyEventRules:
|
|||
|
||||
return True
|
||||
|
||||
async def on_new_event(self, event_id: str) -> None:
|
||||
"""Let modules act on events after they've been sent (e.g. auto-accepting
|
||||
invites, etc.)
|
||||
|
||||
Args:
|
||||
event_id: The ID of the event.
|
||||
|
||||
Raises:
|
||||
ModuleFailureError if a callback raised any exception.
|
||||
"""
|
||||
# Bail out early without hitting the store if we don't have any callbacks
|
||||
if len(self._on_new_event_callbacks) == 0:
|
||||
return
|
||||
|
||||
event = await self.store.get_event(event_id)
|
||||
state_events = await self._get_state_map_for_room(event.room_id)
|
||||
|
||||
for callback in self._on_new_event_callbacks:
|
||||
try:
|
||||
await callback(event, state_events)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Failed to run module API callback %s: %s", callback, e
|
||||
)
|
||||
|
||||
async def _get_state_map_for_room(self, room_id: str) -> StateMap[EventBase]:
|
||||
"""Given a room ID, return the state events of that room.
|
||||
|
||||
|
|
|
@ -227,7 +227,7 @@ class FederationClient(FederationBase):
|
|||
)
|
||||
|
||||
async def backfill(
|
||||
self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
|
||||
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
|
||||
) -> Optional[List[EventBase]]:
|
||||
"""Requests some more historic PDUs for the given room from the
|
||||
given destination server.
|
||||
|
@ -237,6 +237,8 @@ class FederationClient(FederationBase):
|
|||
room_id: The room_id to backfill.
|
||||
limit: The maximum number of events to return.
|
||||
extremities: our current backwards extremities, to backfill from
|
||||
Must be a Collection that is falsy when empty.
|
||||
(Iterable is not enough here!)
|
||||
"""
|
||||
logger.debug("backfill extrem=%s", extremities)
|
||||
|
||||
|
@ -250,11 +252,22 @@ class FederationClient(FederationBase):
|
|||
|
||||
logger.debug("backfill transaction_data=%r", transaction_data)
|
||||
|
||||
if not isinstance(transaction_data, dict):
|
||||
# TODO we probably want an exception type specific to federation
|
||||
# client validation.
|
||||
raise TypeError("Backfill transaction_data is not a dict.")
|
||||
|
||||
transaction_data_pdus = transaction_data.get("pdus")
|
||||
if not isinstance(transaction_data_pdus, list):
|
||||
# TODO we probably want an exception type specific to federation
|
||||
# client validation.
|
||||
raise TypeError("transaction_data.pdus is not a list.")
|
||||
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
|
||||
pdus = [
|
||||
event_from_pdu_json(p, room_version, outlier=False)
|
||||
for p in transaction_data["pdus"]
|
||||
for p in transaction_data_pdus
|
||||
]
|
||||
|
||||
# Check signatures and hash of pdus, removing any from the list that fail checks
|
||||
|
|
|
@ -295,14 +295,16 @@ class FederationServer(FederationBase):
|
|||
Returns:
|
||||
HTTP response code and body
|
||||
"""
|
||||
response = await self.transaction_actions.have_responded(origin, transaction)
|
||||
existing_response = await self.transaction_actions.have_responded(
|
||||
origin, transaction
|
||||
)
|
||||
|
||||
if response:
|
||||
if existing_response:
|
||||
logger.debug(
|
||||
"[%s] We've already responded to this request",
|
||||
transaction.transaction_id,
|
||||
)
|
||||
return response
|
||||
return existing_response
|
||||
|
||||
logger.debug("[%s] Transaction is new", transaction.transaction_id)
|
||||
|
||||
|
@ -632,7 +634,7 @@ class FederationServer(FederationBase):
|
|||
|
||||
async def on_make_knock_request(
|
||||
self, origin: str, room_id: str, user_id: str, supported_versions: List[str]
|
||||
) -> Dict[str, Union[EventBase, str]]:
|
||||
) -> JsonDict:
|
||||
"""We've received a /make_knock/ request, so we create a partial knock
|
||||
event for the room and hand that back, along with the room version, to the knocking
|
||||
homeserver. We do *not* persist or process this event until the other server has
|
||||
|
|
|
@ -149,7 +149,6 @@ class TransactionManager:
|
|||
)
|
||||
except HttpResponseException as e:
|
||||
code = e.code
|
||||
response = e.response
|
||||
|
||||
set_tag(tags.ERROR, True)
|
||||
|
||||
|
|
|
@ -15,7 +15,19 @@
|
|||
|
||||
import logging
|
||||
import urllib
|
||||
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Collection,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
import attr
|
||||
import ijson
|
||||
|
@ -100,7 +112,7 @@ class TransportLayerClient:
|
|||
|
||||
@log_function
|
||||
async def backfill(
|
||||
self, destination: str, room_id: str, event_tuples: Iterable[str], limit: int
|
||||
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
|
||||
) -> Optional[JsonDict]:
|
||||
"""Requests `limit` previous PDUs in a given context before list of
|
||||
PDUs.
|
||||
|
@ -108,7 +120,9 @@ class TransportLayerClient:
|
|||
Args:
|
||||
destination
|
||||
room_id
|
||||
event_tuples
|
||||
event_tuples:
|
||||
Must be a Collection that is falsy when empty.
|
||||
(Iterable is not enough here!)
|
||||
limit
|
||||
|
||||
Returns:
|
||||
|
@ -786,7 +800,7 @@ class TransportLayerClient:
|
|||
@log_function
|
||||
def join_group(
|
||||
self, destination: str, group_id: str, user_id: str, content: JsonDict
|
||||
) -> JsonDict:
|
||||
) -> Awaitable[JsonDict]:
|
||||
"""Attempts to join a group"""
|
||||
path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id)
|
||||
|
||||
|
|
|
@ -62,7 +62,6 @@ from synapse.http.server import finish_request, respond_with_html
|
|||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.storage.roommember import ProfileInfo
|
||||
from synapse.types import JsonDict, Requester, UserID
|
||||
from synapse.util import stringutils as stringutils
|
||||
|
@ -73,6 +72,7 @@ from synapse.util.stringutils import base62_encode
|
|||
from synapse.util.threepids import canonicalise_email
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.rest.client.login import LoginResponse
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
@ -1818,7 +1818,9 @@ def load_legacy_password_auth_providers(hs: "HomeServer") -> None:
|
|||
|
||||
|
||||
def load_single_legacy_password_auth_provider(
|
||||
module: Type, config: JsonDict, api: ModuleApi
|
||||
module: Type,
|
||||
config: JsonDict,
|
||||
api: "ModuleApi",
|
||||
) -> None:
|
||||
try:
|
||||
provider = module(config=config, account_handler=api)
|
||||
|
|
|
@ -145,7 +145,7 @@ class DirectoryHandler:
|
|||
if not self.config.roomdirectory.is_alias_creation_allowed(
|
||||
user_id, room_id, room_alias_str
|
||||
):
|
||||
# Lets just return a generic message, as there may be all sorts of
|
||||
# Let's just return a generic message, as there may be all sorts of
|
||||
# reasons why we said no. TODO: Allow configurable error messages
|
||||
# per alias creation rule?
|
||||
raise SynapseError(403, "Not allowed to create alias")
|
||||
|
@ -245,7 +245,7 @@ class DirectoryHandler:
|
|||
servers = result.servers
|
||||
else:
|
||||
try:
|
||||
fed_result = await self.federation.make_query(
|
||||
fed_result: Optional[JsonDict] = await self.federation.make_query(
|
||||
destination=room_alias.domain,
|
||||
query_type="directory",
|
||||
args={"room_alias": room_alias.to_string()},
|
||||
|
@ -461,7 +461,7 @@ class DirectoryHandler:
|
|||
if not self.config.roomdirectory.is_publishing_room_allowed(
|
||||
user_id, room_id, room_aliases
|
||||
):
|
||||
# Lets just return a generic message, as there may be all sorts of
|
||||
# Let's just return a generic message, as there may be all sorts of
|
||||
# reasons why we said no. TODO: Allow configurable error messages
|
||||
# per alias creation rule?
|
||||
raise SynapseError(403, "Not allowed to publish room")
|
||||
|
|
|
@ -361,6 +361,7 @@ class FederationEventHandler:
|
|||
# need to.
|
||||
await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
|
||||
|
||||
await self._check_for_soft_fail(event, None, origin=origin)
|
||||
await self._run_push_actions_and_persist_event(event, context)
|
||||
return event, context
|
||||
|
||||
|
@ -402,29 +403,28 @@ class FederationEventHandler:
|
|||
"""Persists the events returned by a send_join
|
||||
|
||||
Checks the auth chain is valid (and passes auth checks) for the
|
||||
state and event. Then persists the auth chain and state atomically.
|
||||
Persists the event separately. Notifies about the persisted events
|
||||
where appropriate.
|
||||
|
||||
Will attempt to fetch missing auth events.
|
||||
state and event. Then persists all of the events.
|
||||
Notifies about the persisted events where appropriate.
|
||||
|
||||
Args:
|
||||
origin: Where the events came from
|
||||
room_id,
|
||||
room_id:
|
||||
auth_events
|
||||
state
|
||||
event
|
||||
room_version: The room version we expect this room to have, and
|
||||
will raise if it doesn't match the version in the create event.
|
||||
|
||||
Returns:
|
||||
The stream ID after which all events have been persisted.
|
||||
|
||||
Raises:
|
||||
SynapseError if the response is in some way invalid.
|
||||
"""
|
||||
events_to_context = {}
|
||||
for e in itertools.chain(auth_events, state):
|
||||
e.internal_metadata.outlier = True
|
||||
events_to_context[e.event_id] = EventContext.for_outlier()
|
||||
|
||||
event_map = {
|
||||
e.event_id: e for e in itertools.chain(auth_events, state, [event])
|
||||
}
|
||||
event_map = {e.event_id: e for e in itertools.chain(auth_events, state)}
|
||||
|
||||
create_event = None
|
||||
for e in auth_events:
|
||||
|
@ -444,68 +444,40 @@ class FederationEventHandler:
|
|||
if room_version.identifier != room_version_id:
|
||||
raise SynapseError(400, "Room version mismatch")
|
||||
|
||||
missing_auth_events = set()
|
||||
for e in itertools.chain(auth_events, state, [event]):
|
||||
for e_id in e.auth_event_ids():
|
||||
if e_id not in event_map:
|
||||
missing_auth_events.add(e_id)
|
||||
# filter out any events we have already seen
|
||||
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
|
||||
for s in seen_remotes:
|
||||
event_map.pop(s, None)
|
||||
|
||||
for e_id in missing_auth_events:
|
||||
m_ev = await self._federation_client.get_pdu(
|
||||
[origin],
|
||||
e_id,
|
||||
room_version=room_version,
|
||||
outlier=True,
|
||||
timeout=10000,
|
||||
)
|
||||
if m_ev and m_ev.event_id == e_id:
|
||||
event_map[e_id] = m_ev
|
||||
else:
|
||||
logger.info("Failed to find auth event %r", e_id)
|
||||
# persist the auth chain and state events.
|
||||
#
|
||||
# any invalid events here will be marked as rejected, and we'll carry on.
|
||||
#
|
||||
# any events whose auth events are missing (ie, not in the send_join response,
|
||||
# and not already in our db) will just be ignored. This is correct behaviour,
|
||||
# because the reason that auth_events are missing might be due to us being
|
||||
# unable to validate their signatures. The fact that we can't validate their
|
||||
# signatures right now doesn't mean that we will *never* be able to, so it
|
||||
# is premature to reject them.
|
||||
#
|
||||
await self._auth_and_persist_outliers(room_id, event_map.values())
|
||||
|
||||
for e in itertools.chain(auth_events, state, [event]):
|
||||
auth_for_e = [
|
||||
event_map[e_id] for e_id in e.auth_event_ids() if e_id in event_map
|
||||
]
|
||||
if create_event:
|
||||
auth_for_e.append(create_event)
|
||||
|
||||
try:
|
||||
validate_event_for_room_version(room_version, e)
|
||||
check_auth_rules_for_event(room_version, e, auth_for_e)
|
||||
except SynapseError as err:
|
||||
# we may get SynapseErrors here as well as AuthErrors. For
|
||||
# instance, there are a couple of (ancient) events in some
|
||||
# rooms whose senders do not have the correct sigil; these
|
||||
# cause SynapseErrors in auth.check. We don't want to give up
|
||||
# the attempt to federate altogether in such cases.
|
||||
|
||||
logger.warning("Rejecting %s because %s", e.event_id, err.msg)
|
||||
|
||||
if e == event:
|
||||
raise
|
||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||
|
||||
if auth_events or state:
|
||||
await self.persist_events_and_notify(
|
||||
room_id,
|
||||
[
|
||||
(e, events_to_context[e.event_id])
|
||||
for e in itertools.chain(auth_events, state)
|
||||
],
|
||||
# and now persist the join event itself.
|
||||
logger.info("Peristing join-via-remote %s", event)
|
||||
with nested_logging_context(suffix=event.event_id):
|
||||
context = await self._state_handler.compute_event_context(
|
||||
event, old_state=state
|
||||
)
|
||||
|
||||
new_event_context = await self._state_handler.compute_event_context(
|
||||
event, old_state=state
|
||||
)
|
||||
context = await self._check_event_auth(origin, event, context)
|
||||
if context.rejected:
|
||||
raise SynapseError(400, "Join event was rejected")
|
||||
|
||||
return await self.persist_events_and_notify(
|
||||
room_id, [(event, new_event_context)]
|
||||
)
|
||||
return await self.persist_events_and_notify(room_id, [(event, context)])
|
||||
|
||||
@log_function
|
||||
async def backfill(
|
||||
self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
|
||||
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
|
||||
) -> None:
|
||||
"""Trigger a backfill request to `dest` for the given `room_id`
|
||||
|
||||
|
@ -974,9 +946,15 @@ class FederationEventHandler:
|
|||
) -> None:
|
||||
"""Called when we have a new non-outlier event.
|
||||
|
||||
This is called when we have a new event to add to the room DAG - either directly
|
||||
via a /send request, retrieved via get_missing_events after a /send request, or
|
||||
backfilled after a client request.
|
||||
This is called when we have a new event to add to the room DAG. This can be
|
||||
due to:
|
||||
* events received directly via a /send request
|
||||
* events retrieved via get_missing_events after a /send request
|
||||
* events backfilled after a client request.
|
||||
|
||||
It's not currently used for events received from incoming send_{join,knock,leave}
|
||||
requests (which go via on_send_membership_event), nor for joins created by a
|
||||
remote join dance (which go via process_remote_join).
|
||||
|
||||
We need to do auth checks and put it through the StateHandler.
|
||||
|
||||
|
@ -1012,11 +990,19 @@ class FederationEventHandler:
|
|||
logger.exception("Unexpected AuthError from _check_event_auth")
|
||||
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
|
||||
|
||||
if not backfilled and not context.rejected:
|
||||
# For new (non-backfilled and non-outlier) events we check if the event
|
||||
# passes auth based on the current state. If it doesn't then we
|
||||
# "soft-fail" the event.
|
||||
await self._check_for_soft_fail(event, state, origin=origin)
|
||||
|
||||
await self._run_push_actions_and_persist_event(event, context, backfilled)
|
||||
|
||||
if backfilled:
|
||||
if backfilled or context.rejected:
|
||||
return
|
||||
|
||||
await self._maybe_kick_guest_users(event)
|
||||
|
||||
# For encrypted messages we check that we know about the sending device,
|
||||
# if we don't then we mark the device cache for that user as stale.
|
||||
if event.type == EventTypes.Encrypted:
|
||||
|
@ -1317,14 +1303,14 @@ class FederationEventHandler:
|
|||
for auth_event_id in event.auth_event_ids():
|
||||
ae = persisted_events.get(auth_event_id)
|
||||
if not ae:
|
||||
logger.warning(
|
||||
"Event %s relies on auth_event %s, which could not be found.",
|
||||
event,
|
||||
auth_event_id,
|
||||
)
|
||||
# the fact we can't find the auth event doesn't mean it doesn't
|
||||
# exist, which means it is premature to reject `event`. Instead we
|
||||
# just ignore it for now.
|
||||
logger.warning(
|
||||
"Dropping event %s, which relies on auth_event %s, which could not be found",
|
||||
event,
|
||||
auth_event_id,
|
||||
)
|
||||
return None
|
||||
auth.append(ae)
|
||||
|
||||
|
@ -1447,10 +1433,6 @@ class FederationEventHandler:
|
|||
except AuthError as e:
|
||||
logger.warning("Failed auth resolution for %r because %s", event, e)
|
||||
context.rejected = RejectedReason.AUTH_ERROR
|
||||
return context
|
||||
|
||||
await self._check_for_soft_fail(event, state, backfilled, origin=origin)
|
||||
await self._maybe_kick_guest_users(event)
|
||||
|
||||
return context
|
||||
|
||||
|
@ -1470,7 +1452,6 @@ class FederationEventHandler:
|
|||
self,
|
||||
event: EventBase,
|
||||
state: Optional[Iterable[EventBase]],
|
||||
backfilled: bool,
|
||||
origin: str,
|
||||
) -> None:
|
||||
"""Checks if we should soft fail the event; if so, marks the event as
|
||||
|
@ -1479,15 +1460,8 @@ class FederationEventHandler:
|
|||
Args:
|
||||
event
|
||||
state: The state at the event if we don't have all the event's prev events
|
||||
backfilled: Whether the event is from backfill
|
||||
origin: The host the event originates from.
|
||||
"""
|
||||
# For new (non-backfilled and non-outlier) events we check if the event
|
||||
# passes auth based on the current state. If it doesn't then we
|
||||
# "soft-fail" the event.
|
||||
if backfilled or event.internal_metadata.is_outlier():
|
||||
return
|
||||
|
||||
extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
|
||||
extrem_ids = set(extrem_ids_list)
|
||||
prev_event_ids = set(event.prev_event_ids())
|
||||
|
@ -1942,7 +1916,7 @@ class FederationEventHandler:
|
|||
event_pos = PersistedEventPosition(
|
||||
self._instance_name, event.internal_metadata.stream_ordering
|
||||
)
|
||||
self._notifier.on_new_room_event(
|
||||
await self._notifier.on_new_room_event(
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
|
||||
|
|
|
@ -879,6 +879,8 @@ class IdentityHandler:
|
|||
}
|
||||
|
||||
if room_type is not None:
|
||||
invite_config["room_type"] = room_type
|
||||
# TODO The unstable field is deprecated and should be removed in the future.
|
||||
invite_config["org.matrix.msc3288.room_type"] = room_type
|
||||
|
||||
# If a custom web client location is available, include it in the request.
|
||||
|
|
|
@ -1537,13 +1537,16 @@ class EventCreationHandler:
|
|||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
def _notify() -> None:
|
||||
async def _notify() -> None:
|
||||
try:
|
||||
self.notifier.on_new_room_event(
|
||||
await self.notifier.on_new_room_event(
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying about new room event")
|
||||
logger.exception(
|
||||
"Error notifying about new room event %s",
|
||||
event.event_id,
|
||||
)
|
||||
|
||||
run_in_background(_notify)
|
||||
|
||||
|
|
|
@ -438,7 +438,7 @@ class PaginationHandler:
|
|||
}
|
||||
|
||||
state = None
|
||||
if event_filter and event_filter.lazy_load_members() and len(events) > 0:
|
||||
if event_filter and event_filter.lazy_load_members and len(events) > 0:
|
||||
# TODO: remove redundant members
|
||||
|
||||
# FIXME: we also care about invite targets etc.
|
||||
|
|
|
@ -52,6 +52,7 @@ import synapse.metrics
|
|||
from synapse.api.constants import EventTypes, Membership, PresenceState
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.events.presence_router import PresenceRouter
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.utils import log_function
|
||||
|
@ -1551,6 +1552,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
|||
is_guest: bool = False,
|
||||
explicit_room_id: Optional[str] = None,
|
||||
include_offline: bool = True,
|
||||
service: Optional[ApplicationService] = None,
|
||||
) -> Tuple[List[UserPresenceState], int]:
|
||||
# The process for getting presence events are:
|
||||
# 1. Get the rooms the user is in.
|
||||
|
|
|
@ -456,7 +456,11 @@ class ProfileHandler:
|
|||
continue
|
||||
|
||||
new_name = profile.get("displayname")
|
||||
if not isinstance(new_name, str):
|
||||
new_name = None
|
||||
new_avatar = profile.get("avatar_url")
|
||||
if not isinstance(new_avatar, str):
|
||||
new_avatar = None
|
||||
|
||||
# We always hit update to update the last_check timestamp
|
||||
await self.store.update_remote_profile_cache(user_id, new_name, new_avatar)
|
||||
|
|
|
@ -773,6 +773,15 @@ class RoomCreationHandler:
|
|||
if not allowed_by_third_party_rules:
|
||||
raise SynapseError(403, "Room visibility value not allowed.")
|
||||
|
||||
if is_public:
|
||||
if not self.config.roomdirectory.is_publishing_room_allowed(
|
||||
user_id, room_id, room_alias
|
||||
):
|
||||
# Let's just return a generic message, as there may be all sorts of
|
||||
# reasons why we said no. TODO: Allow configurable error messages
|
||||
# per alias creation rule?
|
||||
raise SynapseError(403, "Not allowed to publish room")
|
||||
|
||||
directory_handler = self.hs.get_directory_handler()
|
||||
if room_alias:
|
||||
await directory_handler.create_association(
|
||||
|
@ -783,15 +792,6 @@ class RoomCreationHandler:
|
|||
check_membership=False,
|
||||
)
|
||||
|
||||
if is_public:
|
||||
if not self.config.roomdirectory.is_publishing_room_allowed(
|
||||
user_id, room_id, room_alias
|
||||
):
|
||||
# Lets just return a generic message, as there may be all sorts of
|
||||
# reasons why we said no. TODO: Allow configurable error messages
|
||||
# per alias creation rule?
|
||||
raise SynapseError(403, "Not allowed to publish room")
|
||||
|
||||
preset_config = config.get(
|
||||
"preset",
|
||||
RoomCreationPreset.PRIVATE_CHAT
|
||||
|
@ -1173,7 +1173,7 @@ class RoomContextHandler:
|
|||
else:
|
||||
last_event_id = event_id
|
||||
|
||||
if event_filter and event_filter.lazy_load_members():
|
||||
if event_filter and event_filter.lazy_load_members:
|
||||
state_filter = StateFilter.from_lazy_load_member_list(
|
||||
ev.sender
|
||||
for ev in itertools.chain(
|
||||
|
|
|
@ -249,7 +249,7 @@ class SearchHandler:
|
|||
)
|
||||
|
||||
events.sort(key=lambda e: -rank_map[e.event_id])
|
||||
allowed_events = events[: search_filter.limit()]
|
||||
allowed_events = events[: search_filter.limit]
|
||||
|
||||
for e in allowed_events:
|
||||
rm = room_groups.setdefault(
|
||||
|
@ -271,13 +271,13 @@ class SearchHandler:
|
|||
# We keep looping and we keep filtering until we reach the limit
|
||||
# or we run out of things.
|
||||
# But only go around 5 times since otherwise synapse will be sad.
|
||||
while len(room_events) < search_filter.limit() and i < 5:
|
||||
while len(room_events) < search_filter.limit and i < 5:
|
||||
i += 1
|
||||
search_result = await self.store.search_rooms(
|
||||
room_ids,
|
||||
search_term,
|
||||
keys,
|
||||
search_filter.limit() * 2,
|
||||
search_filter.limit * 2,
|
||||
pagination_token=pagination_token,
|
||||
)
|
||||
|
||||
|
@ -299,9 +299,9 @@ class SearchHandler:
|
|||
)
|
||||
|
||||
room_events.extend(events)
|
||||
room_events = room_events[: search_filter.limit()]
|
||||
room_events = room_events[: search_filter.limit]
|
||||
|
||||
if len(results) < search_filter.limit() * 2:
|
||||
if len(results) < search_filter.limit * 2:
|
||||
pagination_token = None
|
||||
break
|
||||
else:
|
||||
|
@ -311,7 +311,7 @@ class SearchHandler:
|
|||
group = room_groups.setdefault(event.room_id, {"results": []})
|
||||
group["results"].append(event.event_id)
|
||||
|
||||
if room_events and len(room_events) >= search_filter.limit():
|
||||
if room_events and len(room_events) >= search_filter.limit:
|
||||
last_event_id = room_events[-1].event_id
|
||||
pagination_token = results_map[last_event_id]["pagination_token"]
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import logging
|
||||
from functools import wraps
|
||||
from inspect import getcallargs
|
||||
from typing import Callable, TypeVar, cast
|
||||
|
||||
_TIME_FUNC_ID = 0
|
||||
|
||||
|
@ -41,7 +42,10 @@ def _log_debug_as_f(f, msg, msg_args):
|
|||
logger.handle(record)
|
||||
|
||||
|
||||
def log_function(f):
|
||||
F = TypeVar("F", bound=Callable)
|
||||
|
||||
|
||||
def log_function(f: F) -> F:
|
||||
"""Function decorator that logs every call to that function."""
|
||||
func_name = f.__name__
|
||||
|
||||
|
@ -69,4 +73,4 @@ def log_function(f):
|
|||
return f(*args, **kwargs)
|
||||
|
||||
wrapped.__name__ = func_name
|
||||
return wrapped
|
||||
return cast(F, wrapped)
|
||||
|
|
|
@ -46,6 +46,7 @@ from synapse.http.site import SynapseRequest
|
|||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.client.login import LoginResponse
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main.roommember import ProfileInfo
|
||||
from synapse.storage.state import StateFilter
|
||||
|
@ -61,6 +62,7 @@ from synapse.util import Clock
|
|||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.app.generic_worker import GenericWorkerSlavedStore
|
||||
from synapse.server import HomeServer
|
||||
|
||||
"""
|
||||
|
@ -111,7 +113,9 @@ class ModuleApi:
|
|||
def __init__(self, hs: "HomeServer", auth_handler):
|
||||
self._hs = hs
|
||||
|
||||
self._store = hs.get_datastore()
|
||||
# TODO: Fix this type hint once the types for the data stores have been ironed
|
||||
# out.
|
||||
self._store: Union[DataStore, "GenericWorkerSlavedStore"] = hs.get_datastore()
|
||||
self._auth = hs.get_auth()
|
||||
self._auth_handler = auth_handler
|
||||
self._server_name = hs.hostname
|
||||
|
@ -150,27 +154,42 @@ class ModuleApi:
|
|||
|
||||
@property
|
||||
def register_spam_checker_callbacks(self):
|
||||
"""Registers callbacks for spam checking capabilities."""
|
||||
"""Registers callbacks for spam checking capabilities.
|
||||
|
||||
Added in Synapse v1.37.0.
|
||||
"""
|
||||
return self._spam_checker.register_callbacks
|
||||
|
||||
@property
|
||||
def register_account_validity_callbacks(self):
|
||||
"""Registers callbacks for account validity capabilities."""
|
||||
"""Registers callbacks for account validity capabilities.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
"""
|
||||
return self._account_validity_handler.register_account_validity_callbacks
|
||||
|
||||
@property
|
||||
def register_third_party_rules_callbacks(self):
|
||||
"""Registers callbacks for third party event rules capabilities."""
|
||||
"""Registers callbacks for third party event rules capabilities.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
"""
|
||||
return self._third_party_event_rules.register_third_party_rules_callbacks
|
||||
|
||||
@property
|
||||
def register_presence_router_callbacks(self):
|
||||
"""Registers callbacks for presence router capabilities."""
|
||||
"""Registers callbacks for presence router capabilities.
|
||||
|
||||
Added in Synapse v1.42.0.
|
||||
"""
|
||||
return self._presence_router.register_presence_router_callbacks
|
||||
|
||||
@property
|
||||
def register_password_auth_provider_callbacks(self):
|
||||
"""Registers callbacks for password auth provider capabilities."""
|
||||
"""Registers callbacks for password auth provider capabilities.
|
||||
|
||||
Added in Synapse v1.46.0.
|
||||
"""
|
||||
return self._password_auth_provider.register_password_auth_provider_callbacks
|
||||
|
||||
def register_web_resource(self, path: str, resource: IResource):
|
||||
|
@ -181,6 +200,8 @@ class ModuleApi:
|
|||
If multiple modules register a resource for the same path, the module that
|
||||
appears the highest in the configuration file takes priority.
|
||||
|
||||
Added in Synapse v1.37.0.
|
||||
|
||||
Args:
|
||||
path: The path to register the resource for.
|
||||
resource: The resource to attach to this path.
|
||||
|
@ -195,6 +216,8 @@ class ModuleApi:
|
|||
"""Allows making outbound HTTP requests to remote resources.
|
||||
|
||||
An instance of synapse.http.client.SimpleHttpClient
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
"""
|
||||
return self._http_client
|
||||
|
||||
|
@ -204,22 +227,32 @@ class ModuleApi:
|
|||
public room list.
|
||||
|
||||
An instance of synapse.module_api.PublicRoomListManager
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
"""
|
||||
return self._public_room_list_manager
|
||||
|
||||
@property
|
||||
def public_baseurl(self) -> str:
|
||||
"""The configured public base URL for this homeserver."""
|
||||
"""The configured public base URL for this homeserver.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
"""
|
||||
return self._hs.config.server.public_baseurl
|
||||
|
||||
@property
|
||||
def email_app_name(self) -> str:
|
||||
"""The application name configured in the homeserver's configuration."""
|
||||
"""The application name configured in the homeserver's configuration.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
"""
|
||||
return self._hs.config.email.email_app_name
|
||||
|
||||
async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
|
||||
"""Get user info by user_id
|
||||
|
||||
Added in Synapse v1.41.0.
|
||||
|
||||
Args:
|
||||
user_id: Fully qualified user id.
|
||||
Returns:
|
||||
|
@ -235,6 +268,8 @@ class ModuleApi:
|
|||
) -> Requester:
|
||||
"""Check the access_token provided for a request
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
req: Incoming HTTP request
|
||||
allow_guest: True if guest users should be allowed. If this
|
||||
|
@ -260,6 +295,8 @@ class ModuleApi:
|
|||
async def is_user_admin(self, user_id: str) -> bool:
|
||||
"""Checks if a user is a server admin.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
user_id: The Matrix ID of the user to check.
|
||||
|
||||
|
@ -274,6 +311,8 @@ class ModuleApi:
|
|||
Takes a user id provided by the user and adds the @ and :domain to
|
||||
qualify it, if necessary
|
||||
|
||||
Added in Synapse v0.25.0.
|
||||
|
||||
Args:
|
||||
username (str): provided user id
|
||||
|
||||
|
@ -287,6 +326,8 @@ class ModuleApi:
|
|||
async def get_profile_for_user(self, localpart: str) -> ProfileInfo:
|
||||
"""Look up the profile info for the user with the given localpart.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
localpart: The localpart to look up profile information for.
|
||||
|
||||
|
@ -299,6 +340,8 @@ class ModuleApi:
|
|||
"""Look up the threepids (email addresses and phone numbers) associated with the
|
||||
given Matrix user ID.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
user_id: The Matrix user ID to look up threepids for.
|
||||
|
||||
|
@ -313,6 +356,8 @@ class ModuleApi:
|
|||
def check_user_exists(self, user_id):
|
||||
"""Check if user exists.
|
||||
|
||||
Added in Synapse v0.25.0.
|
||||
|
||||
Args:
|
||||
user_id (str): Complete @user:id
|
||||
|
||||
|
@ -332,6 +377,8 @@ class ModuleApi:
|
|||
return that device to the user. Prefer separate calls to register_user and
|
||||
register_device.
|
||||
|
||||
Added in Synapse v0.25.0.
|
||||
|
||||
Args:
|
||||
localpart (str): The localpart of the new user.
|
||||
displayname (str|None): The displayname of the new user.
|
||||
|
@ -352,6 +399,8 @@ class ModuleApi:
|
|||
):
|
||||
"""Registers a new user with given localpart and optional displayname, emails.
|
||||
|
||||
Added in Synapse v1.2.0.
|
||||
|
||||
Args:
|
||||
localpart (str): The localpart of the new user.
|
||||
displayname (str|None): The displayname of the new user.
|
||||
|
@ -375,6 +424,8 @@ class ModuleApi:
|
|||
def register_device(self, user_id, device_id=None, initial_display_name=None):
|
||||
"""Register a device for a user and generate an access token.
|
||||
|
||||
Added in Synapse v1.2.0.
|
||||
|
||||
Args:
|
||||
user_id (str): full canonical @user:id
|
||||
device_id (str|None): The device ID to check, or None to generate
|
||||
|
@ -398,6 +449,8 @@ class ModuleApi:
|
|||
) -> defer.Deferred:
|
||||
"""Record a mapping from an external user id to a mxid
|
||||
|
||||
Added in Synapse v1.9.0.
|
||||
|
||||
Args:
|
||||
auth_provider: identifier for the remote auth provider
|
||||
external_id: id on that system
|
||||
|
@ -417,6 +470,8 @@ class ModuleApi:
|
|||
) -> str:
|
||||
"""Generate a login token suitable for m.login.token authentication
|
||||
|
||||
Added in Synapse v1.9.0.
|
||||
|
||||
Args:
|
||||
user_id: gives the ID of the user that the token is for
|
||||
|
||||
|
@ -436,6 +491,8 @@ class ModuleApi:
|
|||
def invalidate_access_token(self, access_token):
|
||||
"""Invalidate an access token for a user
|
||||
|
||||
Added in Synapse v0.25.0.
|
||||
|
||||
Args:
|
||||
access_token(str): access token
|
||||
|
||||
|
@ -466,6 +523,8 @@ class ModuleApi:
|
|||
def run_db_interaction(self, desc, func, *args, **kwargs):
|
||||
"""Run a function with a database connection
|
||||
|
||||
Added in Synapse v0.25.0.
|
||||
|
||||
Args:
|
||||
desc (str): description for the transaction, for metrics etc
|
||||
func (func): function to be run. Passed a database cursor object
|
||||
|
@ -489,6 +548,8 @@ class ModuleApi:
|
|||
|
||||
This is deprecated in favor of complete_sso_login_async.
|
||||
|
||||
Added in Synapse v1.11.1.
|
||||
|
||||
Args:
|
||||
registered_user_id: The MXID that has been registered as a previous step of
|
||||
of this SSO login.
|
||||
|
@ -515,6 +576,8 @@ class ModuleApi:
|
|||
want their access token sent to `client_redirect_url`, or redirect them to that
|
||||
URL with a token directly if the URL matches with one of the whitelisted clients.
|
||||
|
||||
Added in Synapse v1.13.0.
|
||||
|
||||
Args:
|
||||
registered_user_id: The MXID that has been registered as a previous step of
|
||||
of this SSO login.
|
||||
|
@ -543,6 +606,8 @@ class ModuleApi:
|
|||
(This is exposed for compatibility with the old SpamCheckerApi. We should
|
||||
probably deprecate it and replace it with an async method in a subclass.)
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
|
||||
Args:
|
||||
room_id: The room ID to get state events in.
|
||||
types: The event type and state key (using None
|
||||
|
@ -563,6 +628,8 @@ class ModuleApi:
|
|||
async def create_and_send_event_into_room(self, event_dict: JsonDict) -> EventBase:
|
||||
"""Create and send an event into a room. Membership events are currently not supported.
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
|
||||
Args:
|
||||
event_dict: A dictionary representing the event to send.
|
||||
Required keys are `type`, `room_id`, `sender` and `content`.
|
||||
|
@ -603,6 +670,8 @@ class ModuleApi:
|
|||
|
||||
Note that this method can only be run on the process that is configured to write to the
|
||||
presence stream. By default this is the main process.
|
||||
|
||||
Added in Synapse v1.32.0.
|
||||
"""
|
||||
if self._hs._instance_name not in self._hs.config.worker.writers.presence:
|
||||
raise Exception(
|
||||
|
@ -657,6 +726,8 @@ class ModuleApi:
|
|||
|
||||
Waits `msec` initially before calling `f` for the first time.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
f: The function to call repeatedly. f can be either synchronous or
|
||||
asynchronous, and must follow Synapse's logcontext rules.
|
||||
|
@ -696,6 +767,8 @@ class ModuleApi:
|
|||
):
|
||||
"""Send an email on behalf of the homeserver.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
recipient: The email address for the recipient.
|
||||
subject: The email's subject.
|
||||
|
@ -719,6 +792,8 @@ class ModuleApi:
|
|||
By default, Synapse will look for these templates in its configured template
|
||||
directory, but another directory to search in can be provided.
|
||||
|
||||
Added in Synapse v1.39.0.
|
||||
|
||||
Args:
|
||||
filenames: The name of the template files to look for.
|
||||
custom_template_directory: An additional directory to look for the files in.
|
||||
|
@ -736,13 +811,13 @@ class ModuleApi:
|
|||
"""
|
||||
Checks whether an ID (user id, room, ...) comes from this homeserver.
|
||||
|
||||
Added in Synapse v1.44.0.
|
||||
|
||||
Args:
|
||||
id: any Matrix id (e.g. user id, room id, ...), either as a raw id,
|
||||
e.g. string "@user:example.com" or as a parsed UserID, RoomID, ...
|
||||
Returns:
|
||||
True if id comes from this homeserver, False otherwise.
|
||||
|
||||
Added in Synapse v1.44.0.
|
||||
"""
|
||||
if isinstance(id, DomainSpecificString):
|
||||
return self._hs.is_mine(id)
|
||||
|
@ -755,6 +830,8 @@ class ModuleApi:
|
|||
"""
|
||||
Return the list of user IPs and agents for a user.
|
||||
|
||||
Added in Synapse v1.44.0.
|
||||
|
||||
Args:
|
||||
user_id: the id of a user, local or remote
|
||||
since_ts: a timestamp in seconds since the epoch,
|
||||
|
@ -763,8 +840,6 @@ class ModuleApi:
|
|||
The list of all UserIpAndAgent that the user has
|
||||
used to connect to this homeserver since `since_ts`.
|
||||
If the user is remote, this list is empty.
|
||||
|
||||
Added in Synapse v1.44.0.
|
||||
"""
|
||||
# Don't hit the db if this is not a local user.
|
||||
is_mine = False
|
||||
|
@ -803,6 +878,8 @@ class PublicRoomListManager:
|
|||
async def room_is_in_public_room_list(self, room_id: str) -> bool:
|
||||
"""Checks whether a room is in the public room list.
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room.
|
||||
|
||||
|
@ -819,6 +896,8 @@ class PublicRoomListManager:
|
|||
async def add_room_to_public_room_list(self, room_id: str) -> None:
|
||||
"""Publishes a room to the public room list.
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room.
|
||||
"""
|
||||
|
@ -827,6 +906,8 @@ class PublicRoomListManager:
|
|||
async def remove_room_from_public_room_list(self, room_id: str) -> None:
|
||||
"""Removes a room from the public room list.
|
||||
|
||||
Added in Synapse v1.22.0.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room.
|
||||
"""
|
||||
|
|
|
@ -220,6 +220,8 @@ class Notifier:
|
|||
# down.
|
||||
self.remote_server_up_callbacks: List[Callable[[str], None]] = []
|
||||
|
||||
self._third_party_rules = hs.get_third_party_event_rules()
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
self._pusher_pool = hs.get_pusherpool()
|
||||
|
@ -267,7 +269,7 @@ class Notifier:
|
|||
"""
|
||||
self.replication_callbacks.append(cb)
|
||||
|
||||
def on_new_room_event(
|
||||
async def on_new_room_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
event_pos: PersistedEventPosition,
|
||||
|
@ -275,9 +277,10 @@ class Notifier:
|
|||
extra_users: Optional[Collection[UserID]] = None,
|
||||
):
|
||||
"""Unwraps event and calls `on_new_room_event_args`."""
|
||||
self.on_new_room_event_args(
|
||||
await self.on_new_room_event_args(
|
||||
event_pos=event_pos,
|
||||
room_id=event.room_id,
|
||||
event_id=event.event_id,
|
||||
event_type=event.type,
|
||||
state_key=event.get("state_key"),
|
||||
membership=event.content.get("membership"),
|
||||
|
@ -285,9 +288,10 @@ class Notifier:
|
|||
extra_users=extra_users or [],
|
||||
)
|
||||
|
||||
def on_new_room_event_args(
|
||||
async def on_new_room_event_args(
|
||||
self,
|
||||
room_id: str,
|
||||
event_id: str,
|
||||
event_type: str,
|
||||
state_key: Optional[str],
|
||||
membership: Optional[str],
|
||||
|
@ -302,7 +306,10 @@ class Notifier:
|
|||
listening to the room, and any listeners for the users in the
|
||||
`extra_users` param.
|
||||
|
||||
The events can be peristed out of order. The notifier will wait
|
||||
This also notifies modules listening on new events via the
|
||||
`on_new_event` callback.
|
||||
|
||||
The events can be persisted out of order. The notifier will wait
|
||||
until all previous events have been persisted before notifying
|
||||
the client streams.
|
||||
"""
|
||||
|
@ -318,6 +325,8 @@ class Notifier:
|
|||
)
|
||||
self._notify_pending_new_room_events(max_room_stream_token)
|
||||
|
||||
await self._third_party_rules.on_new_event(event_id)
|
||||
|
||||
self.notify_replication()
|
||||
|
||||
def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
|
||||
|
|
|
@ -207,11 +207,12 @@ class ReplicationDataHandler:
|
|||
|
||||
max_token = self.store.get_room_max_token()
|
||||
event_pos = PersistedEventPosition(instance_name, token)
|
||||
self.notifier.on_new_room_event_args(
|
||||
await self.notifier.on_new_room_event_args(
|
||||
event_pos=event_pos,
|
||||
max_room_stream_token=max_token,
|
||||
extra_users=extra_users,
|
||||
room_id=row.data.room_id,
|
||||
event_id=row.data.event_id,
|
||||
event_type=row.data.type,
|
||||
state_key=row.data.state_key,
|
||||
membership=row.data.membership,
|
||||
|
|
|
@ -326,6 +326,9 @@ class UserRestServletV2(RestServlet):
|
|||
target_user.to_string()
|
||||
)
|
||||
|
||||
if "user_type" in body:
|
||||
await self.store.set_user_type(target_user, user_type)
|
||||
|
||||
user = await self.admin_handler.get_user(target_user)
|
||||
assert user is not None
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ from typing import (
|
|||
FrozenSet,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
|
@ -519,7 +520,7 @@ class StateResolutionHandler:
|
|||
self,
|
||||
room_id: str,
|
||||
room_version: str,
|
||||
state_groups_ids: Dict[int, StateMap[str]],
|
||||
state_groups_ids: Mapping[int, StateMap[str]],
|
||||
event_map: Optional[Dict[str, EventBase]],
|
||||
state_res_store: "StateResolutionStore",
|
||||
) -> _StateCacheEntry:
|
||||
|
@ -703,7 +704,7 @@ class StateResolutionHandler:
|
|||
|
||||
|
||||
def _make_state_cache_entry(
|
||||
new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
|
||||
new_state: StateMap[str], state_groups_ids: Mapping[int, StateMap[str]]
|
||||
) -> _StateCacheEntry:
|
||||
"""Given a resolved state, and a set of input state groups, pick one to base
|
||||
a new state group on (if any), and return an appropriately-constructed
|
||||
|
|
|
@ -478,6 +478,58 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
|
|||
|
||||
return {(d["user_id"], d["device_id"]): d for d in res}
|
||||
|
||||
async def get_user_ip_and_agents(
|
||||
self, user: UserID, since_ts: int = 0
|
||||
) -> List[LastConnectionInfo]:
|
||||
"""Fetch the IPs and user agents for a user since the given timestamp.
|
||||
|
||||
The result might be slightly out of date as client IPs are inserted in batches.
|
||||
|
||||
Args:
|
||||
user: The user for which to fetch IP addresses and user agents.
|
||||
since_ts: The timestamp after which to fetch IP addresses and user agents,
|
||||
in milliseconds.
|
||||
|
||||
Returns:
|
||||
A list of dictionaries, each containing:
|
||||
* `access_token`: The access token used.
|
||||
* `ip`: The IP address used.
|
||||
* `user_agent`: The last user agent seen for this access token and IP
|
||||
address combination.
|
||||
* `last_seen`: The timestamp at which this access token and IP address
|
||||
combination was last seen, in milliseconds.
|
||||
|
||||
Only the latest user agent for each access token and IP address combination
|
||||
is available.
|
||||
"""
|
||||
user_id = user.to_string()
|
||||
|
||||
def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]:
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT access_token, ip, user_agent, last_seen FROM user_ips
|
||||
WHERE last_seen >= ? AND user_id = ?
|
||||
ORDER BY last_seen
|
||||
DESC
|
||||
""",
|
||||
(since_ts, user_id),
|
||||
)
|
||||
return cast(List[Tuple[str, str, str, int]], txn.fetchall())
|
||||
|
||||
rows = await self.db_pool.runInteraction(
|
||||
desc="get_user_ip_and_agents", func=get_recent
|
||||
)
|
||||
|
||||
return [
|
||||
{
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
for access_token, ip, user_agent, last_seen in rows
|
||||
]
|
||||
|
||||
|
||||
class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
|
||||
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
|
||||
|
@ -622,49 +674,43 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
|
|||
async def get_user_ip_and_agents(
|
||||
self, user: UserID, since_ts: int = 0
|
||||
) -> List[LastConnectionInfo]:
|
||||
"""
|
||||
Fetch IP/User Agent connection since a given timestamp.
|
||||
"""
|
||||
user_id = user.to_string()
|
||||
results: Dict[Tuple[str, str], Tuple[str, int]] = {}
|
||||
"""Fetch the IPs and user agents for a user since the given timestamp.
|
||||
|
||||
Args:
|
||||
user: The user for which to fetch IP addresses and user agents.
|
||||
since_ts: The timestamp after which to fetch IP addresses and user agents,
|
||||
in milliseconds.
|
||||
|
||||
Returns:
|
||||
A list of dictionaries, each containing:
|
||||
* `access_token`: The access token used.
|
||||
* `ip`: The IP address used.
|
||||
* `user_agent`: The last user agent seen for this access token and IP
|
||||
address combination.
|
||||
* `last_seen`: The timestamp at which this access token and IP address
|
||||
combination was last seen, in milliseconds.
|
||||
|
||||
Only the latest user agent for each access token and IP address combination
|
||||
is available.
|
||||
"""
|
||||
results: Dict[Tuple[str, str], LastConnectionInfo] = {
|
||||
(connection["access_token"], connection["ip"]): connection
|
||||
for connection in await super().get_user_ip_and_agents(user, since_ts)
|
||||
}
|
||||
|
||||
# Overlay data that is pending insertion on top of the results from the
|
||||
# database.
|
||||
user_id = user.to_string()
|
||||
for key in self._batch_row_update:
|
||||
(
|
||||
uid,
|
||||
access_token,
|
||||
ip,
|
||||
) = key
|
||||
uid, access_token, ip = key
|
||||
if uid == user_id:
|
||||
user_agent, _, last_seen = self._batch_row_update[key]
|
||||
if last_seen >= since_ts:
|
||||
results[(access_token, ip)] = (user_agent, last_seen)
|
||||
results[(access_token, ip)] = {
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
|
||||
def get_recent(txn: LoggingTransaction) -> List[Tuple[str, str, str, int]]:
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT access_token, ip, user_agent, last_seen FROM user_ips
|
||||
WHERE last_seen >= ? AND user_id = ?
|
||||
ORDER BY last_seen
|
||||
DESC
|
||||
""",
|
||||
(since_ts, user_id),
|
||||
)
|
||||
return cast(List[Tuple[str, str, str, int]], txn.fetchall())
|
||||
|
||||
rows = await self.db_pool.runInteraction(
|
||||
desc="get_user_ip_and_agents", func=get_recent
|
||||
)
|
||||
|
||||
results.update(
|
||||
((access_token, ip), (user_agent, last_seen))
|
||||
for access_token, ip, user_agent, last_seen in rows
|
||||
)
|
||||
return [
|
||||
{
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
for (access_token, ip), (user_agent, last_seen) in results.items()
|
||||
]
|
||||
return list(results.values())
|
||||
|
|
|
@ -19,9 +19,10 @@ from synapse.logging import issue9533_logger
|
|||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
@ -555,6 +556,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
|
||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
|
||||
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
@ -570,6 +572,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
|||
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
self.REMOVE_DELETED_DEVICES,
|
||||
self._remove_deleted_devices_from_device_inbox,
|
||||
)
|
||||
|
||||
async def _background_drop_index_device_inbox(self, progress, batch_size):
|
||||
def reindex_txn(conn):
|
||||
txn = conn.cursor()
|
||||
|
@ -582,6 +589,89 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
return 1
|
||||
|
||||
async def _remove_deleted_devices_from_device_inbox(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""A background update that deletes all device_inboxes for deleted devices.
|
||||
|
||||
This should only need to be run once (when users upgrade to v1.46.0)
|
||||
|
||||
Args:
|
||||
progress: JsonDict used to store progress of this background update
|
||||
batch_size: the maximum number of rows to retrieve in a single select query
|
||||
|
||||
Returns:
|
||||
The number of deleted rows
|
||||
"""
|
||||
|
||||
def _remove_deleted_devices_from_device_inbox_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> int:
|
||||
"""stream_id is not unique
|
||||
we need to use an inclusive `stream_id >= ?` clause,
|
||||
since we might not have deleted all dead device messages for the stream_id
|
||||
returned from the previous query
|
||||
|
||||
Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
|
||||
to avoid problems of deleting a large number of rows all at once
|
||||
due to a single device having lots of device messages.
|
||||
"""
|
||||
|
||||
last_stream_id = progress.get("stream_id", 0)
|
||||
|
||||
sql = """
|
||||
SELECT device_id, user_id, stream_id
|
||||
FROM device_inbox
|
||||
WHERE
|
||||
stream_id >= ?
|
||||
AND (device_id, user_id) NOT IN (
|
||||
SELECT device_id, user_id FROM devices
|
||||
)
|
||||
ORDER BY stream_id
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(sql, (last_stream_id, batch_size))
|
||||
rows = txn.fetchall()
|
||||
|
||||
num_deleted = 0
|
||||
for row in rows:
|
||||
num_deleted += self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
"device_inbox",
|
||||
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
|
||||
)
|
||||
|
||||
if rows:
|
||||
# send more than stream_id to progress
|
||||
# otherwise it can happen in large deployments that
|
||||
# no change of status is visible in the log file
|
||||
# it may be that the stream_id does not change in several runs
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
self.REMOVE_DELETED_DEVICES,
|
||||
{
|
||||
"device_id": rows[-1][0],
|
||||
"user_id": rows[-1][1],
|
||||
"stream_id": rows[-1][2],
|
||||
},
|
||||
)
|
||||
|
||||
return num_deleted
|
||||
|
||||
number_deleted = await self.db_pool.runInteraction(
|
||||
"_remove_deleted_devices_from_device_inbox",
|
||||
_remove_deleted_devices_from_device_inbox_txn,
|
||||
)
|
||||
|
||||
# The task is finished when no more lines are deleted.
|
||||
if not number_deleted:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
self.REMOVE_DELETED_DEVICES
|
||||
)
|
||||
|
||||
return number_deleted
|
||||
|
||||
|
||||
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
|
||||
pass
|
||||
|
|
|
@ -1134,19 +1134,14 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
|||
raise StoreError(500, "Problem storing device.")
|
||||
|
||||
async def delete_device(self, user_id: str, device_id: str) -> None:
|
||||
"""Delete a device.
|
||||
"""Delete a device and its device_inbox.
|
||||
|
||||
Args:
|
||||
user_id: The ID of the user which owns the device
|
||||
device_id: The ID of the device to delete
|
||||
"""
|
||||
await self.db_pool.simple_delete_one(
|
||||
table="devices",
|
||||
keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
|
||||
desc="delete_device",
|
||||
)
|
||||
|
||||
self.device_id_exists_cache.invalidate((user_id, device_id))
|
||||
await self.delete_devices(user_id, [device_id])
|
||||
|
||||
async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
|
||||
"""Deletes several devices.
|
||||
|
@ -1155,13 +1150,25 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
|||
user_id: The ID of the user which owns the devices
|
||||
device_ids: The IDs of the devices to delete
|
||||
"""
|
||||
await self.db_pool.simple_delete_many(
|
||||
table="devices",
|
||||
column="device_id",
|
||||
iterable=device_ids,
|
||||
keyvalues={"user_id": user_id, "hidden": False},
|
||||
desc="delete_devices",
|
||||
)
|
||||
|
||||
def _delete_devices_txn(txn: LoggingTransaction) -> None:
|
||||
self.db_pool.simple_delete_many_txn(
|
||||
txn,
|
||||
table="devices",
|
||||
column="device_id",
|
||||
values=device_ids,
|
||||
keyvalues={"user_id": user_id, "hidden": False},
|
||||
)
|
||||
|
||||
self.db_pool.simple_delete_many_txn(
|
||||
txn,
|
||||
table="device_inbox",
|
||||
column="device_id",
|
||||
values=device_ids,
|
||||
keyvalues={"user_id": user_id},
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction("delete_devices", _delete_devices_txn)
|
||||
for device_id in device_ids:
|
||||
self.device_id_exists_cache.invalidate((user_id, device_id))
|
||||
|
||||
|
|
|
@ -17,11 +17,15 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
|||
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import EventContentFields
|
||||
from synapse.api.constants import EventContentFields, RelationTypes
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import make_event_from_dict
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingTransaction,
|
||||
make_tuple_comparison_clause,
|
||||
)
|
||||
from synapse.storage.databases.main.events import PersistEventsStore
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict
|
||||
|
@ -167,6 +171,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
self._purged_chain_cover_index,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"event_thread_relation", self._event_thread_relation
|
||||
)
|
||||
|
||||
################################################################################
|
||||
|
||||
# bg updates for replacing stream_ordering with a BIGINT
|
||||
|
@ -1091,6 +1099,79 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
|
||||
return result
|
||||
|
||||
async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
|
||||
"""Background update handler which will store thread relations for existing events."""
|
||||
last_event_id = progress.get("last_event_id", "")
|
||||
|
||||
def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_id, json FROM event_json
|
||||
LEFT JOIN event_relations USING (event_id)
|
||||
WHERE event_id > ? AND event_relations.event_id IS NULL
|
||||
ORDER BY event_id LIMIT ?
|
||||
""",
|
||||
(last_event_id, batch_size),
|
||||
)
|
||||
|
||||
results = list(txn)
|
||||
missing_thread_relations = []
|
||||
for (event_id, event_json_raw) in results:
|
||||
try:
|
||||
event_json = db_to_json(event_json_raw)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Unable to load event %s (no relations will be updated): %s",
|
||||
event_id,
|
||||
e,
|
||||
)
|
||||
continue
|
||||
|
||||
# If there's no relation (or it is not a thread), skip!
|
||||
relates_to = event_json["content"].get("m.relates_to")
|
||||
if not relates_to or not isinstance(relates_to, dict):
|
||||
continue
|
||||
if relates_to.get("rel_type") != RelationTypes.THREAD:
|
||||
continue
|
||||
|
||||
# Get the parent ID.
|
||||
parent_id = relates_to.get("event_id")
|
||||
if not isinstance(parent_id, str):
|
||||
continue
|
||||
|
||||
missing_thread_relations.append((event_id, parent_id))
|
||||
|
||||
# Insert the missing data.
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn=txn,
|
||||
table="event_relations",
|
||||
values=[
|
||||
{
|
||||
"event_id": event_id,
|
||||
"relates_to_Id": parent_id,
|
||||
"relation_type": RelationTypes.THREAD,
|
||||
}
|
||||
for event_id, parent_id in missing_thread_relations
|
||||
],
|
||||
)
|
||||
|
||||
if results:
|
||||
latest_event_id = results[-1][0]
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, "event_thread_relation", {"last_event_id": latest_event_id}
|
||||
)
|
||||
|
||||
return len(results)
|
||||
|
||||
num_rows = await self.db_pool.runInteraction(
|
||||
desc="event_thread_relation", func=_event_thread_relation_txn
|
||||
)
|
||||
|
||||
if not num_rows:
|
||||
await self.db_pool.updates._end_background_update("event_thread_relation")
|
||||
|
||||
return num_rows
|
||||
|
||||
async def _background_populate_stream_ordering2(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
|
|
|
@ -91,7 +91,7 @@ class ProfileWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
async def update_remote_profile_cache(
|
||||
self, user_id: str, displayname: str, avatar_url: str
|
||||
self, user_id: str, displayname: Optional[str], avatar_url: Optional[str]
|
||||
) -> int:
|
||||
return await self.db_pool.simple_update(
|
||||
table="remote_profile_cache",
|
||||
|
|
|
@ -499,6 +499,24 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
|
|||
|
||||
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
|
||||
|
||||
async def set_user_type(self, user: UserID, user_type: Optional[UserTypes]) -> None:
|
||||
"""Sets the user type.
|
||||
|
||||
Args:
|
||||
user: user ID of the user.
|
||||
user_type: type of the user or None for a user without a type.
|
||||
"""
|
||||
|
||||
def set_user_type_txn(txn):
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn, "users", {"name": user.to_string()}, {"user_type": user_type}
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_by_id, (user.to_string(),)
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction("set_user_type", set_user_type_txn)
|
||||
|
||||
def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]:
|
||||
sql = """
|
||||
SELECT users.name as user_id,
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
-- Remove messages from the device_inbox table which were orphaned
|
||||
-- when a device was deleted using Synapse earlier than 1.46.0.
|
||||
-- This runs as background task, but may take a bit to finish.
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(6402, 'remove_deleted_devices_from_device_inbox', '{}');
|
18
synapse/storage/schema/main/delta/65/02_thread_relations.sql
Normal file
18
synapse/storage/schema/main/delta/65/02_thread_relations.sql
Normal file
|
@ -0,0 +1,18 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Check old events for thread relations.
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(6502, 'event_thread_relation', '{}');
|
136
synapse/util/gai_resolver.py
Normal file
136
synapse/util/gai_resolver.py
Normal file
|
@ -0,0 +1,136 @@
|
|||
# This is a direct lift from
|
||||
# https://github.com/twisted/twisted/blob/release-21.2.0-10091/src/twisted/internet/_resolver.py.
|
||||
# We copy it here as we need to instantiate `GAIResolver` manually, but it is a
|
||||
# private class.
|
||||
|
||||
|
||||
from socket import (
|
||||
AF_INET,
|
||||
AF_INET6,
|
||||
AF_UNSPEC,
|
||||
SOCK_DGRAM,
|
||||
SOCK_STREAM,
|
||||
gaierror,
|
||||
getaddrinfo,
|
||||
)
|
||||
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.internet.address import IPv4Address, IPv6Address
|
||||
from twisted.internet.interfaces import IHostnameResolver, IHostResolution
|
||||
from twisted.internet.threads import deferToThreadPool
|
||||
|
||||
|
||||
@implementer(IHostResolution)
|
||||
class HostResolution:
|
||||
"""
|
||||
The in-progress resolution of a given hostname.
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
"""
|
||||
Create a L{HostResolution} with the given name.
|
||||
"""
|
||||
self.name = name
|
||||
|
||||
def cancel(self):
|
||||
# IHostResolution.cancel
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
_any = frozenset([IPv4Address, IPv6Address])
|
||||
|
||||
_typesToAF = {
|
||||
frozenset([IPv4Address]): AF_INET,
|
||||
frozenset([IPv6Address]): AF_INET6,
|
||||
_any: AF_UNSPEC,
|
||||
}
|
||||
|
||||
_afToType = {
|
||||
AF_INET: IPv4Address,
|
||||
AF_INET6: IPv6Address,
|
||||
}
|
||||
|
||||
_transportToSocket = {
|
||||
"TCP": SOCK_STREAM,
|
||||
"UDP": SOCK_DGRAM,
|
||||
}
|
||||
|
||||
_socktypeToType = {
|
||||
SOCK_STREAM: "TCP",
|
||||
SOCK_DGRAM: "UDP",
|
||||
}
|
||||
|
||||
|
||||
@implementer(IHostnameResolver)
|
||||
class GAIResolver:
|
||||
"""
|
||||
L{IHostnameResolver} implementation that resolves hostnames by calling
|
||||
L{getaddrinfo} in a thread.
|
||||
"""
|
||||
|
||||
def __init__(self, reactor, getThreadPool=None, getaddrinfo=getaddrinfo):
|
||||
"""
|
||||
Create a L{GAIResolver}.
|
||||
@param reactor: the reactor to schedule result-delivery on
|
||||
@type reactor: L{IReactorThreads}
|
||||
@param getThreadPool: a function to retrieve the thread pool to use for
|
||||
scheduling name resolutions. If not supplied, the use the given
|
||||
C{reactor}'s thread pool.
|
||||
@type getThreadPool: 0-argument callable returning a
|
||||
L{twisted.python.threadpool.ThreadPool}
|
||||
@param getaddrinfo: a reference to the L{getaddrinfo} to use - mainly
|
||||
parameterized for testing.
|
||||
@type getaddrinfo: callable with the same signature as L{getaddrinfo}
|
||||
"""
|
||||
self._reactor = reactor
|
||||
self._getThreadPool = (
|
||||
reactor.getThreadPool if getThreadPool is None else getThreadPool
|
||||
)
|
||||
self._getaddrinfo = getaddrinfo
|
||||
|
||||
def resolveHostName(
|
||||
self,
|
||||
resolutionReceiver,
|
||||
hostName,
|
||||
portNumber=0,
|
||||
addressTypes=None,
|
||||
transportSemantics="TCP",
|
||||
):
|
||||
"""
|
||||
See L{IHostnameResolver.resolveHostName}
|
||||
@param resolutionReceiver: see interface
|
||||
@param hostName: see interface
|
||||
@param portNumber: see interface
|
||||
@param addressTypes: see interface
|
||||
@param transportSemantics: see interface
|
||||
@return: see interface
|
||||
"""
|
||||
pool = self._getThreadPool()
|
||||
addressFamily = _typesToAF[
|
||||
_any if addressTypes is None else frozenset(addressTypes)
|
||||
]
|
||||
socketType = _transportToSocket[transportSemantics]
|
||||
|
||||
def get():
|
||||
try:
|
||||
return self._getaddrinfo(
|
||||
hostName, portNumber, addressFamily, socketType
|
||||
)
|
||||
except gaierror:
|
||||
return []
|
||||
|
||||
d = deferToThreadPool(self._reactor, pool, get)
|
||||
resolution = HostResolution(hostName)
|
||||
resolutionReceiver.resolutionBegan(resolution)
|
||||
|
||||
@d.addCallback
|
||||
def deliverResults(result):
|
||||
for family, socktype, _proto, _cannoname, sockaddr in result:
|
||||
addrType = _afToType[family]
|
||||
resolutionReceiver.addressResolved(
|
||||
addrType(_socktypeToType.get(socktype, "TCP"), *sockaddr)
|
||||
)
|
||||
resolutionReceiver.resolutionComplete()
|
||||
|
||||
return resolution
|
|
@ -160,6 +160,37 @@ class DeviceTestCase(unittest.HomeserverTestCase):
|
|||
# we'd like to check the access token was invalidated, but that's a
|
||||
# bit of a PITA.
|
||||
|
||||
def test_delete_device_and_device_inbox(self):
|
||||
self._record_users()
|
||||
|
||||
# add an device_inbox
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"device_inbox",
|
||||
{
|
||||
"user_id": user1,
|
||||
"device_id": "abc",
|
||||
"stream_id": 1,
|
||||
"message_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# delete the device
|
||||
self.get_success(self.handler.delete_device(user1, "abc"))
|
||||
|
||||
# check that the device_inbox was deleted
|
||||
res = self.get_success(
|
||||
self.store.db_pool.simple_select_one(
|
||||
table="device_inbox",
|
||||
keyvalues={"user_id": user1, "device_id": "abc"},
|
||||
retcols=("user_id", "device_id"),
|
||||
allow_none=True,
|
||||
desc="get_device_id_from_device_inbox",
|
||||
)
|
||||
)
|
||||
self.assertIsNone(res)
|
||||
|
||||
def test_update_device(self):
|
||||
self._record_users()
|
||||
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
from unittest.mock import Mock
|
||||
|
||||
import synapse
|
||||
import synapse.api.errors
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.config.room_directory import RoomDirectoryConfig
|
||||
from synapse.rest.client import directory, login, room
|
||||
|
@ -432,6 +432,106 @@ class TestCreateAliasACL(unittest.HomeserverTestCase):
|
|||
self.assertEquals(200, channel.code, channel.result)
|
||||
|
||||
|
||||
class TestCreatePublishedRoomACL(unittest.HomeserverTestCase):
|
||||
data = {"room_alias_name": "unofficial_test"}
|
||||
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets_for_client_rest_resource,
|
||||
login.register_servlets,
|
||||
directory.register_servlets,
|
||||
room.register_servlets,
|
||||
]
|
||||
hijack_auth = False
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
self.allowed_user_id = self.register_user("allowed", "pass")
|
||||
self.allowed_access_token = self.login("allowed", "pass")
|
||||
|
||||
self.denied_user_id = self.register_user("denied", "pass")
|
||||
self.denied_access_token = self.login("denied", "pass")
|
||||
|
||||
# This time we add custom room list publication rules
|
||||
config = {}
|
||||
config["alias_creation_rules"] = []
|
||||
config["room_list_publication_rules"] = [
|
||||
{"user_id": "*", "alias": "*", "action": "deny"},
|
||||
{"user_id": self.allowed_user_id, "alias": "*", "action": "allow"},
|
||||
]
|
||||
|
||||
rd_config = RoomDirectoryConfig()
|
||||
rd_config.read_config(config)
|
||||
|
||||
self.hs.config.roomdirectory.is_publishing_room_allowed = (
|
||||
rd_config.is_publishing_room_allowed
|
||||
)
|
||||
|
||||
return hs
|
||||
|
||||
def test_denied_without_publication_permission(self):
|
||||
"""
|
||||
Try to create a room, register an alias for it, and publish it,
|
||||
as a user without permission to publish rooms.
|
||||
(This is used as both a standalone test & as a helper function.)
|
||||
"""
|
||||
self.helper.create_room_as(
|
||||
self.denied_user_id,
|
||||
tok=self.denied_access_token,
|
||||
extra_content=self.data,
|
||||
is_public=True,
|
||||
expect_code=403,
|
||||
)
|
||||
|
||||
def test_allowed_when_creating_private_room(self):
|
||||
"""
|
||||
Try to create a room, register an alias for it, and NOT publish it,
|
||||
as a user without permission to publish rooms.
|
||||
(This is used as both a standalone test & as a helper function.)
|
||||
"""
|
||||
self.helper.create_room_as(
|
||||
self.denied_user_id,
|
||||
tok=self.denied_access_token,
|
||||
extra_content=self.data,
|
||||
is_public=False,
|
||||
expect_code=200,
|
||||
)
|
||||
|
||||
def test_allowed_with_publication_permission(self):
|
||||
"""
|
||||
Try to create a room, register an alias for it, and publish it,
|
||||
as a user WITH permission to publish rooms.
|
||||
(This is used as both a standalone test & as a helper function.)
|
||||
"""
|
||||
self.helper.create_room_as(
|
||||
self.allowed_user_id,
|
||||
tok=self.allowed_access_token,
|
||||
extra_content=self.data,
|
||||
is_public=False,
|
||||
expect_code=200,
|
||||
)
|
||||
|
||||
def test_can_create_as_private_room_after_rejection(self):
|
||||
"""
|
||||
After failing to publish a room with an alias as a user without publish permission,
|
||||
retry as the same user, but without publishing the room.
|
||||
|
||||
This should pass, but used to fail because the alias was registered by the first
|
||||
request, even though the room creation was denied.
|
||||
"""
|
||||
self.test_denied_without_publication_permission()
|
||||
self.test_allowed_when_creating_private_room()
|
||||
|
||||
def test_can_create_with_permission_after_rejection(self):
|
||||
"""
|
||||
After failing to publish a room with an alias as a user without publish permission,
|
||||
retry as someone with permission, using the same alias.
|
||||
|
||||
This also used to fail because of the alias having been registered by the first
|
||||
request, leaving it unavailable for any other user's new rooms.
|
||||
"""
|
||||
self.test_denied_without_publication_permission()
|
||||
self.test_allowed_with_publication_permission()
|
||||
|
||||
|
||||
class TestRoomListSearchDisabled(unittest.HomeserverTestCase):
|
||||
user_id = "@test:test"
|
||||
|
||||
|
|
|
@ -2270,6 +2270,57 @@ class UserRestTestCase(unittest.HomeserverTestCase):
|
|||
self.assertEqual("@user:test", channel.json_body["name"])
|
||||
self.assertTrue(channel.json_body["admin"])
|
||||
|
||||
def test_set_user_type(self):
|
||||
"""
|
||||
Test changing user type.
|
||||
"""
|
||||
|
||||
# Set to support type
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
self.url_other_user,
|
||||
access_token=self.admin_user_tok,
|
||||
content={"user_type": UserTypes.SUPPORT},
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual("@user:test", channel.json_body["name"])
|
||||
self.assertEqual(UserTypes.SUPPORT, channel.json_body["user_type"])
|
||||
|
||||
# Get user
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
self.url_other_user,
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual("@user:test", channel.json_body["name"])
|
||||
self.assertEqual(UserTypes.SUPPORT, channel.json_body["user_type"])
|
||||
|
||||
# Change back to a regular user
|
||||
channel = self.make_request(
|
||||
"PUT",
|
||||
self.url_other_user,
|
||||
access_token=self.admin_user_tok,
|
||||
content={"user_type": None},
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual("@user:test", channel.json_body["name"])
|
||||
self.assertIsNone(channel.json_body["user_type"])
|
||||
|
||||
# Get user
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
self.url_other_user,
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual("@user:test", channel.json_body["name"])
|
||||
self.assertIsNone(channel.json_body["user_type"])
|
||||
|
||||
def test_accidental_deactivation_prevention(self):
|
||||
"""
|
||||
Ensure an account can't accidentally be deactivated by using a str value
|
||||
|
|
|
@ -15,7 +15,7 @@ import threading
|
|||
from typing import TYPE_CHECKING, Dict, Optional, Tuple
|
||||
from unittest.mock import Mock
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
|
||||
|
@ -25,6 +25,7 @@ from synapse.types import JsonDict, Requester, StateMap
|
|||
from synapse.util.frozenutils import unfreeze
|
||||
|
||||
from tests import unittest
|
||||
from tests.test_utils import make_awaitable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.module_api import ModuleApi
|
||||
|
@ -74,7 +75,7 @@ class LegacyChangeEvents(LegacyThirdPartyRulesTestModule):
|
|||
return d
|
||||
|
||||
|
||||
class ThirdPartyRulesTestCase(unittest.HomeserverTestCase):
|
||||
class ThirdPartyRulesTestCase(unittest.FederatingHomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
login.register_servlets,
|
||||
|
@ -86,11 +87,29 @@ class ThirdPartyRulesTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
load_legacy_third_party_event_rules(hs)
|
||||
|
||||
# We're not going to be properly signing events as our remote homeserver is fake,
|
||||
# therefore disable event signature checks.
|
||||
# Note that these checks are not relevant to this test case.
|
||||
|
||||
# Have this homeserver auto-approve all event signature checking.
|
||||
async def approve_all_signature_checking(_, pdu):
|
||||
return pdu
|
||||
|
||||
hs.get_federation_server()._check_sigs_and_hash = approve_all_signature_checking
|
||||
|
||||
# Have this homeserver skip event auth checks. This is necessary due to
|
||||
# event auth checks ensuring that events were signed by the sender's homeserver.
|
||||
async def _check_event_auth(origin, event, context, *args, **kwargs):
|
||||
return context
|
||||
|
||||
hs.get_federation_event_handler()._check_event_auth = _check_event_auth
|
||||
|
||||
return hs
|
||||
|
||||
def prepare(self, reactor, clock, homeserver):
|
||||
# Create a user and room to play with during the tests
|
||||
# Create some users and a room to play with during the tests
|
||||
self.user_id = self.register_user("kermit", "monkey")
|
||||
self.invitee = self.register_user("invitee", "hackme")
|
||||
self.tok = self.login("kermit", "monkey")
|
||||
|
||||
# Some tests might prevent room creation on purpose.
|
||||
|
@ -424,6 +443,74 @@ class ThirdPartyRulesTestCase(unittest.HomeserverTestCase):
|
|||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(channel.json_body["i"], i)
|
||||
|
||||
def test_on_new_event(self):
|
||||
"""Test that the on_new_event callback is called on new events"""
|
||||
on_new_event = Mock(make_awaitable(None))
|
||||
self.hs.get_third_party_event_rules()._on_new_event_callbacks.append(
|
||||
on_new_event
|
||||
)
|
||||
|
||||
# Send a message event to the room and check that the callback is called.
|
||||
self.helper.send(room_id=self.room_id, tok=self.tok)
|
||||
self.assertEqual(on_new_event.call_count, 1)
|
||||
|
||||
# Check that the callback is also called on membership updates.
|
||||
self.helper.invite(
|
||||
room=self.room_id,
|
||||
src=self.user_id,
|
||||
targ=self.invitee,
|
||||
tok=self.tok,
|
||||
)
|
||||
|
||||
self.assertEqual(on_new_event.call_count, 2)
|
||||
|
||||
args, _ = on_new_event.call_args
|
||||
|
||||
self.assertEqual(args[0].membership, Membership.INVITE)
|
||||
self.assertEqual(args[0].state_key, self.invitee)
|
||||
|
||||
# Check that the invitee's membership is correct in the state that's passed down
|
||||
# to the callback.
|
||||
self.assertEqual(
|
||||
args[1][(EventTypes.Member, self.invitee)].membership,
|
||||
Membership.INVITE,
|
||||
)
|
||||
|
||||
# Send an event over federation and check that the callback is also called.
|
||||
self._send_event_over_federation()
|
||||
self.assertEqual(on_new_event.call_count, 3)
|
||||
|
||||
def _send_event_over_federation(self) -> None:
|
||||
"""Send a dummy event over federation and check that the request succeeds."""
|
||||
body = {
|
||||
"origin": self.hs.config.server.server_name,
|
||||
"origin_server_ts": self.clock.time_msec(),
|
||||
"pdus": [
|
||||
{
|
||||
"sender": self.user_id,
|
||||
"type": EventTypes.Message,
|
||||
"state_key": "",
|
||||
"content": {"body": "hello world", "msgtype": "m.text"},
|
||||
"room_id": self.room_id,
|
||||
"depth": 0,
|
||||
"origin_server_ts": self.clock.time_msec(),
|
||||
"prev_events": [],
|
||||
"auth_events": [],
|
||||
"signatures": {},
|
||||
"unsigned": {},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
channel = self.make_request(
|
||||
method="PUT",
|
||||
path="/_matrix/federation/v1/send/1",
|
||||
content=body,
|
||||
federation_auth_origin=self.hs.config.server.server_name.encode("utf8"),
|
||||
)
|
||||
|
||||
self.assertEqual(channel.code, 200, channel.result)
|
||||
|
||||
def _update_power_levels(self, event_default: int = 0):
|
||||
"""Updates the room's power levels.
|
||||
|
||||
|
|
90
tests/storage/databases/main/test_deviceinbox.py
Normal file
90
tests/storage/databases/main/test_deviceinbox.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import devices
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
devices.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.user_id = self.register_user("foo", "pass")
|
||||
|
||||
def test_background_remove_deleted_devices_from_device_inbox(self):
|
||||
"""Test that the background task to delete old device_inboxes works properly."""
|
||||
|
||||
# create a valid device
|
||||
self.get_success(
|
||||
self.store.store_device(self.user_id, "cur_device", "display_name")
|
||||
)
|
||||
|
||||
# Add device_inbox to devices
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"device_inbox",
|
||||
{
|
||||
"user_id": self.user_id,
|
||||
"device_id": "cur_device",
|
||||
"stream_id": 1,
|
||||
"message_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"device_inbox",
|
||||
{
|
||||
"user_id": self.user_id,
|
||||
"device_id": "old_device",
|
||||
"stream_id": 2,
|
||||
"message_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Insert and run the background update.
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "remove_deleted_devices_from_device_inbox",
|
||||
"progress_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# ... and tell the DataStore that it hasn't finished all updates yet
|
||||
self.store.db_pool.updates._all_done = False
|
||||
|
||||
self.wait_for_background_updates()
|
||||
|
||||
# Make sure the background task deleted old device_inbox
|
||||
res = self.get_success(
|
||||
self.store.db_pool.simple_select_onecol(
|
||||
table="device_inbox",
|
||||
keyvalues={},
|
||||
retcol="device_id",
|
||||
desc="get_device_id_from_device_inbox",
|
||||
)
|
||||
)
|
||||
self.assertEqual(1, len(res))
|
||||
self.assertEqual(res[0], "cur_device")
|
Loading…
Reference in a new issue