From 721086a291b7d187725ab3314dc1a4f11bd00f46 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Fri, 27 Sep 2019 16:00:00 +0100
Subject: [PATCH] Awful hackery to try to get the fed sender to keep up

Basically, if the federation sender starts getting behind, insert some
sleeps into the transaction transmission code to give the fed sender a
chance to catch up.

Might have to experiment a bit with the numbers.
---
 changelog.d/6126.feature                        |  1 +
 synapse/federation/sender/__init__.py           | 18 ++++++++++++++++++
 .../federation/sender/per_destination_queue.py  |  5 +++++
 .../federation/sender/transaction_manager.py    |  4 ++++
 4 files changed, 28 insertions(+)
 create mode 100644 changelog.d/6126.feature

diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature
new file mode 100644
index 0000000000..1207ba6206
--- /dev/null
+++ b/changelog.d/6126.feature
@@ -0,0 +1 @@
+Group events into larger federation transactions at times of high traffic.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d46f4aaeb1..497485fac2 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -152,9 +152,24 @@ class FederationSender(object):
 
     @defer.inlineCallbacks
     def _process_event_queue_loop(self):
+        loop_start_time = self.clock.time_msec()
         try:
             self._is_processing = True
             while True:
+                # if we've been going around this loop for a long time without
+                # catching up, deprioritise transaction transmission. This should mean
+                # that events get batched into fewer transactions, which is more
+                # efficient, and hence gives us a chance to catch up
+                if (
+                    self.clock.time_msec() - loop_start_time > 60 * 1000
+                    and not self._transaction_manager.deprioritise_transmission
+                ):
+                    logger.warning(
+                        "Event processing loop is getting behind: deprioritising "
+                        "transaction transmission"
+                    )
+                    self._transaction_manager.deprioritise_transmission = True
+
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
                     last_token, self._last_poked_id, limit=100
@@ -251,6 +266,9 @@ class FederationSender(object):
 
         finally:
             self._is_processing = False
+            if self._transaction_manager.deprioritise_transmission:
+                logger.info("Event queue caught up: re-prioritising transmission")
+                self._transaction_manager.deprioritise_transmission = False
 
     def _send_pdu(self, pdu, destinations):
         # We loop through all destinations to see whether we already have
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index fad980b893..b890aaf840 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -189,6 +189,11 @@ class PerDestinationQueue(object):
 
             pending_pdus = []
             while True:
+                if self._transaction_manager.deprioritise_transmission:
+                    # if the event-processing loop has got behind, sleep to give it
+                    # a chance to catch up
+                    yield self._clock.sleep(2)
+
                 # We have to keep 2 free slots for presence and rr_edus
                 limit = MAX_EDUS_PER_TRANSACTION - 2
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5b6c79c51a..69679dbf65 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -49,6 +49,10 @@ class TransactionManager(object):
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
 
+        # the federation sender sometimes sets this to delay transaction
+        # transmission, if the sender gets behind.
+        self.deprioritise_transmission = False
+
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def send_new_transaction(self, destination, pending_pdus, pending_edus):
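
For context, the mechanism above is a simple cross-loop backpressure flag:
the event-processing loop sets deprioritise_transmission once it has been
running for more than 60s without draining its queue, and each
per-destination transmission loop then sleeps 2s per iteration, so pending
events pile up and get sent in fewer, larger transactions. Below is a
minimal standalone sketch of that pattern, using asyncio rather than
Synapse's Twisted machinery; all names, constants and helpers in it are
illustrative, not part of this patch or of Synapse.

import asyncio
import logging
import time

logger = logging.getLogger(__name__)

CATCH_UP_THRESHOLD_MS = 60 * 1000  # same 60s threshold as the patch
TRANSMISSION_DELAY_S = 2           # same 2s sleep as the patch


class SharedState:
    """Stands in for TransactionManager.deprioritise_transmission."""

    def __init__(self):
        self.deprioritise_transmission = False


async def handle_event(event):
    """Placeholder for real per-event processing."""
    await asyncio.sleep(0)


async def event_queue_loop(state, event_queue):
    """Rough analogue of _process_event_queue_loop: flags itself as
    behind if one pass over the queue takes too long."""
    loop_start_ms = time.monotonic() * 1000
    try:
        while not event_queue.empty():
            if (
                time.monotonic() * 1000 - loop_start_ms > CATCH_UP_THRESHOLD_MS
                and not state.deprioritise_transmission
            ):
                logger.warning("event loop behind: deprioritising transmission")
                state.deprioritise_transmission = True
            await handle_event(event_queue.get_nowait())
    finally:
        # queue drained: let transmission run at full speed again
        if state.deprioritise_transmission:
            logger.info("event queue caught up: re-prioritising transmission")
            state.deprioritise_transmission = False


async def per_destination_loop(state, outbox):
    """Rough analogue of PerDestinationQueue: while deprioritised, sleep
    before each send so more events accumulate per transaction."""
    while outbox:
        if state.deprioritise_transmission:
            await asyncio.sleep(TRANSMISSION_DELAY_S)
        txn, outbox[:] = outbox[:50], outbox[50:]  # batch up to 50 events
        logger.info("sending transaction of %i events", len(txn))

The obvious trade-off (as the commit message says) is added transmission
latency for destinations that were keeping up fine, which is why the
60s/2s numbers probably want experimenting with.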