mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-17 03:43:52 +01:00
Handle pusher being deleted during processing.
Instead of throwing a StoreError lets break out of processing loop and mark the pusher as stopped.
This commit is contained in:
parent
58af30a6c7
commit
d02e41dcb2
3 changed files with 55 additions and 21 deletions
|
@ -234,13 +234,20 @@ class EmailPusher(object):
|
||||||
return
|
return
|
||||||
|
|
||||||
self.last_stream_ordering = last_stream_ordering
|
self.last_stream_ordering = last_stream_ordering
|
||||||
yield self.store.update_pusher_last_stream_ordering_and_success(
|
pusher_still_exists = (
|
||||||
self.app_id,
|
yield self.store.update_pusher_last_stream_ordering_and_success(
|
||||||
self.email,
|
self.app_id,
|
||||||
self.user_id,
|
self.email,
|
||||||
last_stream_ordering,
|
self.user_id,
|
||||||
self.clock.time_msec(),
|
last_stream_ordering,
|
||||||
|
self.clock.time_msec(),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
if not pusher_still_exists:
|
||||||
|
# The pusher has been deleted while we were processing, so
|
||||||
|
# lets just stop and return.
|
||||||
|
self.on_stop()
|
||||||
|
return
|
||||||
|
|
||||||
def seconds_until(self, ts_msec):
|
def seconds_until(self, ts_msec):
|
||||||
secs = (ts_msec - self.clock.time_msec()) / 1000
|
secs = (ts_msec - self.clock.time_msec()) / 1000
|
||||||
|
|
|
@ -199,13 +199,21 @@ class HttpPusher(object):
|
||||||
http_push_processed_counter.inc()
|
http_push_processed_counter.inc()
|
||||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||||
self.last_stream_ordering = push_action["stream_ordering"]
|
self.last_stream_ordering = push_action["stream_ordering"]
|
||||||
yield self.store.update_pusher_last_stream_ordering_and_success(
|
pusher_still_exists = (
|
||||||
self.app_id,
|
yield self.store.update_pusher_last_stream_ordering_and_success(
|
||||||
self.pushkey,
|
self.app_id,
|
||||||
self.user_id,
|
self.pushkey,
|
||||||
self.last_stream_ordering,
|
self.user_id,
|
||||||
self.clock.time_msec(),
|
self.last_stream_ordering,
|
||||||
|
self.clock.time_msec(),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
if not pusher_still_exists:
|
||||||
|
# The pusher has been deleted while we were processing, so
|
||||||
|
# lets just stop and return.
|
||||||
|
self.on_stop()
|
||||||
|
return
|
||||||
|
|
||||||
if self.failing_since:
|
if self.failing_since:
|
||||||
self.failing_since = None
|
self.failing_since = None
|
||||||
yield self.store.update_pusher_failing_since(
|
yield self.store.update_pusher_failing_since(
|
||||||
|
@ -234,12 +242,17 @@ class HttpPusher(object):
|
||||||
)
|
)
|
||||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||||
self.last_stream_ordering = push_action["stream_ordering"]
|
self.last_stream_ordering = push_action["stream_ordering"]
|
||||||
yield self.store.update_pusher_last_stream_ordering(
|
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
|
||||||
self.app_id,
|
self.app_id,
|
||||||
self.pushkey,
|
self.pushkey,
|
||||||
self.user_id,
|
self.user_id,
|
||||||
self.last_stream_ordering,
|
self.last_stream_ordering,
|
||||||
)
|
)
|
||||||
|
if not pusher_still_exists:
|
||||||
|
# The pusher has been deleted while we were processing, so
|
||||||
|
# lets just stop and return.
|
||||||
|
self.on_stop()
|
||||||
|
return
|
||||||
|
|
||||||
self.failing_since = None
|
self.failing_since = None
|
||||||
yield self.store.update_pusher_failing_since(
|
yield self.store.update_pusher_failing_since(
|
||||||
|
|
|
@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
|
||||||
def update_pusher_last_stream_ordering_and_success(
|
def update_pusher_last_stream_ordering_and_success(
|
||||||
self, app_id, pushkey, user_id, last_stream_ordering, last_success
|
self, app_id, pushkey, user_id, last_stream_ordering, last_success
|
||||||
):
|
):
|
||||||
yield self._simple_update_one(
|
"""Update the last stream ordering position we've processed up to for
|
||||||
"pushers",
|
the given pusher.
|
||||||
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
|
|
||||||
{
|
Args:
|
||||||
|
app_id (str)
|
||||||
|
pushkey (str)
|
||||||
|
last_stream_ordering (int)
|
||||||
|
last_success (int)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[bool]: Whether the pusher stil exists or not.
|
||||||
|
"""
|
||||||
|
updated = yield self._simple_update(
|
||||||
|
table="pushers",
|
||||||
|
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
|
||||||
|
updatevalues={
|
||||||
"last_stream_ordering": last_stream_ordering,
|
"last_stream_ordering": last_stream_ordering,
|
||||||
"last_success": last_success,
|
"last_success": last_success,
|
||||||
},
|
},
|
||||||
desc="update_pusher_last_stream_ordering_and_success",
|
desc="update_pusher_last_stream_ordering_and_success",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return bool(updated)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
|
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
|
||||||
yield self._simple_update_one(
|
yield self._simple_update(
|
||||||
"pushers",
|
table="pushers",
|
||||||
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
|
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
|
||||||
{"failing_since": failing_since},
|
updatevalues={"failing_since": failing_since},
|
||||||
desc="update_pusher_failing_since",
|
desc="update_pusher_failing_since",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue