forked from MirrorHub/synapse
Make starting pushers faster during start up
We start all pushers on start up and immediately start a background process to fetch push to send. This makes start up incredibly painful when dealing with many pushers. Instead, let's do a quick fast DB check to see if there *may* be push to send and only start the background processes for those pushers. We also stagger starting up and doing those checks so that we don't try and handle all pushers at once.
This commit is contained in:
parent
297bf2547e
commit
5bec8d660d
4 changed files with 73 additions and 8 deletions
|
@ -72,8 +72,15 @@ class EmailPusher(object):
|
||||||
|
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
def on_started(self):
|
def on_started(self, should_check_for_notifs):
|
||||||
if self.mailer is not None:
|
"""Called when this pusher has been started.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
should_check_for_notifs (bool): Whether we should immediately
|
||||||
|
check for push to send. Set to False only if it's known there
|
||||||
|
is nothing to send
|
||||||
|
"""
|
||||||
|
if should_check_for_notifs and self.mailer is not None:
|
||||||
self._start_processing()
|
self._start_processing()
|
||||||
|
|
||||||
def on_stop(self):
|
def on_stop(self):
|
||||||
|
|
|
@ -112,7 +112,15 @@ class HttpPusher(object):
|
||||||
self.data_minus_url.update(self.data)
|
self.data_minus_url.update(self.data)
|
||||||
del self.data_minus_url['url']
|
del self.data_minus_url['url']
|
||||||
|
|
||||||
def on_started(self):
|
def on_started(self, should_check_for_notifs):
|
||||||
|
"""Called when this pusher has been started.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
should_check_for_notifs (bool): Whether we should immediately
|
||||||
|
check for push to send. Set to False only if it's known there
|
||||||
|
is nothing to send
|
||||||
|
"""
|
||||||
|
if should_check_for_notifs:
|
||||||
self._start_processing()
|
self._start_processing()
|
||||||
|
|
||||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||||
|
|
|
@ -21,6 +21,7 @@ from twisted.internet import defer
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.push import PusherConfigException
|
from synapse.push import PusherConfigException
|
||||||
from synapse.push.pusher import PusherFactory
|
from synapse.push.pusher import PusherFactory
|
||||||
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -197,7 +198,7 @@ class PusherPool:
|
||||||
p = r
|
p = r
|
||||||
|
|
||||||
if p:
|
if p:
|
||||||
self._start_pusher(p)
|
yield self._start_pusher(p)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _start_pushers(self):
|
def _start_pushers(self):
|
||||||
|
@ -208,10 +209,14 @@ class PusherPool:
|
||||||
"""
|
"""
|
||||||
pushers = yield self.store.get_all_pushers()
|
pushers = yield self.store.get_all_pushers()
|
||||||
logger.info("Starting %d pushers", len(pushers))
|
logger.info("Starting %d pushers", len(pushers))
|
||||||
for pusherdict in pushers:
|
|
||||||
self._start_pusher(pusherdict)
|
# Stagger starting up the pushers so we don't completely drown the
|
||||||
|
# process on start up.
|
||||||
|
yield concurrently_execute(self._start_pusher, pushers, 10)
|
||||||
|
|
||||||
logger.info("Started pushers")
|
logger.info("Started pushers")
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def _start_pusher(self, pusherdict):
|
def _start_pusher(self, pusherdict):
|
||||||
"""Start the given pusher
|
"""Start the given pusher
|
||||||
|
|
||||||
|
@ -248,7 +253,22 @@ class PusherPool:
|
||||||
if appid_pushkey in byuser:
|
if appid_pushkey in byuser:
|
||||||
byuser[appid_pushkey].on_stop()
|
byuser[appid_pushkey].on_stop()
|
||||||
byuser[appid_pushkey] = p
|
byuser[appid_pushkey] = p
|
||||||
p.on_started()
|
|
||||||
|
# Check if there *may* be push to process. We do this as this check is a
|
||||||
|
# lot cheaper to do than actually fetching the exact rows we need to
|
||||||
|
# push.
|
||||||
|
user_id = pusherdict["user_name"]
|
||||||
|
last_stream_ordering = pusherdict["last_stream_ordering"]
|
||||||
|
if last_stream_ordering:
|
||||||
|
have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
|
||||||
|
user_id, last_stream_ordering,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# We always want to default to starting up the pusher rather than
|
||||||
|
# risk missing push.
|
||||||
|
have_notifs = True
|
||||||
|
|
||||||
|
p.on_started(have_notifs)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def remove_pusher(self, app_id, pushkey, user_id):
|
def remove_pusher(self, app_id, pushkey, user_id):
|
||||||
|
|
|
@ -386,6 +386,36 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
# Now return the first `limit`
|
# Now return the first `limit`
|
||||||
defer.returnValue(notifs[:limit])
|
defer.returnValue(notifs[:limit])
|
||||||
|
|
||||||
|
def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
|
||||||
|
"""A fast check to see if there might be something to push for the
|
||||||
|
user since the given stream ordering. May return false positives.
|
||||||
|
|
||||||
|
Useful to know whether to bother starting a pusher on start up or not.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id (str)
|
||||||
|
min_stream_ordering (int)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[bool]: True if there may be push to process, False if
|
||||||
|
there definitely isn't.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _get_if_maybe_push_in_range_for_user_txn(txn):
|
||||||
|
sql = """
|
||||||
|
SELECT 1 FROM event_push_actions
|
||||||
|
WHERE user_id = ? AND stream_ordering > ?
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (user_id, min_stream_ordering,))
|
||||||
|
return bool(txn.fetchone())
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_if_maybe_push_in_range_for_user",
|
||||||
|
_get_if_maybe_push_in_range_for_user_txn,
|
||||||
|
)
|
||||||
|
|
||||||
def add_push_actions_to_staging(self, event_id, user_id_actions):
|
def add_push_actions_to_staging(self, event_id, user_id_actions):
|
||||||
"""Add the push actions for the event to the push action staging area.
|
"""Add the push actions for the event to the push action staging area.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue