diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 88184caec..c4c6667b6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -685,6 +685,7 @@ class _TransactionQueue(object): self.transport_layer = transport_layer self._clock = hs.get_clock() + self.store = hs.get_datastore() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -775,11 +776,18 @@ class _TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - dropping transaction for now") - return + + (retry_last_ts, retry_interval) = (0, 0) + retry_timings = yield self.store.get_destination_retry_timings(destination) + if retry_timings: + (retry_last_ts, retry_interval) = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination) + return + else: + logger.info("TX [%s] is ready for retry", destination) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending request @@ -866,7 +874,7 @@ class _TransactionQueue(object): if code == 200: if retry_last_ts: # this host is alive! reset retry schedule - self.store.set_destination_retry_timings(destination, 0, 0) + yield self.store.set_destination_retry_timings(destination, 0, 0) deferred.callback(None) else: self.start_retrying(destination, retry_interval) @@ -899,12 +907,13 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + @defer.inlineCallbacks def start_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again if retry_interval: retry_interval *= 2 else: - retry_interval = 2 # try again at first after 2 seconds - self.store.set_destination_retry_timings(destination, + retry_interval = 2000 # try again at first after 2 seconds + yield self.store.set_destination_retry_timings(destination, int(self._clock.time_msec()), retry_interval) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7a79e2d11..cfb502977 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -211,7 +211,8 @@ class FederationHandler(BaseHandler): # if we're receiving valid events from an origin, # it's probably a good idea to mark it as not in retry-state # for sending (although this is a bit of a leap) - if ((self.store.get_destination_retry_timings(origin))[0]): + retry_timings = yield self.store.get_destination_retry_timings(origin) + if (retry_timings and retry_timings.retry_last_ts): self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index fa51766e0..237b02445 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table from collections import namedtuple +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -218,8 +220,8 @@ class TransactionStore(SQLBaseStore): None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ - if self.destination_retry_cache[destination]: - return self.destination_retry_cache[destination] + if destination in self.destination_retry_cache: + return defer.succeed(self.destination_retry_cache[destination]) return self.runInteraction( "get_destination_retry_timings", @@ -228,11 +230,13 @@ class TransactionStore(SQLBaseStore): def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) - result = DestinationsTable.decode_single_result(txn.fetchone()) - if result and result.retry_last_ts > 0: - return result - else: - return None + result = txn.fetchall() + if result: + result = DestinationsTable.decode_single_result(result) + if result.retry_last_ts > 0: + return result + else: + return None def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. @@ -257,12 +261,11 @@ class TransactionStore(SQLBaseStore): query = ( "INSERT OR REPLACE INTO %s " - "(retry_last_ts, retry_interval) " - "VALUES (?, ?) " - "WHERE destination = ?" + "(destination, retry_last_ts, retry_interval) " + "VALUES (?, ?, ?) " ) % DestinationsTable.table_name - txn.execute(query, (retry_last_ts, retry_interval, destination)) + txn.execute(query, (destination, retry_last_ts, retry_interval)) def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction.