Merge pull request #519 from matrix-org/dbkr/treecache

Make LRU caching tree-based so subtrees of the cache can be invalidated cheaply.
This commit is contained in:
David Baker 2016-01-22 14:47:48 +00:00
commit 7a3fe48ba4
6 changed files with 236 additions and 26 deletions

View file

@ -40,14 +40,20 @@ class EventPushActionsStore(SQLBaseStore):
'actions': json.dumps(actions) 'actions': json.dumps(actions)
}) })
# Transaction callback handed to self.runInteraction: it registers cache
# invalidations with the transaction and then inserts `values` into the
# "event_push_actions" table.
# NOTE(review): indentation is lost in this view; the `return` presumably sits
# after the loop rather than inside it -- confirm against the real file.
def f(txn):
# Only the first element (uid) of each entry in `tuples` is used here.
for uid, _, __ in tuples:
# Registers an invalidate_many call with the partial key (room_id, uid).
# The cached function is declared with num_args=3 and tree=True, so this
# prefix covers every last_read_event_id cached for that room and user.
# presumably call_after defers the call until the transaction is done -- confirm.
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid)
)
return self._simple_insert_many_txn(txn, "event_push_actions", values)
yield self.runInteraction( yield self.runInteraction(
"set_actions_for_event_and_users", "set_actions_for_event_and_users",
self._simple_insert_many_txn, f,
"event_push_actions",
values
) )
@cachedInlineCallbacks(num_args=3) @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id self, room_id, user_id, last_read_event_id
): ):
@ -98,6 +104,11 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def remove_push_actions_for_event_id(self, room_id, event_id): def remove_push_actions_for_event_id(self, room_id, event_id):
def f(txn): def f(txn):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id,)
)
txn.execute( txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id) (room_id, event_id)

View file

@ -17,6 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache
from . import caches_by_name, DEBUG_CACHES, cache_counter from . import caches_by_name, DEBUG_CACHES, cache_counter
@ -36,9 +37,12 @@ _CacheSentinel = object()
class Cache(object): class Cache(object):
def __init__(self, name, max_entries=1000, keylen=1, lru=True): def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru: if lru:
self.cache = LruCache(max_size=max_entries) cache_type = TreeCache if tree else dict
self.cache = LruCache(
max_size=max_entries, keylen=keylen, cache_type=cache_type
)
self.max_entries = None self.max_entries = None
else: else:
self.cache = OrderedDict() self.cache = OrderedDict()
@ -99,6 +103,15 @@ class Cache(object):
self.sequence += 1 self.sequence += 1
self.cache.pop(key, None) self.cache.pop(key, None)
def invalidate_many(self, key):
    """Invalidate the cache entries stored under the (partial) key `key`.

    Bumps the cache's sequence counter and hands `key` to the backing
    cache's `del_multi`.

    Args:
        key: tuple identifying the entries to drop.

    Raises:
        TypeError: if `key` is not a tuple.
    """
    self.check_thread()
    if isinstance(key, tuple):
        self.sequence += 1
        self.cache.del_multi(key)
        return
    key_type = type(key)
    raise TypeError(
        "The cache key must be a tuple not %r" % (key_type,)
    )
