Compare commits

...

7 commits

Author SHA1 Message Date
Andrew Morgan a7bf3f6f95 wip tests 2021-11-11 07:21:28 +00:00
Andrew Morgan 42f0ab1d84 Changelog 2021-11-10 16:11:01 +00:00
Andrew Morgan 6bcd8dee42 Add a to_device_stream_id column to the application_services_state table
This is for tracking the stream id that each application service has
been sent up to. In other words, there shouldn't be any need to process
stream ids below the recorded one here as the AS should have already
received them.

Note that there is no reliability built-in here. Reliability of delivery
is intended for a separate PR.
2021-11-10 16:11:01 +00:00
Andrew Morgan 43bdfbbc85 Add database method to fetch to-device messages by user_ids from db
This method is quite similar to the one below, except that it doesn't
support device ids, and supports querying with more than one user id,
both of which are relevant to application services.

The results are also formatted in a different data structure, so I'm
not sure how much we could really share here between the two methods.
2021-11-10 16:10:57 +00:00
Andrew Morgan 59b5eeefe1 Allow setting/getting stream id per appservice for to-device messages 2021-11-10 16:10:57 +00:00
Andrew Morgan 52c43d91f5 Add a new ephemeral AS handler for to_device message edus
Here we add the ability for the application service ephemeral message
processor to handle new events on the "to_device" stream.

We keep track of a stream id (token) per application service, and every
time a new to-device message comes in, for each appservice we pull the
messages between the last-recorded and current stream id and check
whether any of the messages are for a user in that appservice's user
namespace.

get_new_messages is implemented in the next commit.
2021-11-10 16:10:41 +00:00
Andrew Morgan 0772fc855b Add experimental config option to send to-device messages to AS's 2021-11-10 16:06:58 +00:00
12 changed files with 578 additions and 27 deletions

View file

@ -0,0 +1 @@
Add experimental support for sending to-device messages to application services, as specified by [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409). Disabled by default.

View file

@ -46,3 +46,11 @@ class ExperimentalConfig(Config):
# MSC3266 (room summary api)
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
# MSC2409 (this setting only relates to optionally sending to-device messages).
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. This setting, if enabled, adds to-device messages
# to that list.
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)

View file

