Compare commits

...

10 commits

Author SHA1 Message Date
Erik Johnston c5422e2ec1 Log callbacks 2021-11-08 14:52:51 +00:00
Erik Johnston f2d2842f56 Fixup 2021-11-08 14:35:26 +00:00
Erik Johnston 61e28c41cd Fixup 2021-11-08 14:33:58 +00:00
Erik Johnston 49c9424c0e install timers 2021-11-08 14:32:48 +00:00
Erik Johnston 50b8170181 Log long callFromThread 2021-11-08 14:29:19 +00:00
Erik Johnston da78f01a64 Log slow ticks 2021-11-08 14:11:39 +00:00
Erik Johnston 1783084fd9 More tracing 2021-11-08 11:41:29 +00:00
Erik Johnston 5e88964994 Trace event fetches 2021-11-08 11:35:27 +00:00
Erik Johnston 58864265d1 Merge remote-tracking branch 'origin/develop' into erikj/slow_sync_diag 2021-11-08 11:34:03 +00:00
Erik Johnston ee223fc7b9 Add spans for sync 2021-11-01 15:15:23 +00:00
3 changed files with 124 additions and 54 deletions

View file

@ -34,7 +34,13 @@ from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
@ -483,6 +489,7 @@ class SyncHandler:
return now_token, ephemeral_by_room
@trace
async def _load_filtered_recents(
self,
room_id: str,
@ -639,6 +646,7 @@ class SyncHandler:
state_ids[(event.type, event.state_key)] = event.event_id
return state_ids
@trace
async def get_state_at(
self,
room_id: str,
@ -671,6 +679,7 @@ class SyncHandler:
state = {}
return state
@trace
async def compute_summary(
self,
room_id: str,
@ -819,6 +828,7 @@ class SyncHandler:
logger.debug("found LruCache for %r", cache_key)
return cache
@trace
async def compute_state_delta(
self,
room_id: str,
@ -1064,9 +1074,10 @@ class SyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
user_id, now_token.room_key
)
with start_active_span("get_rooms_for_user_at"):
joined_room_ids = await self.get_rooms_for_user_at(
user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
@ -1077,15 +1088,18 @@ class SyncHandler:
logger.debug("Fetching account data")
account_data_by_room = await self._generate_sync_entry_for_account_data(
sync_result_builder
)
with start_active_span("_generate_sync_entry_for_account_data"):
account_data_by_room = await self._generate_sync_entry_for_account_data(
sync_result_builder
)
logger.debug("Fetching room data")
res = await self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
with start_active_span("_generate_sync_entry_for_rooms"):
res = await self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
newly_joined_rooms, newly_joined_or_invited_or_knocked_users, _, _ = res
_, _, newly_left_rooms, newly_left_users = res
@ -1094,22 +1108,25 @@ class SyncHandler:
)
if self.hs_config.server.use_presence and not block_all_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)
with start_active_span("_generate_sync_entry_for_presence"):
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)
logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)
with start_active_span("_generate_sync_entry_for_to_device"):
await self._generate_sync_entry_for_to_device(sync_result_builder)
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)
with start_active_span("_generate_sync_entry_for_device_list"):
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)
logger.debug("Fetching OTK data")
device_id = sync_config.device_id
@ -1120,15 +1137,19 @@ class SyncHandler:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = (
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)
with start_active_span("count_e2e_one_time_keys"):
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = (
await self.store.get_e2e_unused_fallback_key_types(
user_id, device_id
)
)
logger.debug("Fetching group data")
await self._generate_sync_entry_for_groups(sync_result_builder)
with start_active_span("_generate_sync_entry_for_groups"):
await self._generate_sync_entry_for_groups(sync_result_builder)
num_events = 0
@ -1478,12 +1499,13 @@ class SyncHandler:
if block_all_room_ephemeral:
ephemeral_by_room: Dict[str, List[JsonDict]] = {}
else:
now_token, ephemeral_by_room = await self.ephemeral_by_room(
sync_result_builder,
now_token=sync_result_builder.now_token,
since_token=sync_result_builder.since_token,
)
sync_result_builder.now_token = now_token
with start_active_span("ephemeral_by_room"):
now_token, ephemeral_by_room = await self.ephemeral_by_room(
sync_result_builder,
now_token=sync_result_builder.now_token,
since_token=sync_result_builder.since_token,
)
sync_result_builder.now_token = now_token
# We check up front if anything has changed, if it hasn't then there is
# no point in going further.
@ -1493,18 +1515,20 @@ class SyncHandler:
have_changed = await self._have_rooms_changed(sync_result_builder)
log_kv({"rooms_have_changed": have_changed})
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
if not tags_by_room:
logger.debug("no-oping sync")
return set(), set(), set(), set()
with start_active_span("get_updated_tags"):
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
if not tags_by_room:
logger.debug("no-oping sync")
return set(), set(), set(), set()
ignored_account_data = (
await self.store.get_global_account_data_by_type_for_user(
AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
with start_active_span("get_global_account_data_by_type_for_user"):
ignored_account_data = (
await self.store.get_global_account_data_by_type_for_user(
AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
)
)
)
# If there is ignored users account data and it matches the proper type,
# then use it.
@ -1515,12 +1539,15 @@ class SyncHandler:
ignored_users = frozenset(ignored_users_data.keys())
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
with start_active_span("_get_rooms_changed"):
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
)
with start_active_span("get_updated_tags"):
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
@ -1547,7 +1574,8 @@ class SyncHandler:
)
logger.debug("Generated room entry for %s", room_entry.room_id)
await concurrently_execute(handle_room_entries, room_entries, 10)
with start_active_span("handle_room_entries"):
await concurrently_execute(handle_room_entries, room_entries, 10)
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)

