0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 14:43:51 +01:00

Merge pull request #22 from matrix-org/federation_retries

Federation retries
This commit is contained in:
Erik Johnston 2014-12-10 10:35:57 +00:00
commit 6497caee7c
14 changed files with 277 additions and 24 deletions

View file

@ -334,7 +334,7 @@ class ReplicationLayer(object):
defer.returnValue(response) defer.returnValue(response)
return return
logger.debug("[%s] Transacition is new", transaction.transaction_id) logger.debug("[%s] Transaction is new", transaction.transaction_id)
with PreserveLoggingContext(): with PreserveLoggingContext():
dl = [] dl = []
@ -685,6 +685,7 @@ class _TransactionQueue(object):
self.transport_layer = transport_layer self.transport_layer = transport_layer
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.store = hs.get_datastore()
# Is a mapping from destinations -> deferreds. Used to keep track # Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are # of which destinations have transactions in flight and when they are
@ -728,8 +729,14 @@ class _TransactionQueue(object):
(pdu, deferred, order) (pdu, deferred, order)
) )
def eb(failure):
if not deferred.called:
deferred.errback(failure)
else:
logger.warn("Failed to send pdu", failure)
with PreserveLoggingContext(): with PreserveLoggingContext():
self._attempt_new_transaction(destination) self._attempt_new_transaction(destination).addErrback(eb)
deferreds.append(deferred) deferreds.append(deferred)
@ -739,6 +746,9 @@ class _TransactionQueue(object):
def enqueue_edu(self, edu): def enqueue_edu(self, edu):
destination = edu.destination destination = edu.destination
if destination == self.server_name:
return
deferred = defer.Deferred() deferred = defer.Deferred()
self.pending_edus_by_dest.setdefault(destination, []).append( self.pending_edus_by_dest.setdefault(destination, []).append(
(edu, deferred) (edu, deferred)
@ -748,7 +758,7 @@ class _TransactionQueue(object):
if not deferred.called: if not deferred.called:
deferred.errback(failure) deferred.errback(failure)
else: else:
logger.exception("Failed to send edu", failure) logger.warn("Failed to send edu", failure)
with PreserveLoggingContext(): with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(eb) self._attempt_new_transaction(destination).addErrback(eb)
@ -770,10 +780,33 @@ class _TransactionQueue(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _attempt_new_transaction(self, destination): def _attempt_new_transaction(self, destination):
(retry_last_ts, retry_interval) = (0, 0)
retry_timings = yield self.store.get_destination_retry_timings(
destination
)
if retry_timings:
(retry_last_ts, retry_interval) = (
retry_timings.retry_last_ts, retry_timings.retry_interval
)
if retry_last_ts + retry_interval > int(self._clock.time_msec()):
logger.info(
"TX [%s] not ready for retry yet - "
"dropping transaction for now",
destination,
)
return
else:
logger.info("TX [%s] is ready for retry", destination)
if destination in self.pending_transactions: if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
return return
# list of (pending_pdu, deferred, order) # list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, [])
@ -781,7 +814,14 @@ class _TransactionQueue(object):
if not pending_pdus and not pending_edus and not pending_failures: if not pending_pdus and not pending_edus and not pending_failures:
return return
logger.debug("TX [%s] Attempting new transaction", destination) logger.debug(
"TX [%s] Attempting new transaction "
"(pdus: %d, edus: %d, failures: %d)",
destination,
len(pending_pdus),
len(pending_edus),
len(pending_failures)
)
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[2]) pending_pdus.sort(key=lambda t: t[2])
@ -814,7 +854,11 @@ class _TransactionQueue(object):
yield self.transaction_actions.prepare_to_send(transaction) yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination) logger.debug("TX [%s] Persisted transaction", destination)
logger.debug("TX [%s] Sending transaction...", destination) logger.info(
"TX [%s] Sending transaction [%s]",
destination,
transaction.transaction_id,
)
# Actually send the transaction # Actually send the transaction
@ -835,6 +879,8 @@ class _TransactionQueue(object):
transaction, json_data_cb transaction, json_data_cb
) )
logger.info("TX [%s] got %d response", destination, code)
logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination) logger.debug("TX [%s] Marking as delivered...", destination)
@ -847,8 +893,14 @@ class _TransactionQueue(object):
for deferred in deferreds: for deferred in deferreds:
if code == 200: if code == 200:
if retry_last_ts:
# this host is alive! reset retry schedule
yield self.store.set_destination_retry_timings(
destination, 0, 0
)
deferred.callback(None) deferred.callback(None)
else: else:
self.set_retrying(destination, retry_interval)
deferred.errback(RuntimeError("Got status %d" % code)) deferred.errback(RuntimeError("Got status %d" % code))
# Ensures we don't continue until all callbacks on that # Ensures we don't continue until all callbacks on that
@ -861,11 +913,15 @@ class _TransactionQueue(object):
logger.debug("TX [%s] Yielded to callbacks", destination) logger.debug("TX [%s] Yielded to callbacks", destination)
except Exception as e: except Exception as e:
logger.error("TX Problem in _attempt_transaction")
# We capture this here as there as nothing actually listens # We capture this here as there as nothing actually listens
# for this finishing functions deferred. # for this finishing functions deferred.
logger.exception(e) logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
self.set_retrying(destination, retry_interval)
for deferred in deferreds: for deferred in deferreds:
if not deferred.called: if not deferred.called:
@ -877,3 +933,22 @@ class _TransactionQueue(object):
# Check to see if there is anything else to send. # Check to see if there is anything else to send.
self._attempt_new_transaction(destination) self._attempt_new_transaction(destination)
@defer.inlineCallbacks
def set_retrying(self, destination, retry_interval):
# track that this destination is having problems and we should
# give it a chance to recover before trying it again
if retry_interval:
retry_interval *= 2
# plateau at hourly retries for now
if retry_interval >= 60 * 60 * 1000:
retry_interval = 60 * 60 * 1000
else:
retry_interval = 2000 # try again at first after 2 seconds
yield self.store.set_destination_retry_timings(
destination,
int(self._clock.time_msec()),
retry_interval
)

