Merge branch 'release-v1.14.0' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
commit
407dbf8574
84
CHANGES.md
84
CHANGES.md
|
@ -1,3 +1,87 @@
|
|||
Synapse 1.14.0rc1 (2020-05-26)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Synapse's cache factor can now be configured in `homeserver.yaml` by the `caches.global_factor` setting. Additionally, `caches.per_cache_factors` controls the cache factors for individual caches. ([\#6391](https://github.com/matrix-org/synapse/issues/6391))
|
||||
- Add OpenID Connect login/registration support. Contributed by Quentin Gliech, on behalf of [les Connecteurs](https://connecteu.rs). ([\#7256](https://github.com/matrix-org/synapse/issues/7256), [\#7457](https://github.com/matrix-org/synapse/issues/7457))
|
||||
- Add room details admin endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#7317](https://github.com/matrix-org/synapse/issues/7317))
|
||||
- Allow for using more than one spam checker module at once. ([\#7435](https://github.com/matrix-org/synapse/issues/7435))
|
||||
- Add additional authentication checks for `m.room.power_levels` event per [MSC2209](https://github.com/matrix-org/matrix-doc/pull/2209). ([\#7502](https://github.com/matrix-org/synapse/issues/7502))
|
||||
- Implement room version 6 per [MSC2240](https://github.com/matrix-org/matrix-doc/pull/2240). ([\#7506](https://github.com/matrix-org/synapse/issues/7506))
|
||||
- Add highly experimental option to move event persistence off master. ([\#7281](https://github.com/matrix-org/synapse/issues/7281), [\#7374](https://github.com/matrix-org/synapse/issues/7374), [\#7436](https://github.com/matrix-org/synapse/issues/7436), [\#7440](https://github.com/matrix-org/synapse/issues/7440), [\#7475](https://github.com/matrix-org/synapse/issues/7475), [\#7490](https://github.com/matrix-org/synapse/issues/7490), [\#7491](https://github.com/matrix-org/synapse/issues/7491), [\#7492](https://github.com/matrix-org/synapse/issues/7492), [\#7493](https://github.com/matrix-org/synapse/issues/7493), [\#7495](https://github.com/matrix-org/synapse/issues/7495), [\#7515](https://github.com/matrix-org/synapse/issues/7515), [\#7516](https://github.com/matrix-org/synapse/issues/7516), [\#7517](https://github.com/matrix-org/synapse/issues/7517), [\#7542](https://github.com/matrix-org/synapse/issues/7542))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind. ([\#7384](https://github.com/matrix-org/synapse/issues/7384))
|
||||
- Allow expired user accounts to log out their device sessions. ([\#7443](https://github.com/matrix-org/synapse/issues/7443))
|
||||
- Fix a bug that would cause Synapse not to resync out-of-sync device lists. ([\#7453](https://github.com/matrix-org/synapse/issues/7453))
|
||||
- Prevent rooms with 0 members or with invalid version strings from breaking group queries. ([\#7465](https://github.com/matrix-org/synapse/issues/7465))
|
||||
- Workaround for an upstream Twisted bug that caused Synapse to become unresponsive after startup. ([\#7473](https://github.com/matrix-org/synapse/issues/7473))
|
||||
- Fix Redis reconnection logic that can result in missed updates over replication if master reconnects to Redis without restarting. ([\#7482](https://github.com/matrix-org/synapse/issues/7482))
|
||||
- When sending `m.room.member` events, omit `displayname` and `avatar_url` if they aren't set instead of setting them to `null`. Contributed by Aaron Raimist. ([\#7497](https://github.com/matrix-org/synapse/issues/7497))
|
||||
- Fix incorrect `method` label on `synapse_http_matrixfederationclient_{requests,responses}` prometheus metrics. ([\#7503](https://github.com/matrix-org/synapse/issues/7503))
|
||||
- Ignore incoming presence events from other homeservers if presence is disabled locally. ([\#7508](https://github.com/matrix-org/synapse/issues/7508))
|
||||
- Fix a long-standing bug that broke the update remote profile background process. ([\#7511](https://github.com/matrix-org/synapse/issues/7511))
|
||||
- Hash passwords as early as possible during password reset. ([\#7538](https://github.com/matrix-org/synapse/issues/7538))
|
||||
- Fix bug where a local user leaving a room could fail under rare circumstances. ([\#7548](https://github.com/matrix-org/synapse/issues/7548))
|
||||
- Fix "Missing RelayState parameter" error when using user interactive authentication with SAML for some SAML providers. ([\#7552](https://github.com/matrix-org/synapse/issues/7552))
|
||||
- Fix exception `'GenericWorkerReplicationHandler' object has no attribute 'send_federation_ack'`, introduced in v1.13.0. ([\#7564](https://github.com/matrix-org/synapse/issues/7564))
|
||||
- `synctl` now warns if it was unable to stop Synapse and will not attempt to start Synapse if nothing was stopped. Contributed by Romain Bouyé. ([\#6590](https://github.com/matrix-org/synapse/issues/6590))
|
||||
|
||||
|
||||
Updates to the Docker image
|
||||
---------------------------
|
||||
|
||||
- Update docker runtime image to Alpine v3.11. Contributed by @Starbix. ([\#7398](https://github.com/matrix-org/synapse/issues/7398))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Update information about mapping providers for SAML and OpenID. ([\#7458](https://github.com/matrix-org/synapse/issues/7458))
|
||||
- Add additional reverse proxy example for Caddy v2. Contributed by Jeff Peeler. ([\#7463](https://github.com/matrix-org/synapse/issues/7463))
|
||||
- Fix copy-paste error in `ServerNoticesConfig` docstring. Contributed by @ptman. ([\#7477](https://github.com/matrix-org/synapse/issues/7477))
|
||||
- Improve the formatting of `reverse_proxy.md`. ([\#7514](https://github.com/matrix-org/synapse/issues/7514))
|
||||
- Change the systemd worker service to check that the worker config file exists instead of silently failing. Contributed by David Vo. ([\#7528](https://github.com/matrix-org/synapse/issues/7528))
|
||||
- Minor clarifications to the TURN docs. ([\#7533](https://github.com/matrix-org/synapse/issues/7533))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Add typing annotations in `synapse.federation`. ([\#7382](https://github.com/matrix-org/synapse/issues/7382))
|
||||
- Convert the room handler to async/await. ([\#7396](https://github.com/matrix-org/synapse/issues/7396))
|
||||
- Improve performance of `get_e2e_cross_signing_key`. ([\#7428](https://github.com/matrix-org/synapse/issues/7428))
|
||||
- Improve performance of `mark_as_sent_devices_by_remote`. ([\#7429](https://github.com/matrix-org/synapse/issues/7429), [\#7562](https://github.com/matrix-org/synapse/issues/7562))
|
||||
- Add type hints to the SAML handler. ([\#7445](https://github.com/matrix-org/synapse/issues/7445))
|
||||
- Remove storage method `get_hosts_in_room` that is no longer called anywhere. ([\#7448](https://github.com/matrix-org/synapse/issues/7448))
|
||||
- Fix some typos in the `notice_expiry` templates. ([\#7449](https://github.com/matrix-org/synapse/issues/7449))
|
||||
- Convert the federation handler to async/await. ([\#7459](https://github.com/matrix-org/synapse/issues/7459))
|
||||
- Convert the search handler to async/await. ([\#7460](https://github.com/matrix-org/synapse/issues/7460))
|
||||
- Add type hints to `synapse.event_auth`. ([\#7505](https://github.com/matrix-org/synapse/issues/7505))
|
||||
- Convert the room member handler to async/await. ([\#7507](https://github.com/matrix-org/synapse/issues/7507))
|
||||
- Add type hints to room member handler. ([\#7513](https://github.com/matrix-org/synapse/issues/7513))
|
||||
- Fix typing annotations in `tests.replication`. ([\#7518](https://github.com/matrix-org/synapse/issues/7518))
|
||||
- Remove some redundant Python 2 support code. ([\#7519](https://github.com/matrix-org/synapse/issues/7519))
|
||||
- All endpoints now respond with a 200 OK for `OPTIONS` requests. ([\#7534](https://github.com/matrix-org/synapse/issues/7534), [\#7560](https://github.com/matrix-org/synapse/issues/7560))
|
||||
- Synapse now exports [detailed allocator statistics](https://doc.pypy.org/en/latest/gc_info.html#gc-get-stats) and basic GC timings as Prometheus metrics (`pypy_gc_time_seconds_total` and `pypy_memory_bytes`) when run under PyPy. Contributed by Ivan Shapovalov. ([\#7536](https://github.com/matrix-org/synapse/issues/7536))
|
||||
- Remove Ubuntu Cosmic and Disco from the list of distributions which we provide `.deb`s for, due to end-of-life. ([\#7539](https://github.com/matrix-org/synapse/issues/7539))
|
||||
- Make worker processes return a stubbed-out response to `GET /presence` requests. ([\#7545](https://github.com/matrix-org/synapse/issues/7545))
|
||||
- Optimise some references to `hs.config`. ([\#7546](https://github.com/matrix-org/synapse/issues/7546))
|
||||
- On upgrade room only send canonical alias once. ([\#7547](https://github.com/matrix-org/synapse/issues/7547))
|
||||
- Fix some indentation inconsistencies in the sample config. ([\#7550](https://github.com/matrix-org/synapse/issues/7550))
|
||||
- Include `synapse.http.site` in type checking. ([\#7553](https://github.com/matrix-org/synapse/issues/7553))
|
||||
- Fix some test code to not mangle stacktraces, to make it easier to debug errors. ([\#7554](https://github.com/matrix-org/synapse/issues/7554))
|
||||
- Refresh apt cache when building `dh_virtualenv` docker image. ([\#7555](https://github.com/matrix-org/synapse/issues/7555))
|
||||
- Stop logging some expected HTTP request errors as exceptions. ([\#7556](https://github.com/matrix-org/synapse/issues/7556), [\#7563](https://github.com/matrix-org/synapse/issues/7563))
|
||||
- Convert sending mail to async/await. ([\#7557](https://github.com/matrix-org/synapse/issues/7557))
|
||||
- Simplify `reap_monthly_active_users`. ([\#7558](https://github.com/matrix-org/synapse/issues/7558))
|
||||
|
||||
|
||||
Synapse 1.13.0 (2020-05-19)
|
||||
===========================
|
||||
|
||||
|
|
12
UPGRADE.rst
12
UPGRADE.rst
|
@ -75,9 +75,15 @@ for example:
|
|||
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
|
||||
Upgrading to v1.13.0
|
||||
Upgrading to v1.14.0
|
||||
====================
|
||||
|
||||
This version includes a database update which is run as part of the upgrade,
|
||||
and which may take a couple of minutes in the case of a large server. Synapse
|
||||
will not respond to HTTP requests while this update is taking place.
|
||||
|
||||
Upgrading to v1.13.0
|
||||
====================
|
||||
|
||||
Incorrect database migration in old synapse versions
|
||||
----------------------------------------------------
|
||||
|
@ -136,12 +142,12 @@ back to v1.12.4 you need to:
|
|||
2. Decrease the schema version in the database:
|
||||
|
||||
.. code:: sql
|
||||
|
||||
|
||||
UPDATE schema_version SET version = 57;
|
||||
|
||||
3. Downgrade Synapse by following the instructions for your installation method
|
||||
in the "Rolling back to older versions" section above.
|
||||
|
||||
|
||||
|
||||
Upgrading to v1.12.0
|
||||
====================
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
Synapse's cache factor can now be configured in `homeserver.yaml` by the `caches.global_factor` setting. Additionally, `caches.per_cache_factors` controls the cache factors for individual caches.
|
|
@ -1 +0,0 @@
|
|||
`synctl` now warns if it was unable to stop Synapse and will not attempt to start Synapse if nothing was stopped. Contributed by Romain Bouyé.
|
|
@ -1 +0,0 @@
|
|||
Add OpenID Connect login/registration support. Contributed by Quentin Gliech, on behalf of [les Connecteurs](https://connecteu.rs).
|
|
@ -1 +0,0 @@
|
|||
Add MultiWriterIdGenerator to support multiple concurrent writers of streams.
|
|
@ -1 +0,0 @@
|
|||
Add room details admin endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.
|
|
@ -1 +0,0 @@
|
|||
Move catchup of replication streams logic to worker.
|
|
@ -1 +0,0 @@
|
|||
Add an experimental room version which strictly adheres to the canonical JSON specification.
|
|
@ -1 +0,0 @@
|
|||
Add typing annotations in `synapse.federation`.
|
|
@ -1 +0,0 @@
|
|||
Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind.
|
|
@ -1 +0,0 @@
|
|||
Convert the room handler to async/await.
|
|
@ -1 +0,0 @@
|
|||
Update docker runtime image to Alpine v3.11. Contributed by @Starbix.
|
|
@ -1 +0,0 @@
|
|||
Improve performance of `get_e2e_cross_signing_key`.
|
|
@ -1 +0,0 @@
|
|||
Improve performance of `mark_as_sent_devices_by_remote`.
|
|
@ -1 +0,0 @@
|
|||
Allow for using more than one spam checker module at once.
|
|
@ -1 +0,0 @@
|
|||
Support any process writing to cache invalidation stream.
|
|
@ -1 +0,0 @@
|
|||
Refactor event persistence database functions in preparation for allowing them to be run on non-master processes.
|
|
@ -1 +0,0 @@
|
|||
Allow expired user accounts to log out their device sessions.
|
|
@ -1 +0,0 @@
|
|||
Add type hints to the SAML handler.
|
|
@ -1 +0,0 @@
|
|||
Remove storage method `get_hosts_in_room` that is no longer called anywhere.
|
|
@ -1 +0,0 @@
|
|||
Fix some typos in the notice_expiry templates.
|
|
@ -1 +0,0 @@
|
|||
Fix a bug that would cause Synapse not to resync out-of-sync device lists.
|
|
@ -1 +0,0 @@
|
|||
Add OpenID Connect login/registration support. Contributed by Quentin Gliech, on behalf of [les Connecteurs](https://connecteu.rs).
|
|
@ -1 +0,0 @@
|
|||
Update information about mapping providers for SAML and OpenID.
|
|
@ -1 +0,0 @@
|
|||
Convert the federation handler to async/await.
|
|
@ -1 +0,0 @@
|
|||
Convert the search handler to async/await.
|
|
@ -1 +0,0 @@
|
|||
Add additional reverse proxy example for Caddy v2. Contributed by Jeff Peeler.
|
|
@ -1 +0,0 @@
|
|||
Prevent rooms with 0 members or with invalid version strings from breaking group queries.
|
|
@ -1 +0,0 @@
|
|||
Fix linting errors in new version of Flake8.
|
|
@ -1 +0,0 @@
|
|||
Workaround for an upstream Twisted bug that caused Synapse to become unresponsive after startup.
|
|
@ -1 +0,0 @@
|
|||
Have all instance correctly respond to REPLICATE command.
|
|
@ -1 +0,0 @@
|
|||
Fix copy-paste error in `ServerNoticesConfig` docstring. Contributed by @ptman.
|
|
@ -1 +0,0 @@
|
|||
Fix Redis reconnection logic that can result in missed updates over replication if master reconnects to Redis without restarting.
|
|
@ -1 +0,0 @@
|
|||
Clean up replication unit tests.
|
|
@ -1 +0,0 @@
|
|||
Move event stream handling out of slave store.
|
|
@ -1 +0,0 @@
|
|||
Allow censoring of events to happen on workers.
|
|
@ -1 +0,0 @@
|
|||
Move EventStream handling into default ReplicationDataHandler.
|
|
@ -1 +0,0 @@
|
|||
Add `instance_map` config and route replication calls.
|
|
@ -1 +0,0 @@
|
|||
When sending `m.room.member` events, omit `displayname` and `avatar_url` if they aren't set instead of setting them to `null`. Contributed by Aaron Raimist.
|
|
@ -1 +0,0 @@
|
|||
Add additional authentication checks for m.room.power_levels event per [MSC2209](https://github.com/matrix-org/matrix-doc/pull/2209).
|
|
@ -1 +0,0 @@
|
|||
Fix incorrect `method` label on `synapse_http_matrixfederationclient_{requests,responses}` prometheus metrics.
|
|
@ -1 +0,0 @@
|
|||
Add type hints to `synapse.event_auth`.
|
|
@ -1 +0,0 @@
|
|||
Implement room version 6 per [MSC2240](https://github.com/matrix-org/matrix-doc/pull/2240).
|
|
@ -1 +0,0 @@
|
|||
Convert the room member handler to async/await.
|
|
@ -1 +0,0 @@
|
|||
Ignore incoming presence events from other homeservers if presence is disabled locally.
|
|
@ -1 +0,0 @@
|
|||
Fix a long-standing bug that broke the update remote profile background process.
|
|
@ -1 +0,0 @@
|
|||
Add type hints to room member handler.
|
|
@ -1 +0,0 @@
|
|||
Improve the formatting of `reverse_proxy.md`.
|
|
@ -1 +0,0 @@
|
|||
Allow `ReplicationRestResource` to be added to workers.
|
|
@ -1 +0,0 @@
|
|||
Add a worker store for search insertion, required for moving event persistence off master.
|
|
@ -1 +0,0 @@
|
|||
Add option to move event persistence off master.
|
|
@ -1 +0,0 @@
|
|||
Fix typing annotations in `tests.replication`.
|
|
@ -1 +0,0 @@
|
|||
Remove some redundant Python 2 support code.
|
|
@ -1 +0,0 @@
|
|||
Change the systemd worker service to check that the worker config file exists instead of silently failing. Contributed by David Vo.
|
|
@ -1 +0,0 @@
|
|||
Minor clarifications to the TURN docs.
|
|
@ -1 +0,0 @@
|
|||
All endpoints now respond with a 200 OK for `OPTIONS` requests.
|
|
@ -1 +0,0 @@
|
|||
Synapse now exports [detailed allocator statistics](https://doc.pypy.org/en/latest/gc_info.html#gc-get-stats) and basic GC timings as Prometheus metrics (`pypy_gc_time_seconds_total` and `pypy_memory_bytes`) when run under PyPy. Contributed by Ivan Shapovalov.
|
|
@ -1 +0,0 @@
|
|||
Hash passwords as early as possible during password reset.
|
|
@ -1 +0,0 @@
|
|||
Remove Ubuntu Cosmic and Disco from the list of distributions which we provide `.deb`s for, due to end-of-life.
|
|
@ -1 +0,0 @@
|
|||
Add ability to wait for replication streams.
|
|
@ -1 +0,0 @@
|
|||
Make worker processes return a stubbed-out response to `GET /presence` requests.
|
|
@ -1 +0,0 @@
|
|||
Optimise some references to `hs.config`.
|
|
@ -1 +0,0 @@
|
|||
On upgrade room only send canonical alias once.
|
|
@ -1 +0,0 @@
|
|||
Fix bug where a local user leaving a room could fail under rare circumstances.
|
|
@ -1 +0,0 @@
|
|||
Fix some indentation inconsistencies in the sample config.
|
|
@ -1 +0,0 @@
|
|||
Fix "Missing RelayState parameter" error when using user interactive authentication with SAML for some SAML providers.
|
|
@ -1 +0,0 @@
|
|||
Include `synapse.http.site` in type checking.
|
|
@ -1 +0,0 @@
|
|||
Fix some test code to not mangle stacktraces, to make it easier to debug errors.
|
|
@ -1 +0,0 @@
|
|||
Refresh apt cache when building dh_virtualenv docker image.
|
|
@ -1 +0,0 @@
|
|||
Stop logging some expected HTTP request errors as exceptions.
|
|
@ -1 +0,0 @@
|
|||
Convert sending mail to async/await.
|
|
@ -1 +0,0 @@
|
|||
Simplify `reap_monthly_active_users`.
|
|
@ -1 +0,0 @@
|
|||
All endpoints now respond with a 200 OK for `OPTIONS` requests.
|
1
changelog.d/7578.bugfix
Normal file
1
changelog.d/7578.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix cache config to not apply cache factor to event cache. Regression in v1.14.0rc1.
|
1
changelog.d/7579.bugfix
Normal file
1
changelog.d/7579.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix bug where `ReplicationStreamer` was not always started when replication was enabled. Bug introduced in v1.14.0rc1.
|
1
changelog.d/7580.bugfix
Normal file
1
changelog.d/7580.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix specifying individual cache factors for caches with special characters in their name. Regression in v1.14.0rc1.
|
|
@ -643,6 +643,12 @@ caches:
|
|||
# takes priority over setting through the config file.
|
||||
# Ex. SYNAPSE_CACHE_FACTOR_GET_USERS_WHO_SHARE_ROOM_WITH_USER=2.0
|
||||
#
|
||||
# Some caches have '*' and other characters that are not
|
||||
# alphanumeric or underscores. These caches can be named with or
|
||||
# without the special characters stripped. For example, to specify
|
||||
# the cache factor for `*stateGroupCache*` via an environment
|
||||
# variable would be `SYNAPSE_CACHE_FACTOR_STATEGROUPCACHE=2.0`.
|
||||
#
|
||||
per_cache_factors:
|
||||
#get_users_who_share_room_with_user: 2.0
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.13.0"
|
||||
__version__ = "1.14.0rc1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
import contextlib
|
||||
import logging
|
||||
import sys
|
||||
from typing import Dict, Iterable
|
||||
from typing import Dict, Iterable, Optional, Set
|
||||
|
||||
from typing_extensions import ContextManager
|
||||
|
||||
|
@ -214,7 +214,7 @@ class KeyUploadServlet(RestServlet):
|
|||
self.main_uri + request.uri.decode("ascii"), body, headers=headers
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse() from e
|
||||
raise e.to_synapse_error() from e
|
||||
except RequestSendFailed as e:
|
||||
raise SynapseError(502, "Failed to talk to master") from e
|
||||
|
||||
|
@ -677,10 +677,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
|||
self.notify_pushers = hs.config.start_pushers
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
self.send_handler = None # type: Optional[FederationSenderHandler]
|
||||
if hs.config.send_federation:
|
||||
self.send_handler = FederationSenderHandler(hs, self)
|
||||
else:
|
||||
self.send_handler = None
|
||||
self.send_handler = FederationSenderHandler(hs)
|
||||
|
||||
async def on_rdata(self, stream_name, instance_name, token, rows):
|
||||
await super().on_rdata(stream_name, instance_name, token, rows)
|
||||
|
@ -718,7 +717,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
|||
if entities:
|
||||
self.notifier.on_new_event("to_device_key", token, users=entities)
|
||||
elif stream_name == DeviceListsStream.NAME:
|
||||
all_room_ids = set()
|
||||
all_room_ids = set() # type: Set[str]
|
||||
for row in rows:
|
||||
if row.entity.startswith("@"):
|
||||
room_ids = await self.store.get_rooms_for_user(row.entity)
|
||||
|
@ -769,24 +768,33 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
|||
|
||||
|
||||
class FederationSenderHandler(object):
|
||||
"""Processes the replication stream and forwards the appropriate entries
|
||||
to the federation sender.
|
||||
"""Processes the fedration replication stream
|
||||
|
||||
This class is only instantiate on the worker responsible for sending outbound
|
||||
federation transactions. It receives rows from the replication stream and forwards
|
||||
the appropriate entries to the FederationSender class.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: GenericWorkerServer, replication_client):
|
||||
def __init__(self, hs: GenericWorkerServer):
|
||||
self.store = hs.get_datastore()
|
||||
self._is_mine_id = hs.is_mine_id
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.replication_client = replication_client
|
||||
self._hs = hs
|
||||
|
||||
# if the worker is restarted, we want to pick up where we left off in
|
||||
# the replication stream, so load the position from the database.
|
||||
#
|
||||
# XXX is this actually worthwhile? Whenever the master is restarted, we'll
|
||||
# drop some rows anyway (which is mostly fine because we're only dropping
|
||||
# typing and presence notifications). If the replication stream is
|
||||
# unreliable, why do we do all this hoop-jumping to store the position in the
|
||||
# database? See also https://github.com/matrix-org/synapse/issues/7535.
|
||||
#
|
||||
self.federation_position = self.store.federation_out_pos_startup
|
||||
|
||||
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
|
||||
|
||||
self._last_ack = self.federation_position
|
||||
|
||||
self._room_serials = {}
|
||||
self._room_typing = {}
|
||||
|
||||
def on_start(self):
|
||||
# There may be some events that are persisted but haven't been sent,
|
||||
# so send them now.
|
||||
|
@ -849,22 +857,34 @@ class FederationSenderHandler(object):
|
|||
await self.federation_sender.send_read_receipt(receipt_info)
|
||||
|
||||
async def update_token(self, token):
|
||||
"""Update the record of where we have processed to in the federation stream.
|
||||
|
||||
Called after we have processed a an update received over replication. Sends
|
||||
a FEDERATION_ACK back to the master, and stores the token that we have processed
|
||||
in `federation_stream_position` so that we can restart where we left off.
|
||||
"""
|
||||
try:
|
||||
self.federation_position = token
|
||||
|
||||
# We linearize here to ensure we don't have races updating the token
|
||||
with (await self._fed_position_linearizer.queue(None)):
|
||||
if self._last_ack < self.federation_position:
|
||||
await self.store.update_federation_out_pos(
|
||||
"federation", self.federation_position
|
||||
)
|
||||
#
|
||||
# XXX this appears to be redundant, since the ReplicationCommandHandler
|
||||
# has a linearizer which ensures that we only process one line of
|
||||
# replication data at a time. Should we remove it, or is it doing useful
|
||||
# service for robustness? Or could we replace it with an assertion that
|
||||
# we're not being re-entered?
|
||||
|
||||
# We ACK this token over replication so that the master can drop
|
||||
# its in memory queues
|
||||
self.replication_client.send_federation_ack(
|
||||
self.federation_position
|
||||
)
|
||||
self._last_ack = self.federation_position
|
||||
with (await self._fed_position_linearizer.queue(None)):
|
||||
await self.store.update_federation_out_pos(
|
||||
"federation", self.federation_position
|
||||
)
|
||||
|
||||
# We ACK this token over replication so that the master can drop
|
||||
# its in memory queues
|
||||
self._hs.get_tcp_replication().send_federation_ack(
|
||||
self.federation_position
|
||||
)
|
||||
self._last_ack = self.federation_position
|
||||
except Exception:
|
||||
logger.exception("Error updating federation stream position")
|
||||
|
||||
|
|
|
@ -14,13 +14,17 @@
|
|||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import re
|
||||
from typing import Callable, Dict
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
# The prefix for all cache factor-related environment variables
|
||||
_CACHES = {}
|
||||
_CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
|
||||
|
||||
# Map from canonicalised cache name to cache.
|
||||
_CACHES = {}
|
||||
|
||||
_DEFAULT_FACTOR_SIZE = 0.5
|
||||
_DEFAULT_EVENT_CACHE_SIZE = "10K"
|
||||
|
||||
|
@ -37,6 +41,20 @@ class CacheProperties(object):
|
|||
properties = CacheProperties()
|
||||
|
||||
|
||||
def _canonicalise_cache_name(cache_name: str) -> str:
|
||||
"""Gets the canonical form of the cache name.
|
||||
|
||||
Since we specify cache names in config and environment variables we need to
|
||||
ignore case and special characters. For example, some caches have asterisks
|
||||
in their name to denote that they're not attached to a particular database
|
||||
function, and these asterisks need to be stripped out
|
||||
"""
|
||||
|
||||
cache_name = re.sub(r"[^A-Za-z_1-9]", "", cache_name)
|
||||
|
||||
return cache_name.lower()
|
||||
|
||||
|
||||
def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
|
||||
"""Register a cache that's size can dynamically change
|
||||
|
||||
|
@ -45,7 +63,10 @@ def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
|
|||
cache_resize_callback: A callback function that will be ran whenever
|
||||
the cache needs to be resized
|
||||
"""
|
||||
_CACHES[cache_name.lower()] = cache_resize_callback
|
||||
# Some caches have '*' in them which we strip out.
|
||||
cache_name = _canonicalise_cache_name(cache_name)
|
||||
|
||||
_CACHES[cache_name] = cache_resize_callback
|
||||
|
||||
# Ensure all loaded caches are sized appropriately
|
||||
#
|
||||
|
@ -105,6 +126,12 @@ class CacheConfig(Config):
|
|||
# takes priority over setting through the config file.
|
||||
# Ex. SYNAPSE_CACHE_FACTOR_GET_USERS_WHO_SHARE_ROOM_WITH_USER=2.0
|
||||
#
|
||||
# Some caches have '*' and other characters that are not
|
||||
# alphanumeric or underscores. These caches can be named with or
|
||||
# without the special characters stripped. For example, to specify
|
||||
# the cache factor for `*stateGroupCache*` via an environment
|
||||
# variable would be `SYNAPSE_CACHE_FACTOR_STATEGROUPCACHE=2.0`.
|
||||
#
|
||||
per_cache_factors:
|
||||
#get_users_who_share_room_with_user: 2.0
|
||||
"""
|
||||
|
@ -130,10 +157,17 @@ class CacheConfig(Config):
|
|||
if not isinstance(individual_factors, dict):
|
||||
raise ConfigError("caches.per_cache_factors must be a dictionary")
|
||||
|
||||
# Canonicalise the cache names *before* updating with the environment
|
||||
# variables.
|
||||
individual_factors = {
|
||||
_canonicalise_cache_name(key): val
|
||||
for key, val in individual_factors.items()
|
||||
}
|
||||
|
||||
# Override factors from environment if necessary
|
||||
individual_factors.update(
|
||||
{
|
||||
key[len(_CACHE_PREFIX) + 1 :].lower(): float(val)
|
||||
_canonicalise_cache_name(key[len(_CACHE_PREFIX) + 1 :]): float(val)
|
||||
for key, val in self._environ.items()
|
||||
if key.startswith(_CACHE_PREFIX + "_")
|
||||
}
|
||||
|
@ -142,9 +176,9 @@ class CacheConfig(Config):
|
|||
for cache, factor in individual_factors.items():
|
||||
if not isinstance(factor, (int, float)):
|
||||
raise ConfigError(
|
||||
"caches.per_cache_factors.%s must be a number" % (cache.lower(),)
|
||||
"caches.per_cache_factors.%s must be a number" % (cache,)
|
||||
)
|
||||
self.cache_factors[cache.lower()] = factor
|
||||
self.cache_factors[cache] = factor
|
||||
|
||||
# Resize all caches (if necessary) with the new factors we've loaded
|
||||
self.resize_all_caches()
|
||||
|
|
|
@ -159,6 +159,9 @@ class ReplicationCommandHandler:
|
|||
hs.config.redis_port,
|
||||
)
|
||||
|
||||
# First let's ensure that we have a ReplicationStreamer started.
|
||||
hs.get_replication_streamer()
|
||||
|
||||
# We need two connections to redis, one for the subscription stream and
|
||||
# one to send commands to (as you can't send further redis commands to a
|
||||
# connection after SUBSCRIBE is called).
|
||||
|
|
|
@ -55,10 +55,6 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
|
|||
|
||||
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
||||
|
||||
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = (
|
||||
"drop_device_lists_outbound_last_success_non_unique_idx"
|
||||
)
|
||||
|
||||
|
||||
class DeviceWorkerStore(SQLBaseStore):
|
||||
def get_device(self, user_id, device_id):
|
||||
|
@ -749,19 +745,13 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
|||
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
|
||||
)
|
||||
|
||||
# create a unique index on device_lists_outbound_last_success
|
||||
self.db.updates.register_background_index_update(
|
||||
# a pair of background updates that were added during the 1.14 release cycle,
|
||||
# but replaced with 58/06dlols_unique_idx.py
|
||||
self.db.updates.register_noop_background_update(
|
||||
"device_lists_outbound_last_success_unique_idx",
|
||||
index_name="device_lists_outbound_last_success_unique_idx",
|
||||
table="device_lists_outbound_last_success",
|
||||
columns=["destination", "user_id"],
|
||||
unique=True,
|
||||
)
|
||||
|
||||
# once that completes, we can remove the old non-unique index.
|
||||
self.db.updates.register_background_update_handler(
|
||||
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX,
|
||||
self._drop_device_lists_outbound_last_success_non_unique_idx,
|
||||
self.db.updates.register_noop_background_update(
|
||||
"drop_device_lists_outbound_last_success_non_unique_idx",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -838,20 +828,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
return rows
|
||||
|
||||
async def _drop_device_lists_outbound_last_success_non_unique_idx(
|
||||
self, progress, batch_size
|
||||
):
|
||||
def f(txn):
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx")
|
||||
|
||||
await self.db.runInteraction(
|
||||
"drop_device_lists_outbound_last_success_non_unique_idx", f,
|
||||
)
|
||||
await self.db.updates._end_background_update(
|
||||
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX
|
||||
)
|
||||
return 1
|
||||
|
||||
|
||||
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
/* Copyright 2020 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- register a background update which will create a unique index on
|
||||
-- device_lists_outbound_last_success
|
||||
INSERT into background_updates (ordering, update_name, progress_json)
|
||||
VALUES (5804, 'device_lists_outbound_last_success_unique_idx', '{}');
|
||||
|
||||
-- once that completes, we can drop the old index.
|
||||
INSERT into background_updates (ordering, update_name, progress_json, depends_on)
|
||||
VALUES (
|
||||
5804,
|
||||
'drop_device_lists_outbound_last_success_non_unique_idx',
|
||||
'{}',
|
||||
'device_lists_outbound_last_success_unique_idx'
|
||||
);
|
|
@ -0,0 +1,80 @@
|
|||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
This migration rebuilds the device_lists_outbound_last_success table without duplicate
|
||||
entries, and with a UNIQUE index.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from io import StringIO
|
||||
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
|
||||
from synapse.storage.prepare_database import execute_statements_from_stream
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
|
||||
# some instances might already have this index, in which case we can skip this
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT 1 FROM pg_class WHERE relkind = 'i'
|
||||
AND relname = 'device_lists_outbound_last_success_unique_idx'
|
||||
"""
|
||||
)
|
||||
|
||||
if cur.rowcount:
|
||||
logger.info(
|
||||
"Unique index exists on device_lists_outbound_last_success: "
|
||||
"skipping rebuild"
|
||||
)
|
||||
return
|
||||
|
||||
logger.info("Rebuilding device_lists_outbound_last_success with unique index")
|
||||
execute_statements_from_stream(cur, StringIO(_rebuild_commands))
|
||||
|
||||
|
||||
# there might be duplicates, so the easiest way to achieve this is to create a new
|
||||
# table with the right data, and renaming it into place
|
||||
|
||||
_rebuild_commands = """
|
||||
DROP TABLE IF EXISTS device_lists_outbound_last_success_new;
|
||||
|
||||
CREATE TABLE device_lists_outbound_last_success_new (
|
||||
destination TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
stream_id BIGINT NOT NULL
|
||||
);
|
||||
|
||||
-- this took about 30 seconds on matrix.org's 16 million rows.
|
||||
INSERT INTO device_lists_outbound_last_success_new
|
||||
SELECT destination, user_id, MAX(stream_id) FROM device_lists_outbound_last_success
|
||||
GROUP BY destination, user_id;
|
||||
|
||||
-- and this another 30 seconds.
|
||||
CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx
|
||||
ON device_lists_outbound_last_success_new (destination, user_id);
|
||||
|
||||
DROP TABLE device_lists_outbound_last_success;
|
||||
|
||||
ALTER TABLE device_lists_outbound_last_success_new
|
||||
RENAME TO device_lists_outbound_last_success;
|
||||
"""
|
|
@ -78,7 +78,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
|
|||
"device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
|
||||
"device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
|
||||
"event_search": "event_search_event_id_idx",
|
||||
"device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx",
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -19,10 +19,12 @@ import logging
|
|||
import os
|
||||
import re
|
||||
from collections import Counter
|
||||
from typing import TextIO
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.storage.engines.postgres import PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -479,8 +481,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
|
|||
)
|
||||
|
||||
logger.info("applying schema %s for %s", name, modname)
|
||||
for statement in get_statements(stream):
|
||||
cur.execute(statement)
|
||||
execute_statements_from_stream(cur, stream)
|
||||
|
||||
# Mark as done.
|
||||
cur.execute(
|
||||
|
@ -538,8 +539,12 @@ def get_statements(f):
|
|||
|
||||
def executescript(txn, schema_path):
|
||||
with open(schema_path, "r") as f:
|
||||
for statement in get_statements(f):
|
||||
txn.execute(statement)
|
||||
execute_statements_from_stream(txn, f)
|
||||
|
||||
|
||||
def execute_statements_from_stream(cur: Cursor, f: TextIO):
|
||||
for statement in get_statements(f):
|
||||
cur.execute(statement)
|
||||
|
||||
|
||||
def _get_or_create_schema_state(txn, database_engine):
|
||||
|
|
|
@ -81,6 +81,7 @@ class LruCache(object):
|
|||
"""
|
||||
cache = cache_type()
|
||||
self.cache = cache # Used for introspection.
|
||||
self.apply_cache_factor_from_config = apply_cache_factor_from_config
|
||||
|
||||
# Save the original max size, and apply the default size factor.
|
||||
self._original_max_size = max_size
|
||||
|
@ -294,6 +295,9 @@ class LruCache(object):
|
|||
Returns:
|
||||
bool: Whether the cache changed size or not.
|
||||
"""
|
||||
if not self.apply_cache_factor_from_config:
|
||||
return False
|
||||
|
||||
new_size = int(self._original_max_size * factor)
|
||||
if new_size != self.max_size:
|
||||
self.max_size = new_size
|
||||
|
|
|
@ -125,3 +125,47 @@ class CacheConfigTests(TestCase):
|
|||
cache = LruCache(100)
|
||||
add_resizable_cache("foo", cache_resize_callback=cache.set_cache_factor)
|
||||
self.assertEqual(cache.max_size, 150)
|
||||
|
||||
def test_cache_with_asterisk_in_name(self):
|
||||
"""Some caches have asterisks in their name, test that they are set correctly.
|
||||
"""
|
||||
|
||||
config = {
|
||||
"caches": {
|
||||
"per_cache_factors": {"*cache_a*": 5, "cache_b": 6, "cache_c": 2}
|
||||
}
|
||||
}
|
||||
t = TestConfig()
|
||||
t.caches._environ = {
|
||||
"SYNAPSE_CACHE_FACTOR_CACHE_A": "2",
|
||||
"SYNAPSE_CACHE_FACTOR_CACHE_B": 3,
|
||||
}
|
||||
t.read_config(config, config_dir_path="", data_dir_path="")
|
||||
|
||||
cache_a = LruCache(100)
|
||||
add_resizable_cache("*cache_a*", cache_resize_callback=cache_a.set_cache_factor)
|
||||
self.assertEqual(cache_a.max_size, 200)
|
||||
|
||||
cache_b = LruCache(100)
|
||||
add_resizable_cache("*Cache_b*", cache_resize_callback=cache_b.set_cache_factor)
|
||||
self.assertEqual(cache_b.max_size, 300)
|
||||
|
||||
cache_c = LruCache(100)
|
||||
add_resizable_cache("*cache_c*", cache_resize_callback=cache_c.set_cache_factor)
|
||||
self.assertEqual(cache_c.max_size, 200)
|
||||
|
||||
def test_apply_cache_factor_from_config(self):
|
||||
"""Caches can disable applying cache factor updates, mainly used by
|
||||
event cache size.
|
||||
"""
|
||||
|
||||
config = {"caches": {"event_cache_size": "10k"}}
|
||||
t = TestConfig()
|
||||
t.read_config(config, config_dir_path="", data_dir_path="")
|
||||
|
||||
cache = LruCache(
|
||||
max_size=t.caches.event_cache_size, apply_cache_factor_from_config=False,
|
||||
)
|
||||
add_resizable_cache("event_cache", cache_resize_callback=cache.set_cache_factor)
|
||||
|
||||
self.assertEqual(cache.max_size, 10240)
|
||||
|
|
71
tests/replication/test_federation_ack.py
Normal file
71
tests/replication/test_federation_ack.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from synapse.app.generic_worker import GenericWorkerServer
|
||||
from synapse.replication.tcp.commands import FederationAckCommand
|
||||
from synapse.replication.tcp.protocol import AbstractConnection
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
class FederationAckTestCase(HomeserverTestCase):
|
||||
def default_config(self) -> dict:
|
||||
config = super().default_config()
|
||||
config["worker_app"] = "synapse.app.federation_sender"
|
||||
config["send_federation"] = True
|
||||
return config
|
||||
|
||||
def make_homeserver(self, reactor, clock):
|
||||
hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
|
||||
return hs
|
||||
|
||||
def test_federation_ack_sent(self):
|
||||
"""A FEDERATION_ACK should be sent back after each RDATA federation
|
||||
|
||||
This test checks that the federation sender is correctly sending back
|
||||
FEDERATION_ACK messages. The test works by spinning up a federation_sender
|
||||
worker server, and then fishing out its ReplicationCommandHandler. We wire
|
||||
the RCH up to a mock connection (so that we can observe the command being sent)
|
||||
and then poke in an RDATA row.
|
||||
|
||||
XXX: it might be nice to do this by pretending to be a synapse master worker
|
||||
(or a redis server), and having the worker connect to us via a mocked-up TCP
|
||||
transport, rather than assuming that the implementation has a
|
||||
ReplicationCommandHandler.
|
||||
"""
|
||||
rch = self.hs.get_tcp_replication()
|
||||
|
||||
# wire up the ReplicationCommandHandler to a mock connection
|
||||
mock_connection = mock.Mock(spec=AbstractConnection)
|
||||
rch.new_connection(mock_connection)
|
||||
|
||||
# tell it it received an RDATA row
|
||||
self.get_success(
|
||||
rch.on_rdata(
|
||||
"federation",
|
||||
"master",
|
||||
token=10,
|
||||
rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
|
||||
)
|
||||
)
|
||||
|
||||
# now check that the FEDERATION_ACK was sent
|
||||
mock_connection.send_command.assert_called_once()
|
||||
cmd = mock_connection.send_command.call_args[0][0]
|
||||
assert isinstance(cmd, FederationAckCommand)
|
||||
self.assertEqual(cmd.token, 10)
|
Loading…
Reference in a new issue