Merge remote-tracking branch 'origin/develop' into neilj/fix_room_invite_mail_links

Richard van der Hoff 2018-09-18 19:02:45 +01:00
commit 38ead946a9
31 changed files with 351 additions and 98 deletions

README.rst

@@ -908,7 +908,7 @@ to install using pip and a virtualenv::

     virtualenv -p python2.7 env
     source env/bin/activate
-    python synapse/python_dependencies.py | xargs pip install
+    python -m synapse.python_dependencies | xargs pip install
    pip install lxml mock

 This will run a process of downloading and installing all the needed

@@ -963,5 +963,13 @@ variable. The default is 0.5, which can be decreased to reduce RAM usage
 in memory constrained enviroments, or increased if performance starts to
 degrade.

+Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant
+improvement in overall amount, and especially in terms of giving back RAM
+to the OS. To use it, the library must simply be put in the LD_PRELOAD
+environment variable when launching Synapse. On Debian, this can be done
+by installing the ``libjemalloc1`` package and adding this line to
+``/etc/default/matrix-synapse``::
+
+    LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
+
 .. _`key_management`: https://matrix.org/docs/spec/server_server/unstable.html#retrieving-server-keys
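
Not part of the diff, but a handy way to confirm the preload actually took effect: a short Python check (a Linux-only sketch; when LD_PRELOAD worked, jemalloc appears in the process's memory maps):

    def jemalloc_loaded():
        # Scan this process's own memory maps for the jemalloc shared library.
        try:
            with open("/proc/self/maps") as maps:
                return any("libjemalloc" in line for line in maps)
        except OSError:
            return False

    print("jemalloc preloaded:", jemalloc_loaded())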

changelog.d/3860.misc (new file)
@@ -0,0 +1 @@
+Fix typo in replication stream exception.

changelog.d/3871.misc (new file)
@@ -0,0 +1 @@
+Add in flight real time metrics for Measure blocks

changelog.d/3872.misc (new file)
@@ -0,0 +1 @@
+Disable buffering and automatic retrying in treq requests to prevent timeouts.

changelog.d/3874.bugfix (new empty file)

changelog.d/3875.bugfix (new file)
@@ -0,0 +1 @@
+Mitigate outbound federation randomly becoming wedged

changelog.d/3877.misc (new file)
@@ -0,0 +1 @@
+mention jemalloc in the README

changelog.d/3879.bugfix (new file)
@@ -0,0 +1 @@
+Don't ratelimit autojoins

changelog.d/3883.feature (new file)
@@ -0,0 +1 @@
+Adding the ability to change MAX_UPLOAD_SIZE for the docker container variables.

changelog.d/3888.misc (new file)
@@ -0,0 +1 @@
+Remove unmaintained "nuke-room-from-db.sh" script

changelog.d/3889.bugfix (new file)
@@ -0,0 +1 @@
+Fix 500 error when deleting unknown room alias

changelog.d/3892.bugfix (new file)
@@ -0,0 +1 @@
+Fix some b'abcd' noise in logs and metrics

changelog.d/3895.bugfix (new file)
@@ -0,0 +1 @@
+Fix some b'abcd' noise in logs and metrics

changelog.d/3897.misc (new file)
@@ -0,0 +1 @@
+Fix typo in README, synaspse -> synapse

contrib/docker/README.md

@@ -88,6 +88,7 @@ variables are available for configuration:

 * ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
   uris to enable TURN for this homeserver.
 * ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
+* ``SYNAPSE_MAX_UPLOAD_SIZE``, set this variable to change the max upload size [default `10M`].

 Shared secrets, that will be initialized to random values if not set:

contrib/docker/conf/homeserver.yaml

@@ -85,7 +85,7 @@ federation_rc_concurrent: 3

 media_store_path: "/data/media"
 uploads_path: "/data/uploads"
-max_upload_size: "10M"
+max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"
 max_image_pixels: "32M"
 dynamic_thumbnails: false
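
The `{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}` placeholder is filled in from the container environment. A hedged sketch of that substitution with Jinja2 (the image's start script does something along these lines; the names here are illustrative, not the script's actual code):

    import os
    from jinja2 import Template

    line = Template('max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"')
    # Falls back to "10M" when the variable is unset in the environment.
    print(line.render(SYNAPSE_MAX_UPLOAD_SIZE=os.environ.get("SYNAPSE_MAX_UPLOAD_SIZE")))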

scripts/nuke-room-from-db.sh (deleted file)

@@ -1,57 +0,0 @@
-#!/bin/bash
-
-## CAUTION:
-## This script will remove (hopefully) all trace of the given room ID from
-## your homeserver.db
-## Do not run it lightly.
-
-set -e
-
-if [ "$1" == "-h" ] || [ "$1" == "" ]; then
-  echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
-  echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
-  echo "or"
-  echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
-  exit
-fi
-
-ROOMID="$1"
-
-cat <<EOF
-DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
-DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
-DELETE FROM event_edges WHERE room_id = '$ROOMID';
-DELETE FROM room_depth WHERE room_id = '$ROOMID';
-DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
-DELETE FROM events WHERE room_id = '$ROOMID';
-DELETE FROM event_json WHERE room_id = '$ROOMID';
-DELETE FROM state_events WHERE room_id = '$ROOMID';
-DELETE FROM current_state_events WHERE room_id = '$ROOMID';
-DELETE FROM room_memberships WHERE room_id = '$ROOMID';
-DELETE FROM feedback WHERE room_id = '$ROOMID';
-DELETE FROM topics WHERE room_id = '$ROOMID';
-DELETE FROM room_names WHERE room_id = '$ROOMID';
-DELETE FROM rooms WHERE room_id = '$ROOMID';
-DELETE FROM room_hosts WHERE room_id = '$ROOMID';
-DELETE FROM room_aliases WHERE room_id = '$ROOMID';
-DELETE FROM state_groups WHERE room_id = '$ROOMID';
-DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
-DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
-DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
-DELETE FROM event_search WHERE room_id = '$ROOMID';
-DELETE FROM guest_access WHERE room_id = '$ROOMID';
-DELETE FROM history_visibility WHERE room_id = '$ROOMID';
-DELETE FROM room_tags WHERE room_id = '$ROOMID';
-DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
-DELETE FROM room_account_data WHERE room_id = '$ROOMID';
-DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
-DELETE FROM local_invites WHERE room_id = '$ROOMID';
-DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
-DELETE FROM event_reports WHERE room_id = '$ROOMID';
-DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
-DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
-DELETE FROM event_auth WHERE room_id = '$ROOMID';
-DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
-
-VACUUM;
-EOF

synapse/handlers/directory.py

@@ -20,7 +20,14 @@ import string

 from twisted.internet import defer

 from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    CodeMessageException,
+    Codes,
+    NotFoundError,
+    StoreError,
+    SynapseError,
+)
 from synapse.types import RoomAlias, UserID, get_domain_from_id

 from ._base import BaseHandler

@@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler):
     def delete_association(self, requester, user_id, room_alias):
         # association deletion for human users

-        can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        try:
+            can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        except StoreError as e:
+            if e.code == 404:
+                raise NotFoundError("Unknown room alias")
+            raise
+
         if not can_delete:
             raise AuthError(
                 403, "You don't have permission to delete the alias.",

@@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler):
     def _user_can_delete_alias(self, alias, user_id):
         creator = yield self.store.get_room_alias_creator(alias.to_string())

-        if creator and creator == user_id:
+        if creator is not None and creator == user_id:
             defer.returnValue(True)

         is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
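
Client-visible effect of the NotFoundError handling above, as a hedged sketch (server URL, alias, and access token are placeholders):

    import requests

    # DELETE an alias that was never registered; '#nosuch:example.com' is
    # URL-encoded into the path.
    resp = requests.delete(
        "https://localhost:8448/_matrix/client/r0/directory/room/%23nosuch%3Aexample.com",
        headers={"Authorization": "Bearer ACCESS_TOKEN"},
        verify=False,
    )
    print(resp.status_code)  # 404 (M_NOT_FOUND) after this change, previously 500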

synapse/handlers/pagination.py

@@ -269,14 +269,7 @@ class PaginationHandler(object):

         if state_ids:
             state = yield self.store.get_events(list(state_ids.values()))
-
-            if state:
-                state = yield filter_events_for_client(
-                    self.store,
-                    user_id,
-                    state.values(),
-                    is_peeking=(member_event_id is None),
-                )
+            state = state.values()

         time_now = self.clock.time_msec()

synapse/handlers/register.py

@@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler):
             room_id=room_id,
             remote_room_hosts=remote_room_hosts,
             action="join",
+            ratelimit=False,
         )

synapse/http/__init__.py

@@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
     return value

-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')

 def redact_uri(uri):
     """Strips access tokens from the uri replaces with <redacted>"""
     return ACCESS_TOKEN_RE.sub(
-        br'\1<redacted>\3',
+        r'\1<redacted>\3',
         uri
     )
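
With the pattern now operating on str rather than bytes, a standalone illustration of the redaction (mirroring the definitions above):

    import re

    ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')

    def redact_uri(uri):
        # Keep the parameter name and trailing arguments; hide only the value.
        return ACCESS_TOKEN_RE.sub(r'\1<redacted>\3', uri)

    print(redact_uri("/_matrix/client/r0/sync?access_token=secret&since=s72"))
    # -> /_matrix/client/r0/sync?access_token=<redacted>&since=s72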

synapse/http/client.py

@@ -93,7 +93,7 @@ class SimpleHttpClient(object):
         outgoing_requests_counter.labels(method).inc()

         # log request but strip `access_token` (AS requests for example include this)
-        logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
+        logger.info("Sending request %s %s", method, redact_uri(uri))

         try:
             request_deferred = treq.request(

@@ -108,14 +108,14 @@
             incoming_responses_counter.labels(method, response.code).inc()
             logger.info(
                 "Received response to %s %s: %s",
-                method, redact_uri(uri.encode('ascii')), response.code
+                method, redact_uri(uri), response.code
             )
             defer.returnValue(response)
         except Exception as e:
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to %s %s: %s %s",
-                method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
+                method, redact_uri(uri), type(e).__name__, e.args[0]
             )
             raise

synapse/http/matrixfederationclient.py

@@ -42,7 +42,9 @@ from synapse.api.errors import (
 )
 from synapse.http.endpoint import matrix_federation_endpoint
 from synapse.util import logcontext
+from synapse.util.async_helpers import timeout_no_seriously
 from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure

 logger = logging.getLogger(__name__)
 outbound_logger = logging.getLogger("synapse.http.outbound")

@@ -91,6 +93,7 @@ class MatrixFederationHttpClient(object):
         self.server_name = hs.hostname
         reactor = hs.get_reactor()
         pool = HTTPConnectionPool(reactor)
+        pool.retryAutomatically = False
         pool.maxPersistentPerHost = 5
         pool.cachedConnectionTimeout = 2 * 60
         self.agent = Agent.usingEndpointFactory(

@@ -221,14 +224,30 @@ class MatrixFederationHttpClient(object):
                         headers=Headers(headers_dict),
                         data=data,
                         agent=self.agent,
-                        reactor=self.hs.get_reactor()
+                        reactor=self.hs.get_reactor(),
+                        unbuffered=True
                     )
                     request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())

-                    response = yield make_deferred_yieldable(
+
+                    # Sometimes the timeout above doesn't work, so lets hack yet
+                    # another layer of timeouts in in the vain hope that at some
+                    # point the world made sense and this really really really
+                    # should work.
+                    request_deferred = timeout_no_seriously(
                         request_deferred,
+                        timeout=_sec_timeout * 2,
+                        reactor=self.hs.get_reactor(),
                     )

-                    log_result = "%d %s" % (response.code, response.phrase,)
+                    with Measure(self.clock, "outbound_request"):
+                        response = yield make_deferred_yieldable(
+                            request_deferred,
+                        )
+
+                    log_result = "%d %s" % (
+                        response.code,
+                        response.phrase.decode('ascii', errors='replace'),
+                    )
                     break
                 except Exception as e:
                     if not retry_on_dns_fail and isinstance(e, DNSLookupError):

synapse/http/request_metrics.py

@@ -162,7 +162,7 @@ class RequestMetrics(object):
         with _in_flight_requests_lock:
             _in_flight_requests.add(self)

-    def stop(self, time_sec, request):
+    def stop(self, time_sec, response_code, sent_bytes):
         with _in_flight_requests_lock:
             _in_flight_requests.discard(self)

@@ -179,35 +179,35 @@
                 )
                 return

-        response_code = str(request.code)
+        response_code = str(response_code)

-        outgoing_responses_counter.labels(request.method, response_code).inc()
+        outgoing_responses_counter.labels(self.method, response_code).inc()

-        response_count.labels(request.method, self.name, tag).inc()
+        response_count.labels(self.method, self.name, tag).inc()

-        response_timer.labels(request.method, self.name, tag, response_code).observe(
+        response_timer.labels(self.method, self.name, tag, response_code).observe(
             time_sec - self.start
         )

         resource_usage = context.get_resource_usage()

-        response_ru_utime.labels(request.method, self.name, tag).inc(
+        response_ru_utime.labels(self.method, self.name, tag).inc(
             resource_usage.ru_utime,
         )
-        response_ru_stime.labels(request.method, self.name, tag).inc(
+        response_ru_stime.labels(self.method, self.name, tag).inc(
             resource_usage.ru_stime,
         )
-        response_db_txn_count.labels(request.method, self.name, tag).inc(
+        response_db_txn_count.labels(self.method, self.name, tag).inc(
             resource_usage.db_txn_count
         )
-        response_db_txn_duration.labels(request.method, self.name, tag).inc(
+        response_db_txn_duration.labels(self.method, self.name, tag).inc(
             resource_usage.db_txn_duration_sec
         )
-        response_db_sched_duration.labels(request.method, self.name, tag).inc(
+        response_db_sched_duration.labels(self.method, self.name, tag).inc(
             resource_usage.db_sched_duration_sec
         )

-        response_size.labels(request.method, self.name, tag).inc(request.sentLength)
+        response_size.labels(self.method, self.name, tag).inc(sent_bytes)

         # We always call this at the end to ensure that we update the metrics
         # regardless of whether a call to /metrics while the request was in

synapse/http/site.py

@@ -82,10 +82,13 @@ class SynapseRequest(Request):
         )

     def get_request_id(self):
-        return "%s-%i" % (self.method, self.request_seq)
+        return "%s-%i" % (self.method.decode('ascii'), self.request_seq)

     def get_redacted_uri(self):
-        return redact_uri(self.uri)
+        uri = self.uri
+        if isinstance(uri, bytes):
+            uri = self.uri.decode('ascii')
+        return redact_uri(uri)

     def get_user_agent(self):
         return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]

@@ -116,7 +119,7 @@
             # dispatching to the handler, so that the handler
             # can update the servlet name in the request
             # metrics
-            requests_counter.labels(self.method,
+            requests_counter.labels(self.method.decode('ascii'),
                                     self.request_metrics.name).inc()

     @contextlib.contextmanager

@@ -277,15 +280,15 @@
             int(usage.db_txn_count),
             self.sentLength,
             code,
-            self.method,
+            self.method.decode('ascii'),
             self.get_redacted_uri(),
-            self.clientproto,
+            self.clientproto.decode('ascii', errors='replace'),
             user_agent,
             usage.evt_db_fetch_count,
         )

         try:
-            self.request_metrics.stop(self.finish_time, self)
+            self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
         except Exception as e:
             logger.warn("Failed to stop metrics: %r", e)
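
These .decode('ascii') calls are what the "b'abcd' noise" changelog entries refer to: on Python 3, interpolating bytes into a str format string renders the repr. A two-line illustration:

    method = b"GET"  # twisted's Request.method is bytes on Python 3
    print("%s-%i" % (method, 1))                  # b'GET'-1  <- the noise
    print("%s-%i" % (method.decode('ascii'), 1))  # GET-1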

synapse/metrics/__init__.py

@@ -18,8 +18,11 @@ import gc
 import logging
 import os
 import platform
+import threading
 import time

+import six
+
 import attr
 from prometheus_client import Counter, Gauge, Histogram
 from prometheus_client.core import REGISTRY, GaugeMetricFamily

@@ -68,7 +71,7 @@ class LaterGauge(object):
             return

         if isinstance(calls, dict):
-            for k, v in calls.items():
+            for k, v in six.iteritems(calls):
                 g.add_metric(k, v)
         else:
             g.add_metric([], calls)

@@ -87,6 +90,109 @@
         all_gauges[self.name] = self


+class InFlightGauge(object):
+    """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
+    at any given time.
+
+    Each InFlightGauge will create a metric called `<name>_total` that counts
+    the number of in flight blocks, as well as a metrics for each item in the
+    given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
+    callbacks.
+
+    Args:
+        name (str)
+        desc (str)
+        labels (list[str])
+        sub_metrics (list[str]): A list of sub metrics that the callbacks
+            will update.
+    """
+
+    def __init__(self, name, desc, labels, sub_metrics):
+        self.name = name
+        self.desc = desc
+        self.labels = labels
+        self.sub_metrics = sub_metrics
+
+        # Create a class which have the sub_metrics values as attributes, which
+        # default to 0 on initialization. Used to pass to registered callbacks.
+        self._metrics_class = attr.make_class(
+            "_MetricsEntry",
+            attrs={x: attr.ib(0) for x in sub_metrics},
+            slots=True,
+        )
+
+        # Counts number of in flight blocks for a given set of label values
+        self._registrations = {}
+
+        # Protects access to _registrations
+        self._lock = threading.Lock()
+
+        self._register_with_collector()
+
+    def register(self, key, callback):
+        """Registers that we've entered a new block with labels `key`.
+
+        `callback` gets called each time the metrics are collected. The same
+        value must also be given to `unregister`.
+
+        `callback` gets called with an object that has an attribute per
+        sub_metric, which should be updated with the necessary values. Note that
+        the metrics object is shared between all callbacks registered with the
+        same key.
+
+        Note that `callback` may be called on a separate thread.
+        """
+        with self._lock:
+            self._registrations.setdefault(key, set()).add(callback)
+
+    def unregister(self, key, callback):
+        """Registers that we've exited a block with labels `key`.
+        """
+        with self._lock:
+            self._registrations.setdefault(key, set()).discard(callback)
+
+    def collect(self):
+        """Called by prometheus client when it reads metrics.
+
+        Note: may be called by a separate thread.
+        """
+        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+
+        metrics_by_key = {}
+
+        # We copy so that we don't mutate the list while iterating
+        with self._lock:
+            keys = list(self._registrations)
+
+        for key in keys:
+            with self._lock:
+                callbacks = set(self._registrations[key])
+
+            in_flight.add_metric(key, len(callbacks))
+
+            metrics = self._metrics_class()
+            metrics_by_key[key] = metrics
+            for callback in callbacks:
+                callback(metrics)
+
+        yield in_flight
+
+        for name in self.sub_metrics:
+            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+            for key, metrics in six.iteritems(metrics_by_key):
+                gauge.add_metric(key, getattr(metrics, name))
+            yield gauge
+
+    def _register_with_collector(self):
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+
+
 #
 # Detailed CPU metrics
 #
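
A hand-rolled usage sketch of the new gauge (names are illustrative; tests/test_metrics.py below exercises it more thoroughly):

    gauge = InFlightGauge(
        "demo_blocks", "blocks currently executing",
        labels=["block_name"],
        sub_metrics=["elapsed_max"],
    )

    def report(metrics):
        # Called at scrape time with the metrics object shared by this key.
        metrics.elapsed_max = max(metrics.elapsed_max, 1.5)

    gauge.register(("my_block",), report)    # entering a block
    for family in gauge.collect():           # what prometheus_client sees
        print(family.name, family.samples)
    gauge.unregister(("my_block",), report)  # leaving the block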

synapse/replication/tcp/streams.py

@@ -196,7 +196,7 @@ class Stream(object):
                 )

                 if len(rows) >= MAX_EVENTS_BEHIND:
-                    raise Exception("stream %s has fallen behined" % (self.NAME))
+                    raise Exception("stream %s has fallen behind" % (self.NAME))
             else:
                 rows = yield self.update_function(
                     from_token, current_token,

synapse/storage/directory.py

@@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore):
             },
             retcol="creator",
             desc="get_room_alias_creator",
-            allow_none=True
         )

     @cached(max_entries=5000)

(With allow_none gone, looking up a nonexistent alias now raises StoreError with code 404, which delete_association in synapse/handlers/directory.py above converts into NotFoundError.)
@cached(max_entries=5000) @cached(max_entries=5000)

synapse/util/async_helpers.py

@@ -438,3 +438,55 @@ def _cancelled_to_timed_out_error(value, timeout):
         value.trap(CancelledError)
         raise DeferredTimeoutError(timeout, "Deferred")
     return value
+
+
+def timeout_no_seriously(deferred, timeout, reactor):
+    """The in build twisted deferred addTimeout (and the method above)
+    completely fail to time things out under some unknown circumstances.
+
+    Lets try a different way of timing things out and maybe that will make
+    things work?!
+
+    TODO: Kill this with fire.
+    """
+
+    new_d = defer.Deferred()
+
+    timed_out = [False]
+
+    def time_it_out():
+        timed_out[0] = True
+
+        if not new_d.called:
+            new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+
+        deferred.cancel()
+
+    delayed_call = reactor.callLater(timeout, time_it_out)
+
+    def convert_cancelled(value):
+        if timed_out[0]:
+            return _cancelled_to_timed_out_error(value, timeout)
+        return value
+
+    deferred.addBoth(convert_cancelled)
+
+    def cancel_timeout(result):
+        # stop the pending call to cancel the deferred if it's been fired
+        if delayed_call.active():
+            delayed_call.cancel()
+        return result
+
+    deferred.addBoth(cancel_timeout)
+
+    def success_cb(val):
+        if not new_d.called:
+            new_d.callback(val)
+
+    def failure_cb(val):
+        if not new_d.called:
+            new_d.errback(val)
+
+    deferred.addCallbacks(success_cb, failure_cb)
+
+    return new_d
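
The fallback timer can be exercised deterministically with twisted's fake reactor; a hedged sketch (assuming the module path used in the import above):

    from twisted.internet import defer
    from twisted.internet.task import Clock

    from synapse.util.async_helpers import timeout_no_seriously

    clock = Clock()           # fake reactor: time only advances on demand
    stuck = defer.Deferred()  # stands in for a request that never completes
    guarded = timeout_no_seriously(stuck, timeout=5, reactor=clock)

    failures = []
    guarded.addErrback(failures.append)

    clock.advance(5)      # fires the delayed call, cancelling the stuck deferred
    print(len(failures))  # 1: `guarded` failed with the timeout error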

synapse/util/metrics.py

@@ -20,6 +20,7 @@ from prometheus_client import Counter

 from twisted.internet import defer

+from synapse.metrics import InFlightGauge
 from synapse.util.logcontext import LoggingContext

 logger = logging.getLogger(__name__)

@@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
 block_db_sched_duration = Counter(
     "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])

+# Tracks the number of blocks currently active
+in_flight = InFlightGauge(
+    "synapse_util_metrics_block_in_flight", "",
+    labels=["block_name"],
+    sub_metrics=["real_time_max", "real_time_sum"],
+)
+

 def measure_func(name):
     def wrapper(func):

@@ -82,10 +90,14 @@ class Measure(object):

         self.start_usage = self.start_context.get_resource_usage()

+        in_flight.register((self.name,), self._update_in_flight)
+
     def __exit__(self, exc_type, exc_val, exc_tb):
         if isinstance(exc_type, Exception) or not self.start_context:
             return

+        in_flight.unregister((self.name,), self._update_in_flight)
+
         duration = self.clock.time() - self.start

         block_counter.labels(self.name).inc()

@@ -120,3 +132,13 @@

         if self.created_context:
             self.start_context.__exit__(exc_type, exc_val, exc_tb)
+
+    def _update_in_flight(self, metrics):
+        """Gets called when processing in flight metrics
+        """
+        duration = self.clock.time() - self.start
+
+        metrics.real_time_max = max(metrics.real_time_max, duration)
+        metrics.real_time_sum += duration
+
+        # TODO: Add other in flight metrics.
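
Putting the pieces together, any Measure block now shows up in the in-flight gauge while it runs. A rough sketch, assuming synapse.util.metrics is importable (the clock stand-in here is illustrative, not Synapse's own Clock):

    import time

    class WallClock(object):
        # Minimal stand-in for the clock interface Measure uses.
        def time(self):
            return time.time()

    with Measure(WallClock(), "expensive_op"):
        time.sleep(0.1)
    # While the block runs, a /metrics scrape would report
    # synapse_util_metrics_block_in_flight_total{block_name="expensive_op"} 1
    # together with real_time_max and real_time_sum for the block.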

tests/test_metrics.py (new file)

@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.metrics import InFlightGauge
+
+from tests import unittest
+
+
+class TestMauLimit(unittest.TestCase):
+    def test_basic(self):
+        gauge = InFlightGauge(
+            "test1", "",
+            labels=["test_label"],
+            sub_metrics=["foo", "bar"],
+        )
+
+        def handle1(metrics):
+            metrics.foo += 2
+            metrics.bar = max(metrics.bar, 5)
+
+        def handle2(metrics):
+            metrics.foo += 3
+            metrics.bar = max(metrics.bar, 7)
+
+        gauge.register(("key1",), handle1)
+
+        self.assert_dict({
+            "test1_total": {("key1",): 1},
+            "test1_foo": {("key1",): 2},
+            "test1_bar": {("key1",): 5},
+        }, self.get_metrics_from_gauge(gauge))
+
+        gauge.unregister(("key1",), handle1)
+
+        self.assert_dict({
+            "test1_total": {("key1",): 0},
+            "test1_foo": {("key1",): 0},
+            "test1_bar": {("key1",): 0},
+        }, self.get_metrics_from_gauge(gauge))
+
+        gauge.register(("key1",), handle1)
+        gauge.register(("key2",), handle2)
+
+        self.assert_dict({
+            "test1_total": {("key1",): 1, ("key2",): 1},
+            "test1_foo": {("key1",): 2, ("key2",): 3},
+            "test1_bar": {("key1",): 5, ("key2",): 7},
+        }, self.get_metrics_from_gauge(gauge))
+
+        gauge.unregister(("key2",), handle2)
+        gauge.register(("key1",), handle2)
+
+        self.assert_dict({
+            "test1_total": {("key1",): 2, ("key2",): 0},
+            "test1_foo": {("key1",): 5, ("key2",): 0},
+            "test1_bar": {("key1",): 7, ("key2",): 0},
+        }, self.get_metrics_from_gauge(gauge))
+
+    def get_metrics_from_gauge(self, gauge):
+        results = {}
+
+        for r in gauge.collect():
+            results[r.name] = {
+                tuple(labels[x] for x in gauge.labels): value
+                for _, labels, value in r.samples
+            }
+
+        return results