0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-19 01:51:55 +01:00

Opentrace device lists (#5853)

Trace device list changes.
This commit is contained in:
Jorik Schellekens 2019-09-03 10:21:30 +01:00 committed by GitHub
parent 36f34e6f3d
commit a90d16dabc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 118 additions and 58 deletions

1
changelog.d/5853.feature Normal file
View file

@ -0,0 +1 @@
Opentracing for device list updates.

View file

@ -25,6 +25,7 @@ from synapse.api.errors import (
HttpResponseException, HttpResponseException,
RequestSendFailed, RequestSendFailed,
) )
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler() self._auth_handler = hs.get_auth_handler()
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def get_devices_by_user(self, user_id): def get_devices_by_user(self, user_id):
""" """
@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
defer.Deferred: list[dict[str, X]]: info on each device defer.Deferred: list[dict[str, X]]: info on each device
""" """
set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id) device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
for device in devices: for device in devices:
_update_device_from_client_ips(device, ips) _update_device_from_client_ips(device, ips)
log_kv(device_map)
return devices return devices
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def get_device(self, user_id, device_id): def get_device(self, user_id, device_id):
""" Retrieve the given device """ Retrieve the given device
@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
raise errors.NotFoundError raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips) _update_device_from_client_ips(device, ips)
set_tag("device", device)
set_tag("ips", ips)
return device return device
@measure_func("device.get_user_ids_changed") @measure_func("device.get_user_ids_changed")
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token): def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly """Get list of users that have had the devices updated, or have newly
@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
user_id (str) user_id (str)
from_token (StreamToken) from_token (StreamToken)
""" """
set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id() now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id) room_ids = yield self.store.get_rooms_for_user(user_id)
@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
# special-case for an empty prev state: include all members # special-case for an empty prev state: include all members
# in the changed list # in the changed list
if not event_ids: if not event_ids:
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in iteritems(current_state_ids): for key, event_id in iteritems(current_state_ids):
etype, state_key = key etype, state_key = key
if etype != EventTypes.Member: if etype != EventTypes.Member:
@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = [] possibly_joined = []
possibly_left = [] possibly_left = []
return {"changed": list(possibly_joined), "left": list(possibly_left)} result = {"changed": list(possibly_joined), "left": list(possibly_left)}
log_kv(result)
return result
class DeviceHandler(DeviceWorkerHandler): class DeviceHandler(DeviceWorkerHandler):
@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.") raise errors.StoreError(500, "Couldn't generate a device ID.")
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_device(self, user_id, device_id): def delete_device(self, user_id, device_id):
""" Delete the given device """ Delete the given device
@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e: except errors.StoreError as e:
if e.code == 404: if e.code == 404:
# no match # no match
set_tag("error", True)
log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
pass pass
else: else:
raise raise
@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
yield self.notify_device_update(user_id, [device_id]) yield self.notify_device_update(user_id, [device_id])
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None): def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices """Delete all of the user's devices
@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e: except errors.StoreError as e:
if e.code == 404: if e.code == 404:
# no match # no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
pass pass
else: else:
raise raise
@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
else: else:
raise raise
@trace
@measure_func("notify_device_update") @measure_func("notify_device_update")
@defer.inlineCallbacks @defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids): def notify_device_update(self, user_id, device_ids):
@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name) hosts.discard(self.server_name)
set_tag("target_hosts", hosts)
position = yield self.store.add_device_change_to_streams( position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts) user_id, device_ids, list(hosts)
) )
@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler):
) )
for host in hosts: for host in hosts:
self.federation_sender.send_device_messages(host) self.federation_sender.send_device_messages(host)
log_kv({"message": "sent device update to host", "host": host})
@defer.inlineCallbacks @defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id): def on_federation_query_user_devices(self, user_id):
@ -451,12 +483,15 @@ class DeviceListUpdater(object):
iterable=True, iterable=True,
) )
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content): def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible """Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list. for parsing the EDU and adding to pending updates list.
""" """
set_tag("origin", origin)
set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id") user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id") device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@ -471,12 +506,30 @@ class DeviceListUpdater(object):
device_id, device_id,
origin, origin,
) )
set_tag("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
"device which does not match the origin of the request.",
"user_id": user_id,
"device_id": device_id,
}
)
return return
room_ids = yield self.store.get_rooms_for_user(user_id) room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids: if not room_ids:
# We don't share any rooms with this user. Ignore update, as we # We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates. # probably won't get any further updates.
set_tag("error", True)
log_kv(
{
"message": "Got an update from a user for which "
"we don't share any rooms",
"other user_id": user_id,
}
)
logger.warning( logger.warning(
"Got device list update edu for %r/%r, but don't share a room", "Got device list update edu for %r/%r, but don't share a room",
user_id, user_id,
@ -578,6 +631,7 @@ class DeviceListUpdater(object):
request: request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
""" """
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user. # Fetch all devices for the user.
origin = get_domain_from_id(user_id) origin = get_domain_from_id(user_id)
try: try:
@ -594,13 +648,20 @@ class DeviceListUpdater(object):
# eventually become consistent. # eventually become consistent.
return return
except FederationDeniedError as e: except FederationDeniedError as e:
set_tag("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e) logger.info(e)
return return
except Exception: except Exception as e:
# TODO: Remember that we are now out of sync and try again # TODO: Remember that we are now out of sync and try again
# later # later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id) logger.exception("Failed to handle device list update for %s", user_id)
return return
log_kv({"result": result})
stream_id = result["stream_id"] stream_id = result["stream_id"]
devices = result["devices"] devices = result["devices"]

View file

@ -22,6 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.logging.opentracing import ( from synapse.logging.opentracing import (
get_active_span_text_map, get_active_span_text_map,
log_kv,
set_tag, set_tag,
start_active_span, start_active_span,
whitelisted_homeserver, whitelisted_homeserver,
@ -86,7 +87,8 @@ class DeviceMessageHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages): def send_device_message(self, sender_user_id, message_type, messages):
set_tag("number_of_messages", len(messages))
set_tag("sender", sender_user_id)
local_messages = {} local_messages = {}
remote_messages = {} remote_messages = {}
for user_id, by_device in messages.items(): for user_id, by_device in messages.items():
@ -124,6 +126,7 @@ class DeviceMessageHandler(object):
else None, else None,
} }
log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox( stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents local_messages, remote_edu_contents
) )
@ -132,6 +135,7 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys() "to_device_key", stream_id, users=local_messages.keys()
) )
log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys(): for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new # Enqueue a new federation transaction to send the new
# device messages to each remote destination. # device messages to each remote destination.

View file

@ -85,14 +85,14 @@ the function becomes the operation name for the span.
return something_usual_and_useful return something_usual_and_useful
Operation names can be explicitly set for functions by using Operation names can be explicitly set for a function by passing the
``trace_using_operation_name`` operation name to ``trace``
.. code-block:: python .. code-block:: python
from synapse.logging.opentracing import trace_using_operation_name from synapse.logging.opentracing import trace
@trace_using_operation_name("A *much* better operation name") @trace(opname="a_better_operation_name")
def interesting_badly_named_function(*args, **kwargs): def interesting_badly_named_function(*args, **kwargs):
# Does all kinds of cool and expected things # Does all kinds of cool and expected things
return something_usual_and_useful return something_usual_and_useful
@ -641,66 +641,26 @@ def extract_text_map(carrier):
# Tracing decorators # Tracing decorators
def trace(func): def trace(func=None, opname=None):
""" """
Decorator to trace a function. Decorator to trace a function.
Sets the operation name to that of the function's. Sets the operation name to that of the function's or that given
as operation_name. See the module's doc string for usage
examples.
""" """
if opentracing is None:
return func
@wraps(func) def decorator(func):
def _trace_inner(self, *args, **kwargs):
if opentracing is None:
return func(self, *args, **kwargs)
scope = start_active_span(func.__name__)
scope.__enter__()
try:
result = func(self, *args, **kwargs)
if isinstance(result, defer.Deferred):
def call_back(result):
scope.__exit__(None, None, None)
return result
def err_back(result):
scope.span.set_tag(tags.ERROR, True)
scope.__exit__(None, None, None)
return result
result.addCallbacks(call_back, err_back)
else:
scope.__exit__(None, None, None)
return result
except Exception as e:
scope.__exit__(type(e), None, e.__traceback__)
raise
return _trace_inner
def trace_using_operation_name(operation_name):
"""Decorator to trace a function. Explicitely sets the operation_name."""
def trace(func):
"""
Decorator to trace a function.
Sets the operation name to that of the function's.
"""
if opentracing is None: if opentracing is None:
return func return func
_opname = opname if opname else func.__name__
@wraps(func) @wraps(func)
def _trace_inner(self, *args, **kwargs): def _trace_inner(self, *args, **kwargs):
if opentracing is None: if opentracing is None:
return func(self, *args, **kwargs) return func(self, *args, **kwargs)
scope = start_active_span(operation_name) scope = start_active_span(_opname)
scope.__enter__() scope.__enter__()
try: try:
@ -717,6 +677,7 @@ def trace_using_operation_name(operation_name):
return result return result
result.addCallbacks(call_back, err_back) result.addCallbacks(call_back, err_back)
else: else:
scope.__exit__(None, None, None) scope.__exit__(None, None, None)
@ -728,7 +689,10 @@ def trace_using_operation_name(operation_name):
return _trace_inner return _trace_inner
return trace if func:
return decorator(func)
else:
return decorator
def tag_args(func): def tag_args(func):

View file

@ -24,7 +24,7 @@ from synapse.http.servlet import (
parse_json_object_from_request, parse_json_object_from_request,
parse_string, parse_string,
) )
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import StreamToken from synapse.types import StreamToken
from ._base import client_patterns from ._base import client_patterns
@ -69,7 +69,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler() self.e2e_keys_handler = hs.get_e2e_keys_handler()
@trace_using_operation_name("upload_keys") @trace(opname="upload_keys")
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request, device_id): def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester = yield self.auth.get_user_by_req(request, allow_guest=True)

View file

@ -19,6 +19,7 @@ from twisted.internet import defer
from synapse.http import servlet from synapse.http import servlet
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.logging.opentracing import set_tag, trace
from synapse.rest.client.transactions import HttpTransactionCache from synapse.rest.client.transactions import HttpTransactionCache
from ._base import client_patterns from ._base import client_patterns
@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
self.txns = HttpTransactionCache(hs) self.txns = HttpTransactionCache(hs)
self.device_message_handler = hs.get_device_message_handler() self.device_message_handler = hs.get_device_message_handler()
@trace(opname="sendToDevice")
def on_PUT(self, request, message_type, txn_id): def on_PUT(self, request, message_type, txn_id):
set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request( return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id request, self._put, request, message_type, txn_id
) )

View file

@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"get_new_messages_for_device", get_new_messages_for_device_txn "get_new_messages_for_device", get_new_messages_for_device_txn
) )
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
""" """
@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id = self._last_device_delete_cache.get( last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), None (user_id, device_id), None
) )
set_tag("last_deleted_stream_id", last_deleted_stream_id)
if last_deleted_stream_id: if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed( has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_deleted_stream_id user_id, last_deleted_stream_id
) )
if not has_changed: if not has_changed:
log_kv({"message": "No changes in cache since last check"})
return 0 return 0
def delete_messages_for_device_txn(txn): def delete_messages_for_device_txn(txn):
@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_messages_for_device", delete_messages_for_device_txn "delete_messages_for_device", delete_messages_for_device_txn
) )
log_kv(
{"message": "deleted {} messages for device".format(count), "count": count}
)
# Update the cache, ensuring that we only ever increase the value # Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get( last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0 (user_id, device_id), 0
@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count return count
@trace
def get_new_device_msgs_for_remote( def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit self, destination, last_stream_id, current_stream_id, limit
): ):
@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
in the stream the messages got to. in the stream the messages got to.
""" """
set_tag("destination", destination)
set_tag("last_stream_id", last_stream_id)
set_tag("current_stream_id", current_stream_id)
set_tag("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id destination, last_stream_id
) )
if not has_changed or last_stream_id == current_stream_id: if not has_changed or last_stream_id == current_stream_id:
log_kv({"message": "No new messages in stream"})
return defer.succeed(([], current_stream_id)) return defer.succeed(([], current_stream_id))
if limit <= 0: if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction. # This can happen if we run out of room for EDUs in the transaction.
return defer.succeed(([], last_stream_id)) return defer.succeed(([], last_stream_id))
@trace
def get_new_messages_for_remote_destination_txn(txn): def get_new_messages_for_remote_destination_txn(txn):
sql = ( sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox" "SELECT stream_id, messages_json FROM device_federation_outbox"
@ -156,6 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = row[0] stream_pos = row[0]
messages.append(json.loads(row[1])) messages.append(json.loads(row[1]))
if len(messages) < limit: if len(messages) < limit:
log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id stream_pos = current_stream_id
return messages, stream_pos return messages, stream_pos
@ -164,6 +183,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
get_new_messages_for_remote_destination_txn, get_new_messages_for_remote_destination_txn,
) )
@trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id): def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
"""Used to delete messages when the remote destination acknowledges """Used to delete messages when the remote destination acknowledges
their receipt. their receipt.
@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
expiry_ms=30 * 60 * 1000, expiry_ms=30 * 60 * 1000,
) )
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def add_messages_to_device_inbox( def add_messages_to_device_inbox(
self, local_messages_by_user_then_device, remote_messages_by_destination self, local_messages_by_user_then_device, remote_messages_by_destination

View file

@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.logging.opentracing import ( from synapse.logging.opentracing import (
get_active_span_text_map, get_active_span_text_map,
set_tag,
trace, trace,
whitelisted_homeserver, whitelisted_homeserver,
) )
@ -321,6 +322,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_device_stream_token(self): def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token() return self._device_list_id_gen.get_current_token()
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list): def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache. """Get the devices (and keys if any) for remote users from the cache.
@ -352,6 +354,9 @@ class DeviceWorkerStore(SQLBaseStore):
else: else:
results[user_id] = yield self._get_cached_devices_for_user(user_id) results[user_id] = yield self._get_cached_devices_for_user(user_id)
set_tag("in_cache", results)
set_tag("not_in_cache", user_ids_not_in_cache)
return user_ids_not_in_cache, results return user_ids_not_in_cache, results
@cachedInlineCallbacks(num_args=2, tree=True) @cachedInlineCallbacks(num_args=2, tree=True)