Compare commits
7 commits
develop
...
anoa/e2e_a
Author | SHA1 | Date | |
---|---|---|---|
|
a7bf3f6f95 | ||
|
42f0ab1d84 | ||
|
6bcd8dee42 | ||
|
43bdfbbc85 | ||
|
59b5eeefe1 | ||
|
52c43d91f5 | ||
|
0772fc855b |
1
changelog.d/11215.feature
Normal file
1
changelog.d/11215.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add experimental support for sending to-device messages to application services, as specified by [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409). Disabled by default.
|
|
@ -46,3 +46,11 @@ class ExperimentalConfig(Config):
|
||||||
|
|
||||||
# MSC3266 (room summary api)
|
# MSC3266 (room summary api)
|
||||||
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
|
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
|
||||||
|
|
||||||
|
# MSC2409 (this setting only relates to optionally sending to-device messages).
|
||||||
|
# Presence, typing and read receipt EDUs are already sent to application services that
|
||||||
|
# have opted in to receive them. This setting, if enabled, adds to-device messages
|
||||||
|
# to that list.
|
||||||
|
self.msc2409_to_device_messages_enabled: bool = experimental.get(
|
||||||
|
"msc2409_to_device_messages_enabled", False
|
||||||
|
)
|
||||||
|
|
|
@ -12,7 +12,16 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
Collection,
|
||||||
|
Dict,
|
||||||
|
Iterable,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Tuple,
|
||||||
|
Union,
|
||||||
|
)
|
||||||
|
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
|
||||||
|
@ -42,6 +51,8 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION = 100
|
||||||
|
|
||||||
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
|
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
|
||||||
|
|
||||||
|
|
||||||
|
@ -55,6 +66,9 @@ class ApplicationServicesHandler:
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.notify_appservices = hs.config.appservice.notify_appservices
|
self.notify_appservices = hs.config.appservice.notify_appservices
|
||||||
self.event_sources = hs.get_event_sources()
|
self.event_sources = hs.get_event_sources()
|
||||||
|
self.msc2409_to_device_messages_enabled = (
|
||||||
|
hs.config.experimental.msc2409_to_device_messages_enabled
|
||||||
|
)
|
||||||
|
|
||||||
self.current_max = 0
|
self.current_max = 0
|
||||||
self.is_processing = False
|
self.is_processing = False
|
||||||
|
@ -199,8 +213,9 @@ class ApplicationServicesHandler:
|
||||||
Args:
|
Args:
|
||||||
stream_key: The stream the event came from.
|
stream_key: The stream the event came from.
|
||||||
|
|
||||||
`stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
|
`stream_key` can be "typing_key", "receipt_key", "presence_key" or
|
||||||
value for `stream_key` will cause this function to return early.
|
"to_device_key". Any other value for `stream_key` will cause this function
|
||||||
|
to return early.
|
||||||
|
|
||||||
Ephemeral events will only be pushed to appservices that have opted into
|
Ephemeral events will only be pushed to appservices that have opted into
|
||||||
receiving them by setting `push_ephemeral` to true in their registration
|
receiving them by setting `push_ephemeral` to true in their registration
|
||||||
|
@ -216,10 +231,6 @@ class ApplicationServicesHandler:
|
||||||
if not self.notify_appservices:
|
if not self.notify_appservices:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Ignore any unsupported streams
|
|
||||||
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Assert that new_token is an integer (and not a RoomStreamToken).
|
# Assert that new_token is an integer (and not a RoomStreamToken).
|
||||||
# All of the supported streams that this function handles use an
|
# All of the supported streams that this function handles use an
|
||||||
# integer to track progress (rather than a RoomStreamToken - a
|
# integer to track progress (rather than a RoomStreamToken - a
|
||||||
|
@ -233,6 +244,13 @@ class ApplicationServicesHandler:
|
||||||
# Additional context: https://github.com/matrix-org/synapse/pull/11137
|
# Additional context: https://github.com/matrix-org/synapse/pull/11137
|
||||||
assert isinstance(new_token, int)
|
assert isinstance(new_token, int)
|
||||||
|
|
||||||
|
# Ignore to-device messages if the feature flag is not enabled
|
||||||
|
if (
|
||||||
|
stream_key == "to_device_key"
|
||||||
|
and not self.msc2409_to_device_messages_enabled
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
# Check whether there are any appservices which have registered to receive
|
# Check whether there are any appservices which have registered to receive
|
||||||
# ephemeral events.
|
# ephemeral events.
|
||||||
#
|
#
|
||||||
|
@ -307,6 +325,30 @@ class ApplicationServicesHandler:
|
||||||
service, "presence", new_token
|
service, "presence", new_token
|
||||||
)
|
)
|
||||||
|
|
||||||
|
elif (
|
||||||
|
stream_key == "to_device_key"
|
||||||
|
and new_token is not None
|
||||||
|
and self.msc2409_to_device_messages_enabled
|
||||||
|
):
|
||||||
|
# Retrieve an iterable of to-device message events, as well as the
|
||||||
|
# maximum stream token of the messages we were able to retrieve.
|
||||||
|
events, max_stream_token = await self._handle_to_device(
|
||||||
|
service, new_token, users
|
||||||
|
)
|
||||||
|
if events:
|
||||||
|
self.scheduler.submit_ephemeral_events_for_as(
|
||||||
|
service, events
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: If max_stream_token != new_token, schedule another transaction immediately,
|
||||||
|
# instead of waiting for another to-device to be sent?
|
||||||
|
# https://github.com/matrix-org/synapse/issues/11150#issuecomment-960726449
|
||||||
|
|
||||||
|
# Persist the latest handled stream token for this appservice
|
||||||
|
await self.store.set_type_stream_id_for_appservice(
|
||||||
|
service, "to_device", max_stream_token
|
||||||
|
)
|
||||||
|
|
||||||
async def _handle_typing(
|
async def _handle_typing(
|
||||||
self, service: ApplicationService, new_token: int
|
self, service: ApplicationService, new_token: int
|
||||||
) -> List[JsonDict]:
|
) -> List[JsonDict]:
|
||||||
|
@ -440,6 +482,88 @@ class ApplicationServicesHandler:
|
||||||
|
|
||||||
return events
|
return events
|
||||||
|
|
||||||
|
async def _handle_to_device(
|
||||||
|
self,
|
||||||
|
service: ApplicationService,
|
||||||
|
new_token: int,
|
||||||
|
users: Collection[Union[str, UserID]],
|
||||||
|
) -> Tuple[List[JsonDict], int]:
|
||||||
|
"""
|
||||||
|
Given an application service, determine which events it should receive
|
||||||
|
from those between the last-recorded typing event stream token for this
|
||||||
|
appservice and the given stream token.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
service: The application service to check for which events it should receive.
|
||||||
|
new_token: The latest to-device event stream token.
|
||||||
|
users: The users that should receive new to-device messages.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A two-tuple containing the following:
|
||||||
|
* A list of JSON dictionaries containing data derived from the typing events
|
||||||
|
that should be sent to the given application service.
|
||||||
|
* The last-processed to-device message's stream id. If this does not equal
|
||||||
|
`new_token` then there are likely more events to send.
|
||||||
|
"""
|
||||||
|
# Get the stream token that this application service has processed up until
|
||||||
|
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||||
|
service, "to_device"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Filter out users that this appservice is not interested in
|
||||||
|
users_appservice_is_interested_in: List[str] = []
|
||||||
|
for user in users:
|
||||||
|
if isinstance(user, UserID):
|
||||||
|
user = user.to_string()
|
||||||
|
|
||||||
|
if service.is_interested_in_user(user):
|
||||||
|
users_appservice_is_interested_in.append(user)
|
||||||
|
|
||||||
|
if not users_appservice_is_interested_in:
|
||||||
|
# Return early if the AS was not interested in any of these users
|
||||||
|
return [], new_token
|
||||||
|
|
||||||
|
# Retrieve the to-device messages for each user
|
||||||
|
(
|
||||||
|
recipient_user_id_device_id_to_messages,
|
||||||
|
# Record the maximum stream token of the retrieved messages that
|
||||||
|
# this function was able to pull before hitting the max to-device
|
||||||
|
# message count limit.
|
||||||
|
# We return this value later, to ensure we only record the stream
|
||||||
|
# token we managed to get up to, so that the rest can be sent later.
|
||||||
|
max_stream_token,
|
||||||
|
) = await self.store.get_new_messages(
|
||||||
|
users_appservice_is_interested_in,
|
||||||
|
from_key,
|
||||||
|
new_token,
|
||||||
|
limit=MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION,
|
||||||
|
)
|
||||||
|
|
||||||
|
# According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
|
||||||
|
# to the event JSON so that the application service will know which user/device
|
||||||
|
# combination this messages was intended for.
|
||||||
|
#
|
||||||
|
# So we mangle this dict into a flat list of to-device messages with the relevant
|
||||||
|
# user ID and device ID embedded inside each message dict.
|
||||||
|
message_payload: List[JsonDict] = []
|
||||||
|
for (
|
||||||
|
user_id,
|
||||||
|
device_id,
|
||||||
|
), messages in recipient_user_id_device_id_to_messages.items():
|
||||||
|
for message_json in messages:
|
||||||
|
# Remove 'message_id' from the to-device message, as it's an internal ID
|
||||||
|
message_json.pop("message_id", None)
|
||||||
|
|
||||||
|
message_payload.append(
|
||||||
|
{
|
||||||
|
"to_user_id": user_id,
|
||||||
|
"to_device_id": device_id,
|
||||||
|
**message_json,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return message_payload, max_stream_token
|
||||||
|
|
||||||
async def query_user_exists(self, user_id: str) -> bool:
|
async def query_user_exists(self, user_id: str) -> bool:
|
||||||
"""Check if any application service knows this user_id exists.
|
"""Check if any application service knows this user_id exists.
|
||||||
|
|
||||||
|
|
|
@ -444,15 +444,23 @@ class Notifier:
|
||||||
|
|
||||||
self.notify_replication()
|
self.notify_replication()
|
||||||
|
|
||||||
# Notify appservices.
|
# Notify appservices of updates in ephemeral event streams.
|
||||||
try:
|
# Only the following streams are currently supported.
|
||||||
self.appservice_handler.notify_interested_services_ephemeral(
|
if stream_key in (
|
||||||
stream_key,
|
"typing_key",
|
||||||
new_token,
|
"receipt_key",
|
||||||
users,
|
"presence_key",
|
||||||
)
|
"to_device_key",
|
||||||
except Exception:
|
):
|
||||||
logger.exception("Error notifying application services of event")
|
# Notify appservices.
|
||||||
|
try:
|
||||||
|
self.appservice_handler.notify_interested_services_ephemeral(
|
||||||
|
stream_key,
|
||||||
|
new_token,
|
||||||
|
users,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error notifying application services of event")
|
||||||
|
|
||||||
def on_new_replication_data(self) -> None:
|
def on_new_replication_data(self) -> None:
|
||||||
"""Used to inform replication listeners that something has happened
|
"""Used to inform replication listeners that something has happened
|
||||||
|
|
|
@ -387,7 +387,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||||
async def get_type_stream_id_for_appservice(
|
async def get_type_stream_id_for_appservice(
|
||||||
self, service: ApplicationService, type: str
|
self, service: ApplicationService, type: str
|
||||||
) -> int:
|
) -> int:
|
||||||
if type not in ("read_receipt", "presence"):
|
if type not in ("read_receipt", "presence", "to_device"):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Expected type to be a valid application stream id type, got %s"
|
"Expected type to be a valid application stream id type, got %s"
|
||||||
% (type,)
|
% (type,)
|
||||||
|
@ -414,7 +414,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||||
async def set_type_stream_id_for_appservice(
|
async def set_type_stream_id_for_appservice(
|
||||||
self, service: ApplicationService, stream_type: str, pos: Optional[int]
|
self, service: ApplicationService, stream_type: str, pos: Optional[int]
|
||||||
) -> None:
|
) -> None:
|
||||||
if stream_type not in ("read_receipt", "presence"):
|
if stream_type not in ("read_receipt", "presence", "to_device"):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Expected type to be a valid application stream id type, got %s"
|
"Expected type to be a valid application stream id type, got %s"
|
||||||
% (stream_type,)
|
% (stream_type,)
|
||||||
|
|
|
@ -13,13 +13,17 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
from synapse.logging import issue9533_logger
|
from synapse.logging import issue9533_logger
|
||||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||||
from synapse.replication.tcp.streams import ToDeviceStream
|
from synapse.replication.tcp.streams import ToDeviceStream
|
||||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
from synapse.storage.database import (
|
||||||
|
DatabasePool,
|
||||||
|
LoggingTransaction,
|
||||||
|
make_in_list_sql_clause,
|
||||||
|
)
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
@ -116,6 +120,97 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
def get_to_device_stream_token(self):
|
def get_to_device_stream_token(self):
|
||||||
return self._device_inbox_id_gen.get_current_token()
|
return self._device_inbox_id_gen.get_current_token()
|
||||||
|
|
||||||
|
async def get_new_messages(
|
||||||
|
self,
|
||||||
|
user_ids: Collection[str],
|
||||||
|
from_stream_id: int,
|
||||||
|
to_stream_id: int,
|
||||||
|
limit: int = 100,
|
||||||
|
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
|
||||||
|
"""
|
||||||
|
Retrieve to-device messages for a given set of user IDs.
|
||||||
|
|
||||||
|
Only to-device messages with stream ids between the given boundaries
|
||||||
|
(from < X <= to) are returned.
|
||||||
|
|
||||||
|
Note that multiple messages can have the same stream id. Stream ids are
|
||||||
|
unique to *messages*, but there might be multiple recipients of a message, and
|
||||||
|
thus multiple entries in the device_inbox table with the same stream id.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_ids: The users to retrieve to-device messages for.
|
||||||
|
from_stream_id: The lower boundary of stream id to filter with (exclusive).
|
||||||
|
to_stream_id: The upper boundary of stream id to filter with (inclusive).
|
||||||
|
limit: The maximum number of to-device messages to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A tuple containing the following:
|
||||||
|
* A list of to-device messages
|
||||||
|
* The stream id that to-device messages were processed up to. The calling
|
||||||
|
function can check this value against `to_stream_id` to see if we hit
|
||||||
|
the given limit when fetching messages.
|
||||||
|
"""
|
||||||
|
# Bail out if none of these users have any messages
|
||||||
|
for user_id in user_ids:
|
||||||
|
if self._device_inbox_stream_cache.has_entity_changed(
|
||||||
|
user_id, from_stream_id
|
||||||
|
):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
return {}, to_stream_id
|
||||||
|
|
||||||
|
def get_new_messages_txn(txn: LoggingTransaction):
|
||||||
|
# Build a query to select messages from any of the given users that are between
|
||||||
|
# the given stream id bounds
|
||||||
|
|
||||||
|
# Scope to only the given users. We need to use this method as doing so is
|
||||||
|
# different across database engines.
|
||||||
|
many_clause_sql, many_clause_args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "user_id", user_ids
|
||||||
|
)
|
||||||
|
|
||||||
|
sql = f"""
|
||||||
|
SELECT stream_id, user_id, device_id, message_json FROM device_inbox
|
||||||
|
WHERE {many_clause_sql}
|
||||||
|
AND ? < stream_id AND stream_id <= ?
|
||||||
|
ORDER BY stream_id ASC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (*many_clause_args, from_stream_id, to_stream_id, limit))
|
||||||
|
|
||||||
|
# Create a dictionary of (user ID, device ID) -> list of messages that
|
||||||
|
# that device is meant to receive.
|
||||||
|
recipient_user_id_device_id_to_messages = {}
|
||||||
|
|
||||||
|
stream_pos = to_stream_id
|
||||||
|
total_messages_processed = 0
|
||||||
|
for row in txn:
|
||||||
|
# Record the last-processed stream position, to return later.
|
||||||
|
# Note that we process messages here in order of ascending stream id.
|
||||||
|
stream_pos = row[0]
|
||||||
|
recipient_user_id = row[1]
|
||||||
|
recipient_device_id = row[2]
|
||||||
|
message_dict = db_to_json(row[3])
|
||||||
|
|
||||||
|
recipient_user_id_device_id_to_messages.setdefault(
|
||||||
|
(recipient_user_id, recipient_device_id), []
|
||||||
|
).append(message_dict)
|
||||||
|
total_messages_processed += 1
|
||||||
|
|
||||||
|
# If we don't end up hitting the limit, we still want to return the equivalent
|
||||||
|
# value of `to_stream_id` to the calling function. This is needed as we'll
|
||||||
|
# be processing up to `to_stream_id`, without necessarily fetching a message
|
||||||
|
# with a stream id of `to_stream_id`.
|
||||||
|
if total_messages_processed < limit:
|
||||||
|
stream_pos = to_stream_id
|
||||||
|
|
||||||
|
return recipient_user_id_device_id_to_messages, stream_pos
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_new_messages", get_new_messages_txn
|
||||||
|
)
|
||||||
|
|
||||||
async def get_new_messages_for_device(
|
async def get_new_messages_for_device(
|
||||||
self,
|
self,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Add a column to track what to_device stream id that this application
|
||||||
|
-- service has been caught up to.
|
||||||
|
ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;
|
|
@ -253,20 +253,59 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_notify_interested_services_ephemeral(self):
|
def test_notify_interested_services_ephemeral_read_receipt(self):
|
||||||
"""
|
"""
|
||||||
Test sending ephemeral events to the appservice handler are scheduled
|
Test sending read receipts to the appservice handler are scheduled
|
||||||
to be pushed out to interested appservices, and that the stream ID is
|
to be pushed out to interested appservices, and that the stream ID is
|
||||||
updated accordingly.
|
updated accordingly.
|
||||||
"""
|
"""
|
||||||
|
# Create an application service that is guaranteed to be interested in
|
||||||
|
# any new events
|
||||||
interested_service = self._mkservice(is_interested=True)
|
interested_service = self._mkservice(is_interested=True)
|
||||||
services = [interested_service]
|
services = [interested_service]
|
||||||
|
|
||||||
self.mock_store.get_app_services.return_value = services
|
self.mock_store.get_app_services.return_value = services
|
||||||
|
|
||||||
|
# State that this application service has received up until stream ID 579
|
||||||
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
|
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
|
||||||
579
|
579
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Set up a dummy event that should be sent to the application service
|
||||||
|
event = Mock(event_id="event_1")
|
||||||
|
self.event_source.sources.receipt.get_new_events_as.return_value = (
|
||||||
|
make_awaitable(([event], None))
|
||||||
|
)
|
||||||
|
|
||||||
|
self.handler.notify_interested_services_ephemeral(
|
||||||
|
"receipt_key", 580, ["@fakerecipient:example.com"]
|
||||||
|
)
|
||||||
|
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
|
||||||
|
interested_service, [event]
|
||||||
|
)
|
||||||
|
self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
|
||||||
|
interested_service,
|
||||||
|
"read_receipt",
|
||||||
|
580,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_notify_interested_services_ephemeral_to_device(self):
|
||||||
|
"""
|
||||||
|
Test sending read receipts to the appservice handler are scheduled
|
||||||
|
to be pushed out to interested appservices, and that the stream ID is
|
||||||
|
updated accordingly.
|
||||||
|
"""
|
||||||
|
# Create an application service that is guaranteed to be interested in
|
||||||
|
# any new events
|
||||||
|
interested_service = self._mkservice(is_interested=True)
|
||||||
|
services = [interested_service]
|
||||||
|
self.mock_store.get_app_services.return_value = services
|
||||||
|
|
||||||
|
# State that this application service has received up until stream ID 579
|
||||||
|
self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
|
||||||
|
579
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set up a dummy event that should be sent to the application service
|
||||||
event = Mock(event_id="event_1")
|
event = Mock(event_id="event_1")
|
||||||
self.event_source.sources.receipt.get_new_events_as.return_value = (
|
self.event_source.sources.receipt.get_new_events_as.return_value = (
|
||||||
make_awaitable(([event], None))
|
make_awaitable(([event], None))
|
||||||
|
|
258
tests/handlers/test_appservice_ephemeral.py
Normal file
258
tests/handlers/test_appservice_ephemeral.py
Normal file
|
@ -0,0 +1,258 @@
|
||||||
|
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from typing import Dict, Iterable, Optional, Tuple, Union
|
||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
import synapse.rest.admin
|
||||||
|
import synapse.storage
|
||||||
|
from synapse.appservice import ApplicationService
|
||||||
|
from synapse.rest.client import login, receipts, room
|
||||||
|
from synapse.util.stringutils import random_string
|
||||||
|
from synapse.types import JsonDict
|
||||||
|
|
||||||
|
from tests import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
|
||||||
|
servlets = [
|
||||||
|
synapse.rest.admin.register_servlets_for_client_rest_resource,
|
||||||
|
login.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
receipts.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def prepare(self, reactor, clock, hs):
|
||||||
|
# Mock the application service scheduler so that we can track any outgoing transactions
|
||||||
|
self.mock_scheduler = Mock()
|
||||||
|
self.mock_scheduler.submit_ephemeral_events_for_as = Mock()
|
||||||
|
|
||||||
|
hs.get_application_service_handler().scheduler = self.mock_scheduler
|
||||||
|
|
||||||
|
self.user1 = self.register_user("user1", "password")
|
||||||
|
self.token1 = self.login("user1", "password")
|
||||||
|
|
||||||
|
self.user2 = self.register_user("user2", "password")
|
||||||
|
self.token2 = self.login("user2", "password")
|
||||||
|
|
||||||
|
def test_application_services_receive_read_receipts(self):
|
||||||
|
"""
|
||||||
|
Test that when a user sends a read receipt in a room with another
|
||||||
|
user, and that is in an application service's user namespace, that
|
||||||
|
application service will receive that read receipt.
|
||||||
|
"""
|
||||||
|
(
|
||||||
|
interested_service,
|
||||||
|
_,
|
||||||
|
) = self._register_interested_and_uninterested_application_services()
|
||||||
|
|
||||||
|
# Create a room with both user1 and user2
|
||||||
|
room_id = self.helper.create_room_as(
|
||||||
|
self.user1, tok=self.token1, is_public=True
|
||||||
|
)
|
||||||
|
self.helper.join(room_id, self.user2, tok=self.token2)
|
||||||
|
|
||||||
|
# Have user2 send a message into the room
|
||||||
|
response_dict = self.helper.send(room_id, body="read me", tok=self.token2)
|
||||||
|
|
||||||
|
# Have user1 send a read receipt for the message with an empty body
|
||||||
|
self._send_read_receipt(room_id, response_dict["event_id"], self.token1)
|
||||||
|
|
||||||
|
# user2 should have been the recipient of that read receipt.
|
||||||
|
# Check if our application service - that is interested in user2 - received
|
||||||
|
# the read receipt as part of an AS transaction.
|
||||||
|
#
|
||||||
|
# The uninterested application service should not have been notified.
|
||||||
|
last_call = self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list[
|
||||||
|
0
|
||||||
|
]
|
||||||
|
service, events = last_call[0]
|
||||||
|
self.assertEqual(service, interested_service)
|
||||||
|
self.assertEqual(len(events), 1)
|
||||||
|
self.assertEqual(events[0]["type"], "m.receipt")
|
||||||
|
self.assertEqual(events[0]["room_id"], room_id)
|
||||||
|
|
||||||
|
# Assert that this was a read receipt from user1
|
||||||
|
read_receipts = list(events[0]["content"].values())
|
||||||
|
self.assertIn(self.user1, read_receipts[0]["m.read"])
|
||||||
|
|
||||||
|
# Clear mock stats
|
||||||
|
self.mock_scheduler.submit_ephemeral_events_for_as.reset_mock()
|
||||||
|
|
||||||
|
# Send 2 pairs of messages + read receipts
|
||||||
|
response_dict_1 = self.helper.send(room_id, body="read me1", tok=self.token2)
|
||||||
|
response_dict_2 = self.helper.send(room_id, body="read me2", tok=self.token2)
|
||||||
|
self._send_read_receipt(room_id, response_dict_1["event_id"], self.token1)
|
||||||
|
self._send_read_receipt(room_id, response_dict_2["event_id"], self.token1)
|
||||||
|
|
||||||
|
# Assert each transaction that was sent to the application service is as expected
|
||||||
|
self.assertEqual(2, self.mock_scheduler.submit_ephemeral_events_for_as.call_count)
|
||||||
|
|
||||||
|
first_call, second_call = self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list
|
||||||
|
service, events = first_call[0]
|
||||||
|
self.assertEqual(service, interested_service)
|
||||||
|
self.assertEqual(len(events), 1)
|
||||||
|
self.assertEqual(events[0]["type"], "m.receipt")
|
||||||
|
self.assertEqual(
|
||||||
|
self._event_id_from_read_receipt(events[0]), response_dict_1["event_id"]
|
||||||
|
)
|
||||||
|
|
||||||
|
service, events = second_call[0]
|
||||||
|
self.assertEqual(service, interested_service)
|
||||||
|
self.assertEqual(len(events), 1)
|
||||||
|
self.assertEqual(events[0]["type"], "m.receipt")
|
||||||
|
self.assertEqual(
|
||||||
|
self._event_id_from_read_receipt(events[0]), response_dict_2["event_id"]
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_application_services_receive_to_device(self):
|
||||||
|
"""
|
||||||
|
Test that when a user sends a to-device message to another user, and
|
||||||
|
that is in an application service's user namespace, that application
|
||||||
|
service will receive it.
|
||||||
|
"""
|
||||||
|
(
|
||||||
|
interested_service,
|
||||||
|
_,
|
||||||
|
) = self._register_interested_and_uninterested_application_services()
|
||||||
|
|
||||||
|
# Create a room with both user1 and user2
|
||||||
|
room_id = self.helper.create_room_as(
|
||||||
|
self.user1, tok=self.token1, is_public=True
|
||||||
|
)
|
||||||
|
self.helper.join(room_id, self.user2, tok=self.token2)
|
||||||
|
|
||||||
|
# Have user2 send a typing notification into the room
|
||||||
|
response_dict = self.helper.send(room_id, body="read me", tok=self.token2)
|
||||||
|
|
||||||
|
# Have user1 send a read receipt for the message with an empty body
|
||||||
|
channel = self.make_request(
|
||||||
|
"POST",
|
||||||
|
"/rooms/%s/receipt/m.read/%s" % (room_id, response_dict["event_id"]),
|
||||||
|
access_token=self.token1,
|
||||||
|
)
|
||||||
|
self.assertEqual(channel.code, 200)
|
||||||
|
|
||||||
|
# user2 should have been the recipient of that read receipt.
|
||||||
|
# Check if our application service - that is interested in user2 - received
|
||||||
|
# the read receipt as part of an AS transaction.
|
||||||
|
#
|
||||||
|
# The uninterested application service should not have been notified.
|
||||||
|
service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[
|
||||||
|
0
|
||||||
|
]
|
||||||
|
self.assertEqual(service, interested_service)
|
||||||
|
self.assertEqual(events[0]["type"], "m.receipt")
|
||||||
|
self.assertEqual(events[0]["room_id"], room_id)
|
||||||
|
|
||||||
|
# Assert that this was a read receipt from user1
|
||||||
|
read_receipts = list(events[0]["content"].values())
|
||||||
|
self.assertIn(self.user1, read_receipts[0]["m.read"])
|
||||||
|
|
||||||
|
def _register_interested_and_uninterested_application_services(
|
||||||
|
self,
|
||||||
|
) -> Tuple[ApplicationService, ApplicationService]:
|
||||||
|
# Create an application service with exclusive interest in user2
|
||||||
|
interested_service = self._make_application_service(
|
||||||
|
namespaces={
|
||||||
|
ApplicationService.NS_USERS: [
|
||||||
|
{
|
||||||
|
"regex": "@user2:.+",
|
||||||
|
"exclusive": True,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
uninterested_service = self._make_application_service()
|
||||||
|
|
||||||
|
# Register this application service, along with another, uninterested one
|
||||||
|
services = [
|
||||||
|
uninterested_service,
|
||||||
|
interested_service,
|
||||||
|
]
|
||||||
|
self.hs.get_datastore().get_app_services = Mock(return_value=services)
|
||||||
|
|
||||||
|
return interested_service, uninterested_service
|
||||||
|
|
||||||
|
def _make_application_service(
|
||||||
|
self,
|
||||||
|
namespaces: Optional[
|
||||||
|
Dict[
|
||||||
|
Union[
|
||||||
|
ApplicationService.NS_USERS,
|
||||||
|
ApplicationService.NS_ALIASES,
|
||||||
|
ApplicationService.NS_ROOMS,
|
||||||
|
],
|
||||||
|
Iterable[Dict],
|
||||||
|
]
|
||||||
|
] = None,
|
||||||
|
supports_ephemeral: Optional[bool] = True,
|
||||||
|
) -> ApplicationService:
|
||||||
|
return ApplicationService(
|
||||||
|
token=None,
|
||||||
|
hostname="example.com",
|
||||||
|
id=random_string(10),
|
||||||
|
sender="@as:example.com",
|
||||||
|
rate_limited=False,
|
||||||
|
namespaces=namespaces,
|
||||||
|
supports_ephemeral=supports_ephemeral,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _send_read_receipt(self, room_id: str, event_id_to_read: str, tok: str) -> None:
|
||||||
|
"""
|
||||||
|
Send a read receipt of an event into a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: The room to event is part of.
|
||||||
|
event_id_to_read: The ID of the event being read.
|
||||||
|
tok: The access token of the sender.
|
||||||
|
"""
|
||||||
|
channel = self.make_request(
|
||||||
|
"POST",
|
||||||
|
"/rooms/%s/receipt/m.read/%s" % (room_id, event_id_to_read),
|
||||||
|
access_token=tok,
|
||||||
|
content="{}",
|
||||||
|
)
|
||||||
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
|
||||||
|
def _event_id_from_read_receipt(self, read_receipt_dict: JsonDict):
|
||||||
|
"""
|
||||||
|
Extracts the first event ID from a read receipt. Read receipt dictionaries
|
||||||
|
are in the form:
|
||||||
|
|
||||||
|
{
|
||||||
|
'type': 'm.receipt',
|
||||||
|
'room_id': '!PEzCqHyycBVxqMKIjI:test',
|
||||||
|
'content': {
|
||||||
|
'$DETIeTEH651c1N7sP_j-YZiaQqCaayHhYwmhZDVWDY8': { # We want this
|
||||||
|
'm.read': {
|
||||||
|
'@user1:test': {
|
||||||
|
'ts': 1300,
|
||||||
|
'hidden': False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Args:
|
||||||
|
read_receipt_dict: The dictionary returned from a POST read receipt call.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The (first) event ID the read receipt refers to.
|
||||||
|
"""
|
||||||
|
return list(read_receipt_dict["content"].keys())[0]
|
||||||
|
|
||||||
|
# TODO: Test that ephemeral messages aren't sent to application services that have
|
||||||
|
# ephemeral: false
|
|
@ -230,7 +230,7 @@ class RestHelper:
|
||||||
custom_headers: Optional[
|
custom_headers: Optional[
|
||||||
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
|
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
|
||||||
] = None,
|
] = None,
|
||||||
):
|
) -> JsonDict:
|
||||||
if body is None:
|
if body is None:
|
||||||
body = "body_text_here"
|
body = "body_text_here"
|
||||||
|
|
||||||
|
@ -257,7 +257,7 @@ class RestHelper:
|
||||||
custom_headers: Optional[
|
custom_headers: Optional[
|
||||||
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
|
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
|
||||||
] = None,
|
] = None,
|
||||||
):
|
) -> JsonDict:
|
||||||
if txn_id is None:
|
if txn_id is None:
|
||||||
txn_id = "m%s" % (str(time.time()))
|
txn_id = "m%s" % (str(time.time()))
|
||||||
|
|
||||||
|
|
|
@ -320,7 +320,7 @@ class HomeserverTestCase(TestCase):
|
||||||
def wait_for_background_updates(self) -> None:
|
def wait_for_background_updates(self) -> None:
|
||||||
"""Block until all background database updates have completed."""
|
"""Block until all background database updates have completed."""
|
||||||
while not self.get_success(
|
while not self.get_success(
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
|
||||||
):
|
):
|
||||||
self.get_success(
|
self.get_success(
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||||
|
|
|
@ -236,7 +236,7 @@ def setup_test_homeserver(
|
||||||
else:
|
else:
|
||||||
database_config = {
|
database_config = {
|
||||||
"name": "sqlite3",
|
"name": "sqlite3",
|
||||||
"args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
|
"args": {"database": "test.db", "cp_min": 1, "cp_max": 1},
|
||||||
}
|
}
|
||||||
|
|
||||||
if "db_txn_limit" in kwargs:
|
if "db_txn_limit" in kwargs:
|
||||||
|
|
Loading…
Reference in a new issue