0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-09-27 12:09:06 +02:00

Improve logging and opentracing for to-device message handling (#14598)

A batch of changes intended to make it easier to trace to-device messages through the system.

The intention here is that a client can set a property org.matrix.msgid in any to-device message it sends. That ID is then included in any tracing or logging related to the message. (Suggestions as to where this field should be documented welcome. I'm not enthusiastic about speccing it - it's very much an optional extra to help with debugging.)

I've also generally improved the data we send to opentracing for these messages.
This commit is contained in:
Richard van der Hoff 2022-12-06 09:52:55 +00:00 committed by GitHub
parent cee9445884
commit cb59e08062
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 136 additions and 46 deletions

View file

@ -0,0 +1 @@
Improve opentracing and logging for to-device message handling.

View file

@ -230,6 +230,9 @@ class EventContentFields:
# The authorising user for joining a restricted room. # The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server" AUTHORISING_USER: Final = "join_authorised_via_users_server"
# an unspecced field added to to-device messages to identify them uniquely-ish
TO_DEVICE_MSGID: Final = "org.matrix.msgid"
class RoomTypes: class RoomTypes:
"""Understood values of the room_type field of m.room.create events.""" """Understood values of the room_type field of m.room.create events."""

View file

@ -641,7 +641,7 @@ class PerDestinationQueue:
if not message_id: if not message_id:
continue continue
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
edus = [ edus = [
Edu( Edu(

View file

@ -578,9 +578,6 @@ class ApplicationServicesHandler:
device_id, device_id,
), messages in recipient_device_to_messages.items(): ), messages in recipient_device_to_messages.items():
for message_json in messages: for message_json in messages:
# Remove 'message_id' from the to-device message, as it's an internal ID
message_json.pop("message_id", None)
message_payload.append( message_payload.append(
{ {
"to_user_id": user_id, "to_user_id": user_id,

View file

@ -15,7 +15,7 @@
import logging import logging
from typing import TYPE_CHECKING, Any, Dict from typing import TYPE_CHECKING, Any, Dict
from synapse.api.constants import EduTypes, ToDeviceEventTypes from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background from synapse.logging.context import run_in_background
@ -216,14 +216,24 @@ class DeviceMessageHandler:
""" """
sender_user_id = requester.user.to_string() sender_user_id = requester.user.to_string()
message_id = random_string(16) set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
local_messages = {} local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {} remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items(): for user_id, by_device in messages.items():
# add an opentracing log entry for each message
for device_id, message_content in by_device.items():
log_kv(
{
"event": "send_to_device_message",
"user_id": user_id,
"device_id": device_id,
EventContentFields.TO_DEVICE_MSGID: message_content.get(
EventContentFields.TO_DEVICE_MSGID
),
}
)
# Ratelimit local cross-user key requests by the sending device. # Ratelimit local cross-user key requests by the sending device.
if ( if (
message_type == ToDeviceEventTypes.RoomKeyRequest message_type == ToDeviceEventTypes.RoomKeyRequest
@ -233,6 +243,7 @@ class DeviceMessageHandler:
requester, (sender_user_id, requester.device_id) requester, (sender_user_id, requester.device_id)
) )
if not allowed: if not allowed:
log_kv({"message": f"dropping key requests to {user_id}"})
logger.info( logger.info(
"Dropping room_key_request from %s to %s due to rate limit", "Dropping room_key_request from %s to %s due to rate limit",
sender_user_id, sender_user_id,
@ -247,18 +258,11 @@ class DeviceMessageHandler:
"content": message_content, "content": message_content,
"type": message_type, "type": message_type,
"sender": sender_user_id, "sender": sender_user_id,
"message_id": message_id,
} }
for device_id, message_content in by_device.items() for device_id, message_content in by_device.items()
} }
if messages_by_device: if messages_by_device:
local_messages[user_id] = messages_by_device local_messages[user_id] = messages_by_device
log_kv(
{
"user_id": user_id,
"device_id": list(messages_by_device),
}
)
else: else:
destination = get_domain_from_id(user_id) destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device remote_messages.setdefault(destination, {})[user_id] = by_device
@ -267,7 +271,11 @@ class DeviceMessageHandler:
remote_edu_contents = {} remote_edu_contents = {}
for destination, messages in remote_messages.items(): for destination, messages in remote_messages.items():
log_kv({"destination": destination}) # The EDU contains a "message_id" property which is used for
# idempotence. Make up a random one.
message_id = random_string(16)
log_kv({"destination": destination, "message_id": message_id})
remote_edu_contents[destination] = { remote_edu_contents[destination] = {
"messages": messages, "messages": messages,
"sender": sender_user_id, "sender": sender_user_id,

View file

@ -31,14 +31,20 @@ from typing import (
import attr import attr
from prometheus_client import Counter from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.filtering import FilterCollection from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.push.clientformat import format_push_rules_for_user from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
@ -1586,6 +1592,7 @@ class SyncHandler:
else: else:
return DeviceListUpdates() return DeviceListUpdates()
@trace
async def _generate_sync_entry_for_to_device( async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder" self, sync_result_builder: "SyncResultBuilder"
) -> None: ) -> None:
@ -1605,11 +1612,16 @@ class SyncHandler:
) )
for message in messages: for message in messages:
# We pop here as we shouldn't be sending the message ID down log_kv(
# `/sync` {
message_id = message.pop("message_id", None) "event": "to_device_message",
if message_id: "sender": message["sender"],
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) "type": message["type"],
EventContentFields.TO_DEVICE_MSGID: message["content"].get(
EventContentFields.TO_DEVICE_MSGID
),
}
)
logger.debug( logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)", "Returning %d to-device messages between %d and %d (current token: %d)",

View file

@ -292,8 +292,15 @@ logger = logging.getLogger(__name__)
class SynapseTags: class SynapseTags:
# The message ID of any to_device message processed # The message ID of any to_device EDU processed
TO_DEVICE_MESSAGE_ID = "to_device.message_id" TO_DEVICE_EDU_ID = "to_device.edu_id"
# Details about to-device messages
TO_DEVICE_TYPE = "to_device.type"
TO_DEVICE_SENDER = "to_device.sender"
TO_DEVICE_RECIPIENT = "to_device.recipient"
TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID
# Whether the sync response has new data to be returned to the client. # Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data" SYNC_RESULT = "sync.new_data"

View file

@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT( def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]: ) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("message_type", message_type)
set_tag("txn_id", txn_id) set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request( return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id request, self._put, request, message_type, txn_id

View file

@ -26,8 +26,15 @@ from typing import (
cast, cast,
) )
from synapse.api.constants import EventContentFields
from synapse.logging import issue9533_logger from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.replication.tcp.streams import ToDeviceStream from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import ( from synapse.storage.database import (
@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(recipient_user_id, recipient_device_id), [] (recipient_user_id, recipient_device_id), []
).append(message_dict) ).append(message_dict)
# start a new span for each message, so that we can tag each separately
with start_active_span("get_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
if limit is not None and rowcount == limit: if limit is not None and rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages # We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that # to retrieve. Return what we have, as well as the last stream position that
@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore):
], ],
) )
if remote_messages_by_destination: for destination, edu in remote_messages_by_destination.items():
issue9533_logger.debug( if issue9533_logger.isEnabledFor(logging.DEBUG):
"Queued outgoing to-device messages with stream_id %i for %s", issue9533_logger.debug(
stream_id, "Queued outgoing to-device messages with "
list(remote_messages_by_destination.keys()), "stream_id %i, EDU message_id %s, type %s for %s: %s",
) stream_id,
edu["message_id"],
edu["type"],
destination,
[
f"{user_id}/{device_id} (msgid "
f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
for (user_id, messages_by_device) in edu["messages"].items()
for (device_id, msg) in messages_by_device.items()
],
)
for (user_id, messages_by_device) in edu["messages"].items():
for (device_id, msg) in messages_by_device.items():
with start_active_span("store_outgoing_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg.get(EventContentFields.TO_DEVICE_MSGID),
)
async with self._device_inbox_id_gen.get_next() as stream_id: async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec() now_ms = self._clock.time_msec()
@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# Only insert into the local inbox if the device exists on # Only insert into the local inbox if the device exists on
# this server # this server
device_id = row["device_id"] device_id = row["device_id"]
message_json = json_encoder.encode(messages_by_device[device_id])
with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
message_json = json_encoder.encode(msg)
messages_json_for_user[device_id] = message_json messages_json_for_user[device_id] = message_json
if messages_json_for_user: if messages_json_for_user:
@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
], ],
) )
issue9533_logger.debug( if issue9533_logger.isEnabledFor(logging.DEBUG):
"Stored to-device messages with stream_id %i for %s", issue9533_logger.debug(
stream_id, "Stored to-device messages with stream_id %i: %s",
[ stream_id,
(user_id, device_id) [
for (user_id, messages_by_device) in local_by_user_then_device.items() f"{user_id}/{device_id} (msgid "
for device_id in messages_by_device.keys() f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
], for (
) user_id,
messages_by_device,
) in messages_by_user_then_device.items()
for (device_id, msg) in messages_by_device.items()
],
)
class DeviceInboxBackgroundUpdateStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore):

View file

@ -765,7 +765,12 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)] fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
messages = { messages = {
self.exclusive_as_user: { self.exclusive_as_user: {
device_id: to_device_message_content for device_id in fake_device_ids device_id: {
"type": "test_to_device_message",
"sender": "@some:sender",
"content": to_device_message_content,
}
for device_id in fake_device_ids
} }
} }