def invalidate_all(self): def invalidate_all(self):
self.check_thread() self.check_thread()
self.sequence += 1 self.sequence += 1
@ -122,7 +135,7 @@ class CacheDescriptor(object):
which can be used to insert values into the cache specifically, without which can be used to insert values into the cache specifically, without
calling the calculation function. calling the calculation function.
""" """
def __init__(self, orig, max_entries=1000, num_args=1, lru=True, def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
inlineCallbacks=False): inlineCallbacks=False):
self.orig = orig self.orig = orig
@ -134,6 +147,7 @@ class CacheDescriptor(object):
self.max_entries = max_entries self.max_entries = max_entries
self.num_args = num_args self.num_args = num_args
self.lru = lru self.lru = lru
self.tree = tree
self.arg_names = inspect.getargspec(orig).args[1:num_args+1] self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
@ -149,6 +163,7 @@ class CacheDescriptor(object):
max_entries=self.max_entries, max_entries=self.max_entries,
keylen=self.num_args, keylen=self.num_args,
lru=self.lru, lru=self.lru,
tree=self.tree,
) )
def __get__(self, obj, objtype=None): def __get__(self, obj, objtype=None):
@ -200,6 +215,7 @@ class CacheDescriptor(object):
wrapped.invalidate = self.cache.invalidate wrapped.invalidate = self.cache.invalidate
wrapped.invalidate_all = self.cache.invalidate_all wrapped.invalidate_all = self.cache.invalidate_all
wrapped.invalidate_many = self.cache.invalidate_many
wrapped.prefill = self.cache.prefill wrapped.prefill = self.cache.prefill
obj.__dict__[self.orig.__name__] = wrapped obj.__dict__[self.orig.__name__] = wrapped
@ -321,21 +337,23 @@ class CacheListDescriptor(object):
return wrapped return wrapped
def cached(max_entries=1000, num_args=1, lru=True): def cached(max_entries=1000, num_args=1, lru=True, tree=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru
)
def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False):
return lambda orig: CacheDescriptor( return lambda orig: CacheDescriptor(
orig, orig,
max_entries=max_entries, max_entries=max_entries,
num_args=num_args, num_args=num_args,
lru=lru, lru=lru,
tree=tree,
)
def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru,
tree=tree,
inlineCallbacks=True, inlineCallbacks=True,
) )

View file

@ -17,11 +17,27 @@
from functools import wraps from functools import wraps
import threading import threading
from synapse.util.caches.treecache import TreeCache
def enumerate_leaves(node, depth):
    """Yield every item found exactly `depth` levels below `node`.

    With a depth of 0, `node` itself is the only item yielded. Otherwise
    `node` must offer a `values()` method whose results are walked
    recursively with the depth reduced by one.
    """
    if depth == 0:
        yield node
        return

    remaining = depth - 1
    for child in node.values():
        for leaf in enumerate_leaves(child, remaining):
            yield leaf
class LruCache(object): class LruCache(object):
"""Least-recently-used cache.""" """
def __init__(self, max_size): Least-recently-used cache.
cache = {} Supports del_multi only if cache_type=TreeCache
If cache_type=TreeCache, all keys must be tuples.
"""
def __init__(self, max_size, keylen=1, cache_type=dict):
cache = cache_type()
self.size = 0
list_root = [] list_root = []
list_root[:] = [list_root, list_root, None, None] list_root[:] = [list_root, list_root, None, None]
@ -44,6 +60,7 @@ class LruCache(object):
prev_node[NEXT] = node prev_node[NEXT] = node
next_node[PREV] = node next_node[PREV] = node
cache[key] = node cache[key] = node
self.size += 1
def move_node_to_front(node): def move_node_to_front(node):
prev_node = node[PREV] prev_node = node[PREV]
@ -62,7 +79,7 @@ class LruCache(object):
next_node = node[NEXT] next_node = node[NEXT]
prev_node[NEXT] = next_node prev_node[NEXT] = next_node
next_node[PREV] = prev_node next_node[PREV] = prev_node
cache.pop(node[KEY], None) self.size -= 1
@synchronized @synchronized
def cache_get(key, default=None): def cache_get(key, default=None):
@ -81,8 +98,10 @@ class LruCache(object):
node[VALUE] = value node[VALUE] = value
else: else:
add_node(key, value) add_node(key, value)
if len(cache) > max_size: if self.size > max_size:
delete_node(list_root[PREV]) todelete = list_root[PREV]
delete_node(todelete)
cache.pop(todelete[KEY], None)
@synchronized @synchronized
def cache_set_default(key, value): def cache_set_default(key, value):
@ -91,8 +110,10 @@ class LruCache(object):
return node[VALUE] return node[VALUE]
else: else:
add_node(key, value) add_node(key, value)
if len(cache) > max_size: if self.size > max_size:
delete_node(list_root[PREV]) todelete = list_root[PREV]
delete_node(todelete)
cache.pop(todelete[KEY], None)
return value return value
@synchronized @synchronized
@ -100,10 +121,22 @@ class LruCache(object):
node = cache.get(key, None) node = cache.get(key, None)
if node: if node:
delete_node(node) delete_node(node)
cache.pop(node[KEY], None)
return node[VALUE] return node[VALUE]
else: else:
return default return default
@synchronized
def cache_del_multi(key):
    """
    Remove everything stored beneath the (partial) key `key`.

    This will only work if constructed with cache_type=TreeCache
    """
    subtree = cache.pop(key)
    if subtree is not None:
        # The linked-list nodes sit `keylen - len(key)` levels below
        # whatever was popped; each one is passed to delete_node.
        levels_below = keylen - len(key)
        for entry in enumerate_leaves(subtree, levels_below):
            delete_node(entry)
