Compare commits: develop...erikj/slow (10 commits)

Author | SHA1 | Date
---|---|---
 | c5422e2ec1 |
 | f2d2842f56 |
 | 61e28c41cd |
 | 49c9424c0e |
 | 50b8170181 |
 | da78f01a64 |
 | 1783084fd9 |
 | 5e88964994 |
 | 58864265d1 |
 | ee223fc7b9 |
@@ -34,7 +34,13 @@ from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    trace,
+)
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
@@ -483,6 +489,7 @@ class SyncHandler:
 
         return now_token, ephemeral_by_room
 
+    @trace
     async def _load_filtered_recents(
         self,
         room_id: str,
@@ -639,6 +646,7 @@ class SyncHandler:
             state_ids[(event.type, event.state_key)] = event.event_id
         return state_ids
 
+    @trace
     async def get_state_at(
         self,
         room_id: str,
@@ -671,6 +679,7 @@ class SyncHandler:
             state = {}
         return state
 
+    @trace
     async def compute_summary(
         self,
         room_id: str,
@@ -819,6 +828,7 @@ class SyncHandler:
             logger.debug("found LruCache for %r", cache_key)
         return cache
 
+    @trace
     async def compute_state_delta(
         self,
         room_id: str,
@@ -1064,9 +1074,10 @@ class SyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
         else:
-            joined_room_ids = await self.get_rooms_for_user_at(
-                user_id, now_token.room_key
-            )
+            with start_active_span("get_rooms_for_user_at"):
+                joined_room_ids = await self.get_rooms_for_user_at(
+                    user_id, now_token.room_key
+                )
         sync_result_builder = SyncResultBuilder(
             sync_config,
             full_state,
@@ -1077,15 +1088,18 @@ class SyncHandler:
 
         logger.debug("Fetching account data")
 
-        account_data_by_room = await self._generate_sync_entry_for_account_data(
-            sync_result_builder
-        )
+        with start_active_span("_generate_sync_entry_for_account_data"):
+            account_data_by_room = await self._generate_sync_entry_for_account_data(
+                sync_result_builder
+            )
 
         logger.debug("Fetching room data")
 
-        res = await self._generate_sync_entry_for_rooms(
-            sync_result_builder, account_data_by_room
-        )
+        with start_active_span("_generate_sync_entry_for_rooms"):
+            res = await self._generate_sync_entry_for_rooms(
+                sync_result_builder, account_data_by_room
+            )
 
         newly_joined_rooms, newly_joined_or_invited_or_knocked_users, _, _ = res
         _, _, newly_left_rooms, newly_left_users = res
@@ -1094,22 +1108,25 @@ class SyncHandler:
         )
         if self.hs_config.server.use_presence and not block_all_presence_data:
             logger.debug("Fetching presence data")
-            await self._generate_sync_entry_for_presence(
-                sync_result_builder,
-                newly_joined_rooms,
-                newly_joined_or_invited_or_knocked_users,
-            )
+            with start_active_span("_generate_sync_entry_for_presence"):
+                await self._generate_sync_entry_for_presence(
+                    sync_result_builder,
+                    newly_joined_rooms,
+                    newly_joined_or_invited_or_knocked_users,
+                )
 
         logger.debug("Fetching to-device data")
-        await self._generate_sync_entry_for_to_device(sync_result_builder)
+        with start_active_span("_generate_sync_entry_for_to_device"):
+            await self._generate_sync_entry_for_to_device(sync_result_builder)
 
-        device_lists = await self._generate_sync_entry_for_device_list(
-            sync_result_builder,
-            newly_joined_rooms=newly_joined_rooms,
-            newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
-            newly_left_rooms=newly_left_rooms,
-            newly_left_users=newly_left_users,
-        )
+        with start_active_span("_generate_sync_entry_for_device_list"):
+            device_lists = await self._generate_sync_entry_for_device_list(
+                sync_result_builder,
+                newly_joined_rooms=newly_joined_rooms,
+                newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+                newly_left_rooms=newly_left_rooms,
+                newly_left_users=newly_left_users,
+            )
 
         logger.debug("Fetching OTK data")
         device_id = sync_config.device_id
@@ -1120,15 +1137,19 @@ class SyncHandler:
             # * no change in OTK count since the provided since token
             # * the server has zero OTKs left for this device
             # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
-            one_time_key_counts = await self.store.count_e2e_one_time_keys(
-                user_id, device_id
-            )
-            unused_fallback_key_types = (
-                await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
-            )
+            with start_active_span("count_e2e_one_time_keys"):
+                one_time_key_counts = await self.store.count_e2e_one_time_keys(
+                    user_id, device_id
+                )
+                unused_fallback_key_types = (
+                    await self.store.get_e2e_unused_fallback_key_types(
+                        user_id, device_id
+                    )
+                )
 
         logger.debug("Fetching group data")
-        await self._generate_sync_entry_for_groups(sync_result_builder)
+        with start_active_span("_generate_sync_entry_for_groups"):
+            await self._generate_sync_entry_for_groups(sync_result_builder)
 
         num_events = 0
 
@@ -1478,12 +1499,13 @@ class SyncHandler:
         if block_all_room_ephemeral:
             ephemeral_by_room: Dict[str, List[JsonDict]] = {}
         else:
-            now_token, ephemeral_by_room = await self.ephemeral_by_room(
-                sync_result_builder,
-                now_token=sync_result_builder.now_token,
-                since_token=sync_result_builder.since_token,
-            )
-            sync_result_builder.now_token = now_token
+            with start_active_span("ephemeral_by_room"):
+                now_token, ephemeral_by_room = await self.ephemeral_by_room(
+                    sync_result_builder,
+                    now_token=sync_result_builder.now_token,
+                    since_token=sync_result_builder.since_token,
+                )
+                sync_result_builder.now_token = now_token
 
         # We check up front if anything has changed, if it hasn't then there is
         # no point in going further.
@@ -1493,18 +1515,20 @@ class SyncHandler:
             have_changed = await self._have_rooms_changed(sync_result_builder)
             log_kv({"rooms_have_changed": have_changed})
             if not have_changed:
-                tags_by_room = await self.store.get_updated_tags(
-                    user_id, since_token.account_data_key
-                )
-                if not tags_by_room:
-                    logger.debug("no-oping sync")
-                    return set(), set(), set(), set()
+                with start_active_span("get_updated_tags"):
+                    tags_by_room = await self.store.get_updated_tags(
+                        user_id, since_token.account_data_key
+                    )
+                    if not tags_by_room:
+                        logger.debug("no-oping sync")
+                        return set(), set(), set(), set()
 
-        ignored_account_data = (
-            await self.store.get_global_account_data_by_type_for_user(
-                AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
-            )
-        )
+        with start_active_span("get_global_account_data_by_type_for_user"):
+            ignored_account_data = (
+                await self.store.get_global_account_data_by_type_for_user(
+                    AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
+                )
+            )
 
         # If there is ignored users account data and it matches the proper type,
         # then use it.
@@ -1515,12 +1539,15 @@ class SyncHandler:
             ignored_users = frozenset(ignored_users_data.keys())
 
         if since_token:
-            room_changes = await self._get_rooms_changed(
-                sync_result_builder, ignored_users
-            )
-            tags_by_room = await self.store.get_updated_tags(
-                user_id, since_token.account_data_key
-            )
+            with start_active_span("_get_rooms_changed"):
+                room_changes = await self._get_rooms_changed(
+                    sync_result_builder, ignored_users
+                )
+
+            with start_active_span("get_updated_tags"):
+                tags_by_room = await self.store.get_updated_tags(
+                    user_id, since_token.account_data_key
+                )
         else:
             room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
 
@@ -1547,7 +1574,8 @@ class SyncHandler:
             )
             logger.debug("Generated room entry for %s", room_entry.room_id)
 
-        await concurrently_execute(handle_room_entries, room_entries, 10)
+        with start_active_span("handle_room_entries"):
+            await concurrently_execute(handle_room_entries, room_entries, 10)
 
         sync_result_builder.invited.extend(invited)
         sync_result_builder.knocked.extend(knocked)
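The hunks above (which appear to be against synapse/handlers/sync.py, judging by class SyncHandler) apply the same two instrumentation idioms throughout: decorate an async method with `@trace` so the whole coroutine is recorded as one span, or wrap a single awaited step in `with start_active_span("..."):` so that step shows up as a named span inside the sync trace. A minimal standalone sketch of both idioms follows; the class, method bodies, and span name are illustrative, not code from this branch.

```python
# Illustrative sketch only: the two tracing idioms used in the hunks above.
# Assumes synapse is importable; the class and bodies below are made up.
from synapse.logging.opentracing import start_active_span, trace


class SketchHandler:
    @trace  # records one span covering the whole coroutine
    async def get_state_at(self, room_id: str) -> dict:
        return {}

    async def generate_sync_result(self, room_id: str) -> dict:
        # a named child span around a single awaited step, so slow steps
        # show up individually in the trace for this request
        with start_active_span("get_state_at"):
            state = await self.get_state_at(room_id)
        return state
```

As far as I can tell, these helpers degrade to no-ops when tracing is not configured, so wrapping a step costs essentially nothing outside a traced deployment.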
@@ -14,13 +14,14 @@
 
 import functools
 import gc
+import inspect
 import itertools
 import logging
 import os
 import platform
 import threading
 import time
-from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
+from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
 
 import attr
 from prometheus_client import Counter, Gauge, Histogram
@@ -32,6 +33,7 @@ from prometheus_client.core import (
 )
 
 from twisted.internet import reactor
+from twisted.internet.defer import Deferred
 from twisted.python.threadpool import ThreadPool
 
 import synapse
@@ -584,6 +586,35 @@ MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
 _last_gc = [0.0, 0.0, 0.0]
 
 
+def callFromThreadTimer(func):
+    @functools.wraps(func)
+    def callFromThread(f: Callable[..., Any], *args: object, **kwargs: object) -> None:
+        @functools.wraps(f)
+        def g(*args, **kwargs):
+            callbacks = None
+            if inspect.ismethod(f):
+                if isinstance(f.__self__, Deferred):
+                    callbacks = list(f.__self__.callbacks)
+
+            start = time.time()
+            r = f(*args, **kwargs)
+            end = time.time()
+
+            if end - start > 0.5:
+                logger.warning(
+                    "callFromThread took %f seconds. name: %s, callbacks: %s",
+                    end - start,
+                    f,
+                    callbacks,
+                )
+
+            return r
+
+        func(g, *args, **kwargs)
+
+    return callFromThread
+
+
 def runUntilCurrentTimer(reactor, func):
     @functools.wraps(func)
     def f(*args, **kwargs):
@@ -606,6 +637,13 @@ def runUntilCurrentTimer(reactor, func):
         ret = func(*args, **kwargs)
         end = time.time()
 
+        if end - start > 0.05:
+            logger.warning(
+                "runUntilCurrent took %f seconds. num_pending: %d",
+                end - start,
+                num_pending,
+            )
+
         # record the amount of wallclock time spent running pending calls.
         # This is a proxy for the actual amount of time between reactor polls,
         # since about 25% of time is actually spent running things triggered by
@@ -663,6 +701,8 @@ try:
     # per iteratation after fd polling.
     reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent)  # type: ignore
 
+    reactor.callFromThread = callFromThreadTimer(reactor.callFromThread)
+
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
     if not running_on_pypy:
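The hunks above (apparently synapse/metrics/__init__.py, judging by runUntilCurrentTimer and MIN_TIME_BETWEEN_GCS) add a callFromThreadTimer wrapper and monkey-patch reactor.callFromThread with it: every callable scheduled onto the reactor from another thread is timed, and anything that holds the reactor for more than 0.5 seconds is logged, together with the attached callbacks when the callable is a bound Deferred method. A dependency-free sketch of the same wrap-the-scheduler pattern is below; the function name and threshold are illustrative, not Synapse code.

```python
# Generic sketch of "wrap a scheduler so every scheduled call is timed";
# names and the 0.5s threshold are illustrative only.
import functools
import logging
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)


def time_scheduled_calls(
    schedule: Callable[..., None], threshold: float = 0.5
) -> Callable[..., None]:
    """Wrap a callFromThread-style scheduler so slow callables get logged."""

    @functools.wraps(schedule)
    def wrapper(f: Callable[..., Any], *args: object, **kwargs: object) -> None:
        @functools.wraps(f)
        def timed(*a: object, **kw: object) -> Any:
            start = time.time()
            try:
                return f(*a, **kw)
            finally:
                elapsed = time.time() - start
                if elapsed > threshold:
                    logger.warning("%r took %f seconds", f, elapsed)

        # hand the timed wrapper to the real scheduler
        schedule(timed, *args, **kwargs)

    return wrapper
```

Applied the same way the branch does it, this would be `reactor.callFromThread = time_scheduled_calls(reactor.callFromThread)`.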
@@ -48,6 +48,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
+from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -391,6 +392,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {e.event_id: e for e in events}
 
+    @trace
     async def get_events_as_list(
         self,
         event_ids: Collection[str],