Merge pull request #562 from matrix-org/erikj/push_metric

Add metrics to pushers
This commit is contained in:
Erik Johnston 2016-02-08 14:57:40 +00:00
commit cca5c06679
3 changed files with 149 additions and 31 deletions

View file

@ -208,6 +208,9 @@ class JsonResource(HttpServer, resource.Resource):
if request.method == "OPTIONS": if request.method == "OPTIONS":
self._send_response(request, 200, {}) self._send_response(request, 200, {})
return return
start_context = LoggingContext.current_context()
# Loop through all the registered callbacks to check if the method # Loop through all the registered callbacks to check if the method
# and path regex match # and path regex match
for path_entry in self.path_regexs.get(request.method, []): for path_entry in self.path_regexs.get(request.method, []):
@ -243,6 +246,13 @@ class JsonResource(HttpServer, resource.Resource):
if context: if context:
tag = context.tag tag = context.tag
if context != start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
context, self.start_context
)
return
incoming_requests_counter.inc(request.method, servlet_classname, tag) incoming_requests_counter.inc(request.method, servlet_classname, tag)
response_timer.inc_by( response_timer.inc_by(

View file

@ -17,6 +17,8 @@ from twisted.internet import defer
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
import synapse.util.async import synapse.util.async
import push_rule_evaluator as push_rule_evaluator import push_rule_evaluator as push_rule_evaluator
@ -27,6 +29,16 @@ import random
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_NEXT_ID = 1
def _get_next_id():
global _NEXT_ID
_id = _NEXT_ID
_NEXT_ID += 1
return _id
# Pushers could now be moved to pull out of the event_push_actions table instead # Pushers could now be moved to pull out of the event_push_actions table instead
# of listening on the event stream: this would avoid them having to run the # of listening on the event stream: this would avoid them having to run the
# rules again. # rules again.
@ -57,6 +69,8 @@ class Pusher(object):
self.alive = True self.alive = True
self.badge = None self.badge = None
self.name = "Pusher-%d" % (_get_next_id(),)
# The last value of last_active_time that we saw # The last value of last_active_time that we saw
self.last_last_active_time = 0 self.last_last_active_time = 0
self.has_unread = True self.has_unread = True
@ -86,38 +100,46 @@ class Pusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
if not self.last_token: with LoggingContext(self.name):
# First-time setup: get a token to start from (we can't if not self.last_token:
# just start from no token, ie. 'now' # First-time setup: get a token to start from (we can't
# because we need the result to be reproduceable in case # just start from no token, ie. 'now'
# we fail to dispatch the push) # because we need the result to be reproduceable in case
config = PaginationConfig(from_token=None, limit='1') # we fail to dispatch the push)
chunk = yield self.evStreamHandler.get_stream( config = PaginationConfig(from_token=None, limit='1')
self.user_id, config, timeout=0, affect_presence=False chunk = yield self.evStreamHandler.get_stream(
) self.user_id, config, timeout=0, affect_presence=False
self.last_token = chunk['end']
self.store.update_pusher_last_token(
self.app_id, self.pushkey, self.user_id, self.last_token
)
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_id, self.last_token)
wait = 0
while self.alive:
try:
if wait > 0:
yield synapse.util.async.sleep(wait)
yield self.get_and_dispatch()
wait = 0
except:
if wait == 0:
wait = 1
else:
wait = min(wait * 2, 1800)
logger.exception(
"Exception in pusher loop for pushkey %s. Pausing for %ds",
self.pushkey, wait
) )
self.last_token = chunk['end']
self.store.update_pusher_last_token(
self.app_id, self.pushkey, self.user_id, self.last_token
)
logger.info("New pusher %s for user %s starting from token %s",
self.pushkey, self.user_id, self.last_token)
else:
logger.info(
"Old pusher %s for user %s starting",
self.pushkey, self.user_id,
)
wait = 0
while self.alive:
try:
if wait > 0:
yield synapse.util.async.sleep(wait)
with Measure(self.clock, "push"):
yield self.get_and_dispatch()
wait = 0
except:
if wait == 0:
wait = 1
else:
wait = min(wait * 2, 1800)
logger.exception(
"Exception in pusher loop for pushkey %s. Pausing for %ds",
self.pushkey, wait
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_and_dispatch(self): def get_and_dispatch(self):

86
synapse/util/metrics.py Normal file
View file

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.logcontext import LoggingContext
import synapse.metrics
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
block_timer = metrics.register_distribution(
"block_timer",
labels=["block_name"]
)
block_ru_utime = metrics.register_distribution(
"block_ru_utime", labels=["block_name"]
)
block_ru_stime = metrics.register_distribution(
"block_ru_stime", labels=["block_name"]
)
block_db_txn_count = metrics.register_distribution(
"block_db_txn_count", labels=["block_name"]
)
block_db_txn_duration = metrics.register_distribution(
"block_db_txn_duration", labels=["block_name"]
)
class Measure(object):
__slots__ = ["clock", "name", "start_context", "start"]
def __init__(self, clock, name):
self.clock = clock
self.name = name
self.start_context = None
self.start = None
def __enter__(self):
self.start = self.clock.time_msec()
self.start_context = LoggingContext.current_context()
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
return
duration = self.clock.time_msec() - self.start
block_timer.inc_by(duration, self.name)
context = LoggingContext.current_context()
if not context:
return
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
context, self.start_context
)
return
ru_utime, ru_stime = context.get_resource_usage()
block_ru_utime.inc_by(ru_utime, self.name)
block_ru_stime.inc_by(ru_stime, self.name)
block_db_txn_count.inc_by(context.db_txn_count, self.name)
block_db_txn_duration.inc_by(context.db_txn_duration, self.name)