@synchronized @synchronized
def cache_clear(): def cache_clear():
list_root[NEXT] = list_root list_root[NEXT] = list_root
@ -112,7 +145,7 @@ class LruCache(object):
@synchronized @synchronized
def cache_len(): def cache_len():
return len(cache) return self.size
@synchronized @synchronized
def cache_contains(key): def cache_contains(key):
@ -123,6 +156,8 @@ class LruCache(object):
self.set = cache_set self.set = cache_set
self.setdefault = cache_set_default self.setdefault = cache_set_default
self.pop = cache_pop self.pop = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
self.len = cache_len self.len = cache_len
self.contains = cache_contains self.contains = cache_contains
self.clear = cache_clear self.clear = cache_clear

View file

@ -0,0 +1,60 @@
# Unique marker used to tell "key absent" apart from a stored value of None.
SENTINEL = object()


class TreeCache(object):
    """
    Tree-based backing store for LruCache. Allows subtrees of data to be deleted
    efficiently.
    Keys must be tuples.

    Each element of a key selects one level of nested dicts below `self.root`;
    the final element maps to the stored value.
    """
    def __init__(self):
        self.root = {}

    def __setitem__(self, key, value):
        return self.set(key, value)

    def __contains__(self, key):
        return self.get(key, SENTINEL) is not SENTINEL

    def set(self, key, value):
        """Store `value` under the tuple `key`, creating levels as needed."""
        node = self.root
        for k in key[:-1]:
            node = node.setdefault(k, {})
        node[key[-1]] = value

    def get(self, key, default=None):
        """Return what is stored under `key`, or `default` if it is absent.

        A key shorter than the ones used to store values returns the nested
        dict (subtree) found at that position.
        """
        node = self.root
        for k in key[:-1]:
            node = node.get(k, None)
            if node is None:
                return default
        return node.get(key[-1], default)

    def clear(self):
        """Drop every entry."""
        self.root = {}

    def pop(self, key, default=None):
        """Remove and return what is stored under `key`.

        Returns `default` when nothing is stored there. A key shorter than
        the ones used to store values removes and returns the whole subtree
        at that position. Intermediate dicts left empty by the removal are
        pruned from the tree.
        """
        # nodes[i] is the dict reached after following key[i]; the root is
        # deliberately not included.
        nodes = []
        node = self.root
        for k in key[:-1]:
            node = node.get(k, None)
            if node is None:
                return default
            nodes.append(node)

        popped = node.pop(key[-1], SENTINEL)
        if popped is SENTINEL:
            return default

        # Walk back up from the deepest level, removing each dict that has
        # become empty from its parent. zip() is materialised with list()
        # because it returns an iterator (no .reverse()) on Python 3.
        parents = [self.root] + nodes[:-1]
        for parent, child, k in reversed(list(zip(parents, nodes, key))):
            if child:
                break
            parent.pop(k)

        return popped

View file

