forked from MirrorHub/synapse
Merge pull request #3933 from matrix-org/erikj/destination_retry_cache
Add a five minute cache to get_destination_retry_timings
This commit is contained in:
commit
8f5c23d0cd
3 changed files with 45 additions and 3 deletions
1
changelog.d/3933.misc
Normal file
1
changelog.d/3933.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add a cache to get_destination_retry_timings
|
|
@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
|
|
||||||
from ._base import SQLBaseStore, db_to_json
|
from ._base import SQLBaseStore, db_to_json
|
||||||
|
|
||||||
|
@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
SENTINEL = object()
|
||||||
|
|
||||||
|
|
||||||
class TransactionStore(SQLBaseStore):
|
class TransactionStore(SQLBaseStore):
|
||||||
"""A collection of queries for handling PDUs.
|
"""A collection of queries for handling PDUs.
|
||||||
|
@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore):
|
||||||
|
|
||||||
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
|
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
|
||||||
|
|
||||||
|
self._destination_retry_cache = ExpiringCache(
|
||||||
|
cache_name="get_destination_retry_timings",
|
||||||
|
clock=self._clock,
|
||||||
|
expiry_ms=5 * 60 * 1000,
|
||||||
|
)
|
||||||
|
|
||||||
def get_received_txn_response(self, transaction_id, origin):
|
def get_received_txn_response(self, transaction_id, origin):
|
||||||
"""For an incoming transaction from a given origin, check if we have
|
"""For an incoming transaction from a given origin, check if we have
|
||||||
already responded to it. If so, return the response code and response
|
already responded to it. If so, return the response code and response
|
||||||
|
@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def get_destination_retry_timings(self, destination):
|
def get_destination_retry_timings(self, destination):
|
||||||
"""Gets the current retry timings (if any) for a given destination.
|
"""Gets the current retry timings (if any) for a given destination.
|
||||||
|
|
||||||
|
@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore):
|
||||||
None if not retrying
|
None if not retrying
|
||||||
Otherwise a dict for the retry scheme
|
Otherwise a dict for the retry scheme
|
||||||
"""
|
"""
|
||||||
return self.runInteraction(
|
|
||||||
|
result = self._destination_retry_cache.get(destination, SENTINEL)
|
||||||
|
if result is not SENTINEL:
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
result = yield self.runInteraction(
|
||||||
"get_destination_retry_timings",
|
"get_destination_retry_timings",
|
||||||
self._get_destination_retry_timings, destination)
|
self._get_destination_retry_timings, destination)
|
||||||
|
|
||||||
|
# We don't hugely care about race conditions between getting and
|
||||||
|
# invalidating the cache, since we time out fairly quickly anyway.
|
||||||
|
self._destination_retry_cache[destination] = result
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
def _get_destination_retry_timings(self, txn, destination):
|
def _get_destination_retry_timings(self, txn, destination):
|
||||||
result = self._simple_select_one_txn(
|
result = self._simple_select_one_txn(
|
||||||
txn,
|
txn,
|
||||||
|
@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
retry_interval (int) - how long until next retry in ms
|
retry_interval (int) - how long until next retry in ms
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
self._destination_retry_cache.pop(destination)
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"set_destination_retry_timings",
|
"set_destination_retry_timings",
|
||||||
self._set_destination_retry_timings,
|
self._set_destination_retry_timings,
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
import logging
|
import logging
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
from six import itervalues
|
from six import iteritems, itervalues
|
||||||
|
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.util.caches import register_cache
|
from synapse.util.caches import register_cache
|
||||||
|
@ -24,6 +24,9 @@ from synapse.util.caches import register_cache
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
SENTINEL = object()
|
||||||
|
|
||||||
|
|
||||||
class ExpiringCache(object):
|
class ExpiringCache(object):
|
||||||
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
|
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
|
||||||
reset_expiry_on_get=False, iterable=False):
|
reset_expiry_on_get=False, iterable=False):
|
||||||
|
@ -95,6 +98,21 @@ class ExpiringCache(object):
|
||||||
|
|
||||||
return entry.value
|
return entry.value
|
||||||
|
|
||||||
|
def pop(self, key, default=SENTINEL):
|
||||||
|
"""Removes and returns the value with the given key from the cache.
|
||||||
|
|
||||||
|
If the key isn't in the cache then `default` will be returned if
|
||||||
|
specified, otherwise `KeyError` will get raised.
|
||||||
|
|
||||||
|
Identical functionality to `dict.pop(..)`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
value = self._cache.pop(key, default)
|
||||||
|
if value is SENTINEL:
|
||||||
|
raise KeyError(key)
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
def __contains__(self, key):
|
def __contains__(self, key):
|
||||||
return key in self._cache
|
return key in self._cache
|
||||||
|
|
||||||
|
@ -122,7 +140,7 @@ class ExpiringCache(object):
|
||||||
|
|
||||||
keys_to_delete = set()
|
keys_to_delete = set()
|
||||||
|
|
||||||
for key, cache_entry in self._cache.items():
|
for key, cache_entry in iteritems(self._cache):
|
||||||
if now - cache_entry.time > self._expiry_ms:
|
if now - cache_entry.time > self._expiry_ms:
|
||||||
keys_to_delete.add(key)
|
keys_to_delete.add(key)
|
||||||
|
|
||||||
|
@ -146,6 +164,8 @@ class ExpiringCache(object):
|
||||||
|
|
||||||
|
|
||||||
class _CacheEntry(object):
|
class _CacheEntry(object):
|
||||||
|
__slots__ = ["time", "value"]
|
||||||
|
|
||||||
def __init__(self, time, value):
|
def __init__(self, time, value):
|
||||||
self.time = time
|
self.time = time
|
||||||
self.value = value
|
self.value = value
|
||||||
|
|
Loading…
Reference in a new issue