forked from MirrorHub/synapse
Merge pull request #1930 from matrix-org/rav/fix_txnq_race
Fix a race in transaction queue
This commit is contained in:
commit
c927d6de9b
1 changed files with 21 additions and 9 deletions
|
@ -303,18 +303,10 @@ class TransactionQueue(object):
|
||||||
try:
|
try:
|
||||||
self.pending_transactions[destination] = 1
|
self.pending_transactions[destination] = 1
|
||||||
|
|
||||||
|
# XXX: what's this for?
|
||||||
yield run_on_reactor()
|
yield run_on_reactor()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
|
||||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
|
||||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
|
||||||
|
|
||||||
pending_edus.extend(
|
|
||||||
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
|
||||||
)
|
|
||||||
|
|
||||||
limiter = yield get_retry_limiter(
|
limiter = yield get_retry_limiter(
|
||||||
destination,
|
destination,
|
||||||
self.clock,
|
self.clock,
|
||||||
|
@ -326,6 +318,24 @@ class TransactionQueue(object):
|
||||||
yield self._get_new_device_messages(destination)
|
yield self._get_new_device_messages(destination)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# BEGIN CRITICAL SECTION
|
||||||
|
#
|
||||||
|
# In order to avoid a race condition, we need to make sure that
|
||||||
|
# the following code (from popping the queues up to the point
|
||||||
|
# where we decide if we actually have any pending messages) is
|
||||||
|
# atomic - otherwise new PDUs or EDUs might arrive in the
|
||||||
|
# meantime, but not get sent because we hold the
|
||||||
|
# pending_transactions flag.
|
||||||
|
|
||||||
|
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||||
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
|
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||||
|
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||||
|
|
||||||
|
pending_edus.extend(
|
||||||
|
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
||||||
|
)
|
||||||
|
|
||||||
pending_edus.extend(device_message_edus)
|
pending_edus.extend(device_message_edus)
|
||||||
if pending_presence:
|
if pending_presence:
|
||||||
pending_edus.append(
|
pending_edus.append(
|
||||||
|
@ -355,6 +365,8 @@ class TransactionQueue(object):
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# END CRITICAL SECTION
|
||||||
|
|
||||||
success = yield self._send_new_transaction(
|
success = yield self._send_new_transaction(
|
||||||
destination, pending_pdus, pending_edus, pending_failures,
|
destination, pending_pdus, pending_edus, pending_failures,
|
||||||
limiter=limiter,
|
limiter=limiter,
|
||||||
|
|
Loading…
Reference in a new issue