View file

@ -14,13 +14,14 @@
import functools
import gc
import inspect
import itertools
import logging
import os
import platform
import threading
import time
from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
import attr
from prometheus_client import Counter, Gauge, Histogram
@ -32,6 +33,7 @@ from prometheus_client.core import (
)
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.python.threadpool import ThreadPool
import synapse
@ -584,6 +586,35 @@ MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
_last_gc = [0.0, 0.0, 0.0]
def callFromThreadTimer(func):
    """Wrap ``reactor.callFromThread`` so slow callbacks get logged.

    Args:
        func: The reactor's original ``callFromThread`` callable.

    Returns:
        A drop-in replacement for ``callFromThread`` that times each
        scheduled callback when it eventually runs in the reactor thread
        and logs a warning if it took longer than half a second.
    """

    @functools.wraps(func)
    def callFromThread(f: Callable[..., Any], *args: object, **kwargs: object) -> None:
        @functools.wraps(f)
        def g(*args, **kwargs):
            # If the callback is a method bound to a Deferred, snapshot its
            # callback chain up front so the warning shows what would have
            # run (the chain is consumed as the Deferred fires).
            # NOTE(review): this reads the private `Deferred.callbacks`
            # attribute — debug-only, may break across Twisted versions.
            callbacks = None
            if inspect.ismethod(f):
                if isinstance(f.__self__, Deferred):
                    callbacks = list(f.__self__.callbacks)

            # Use a monotonic clock for the duration: time.time() is wall
            # clock and can jump under NTP adjustment, skewing the measure.
            start = time.monotonic()
            r = f(*args, **kwargs)
            end = time.monotonic()

            if end - start > 0.5:
                logger.warning(
                    "callFromThread took %f seconds. name: %s, callbacks: %s",
                    end - start,
                    f,
                    callbacks,
                )

            return r

        func(g, *args, **kwargs)

    return callFromThread
def runUntilCurrentTimer(reactor, func):
@functools.wraps(func)
def f(*args, **kwargs):
@ -606,6 +637,13 @@ def runUntilCurrentTimer(reactor, func):
ret = func(*args, **kwargs)
end = time.time()
if end - start > 0.05:
logger.warning(
"runUntilCurrent took %f seconds. num_pending: %d",
end - start,
num_pending,
)
# record the amount of wallclock time spent running pending calls.
# This is a proxy for the actual amount of time between reactor polls,
# since about 25% of time is actually spent running things triggered by
@ -663,6 +701,8 @@ try:
# per iteration after fd polling.
reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore
reactor.callFromThread = callFromThreadTimer(reactor.callFromThread)
# We manually run the GC each reactor tick so that we can get some metrics
# about time spent doing GC.
if not running_on_pypy:

View file

@ -48,6 +48,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@ -391,6 +392,7 @@ class EventsWorkerStore(SQLBaseStore):
return {e.event_id: e for e in events}
@trace
async def get_events_as_list(
self,
event_ids: Collection[str],