@ -12,7 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)
from prometheus_client import Counter
@ -42,6 +51,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION = 100
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
@ -55,6 +66,9 @@ class ApplicationServicesHandler:
self.clock = hs.get_clock()
self.notify_appservices = hs.config.appservice.notify_appservices
self.event_sources = hs.get_event_sources()
self.msc2409_to_device_messages_enabled = (
hs.config.experimental.msc2409_to_device_messages_enabled
)
self.current_max = 0
self.is_processing = False
@ -199,8 +213,9 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
`stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
value for `stream_key` will cause this function to return early.
`stream_key` can be "typing_key", "receipt_key", "presence_key" or
"to_device_key". Any other value for `stream_key` will cause this function
to return early.
Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
@ -216,10 +231,6 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
# Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
# Assert that new_token is an integer (and not a RoomStreamToken).
# All of the supported streams that this function handles use an
# integer to track progress (rather than a RoomStreamToken - a
@ -233,6 +244,13 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == "to_device_key"
and not self.msc2409_to_device_messages_enabled
):
return
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@ -307,6 +325,30 @@ class ApplicationServicesHandler:
service, "presence", new_token
)
elif (
stream_key == "to_device_key"
and new_token is not None
and self.msc2409_to_device_messages_enabled
):
# Retrieve an iterable of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
events, max_stream_token = await self._handle_to_device(
service, new_token, users
)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)
# TODO: If max_stream_token != new_token, schedule another transaction immediately,
# instead of waiting for another to-device to be sent?
# https://github.com/matrix-org/synapse/issues/11150#issuecomment-960726449
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "to_device", max_stream_token
)
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@ -440,6 +482,88 @@ class ApplicationServicesHandler:
return events
async def _handle_to_device(
    self,
    service: ApplicationService,
    new_token: int,
    users: Collection[Union[str, UserID]],
) -> Tuple[List[JsonDict], int]:
    """
    Work out which to-device messages an application service should receive.

    Fetches messages with stream ids between the last-recorded to-device
    stream token for this appservice and `new_token`, restricted to users
    within the appservice's user namespace.

    Args:
        service: The application service to fetch to-device messages for.
        new_token: The latest to-device event stream token.
        users: The users that should receive new to-device messages.

    Returns:
        A two-tuple containing the following:
            * A list of to-device message dicts, each annotated with the
              recipient's `to_user_id` and `to_device_id` (per MSC2409).
            * The last-processed to-device message's stream id. If this does
              not equal `new_token` then there are likely more events to send.
    """
    # The stream token this application service has been sent up to already.
    from_key = await self.store.get_type_stream_id_for_appservice(
        service, "to_device"
    )

    # Normalise user IDs to strings and keep only those the appservice is
    # interested in.
    interested_users = [
        user_id
        for user_id in (
            u.to_string() if isinstance(u, UserID) else u for u in users
        )
        if service.is_interested_in_user(user_id)
    ]

    if not interested_users:
        # The AS was not interested in any of these users; nothing to send.
        return [], new_token

    # Retrieve the to-device messages for each interested user. The second
    # return value is the maximum stream token of the messages we were able
    # to pull before hitting the per-transaction message count limit; we
    # return it later so that only the stream position we actually reached
    # is recorded, allowing the rest to be sent in a later transaction.
    by_recipient, max_stream_token = await self.store.get_new_messages(
        interested_users,
        from_key,
        new_token,
        limit=MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION,
    )

    # MSC2409 requires 'to_user_id' and 'to_device_id' fields in each event
    # so the application service knows which user/device combination the
    # message was intended for. Flatten the (user, device) -> messages
    # mapping into a single list with those fields embedded in each dict.
    message_payload: List[JsonDict] = []
    for (user_id, device_id), messages in by_recipient.items():
        for message_json in messages:
            # 'message_id' is an internal ID; strip it before sending out.
            message_json.pop("message_id", None)
            message_payload.append(
                {
                    "to_user_id": user_id,
                    "to_device_id": device_id,
                    **message_json,
                }
            )

    return message_payload, max_stream_token
async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.

View file

@ -444,15 +444,23 @@ class Notifier:
self.notify_replication()
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")
# Notify appservices of updates in ephemeral event streams.
# Only the following streams are currently supported.
if stream_key in (
"typing_key",
"receipt_key",
"presence_key",
"to_device_key",
):
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened

View file

@ -387,7 +387,7 @@ class ApplicationServiceTransactionWorkerStore(
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
if type not in ("read_receipt", "presence"):
if type not in ("read_receipt", "presence", "to_device"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
@ -414,7 +414,7 @@ class ApplicationServiceTransactionWorkerStore(
async def set_type_stream_id_for_appservice(
self, service: ApplicationService, stream_type: str, pos: Optional[int]
) -> None:
if stream_type not in ("read_receipt", "presence"):
if stream_type not in ("read_receipt", "presence", "to_device"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (stream_type,)

View file

@ -13,13 +13,17 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.database import (
DatabasePool,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import JsonDict
@ -116,6 +120,97 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_to_device_stream_token(self) -> int:
    """Return the current position of the to-device message stream id generator."""
    return self._device_inbox_id_gen.get_current_token()
async def get_new_messages(
    self,
    user_ids: Collection[str],
    from_stream_id: int,
    to_stream_id: int,
    limit: int = 100,
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
    """
    Retrieve to-device messages for a given set of user IDs.

    Only to-device messages with stream ids between the given boundaries
    (from < X <= to) are returned.

    Note that multiple rows can have the same stream id. Stream ids are
    unique to *messages*, but there might be multiple recipients of a message,
    and thus multiple entries in the device_inbox table with the same stream id.

    Args:
        user_ids: The users to retrieve to-device messages for.
        from_stream_id: The lower boundary of stream id to filter with (exclusive).
        to_stream_id: The upper boundary of stream id to filter with (inclusive).
        limit: The soft maximum number of to-device messages to return. May be
            exceeded by the number of extra rows sharing the final processed
            stream id, so that a message's recipients are never split across
            two calls.

    Returns:
        A tuple containing the following:
            * A dict of (user_id, device_id) -> list of to-device messages.
            * The stream id that to-device messages were processed up to. The
              calling function can check this value against `to_stream_id` to
              see if we hit the given limit when fetching messages.
    """
    # Bail out if none of these users have any messages
    for user_id in user_ids:
        if self._device_inbox_stream_cache.has_entity_changed(
            user_id, from_stream_id
        ):
            break
    else:
        return {}, to_stream_id

    def get_new_messages_txn(txn: LoggingTransaction):
        # Scope to only the given users. We need to use this method as doing so is
        # different across database engines.
        many_clause_sql, many_clause_args = make_in_list_sql_clause(
            self.database_engine, "user_id", user_ids
        )

        sql = f"""
            SELECT stream_id, user_id, device_id, message_json FROM device_inbox
            WHERE {many_clause_sql}
            AND ? < stream_id AND stream_id <= ?
            ORDER BY stream_id ASC
            LIMIT ?
        """
        txn.execute(sql, (*many_clause_args, from_stream_id, to_stream_id, limit))
        rows = txn.fetchall()

        # If we don't hit the limit then everything up to `to_stream_id` has
        # been fetched, so that is the position we report back - there won't
        # necessarily be a message with exactly that stream id.
        stream_pos = to_stream_id

        if len(rows) >= limit:
            # We hit the limit part-way through the stream. Record the stream
            # id of the last row fetched - the rest can be sent later.
            stream_pos = rows[-1][0]

            # Multiple recipients of one message share a stream id. If the
            # LIMIT cut part-way through the rows for `stream_pos`, then
            # recording that position would cause the leftover rows to be
            # skipped by the next call (which filters on stream_id >
            # stream_pos), losing messages. Drop the possibly-partial rows
            # for the boundary stream id and re-fetch that id in full; a
            # single message has a bounded number of recipient devices, so
            # this only slightly overshoots `limit`.
            rows = [row for row in rows if row[0] != stream_pos]
            boundary_sql = f"""
                SELECT stream_id, user_id, device_id, message_json FROM device_inbox
                WHERE {many_clause_sql}
                AND stream_id = ?
            """
            txn.execute(boundary_sql, (*many_clause_args, stream_pos))
            rows.extend(txn.fetchall())

        # Create a dictionary of (user ID, device ID) -> list of messages that
        # that device is meant to receive.
        recipient_user_id_device_id_to_messages: Dict[
            Tuple[str, str], List[JsonDict]
        ] = {}
        for _, recipient_user_id, recipient_device_id, message_json_str in rows:
            recipient_user_id_device_id_to_messages.setdefault(
                (recipient_user_id, recipient_device_id), []
            ).append(db_to_json(message_json_str))

        return recipient_user_id_device_id_to_messages, stream_pos

    return await self.db_pool.runInteraction(
        "get_new_messages", get_new_messages_txn
    )
async def get_new_messages_for_device(
self,
user_id: str,

View file

@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Add a column to track the to_device stream id that each application
-- service has been caught up to. NULL until a stream position is first
-- recorded for the service.
ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;

View file

@ -253,20 +253,59 @@ class AppServiceHandlerTestCase(unittest.TestCase):
},
)
def test_notify_interested_services_ephemeral(self):
def test_notify_interested_services_ephemeral_read_receipt(self):
"""
Test sending ephemeral events to the appservice handler are scheduled
Test sending read receipts to the appservice handler are scheduled
to be pushed out to interested appservices, and that the stream ID is
updated accordingly.
"""
# Create an application service that is guaranteed to be interested in
# any new events
interested_service = self._mkservice(is_interested=True)
services = [interested_service]
self.mock_store.get_app_services.return_value = services
# State that this application service has received up until stream ID 579
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
579
)
# Set up a dummy event that should be sent to the application service
event = Mock(event_id="event_1")
self.event_source.sources.receipt.get_new_events_as.return_value = (
make_awaitable(([event], None))
)
self.handler.notify_interested_services_ephemeral(
"receipt_key", 580, ["@fakerecipient:example.com"]
)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
interested_service, [event]
)
self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
interested_service,
"read_receipt",
580,
)
def test_notify_interested_services_ephemeral_to_device(self):
"""
Test sending read receipts to the appservice handler are scheduled
to be pushed out to interested appservices, and that the stream ID is
updated accordingly.
"""
# Create an application service that is guaranteed to be interested in
# any new events
interested_service = self._mkservice(is_interested=True)
services = [interested_service]
self.mock_store.get_app_services.return_value = services
# State that this application service has received up until stream ID 579
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
579
)
# Set up a dummy event that should be sent to the application service
event = Mock(event_id="event_1")
self.event_source.sources.receipt.get_new_events_as.return_value = (
make_awaitable(([event], None))

View file

@ -0,0 +1,258 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Iterable, Optional, Tuple, Union
from unittest.mock import Mock
import synapse.rest.admin
import synapse.storage
from synapse.appservice import ApplicationService
from synapse.rest.client import login, receipts, room
from synapse.util.stringutils import random_string
from synapse.types import JsonDict
from tests import unittest
class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
    """
    Tests that ephemeral events are pushed to application services whose user
    namespaces cover the relevant users. Currently only read receipts are
    exercised; the to-device test below is still a work in progress.
    """

    servlets = [
        synapse.rest.admin.register_servlets_for_client_rest_resource,
        login.register_servlets,
        room.register_servlets,
        receipts.register_servlets,
    ]

    def prepare(self, reactor, clock, hs):
        # Mock the application service scheduler so that we can track any outgoing transactions
        self.mock_scheduler = Mock()
        self.mock_scheduler.submit_ephemeral_events_for_as = Mock()

        hs.get_application_service_handler().scheduler = self.mock_scheduler

        # Two local users; the tests place them in a shared room.
        self.user1 = self.register_user("user1", "password")
        self.token1 = self.login("user1", "password")

        self.user2 = self.register_user("user2", "password")
        self.token2 = self.login("user2", "password")

    def test_application_services_receive_read_receipts(self):
        """
        Test that when a user sends a read receipt in a room with another
        user, and that is in an application service's user namespace, that
        application service will receive that read receipt.
        """
        (
            interested_service,
            _,
        ) = self._register_interested_and_uninterested_application_services()

        # Create a room with both user1 and user2
        room_id = self.helper.create_room_as(
            self.user1, tok=self.token1, is_public=True
        )
        self.helper.join(room_id, self.user2, tok=self.token2)

        # Have user2 send a message into the room
        response_dict = self.helper.send(room_id, body="read me", tok=self.token2)

        # Have user1 send a read receipt for the message with an empty body
        self._send_read_receipt(room_id, response_dict["event_id"], self.token1)

        # user2 should have been the recipient of that read receipt.
        # Check if our application service - that is interested in user2 - received
        # the read receipt as part of an AS transaction.
        #
        # The uninterested application service should not have been notified.
        last_call = self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list[
            0
        ]
        service, events = last_call[0]
        self.assertEqual(service, interested_service)
        self.assertEqual(len(events), 1)
        self.assertEqual(events[0]["type"], "m.receipt")
        self.assertEqual(events[0]["room_id"], room_id)

        # Assert that this was a read receipt from user1
        read_receipts = list(events[0]["content"].values())
        self.assertIn(self.user1, read_receipts[0]["m.read"])

        # Clear mock stats
        self.mock_scheduler.submit_ephemeral_events_for_as.reset_mock()

        # Send 2 pairs of messages + read receipts
        response_dict_1 = self.helper.send(room_id, body="read me1", tok=self.token2)
        response_dict_2 = self.helper.send(room_id, body="read me2", tok=self.token2)
        self._send_read_receipt(room_id, response_dict_1["event_id"], self.token1)
        self._send_read_receipt(room_id, response_dict_2["event_id"], self.token1)

        # Assert each transaction that was sent to the application service is as expected
        self.assertEqual(2, self.mock_scheduler.submit_ephemeral_events_for_as.call_count)
        first_call, second_call = self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list

        service, events = first_call[0]
        self.assertEqual(service, interested_service)
        self.assertEqual(len(events), 1)
        self.assertEqual(events[0]["type"], "m.receipt")
        self.assertEqual(
            self._event_id_from_read_receipt(events[0]), response_dict_1["event_id"]
        )

        service, events = second_call[0]
        self.assertEqual(service, interested_service)
        self.assertEqual(len(events), 1)
        self.assertEqual(events[0]["type"], "m.receipt")
        self.assertEqual(
            self._event_id_from_read_receipt(events[0]), response_dict_2["event_id"]
        )

    def test_application_services_receive_to_device(self):
        """
        NOTE(review): despite its name, this test body currently duplicates
        the read-receipt test above and does not exercise to-device messages
        at all - it sends a room message and a read receipt, then asserts
        that the interested service received an m.receipt event. The commit
        is marked "wip tests"; confirm intent before relying on this for
        to-device coverage.
        """
        (
            interested_service,
            _,
        ) = self._register_interested_and_uninterested_application_services()

        # Create a room with both user1 and user2
        room_id = self.helper.create_room_as(
            self.user1, tok=self.token1, is_public=True
        )
        self.helper.join(room_id, self.user2, tok=self.token2)

        # Have user2 send a message into the room
        response_dict = self.helper.send(room_id, body="read me", tok=self.token2)

        # Have user1 send a read receipt for the message with an empty body
        channel = self.make_request(
            "POST",
            "/rooms/%s/receipt/m.read/%s" % (room_id, response_dict["event_id"]),
            access_token=self.token1,
        )
        self.assertEqual(channel.code, 200)

        # user2 should have been the recipient of that read receipt.
        # Check if our application service - that is interested in user2 - received
        # the read receipt as part of an AS transaction.
        #
        # The uninterested application service should not have been notified.
        service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[
            0
        ]
        self.assertEqual(service, interested_service)
        self.assertEqual(events[0]["type"], "m.receipt")
        self.assertEqual(events[0]["room_id"], room_id)

        # Assert that this was a read receipt from user1
        read_receipts = list(events[0]["content"].values())
        self.assertIn(self.user1, read_receipts[0]["m.read"])

    def _register_interested_and_uninterested_application_services(
        self,
    ) -> Tuple[ApplicationService, ApplicationService]:
        """
        Register two application services with the homeserver: one with an
        exclusive interest in user2's MXID, and one with no namespaces.

        Returns:
            A tuple of (interested_service, uninterested_service).
        """
        # Create an application service with exclusive interest in user2
        interested_service = self._make_application_service(
            namespaces={
                ApplicationService.NS_USERS: [
                    {
                        "regex": "@user2:.+",
                        "exclusive": True,
                    }
                ],
            },
        )
        uninterested_service = self._make_application_service()

        # Register this application service, along with another, uninterested one
        services = [
            uninterested_service,
            interested_service,
        ]
        self.hs.get_datastore().get_app_services = Mock(return_value=services)

        return interested_service, uninterested_service

    def _make_application_service(
        self,
        # NOTE(review): the original annotation used the ApplicationService.NS_*
        # constants (plain string values) inside Union, which is not a valid
        # type expression. The dict keys are those string constants.
        namespaces: Optional[Dict[str, Iterable[Dict]]] = None,
        supports_ephemeral: bool = True,
    ) -> ApplicationService:
        """Build an ApplicationService with a random ID and the given namespaces."""
        return ApplicationService(
            token=None,
            hostname="example.com",
            id=random_string(10),
            sender="@as:example.com",
            rate_limited=False,
            namespaces=namespaces,
            supports_ephemeral=supports_ephemeral,
        )

    def _send_read_receipt(self, room_id: str, event_id_to_read: str, tok: str) -> None:
        """
        Send a read receipt of an event into a room.

        Args:
            room_id: The room the event is part of.
            event_id_to_read: The ID of the event being read.
            tok: The access token of the sender.
        """
        channel = self.make_request(
            "POST",
            "/rooms/%s/receipt/m.read/%s" % (room_id, event_id_to_read),
            access_token=tok,
            content="{}",
        )
        self.assertEqual(channel.code, 200, channel.json_body)

    def _event_id_from_read_receipt(self, read_receipt_dict: JsonDict) -> str:
        """
        Extracts the first event ID from a read receipt. Read receipt dictionaries
        are in the form:

        {
            'type': 'm.receipt',
            'room_id': '!PEzCqHyycBVxqMKIjI:test',
            'content': {
                '$DETIeTEH651c1N7sP_j-YZiaQqCaayHhYwmhZDVWDY8': {  # We want this
                    'm.read': {
                        '@user1:test': {
                            'ts': 1300,
                            'hidden': False
                        }
                    }
                }
            }
        }

        Args:
            read_receipt_dict: The dictionary returned from a POST read receipt call.

        Returns:
            The (first) event ID the read receipt refers to.
        """
        return list(read_receipt_dict["content"].keys())[0]
# TODO: Test that ephemeral messages aren't sent to application services that have
# ephemeral: false

View file

@ -230,7 +230,7 @@ class RestHelper:
custom_headers: Optional[
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
] = None,
):
) -> JsonDict:
if body is None:
body = "body_text_here"
@ -257,7 +257,7 @@ class RestHelper:
custom_headers: Optional[
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
] = None,
):
) -> JsonDict:
if txn_id is None:
txn_id = "m%s" % (str(time.time()))

View file

@ -320,7 +320,7 @@ class HomeserverTestCase(TestCase):
def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1

View file

@ -236,7 +236,7 @@ def setup_test_homeserver(
else:
database_config = {
"name": "sqlite3",
"args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
"args": {"database": "test.db", "cp_min": 1, "cp_max": 1},
}
if "db_txn_limit" in kwargs: