Merge pull request #724 from matrix-org/erikj/push_measure

Add push index. Add extra Measure
This commit is contained in:
Erik Johnston 2016-04-14 11:46:46 +01:00
commit ff1d333a02
3 changed files with 50 additions and 25 deletions

View file

@ -21,6 +21,7 @@ import logging
import push_rule_evaluator import push_rule_evaluator
import push_tools import push_tools
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -85,9 +86,8 @@ class HttpPusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
with Measure(self.clock, "push.on_new_notifications"): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) yield self._process()
yield self._process()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id): def on_new_receipts(self, min_stream_id, max_stream_id):
@ -95,16 +95,16 @@ class HttpPusher(object):
# We could check the receipts are actually m.read receipts here, # We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway... # but currently that's the only type of receipt anyway...
with Measure(self.clock, "push.on_new_receipts"): with LoggingContext("push._process"):
badge = yield push_tools.get_badge_count( with Measure(self.clock, "push.on_new_receipts"):
self.hs.get_datastore(), self.user_id badge = yield push_tools.get_badge_count(
) self.hs.get_datastore(), self.user_id
yield self.send_badge(badge) )
yield self._send_badge(badge)
@defer.inlineCallbacks @defer.inlineCallbacks
def on_timer(self): def on_timer(self):
with Measure(self.clock, "push.on_timer"): yield self._process()
yield self._process()
def on_stop(self): def on_stop(self):
if self.timed_call: if self.timed_call:
@ -114,20 +114,23 @@ class HttpPusher(object):
def _process(self): def _process(self):
if self.processing: if self.processing:
return return
try:
self.processing = True with LoggingContext("push._process"):
# if the max ordering changes while we're running _unsafe_process, with Measure(self.clock, "push._process"):
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try: try:
yield self._unsafe_process() self.processing = True
except: # if the max ordering changes while we're running _unsafe_process,
logger.exception("Exception processing notifs") # call it again, and so on until we've caught up.
if self.max_stream_ordering == starting_max_ordering: while True:
break starting_max_ordering = self.max_stream_ordering
finally: try:
self.processing = False yield self._unsafe_process()
except:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
@defer.inlineCallbacks @defer.inlineCallbacks
def _unsafe_process(self): def _unsafe_process(self):
@ -291,7 +294,7 @@ class HttpPusher(object):
defer.returnValue(rejected) defer.returnValue(rejected)
@defer.inlineCallbacks @defer.inlineCallbacks
def send_badge(self, badge): def _send_badge(self, badge):
logger.info("Sending updated badge count %d to %r", badge, self.user_id) logger.info("Sending updated badge count %d to %r", badge, self.user_id)
d = { d = {
'notification': { 'notification': {

View file

@ -137,7 +137,11 @@ class PusherStore(SQLBaseStore):
users = yield self.get_users_in_room(room_id) users = yield self.get_users_in_room(room_id)
result = yield self._simple_select_many_batch( result = yield self._simple_select_many_batch(
'pushers', 'user_name', users, ['user_name'] table='pushers',
column='user_name',
iterable=users,
retcols=['user_name'],
desc='get_users_with_pushers_in_room'
) )
defer.returnValue([r['user_name'] for r in result]) defer.returnValue([r['user_name'] for r in result])

View file

@ -0,0 +1,18 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE INDEX event_push_actions_stream_ordering on event_push_actions(
stream_ordering, user_id
);