diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 6b0f8c2787..efb90c3c91 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore): # Returns an ObservableDeferred res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) - if res and res.called and user_id in res.result: - # We'd only be adding to the set, so no point invalidating if the - # user is already there - return + if res: + if isinstance(res, defer.Deferred) and res.called: + res = res.result + if user_id in res: + # We'd only be adding to the set, so no point invalidating if the + # user is already there + return self.get_users_with_read_receipts_in_room.invalidate((room_id,)) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 9fd35a8134..4a83c46d98 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -18,8 +18,6 @@ import os CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -DEBUG_CACHES = False - metrics = synapse.metrics.get_metrics_for("synapse.util.caches") caches_by_name = {} diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 9d0d0be1f9..807e147657 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry -from . import DEBUG_CACHES, register_cache +from . import register_cache from twisted.internet import defer from collections import namedtuple @@ -76,7 +76,7 @@ class Cache(object): self.cache = LruCache( max_size=max_entries, keylen=keylen, cache_type=cache_type, - size_callback=(lambda d: len(d.result)) if iterable else None, + size_callback=(lambda d: len(d)) if iterable else None, ) self.name = name @@ -96,6 +96,17 @@ class Cache(object): ) def get(self, key, default=_CacheSentinel, callback=None): + """Looks the key up in the caches. + + Args: + key(tuple) + default: What is returned if key is not in the caches. If not + specified then function throws KeyError instead + callback(fn): Gets called when the entry in the cache is invalidated + + Returns: + Either a Deferred or the raw result + """ callbacks = [callback] if callback else [] val = self._pending_deferred_cache.get(key, _CacheSentinel) if val is not _CacheSentinel: @@ -137,7 +148,7 @@ class Cache(object): if self.sequence == entry.sequence: existing_entry = self._pending_deferred_cache.pop(key, None) if existing_entry is entry: - self.cache.set(key, entry.deferred, entry.callbacks) + self.cache.set(key, result, entry.callbacks) else: entry.invalidate() else: @@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase): try: cached_result_d = cache.get(cache_key, callback=invalidate_callback) - observer = cached_result_d.observe() - if DEBUG_CACHES: - @defer.inlineCallbacks - def check_result(cached_result): - actual_result = yield self.function_to_call(obj, *args, **kwargs) - if actual_result != cached_result: - logger.error( - "Stale cache entry %s%r: cached: %r, actual %r", - self.orig.__name__, cache_key, - cached_result, actual_result, - ) - raise ValueError("Stale cache entry") - defer.returnValue(cached_result) - observer.addCallback(check_result) + if isinstance(cached_result_d, ObservableDeferred): + observer = cached_result_d.observe() + else: + observer = cached_result_d except KeyError: ret = defer.maybeDeferred( @@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase): try: res = cache.get(tuple(key), callback=invalidate_callback) - if not res.has_succeeded(): + if not isinstance(res, ObservableDeferred): + results[arg] = res + elif not res.has_succeeded(): res = res.observe() res.addCallback(lambda r, arg: (arg, r), arg) cached_defers[arg] = res