Compare commits
10 commits
develop
...
anoa/e2e_a
Author | SHA1 | Date | |
---|---|---|---|
f8c8a54670 | |||
cf1adc278a | |||
eeccdf0e98 | |||
95a49727a6 | |||
1df714c21c | |||
e423a94b28 | |||
0ce33e58fc | |||
2fd99fefed | |||
1e5a0f2fea | |||
795d0584f6 |
1
changelog.d/11138.misc
Normal file
1
changelog.d/11138.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add docstrings and comments to the application service ephemeral event sending code.
|
|
@ -183,40 +183,65 @@ class ApplicationServicesHandler:
|
|||
self,
|
||||
stream_key: str,
|
||||
new_token: Optional[int],
|
||||
users: Optional[Collection[Union[str, UserID]]] = None,
|
||||
users: Collection[Union[str, UserID]],
|
||||
) -> None:
|
||||
"""This is called by the notifier in the background
|
||||
when a ephemeral event handled by the homeserver.
|
||||
"""
|
||||
This is called by the notifier in the background when
|
||||
an ephemeral event is handled by the homeserver.
|
||||
|
||||
This will determine which appservices
|
||||
are interested in the event, and submit them.
|
||||
|
||||
Events will only be pushed to appservices
|
||||
that have opted into ephemeral events
|
||||
This will determine which appservices are
|
||||
interested in the event, and submit them.
|
||||
|
||||
Args:
|
||||
stream_key: The stream the event came from.
|
||||
new_token: The latest stream token
|
||||
users: The user(s) involved with the event.
|
||||
|
||||
When `stream_key` is "typing_key", "receipt_key" or "presence_key", events
|
||||
will only be pushed to appservices that have opted into ephemeral events.
|
||||
Appservices will only receive ephemeral events that fall within their
|
||||
registered user and room namespaces.
|
||||
|
||||
TODO: Update this bit
|
||||
|
||||
Any other value for `stream_key` will cause this function to return early.
|
||||
|
||||
new_token: The latest stream token.
|
||||
users: The users that should be informed of the new event, if any.
|
||||
"""
|
||||
if not self.notify_appservices:
|
||||
return
|
||||
|
||||
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
|
||||
return
|
||||
if stream_key in ("typing_key", "receipt_key", "presence_key"):
|
||||
# Check whether there are any appservices which have registered to receive
|
||||
# ephemeral events.
|
||||
#
|
||||
# Note that whether these events are actually relevant to these appservices
|
||||
# is decided later on.
|
||||
services = [
|
||||
service
|
||||
for service in self.store.get_app_services()
|
||||
if service.supports_ephemeral
|
||||
]
|
||||
if not services:
|
||||
# Bail out early if none of the target appservices have explicitly registered
|
||||
# to receive these ephemeral events.
|
||||
return
|
||||
|
||||
services = [
|
||||
service
|
||||
for service in self.store.get_app_services()
|
||||
if service.supports_ephemeral
|
||||
]
|
||||
if not services:
|
||||
elif stream_key == "to_device_key":
|
||||
# Appservices do not need to register explicit support for receiving device list
|
||||
# updates.
|
||||
#
|
||||
# Note that whether these events are actually relevant to these appservices is
|
||||
# decided later on.
|
||||
services = self.store.get_app_services()
|
||||
|
||||
else:
|
||||
# This stream_key is not supported.
|
||||
return
|
||||
|
||||
# We only start a new background process if necessary rather than
|
||||
# optimistically (to cut down on overhead).
|
||||
self._notify_interested_services_ephemeral(
|
||||
services, stream_key, new_token, users or []
|
||||
services, stream_key, new_token, users
|
||||
)
|
||||
|
||||
@wrap_as_background_process("notify_interested_services_ephemeral")
|
||||
|
@ -227,45 +252,186 @@ class ApplicationServicesHandler:
|
|||
new_token: Optional[int],
|
||||
users: Collection[Union[str, UserID]],
|
||||
) -> None:
|
||||
logger.debug("Checking interested services for %s" % (stream_key))
|
||||
logger.debug("Checking interested services for %s" % stream_key)
|
||||
with Measure(self.clock, "notify_interested_services_ephemeral"):
|
||||
for service in services:
|
||||
# Only handle typing if we have the latest token
|
||||
if stream_key == "typing_key" and new_token is not None:
|
||||
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
|
||||
# for typing_key due to performance reasons and due to their highly
|
||||
# ephemeral nature.
|
||||
#
|
||||
# Instead we simply grab the latest typing update in _handle_typing
|
||||
# and, if it applies to this application service, send it off.
|
||||
events = await self._handle_typing(service, new_token)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
# We don't persist the token for typing_key for performance reasons
|
||||
|
||||
elif stream_key == "receipt_key":
|
||||
events = await self._handle_receipts(service)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
# TODO: We seem to update the stream token for each appservice,
|
||||
# even if sending the ephemeral events to the appservice failed.
|
||||
# This is expected for typing, receipt and presence, but will need
|
||||
# to be handled for device* streams.
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "read_receipt", new_token
|
||||
)
|
||||
|
||||
elif stream_key == "presence_key":
|
||||
events = await self._handle_presence(service, users)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "presence", new_token
|
||||
)
|
||||
|
||||
elif stream_key == "to_device_key" and new_token is not None:
|
||||
events = await self._handle_to_device(service, new_token, users)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "to_device", new_token
|
||||
)
|
||||
|
||||
async def _handle_to_device(
|
||||
self,
|
||||
service: ApplicationService,
|
||||
new_token: int,
|
||||
users: Collection[Union[str, UserID]],
|
||||
) -> List[JsonDict]:
|
||||
"""
|
||||
Given an application service, determine which events it should receive
|
||||
from those between the last-recorded typing event stream token for this
|
||||
appservice and the given stream token.
|
||||
|
||||
Args:
|
||||
service: The application service to check for which events it should receive.
|
||||
new_token: The latest to-device event stream token.
|
||||
users: The users that should receive new to-device messages.
|
||||
|
||||
Returns:
|
||||
A list of JSON dictionaries containing data derived from the typing events that
|
||||
should be sent to the given application service.
|
||||
"""
|
||||
# Get the stream token that this application service has processed up until
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
service, "to_device"
|
||||
)
|
||||
|
||||
# Filter out users that this appservice is not interested in
|
||||
users_appservice_is_interested_in: List[str] = []
|
||||
for user in users:
|
||||
if isinstance(user, UserID):
|
||||
user = user.to_string()
|
||||
|
||||
if service.is_interested_in_user(user):
|
||||
users_appservice_is_interested_in.append(user)
|
||||
|
||||
if not users_appservice_is_interested_in:
|
||||
# Return early if the AS was not interested in any of these users
|
||||
return []
|
||||
|
||||
# Retrieve the to-device messages for each user
|
||||
(
|
||||
recipient_user_id_device_id_to_messages,
|
||||
max_stream_token,
|
||||
) = await self.store.get_new_messages(
|
||||
users_appservice_is_interested_in, from_key, new_token, limit=100
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"*** Users: %s, from: %s, to: %s",
|
||||
users_appservice_is_interested_in,
|
||||
from_key,
|
||||
new_token,
|
||||
)
|
||||
logger.info(
|
||||
"*** Got to-device message: %s", recipient_user_id_device_id_to_messages
|
||||
)
|
||||
|
||||
# TODO: Keep pulling out if max_stream_token != new_token?
|
||||
|
||||
# According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
|
||||
# to the event JSON so that the application service will know which user/device
|
||||
# combination this messages was intended for.
|
||||
#
|
||||
# So we mangle this dict into a flat list of to-device messages with the relevant
|
||||
# user ID and device ID embedded inside each message dict.
|
||||
message_payload: List[JsonDict] = []
|
||||
for (
|
||||
user_id,
|
||||
device_id,
|
||||
), messages in recipient_user_id_device_id_to_messages.items():
|
||||
for message_json in messages:
|
||||
# Remove 'message_id' from the to-device message, as it's an internal ID
|
||||
message_json.pop("message_id", None)
|
||||
|
||||
message_payload.append(
|
||||
{
|
||||
"to_user_id": user_id,
|
||||
"to_device_id": device_id,
|
||||
**message_json,
|
||||
}
|
||||
)
|
||||
|
||||
logger.info("*** Ended up with messages: %s", message_payload)
|
||||
|
||||
return message_payload
|
||||
|
||||
async def _handle_typing(
|
||||
self, service: ApplicationService, new_token: int
|
||||
) -> List[JsonDict]:
|
||||
"""
|
||||
Given an application service, determine which events it should receive
|
||||
from the given typing event stream token and now.
|
||||
|
||||
Args:
|
||||
service: The application service to check for which events it should receive.
|
||||
new_token: The latest typing event stream token.
|
||||
|
||||
Returns:
|
||||
A list of JSON dictionaries containing data derived from the typing events that
|
||||
should be sent to the given application service.
|
||||
"""
|
||||
typing_source = self.event_sources.sources.typing
|
||||
# Get the typing events from just before current
|
||||
typing, _ = await typing_source.get_new_events_as(
|
||||
service=service,
|
||||
# For performance reasons, we don't persist the previous
|
||||
# token in the DB and instead fetch the latest typing information
|
||||
# token in the DB and instead fetch the latest typing event
|
||||
# for appservices.
|
||||
# TODO: It'd probably be more efficient to simply fetch the
|
||||
# typing event with the given 'new_token' stream token and
|
||||
# checking if the given service was interested, rather than
|
||||
# iterating over all typing events and only grabbing the
|
||||
# latest one.
|
||||
from_key=new_token - 1,
|
||||
)
|
||||
return typing
|
||||
|
||||
async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
|
||||
"""
|
||||
Given an application service, determine which events it should receive
|
||||
from those between the last-recorded typing event stream token for this
|
||||
appservice and the latest one.
|
||||
|
||||
Args:
|
||||
service: The application service to check for which events it should receive.
|
||||
new_token: A typing event stream token. Typing events between this token and
|
||||
the current event stream token will be checked.
|
||||
|
||||
Returns:
|
||||
A list of JSON dictionaries containing data derived from the typing events that
|
||||
should be sent to the given application service.
|
||||
"""
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
service, "read_receipt"
|
||||
)
|
||||
|
@ -278,6 +444,19 @@ class ApplicationServicesHandler:
|
|||
async def _handle_presence(
|
||||
self, service: ApplicationService, users: Collection[Union[str, UserID]]
|
||||
) -> List[JsonDict]:
|
||||
"""
|
||||
Given an application service and a list of users who should be receiving
|
||||
presence updates, return a list of presence updates destined for the
|
||||
application service.
|
||||
|
||||
Args:
|
||||
service: The application service that ephemeral events are being sent to.
|
||||
users: The users that should receive the presence update.
|
||||
|
||||
Returns:
|
||||
A list of json dictionaries containing data derived from the presence events
|
||||
that should be sent to the given application service.
|
||||
"""
|
||||
events: List[JsonDict] = []
|
||||
presence_source = self.event_sources.sources.presence
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
|
|
|
@ -454,6 +454,10 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||
) -> None:
|
||||
"""Notify that a user's device(s) has changed. Pokes the notifier, and
|
||||
remote servers if the user is local.
|
||||
|
||||
Args:
|
||||
user_id: The Matrix ID of the user who's device list has been updated.
|
||||
device_ids: The device IDs that have changed.
|
||||
"""
|
||||
if not device_ids:
|
||||
# No changes to notify about, so this is a no-op.
|
||||
|
|
|
@ -89,6 +89,13 @@ class DeviceMessageHandler:
|
|||
)
|
||||
|
||||
async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
|
||||
"""
|
||||
Handle receiving to-device messages from remote homeservers.
|
||||
|
||||
Args:
|
||||
origin: The remote homeserver.
|
||||
content: The JSON dictionary containing the to-device messages.
|
||||
"""
|
||||
local_messages = {}
|
||||
sender_user_id = content["sender"]
|
||||
if origin != get_domain_from_id(sender_user_id):
|
||||
|
@ -135,12 +142,16 @@ class DeviceMessageHandler:
|
|||
message_type, sender_user_id, by_device
|
||||
)
|
||||
|
||||
stream_id = await self.store.add_messages_from_remote_to_device_inbox(
|
||||
# Add messages to the database.
|
||||
# Retrieve the stream token of the last-processed to-device message.
|
||||
max_stream_token = await self.store.add_messages_from_remote_to_device_inbox(
|
||||
origin, message_id, local_messages
|
||||
)
|
||||
|
||||
# Notify listeners that there are new to-device messages to process,
|
||||
# handing them the latest stream token.
|
||||
self.notifier.on_new_event(
|
||||
"to_device_key", stream_id, users=local_messages.keys()
|
||||
"to_device_key", max_stream_token, users=local_messages.keys()
|
||||
)
|
||||
|
||||
async def _check_for_unknown_devices(
|
||||
|
@ -195,6 +206,14 @@ class DeviceMessageHandler:
|
|||
message_type: str,
|
||||
messages: Dict[str, Dict[str, JsonDict]],
|
||||
) -> None:
|
||||
"""
|
||||
Handle a request from a user to send to-device message(s).
|
||||
|
||||
Args:
|
||||
requester: The user that is sending the to-device messages.
|
||||
message_type: The type of to-device messages that are being sent.
|
||||
messages: A dictionary containing recipients mapped to messages intended for them.
|
||||
"""
|
||||
sender_user_id = requester.user.to_string()
|
||||
|
||||
message_id = random_string(16)
|
||||
|
@ -257,12 +276,16 @@ class DeviceMessageHandler:
|
|||
"org.matrix.opentracing_context": json_encoder.encode(context),
|
||||
}
|
||||
|
||||
stream_id = await self.store.add_messages_to_device_inbox(
|
||||
# Add messages to the database.
|
||||
# Retrieve the stream token of the last-processed to-device message.
|
||||
max_stream_token = await self.store.add_messages_to_device_inbox(
|
||||
local_messages, remote_edu_contents
|
||||
)
|
||||
|
||||
# Notify listeners that there are new to-device messages to process,
|
||||
# handing them the latest stream token.
|
||||
self.notifier.on_new_event(
|
||||
"to_device_key", stream_id, users=local_messages.keys()
|
||||
"to_device_key", max_stream_token, users=local_messages.keys()
|
||||
)
|
||||
|
||||
if self.federation_sender:
|
||||
|
|
|
@ -1483,11 +1483,38 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
|
|||
def format_user_presence_state(
|
||||
state: UserPresenceState, now: int, include_user_id: bool = True
|
||||
) -> JsonDict:
|
||||
"""Convert UserPresenceState to a format that can be sent down to clients
|
||||
"""Convert UserPresenceState to a JSON format that can be sent down to clients
|
||||
and to other servers.
|
||||
|
||||
The "user_id" is optional so that this function can be used to format presence
|
||||
updates for client /sync responses and for federation /send requests.
|
||||
Args:
|
||||
state: The user presence state to format.
|
||||
now: The current timestamp since the epoch in ms.
|
||||
include_user_id: Whether to include `user_id` in the returned dictionary.
|
||||
As this function can be used both to format presence updates for client /sync
|
||||
responses and for federation /send requests, only the latter needs the include
|
||||
the `user_id` field.
|
||||
|
||||
Returns:
|
||||
A JSON dictionary with the following keys:
|
||||
* presence: The presence state as a str.
|
||||
* user_id: Optional. Included if `include_user_id` is truthy. The canonical
|
||||
Matrix ID of the user.
|
||||
* last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
|
||||
The timestamp that the user was last active.
|
||||
* status_msg: Optional. Included if `status_msg` is set on `state`. The user's
|
||||
status.
|
||||
* currently_active: Optional. Included only if `state.state` is "online". Set to
|
||||
the value of `state.currently_active`.
|
||||
|
||||
Example:
|
||||
|
||||
{
|
||||
"presence": "online",
|
||||
"user_id": "@alice:example.com",
|
||||
"last_active_ago": 16783813918,
|
||||
"status_msg": "Hello world!",
|
||||
"currently_active": True
|
||||
}
|
||||
"""
|
||||
content: JsonDict = {"presence": state.state}
|
||||
if include_user_id:
|
||||
|
|
|
@ -241,12 +241,18 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
|||
async def get_new_events_as(
|
||||
self, from_key: int, service: ApplicationService
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
"""Returns a set of new receipt events that an appservice
|
||||
"""Returns a set of new read receipt events that an appservice
|
||||
may be interested in.
|
||||
|
||||
Args:
|
||||
from_key: the stream position at which events should be fetched from
|
||||
service: The appservice which may be interested
|
||||
|
||||
Returns:
|
||||
A two-tuple containing the following:
|
||||
* A list of json dictionaries derived from read receipts that the
|
||||
appservice may be interested in.
|
||||
* The current read receipt stream token.
|
||||
"""
|
||||
from_key = int(from_key)
|
||||
to_key = self.get_current_key()
|
||||
|
@ -261,6 +267,10 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
|||
)
|
||||
|
||||
# Then filter down to rooms that the AS can read
|
||||
# TODO: This doesn't seem to honour an appservice's registration of room or
|
||||
# namespace aliases. For instance, if an appservice registered a room namespace
|
||||
# that matched this room, but it didn't have any members in the room, then that
|
||||
# appservice wouldn't receive the read receipt.
|
||||
events = []
|
||||
for room_id, event in rooms_to_events.items():
|
||||
if not await service.matches_user_in_member_list(room_id, self.store):
|
||||
|
|
|
@ -467,15 +467,25 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
|
|||
Args:
|
||||
from_key: the stream position at which events should be fetched from
|
||||
service: The appservice which may be interested
|
||||
|
||||
Returns:
|
||||
A two-tuple containing the following:
|
||||
* A list of json dictionaries derived from typing events that the
|
||||
appservice may be interested in.
|
||||
* The latest known room serial.
|
||||
"""
|
||||
with Measure(self.clock, "typing.get_new_events_as"):
|
||||
from_key = int(from_key)
|
||||
handler = self.get_typing_handler()
|
||||
|
||||
events = []
|
||||
for room_id in handler._room_serials.keys():
|
||||
if handler._room_serials[room_id] <= from_key:
|
||||
continue
|
||||
|
||||
# TODO: This doesn't seem to honour an appservice's registration of room or
|
||||
# namespace aliases. For instance, if an appservice registered a room namespace
|
||||
# that matched this room, but it didn't have any members in the room, then that
|
||||
# appservice wouldn't receive the typing event.
|
||||
if not await service.matches_user_in_member_list(
|
||||
room_id, handler.store
|
||||
):
|
||||
|
|
|
@ -378,14 +378,21 @@ class Notifier:
|
|||
self,
|
||||
stream_key: str,
|
||||
new_token: Union[int, RoomStreamToken],
|
||||
users: Optional[Collection[Union[str, UserID]]] = None,
|
||||
):
|
||||
users: Collection[Union[str, UserID]],
|
||||
) -> None:
|
||||
"""Notify application services of ephemeral event activity.
|
||||
|
||||
Args:
|
||||
stream_key: The stream the event came from.
|
||||
new_token: The value of the new stream token.
|
||||
users: The users that should be informed of the new event, if any.
|
||||
"""
|
||||
try:
|
||||
stream_token = None
|
||||
if isinstance(new_token, int):
|
||||
stream_token = new_token
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key, stream_token, users or []
|
||||
stream_key, stream_token, users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
|
@ -402,10 +409,17 @@ class Notifier:
|
|||
new_token: Union[int, RoomStreamToken],
|
||||
users: Optional[Collection[Union[str, UserID]]] = None,
|
||||
rooms: Optional[Collection[str]] = None,
|
||||
):
|
||||
) -> None:
|
||||
"""Used to inform listeners that something has happened event wise.
|
||||
|
||||
Will wake up all listeners for the given users and rooms.
|
||||
|
||||
Args:
|
||||
stream_key: The stream the event came from.
|
||||
new_token: The value of the new stream token.
|
||||
users: The users that should be informed of the new event.
|
||||
rooms: A collection of room IDs for which each joined member will be
|
||||
informed of the new event.
|
||||
"""
|
||||
users = users or []
|
||||
rooms = rooms or []
|
||||
|
|
|
@ -387,7 +387,7 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
async def get_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, type: str
|
||||
) -> int:
|
||||
if type not in ("read_receipt", "presence"):
|
||||
if type not in ("read_receipt", "presence", "to_device"):
|
||||
raise ValueError(
|
||||
"Expected type to be a valid application stream id type, got %s"
|
||||
% (type,)
|
||||
|
@ -412,16 +412,16 @@ class ApplicationServiceTransactionWorkerStore(
|
|||
)
|
||||
|
||||
async def set_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, type: str, pos: Optional[int]
|
||||
self, service: ApplicationService, stream_type: str, pos: Optional[int]
|
||||
) -> None:
|
||||
if type not in ("read_receipt", "presence"):
|
||||
if stream_type not in ("read_receipt", "presence", "to_device"):
|
||||
raise ValueError(
|
||||
"Expected type to be a valid application stream id type, got %s"
|
||||
% (type,)
|
||||
)
|
||||
|
||||
def set_type_stream_id_for_appservice_txn(txn):
|
||||
stream_id_type = "%s_stream_id" % type
|
||||
stream_id_type = "%s_stream_id" % stream_type
|
||||
txn.execute(
|
||||
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
|
||||
% stream_id_type,
|
||||
|
|
|
@ -13,15 +13,17 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import List, Optional, Tuple
|
||||
from typing import Collection, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
@ -112,31 +114,125 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
def get_to_device_stream_token(self):
|
||||
return self._device_inbox_id_gen.get_current_token()
|
||||
|
||||
async def get_new_messages(
|
||||
self,
|
||||
user_ids: Collection[str],
|
||||
from_stream_token: int,
|
||||
to_stream_token: int,
|
||||
limit: int = 100,
|
||||
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
|
||||
"""
|
||||
Retrieve to-device messages for a given set of user IDs.
|
||||
|
||||
Only to-device messages with stream tokens between the given boundaries
|
||||
(from < X <= to) are returned.
|
||||
|
||||
Note that multiple messages can have the same stream token. Stream tokens are
|
||||
unique to *messages*, but there might be multiple recipients of a message, and
|
||||
thus multiple entries in the device_inbox table with the same stream token.
|
||||
|
||||
Args:
|
||||
user_ids: The users to retrieve to-device messages for.
|
||||
from_stream_token: The lower boundary of stream token to filter with (exclusive).
|
||||
to_stream_token: The upper boundary of stream token to filter with (inclusive).
|
||||
limit: The maximum number of to-device messages to return.
|
||||
|
||||
Returns:
|
||||
A tuple containing the following:
|
||||
* A list of to-device messages
|
||||
"""
|
||||
# Bail out if none of these users have any messages
|
||||
for user_id in user_ids:
|
||||
if self._device_inbox_stream_cache.has_entity_changed(
|
||||
user_id, from_stream_token
|
||||
):
|
||||
break
|
||||
else:
|
||||
logger.info("*** Bailing out")
|
||||
return {}, to_stream_token
|
||||
|
||||
def get_new_messages_txn(txn):
|
||||
# Build a query to select messages from any of the given users that are between
|
||||
# the given stream token bounds
|
||||
sql = "SELECT stream_id, user_id, device_id, message_json FROM device_inbox"
|
||||
|
||||
# Scope to only the given users. We need to use this method as doing so is
|
||||
# different across database engines.
|
||||
many_clause_sql, many_clause_args = make_in_list_sql_clause(
|
||||
self.database_engine, "user_id", user_ids
|
||||
)
|
||||
|
||||
sql += (
|
||||
" WHERE %s"
|
||||
" AND ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
" LIMIT ?"
|
||||
) % many_clause_sql
|
||||
|
||||
logger.info("*** %s\n\n%s", many_clause_sql, many_clause_args)
|
||||
logger.info("*** %s", sql)
|
||||
txn.execute(
|
||||
sql, (*many_clause_args, from_stream_token, to_stream_token, limit)
|
||||
)
|
||||
|
||||
# Create a dictionary of (user ID, device ID) -> list of messages that
|
||||
# that device is meant to receive.
|
||||
recipient_user_id_device_id_to_messages = {}
|
||||
|
||||
stream_pos = to_stream_token
|
||||
total_messages_processed = 0
|
||||
for row in txn:
|
||||
# Record the last-processed stream position, to return later.
|
||||
# Note that we process messages here in order of ascending stream token.
|
||||
stream_pos = row[0]
|
||||
recipient_user_id = row[1]
|
||||
recipient_device_id = row[2]
|
||||
message_dict = db_to_json(row[3])
|
||||
|
||||
recipient_user_id_device_id_to_messages.setdefault(
|
||||
(recipient_user_id, recipient_device_id), []
|
||||
).append(message_dict)
|
||||
total_messages_processed += 1
|
||||
|
||||
# This is needed (REVIEW: I think) as you can have multiple rows for a
|
||||
# single to-device message (due to multiple recipients).
|
||||
if total_messages_processed < limit:
|
||||
stream_pos = to_stream_token
|
||||
|
||||
return recipient_user_id_device_id_to_messages, stream_pos
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_new_messages", get_new_messages_txn
|
||||
)
|
||||
|
||||
async def get_new_messages_for_device(
|
||||
self,
|
||||
user_id: str,
|
||||
device_id: Optional[str],
|
||||
last_stream_id: int,
|
||||
current_stream_id: int,
|
||||
last_stream_token: int,
|
||||
current_stream_token: int,
|
||||
limit: int = 100,
|
||||
) -> Tuple[List[dict], int]:
|
||||
"""
|
||||
Args:
|
||||
user_id: The recipient user_id.
|
||||
device_id: The recipient device_id.
|
||||
last_stream_id: The last stream ID checked.
|
||||
current_stream_id: The current position of the to device
|
||||
last_stream_token: The last stream ID checked.
|
||||
current_stream_token: The current position of the to device
|
||||
message stream.
|
||||
limit: The maximum number of messages to retrieve.
|
||||
|
||||
Returns:
|
||||
A list of messages for the device and where in the stream the messages got to.
|
||||
A tuple containing:
|
||||
* A list of messages for the device
|
||||
* The max stream token of these messages. There may be more to retrieve
|
||||
if the given limit was reached.
|
||||
"""
|
||||
has_changed = self._device_inbox_stream_cache.has_entity_changed(
|
||||
user_id, last_stream_id
|
||||
user_id, last_stream_token
|
||||
)
|
||||
if not has_changed:
|
||||
return [], current_stream_id
|
||||
return [], current_stream_token
|
||||
|
||||
def get_new_messages_for_device_txn(txn):
|
||||
sql = (
|
||||
|
@ -147,20 +243,53 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
" LIMIT ?"
|
||||
)
|
||||
txn.execute(
|
||||
sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
|
||||
sql,
|
||||
(user_id, device_id, last_stream_token, current_stream_token, limit),
|
||||
)
|
||||
messages = []
|
||||
stream_pos = current_stream_token
|
||||
|
||||
for row in txn:
|
||||
stream_pos = row[0]
|
||||
messages.append(db_to_json(row[1]))
|
||||
if len(messages) < limit:
|
||||
stream_pos = current_stream_id
|
||||
stream_pos = current_stream_token
|
||||
return messages, stream_pos
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_new_messages_for_device", get_new_messages_for_device_txn
|
||||
)
|
||||
|
||||
# TODO: OpenTracing support
|
||||
async def delete_messages_for_appservice_users(
|
||||
self, service: ApplicationService, users: Iterable[str], up_to_stream_id: int
|
||||
) -> int:
|
||||
"""
|
||||
Delete to-device messages that have been delivered to an application service.
|
||||
|
||||
Only messages
|
||||
that are intended for users that the given application service has registered an exclusive
|
||||
namespace for will be deleted. If a message is intended for a user, but that user is not
|
||||
part of an exclusive namespace registered by the given application service, then that
|
||||
message will remain in-tact.
|
||||
|
||||
Args:
|
||||
service: The application service to inspect user namespaces of.
|
||||
users: The to-device message recipient users to scope to.
|
||||
up_to_stream_id: The maximum message stream token to consider, inclusive.
|
||||
|
||||
Returns:
|
||||
The number of messages deleted.
|
||||
# TODO: Useful now that we skip stream tokens?
|
||||
"""
|
||||
# TODO: Check cache
|
||||
|
||||
# Filter users based on exclusive namespace
|
||||
|
||||
# Only delete messages for those users up to the stream ID
|
||||
|
||||
# So potentially just a delete_messages_for_users method?
|
||||
|
||||
@trace
|
||||
async def delete_messages_for_device(
|
||||
self, user_id: str, device_id: Optional[str], up_to_stream_id: int
|
||||
|
@ -169,7 +298,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
Args:
|
||||
user_id: The recipient user_id.
|
||||
device_id: The recipient device_id.
|
||||
up_to_stream_id: Where to delete messages up to.
|
||||
up_to_stream_id: Where to delete messages up to, inclusive.
|
||||
|
||||
Returns:
|
||||
The number of messages deleted.
|
||||
|
@ -369,7 +498,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
|
||||
Args:
|
||||
local_messages_by_user_and_device:
|
||||
Dictionary of user_id to device_id to message.
|
||||
Dictionary of recipient user_id to recipient device_id to message.
|
||||
remote_messages_by_destination:
|
||||
Dictionary of destination server_name to the EDU JSON to send.
|
||||
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Add a column to track what to_device stream token that this application
|
||||
-- service has been caught up to.
|
||||
ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;
|
Loading…
Reference in a new issue