forked from MirrorHub/synapse
add a write-through cache on the retry schedule
parent 9c43b258ec
commit 0d3fa1ac6e
1 changed file with 15 additions and 4 deletions
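The commit follows the usual write-through pattern: reads consult an in-memory dict keyed by destination and fall back to the database on a miss, while writes update the dict and then persist. A minimal stand-alone sketch of that pattern (the RetryStore class, its _db dict, and the EntryType tuple here are illustrative stand-ins, not code from this commit):

    from collections import namedtuple

    # Illustrative stand-in for DestinationsTable.EntryType (assumed field order).
    EntryType = namedtuple("EntryType", ["destination", "retry_last_ts", "retry_interval"])

    class RetryStore:
        """Toy write-through cache: reads prefer the cache, writes hit cache and store."""

        def __init__(self):
            self._cache = {}  # destination -> EntryType
            self._db = {}     # stands in for the real destinations table

        def get_retry_timings(self, destination):
            # Read path: serve from the cache when possible, otherwise query storage.
            return self._cache.get(destination) or self._db.get(destination)

        def set_retry_timings(self, destination, retry_last_ts, retry_interval):
            # Write path: update the cache first, then persist, so both stay in step.
            entry = EntryType(destination, retry_last_ts, retry_interval)
            self._cache[destination] = entry
            self._db[destination] = entry

    store = RetryStore()
    store.set_retry_timings("remote.example", 0, 5000)
    print(store.get_retry_timings("remote.example"))

The hunks below apply the same idea to TransactionStore.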
@@ -26,6 +26,9 @@ class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
 
+    # a write-through cache of DestinationsTable.EntryType indexed by destination string
+    destination_retry_cache = {}
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -213,10 +216,11 @@ class TransactionStore(SQLBaseStore):
 
         Returns:
             None if not retrying
-            tuple: (retry_last_ts, retry_interval)
-                retry_ts: time of last retry attempt in unix epoch ms
-                retry_interval: how long until next retry in ms
+            Otherwise a DestinationsTable.EntryType for the retry scheme
         """
+        if self.destination_retry_cache[destination]:
+            return self.destination_retry_cache[destination]
+
         return self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
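Note that the new lookup indexes destination_retry_cache directly, so a destination that has never been written to the cache raises KeyError instead of falling through to the runInteraction query; a dict.get guard would fall through cleanly. A tiny self-contained illustration (the destination name is made up):

    destination_retry_cache = {}

    # Direct indexing raises on a destination that was never cached.
    try:
        destination_retry_cache["remote.example"]
    except KeyError:
        print("KeyError on a cache miss")

    # A guarded lookup falls through to the database path instead.
    if destination_retry_cache.get("remote.example"):
        print("serve from cache")
    else:
        print("cache miss: fall back to the database query")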
@@ -225,7 +229,7 @@ class TransactionStore(SQLBaseStore):
         query = DestinationsTable.select_statement("destination = ?")
         txn.execute(query, (destination,))
         result = DestinationsTable.decode_single_result(txn.fetchone())
-        if result and result[0] > 0:
+        if result and result.retry_last_ts > 0:
             return result
         else:
             return None
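The corrected check reads the named retry_last_ts field rather than position 0. Judging by the constructor call later in this diff, EntryType(destination, retry_last_ts, retry_interval), column 0 holds the destination string, so the old positional test appears to have compared the wrong column. A small illustration, assuming EntryType behaves like a namedtuple in that field order:

    from collections import namedtuple

    # Assumed shape, matching EntryType(destination, retry_last_ts, retry_interval) below.
    EntryType = namedtuple("EntryType", ["destination", "retry_last_ts", "retry_interval"])

    row = EntryType("remote.example", 1000, 5000)
    print(row[0])             # "remote.example" -- what the old positional test looked at
    print(row.retry_last_ts)  # 1000 -- the timestamp the check actually wants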
@@ -239,6 +243,12 @@ class TransactionStore(SQLBaseStore):
             retry_last_ts (int) - time of last retry attempt in unix epoch ms
             retry_interval (int) - how long until next retry in ms
         """
+
+        self.destination_retry_cache[destination] = (
+            DestinationsTable.EntryType(destination, retry_last_ts, retry_interval)
+        )
+
+        # xxx: we could choose to not bother persisting this if our cache thinks this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
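The xxx note suggests skipping the persist when the cached entry already matches the new values. A sketch of that short-circuit, assuming equality on the cached tuple is a meaningful "no change" test (none of these names come from the commit):

    from collections import namedtuple

    EntryType = namedtuple("EntryType", ["destination", "retry_last_ts", "retry_interval"])
    destination_retry_cache = {}

    def set_retry_timings(destination, retry_last_ts, retry_interval):
        # Hypothetical NOOP short-circuit hinted at by the comment above.
        entry = EntryType(destination, retry_last_ts, retry_interval)
        if destination_retry_cache.get(destination) == entry:
            return False  # cache says nothing changed; skip the database write
        destination_retry_cache[destination] = entry
        # ... the real method would now call runInteraction to persist the row ...
        return True

    print(set_retry_timings("remote.example", 0, 5000))  # True: persisted
    print(set_retry_timings("remote.example", 0, 5000))  # False: treated as a NOOP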
@@ -260,6 +270,7 @@ class TransactionStore(SQLBaseStore):
         Returns:
             list: A list of `DestinationsTable.EntryType`
         """
+
         return self.runInteraction(
             "get_destinations_needing_retry",
             self._get_destinations_needing_retry