Awful hackery to try to get the fed sender to keep up

Basically, if the federation sender starts getting behind, insert some sleeps
into the transaction transmission code to give the fed sender a chance to catch
up.

Might have to experiment a bit with the numbers.
This commit is contained in:
Richard van der Hoff 2019-09-27 16:00:00 +01:00
parent 7b55cca011
commit b852a8247d
4 changed files with 28 additions and 0 deletions

1
changelog.d/6126.feature Normal file
View file

@ -0,0 +1 @@
Group events into larger federation transactions at times of high traffic.

View file

@ -152,9 +152,24 @@ class FederationSender(object):
@defer.inlineCallbacks
def _process_event_queue_loop(self):
loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
while True:
# if we've been going around this loop for a long time without
# catching up, deprioritise transaction transmission. This should mean
# that events get batched into fewer transactions, which is more
# efficient, and hence give us a chance to catch up
if (
self.clock.time_msec() - loop_start_time > 60 * 1000
and not self._transaction_manager.deprioritise_transmission
):
logger.warning(
"Event processing loop is getting behind: deprioritising "
"transaction transmission"
)
self._transaction_manager.deprioritise_transmission = True
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
@ -252,6 +267,9 @@ class FederationSender(object):
finally:
self._is_processing = False
if self._transaction_manager.deprioritise_transmission:
logger.info("Event queue caught up: re-prioritising transmission")
self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have

View file

@ -189,6 +189,11 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
if self._transaction_manager.deprioritise_transmission:
# if the event-processing loop has got behind, sleep to give it
# a chance to catch up
yield self._clock.sleep(2)
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2

View file

@ -49,6 +49,10 @@ class TransactionManager(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
# the federation sender sometimes sets this to delay transaction transmission,
# if the sender gets behind.
self.deprioritise_transmission = False
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):