Compare commits

...

10 commits

Author SHA1 Message Date
Erik Johnston c5422e2ec1 Log callbacks 2021-11-08 14:52:51 +00:00
Erik Johnston f2d2842f56 Fixup 2021-11-08 14:35:26 +00:00
Erik Johnston 61e28c41cd Fixup 2021-11-08 14:33:58 +00:00
Erik Johnston 49c9424c0e install timers 2021-11-08 14:32:48 +00:00
Erik Johnston 50b8170181 Log long callFromThread 2021-11-08 14:29:19 +00:00
Erik Johnston da78f01a64 Log slow ticks 2021-11-08 14:11:39 +00:00
Erik Johnston 1783084fd9 More tracing 2021-11-08 11:41:29 +00:00
Erik Johnston 5e88964994 Trace event fetches 2021-11-08 11:35:27 +00:00
Erik Johnston 58864265d1 Merge remote-tracking branch 'origin/develop' into erikj/slow_sync_diag 2021-11-08 11:34:03 +00:00
Erik Johnston ee223fc7b9 Add spans for sync 2021-11-01 15:15:23 +00:00
3 changed files with 124 additions and 54 deletions

View file

@@ -34,7 +34,13 @@ from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    trace,
+)
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
@@ -483,6 +489,7 @@ class SyncHandler:
 
         return now_token, ephemeral_by_room
 
+    @trace
     async def _load_filtered_recents(
         self,
         room_id: str,
@@ -639,6 +646,7 @@ class SyncHandler:
             state_ids[(event.type, event.state_key)] = event.event_id
 
         return state_ids
 
+    @trace
     async def get_state_at(
         self,
         room_id: str,
@@ -671,6 +679,7 @@ class SyncHandler:
             state = {}
 
         return state
 
+    @trace
     async def compute_summary(
         self,
         room_id: str,
@@ -819,6 +828,7 @@ class SyncHandler:
             logger.debug("found LruCache for %r", cache_key)
 
         return cache
 
+    @trace
     async def compute_state_delta(
         self,
         room_id: str,
@@ -1064,9 +1074,10 @@ class SyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
         else:
-            joined_room_ids = await self.get_rooms_for_user_at(
-                user_id, now_token.room_key
-            )
+            with start_active_span("get_rooms_for_user_at"):
+                joined_room_ids = await self.get_rooms_for_user_at(
+                    user_id, now_token.room_key
+                )
         sync_result_builder = SyncResultBuilder(
             sync_config,
             full_state,
@@ -1077,15 +1088,18 @@ class SyncHandler:
 
         logger.debug("Fetching account data")
 
-        account_data_by_room = await self._generate_sync_entry_for_account_data(
-            sync_result_builder
-        )
+        with start_active_span("_generate_sync_entry_for_account_data"):
+            account_data_by_room = await self._generate_sync_entry_for_account_data(
+                sync_result_builder
+            )
 
         logger.debug("Fetching room data")
 
-        res = await self._generate_sync_entry_for_rooms(
-            sync_result_builder, account_data_by_room
-        )
+        with start_active_span("_generate_sync_entry_for_rooms"):
+            res = await self._generate_sync_entry_for_rooms(
+                sync_result_builder, account_data_by_room
+            )
+
         newly_joined_rooms, newly_joined_or_invited_or_knocked_users, _, _ = res
         _, _, newly_left_rooms, newly_left_users = res
@@ -1094,22 +1108,25 @@ class SyncHandler:
         )
         if self.hs_config.server.use_presence and not block_all_presence_data:
             logger.debug("Fetching presence data")
-            await self._generate_sync_entry_for_presence(
-                sync_result_builder,
-                newly_joined_rooms,
-                newly_joined_or_invited_or_knocked_users,
-            )
+            with start_active_span("_generate_sync_entry_for_presence"):
+                await self._generate_sync_entry_for_presence(
+                    sync_result_builder,
+                    newly_joined_rooms,
+                    newly_joined_or_invited_or_knocked_users,
+                )
 
         logger.debug("Fetching to-device data")
-        await self._generate_sync_entry_for_to_device(sync_result_builder)
+        with start_active_span("_generate_sync_entry_for_to_device"):
+            await self._generate_sync_entry_for_to_device(sync_result_builder)
 
-        device_lists = await self._generate_sync_entry_for_device_list(
-            sync_result_builder,
-            newly_joined_rooms=newly_joined_rooms,
-            newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
-            newly_left_rooms=newly_left_rooms,
-            newly_left_users=newly_left_users,
-        )
+        with start_active_span("_generate_sync_entry_for_device_list"):
+            device_lists = await self._generate_sync_entry_for_device_list(
+                sync_result_builder,
+                newly_joined_rooms=newly_joined_rooms,
+                newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+                newly_left_rooms=newly_left_rooms,
+                newly_left_users=newly_left_users,
+            )
 
         logger.debug("Fetching OTK data")
         device_id = sync_config.device_id
@@ -1120,15 +1137,19 @@ class SyncHandler:
             # * no change in OTK count since the provided since token
             # * the server has zero OTKs left for this device
             # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
-            one_time_key_counts = await self.store.count_e2e_one_time_keys(
-                user_id, device_id
-            )
-            unused_fallback_key_types = (
-                await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
-            )
+            with start_active_span("count_e2e_one_time_keys"):
+                one_time_key_counts = await self.store.count_e2e_one_time_keys(
+                    user_id, device_id
+                )
+                unused_fallback_key_types = (
+                    await self.store.get_e2e_unused_fallback_key_types(
+                        user_id, device_id
+                    )
+                )
 
         logger.debug("Fetching group data")
-        await self._generate_sync_entry_for_groups(sync_result_builder)
+        with start_active_span("_generate_sync_entry_for_groups"):
+            await self._generate_sync_entry_for_groups(sync_result_builder)
 
         num_events = 0
@@ -1478,12 +1499,13 @@ class SyncHandler:
         if block_all_room_ephemeral:
             ephemeral_by_room: Dict[str, List[JsonDict]] = {}
         else:
-            now_token, ephemeral_by_room = await self.ephemeral_by_room(
-                sync_result_builder,
-                now_token=sync_result_builder.now_token,
-                since_token=sync_result_builder.since_token,
-            )
-            sync_result_builder.now_token = now_token
+            with start_active_span("ephemeral_by_room"):
+                now_token, ephemeral_by_room = await self.ephemeral_by_room(
+                    sync_result_builder,
+                    now_token=sync_result_builder.now_token,
+                    since_token=sync_result_builder.since_token,
+                )
+                sync_result_builder.now_token = now_token
 
         # We check up front if anything has changed, if it hasn't then there is
         # no point in going further.
@@ -1493,18 +1515,20 @@ class SyncHandler:
             have_changed = await self._have_rooms_changed(sync_result_builder)
             log_kv({"rooms_have_changed": have_changed})
             if not have_changed:
-                tags_by_room = await self.store.get_updated_tags(
-                    user_id, since_token.account_data_key
-                )
+                with start_active_span("get_updated_tags"):
+                    tags_by_room = await self.store.get_updated_tags(
+                        user_id, since_token.account_data_key
+                    )
                 if not tags_by_room:
                     logger.debug("no-oping sync")
                     return set(), set(), set(), set()
 
-        ignored_account_data = (
-            await self.store.get_global_account_data_by_type_for_user(
-                AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
-            )
-        )
+        with start_active_span("get_global_account_data_by_type_for_user"):
+            ignored_account_data = (
+                await self.store.get_global_account_data_by_type_for_user(
+                    AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
+                )
+            )
 
         # If there is ignored users account data and it matches the proper type,
         # then use it.
@@ -1515,12 +1539,15 @@ class SyncHandler:
         ignored_users = frozenset(ignored_users_data.keys())
 
         if since_token:
-            room_changes = await self._get_rooms_changed(
-                sync_result_builder, ignored_users
-            )
-            tags_by_room = await self.store.get_updated_tags(
-                user_id, since_token.account_data_key
-            )
+            with start_active_span("_get_rooms_changed"):
+                room_changes = await self._get_rooms_changed(
+                    sync_result_builder, ignored_users
+                )
+
+            with start_active_span("get_updated_tags"):
+                tags_by_room = await self.store.get_updated_tags(
+                    user_id, since_token.account_data_key
+                )
         else:
             room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
 
@@ -1547,7 +1574,8 @@ class SyncHandler:
             )
             logger.debug("Generated room entry for %s", room_entry.room_id)
 
-        await concurrently_execute(handle_room_entries, room_entries, 10)
+        with start_active_span("handle_room_entries"):
+            await concurrently_execute(handle_room_entries, room_entries, 10)
 
         sync_result_builder.invited.extend(invited)
         sync_result_builder.knocked.extend(knocked)
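
The hunks above all follow one pattern: @trace marks whole methods, while start_active_span(...) wraps individual awaited steps so each one shows up as its own span when a /sync request is traced (spans only record if tracing is configured for the homeserver). Below is a minimal sketch of that pattern, assuming a Synapse development environment where synapse.logging.opentracing is importable; the handler class and method names are illustrative, not part of the diff.

```python
# Minimal sketch of the tracing pattern used in the hunks above. ExampleHandler
# and its methods are hypothetical; only start_active_span/@trace are Synapse's.
from synapse.logging.opentracing import start_active_span, trace


class ExampleHandler:
    @trace  # opens a span named after the function for every call
    async def generate_example_result(self, room_id: str) -> None:
        # Each sub-step gets its own span, so a slow step is visible in the
        # trace instead of being lumped into one long parent span.
        with start_active_span("load_recents"):
            await self._load_recents(room_id)

        with start_active_span("compute_state"):
            await self._compute_state(room_id)

    async def _load_recents(self, room_id: str) -> None:
        ...

    async def _compute_state(self, room_id: str) -> None:
        ...
```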

View file

@@ -14,13 +14,14 @@
 import functools
 import gc
+import inspect
 import itertools
 import logging
 import os
 import platform
 import threading
 import time
-from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
+from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
 
 import attr
 from prometheus_client import Counter, Gauge, Histogram
@@ -32,6 +33,7 @@ from prometheus_client.core import (
 )
 
 from twisted.internet import reactor
+from twisted.internet.defer import Deferred
 from twisted.python.threadpool import ThreadPool
 
 import synapse
@@ -584,6 +586,35 @@ MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
 _last_gc = [0.0, 0.0, 0.0]
 
 
+def callFromThreadTimer(func):
+    @functools.wraps(func)
+    def callFromThread(f: Callable[..., Any], *args: object, **kwargs: object) -> None:
+        @functools.wraps(f)
+        def g(*args, **kwargs):
+            callbacks = None
+            if inspect.ismethod(f):
+                if isinstance(f.__self__, Deferred):
+                    callbacks = list(f.__self__.callbacks)
+
+            start = time.time()
+            r = f(*args, **kwargs)
+            end = time.time()
+            if end - start > 0.5:
+                logger.warning(
+                    "callFromThread took %f seconds. name: %s, callbacks: %s",
+                    end - start,
+                    f,
+                    callbacks,
+                )
+            return r
+
+        func(g, *args, **kwargs)
+
+    return callFromThread
+
+
 def runUntilCurrentTimer(reactor, func):
     @functools.wraps(func)
     def f(*args, **kwargs):
@@ -606,6 +637,13 @@ def runUntilCurrentTimer(reactor, func):
         ret = func(*args, **kwargs)
         end = time.time()
 
+        if end - start > 0.05:
+            logger.warning(
+                "runUntilCurrent took %f seconds. num_pending: %d",
+                end - start,
+                num_pending,
+            )
+
         # record the amount of wallclock time spent running pending calls.
         # This is a proxy for the actual amount of time between reactor polls,
         # since about 25% of time is actually spent running things triggered by
@@ -663,6 +701,8 @@
     # per iteratation after fd polling.
     reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent)  # type: ignore
 
+    reactor.callFromThread = callFromThreadTimer(reactor.callFromThread)
+
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
     if not running_on_pypy:
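
The hunks above wrap two reactor entry points, runUntilCurrent and callFromThread, with timers that log a warning whenever a single call runs longer than a threshold (50 ms and 0.5 s respectively); the callFromThread wrapper additionally snapshots a Deferred's callback chain when the scheduled function is a bound Deferred method. The underlying wrap-and-warn monkey-patching technique looks roughly like the standalone sketch below; slow_call_logger and do_work are made-up names for illustration, not part of the diff.

```python
# Standalone sketch of the "wrap a callable and warn if it was slow" technique.
# slow_call_logger and do_work are illustrative names only.
import functools
import logging
import time

logger = logging.getLogger(__name__)


def slow_call_logger(func, threshold_seconds=0.5):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.time() - start
            if elapsed > threshold_seconds:
                logger.warning("%s took %f seconds", func.__name__, elapsed)

    return wrapper


def do_work():
    time.sleep(1)  # simulate a slow call


# Installed the same way the diff patches the reactor at import time:
# the original attribute is replaced by the timing wrapper.
do_work = slow_call_logger(do_work)
```

The 50 ms threshold is aggressive because any reactor tick that long delays every other piece of work on Twisted's single-threaded reactor, which is exactly what this diagnostic branch is trying to surface.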

View file

@@ -48,6 +48,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
+from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -391,6 +392,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {e.event_id: e for e in events}
 
+    @trace
     async def get_events_as_list(
         self,
         event_ids: Collection[str],
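
The final hunk applies the same @trace decorator at the storage layer ("Trace event fetches" in the commit list), so event loading shows up as child spans underneath the sync spans added in the handler. A rough sketch of what that looks like on a store-style class; ExampleEventStore and _fetch_event are hypothetical, not the real EventsWorkerStore.

```python
# Illustrative only: a store-like class whose bulk fetch is traced. Assumes a
# Synapse environment where synapse.logging.opentracing is importable.
from typing import Any, Collection, Dict, List

from synapse.logging.opentracing import trace


class ExampleEventStore:
    @trace
    async def get_events_as_list(self, event_ids: Collection[str]) -> List[Dict[str, Any]]:
        # The span is named after this method, so per-fetch timing appears
        # alongside the handler spans when a sync request is traced.
        return [await self._fetch_event(event_id) for event_id in event_ids]

    async def _fetch_event(self, event_id: str) -> Dict[str, Any]:
        # Placeholder for the real database lookup.
        return {"event_id": event_id}
```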