forked from MirrorHub/synapse
Merge pull request #4075 from matrix-org/rav/fix_pusher_logcontexts
Clean up the way logcontexts and threads work in the pushers
This commit is contained in:
commit
e0b9d5f0af
8 changed files with 142 additions and 141 deletions
1
changelog.d/4075.misc
Normal file
1
changelog.d/4075.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Clean up threading and logcontexts in pushers
|
|
@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
|||
else:
|
||||
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
elif stream_name == "events":
|
||||
self.pusher_pool.on_new_notifications(
|
||||
yield self.pusher_pool.on_new_notifications(
|
||||
token, token,
|
||||
)
|
||||
elif stream_name == "receipts":
|
||||
self.pusher_pool.on_new_receipts(
|
||||
yield self.pusher_pool.on_new_receipts(
|
||||
token, token, set(row.room_id for row in rows)
|
||||
)
|
||||
except Exception:
|
||||
|
@ -183,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
|||
def start_pusher(self, user_id, app_id, pushkey):
|
||||
key = "%s:%s" % (app_id, pushkey)
|
||||
logger.info("Starting pusher %r / %r", user_id, key)
|
||||
return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
|
||||
return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
|
||||
|
||||
|
||||
def start(config_options):
|
||||
|
|
|
@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
for event, _ in event_and_contexts:
|
||||
self._notify_persisted_event(event, max_stream_id)
|
||||
yield self._notify_persisted_event(event, max_stream_id)
|
||||
|
||||
def _notify_persisted_event(self, event, max_stream_id):
|
||||
"""Checks to see if notifier/pushers should be notified about the
|
||||
|
@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
|
|||
extra_users=extra_users
|
||||
)
|
||||
|
||||
self.pusher_pool.on_new_notifications(
|
||||
return self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
|
|
|
@ -779,7 +779,7 @@ class EventCreationHandler(object):
|
|||
event, context=context
|
||||
)
|
||||
|
||||
self.pusher_pool.on_new_notifications(
|
||||
yield self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
|
|||
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||
)
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
self.hs.get_pusherpool().on_new_receipts(
|
||||
yield self.hs.get_pusherpool().on_new_receipts(
|
||||
min_batch_id, max_batch_id, affected_room_ids,
|
||||
)
|
||||
|
||||
|
|
|
@ -18,8 +18,7 @@ import logging
|
|||
from twisted.internet import defer
|
||||
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
|
||||
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -71,18 +70,11 @@ class EmailPusher(object):
|
|||
# See httppusher
|
||||
self.max_stream_ordering = None
|
||||
|
||||
self.processing = False
|
||||
self._is_processing = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_started(self):
|
||||
if self.mailer is not None:
|
||||
try:
|
||||
self.throttle_params = yield self.store.get_throttle_params_by_room(
|
||||
self.pusher_id
|
||||
)
|
||||
yield self._process()
|
||||
except Exception:
|
||||
logger.exception("Error starting email pusher")
|
||||
self._start_processing()
|
||||
|
||||
def on_stop(self):
|
||||
if self.timed_call:
|
||||
|
@ -92,43 +84,52 @@ class EmailPusher(object):
|
|||
pass
|
||||
self.timed_call = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
|
||||
yield self._process()
|
||||
self._start_processing()
|
||||
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id):
|
||||
# We could wake up and cancel the timer but there tend to be quite a
|
||||
# lot of read receipts so it's probably less work to just let the
|
||||
# timer fire
|
||||
return defer.succeed(None)
|
||||
pass
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_timer(self):
|
||||
self.timed_call = None
|
||||
yield self._process()
|
||||
self._start_processing()
|
||||
|
||||
def _start_processing(self):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
run_as_background_process("emailpush.process", self._process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process(self):
|
||||
if self.processing:
|
||||
return
|
||||
# we should never get here if we are already processing
|
||||
assert not self._is_processing
|
||||
|
||||
with LoggingContext("emailpush._process"):
|
||||
with Measure(self.clock, "emailpush._process"):
|
||||
try:
|
||||
self._is_processing = True
|
||||
|
||||
if self.throttle_params is None:
|
||||
# this is our first loop: load up the throttle params
|
||||
self.throttle_params = yield self.store.get_throttle_params_by_room(
|
||||
self.pusher_id
|
||||
)
|
||||
|
||||
# if the max ordering changes while we're running _unsafe_process,
|
||||
# call it again, and so on until we've caught up.
|
||||
while True:
|
||||
starting_max_ordering = self.max_stream_ordering
|
||||
try:
|
||||
self.processing = True
|
||||
# if the max ordering changes while we're running _unsafe_process,
|
||||
# call it again, and so on until we've caught up.
|
||||
while True:
|
||||
starting_max_ordering = self.max_stream_ordering
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
except Exception:
|
||||
logger.exception("Exception processing notifs")
|
||||
if self.max_stream_ordering == starting_max_ordering:
|
||||
break
|
||||
finally:
|
||||
self.processing = False
|
||||
yield self._unsafe_process()
|
||||
except Exception:
|
||||
logger.exception("Exception processing notifs")
|
||||
if self.max_stream_ordering == starting_max_ordering:
|
||||
break
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
|
|
|
@ -22,9 +22,8 @@ from prometheus_client import Counter
|
|||
from twisted.internet import defer
|
||||
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.push import PusherConfigException
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
from . import push_rule_evaluator, push_tools
|
||||
|
||||
|
@ -61,7 +60,7 @@ class HttpPusher(object):
|
|||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||
self.failing_since = pusherdict['failing_since']
|
||||
self.timed_call = None
|
||||
self.processing = False
|
||||
self._is_processing = False
|
||||
|
||||
# This is the highest stream ordering we know it's safe to process.
|
||||
# When new events arrive, we'll be given a window of new events: we
|
||||
|
@ -92,34 +91,27 @@ class HttpPusher(object):
|
|||
self.data_minus_url.update(self.data)
|
||||
del self.data_minus_url['url']
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_started(self):
|
||||
try:
|
||||
yield self._process()
|
||||
except Exception:
|
||||
logger.exception("Error starting http pusher")
|
||||
self._start_processing()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
|
||||
yield self._process()
|
||||
self._start_processing()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id):
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
|
||||
# We could check the receipts are actually m.read receipts here,
|
||||
# but currently that's the only type of receipt anyway...
|
||||
with LoggingContext("push.on_new_receipts"):
|
||||
with Measure(self.clock, "push.on_new_receipts"):
|
||||
badge = yield push_tools.get_badge_count(
|
||||
self.hs.get_datastore(), self.user_id
|
||||
)
|
||||
yield self._send_badge(badge)
|
||||
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_badge(self):
|
||||
badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
|
||||
yield self._send_badge(badge)
|
||||
|
||||
def on_timer(self):
|
||||
yield self._process()
|
||||
self._start_processing()
|
||||
|
||||
def on_stop(self):
|
||||
if self.timed_call:
|
||||
|
@ -129,27 +121,31 @@ class HttpPusher(object):
|
|||
pass
|
||||
self.timed_call = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process(self):
|
||||
if self.processing:
|
||||
def _start_processing(self):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
with LoggingContext("push._process"):
|
||||
with Measure(self.clock, "push._process"):
|
||||
run_as_background_process("httppush.process", self._process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process(self):
|
||||
# we should never get here if we are already processing
|
||||
assert not self._is_processing
|
||||
|
||||
try:
|
||||
self._is_processing = True
|
||||
# if the max ordering changes while we're running _unsafe_process,
|
||||
# call it again, and so on until we've caught up.
|
||||
while True:
|
||||
starting_max_ordering = self.max_stream_ordering
|
||||
try:
|
||||
self.processing = True
|
||||
# if the max ordering changes while we're running _unsafe_process,
|
||||
# call it again, and so on until we've caught up.
|
||||
while True:
|
||||
starting_max_ordering = self.max_stream_ordering
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
except Exception:
|
||||
logger.exception("Exception processing notifs")
|
||||
if self.max_stream_ordering == starting_max_ordering:
|
||||
break
|
||||
finally:
|
||||
self.processing = False
|
||||
yield self._unsafe_process()
|
||||
except Exception:
|
||||
logger.exception("Exception processing notifs")
|
||||
if self.max_stream_ordering == starting_max_ordering:
|
||||
break
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
|
|
|
@ -20,24 +20,39 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.push.pusher import PusherFactory
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PusherPool:
|
||||
"""
|
||||
The pusher pool. This is responsible for dispatching notifications of new events to
|
||||
the http and email pushers.
|
||||
|
||||
It provides three methods which are designed to be called by the rest of the
|
||||
application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
|
||||
delegates to each of the relevant pushers.
|
||||
|
||||
Note that it is expected that each pusher will have its own 'processing' loop which
|
||||
will send out the notifications in the background, rather than blocking until the
|
||||
notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
|
||||
Pusher.on_new_receipts are not expected to return deferreds.
|
||||
"""
|
||||
def __init__(self, _hs):
|
||||
self.hs = _hs
|
||||
self.pusher_factory = PusherFactory(_hs)
|
||||
self.start_pushers = _hs.config.start_pushers
|
||||
self._should_start_pushers = _hs.config.start_pushers
|
||||
self.store = self.hs.get_datastore()
|
||||
self.clock = self.hs.get_clock()
|
||||
self.pushers = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start(self):
|
||||
pushers = yield self.store.get_all_pushers()
|
||||
self._start_pushers(pushers)
|
||||
"""Starts the pushers off in a background process.
|
||||
"""
|
||||
if not self._should_start_pushers:
|
||||
logger.info("Not starting pushers because they are disabled in the config")
|
||||
return
|
||||
run_as_background_process("start_pushers", self._start_pushers)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_pusher(self, user_id, access_token, kind, app_id,
|
||||
|
@ -86,7 +101,7 @@ class PusherPool:
|
|||
last_stream_ordering=last_stream_ordering,
|
||||
profile_tag=profile_tag,
|
||||
)
|
||||
yield self._refresh_pusher(app_id, pushkey, user_id)
|
||||
yield self.start_pusher_by_id(app_id, pushkey, user_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
|
||||
|
@ -123,45 +138,23 @@ class PusherPool:
|
|||
p['app_id'], p['pushkey'], p['user_name'],
|
||||
)
|
||||
|
||||
def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
run_as_background_process(
|
||||
"on_new_notifications",
|
||||
self._on_new_notifications, min_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
try:
|
||||
users_affected = yield self.store.get_push_action_users_in_range(
|
||||
min_stream_id, max_stream_id
|
||||
)
|
||||
|
||||
deferreds = []
|
||||
|
||||
for u in users_affected:
|
||||
if u in self.pushers:
|
||||
for p in self.pushers[u].values():
|
||||
deferreds.append(
|
||||
run_in_background(
|
||||
p.on_new_notifications,
|
||||
min_stream_id, max_stream_id,
|
||||
)
|
||||
)
|
||||
p.on_new_notifications(min_stream_id, max_stream_id)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(deferreds, consumeErrors=True),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Exception in pusher on_new_notifications")
|
||||
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
run_as_background_process(
|
||||
"on_new_receipts",
|
||||
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
try:
|
||||
# Need to subtract 1 from the minimum because the lower bound here
|
||||
# is not inclusive
|
||||
|
@ -171,26 +164,20 @@ class PusherPool:
|
|||
# This returns a tuple, user_id is at index 3
|
||||
users_affected = set([r[3] for r in updated_receipts])
|
||||
|
||||
deferreds = []
|
||||
|
||||
for u in users_affected:
|
||||
if u in self.pushers:
|
||||
for p in self.pushers[u].values():
|
||||
deferreds.append(
|
||||
run_in_background(
|
||||
p.on_new_receipts,
|
||||
min_stream_id, max_stream_id,
|
||||
)
|
||||
)
|
||||
p.on_new_receipts(min_stream_id, max_stream_id)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(deferreds, consumeErrors=True),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Exception in pusher on_new_receipts")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _refresh_pusher(self, app_id, pushkey, user_id):
|
||||
def start_pusher_by_id(self, app_id, pushkey, user_id):
|
||||
"""Look up the details for the given pusher, and start it"""
|
||||
if not self._should_start_pushers:
|
||||
return
|
||||
|
||||
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
|
||||
app_id, pushkey
|
||||
)
|
||||
|
@ -201,34 +188,50 @@ class PusherPool:
|
|||
p = r
|
||||
|
||||
if p:
|
||||
self._start_pusher(p)
|
||||
|
||||
self._start_pushers([p])
|
||||
@defer.inlineCallbacks
|
||||
def _start_pushers(self):
|
||||
"""Start all the pushers
|
||||
|
||||
def _start_pushers(self, pushers):
|
||||
if not self.start_pushers:
|
||||
logger.info("Not starting pushers because they are disabled in the config")
|
||||
return
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
pushers = yield self.store.get_all_pushers()
|
||||
logger.info("Starting %d pushers", len(pushers))
|
||||
for pusherdict in pushers:
|
||||
try:
|
||||
p = self.pusher_factory.create_pusher(pusherdict)
|
||||
except Exception:
|
||||
logger.exception("Couldn't start a pusher: caught Exception")
|
||||
continue
|
||||
if p:
|
||||
appid_pushkey = "%s:%s" % (
|
||||
pusherdict['app_id'],
|
||||
pusherdict['pushkey'],
|
||||
)
|
||||
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
|
||||
|
||||
if appid_pushkey in byuser:
|
||||
byuser[appid_pushkey].on_stop()
|
||||
byuser[appid_pushkey] = p
|
||||
run_in_background(p.on_started)
|
||||
|
||||
self._start_pusher(pusherdict)
|
||||
logger.info("Started pushers")
|
||||
|
||||
def _start_pusher(self, pusherdict):
|
||||
"""Start the given pusher
|
||||
|
||||
Args:
|
||||
pusherdict (dict):
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
try:
|
||||
p = self.pusher_factory.create_pusher(pusherdict)
|
||||
except Exception:
|
||||
logger.exception("Couldn't start a pusher: caught Exception")
|
||||
return
|
||||
|
||||
if not p:
|
||||
return
|
||||
|
||||
appid_pushkey = "%s:%s" % (
|
||||
pusherdict['app_id'],
|
||||
pusherdict['pushkey'],
|
||||
)
|
||||
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
|
||||
|
||||
if appid_pushkey in byuser:
|
||||
byuser[appid_pushkey].on_stop()
|
||||
byuser[appid_pushkey] = p
|
||||
p.on_started()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def remove_pusher(self, app_id, pushkey, user_id):
|
||||
appid_pushkey = "%s:%s" % (app_id, pushkey)
|
||||
|
|
Loading…
Reference in a new issue