try this?
This commit is contained in:
parent
9abaa52ac1
commit
40142497b7
|
@ -178,6 +178,7 @@ class PerDestinationQueue(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _transaction_transmission_loop(self):
|
||||
acquired_lock = None
|
||||
pending_pdus = []
|
||||
try:
|
||||
self.transmission_loop_running = True
|
||||
|
@ -267,9 +268,12 @@ class PerDestinationQueue(object):
|
|||
|
||||
# END CRITICAL SECTION
|
||||
|
||||
acquired_lock = yield self._transaction_manager.limiter.acquire()
|
||||
success = yield self._transaction_manager.send_new_transaction(
|
||||
self._destination, pending_pdus, pending_edus
|
||||
)
|
||||
acquired_lock = acquired_lock.release()
|
||||
|
||||
if success:
|
||||
sent_transactions_counter.inc()
|
||||
sent_edus_counter.inc(len(pending_edus))
|
||||
|
@ -331,6 +335,8 @@ class PerDestinationQueue(object):
|
|||
finally:
|
||||
# We want to be *very* sure we clear this after we stop processing
|
||||
self.transmission_loop_running = False
|
||||
if acquired_lock:
|
||||
acquired_lock.release()
|
||||
|
||||
def _get_rr_edus(self, force_flush):
|
||||
if not self._pending_rrs:
|
||||
|
|
|
@ -40,6 +40,8 @@ class TransactionManager(object):
|
|||
# HACK to get unique tx id
|
||||
self._next_txn_id = int(self.clock.time_msec())
|
||||
|
||||
self.limiter = defer.DeferredSemaphore(50)
|
||||
|
||||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
|
Loading…
Reference in a new issue