@ -17,6 +17,7 @@
from .. import unittest from .. import unittest
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache
class LruCacheTestCase(unittest.TestCase): class LruCacheTestCase(unittest.TestCase):
@ -52,3 +53,22 @@ class LruCacheTestCase(unittest.TestCase):
cache["key"] = 1 cache["key"] = 1
self.assertEquals(cache.pop("key"), 1) self.assertEquals(cache.pop("key"), 1)
self.assertEquals(cache.pop("key"), None) self.assertEquals(cache.pop("key"), None)
def test_del_multi(self):
    """del_multi on a partial key drops every entry stored beneath it."""
    cache = LruCache(4, 2, cache_type=TreeCache)
    cache[("animal", "cat")] = "mew"
    cache[("animal", "dog")] = "woof"
    cache[("vehicles", "car")] = "vroom"
    cache[("vehicles", "train")] = "chuff"

    # assertEqual is used throughout: the assertEquals alias is deprecated
    # and has been removed from the standard library's TestCase.
    self.assertEqual(len(cache), 4)

    self.assertEqual(cache.get(("animal", "cat")), "mew")
    self.assertEqual(cache.get(("vehicles", "car")), "vroom")
    cache.del_multi(("animal",))
    self.assertEqual(len(cache), 2)
    self.assertEqual(cache.get(("animal", "cat")), None)
    self.assertEqual(cache.get(("animal", "dog")), None)
    self.assertEqual(cache.get(("vehicles", "car")), "vroom")
    self.assertEqual(cache.get(("vehicles", "train")), "chuff")
    # Man from del_multi say "Yes".

View file

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .. import unittest
from synapse.util.caches.treecache import TreeCache
class TreeCacheTestCase(unittest.TestCase):
    """Exercises TreeCache get/set/pop with one- and two-element keys.

    Uses assertEqual rather than the deprecated assertEquals alias, which
    has been removed from the standard library's TestCase.
    """

    def test_get_set_onelevel(self):
        cache = TreeCache()
        cache[("a",)] = "A"
        cache[("b",)] = "B"
        self.assertEqual(cache.get(("a",)), "A")
        self.assertEqual(cache.get(("b",)), "B")

    def test_pop_onelevel(self):
        cache = TreeCache()
        cache[("a",)] = "A"
        cache[("b",)] = "B"
        self.assertEqual(cache.pop(("a",)), "A")
        # A second pop of the same key finds nothing.
        self.assertEqual(cache.pop(("a",)), None)
        self.assertEqual(cache.get(("b",)), "B")

    def test_get_set_twolevel(self):
        cache = TreeCache()
        cache[("a", "a")] = "AA"
        cache[("a", "b")] = "AB"
        cache[("b", "a")] = "BA"
        self.assertEqual(cache.get(("a", "a")), "AA")
        self.assertEqual(cache.get(("a", "b")), "AB")
        self.assertEqual(cache.get(("b", "a")), "BA")

    def test_pop_twolevel(self):
        cache = TreeCache()
        cache[("a", "a")] = "AA"
        cache[("a", "b")] = "AB"
        cache[("b", "a")] = "BA"
        self.assertEqual(cache.pop(("a", "a")), "AA")
        self.assertEqual(cache.get(("a", "a")), None)
        # The sibling under the same first key element is untouched.
        self.assertEqual(cache.get(("a", "b")), "AB")
        self.assertEqual(cache.pop(("b", "a")), "BA")
        self.assertEqual(cache.pop(("b", "a")), None)

    def test_pop_mixedlevel(self):
        cache = TreeCache()
        cache[("a", "a")] = "AA"
        cache[("a", "b")] = "AB"
        cache[("b", "a")] = "BA"
        self.assertEqual(cache.get(("a", "a")), "AA")
        # Popping a one-element key removes the whole subtree beneath it.
        cache.pop(("a",))
        self.assertEqual(cache.get(("a", "a")), None)
        self.assertEqual(cache.get(("a", "b")), None)
        self.assertEqual(cache.get(("b", "a")), "BA")