View file

@ -155,7 +155,7 @@ class TransportLayer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def send_transaction(self, transaction, json_data_callback=None): def send_transaction(self, transaction, json_data_callback=None):
""" Sends the given Transaction to it's destination """ Sends the given Transaction to its destination
Args: Args:
transaction (Transaction) transaction (Transaction)

View file

@ -207,6 +207,13 @@ class FederationHandler(BaseHandler):
e.msg, e.msg,
affected=event.event_id, affected=event.event_id,
) )
# if we're receiving valid events from an origin,
# it's probably a good idea to mark it as not in retry-state
# for sending (although this is a bit of a leap)
retry_timings = yield self.store.get_destination_retry_timings(origin)
if (retry_timings and retry_timings.retry_last_ts):
self.store.set_destination_retry_timings(origin, 0, 0)
room = yield self.store.get_room(event.room_id) room = yield self.store.get_room(event.room_id)

View file

@ -89,8 +89,8 @@ class MatrixFederationHttpClient(object):
("", "", path_bytes, param_bytes, query_bytes, "",) ("", "", path_bytes, param_bytes, query_bytes, "",)
) )
logger.debug("Sending request to %s: %s %s", logger.info("Sending request to %s: %s %s",
destination, method, url_bytes) destination, method, url_bytes)
logger.debug( logger.debug(
"Types: %s", "Types: %s",
@ -101,6 +101,8 @@ class MatrixFederationHttpClient(object):
] ]
) )
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
retries_left = 5 retries_left = 5
endpoint = self._getEndpoint(reactor, destination) endpoint = self._getEndpoint(reactor, destination)
@ -127,11 +129,20 @@ class MatrixFederationHttpClient(object):
break break
except Exception as e: except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError): if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn("DNS Lookup failed to %s with %s", destination, logger.warn(
e) "DNS Lookup failed to %s with %s",
destination,
e
)
raise SynapseError(400, "Domain specified not found.") raise SynapseError(400, "Domain specified not found.")
logger.exception("Got error in _create_request") logger.warn(
"Sending request failed to %s: %s %s : %s",
destination,
method,
url_bytes,
e
)
_print_ex(e) _print_ex(e)
if retries_left: if retries_left:
@ -140,15 +151,21 @@ class MatrixFederationHttpClient(object):
else: else:
raise raise
logger.info(
"Received response %d %s for %s: %s %s",
response.code,
response.phrase,
destination,
method,
url_bytes
)
if 200 <= response.code < 300: if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent? # We need to update the transactions table to say it was sent?
pass pass
else: else:
# :'( # :'(
# Update transactions table? # Update transactions table?
logger.error(
"Got response %d %s", response.code, response.phrase
)
raise CodeMessageException( raise CodeMessageException(
response.code, response.phrase response.code, response.phrase
) )
@ -284,7 +301,7 @@ def _print_ex(e):
for ex in e.reasons: for ex in e.reasons:
_print_ex(ex) _print_ex(ex)
else: else:
logger.exception(e) logger.warn(e)
class _JsonProducer(object): class _JsonProducer(object):

View file

@ -19,7 +19,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# FIXME: elsewhere we use FooStore to indicate something in the storage layer...
class HttpTransactionStore(object): class HttpTransactionStore(object):
def __init__(self): def __init__(self):

View file

