Make on_started synchronous too

This brings it into line with on_new_notifications and on_new_receipts. It
requires a little bit of hoop-jumping in EmailPusher to load the throttle
params before the first loop.
This commit is contained in:
Richard van der Hoff 2018-10-22 16:12:11 +01:00
parent e7a16c6210
commit f749607c91
3 changed files with 21 additions and 11 deletions

View file

@ -72,16 +72,9 @@ class EmailPusher(object):
self.processing = False self.processing = False
@defer.inlineCallbacks
def on_started(self): def on_started(self):
if self.mailer is not None: if self.mailer is not None:
try: self._start_processing()
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
self._start_processing()
except Exception:
logger.exception("Error starting email pusher")
def on_stop(self): def on_stop(self):
if self.timed_call: if self.timed_call:
@ -116,6 +109,12 @@ class EmailPusher(object):
try: try:
self.processing = True self.processing = True
if self.throttle_params is None:
# this is our first loop: load up the throttle params
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
# if the max ordering changes while we're running _unsafe_process, # if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up. # call it again, and so on until we've caught up.
while True: while True:

View file

@ -93,7 +93,6 @@ class HttpPusher(object):
def on_started(self): def on_started(self):
self._start_processing() self._start_processing()
return defer.succeed(None)
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0) self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)

View file

@ -19,12 +19,24 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import run_in_background
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PusherPool: class PusherPool:
"""
The pusher pool. This is responsible for dispatching notifications of new events to
the http and email pushers.
It provides three methods which are designed to be called by the rest of the
application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
delegates to each of the relevant pushers.
Note that it is expected that each pusher will have its own 'processing' loop which
will send out the notifications in the background, rather than blocking until the
notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
Pusher.on_new_receipts are not expected to return deferreds.
"""
def __init__(self, _hs): def __init__(self, _hs):
self.hs = _hs self.hs = _hs
self.pusher_factory = PusherFactory(_hs) self.pusher_factory = PusherFactory(_hs)
@ -216,7 +228,7 @@ class PusherPool:
if appid_pushkey in byuser: if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop() byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p byuser[appid_pushkey] = p
run_in_background(p.on_started) p.on_started()
@defer.inlineCallbacks @defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id): def remove_pusher(self, app_id, pushkey, user_id):