From cb59e080627745d089d073d9dac276362d9abaf6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 6 Dec 2022 09:52:55 +0000 Subject: [PATCH] Improve logging and opentracing for to-device message handling (#14598) A batch of changes intended to make it easier to trace to-device messages through the system. The intention here is that a client can set a property org.matrix.msgid in any to-device message it sends. That ID is then included in any tracing or logging related to the message. (Suggestions as to where this field should be documented welcome. I'm not enthusiastic about speccing it - it's very much an optional extra to help with debugging.) I've also generally improved the data we send to opentracing for these messages. --- changelog.d/14598.feature | 1 + synapse/api/constants.py | 3 + .../sender/per_destination_queue.py | 2 +- synapse/handlers/appservice.py | 3 - synapse/handlers/devicemessage.py | 36 +++++--- synapse/handlers/sync.py | 26 ++++-- synapse/logging/opentracing.py | 11 ++- synapse/rest/client/sendtodevice.py | 1 - synapse/storage/databases/main/deviceinbox.py | 92 +++++++++++++++---- tests/handlers/test_appservice.py | 7 +- 10 files changed, 136 insertions(+), 46 deletions(-) create mode 100644 changelog.d/14598.feature diff --git a/changelog.d/14598.feature b/changelog.d/14598.feature new file mode 100644 index 000000000..88d561e28 --- /dev/null +++ b/changelog.d/14598.feature @@ -0,0 +1 @@ +Improve opentracing and logging for to-device message handling. \ No newline at end of file diff --git a/synapse/api/constants.py b/synapse/api/constants.py index bc04a0755..89723d24f 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -230,6 +230,9 @@ class EventContentFields: # The authorising user for joining a restricted room. AUTHORISING_USER: Final = "join_authorised_via_users_server" + # an unspecced field added to to-device messages to identify them uniquely-ish + TO_DEVICE_MSGID: Final = "org.matrix.msgid" + class RoomTypes: """Understood values of the room_type field of m.room.create events.""" diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 5af2784f1..ffc9d95ee 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -641,7 +641,7 @@ class PerDestinationQueue: if not message_id: continue - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id) edus = [ Edu( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f68027aae..5d1d21cdc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -578,9 +578,6 @@ class ApplicationServicesHandler: device_id, ), messages in recipient_device_to_messages.items(): for message_json in messages: - # Remove 'message_id' from the to-device message, as it's an internal ID - message_json.pop("message_id", None) - message_payload.append( { "to_user_id": user_id, diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 444c08bc2..75e89850f 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -15,7 +15,7 @@ import logging from typing import TYPE_CHECKING, Any, Dict -from synapse.api.constants import EduTypes, ToDeviceEventTypes +from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes from synapse.api.errors import SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background @@ -216,14 +216,24 @@ class DeviceMessageHandler: """ sender_user_id = requester.user.to_string() - message_id = random_string(16) - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) - - log_kv({"number_of_to_device_messages": len(messages)}) - set_tag("sender", sender_user_id) + set_tag(SynapseTags.TO_DEVICE_TYPE, message_type) + set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id) local_messages = {} remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {} for user_id, by_device in messages.items(): + # add an opentracing log entry for each message + for device_id, message_content in by_device.items(): + log_kv( + { + "event": "send_to_device_message", + "user_id": user_id, + "device_id": device_id, + EventContentFields.TO_DEVICE_MSGID: message_content.get( + EventContentFields.TO_DEVICE_MSGID + ), + } + ) + # Ratelimit local cross-user key requests by the sending device. if ( message_type == ToDeviceEventTypes.RoomKeyRequest @@ -233,6 +243,7 @@ class DeviceMessageHandler: requester, (sender_user_id, requester.device_id) ) if not allowed: + log_kv({"message": f"dropping key requests to {user_id}"}) logger.info( "Dropping room_key_request from %s to %s due to rate limit", sender_user_id, @@ -247,18 +258,11 @@ class DeviceMessageHandler: "content": message_content, "type": message_type, "sender": sender_user_id, - "message_id": message_id, } for device_id, message_content in by_device.items() } if messages_by_device: local_messages[user_id] = messages_by_device - log_kv( - { - "user_id": user_id, - "device_id": list(messages_by_device), - } - ) else: destination = get_domain_from_id(user_id) remote_messages.setdefault(destination, {})[user_id] = by_device @@ -267,7 +271,11 @@ class DeviceMessageHandler: remote_edu_contents = {} for destination, messages in remote_messages.items(): - log_kv({"destination": destination}) + # The EDU contains a "message_id" property which is used for + # idempotence. Make up a random one. + message_id = random_string(16) + log_kv({"destination": destination, "message_id": message_id}) + remote_edu_contents[destination] = { "messages": messages, "sender": sender_user_id, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0b395a104..dace9b606 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -31,14 +31,20 @@ from typing import ( import attr from prometheus_client import Counter -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.handlers.relations import BundledAggregations from synapse.logging.context import current_context -from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + trace, +) from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary @@ -1586,6 +1592,7 @@ class SyncHandler: else: return DeviceListUpdates() + @trace async def _generate_sync_entry_for_to_device( self, sync_result_builder: "SyncResultBuilder" ) -> None: @@ -1605,11 +1612,16 @@ class SyncHandler: ) for message in messages: - # We pop here as we shouldn't be sending the message ID down - # `/sync` - message_id = message.pop("message_id", None) - if message_id: - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + log_kv( + { + "event": "to_device_message", + "sender": message["sender"], + "type": message["type"], + EventContentFields.TO_DEVICE_MSGID: message["content"].get( + EventContentFields.TO_DEVICE_MSGID + ), + } + ) logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index b69060854..a705af835 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -292,8 +292,15 @@ logger = logging.getLogger(__name__) class SynapseTags: - # The message ID of any to_device message processed - TO_DEVICE_MESSAGE_ID = "to_device.message_id" + # The message ID of any to_device EDU processed + TO_DEVICE_EDU_ID = "to_device.edu_id" + + # Details about to-device messages + TO_DEVICE_TYPE = "to_device.type" + TO_DEVICE_SENDER = "to_device.sender" + TO_DEVICE_RECIPIENT = "to_device.recipient" + TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device" + TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID # Whether the sync response has new data to be returned to the client. SYNC_RESULT = "sync.new_data" diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py index 46a8b0382..55d52f0b2 100644 --- a/synapse/rest/client/sendtodevice.py +++ b/synapse/rest/client/sendtodevice.py @@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet): def on_PUT( self, request: SynapseRequest, message_type: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("message_type", message_type) set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 73c95ffb6..48a54d9cb 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -26,8 +26,15 @@ from typing import ( cast, ) +from synapse.api.constants import EventContentFields from synapse.logging import issue9533_logger -from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + trace, +) from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore): (recipient_user_id, recipient_device_id), [] ).append(message_dict) + # start a new span for each message, so that we can tag each separately + with start_active_span("get_to_device_message"): + set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"]) + set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id) + set_tag( + SynapseTags.TO_DEVICE_MSGID, + message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID), + ) + if limit is not None and rowcount == limit: # We ended up bumping up against the message limit. There may be more messages # to retrieve. Return what we have, as well as the last stream position that @@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore): ], ) - if remote_messages_by_destination: - issue9533_logger.debug( - "Queued outgoing to-device messages with stream_id %i for %s", - stream_id, - list(remote_messages_by_destination.keys()), - ) + for destination, edu in remote_messages_by_destination.items(): + if issue9533_logger.isEnabledFor(logging.DEBUG): + issue9533_logger.debug( + "Queued outgoing to-device messages with " + "stream_id %i, EDU message_id %s, type %s for %s: %s", + stream_id, + edu["message_id"], + edu["type"], + destination, + [ + f"{user_id}/{device_id} (msgid " + f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})" + for (user_id, messages_by_device) in edu["messages"].items() + for (device_id, msg) in messages_by_device.items() + ], + ) + + for (user_id, messages_by_device) in edu["messages"].items(): + for (device_id, msg) in messages_by_device.items(): + with start_active_span("store_outgoing_to_device_message"): + set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"]) + set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"]) + set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) + set_tag( + SynapseTags.TO_DEVICE_MSGID, + msg.get(EventContentFields.TO_DEVICE_MSGID), + ) async with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self._clock.time_msec() @@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): # Only insert into the local inbox if the device exists on # this server device_id = row["device_id"] - message_json = json_encoder.encode(messages_by_device[device_id]) + + with start_active_span("serialise_to_device_message"): + msg = messages_by_device[device_id] + set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) + set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) + set_tag( + SynapseTags.TO_DEVICE_MSGID, + msg["content"].get(EventContentFields.TO_DEVICE_MSGID), + ) + message_json = json_encoder.encode(msg) + messages_json_for_user[device_id] = message_json if messages_json_for_user: @@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore): ], ) - issue9533_logger.debug( - "Stored to-device messages with stream_id %i for %s", - stream_id, - [ - (user_id, device_id) - for (user_id, messages_by_device) in local_by_user_then_device.items() - for device_id in messages_by_device.keys() - ], - ) + if issue9533_logger.isEnabledFor(logging.DEBUG): + issue9533_logger.debug( + "Stored to-device messages with stream_id %i: %s", + stream_id, + [ + f"{user_id}/{device_id} (msgid " + f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})" + for ( + user_id, + messages_by_device, + ) in messages_by_user_then_device.items() + for (device_id, msg) in messages_by_device.items() + ], + ) class DeviceInboxBackgroundUpdateStore(SQLBaseStore): diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 9ed26d87a..57bfbd773 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -765,7 +765,12 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)] messages = { self.exclusive_as_user: { - device_id: to_device_message_content for device_id in fake_device_ids + device_id: { + "type": "test_to_device_message", + "sender": "@some:sender", + "content": to_device_message_content, + } + for device_id in fake_device_ids } }