Don't measure across event stream call, as it lasts for a long time.
commit 549698b1e0
parent c486b7b41c
1 changed file with 122 additions and 122 deletions
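For context: Synapse's Measure is a context manager that records how long the wrapped block takes (plus related stats). The pusher's get_and_dispatch() starts with a long-poll on the event stream, which can block for a long time while nothing happens, so wrapping the whole call in Measure made the "push" timing mostly reflect idle waiting. This commit moves the Measure block inside get_and_dispatch(), after the event stream call returns, so only the rule evaluation and dispatch work is measured. Below is a minimal, self-contained sketch of the idea; measure(), wait_for_events() and process() are hypothetical stand-ins, not Synapse APIs.

import time
from contextlib import contextmanager


@contextmanager
def measure(name, stats):
    # Hypothetical stand-in for Synapse's Measure: accumulate the wall-clock
    # time spent inside the block under the given name.
    start = time.monotonic()
    try:
        yield
    finally:
        stats[name] = stats.get(name, 0.0) + (time.monotonic() - start)


def wait_for_events():
    # Stand-in for the event stream long-poll, which may block for minutes.
    time.sleep(0.2)
    return ["an event"]


def process(events):
    # Stand-in for push rule evaluation and HTTP dispatch.
    time.sleep(0.01)


stats = {}

# Before this commit: the timer wrapped the long-poll as well, so the
# "push" measurement was dominated by time spent waiting for events.
with measure("push_including_wait", stats):
    process(wait_for_events())

# After this commit: the long-poll happens outside the timer and only the
# processing that follows it is measured.
events = wait_for_events()
with measure("push", stats):
    process(events)

print(stats)  # "push" comes out far smaller than "push_including_wait"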
@@ -128,8 +128,7 @@ class Pusher(object):
             try:
                 if wait > 0:
                     yield synapse.util.async.sleep(wait)
-                with Measure(self.clock, "push"):
-                    yield self.get_and_dispatch()
+                yield self.get_and_dispatch()
                 wait = 0
             except:
                 if wait == 0:
@@ -151,115 +150,27 @@ class Pusher(object):
             only_keys=("room", "receipt",),
         )
 
-        # limiting to 1 may get 1 event plus 1 presence event, so
-        # pick out the actual event
-        single_event = None
-        read_receipt = None
-        for c in chunk['chunk']:
-            if 'event_id' in c:  # Hmmm...
-                single_event = c
-            elif c['type'] == 'm.receipt':
-                read_receipt = c
-
-        have_updated_badge = False
-        if read_receipt:
-            for receipt_part in read_receipt['content'].values():
-                if 'm.read' in receipt_part:
-                    if self.user_id in receipt_part['m.read'].keys():
-                        have_updated_badge = True
-
-        if not single_event:
-            if have_updated_badge:
-                yield self.update_badge()
-            self.last_token = chunk['end']
-            yield self.store.update_pusher_last_token(
-                self.app_id,
-                self.pushkey,
-                self.user_id,
-                self.last_token
-            )
-            return
-
-        if not self.alive:
-            return
-
-        processed = False
-
-        rule_evaluator = yield \
-            push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
-                self.user_id, self.profile_tag, single_event['room_id'], self.store
-            )
-
-        actions = yield rule_evaluator.actions_for_event(single_event)
-        tweaks = rule_evaluator.tweaks_for_actions(actions)
-
-        if 'notify' in actions:
-            self.badge = yield self._get_badge_count()
-            rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
-            self.has_unread = True
-            if isinstance(rejected, list) or isinstance(rejected, tuple):
-                processed = True
-                for pk in rejected:
-                    if pk != self.pushkey:
-                        # for sanity, we only remove the pushkey if it
-                        # was the one we actually sent...
-                        logger.warn(
-                            ("Ignoring rejected pushkey %s because we"
-                             " didn't send it"), pk
-                        )
-                    else:
-                        logger.info(
-                            "Pushkey %s was rejected: removing",
-                            pk
-                        )
-                        yield self.hs.get_pusherpool().remove_pusher(
-                            self.app_id, pk, self.user_id
-                        )
-        else:
-            if have_updated_badge:
-                yield self.update_badge()
-            processed = True
-
-        if not self.alive:
-            return
-
-        if processed:
-            self.backoff_delay = Pusher.INITIAL_BACKOFF
-            self.last_token = chunk['end']
-            yield self.store.update_pusher_last_token_and_success(
-                self.app_id,
-                self.pushkey,
-                self.user_id,
-                self.last_token,
-                self.clock.time_msec()
-            )
-            if self.failing_since:
-                self.failing_since = None
-                yield self.store.update_pusher_failing_since(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.failing_since)
-        else:
-            if not self.failing_since:
-                self.failing_since = self.clock.time_msec()
-                yield self.store.update_pusher_failing_since(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.failing_since
-                )
-
-            if (self.failing_since and
-               self.failing_since <
-               self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
-                # we really only give up so that if the URL gets
-                # fixed, we don't suddenly deliver a load
-                # of old notifications.
-                logger.warn("Giving up on a notification to user %s, "
-                            "pushkey %s",
-                            self.user_id, self.pushkey)
-                self.backoff_delay = Pusher.INITIAL_BACKOFF
+        with Measure(self.clock, "push"):
+            # limiting to 1 may get 1 event plus 1 presence event, so
+            # pick out the actual event
+            single_event = None
+            read_receipt = None
+            for c in chunk['chunk']:
+                if 'event_id' in c:  # Hmmm...
+                    single_event = c
+                elif c['type'] == 'm.receipt':
+                    read_receipt = c
+
+            have_updated_badge = False
+            if read_receipt:
+                for receipt_part in read_receipt['content'].values():
+                    if 'm.read' in receipt_part:
+                        if self.user_id in receipt_part['m.read'].keys():
+                            have_updated_badge = True
+
+            if not single_event:
+                if have_updated_badge:
+                    yield self.update_badge()
                 self.last_token = chunk['end']
                 yield self.store.update_pusher_last_token(
                     self.app_id,
@@ -267,25 +178,114 @@ class Pusher(object):
                     self.user_id,
                     self.last_token
                 )
-
-                self.failing_since = None
-                yield self.store.update_pusher_failing_since(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.failing_since
-                )
-            else:
-                logger.warn("Failed to dispatch push for user %s "
-                            "(failing for %dms)."
-                            "Trying again in %dms",
-                            self.user_id,
-                            self.clock.time_msec() - self.failing_since,
-                            self.backoff_delay)
-                yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                self.backoff_delay *= 2
-                if self.backoff_delay > Pusher.MAX_BACKOFF:
-                    self.backoff_delay = Pusher.MAX_BACKOFF
+                return
+
+            if not self.alive:
+                return
+
+            processed = False
+
+            rule_evaluator = yield \
+                push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
+                    self.user_id, self.profile_tag, single_event['room_id'], self.store
+                )
+
+            actions = yield rule_evaluator.actions_for_event(single_event)
+            tweaks = rule_evaluator.tweaks_for_actions(actions)
+
+            if 'notify' in actions:
+                self.badge = yield self._get_badge_count()
+                rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
+                self.has_unread = True
+                if isinstance(rejected, list) or isinstance(rejected, tuple):
+                    processed = True
+                    for pk in rejected:
+                        if pk != self.pushkey:
+                            # for sanity, we only remove the pushkey if it
+                            # was the one we actually sent...
+                            logger.warn(
+                                ("Ignoring rejected pushkey %s because we"
+                                 " didn't send it"), pk
+                            )
+                        else:
+                            logger.info(
+                                "Pushkey %s was rejected: removing",
+                                pk
+                            )
+                            yield self.hs.get_pusherpool().remove_pusher(
+                                self.app_id, pk, self.user_id
+                            )
+            else:
+                if have_updated_badge:
+                    yield self.update_badge()
+                processed = True
+
+            if not self.alive:
+                return
+
+            if processed:
+                self.backoff_delay = Pusher.INITIAL_BACKOFF
+                self.last_token = chunk['end']
+                yield self.store.update_pusher_last_token_and_success(
+                    self.app_id,
+                    self.pushkey,
+                    self.user_id,
+                    self.last_token,
+                    self.clock.time_msec()
+                )
+                if self.failing_since:
+                    self.failing_since = None
+                    yield self.store.update_pusher_failing_since(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.failing_since)
+            else:
+                if not self.failing_since:
+                    self.failing_since = self.clock.time_msec()
+                    yield self.store.update_pusher_failing_since(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.failing_since
+                    )
+
+                if (self.failing_since and
+                   self.failing_since <
+                   self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
+                    # we really only give up so that if the URL gets
+                    # fixed, we don't suddenly deliver a load
+                    # of old notifications.
+                    logger.warn("Giving up on a notification to user %s, "
+                                "pushkey %s",
+                                self.user_id, self.pushkey)
+                    self.backoff_delay = Pusher.INITIAL_BACKOFF
+                    self.last_token = chunk['end']
+                    yield self.store.update_pusher_last_token(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.last_token
+                    )
+
+                    self.failing_since = None
+                    yield self.store.update_pusher_failing_since(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.failing_since
+                    )
+                else:
+                    logger.warn("Failed to dispatch push for user %s "
+                                "(failing for %dms)."
+                                "Trying again in %dms",
+                                self.user_id,
+                                self.clock.time_msec() - self.failing_since,
+                                self.backoff_delay)
+                    yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+                    self.backoff_delay *= 2
+                    if self.backoff_delay > Pusher.MAX_BACKOFF:
+                        self.backoff_delay = Pusher.MAX_BACKOFF
 
     def stop(self):
         self.alive = False