forked from MirrorHub/synapse
sanity-check the is_processing flag
... and rename it, for even more sanity
This commit is contained in:
parent
abd9914683
commit
e564306e31
2 changed files with 14 additions and 8 deletions
|
@ -70,7 +70,7 @@ class EmailPusher(object):
|
||||||
# See httppusher
|
# See httppusher
|
||||||
self.max_stream_ordering = None
|
self.max_stream_ordering = None
|
||||||
|
|
||||||
self.processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
def on_started(self):
|
def on_started(self):
|
||||||
if self.mailer is not None:
|
if self.mailer is not None:
|
||||||
|
@ -99,15 +99,18 @@ class EmailPusher(object):
|
||||||
self._start_processing()
|
self._start_processing()
|
||||||
|
|
||||||
def _start_processing(self):
|
def _start_processing(self):
|
||||||
if self.processing:
|
if self._is_processing:
|
||||||
return
|
return
|
||||||
|
|
||||||
run_as_background_process("emailpush.process", self._process)
|
run_as_background_process("emailpush.process", self._process)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _process(self):
|
def _process(self):
|
||||||
|
# we should never get here if we are already processing
|
||||||
|
assert not self._is_processing
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.processing = True
|
self._is_processing = True
|
||||||
|
|
||||||
if self.throttle_params is None:
|
if self.throttle_params is None:
|
||||||
# this is our first loop: load up the throttle params
|
# this is our first loop: load up the throttle params
|
||||||
|
@ -126,7 +129,7 @@ class EmailPusher(object):
|
||||||
if self.max_stream_ordering == starting_max_ordering:
|
if self.max_stream_ordering == starting_max_ordering:
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
self.processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _unsafe_process(self):
|
def _unsafe_process(self):
|
||||||
|
|
|
@ -60,7 +60,7 @@ class HttpPusher(object):
|
||||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||||
self.failing_since = pusherdict['failing_since']
|
self.failing_since = pusherdict['failing_since']
|
||||||
self.timed_call = None
|
self.timed_call = None
|
||||||
self.processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
# This is the highest stream ordering we know it's safe to process.
|
# This is the highest stream ordering we know it's safe to process.
|
||||||
# When new events arrive, we'll be given a window of new events: we
|
# When new events arrive, we'll be given a window of new events: we
|
||||||
|
@ -122,15 +122,18 @@ class HttpPusher(object):
|
||||||
self.timed_call = None
|
self.timed_call = None
|
||||||
|
|
||||||
def _start_processing(self):
|
def _start_processing(self):
|
||||||
if self.processing:
|
if self._is_processing:
|
||||||
return
|
return
|
||||||
|
|
||||||
run_as_background_process("httppush.process", self._process)
|
run_as_background_process("httppush.process", self._process)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _process(self):
|
def _process(self):
|
||||||
|
# we should never get here if we are already processing
|
||||||
|
assert not self._is_processing
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.processing = True
|
self._is_processing = True
|
||||||
# if the max ordering changes while we're running _unsafe_process,
|
# if the max ordering changes while we're running _unsafe_process,
|
||||||
# call it again, and so on until we've caught up.
|
# call it again, and so on until we've caught up.
|
||||||
while True:
|
while True:
|
||||||
|
@ -142,7 +145,7 @@ class HttpPusher(object):
|
||||||
if self.max_stream_ordering == starting_max_ordering:
|
if self.max_stream_ordering == starting_max_ordering:
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
self.processing = False
|
self._is_processing = False
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _unsafe_process(self):
|
def _unsafe_process(self):
|
||||||
|
|
Loading…
Reference in a new issue