mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 04:43:51 +01:00
Move the check for federated device_messages.
Move the check into _attempt_new_transaction. Only delete messages if there were messages to delete.
This commit is contained in:
parent
31a07d2335
commit
cb98ac261b
1 changed files with 15 additions and 11 deletions
|
@ -177,6 +177,12 @@ class TransactionQueue(object):
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||||
|
|
||||||
|
device_message_edus, device_stream_id = (
|
||||||
|
yield self._get_new_device_messages(destination)
|
||||||
|
)
|
||||||
|
|
||||||
|
pending_edus.extend(device_message_edus)
|
||||||
|
|
||||||
if pending_pdus:
|
if pending_pdus:
|
||||||
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
||||||
destination, len(pending_pdus))
|
destination, len(pending_pdus))
|
||||||
|
@ -186,7 +192,9 @@ class TransactionQueue(object):
|
||||||
return
|
return
|
||||||
|
|
||||||
yield self._send_new_transaction(
|
yield self._send_new_transaction(
|
||||||
destination, pending_pdus, pending_edus, pending_failures
|
destination, pending_pdus, pending_edus, pending_failures,
|
||||||
|
device_stream_id,
|
||||||
|
should_delete_from_device_stream=bool(device_message_edus)
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -210,7 +218,8 @@ class TransactionQueue(object):
|
||||||
@measure_func("_send_new_transaction")
|
@measure_func("_send_new_transaction")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
||||||
pending_failures):
|
pending_failures, device_stream_id,
|
||||||
|
should_delete_from_device_stream):
|
||||||
|
|
||||||
# Sort based on the order field
|
# Sort based on the order field
|
||||||
pending_pdus.sort(key=lambda t: t[1])
|
pending_pdus.sort(key=lambda t: t[1])
|
||||||
|
@ -231,12 +240,6 @@ class TransactionQueue(object):
|
||||||
self.store,
|
self.store,
|
||||||
)
|
)
|
||||||
|
|
||||||
device_message_edus, device_stream_id = (
|
|
||||||
yield self._get_new_device_messages(destination)
|
|
||||||
)
|
|
||||||
|
|
||||||
edus.extend(device_message_edus)
|
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"TX [%s] {%s} Attempting new transaction"
|
"TX [%s] {%s} Attempting new transaction"
|
||||||
" (pdus: %d, edus: %d, failures: %d)",
|
" (pdus: %d, edus: %d, failures: %d)",
|
||||||
|
@ -327,9 +330,10 @@ class TransactionQueue(object):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Remove the acknowledged device messages from the database
|
# Remove the acknowledged device messages from the database
|
||||||
yield self.store.delete_device_msgs_for_remote(
|
if should_delete_from_device_stream:
|
||||||
destination, device_stream_id
|
yield self.store.delete_device_msgs_for_remote(
|
||||||
)
|
destination, device_stream_id
|
||||||
|
)
|
||||||
self.last_device_stream_id_by_dest[destination] = device_stream_id
|
self.last_device_stream_id_by_dest[destination] = device_stream_id
|
||||||
except NotRetryingDestination:
|
except NotRetryingDestination:
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|
Loading…
Reference in a new issue