Compare commits
7 commits
develop
...
anoa/e2e_a
Author | SHA1 | Date | |
---|---|---|---|
a7bf3f6f95 | |||
42f0ab1d84 | |||
6bcd8dee42 | |||
43bdfbbc85 | |||
59b5eeefe1 | |||
52c43d91f5 | |||
0772fc855b |
1
changelog.d/11215.feature
Normal file
1
changelog.d/11215.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add experimental support for sending to-device messages to application services, as specified by [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409). Disabled by default.
|
|
@ -46,3 +46,11 @@ class ExperimentalConfig(Config):
|
|||
|
||||
# MSC3266 (room summary api)
|
||||
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
|
||||
|
||||
# MSC2409 (this setting only relates to optionally sending to-device messages).
|
||||
# Presence, typing and read receipt EDUs are already sent to application services that
|
||||
# have opted in to receive them. This setting, if enabled, adds to-device messages
|
||||
# to that list.
|
||||
self.msc2409_to_device_messages_enabled: bool = experimental.get(
|
||||
"msc2409_to_device_messages_enabled", False
|
||||
)
|
||||
|
|
|
@ -12,7 +12,16 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Collection,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
|
@ -42,6 +51,8 @@ if TYPE_CHECKING:
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION = 100
|
||||
|
||||
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
|
||||
|
||||
|
||||
|
@ -55,6 +66,9 @@ class ApplicationServicesHandler:
|
|||
self.clock = hs.get_clock()
|
||||
self.notify_appservices = hs.config.appservice.notify_appservices
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.msc2409_to_device_messages_enabled = (
|
||||
hs.config.experimental.msc2409_to_device_messages_enabled
|
||||
)
|
||||
|
||||
self.current_max = 0
|
||||
self.is_processing = False
|
||||
|
@ -199,8 +213,9 @@ class ApplicationServicesHandler:
|
|||
Args:
|
||||
stream_key: The stream the event came from.
|
||||
|
||||
`stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
|
||||
value for `stream_key` will cause this function to return early.
|
||||
`stream_key` can be "typing_key", "receipt_key", "presence_key" or
|
||||
"to_device_key". Any other value for `stream_key` will cause this function
|
||||
to return early.
|
||||
|
||||
Ephemeral events will only be pushed to appservices that have opted into
|
||||
receiving them by setting `push_ephemeral` to true in their registration
|
||||
|
@ -216,10 +231,6 @@ class ApplicationServicesHandler:
|
|||
if not self.notify_appservices:
|
||||
return
|
||||
|
||||
# Ignore any unsupported streams
|
||||
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
|
||||
return
|
||||
|
||||
# Assert that new_token is an integer (and not a RoomStreamToken).
|
||||
# All of the supported streams that this function handles use an
|
||||
# integer to track progress (rather than a RoomStreamToken - a
|
||||
|
@ -233,6 +244,13 @@ class ApplicationServicesHandler:
|
|||
# Additional context: https://github.com/matrix-org/synapse/pull/11137
|
||||
assert isinstance(new_token, int)
|
||||
|
||||
# Ignore to-device messages if the feature flag is not enabled
|
||||
if (
|
||||
stream_key == "to_device_key"
|
||||
and not self.msc2409_to_device_messages_enabled
|
||||
):
|
||||
return
|
||||
|
||||
# Check whether there are any appservices which have registered to receive
|
||||
# ephemeral events.
|
||||
#
|
||||
|
@ -307,6 +325,30 @@ class ApplicationServicesHandler:
|
|||
service, "presence", new_token
|
||||
)
|
||||
|
||||
elif (
|
||||
stream_key == "to_device_key"
|
||||
and new_token is not None
|
||||
and self.msc2409_to_device_messages_enabled
|
||||
):
|
||||
# Retrieve an iterable of to-device message events, as well as the
|
||||
# maximum stream token of the messages we were able to retrieve.
|
||||
events, max_stream_token = await self._handle_to_device(
|
||||
service, new_token, users
|
||||
)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(
|
||||
service, events
|
||||
)
|
||||
|
||||
# TODO: If max_stream_token != new_token, schedule another transaction immediately,
|
||||
# instead of waiting for another to-device to be sent?
|
||||
# https://github.com/matrix-org/synapse/issues/11150#issuecomment-960726449
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "to_device", max_stream_token
|
||||
)
|
||||
|
||||
async def _handle_typing(
|
||||
self, service: ApplicationService, new_token: int
|
||||
) -> List[JsonDict]:
|
||||
|
@ -440,6 +482,88 @@ class ApplicationServicesHandler:
|
|||
|
||||
return events
|
||||
|
||||
async def _handle_to_device(
|
||||
self,
|
||||
service: ApplicationService,
|
||||
new_token: int,
|
||||
users: Collection[Union[str, UserID]],
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
"""
|
||||
Given an application service, determine which events it should receive
|
||||
from those between the last-recorded typing event stream token for this
|
||||
appservice and the given stream token.
|
||||
|
||||
Args:
|
||||
service: The application service to check for which events it should receive.
|
||||
new_token: The latest to-device event stream token.
|
||||
users: The users that should receive new to-device messages.
|
||||
|
||||
Returns:
|
||||
A two-tuple containing the following:
|
||||
* A list of JSON dictionaries containing data derived from the typing events
|
||||
that should be sent to the given application service.
|
||||
* The last-processed to-device message's stream id. If this does not equal
|
||||
`new_token` then there are likely more events to send.
|
||||
"""
|
||||
# Get the stream token that this application service has processed up until
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
service, "to_device"
|
||||
)
|
||||
|
||||
# Filter out users that this appservice is not interested in
|
||||
users_appservice_is_interested_in: List[str] = []
|
||||
for user in users:
|
||||
if isinstance(user, UserID):
|
||||
user = user.to_string()
|
||||
|
||||
if service.is_interested_in_user(user):
|
||||
users_appservice_is_interested_in.append(user)
|
||||
|
||||
if not users_appservice_is_interested_in:
|
||||
# Return early if the AS was not interested in any of these users
|
||||
return [], new_token
|
||||
|
||||
# Retrieve the to-device messages for each user
|
||||
(
|
||||
recipient_user_id_device_id_to_messages,
|
||||
# Record the maximum stream token of the retrieved messages that
|
||||
# this function was able to pull before hitting the max to-device
|
||||
# message count limit.
|
||||
# We return this value later, to ensure we only record the stream
|
||||
# token we managed to get up to, so that the rest can be sent later.
|
||||
max_stream_token,
|
||||
) = await self.store.get_new_messages(
|
||||
users_appservice_is_interested_in,
|
||||
from_key,
|
||||
new_token,
|
||||
limit=MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION,
|
||||
)
|
||||
|
||||
# According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
|
||||
# to the event JSON so that the application service will know which user/device
|
||||
# combination this messages was intended for.
|
||||
#
|
||||
# So we mangle this dict into a flat list of to-device messages with the relevant
|
||||
# user ID and device ID embedded inside each message dict.
|
||||
message_payload: List[JsonDict] = []
|
||||
for (
|
||||
user_id,
|
||||
device_id,
|
||||
), messages in recipient_user_id_device_id_to_messages.items():
|
||||
for message_json in messages:
|
||||
# Remove 'message_id' from the to-device message, as it's an internal ID
|
||||
message_json.pop("message_id", None)
|
||||
|
||||
message_payload.append(
|
||||
{
|
||||
"to_user_id": user_id,
|
||||
"to_device_id": device_id,
|
||||
**message_json,
|
||||
}
|
||||
)
|
||||
|
||||
return message_payload, max_stream_token
|
||||
|
||||
async def query_user_exists(self, user_id: str) -> bool:
|
||||
"""Check if any application service knows this user_id exists.
|
||||
|
||||
|
|
|
@ -444,15 +444,23 @@ class Notifier:
|
|||
|
||||
self.notify_replication()
|
||||
|
||||
# Notify appservices.
|
||||
try:
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key,
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
# Notify appservices of updates in ephemeral event streams.
|
||||
# Only the following streams are currently supported.
|
||||
if stream_key in (
|
||||
"typing_key",
|
||||
"receipt_key",
|
||||
"presence_key",
|
||||
"to_device_key",
|
||||
):
|
||||
# Notify appservices.
|
||||
try:
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key,
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
|
||||
def on_new_replication_data(self) -> None:
|
||||
"""Used to inform replication listeners that something has happened
|
||||
|
|
|
@ -387,7 +387,7 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
async def get_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, type: str
|
||||
) -> int:
|
||||
if type not in ("read_receipt", "presence"):
|
||||
if type not in ("read_receipt", "presence", "to_device"):
|
||||
raise ValueError(
|
||||
"Expected type to be a valid application stream id type, got %s"
|
||||
% (type,)
|
||||
|
@ -414,7 +414,7 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
async def set_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, stream_type: str, pos: Optional[int]
|
||||
) -> None:
|
||||
if stream_type not in ("read_receipt", "presence"):
|
||||
if stream_type not in ("read_receipt", "presence", "to_device"):
|
||||
raise ValueError(
|
||||
"Expected type to be a valid application stream id type, got %s"
|
||||
% (stream_type,)
|
||||
|
|
|
@ -13,13 +13,17 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple
|
||||
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingTransaction,
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
|
@ -116,6 +120,97 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
def get_to_device_stream_token(self):
|
||||
return self._device_inbox_id_gen.get_current_token()
|
||||
|
||||
async def get_new_messages(
|
||||
self,
|
||||
user_ids: Collection[str],
|
||||
from_stream_id: int,
|
||||
to_stream_id: int,
|
||||
limit: int = 100,
|
||||
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
|
||||
"""
|
||||
Retrieve to-device messages for a given set of user IDs.
|
||||
|
||||
Only to-device messages with stream ids between the given boundaries
|
||||
(from < X <= to) are returned.
|
||||
|
||||
Note that multiple messages can have the same stream id. Stream ids are
|
||||
unique to *messages*, but there might be multiple recipients of a message, and
|
||||
thus multiple entries in the device_inbox table with the same stream id.
|
||||
|
||||
Args:
|
||||
user_ids: The users to retrieve to-device messages for.
|
||||
from_stream_id: The lower boundary of stream id to filter with (exclusive).
|
||||
to_stream_id: The upper boundary of stream id to filter with (inclusive).
|
||||
limit: The maximum number of to-device messages to return.
|
||||
|
||||
Returns:
|
||||
A tuple containing the following:
|
||||
* A list of to-device messages
|
||||
* The stream id that to-device messages were processed up to. The calling
|
||||
function can check this value against `to_stream_id` to see if we hit
|
||||
the given limit when fetching messages.
|
||||
"""
|
||||
# Bail out if none of these users have any messages
|
||||
for user_id in user_ids:
|
||||
if self._device_inbox_stream_cache.has_entity_changed(
|
||||
user_id, from_stream_id
|
||||
):
|
||||
break
|
||||
else:
|
||||
return {}, to_stream_id
|
||||
|
||||
def get_new_messages_txn(txn: LoggingTransaction):
|
||||
# Build a query to select messages from any of the given users that are between
|
||||
# the given stream id bounds
|
||||
|
||||
# Scope to only the given users. We need to use this method as doing so is
|
||||
# different across database engines.
|
||||
many_clause_sql, many_clause_args = make_in_list_sql_clause(
|
||||
self.database_engine, "user_id", user_ids
|
||||
)
|
||||
|
||||
sql = f"""
|
||||
SELECT stream_id, user_id, device_id, message_json FROM device_inbox
|
||||
WHERE {many_clause_sql}
|
||||
AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(sql, (*many_clause_args, from_stream_id, to_stream_id, limit))
|
||||
|
||||
# Create a dictionary of (user ID, device ID) -> list of messages that
|
||||
# that device is meant to receive.
|
||||
recipient_user_id_device_id_to_messages = {}
|
||||
|
||||
stream_pos = to_stream_id
|
||||
total_messages_processed = 0
|
||||
for row in txn:
|
||||
# Record the last-processed stream position, to return later.
|
||||
# Note that we process messages here in order of ascending stream id.
|
||||
stream_pos = row[0]
|
||||
recipient_user_id = row[1]
|
||||
recipient_device_id = row[2]
|
||||
message_dict = db_to_json(row[3])
|
||||
|
||||
recipient_user_id_device_id_to_messages.setdefault(
|
||||
(recipient_user_id, recipient_device_id), []
|
||||
).append(message_dict)
|
||||
total_messages_processed += 1
|
||||
|
||||
# If we don't end up hitting the limit, we still want to return the equivalent
|
||||
# value of `to_stream_id` to the calling function. This is needed as we'll
|
||||
# be processing up to `to_stream_id`, without necessarily fetching a message
|
||||
# with a stream id of `to_stream_id`.
|
||||
if total_messages_processed < limit:
|
||||
stream_pos = to_stream_id
|
||||
|
||||
return recipient_user_id_device_id_to_messages, stream_pos
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_new_messages", get_new_messages_txn
|
||||
)
|
||||
|
||||
async def get_new_messages_for_device(
|
||||
self,
|
||||
user_id: str,
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Add a column to track what to_device stream id that this application
|
||||
-- service has been caught up to.
|
||||
ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;
|
|
@ -253,20 +253,59 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
|||
},
|
||||
)
|
||||
|
||||
def test_notify_interested_services_ephemeral(self):
|
||||
def test_notify_interested_services_ephemeral_read_receipt(self):
|
||||
"""
|
||||
Test sending ephemeral events to the appservice handler are scheduled
|
||||
Test sending read receipts to the appservice handler are scheduled
|
||||
to be pushed out to interested appservices, and that the stream ID is
|
||||
updated accordingly.
|
||||
"""
|
||||
# Create an application service that is guaranteed to be interested in
|
||||
# any new events
|
||||
interested_service = self._mkservice(is_interested=True)
|
||||
services = [interested_service]
|
||||
|
||||
self.mock_store.get_app_services.return_value = services
|
||||
|
||||
# State that this application service has received up until stream ID 579
|
||||
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
|
||||
579
|
||||
)
|
||||
|
||||
# Set up a dummy event that should be sent to the application service
|
||||
event = Mock(event_id="event_1")
|
||||
self.event_source.sources.receipt.get_new_events_as.return_value = (
|
||||
make_awaitable(([event], None))
|
||||
)
|
||||
|
||||
self.handler.notify_interested_services_ephemeral(
|
||||
"receipt_key", 580, ["@fakerecipient:example.com"]
|
||||
)
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
|
||||
interested_service, [event]
|
||||
)
|
||||
self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
|
||||
interested_service,
|
||||
"read_receipt",
|
||||
580,
|
||||
)
|
||||
|
||||
def test_notify_interested_services_ephemeral_to_device(self):
|
||||
"""
|
||||
Test sending read receipts to the appservice handler are scheduled
|
||||
to be pushed out to interested appservices, and that the stream ID is
|
||||
updated accordingly.
|
||||
"""
|
||||
# Create an application service that is guaranteed to be interested in
|
||||
# any new events
|
||||
interested_service = self._mkservice(is_interested=True)
|
||||
services = [interested_service]
|
||||
self.mock_store.get_app_services.return_value = services
|
||||
|
||||
# State that this application service has received up until stream ID 579
|
||||
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
|
||||
579
|
||||
)
|
||||
|
||||
# Set up a dummy event that should be sent to the application service
|
||||
event = Mock(event_id="event_1")
|
||||
self.event_source.sources.receipt.get_new_events_as.return_value = (
|
||||
make_awaitable(([event], None))
|
||||
|
|
258
tests/handlers/test_appservice_ephemeral.py
Normal file
258
tests/handlers/test_appservice_ephemeral.py
Normal file
|
@ -0,0 +1,258 @@
|
|||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import Dict, Iterable, Optional, Tuple, Union
|
||||
from unittest.mock import Mock
|
||||
|
||||
import synapse.rest.admin
|
||||
import synapse.storage
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.rest.client import login, receipts, room
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.types import JsonDict
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets_for_client_rest_resource,
|
||||
login.register_servlets,
|
||||
room.register_servlets,
|
||||
receipts.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
# Mock the application service scheduler so that we can track any outgoing transactions
|
||||
self.mock_scheduler = Mock()
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as = Mock()
|
||||
|
||||
hs.get_application_service_handler().scheduler = self.mock_scheduler
|
||||
|
||||
self.user1 = self.register_user("user1", "password")
|
||||
self.token1 = self.login("user1", "password")
|
||||
|
||||
self.user2 = self.register_user("user2", "password")
|
||||
self.token2 = self.login("user2", "password")
|
||||
|
||||
def test_application_services_receive_read_receipts(self):
|
||||
"""
|
||||
Test that when a user sends a read receipt in a room with another
|
||||
user, and that is in an application service's user namespace, that
|
||||
application service will receive that read receipt.
|
||||
"""
|
||||
(
|
||||
interested_service,
|
||||
_,
|
||||
) = self._register_interested_and_uninterested_application_services()
|
||||
|
||||
# Create a room with both user1 and user2
|
||||
room_id = self.helper.create_room_as(
|
||||
self.user1, tok=self.token1, is_public=True
|
||||
)
|
||||
self.helper.join(room_id, self.user2, tok=self.token2)
|
||||
|
||||
# Have user2 send a message into the room
|
||||
response_dict = self.helper.send(room_id, body="read me", tok=self.token2)
|
||||
|
||||
# Have user1 send a read receipt for the message with an empty body
|
||||
self._send_read_receipt(room_id, response_dict["event_id"], self.token1)
|
||||
|
||||
# user2 should have been the recipient of that read receipt.
|
||||
# Check if our application service - that is interested in user2 - received
|
||||
# the read receipt as part of an AS transaction.
|
||||
#
|
||||
# The uninterested application service should not have been notified.
|
||||
last_call = self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list[
|
||||
0
|
||||
]
|
||||
service, events = last_call[0]
|
||||
self.assertEqual(service, interested_service)
|
||||
self.assertEqual(len(events), 1)
|
||||
self.assertEqual(events[0]["type"], "m.receipt")
|
||||
self.assertEqual(events[0]["room_id"], room_id)
|
||||
|
||||
# Assert that this was a read receipt from user1
|
||||
read_receipts = list(events[0]["content"].values())
|
||||
self.assertIn(self.user1, read_receipts[0]["m.read"])
|
||||
|
||||
# Clear mock stats
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as.reset_mock()
|
||||
|
||||
# Send 2 pairs of messages + read receipts
|
||||
response_dict_1 = self.helper.send(room_id, body="read me1", tok=self.token2)
|
||||
response_dict_2 = self.helper.send(room_id, body="read me2", tok=self.token2)
|
||||
self._send_read_receipt(room_id, response_dict_1["event_id"], self.token1)
|
||||
self._send_read_receipt(room_id, response_dict_2["event_id"], self.token1)
|
||||
|
||||
# Assert each transaction that was sent to the application service is as expected
|
||||
self.assertEqual(2, self.mock_scheduler.submit_ephemeral_events_for_as.call_count)
|
||||
|
||||
first_call, second_call = self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list
|
||||
service, events = first_call[0]
|
||||
self.assertEqual(service, interested_service)
|
||||
self.assertEqual(len(events), 1)
|
||||
self.assertEqual(events[0]["type"], "m.receipt")
|
||||
self.assertEqual(
|
||||
self._event_id_from_read_receipt(events[0]), response_dict_1["event_id"]
|
||||
)
|
||||
|
||||
service, events = second_call[0]
|
||||
self.assertEqual(service, interested_service)
|
||||
self.assertEqual(len(events), 1)
|
||||
self.assertEqual(events[0]["type"], "m.receipt")
|
||||
self.assertEqual(
|
||||
self._event_id_from_read_receipt(events[0]), response_dict_2["event_id"]
|
||||
)
|
||||
|
||||
def test_application_services_receive_to_device(self):
|
||||
"""
|
||||
Test that when a user sends a to-device message to another user, and
|
||||
that is in an application service's user namespace, that application
|
||||
service will receive it.
|
||||
"""
|
||||
(
|
||||
interested_service,
|
||||
_,
|
||||
) = self._register_interested_and_uninterested_application_services()
|
||||
|
||||
# Create a room with both user1 and user2
|
||||
room_id = self.helper.create_room_as(
|
||||
self.user1, tok=self.token1, is_public=True
|
||||
)
|
||||
self.helper.join(room_id, self.user2, tok=self.token2)
|
||||
|
||||
# Have user2 send a typing notification into the room
|
||||
response_dict = self.helper.send(room_id, body="read me", tok=self.token2)
|
||||
|
||||
# Have user1 send a read receipt for the message with an empty body
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/rooms/%s/receipt/m.read/%s" % (room_id, response_dict["event_id"]),
|
||||
access_token=self.token1,
|
||||
)
|
||||
self.assertEqual(channel.code, 200)
|
||||
|
||||
# user2 should have been the recipient of that read receipt.
|
||||
# Check if our application service - that is interested in user2 - received
|
||||
# the read receipt as part of an AS transaction.
|
||||
#
|
||||
# The uninterested application service should not have been notified.
|
||||
service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[
|
||||
0
|
||||
]
|
||||
self.assertEqual(service, interested_service)
|
||||
self.assertEqual(events[0]["type"], "m.receipt")
|
||||
self.assertEqual(events[0]["room_id"], room_id)
|
||||
|
||||
# Assert that this was a read receipt from user1
|
||||
read_receipts = list(events[0]["content"].values())
|
||||
self.assertIn(self.user1, read_receipts[0]["m.read"])
|
||||
|
||||
def _register_interested_and_uninterested_application_services(
|
||||
self,
|
||||
) -> Tuple[ApplicationService, ApplicationService]:
|
||||
# Create an application service with exclusive interest in user2
|
||||
interested_service = self._make_application_service(
|
||||
namespaces={
|
||||
ApplicationService.NS_USERS: [
|
||||
{
|
||||
"regex": "@user2:.+",
|
||||
"exclusive": True,
|
||||
}
|
||||
],
|
||||
},
|
||||
)
|
||||
uninterested_service = self._make_application_service()
|
||||
|
||||
# Register this application service, along with another, uninterested one
|
||||
services = [
|
||||
uninterested_service,
|
||||
interested_service,
|
||||
]
|
||||
self.hs.get_datastore().get_app_services = Mock(return_value=services)
|
||||
|
||||
return interested_service, uninterested_service
|
||||
|
||||
def _make_application_service(
|
||||
self,
|
||||
namespaces: Optional[
|
||||
Dict[
|
||||
Union[
|
||||
ApplicationService.NS_USERS,
|
||||
ApplicationService.NS_ALIASES,
|
||||
ApplicationService.NS_ROOMS,
|
||||
],
|
||||
Iterable[Dict],
|
||||
]
|
||||
] = None,
|
||||
supports_ephemeral: Optional[bool] = True,
|
||||
) -> ApplicationService:
|
||||
return ApplicationService(
|
||||
token=None,
|
||||
hostname="example.com",
|
||||
id=random_string(10),
|
||||
sender="@as:example.com",
|
||||
rate_limited=False,
|
||||
namespaces=namespaces,
|
||||
supports_ephemeral=supports_ephemeral,
|
||||
)
|
||||
|
||||
def _send_read_receipt(self, room_id: str, event_id_to_read: str, tok: str) -> None:
|
||||
"""
|
||||
Send a read receipt of an event into a room.
|
||||
|
||||
Args:
|
||||
room_id: The room to event is part of.
|
||||
event_id_to_read: The ID of the event being read.
|
||||
tok: The access token of the sender.
|
||||
"""
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/rooms/%s/receipt/m.read/%s" % (room_id, event_id_to_read),
|
||||
access_token=tok,
|
||||
content="{}",
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
def _event_id_from_read_receipt(self, read_receipt_dict: JsonDict):
|
||||
"""
|
||||
Extracts the first event ID from a read receipt. Read receipt dictionaries
|
||||
are in the form:
|
||||
|
||||
{
|
||||
'type': 'm.receipt',
|
||||
'room_id': '!PEzCqHyycBVxqMKIjI:test',
|
||||
'content': {
|
||||
'$DETIeTEH651c1N7sP_j-YZiaQqCaayHhYwmhZDVWDY8': { # We want this
|
||||
'm.read': {
|
||||
'@user1:test': {
|
||||
'ts': 1300,
|
||||
'hidden': False
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Args:
|
||||
read_receipt_dict: The dictionary returned from a POST read receipt call.
|
||||
|
||||
Returns:
|
||||
The (first) event ID the read receipt refers to.
|
||||
"""
|
||||
return list(read_receipt_dict["content"].keys())[0]
|
||||
|
||||
# TODO: Test that ephemeral messages aren't sent to application services that have
|
||||
# ephemeral: false
|
|
@ -230,7 +230,7 @@ class RestHelper:
|
|||
custom_headers: Optional[
|
||||
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
|
||||
] = None,
|
||||
):
|
||||
) -> JsonDict:
|
||||
if body is None:
|
||||
body = "body_text_here"
|
||||
|
||||
|
@ -257,7 +257,7 @@ class RestHelper:
|
|||
custom_headers: Optional[
|
||||
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
|
||||
] = None,
|
||||
):
|
||||
) -> JsonDict:
|
||||
if txn_id is None:
|
||||
txn_id = "m%s" % (str(time.time()))
|
||||
|
||||
|
|
|
@ -320,7 +320,7 @@ class HomeserverTestCase(TestCase):
|
|||
def wait_for_background_updates(self) -> None:
|
||||
"""Block until all background database updates have completed."""
|
||||
while not self.get_success(
|
||||
self.store.db_pool.updates.has_completed_background_updates()
|
||||
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
|
||||
):
|
||||
self.get_success(
|
||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||
|
|
|
@ -236,7 +236,7 @@ def setup_test_homeserver(
|
|||
else:
|
||||
database_config = {
|
||||
"name": "sqlite3",
|
||||
"args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
|
||||
"args": {"database": "test.db", "cp_min": 1, "cp_max": 1},
|
||||
}
|
||||
|
||||
if "db_txn_limit" in kwargs:
|
||||
|
|
Loading…
Reference in a new issue