rest of the changes

Amber Brown 2018-05-21 19:48:57 -05:00
parent df9f72d9e5
commit fcc525b0b7
8 changed files with 67 additions and 336 deletions


@@ -16,86 +16,38 @@
 import logging
 
-import synapse.metrics
+from prometheus_client.core import Counter, Histogram
+
 from synapse.util.logcontext import LoggingContext
 
 logger = logging.getLogger(__name__)
 
-metrics = synapse.metrics.get_metrics_for("synapse.http.server")
-
 # total number of responses served, split by method/servlet/tag
-response_count = metrics.register_counter(
-    "response_count",
-    labels=["method", "servlet", "tag"],
-    alternative_names=(
-        # the following are all deprecated aliases for the same metric
-        metrics.name_prefix + x for x in (
-            "_requests",
-            "_response_time:count",
-            "_response_ru_utime:count",
-            "_response_ru_stime:count",
-            "_response_db_txn_count:count",
-            "_response_db_txn_duration:count",
-        )
-    )
-)
+response_count = Counter("synapse_http_server_response_count", "", ["method", "servlet", "tag"])
 
-requests_counter = metrics.register_counter(
-    "requests_received",
-    labels=["method", "servlet", ],
-)
+requests_counter = Counter("synapse_http_server_requests_received", "", ["method", "servlet"])
 
-outgoing_responses_counter = metrics.register_counter(
-    "responses",
-    labels=["method", "code"],
-)
+outgoing_responses_counter = Counter("synapse_http_server_responses", "", ["method", "code"])
 
-response_timer = metrics.register_counter(
-    "response_time_seconds",
-    labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_time:total",
-    ),
-)
+response_timer = Histogram("synapse_http_server_response_time_seconds", "", ["method", "servlet", "tag"])
 
-response_ru_utime = metrics.register_counter(
-    "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_ru_utime:total",
-    ),
-)
+response_ru_utime = Counter("synapse_http_server_response_ru_utime_seconds", "", ["method", "servlet", "tag"])
 
-response_ru_stime = metrics.register_counter(
-    "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_ru_stime:total",
-    ),
-)
+response_ru_stime = Counter("synapse_http_server_response_ru_stime_seconds", "", ["method", "servlet", "tag"])
 
-response_db_txn_count = metrics.register_counter(
-    "response_db_txn_count", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_db_txn_count:total",
-    ),
-)
+response_db_txn_count = Counter("synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"])
 
 # seconds spent waiting for db txns, excluding scheduling time, when processing
 # this request
-response_db_txn_duration = metrics.register_counter(
-    "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
-    alternative_names=(
-        metrics.name_prefix + "_response_db_txn_duration:total",
-    ),
-)
+response_db_txn_duration = Counter("synapse_http_server_response_db_txn_duration_seconds", "", ["method", "servlet", "tag"])
 
 # seconds spent waiting for a db connection, when processing this request
-response_db_sched_duration = metrics.register_counter(
-    "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
+response_db_sched_duration = Counter("synapse_http_request_response_db_sched_duration_seconds", "", ["method", "servlet", "tag"]
 )
 
 # size in bytes of the response written
-response_size = metrics.register_counter(
-    "response_size", labels=["method", "servlet", "tag"]
+response_size = Counter("synapse_http_request_response_size", "", ["method", "servlet", "tag"]
 )
@@ -119,31 +71,19 @@ class RequestMetrics(object):
             )
             return
 
-        outgoing_responses_counter.inc(request.method, str(request.code))
+        outgoing_responses_counter.labels(request.method, str(request.code)).inc()
 
-        response_count.inc(request.method, self.name, tag)
+        response_count.labels(request.method, self.name, tag).inc()
 
-        response_timer.inc_by(
-            time_msec - self.start, request.method,
-            self.name, tag
-        )
+        response_timer.labels(request.method, self.name, tag).observe(time_msec - self.start)
 
         ru_utime, ru_stime = context.get_resource_usage()
 
-        response_ru_utime.inc_by(
-            ru_utime, request.method, self.name, tag
-        )
-        response_ru_stime.inc_by(
-            ru_stime, request.method, self.name, tag
-        )
-        response_db_txn_count.inc_by(
-            context.db_txn_count, request.method, self.name, tag
-        )
-        response_db_txn_duration.inc_by(
-            context.db_txn_duration_ms / 1000., request.method, self.name, tag
-        )
-        response_db_sched_duration.inc_by(
-            context.db_sched_duration_ms / 1000., request.method, self.name, tag
-        )
+        response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime)
+        response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime)
+        response_db_txn_count.labels(request.method, self.name, tag).inc(context.db_txn_count)
+        response_db_txn_duration.labels(request.method, self.name, tag).inc(context.db_txn_duration_ms / 1000.)
+        response_db_sched_duration.labels(request.method, self.name, tag).inc(
+            context.db_sched_duration_ms / 1000.)
 
-        response_size.inc_by(request.sentLength, request.method, self.name, tag)
+        response_size.labels(request.method, self.name, tag).inc(request.sentLength)
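The hunks above replace Synapse's home-grown metrics registry with module-level prometheus_client objects: label values are bound with .labels(), counters are bumped with .inc(), and timings go to a Histogram via .observe(). A minimal standalone sketch of that pattern (the metric names and the record_response() helper below are illustrative, not part of the commit):

from prometheus_client import Counter, Histogram, generate_latest

# Illustrative metric names only; the real ones are defined in the hunk above.
example_response_count = Counter(
    "example_http_response_count", "Responses served", ["method", "servlet", "tag"]
)
example_response_timer = Histogram(
    "example_http_response_time_seconds", "Response time", ["method", "servlet", "tag"]
)

def record_response(method, servlet, tag, duration_seconds):
    # .labels() returns a child metric bound to these label values
    example_response_count.labels(method, servlet, tag).inc()
    example_response_timer.labels(method, servlet, tag).observe(duration_seconds)

record_response("GET", "RoomSendEventRestServlet", "", 0.042)
print(generate_latest().decode("utf-8"))  # Prometheus text exposition format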


@@ -138,8 +138,8 @@ def wrap_request_handler_with_logging(h):
                 # dispatching to the handler, so that the handler
                 # can update the servlet name in the request
                 # metrics
-                requests_counter.inc(request.method,
-                                     request.request_metrics.name)
+                requests_counter.labels(request.method,
+                                        request.request_metrics.name).inc()
                 yield d
 
     return wrapped_request_handler


@@ -22,35 +22,29 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from synapse.event_auth import get_user_power_level
 from synapse.api.constants import EventTypes, Membership
-from synapse.metrics import get_metrics_for
-from synapse.util.caches import metrics as cache_metrics
+from synapse.util.caches import register_cache
 from synapse.util.caches.descriptors import cached
 from synapse.util.async import Linearizer
 from synapse.state import POWER_KEY
 
 from collections import namedtuple
 
+from prometheus_client import Counter
+
 logger = logging.getLogger(__name__)
 
 rules_by_room = {}
 
-push_metrics = get_metrics_for(__name__)
 
-push_rules_invalidation_counter = push_metrics.register_counter(
-    "push_rules_invalidation_counter"
-)
-push_rules_state_size_counter = push_metrics.register_counter(
-    "push_rules_state_size_counter"
-)
+push_rules_invalidation_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_invalidation_counter", "")
+push_rules_state_size_counter = Counter("synapse_push_bulk_push_role_evaluator_push_rules_state_size_counter", "")
 
 # Measures whether we use the fast path of using state deltas, or if we have to
 # recalculate from scratch
-push_rules_delta_state_cache_metric = cache_metrics.register_cache(
+push_rules_delta_state_cache_metric = register_cache(
     "cache",
-    size_callback=lambda: 0,  # Meaningless size, as this isn't a cache that stores values
-    cache_name="push_rules_delta_state_cache_metric",
+    "push_rules_delta_state_cache_metric",
+    cache=[],  # Meaningless size, as this isn't a cache that stores values
 )
@@ -64,10 +58,10 @@ class BulkPushRuleEvaluator(object):
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
-        self.room_push_rule_cache_metrics = cache_metrics.register_cache(
+        self.room_push_rule_cache_metrics = register_cache(
             "cache",
-            size_callback=lambda: 0,  # There's not good value for this
-            cache_name="room_push_rule_cache",
+            "room_push_rule_cache",
+            cache=[],  # Meaningless size, as this isn't a cache that stores values
         )
 
     @defer.inlineCallbacks
@@ -309,7 +303,7 @@ class RulesForRoom(object):
             current_state_ids = context.current_state_ids
             push_rules_delta_state_cache_metric.inc_misses()
 
-        push_rules_state_size_counter.inc_by(len(current_state_ids))
+        push_rules_state_size_counter.inc(len(current_state_ids))
 
         logger.debug(
             "Looking for member changes in %r %r", state_group, current_state_ids


@@ -22,20 +22,19 @@ from .streams import STREAMS_MAP, FederationStream
 from .protocol import ServerReplicationStreamProtocol
 
 from synapse.util.metrics import Measure, measure_func
+from synapse.metrics import LaterGauge
 
 import logging
-import synapse.metrics
 
+from prometheus_client import Counter
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-stream_updates_counter = metrics.register_counter(
-    "stream_updates", labels=["stream_name"]
+stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
 )
-user_sync_counter = metrics.register_counter("user_sync")
-federation_ack_counter = metrics.register_counter("federation_ack")
-remove_pusher_counter = metrics.register_counter("remove_pusher")
-invalidate_cache_counter = metrics.register_counter("invalidate_cache")
-user_ip_cache_counter = metrics.register_counter("user_ip_cache")
+user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
+federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
+remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
+invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", "")
+user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
 
 logger = logging.getLogger(__name__)
@@ -73,7 +72,8 @@ class ReplicationStreamer(object):
         # Current connections.
         self.connections = []
 
-        metrics.register_callback("total_connections", lambda: len(self.connections))
+        l = LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], lambda: len(self.connections))
+        l.register()
 
         # List of streams that clients can subscribe to.
         # We only support federation stream if federation sending hase been
@@ -85,17 +85,15 @@ class ReplicationStreamer(object):
         self.streams_by_name = {stream.NAME: stream for stream in self.streams}
 
-        metrics.register_callback(
-            "connections_per_stream",
+        LaterGauge(
+            "synapse_replication_tcp_resource_connections_per_stream", "", ["stream_name"],
             lambda: {
                 (stream_name,): len([
                     conn for conn in self.connections
                    if stream_name in conn.replication_streams
                 ])
                 for stream_name in self.streams_by_name
-            },
-            labels=["stream_name"],
-        )
+            }).register()
 
         self.federation_sender = None
         if not hs.config.send_federation:
@@ -175,7 +173,7 @@ class ReplicationStreamer(object):
                     logger.info(
                         "Streaming: %s -> %s", stream.NAME, updates[-1][0]
                     )
-                    stream_updates_counter.inc_by(len(updates), stream.NAME)
+                    stream_updates_counter.labels(stream.NAME).inc(len(updates))
 
                     # Some streams return multiple rows with the same stream IDs,
                     # we need to make sure they get sent out in batches. We do
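The register_callback() gauges become LaterGauge objects, a Synapse helper introduced in this migration: as the hunks show, it takes a metric name, a documentation string, label names and a callable, reports whatever the callable returns at scrape time, and is hooked up with .register(). For the unlabelled case, stock prometheus_client offers the same idea through Gauge.set_function(); a small sketch with a hypothetical metric name:

from prometheus_client import Gauge

connections = []

total_connections = Gauge("example_total_connections", "Open replication connections")
total_connections.set_function(lambda: len(connections))  # re-evaluated on every scrape

connections.append(object())
# The gauge now reports 1 on the next scrape without any explicit .set() call.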


@@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches.descriptors import Cache
 from synapse.storage.engines import PostgresEngine
-import synapse.metrics
 
+from prometheus_client import Histogram
 
 from twisted.internet import defer
@@ -34,13 +34,10 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 perf_logger = logging.getLogger("synapse.storage.TIME")
 
+sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "")
 
-metrics = synapse.metrics.get_metrics_for("synapse.storage")
-
-sql_scheduling_timer = metrics.register_distribution("schedule_time")
-
-sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
-sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_query_timer = Histogram("synapse_storage_query_time", "", ["verb"])
+sql_txn_timer = Histogram("synapse_storage_transaction_time", "", ["desc"])
 
 
 class LoggingTransaction(object):
@@ -117,7 +114,7 @@ class LoggingTransaction(object):
         finally:
             msecs = (time.time() * 1000) - start
             sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
-            sql_query_timer.inc_by(msecs, sql.split()[0])
+            sql_query_timer.labels(sql.split()[0]).observe(msecs)
 
 
 class PerformanceCounters(object):
@@ -287,7 +284,7 @@ class SQLBaseStore(object):
             self._current_txn_total_time += duration
             self._txn_perf_counters.update(desc, start, end)
-            sql_txn_timer.inc_by(duration, desc)
+            sql_txn_timer.labels(desc).observe(duration)
 
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
@@ -349,7 +346,7 @@ class SQLBaseStore(object):
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runWithConnection") as context:
                 sched_duration_ms = time.time() * 1000 - start_time
-                sql_scheduling_timer.inc_by(sched_duration_ms)
+                sql_scheduling_timer.observe(sched_duration_ms)
                 current_context.add_database_scheduled(sched_duration_ms)
 
                 if self.database_engine.is_connection_closed(conn):
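In the storage layer the old register_distribution() metrics, which exported :count and :total pairs, map onto prometheus_client Histograms, which bucket every observation and export _bucket, _count and _sum series. A minimal timing sketch (the metric name and helper are hypothetical; note that the default Histogram buckets are sized for durations measured in seconds):

import time
from prometheus_client import Histogram

example_txn_timer = Histogram("example_transaction_time_seconds", "Transaction duration", ["desc"])

def timed_transaction(desc, func):
    # Record how long func() takes under the given "desc" label value
    start = time.time()
    try:
        return func()
    finally:
        example_txn_timer.labels(desc).observe(time.time() - start)

timed_transaction("get_users", lambda: None)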


@@ -40,30 +40,24 @@ import synapse.metrics
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 
+from prometheus_client import Counter
+
 logger = logging.getLogger(__name__)
 
+persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
+event_counter = Counter("synapse_storage_events_persisted_events_sep", "", ["type", "origin_type", "origin_entity"])
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-persist_event_counter = metrics.register_counter("persisted_events")
-event_counter = metrics.register_counter(
-    "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
-)
 
 # The number of times we are recalculating the current state
-state_delta_counter = metrics.register_counter(
-    "state_delta",
-)
+state_delta_counter = Counter("synapse_storage_events_state_delta", "")
+
 # The number of times we are recalculating state when there is only a
 # single forward extremity
-state_delta_single_event_counter = metrics.register_counter(
-    "state_delta_single_event",
-)
+state_delta_single_event_counter = Counter("synapse_storage_events_state_delta_single_event", "")
+
 # The number of times we are reculating state when we could have resonably
 # calculated the delta when we calculated the state for an event we were
 # persisting.
-state_delta_reuse_delta_counter = metrics.register_counter(
-    "state_delta_reuse_delta",
-)
+state_delta_reuse_delta_counter = Counter("synapse_storage_events_state_delta_reuse_delta", "")
 
 
 def encode_json(json_object):
@@ -445,7 +439,7 @@ class EventsStore(EventsWorkerStore):
                     state_delta_for_room=state_delta_for_room,
                     new_forward_extremeties=new_forward_extremeties,
                 )
-                persist_event_counter.inc_by(len(chunk))
+                persist_event_counter.inc(len(chunk))
                 synapse.metrics.event_persisted_position.set(
                     chunk[-1][0].internal_metadata.stream_ordering,
                 )
@@ -460,7 +454,7 @@ class EventsStore(EventsWorkerStore):
                         origin_type = "remote"
                         origin_entity = get_domain_from_id(event.sender)
 
-                    event_counter.inc(event.type, origin_type, origin_entity)
+                    event_counter.labels(event.type, origin_type, origin_entity).inc()
 
                 for room_id, new_state in current_state_for_room.iteritems():
                     self.get_current_state_ids.prefill(


@@ -1,192 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from tests import unittest
-
-from synapse.metrics.metric import (
-    CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    _escape_label_value,
-)
-
-
-class CounterMetricTestCase(unittest.TestCase):
-    def test_scalar(self):
-        counter = CounterMetric("scalar")
-        self.assertEquals(counter.render(), [
-            'scalar 0',
-        ])
-        counter.inc()
-        self.assertEquals(counter.render(), [
-            'scalar 1',
-        ])
-        counter.inc_by(2)
-        self.assertEquals(counter.render(), [
-            'scalar 3'
-        ])
-
-    def test_vector(self):
-        counter = CounterMetric("vector", labels=["method"])
-        # Empty counter doesn't yet know what values it has
-        self.assertEquals(counter.render(), [])
-        counter.inc("GET")
-        self.assertEquals(counter.render(), [
-            'vector{method="GET"} 1',
-        ])
-        counter.inc("GET")
-        counter.inc("PUT")
-        self.assertEquals(counter.render(), [
-            'vector{method="GET"} 2',
-            'vector{method="PUT"} 1',
-        ])
-
-
-class CallbackMetricTestCase(unittest.TestCase):
-    def test_scalar(self):
-        d = dict()
-        metric = CallbackMetric("size", lambda: len(d))
-        self.assertEquals(metric.render(), [
-            'size 0',
-        ])
-        d["key"] = "value"
-        self.assertEquals(metric.render(), [
-            'size 1',
-        ])
-
-    def test_vector(self):
-        vals = dict()
-        metric = CallbackMetric("values", lambda: vals, labels=["type"])
-        self.assertEquals(metric.render(), [])
-        # Keys have to be tuples, even if they're 1-element
-        vals[("foo",)] = 1
-        vals[("bar",)] = 2
-        self.assertEquals(metric.render(), [
-            'values{type="bar"} 2',
-            'values{type="foo"} 1',
-        ])
-
-
-class DistributionMetricTestCase(unittest.TestCase):
-    def test_scalar(self):
-        metric = DistributionMetric("thing")
-        self.assertEquals(metric.render(), [
-            'thing:count 0',
-            'thing:total 0',
-        ])
-        metric.inc_by(500)
-        self.assertEquals(metric.render(), [
-            'thing:count 1',
-            'thing:total 500',
-        ])
-
-    def test_vector(self):
-        metric = DistributionMetric("queries", labels=["verb"])
-        self.assertEquals(metric.render(), [])
-        metric.inc_by(300, "SELECT")
-        metric.inc_by(200, "SELECT")
-        metric.inc_by(800, "INSERT")
-        self.assertEquals(metric.render(), [
-            'queries:count{verb="INSERT"} 1',
-            'queries:count{verb="SELECT"} 2',
-            'queries:total{verb="INSERT"} 800',
-            'queries:total{verb="SELECT"} 500',
-        ])
-
-
-class CacheMetricTestCase(unittest.TestCase):
-    def test_cache(self):
-        d = dict()
-        metric = CacheMetric("cache", lambda: len(d), "cache_name")
-        self.assertEquals(metric.render(), [
-            'cache:hits{name="cache_name"} 0',
-            'cache:total{name="cache_name"} 0',
-            'cache:size{name="cache_name"} 0',
-            'cache:evicted_size{name="cache_name"} 0',
-        ])
-        metric.inc_misses()
-        d["key"] = "value"
-        self.assertEquals(metric.render(), [
-            'cache:hits{name="cache_name"} 0',
-            'cache:total{name="cache_name"} 1',
-            'cache:size{name="cache_name"} 1',
-            'cache:evicted_size{name="cache_name"} 0',
-        ])
-        metric.inc_hits()
-        self.assertEquals(metric.render(), [
-            'cache:hits{name="cache_name"} 1',
-            'cache:total{name="cache_name"} 2',
-            'cache:size{name="cache_name"} 1',
-            'cache:evicted_size{name="cache_name"} 0',
-        ])
-        metric.inc_evictions(2)
-        self.assertEquals(metric.render(), [
-            'cache:hits{name="cache_name"} 1',
-            'cache:total{name="cache_name"} 2',
-            'cache:size{name="cache_name"} 1',
-            'cache:evicted_size{name="cache_name"} 2',
-        ])
-
-
-class LabelValueEscapeTestCase(unittest.TestCase):
-    def test_simple(self):
-        string = "safjhsdlifhyskljfksdfh"
-        self.assertEqual(string, _escape_label_value(string))
-
-    def test_escape(self):
-        self.assertEqual(
-            "abc\\\"def\\nghi\\\\",
-            _escape_label_value("abc\"def\nghi\\"),
-        )
-
-    def test_sequence_of_escapes(self):
-        self.assertEqual(
-            "abc\\\"def\\nghi\\\\\\n",
-            _escape_label_value("abc\"def\nghi\\\n"),
-        )