@ -67,7 +67,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to # Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts. # database schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 8 SCHEMA_VERSION = 9
class _RollbackButIsFineException(Exception): class _RollbackButIsFineException(Exception):

View file

@ -650,7 +650,7 @@ class JoinHelper(object):
to dump the results into. to dump the results into.
Attributes: Attributes:
taples (list): List of `Table` classes tables (list): List of `Table` classes
EntryType (type) EntryType (type)
""" """

View file

@ -0,0 +1,23 @@
/* Copyright 2014 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- To track destination health
CREATE TABLE IF NOT EXISTS destinations(
destination TEXT PRIMARY KEY,
retry_last_ts INTEGER,
retry_interval INTEGER
);
PRAGMA user_version = 9;

View file

@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
-- To track destination health
CREATE TABLE IF NOT EXISTS destinations(
destination TEXT PRIMARY KEY,
retry_last_ts INTEGER,
retry_interval INTEGER
);

View file

@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
from collections import namedtuple from collections import namedtuple
from twisted.internet import defer
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs. """A collection of queries for handling PDUs.
""" """
# a write-through cache of DestinationsTable.EntryType indexed by
# destination string
destination_retry_cache = {}
def get_received_txn_response(self, transaction_id, origin): def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have """For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response already responded to it. If so, return the response code and response
@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination, def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts): origin_server_ts):
# First we find out what the prev_txs should be. # First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time, # Since we know that we are only sending one transaction at a time,
# we can simply take the last one. # we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % ( query = "%s ORDER BY id DESC LIMIT 1" % (
@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall()) return ReceivedTransactionsTable.decode_results(txn.fetchall())
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
Args:
destination (str)
Returns:
None if not retrying
Otherwise a DestinationsTable.EntryType for the retry scheme
"""
if destination in self.destination_retry_cache:
return defer.succeed(self.destination_retry_cache[destination])
return self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
def _get_destination_retry_timings(cls, txn, destination):
query = DestinationsTable.select_statement("destination = ?")
txn.execute(query, (destination,))
result = txn.fetchall()
if result:
result = DestinationsTable.decode_single_result(result)
if result.retry_last_ts > 0:
return result
else:
return None
def set_destination_retry_timings(self, destination,
retry_last_ts, retry_interval):
"""Sets the current retry timings for a given destination.
Both timings should be zero if retrying is no longer occuring.
Args:
destination (str)
retry_last_ts (int) - time of last retry attempt in unix epoch ms
retry_interval (int) - how long until next retry in ms
"""
self.destination_retry_cache[destination] = (
DestinationsTable.EntryType(
destination,
retry_last_ts,
retry_interval
)
)
# XXX: we could chose to not bother persisting this if our cache thinks
# this is a NOOP
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
destination,
retry_last_ts,
retry_interval,
)
def _set_destination_retry_timings(cls, txn, destination,
retry_last_ts, retry_interval):
query = (
"INSERT OR REPLACE INTO %s "
"(destination, retry_last_ts, retry_interval) "
"VALUES (?, ?, ?) "
) % DestinationsTable.table_name
txn.execute(query, (destination, retry_last_ts, retry_interval))
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
Returns:
list: A list of `DestinationsTable.EntryType`
"""
return self.runInteraction(
"get_destinations_needing_retry",
self._get_destinations_needing_retry
)
def _get_destinations_needing_retry(cls, txn):
where = "retry_last_ts > 0 and retry_next_ts < now()"
query = DestinationsTable.select_statement(where)
txn.execute(query)
return DestinationsTable.decode_results(txn.fetchall())
class ReceivedTransactionsTable(Table): class ReceivedTransactionsTable(Table):
table_name = "received_transactions" table_name = "received_transactions"
@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
] ]
EntryType = namedtuple("TransactionsToPduEntry", fields) EntryType = namedtuple("TransactionsToPduEntry", fields)
class DestinationsTable(Table):
table_name = "destinations"
fields = [
"destination",
"retry_last_ts",
"retry_interval",
]
EntryType = namedtuple("DestinationsEntry", fields)

View file

@ -25,6 +25,7 @@ from synapse.server import HomeServer
from synapse.federation import initialize_http_replication from synapse.federation import initialize_http_replication
from synapse.api.events import SynapseEvent from synapse.api.events import SynapseEvent
from synapse.storage.transactions import DestinationsTable
def make_pdu(prev_pdus=[], **kwargs): def make_pdu(prev_pdus=[], **kwargs):
"""Provide some default fields for making a PduTuple.""" """Provide some default fields for making a PduTuple."""
@ -55,10 +56,14 @@ class FederationTestCase(unittest.TestCase):
"delivered_txn", "delivered_txn",
"get_received_txn_response", "get_received_txn_response",
"set_received_txn_response", "set_received_txn_response",
"get_destination_retry_timings",
]) ])
self.mock_persistence.get_received_txn_response.return_value = ( self.mock_persistence.get_received_txn_response.return_value = (
defer.succeed(None) defer.succeed(None)
) )
self.mock_persistence.get_destination_retry_timings.return_value = (
defer.succeed(DestinationsTable.EntryType("", 0, 0))
)
self.mock_config = Mock() self.mock_config = Mock()
self.mock_config.signing_key = [MockKey()] self.mock_config.signing_key = [MockKey()]
self.clock = MockClock() self.clock = MockClock()

View file

@ -53,6 +53,8 @@ class FederationTestCase(unittest.TestCase):
"persist_event", "persist_event",
"store_room", "store_room",
"get_room", "get_room",
"get_destination_retry_timings",
"set_destination_retry_timings",
]), ]),
resource_for_federation=NonCallableMock(), resource_for_federation=NonCallableMock(),
http_client=NonCallableMock(spec_set=[]), http_client=NonCallableMock(spec_set=[]),

View file

@ -30,7 +30,7 @@ from synapse.api.constants import PresenceState
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.handlers.presence import PresenceHandler, UserPresenceCache from synapse.handlers.presence import PresenceHandler, UserPresenceCache
from synapse.streams.config import SourcePaginationConfig from synapse.streams.config import SourcePaginationConfig
from synapse.storage.transactions import DestinationsTable
OFFLINE = PresenceState.OFFLINE OFFLINE = PresenceState.OFFLINE
UNAVAILABLE = PresenceState.UNAVAILABLE UNAVAILABLE = PresenceState.UNAVAILABLE
@ -528,6 +528,7 @@ class PresencePushTestCase(unittest.TestCase):
"delivered_txn", "delivered_txn",
"get_received_txn_response", "get_received_txn_response",
"set_received_txn_response", "set_received_txn_response",
"get_destination_retry_timings",
]), ]),
handlers=None, handlers=None,
resource_for_client=Mock(), resource_for_client=Mock(),
@ -539,6 +540,9 @@ class PresencePushTestCase(unittest.TestCase):
hs.handlers = JustPresenceHandlers(hs) hs.handlers = JustPresenceHandlers(hs)
self.datastore = hs.get_datastore() self.datastore = hs.get_datastore()
self.datastore.get_destination_retry_timings.return_value = (
defer.succeed(DestinationsTable.EntryType("", 0, 0))
)
def get_received_txn_response(*args): def get_received_txn_response(*args):
return defer.succeed(None) return defer.succeed(None)
@ -1037,6 +1041,7 @@ class PresencePollingTestCase(unittest.TestCase):
"delivered_txn", "delivered_txn",
"get_received_txn_response", "get_received_txn_response",
"set_received_txn_response", "set_received_txn_response",
"get_destination_retry_timings",
]), ]),
handlers=None, handlers=None,
resource_for_client=Mock(), resource_for_client=Mock(),
@ -1048,6 +1053,9 @@ class PresencePollingTestCase(unittest.TestCase):
hs.handlers = JustPresenceHandlers(hs) hs.handlers = JustPresenceHandlers(hs)
self.datastore = hs.get_datastore() self.datastore = hs.get_datastore()
self.datastore.get_destination_retry_timings.return_value = (
defer.succeed(DestinationsTable.EntryType("", 0, 0))
)
def get_received_txn_response(*args): def get_received_txn_response(*args):
return defer.succeed(None) return defer.succeed(None)

View file

@ -25,6 +25,8 @@ from ..utils import MockHttpResource, MockClock, DeferredMockCallable, MockKey
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.handlers.typing import TypingNotificationHandler from synapse.handlers.typing import TypingNotificationHandler
from synapse.storage.transactions import DestinationsTable
def _expect_edu(destination, edu_type, content, origin="test"): def _expect_edu(destination, edu_type, content, origin="test"):
return { return {
@ -72,6 +74,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
"delivered_txn", "delivered_txn",
"get_received_txn_response", "get_received_txn_response",
"set_received_txn_response", "set_received_txn_response",
"get_destination_retry_timings",
]), ]),
handlers=None, handlers=None,
resource_for_client=Mock(), resource_for_client=Mock(),
@ -89,6 +92,9 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.handler.push_update_to_clients = self.mock_update_client self.handler.push_update_to_clients = self.mock_update_client
self.datastore = hs.get_datastore() self.datastore = hs.get_datastore()
self.datastore.get_destination_retry_timings.return_value = (
defer.succeed(DestinationsTable.EntryType("", 0, 0))
)
def get_received_txn_response(*args): def get_received_txn_response(*args):
return defer.succeed(None) return defer.succeed(None)