Report state res metrics to Prometheus and log

This commit is contained in:
Richard van der Hoff 2020-09-29 13:07:45 +01:00
parent 8412c08a87
commit 057f04fa9f

View file

@ -13,42 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import heapq
import logging import logging
from collections import namedtuple from collections import defaultdict, namedtuple
from typing import ( from typing import (
Any,
Awaitable, Awaitable,
Callable,
DefaultDict,
Dict, Dict,
Iterable, Iterable,
List, List,
Optional, Optional,
Sequence, Sequence,
Set, Set,
Tuple,
Union, Union,
overload, overload,
) )
import attr import attr
from frozendict import frozendict from frozendict import frozendict
from prometheus_client import Histogram from prometheus_client import Counter, Histogram
from typing_extensions import Literal from typing_extensions import Literal
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.utils import log_function from synapse.logging.utils import log_function
from synapse.state import v1, v2 from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo from synapse.storage.roommember import ProfileInfo
from synapse.types import Collection, StateMap from synapse.types import Collection, StateMap
from synapse.util import Clock
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics_logger = logging.getLogger("synapse.state.metrics")
# Metrics for number of state groups involved in a resolution. # Metrics for number of state groups involved in a resolution.
state_groups_histogram = Histogram( state_groups_histogram = Histogram(
@ -459,6 +463,33 @@ class StateHandler:
return {key: state_map[ev_id] for key, ev_id in new_state.items()} return {key: state_map[ev_id] for key, ev_id in new_state.items()}
@attr.s(slots=True)
class _StateResMetrics:
"""Keeps track of some usage metrics about state res."""
# System and User CPU time, in seconds
cpu_time = attr.ib(type=float, default=0.0)
# time spent on database transactions (excluding scheduling time). This roughly
# corresponds to the amount of work done on the db server, excluding event fetches.
db_time = attr.ib(type=float, default=0.0)
# number of events fetched from the db.
db_events = attr.ib(type=int, default=0)
_biggest_room_by_cpu_counter = Counter(
"synapse_state_res_cpu_for_biggest_room_seconds",
"CPU time spent performing state resolution for the single most expensive "
"room for state resolution",
)
_biggest_room_by_db_counter = Counter(
"synapse_state_res_db_for_biggest_room_seconds",
"Database time spent performing state resolution for the single most "
"expensive room for state resolution",
)
class StateResolutionHandler: class StateResolutionHandler:
"""Responsible for doing state conflict resolution. """Responsible for doing state conflict resolution.
@ -481,6 +512,17 @@ class StateResolutionHandler:
reset_expiry_on_get=True, reset_expiry_on_get=True,
) )
#
# stuff for tracking time spent on state-res by room
#
# tracks the amount of work done on state res per room
self._state_res_metrics = defaultdict(
_StateResMetrics
) # type: DefaultDict[str, _StateResMetrics]
self.clock.looping_call(self._report_metrics, 120 * 1000)
@log_function @log_function
async def resolve_state_groups( async def resolve_state_groups(
self, self,
@ -578,21 +620,83 @@ class StateResolutionHandler:
Returns: Returns:
a map from (type, state_key) to event_id. a map from (type, state_key) to event_id.
""" """
with Measure(self.clock, "state._resolve_events"): try:
v = KNOWN_ROOM_VERSIONS[room_version] with Measure(self.clock, "state._resolve_events") as m:
if v.state_res == StateResolutionVersions.V1: v = KNOWN_ROOM_VERSIONS[room_version]
return await v1.resolve_events_with_store( if v.state_res == StateResolutionVersions.V1:
room_id, state_sets, event_map, state_res_store.get_events return await v1.resolve_events_with_store(
) room_id, state_sets, event_map, state_res_store.get_events
else: )
return await v2.resolve_events_with_store( else:
self.clock, return await v2.resolve_events_with_store(
room_id, self.clock,
room_version, room_id,
state_sets, room_version,
event_map, state_sets,
state_res_store, event_map,
) state_res_store,
)
finally:
self._record_state_res_metrics(room_id, m.get_resource_usage())
def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
room_metrics = self._state_res_metrics[room_id]
room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime
room_metrics.db_time += rusage.db_txn_duration_sec
room_metrics.db_events += rusage.evt_db_fetch_count
def _report_metrics(self):
if not self._state_res_metrics:
# no state res has happened since the last iteration: don't bother logging.
return
self._report_biggest(
lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter,
)
self._report_biggest(
lambda i: i.db_time, "DB time", _biggest_room_by_db_counter,
)
self._state_res_metrics.clear()
def _report_biggest(
self,
extract_key: Callable[[_StateResMetrics], Any],
metric_name: str,
prometheus_counter_metric: Counter,
) -> None:
"""Report metrics on the biggest rooms for state res
Args:
extract_key: a callable which, given a _StateResMetrics, extracts a single
metric to sort by.
metric_name: the name of the metric we have extracted, for the log line
prometheus_counter_metric: a prometheus metric recording the sum of the
the extracted metric
"""
n_to_log = 10
if not metrics_logger.isEnabledFor(logging.DEBUG):
# only need the most expensive if we don't have debug logging, which
# allows nlargest() to degrade to max()
n_to_log = 1
items = self._state_res_metrics.items()
# log the N biggest rooms
biggest = heapq.nlargest(
n_to_log, items, key=lambda i: extract_key(i[1])
) # type: List[Tuple[str, _StateResMetrics]]
metrics_logger.debug(
"%i biggest rooms for state-res by %s: %s",
len(biggest),
metric_name,
["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
)
# report info on the single biggest to prometheus
_, biggest_metrics = biggest[0]
prometheus_counter_metric.inc(extract_key(biggest_metrics))
def _make_state_cache_entry( def _make_state_cache_entry(