forked from MirrorHub/synapse
Merge pull request #2158 from matrix-org/erikj/reduce_cache_size
Reduce cache size by not storing deferreds
This commit is contained in:
commit
b4da08cad8
3 changed files with 28 additions and 24 deletions
|
@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore):
|
||||||
# Returns an ObservableDeferred
|
# Returns an ObservableDeferred
|
||||||
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
|
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
|
||||||
|
|
||||||
if res and res.called and user_id in res.result:
|
if res:
|
||||||
# We'd only be adding to the set, so no point invalidating if the
|
if isinstance(res, defer.Deferred) and res.called:
|
||||||
# user is already there
|
res = res.result
|
||||||
return
|
if user_id in res:
|
||||||
|
# We'd only be adding to the set, so no point invalidating if the
|
||||||
|
# user is already there
|
||||||
|
return
|
||||||
|
|
||||||
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
|
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,6 @@ import os
|
||||||
|
|
||||||
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
|
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
|
||||||
|
|
||||||
DEBUG_CACHES = False
|
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
|
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
|
||||||
|
|
||||||
caches_by_name = {}
|
caches_by_name = {}
|
||||||
|
|
|
@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext
|
||||||
from synapse.util.caches.lrucache import LruCache
|
from synapse.util.caches.lrucache import LruCache
|
||||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||||
|
|
||||||
from . import DEBUG_CACHES, register_cache
|
from . import register_cache
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
@ -76,7 +76,7 @@ class Cache(object):
|
||||||
|
|
||||||
self.cache = LruCache(
|
self.cache = LruCache(
|
||||||
max_size=max_entries, keylen=keylen, cache_type=cache_type,
|
max_size=max_entries, keylen=keylen, cache_type=cache_type,
|
||||||
size_callback=(lambda d: len(d.result)) if iterable else None,
|
size_callback=(lambda d: len(d)) if iterable else None,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -96,6 +96,17 @@ class Cache(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
def get(self, key, default=_CacheSentinel, callback=None):
|
def get(self, key, default=_CacheSentinel, callback=None):
|
||||||
|
"""Looks the key up in the caches.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key(tuple)
|
||||||
|
default: What is returned if key is not in the caches. If not
|
||||||
|
specified then function throws KeyError instead
|
||||||
|
callback(fn): Gets called when the entry in the cache is invalidated
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Either a Deferred or the raw result
|
||||||
|
"""
|
||||||
callbacks = [callback] if callback else []
|
callbacks = [callback] if callback else []
|
||||||
val = self._pending_deferred_cache.get(key, _CacheSentinel)
|
val = self._pending_deferred_cache.get(key, _CacheSentinel)
|
||||||
if val is not _CacheSentinel:
|
if val is not _CacheSentinel:
|
||||||
|
@ -137,7 +148,7 @@ class Cache(object):
|
||||||
if self.sequence == entry.sequence:
|
if self.sequence == entry.sequence:
|
||||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||||
if existing_entry is entry:
|
if existing_entry is entry:
|
||||||
self.cache.set(key, entry.deferred, entry.callbacks)
|
self.cache.set(key, result, entry.callbacks)
|
||||||
else:
|
else:
|
||||||
entry.invalidate()
|
entry.invalidate()
|
||||||
else:
|
else:
|
||||||
|
@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase):
|
||||||
try:
|
try:
|
||||||
cached_result_d = cache.get(cache_key, callback=invalidate_callback)
|
cached_result_d = cache.get(cache_key, callback=invalidate_callback)
|
||||||
|
|
||||||
observer = cached_result_d.observe()
|
if isinstance(cached_result_d, ObservableDeferred):
|
||||||
if DEBUG_CACHES:
|
observer = cached_result_d.observe()
|
||||||
@defer.inlineCallbacks
|
else:
|
||||||
def check_result(cached_result):
|
observer = cached_result_d
|
||||||
actual_result = yield self.function_to_call(obj, *args, **kwargs)
|
|
||||||
if actual_result != cached_result:
|
|
||||||
logger.error(
|
|
||||||
"Stale cache entry %s%r: cached: %r, actual %r",
|
|
||||||
self.orig.__name__, cache_key,
|
|
||||||
cached_result, actual_result,
|
|
||||||
)
|
|
||||||
raise ValueError("Stale cache entry")
|
|
||||||
defer.returnValue(cached_result)
|
|
||||||
observer.addCallback(check_result)
|
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
ret = defer.maybeDeferred(
|
ret = defer.maybeDeferred(
|
||||||
|
@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = cache.get(tuple(key), callback=invalidate_callback)
|
res = cache.get(tuple(key), callback=invalidate_callback)
|
||||||
if not res.has_succeeded():
|
if not isinstance(res, ObservableDeferred):
|
||||||
|
results[arg] = res
|
||||||
|
elif not res.has_succeeded():
|
||||||
res = res.observe()
|
res = res.observe()
|
||||||
res.addCallback(lambda r, arg: (arg, r), arg)
|
res.addCallback(lambda r, arg: (arg, r), arg)
|
||||||
cached_defers[arg] = res
|
cached_defers[arg] = res
|
||||||
|
|
Loading…
Reference in a new issue