Add a new ephemeral AS handler for to_device message edus

Here we add the ability for the application service ephemeral message
processor to handle new events on the "to_device" stream.

We keep track of a stream id (token) per application service, and every
time a new to-device message comes in, for each appservice we pull the
messages between the last-recorded and current stream id and check
whether any of the messages are for a user in that appservice's user
namespace.

get_new_messages is implemented in the next commit.
This commit is contained in:
Andrew Morgan 2021-11-05 15:47:33 +00:00
parent 0772fc855b
commit 52c43d91f5
2 changed files with 145 additions and 16 deletions

View file

@ -12,7 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)
from prometheus_client import Counter
@ -42,6 +51,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION = 100
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
@ -202,8 +213,9 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
`stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
value for `stream_key` will cause this function to return early.
`stream_key` can be "typing_key", "receipt_key", "presence_key" or
"to_device_key". Any other value for `stream_key` will cause this function
to return early.
Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
@ -219,10 +231,6 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
# Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
# Assert that new_token is an integer (and not a RoomStreamToken).
# All of the supported streams that this function handles use an
# integer to track progress (rather than a RoomStreamToken - a
@ -236,6 +244,13 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == "to_device_key"
and not self.msc2409_to_device_messages_enabled
):
return
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@ -310,6 +325,30 @@ class ApplicationServicesHandler:
service, "presence", new_token
)
elif (
stream_key == "to_device_key"
and new_token is not None
and self.msc2409_to_device_messages_enabled
):
# Retrieve an iterable of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
events, max_stream_token = await self._handle_to_device(
service, new_token, users
)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)
# TODO: If max_stream_token != new_token, schedule another transaction immediately,
# instead of waiting for another to-device to be sent?
# https://github.com/matrix-org/synapse/issues/11150#issuecomment-960726449
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "to_device", max_stream_token
)
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@ -443,6 +482,88 @@ class ApplicationServicesHandler:
return events
async def _handle_to_device(
self,
service: ApplicationService,
new_token: int,
users: Collection[Union[str, UserID]],
) -> Tuple[List[JsonDict], int]:
"""
Given an application service, determine which events it should receive
from those between the last-recorded typing event stream token for this
appservice and the given stream token.
Args:
service: The application service to check for which events it should receive.
new_token: The latest to-device event stream token.
users: The users that should receive new to-device messages.
Returns:
A two-tuple containing the following:
* A list of JSON dictionaries containing data derived from the typing events
that should be sent to the given application service.
* The last-processed to-device message's stream id. If this does not equal
`new_token` then there are likely more events to send.
"""
# Get the stream token that this application service has processed up until
from_key = await self.store.get_type_stream_id_for_appservice(
service, "to_device"
)
# Filter out users that this appservice is not interested in
users_appservice_is_interested_in: List[str] = []
for user in users:
if isinstance(user, UserID):
user = user.to_string()
if service.is_interested_in_user(user):
users_appservice_is_interested_in.append(user)
if not users_appservice_is_interested_in:
# Return early if the AS was not interested in any of these users
return [], new_token
# Retrieve the to-device messages for each user
(
recipient_user_id_device_id_to_messages,
# Record the maximum stream token of the retrieved messages that
# this function was able to pull before hitting the max to-device
# message count limit.
# We return this value later, to ensure we only record the stream
# token we managed to get up to, so that the rest can be sent later.
max_stream_token,
) = await self.store.get_new_messages(
users_appservice_is_interested_in,
from_key,
new_token,
limit=MAX_TO_DEVICE_MESSAGES_PER_AS_TRANSACTION,
)
# According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
# to the event JSON so that the application service will know which user/device
# combination this messages was intended for.
#
# So we mangle this dict into a flat list of to-device messages with the relevant
# user ID and device ID embedded inside each message dict.
message_payload: List[JsonDict] = []
for (
user_id,
device_id,
), messages in recipient_user_id_device_id_to_messages.items():
for message_json in messages:
# Remove 'message_id' from the to-device message, as it's an internal ID
message_json.pop("message_id", None)
message_payload.append(
{
"to_user_id": user_id,
"to_device_id": device_id,
**message_json,
}
)
return message_payload, max_stream_token
async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.

View file

@ -444,15 +444,23 @@ class Notifier:
self.notify_replication()
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")
# Notify appservices of updates in ephemeral event streams.
# Only the following streams are currently supported.
if stream_key in (
"typing_key",
"receipt_key",
"presence_key",
"to_device_key",
):
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened