From c4e3029d55c228375725ccacd326e1ee5cc8dd73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 16:08:13 +0000 Subject: [PATCH 1/7] Add cache layer to state group resolution --- synapse/state.py | 63 ++++++++++++++++++++++++++++++++++++++++ synapse/util/__init__.py | 10 ++++++- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/synapse/state.py b/synapse/state.py index 695a5e7ac..c45bab585 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -43,14 +43,30 @@ AuthEventTypes = ( ) +class _StateCacheEntry(object): + def __init__(self, state, state_group, ts): + self.state = state + self.state_group = state_group + self.ts = ts + + class StateHandler(object): """ Responsible for doing state conflict resolution. """ def __init__(self, hs): + self.clock = hs.get_clock() self.store = hs.get_datastore() self.hs = hs + # set of event_ids -> _StateCacheEntry. + self._state_cache = {} + + def f(): + self._prune_cache() + + self.clock.looping_call(f, 10*1000) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): """ Returns the current state for the room as a list. This is done by @@ -70,6 +86,11 @@ class StateHandler(object): for e_id, _, _ in events ] + cache = self._state_cache.get(set(event_ids), None) + if cache: + cache.ts = self.clock.time_msec() + defer.returnValue(cache.state_group, cache.state) + res = yield self.resolve_state_groups(event_ids) if event_type: @@ -177,6 +198,11 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) + cache = self._state_cache.get(set(event_ids), None) + if cache and cache.state_group: + cache.ts = self.clock.time_msec() + defer.returnValue(cache.state_group, cache.state) + state_groups = yield self.store.get_state_groups( event_ids ) @@ -200,6 +226,14 @@ class StateHandler(object): else: prev_states = [] + cache = _StateCacheEntry( + state=state, + state_group=name, + ts=self.clock.time_msec() + ) + + self._state_cache[set(event_ids)] = cache + defer.returnValue((name, state, prev_states)) state = {} @@ -245,6 +279,14 @@ class StateHandler(object): new_state = unconflicted_state new_state.update(resolved_state) + cache = _StateCacheEntry( + state=new_state, + state_group=None, + ts=self.clock.time_msec() + ) + + self._state_cache[set(event_ids)] = cache + defer.returnValue((None, new_state, prev_states)) @log_function @@ -328,3 +370,24 @@ class StateHandler(object): return -int(e.depth), hashlib.sha1(e.event_id).hexdigest() return sorted(events, key=key_func) + + def _prune_cache(self): + now = self.clock.time_msec() + + if len(self._state_cache) > 100: + sorted_entries = sorted( + self._state_cache.items(), + key=lambda k, v: v.ts, + ) + + for k, _ in sorted_entries[100:]: + self._state_cache.pop(k) + + keys_to_delete = set() + + for key, cache_entry in self._state_cache.items(): + if now - cache_entry.ts > 60*1000: + keys_to_delete.add(key) + + for k in keys_to_delete: + self._state_cache.pop(k) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 4e837a918..1fd5ba578 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -15,7 +15,7 @@ from synapse.util.logcontext import LoggingContext -from twisted.internet import reactor +from twisted.internet import reactor, task import time @@ -35,6 +35,14 @@ class Clock(object): """Returns the current system time in miliseconds since epoch.""" return self.time() * 1000 + def looping_call(self, f, msec): + l = task.LoopingCall(f) + l.start(msec/1000.0, now=False) + return l + + def looping_call(self, loop): + loop.stop() + def call_later(self, delay, callback): current_context = LoggingContext.current_context() From b4886264a36ae85bcf922f7d2bc56f0ae59b95e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 16:17:05 +0000 Subject: [PATCH 2/7] Bugfix cache layer --- synapse/state.py | 31 +++++++++++++++++++++---------- synapse/util/__init__.py | 2 +- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index c45bab585..7523573f2 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -86,18 +86,19 @@ class StateHandler(object): for e_id, _, _ in events ] - cache = self._state_cache.get(set(event_ids), None) + cache = self._state_cache.get(frozenset(event_ids), None) if cache: cache.ts = self.clock.time_msec() - defer.returnValue(cache.state_group, cache.state) - - res = yield self.resolve_state_groups(event_ids) + state = cache.state + else: + res = yield self.resolve_state_groups(event_ids) + state = res[1] if event_type: - defer.returnValue(res[1].get((event_type, state_key))) + defer.returnValue(state.get((event_type, state_key))) return - defer.returnValue(res[1].values()) + defer.returnValue(state.values()) @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): @@ -198,10 +199,16 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) - cache = self._state_cache.get(set(event_ids), None) + cache = self._state_cache.get(frozenset(event_ids), None) if cache and cache.state_group: cache.ts = self.clock.time_msec() - defer.returnValue(cache.state_group, cache.state) + prev_state = cache.state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + defer.returnValue((cache.state_group, cache.state, prev_states)) state_groups = yield self.store.get_state_groups( event_ids @@ -232,7 +239,7 @@ class StateHandler(object): ts=self.clock.time_msec() ) - self._state_cache[set(event_ids)] = cache + self._state_cache[frozenset(event_ids)] = cache defer.returnValue((name, state, prev_states)) @@ -285,7 +292,7 @@ class StateHandler(object): ts=self.clock.time_msec() ) - self._state_cache[set(event_ids)] = cache + self._state_cache[frozenset(event_ids)] = cache defer.returnValue((None, new_state, prev_states)) @@ -372,6 +379,8 @@ class StateHandler(object): return sorted(events, key=key_func) def _prune_cache(self): + logger.debug("_prune_cache. before len: ", len(self._state_cache)) + now = self.clock.time_msec() if len(self._state_cache) > 100: @@ -391,3 +400,5 @@ class StateHandler(object): for k in keys_to_delete: self._state_cache.pop(k) + + logger.debug("_prune_cache. after len: ", len(self._state_cache)) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 1fd5ba578..fee76b0a9 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -40,7 +40,7 @@ class Clock(object): l.start(msec/1000.0, now=False) return l - def looping_call(self, loop): + def stop_looping_call(self, loop): loop.stop() def call_later(self, delay, callback): From 5bf318e9a60dd05120e9b78b49042337d38ec414 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Feb 2015 16:52:22 +0000 Subject: [PATCH 3/7] Bug fixes. --- synapse/app/homeserver.py | 2 + synapse/state.py | 82 ++++++++++++++++++++++++++------------- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f20ccfb5b..3a128af5f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -274,6 +274,8 @@ def setup(): hs.get_pusherpool().start() + hs.get_state_handler().start_caching() + if config.daemonize: print config.pid_file daemon = Daemonize( diff --git a/synapse/state.py b/synapse/state.py index 7523573f2..49ac09863 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -43,6 +43,10 @@ AuthEventTypes = ( ) +SIZE_OF_CACHE = 1000 +EVICTION_TIMEOUT_SECONDS = 20 + + class _StateCacheEntry(object): def __init__(self, state, state_group, ts): self.state = state @@ -59,13 +63,22 @@ class StateHandler(object): self.store = hs.get_datastore() self.hs = hs - # set of event_ids -> _StateCacheEntry. + # dict of set of event_ids -> _StateCacheEntry. + self._state_cache = None + + def start_caching(self): + logger.debug("start_caching") + self._state_cache = {} def f(): - self._prune_cache() + logger.debug("Pruning") + try: + self._prune_cache() + except: + logger.exception("Prune") - self.clock.looping_call(f, 10*1000) + self.clock.looping_call(f, 5*1000) @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): @@ -86,7 +99,10 @@ class StateHandler(object): for e_id, _, _ in events ] - cache = self._state_cache.get(frozenset(event_ids), None) + cache = None + if self._state_cache is not None: + cache = self._state_cache.get(frozenset(event_ids), None) + if cache: cache.ts = self.clock.time_msec() state = cache.state @@ -199,16 +215,19 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) - cache = self._state_cache.get(frozenset(event_ids), None) - if cache and cache.state_group: - cache.ts = self.clock.time_msec() - prev_state = cache.state.get((event_type, state_key), None) - if prev_state: - prev_state = prev_state.event_id - prev_states = [prev_state] - else: - prev_states = [] - defer.returnValue((cache.state_group, cache.state, prev_states)) + if self._state_cache is not None: + cache = self._state_cache.get(frozenset(event_ids), None) + if cache and cache.state_group: + cache.ts = self.clock.time_msec() + prev_state = cache.state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + defer.returnValue( + (cache.state_group, cache.state, prev_states) + ) state_groups = yield self.store.get_state_groups( event_ids @@ -233,15 +252,16 @@ class StateHandler(object): else: prev_states = [] - cache = _StateCacheEntry( - state=state, - state_group=name, - ts=self.clock.time_msec() - ) + if self._state_cache is not None: + cache = _StateCacheEntry( + state=state, + state_group=name, + ts=self.clock.time_msec() + ) - self._state_cache[frozenset(event_ids)] = cache + self._state_cache[frozenset(event_ids)] = cache - defer.returnValue((name, state, prev_states)) + defer.returnValue((name, state, prev_states)) state = {} for group, g_state in state_groups.items(): @@ -292,7 +312,8 @@ class StateHandler(object): ts=self.clock.time_msec() ) - self._state_cache[frozenset(event_ids)] = cache + if self._state_cache is not None: + self._state_cache[frozenset(event_ids)] = cache defer.returnValue((None, new_state, prev_states)) @@ -379,26 +400,33 @@ class StateHandler(object): return sorted(events, key=key_func) def _prune_cache(self): - logger.debug("_prune_cache. before len: ", len(self._state_cache)) + logger.debug("_prune_cache") + logger.debug( + "_prune_cache. before len: %d", + len(self._state_cache.keys()) + ) now = self.clock.time_msec() - if len(self._state_cache) > 100: + if len(self._state_cache.keys()) > SIZE_OF_CACHE: sorted_entries = sorted( self._state_cache.items(), key=lambda k, v: v.ts, ) - for k, _ in sorted_entries[100:]: + for k, _ in sorted_entries[SIZE_OF_CACHE:]: self._state_cache.pop(k) keys_to_delete = set() for key, cache_entry in self._state_cache.items(): - if now - cache_entry.ts > 60*1000: + if now - cache_entry.ts > EVICTION_TIMEOUT_SECONDS*1000: keys_to_delete.add(key) for k in keys_to_delete: self._state_cache.pop(k) - logger.debug("_prune_cache. after len: ", len(self._state_cache)) + logger.debug( + "_prune_cache. after len: %d", + len(self._state_cache.keys()) + ) From 164f6b9256782a5040c1a770c0758a31cf88e6e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Feb 2015 14:23:57 +0000 Subject: [PATCH 4/7] Fix tests --- tests/test_state.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_state.py b/tests/test_state.py index 019e794aa..fea25f702 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -21,6 +21,8 @@ from synapse.api.auth import Auth from synapse.api.constants import EventTypes, Membership from synapse.state import StateHandler +from .utils import MockClock + from mock import Mock @@ -138,10 +140,13 @@ class StateTestCase(unittest.TestCase): "add_event_hashes", ] ) - hs = Mock(spec=["get_datastore", "get_auth", "get_state_handler"]) + hs = Mock(spec=[ + "get_datastore", "get_auth", "get_state_handler", "get_clock", + ]) hs.get_datastore.return_value = self.store hs.get_state_handler.return_value = None hs.get_auth.return_value = Auth(hs) + hs.get_clock.return_value = MockClock() self.state = StateHandler(hs) self.event_id = 0 From d19e2ed02fb545bde9c8d1a317f07b9fb786d2c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 11:01:15 +0000 Subject: [PATCH 5/7] Move construction of object within if block --- synapse/state.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 49ac09863..2dea8f552 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -306,13 +306,13 @@ class StateHandler(object): new_state = unconflicted_state new_state.update(resolved_state) - cache = _StateCacheEntry( - state=new_state, - state_group=None, - ts=self.clock.time_msec() - ) - if self._state_cache is not None: + cache = _StateCacheEntry( + state=new_state, + state_group=None, + ts=self.clock.time_msec() + ) + self._state_cache[frozenset(event_ids)] = cache defer.returnValue((None, new_state, prev_states)) From 2b042ad67fd16c43f3cfce472a8d1b31cff867da Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 11:03:16 +0000 Subject: [PATCH 6/7] Oops, we do want to defer.return regardless of whether we are caching or not --- synapse/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/state.py b/synapse/state.py index 2dea8f552..31f503a1e 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -261,7 +261,7 @@ class StateHandler(object): self._state_cache[frozenset(event_ids)] = cache - defer.returnValue((name, state, prev_states)) + defer.returnValue((name, state, prev_states)) state = {} for group, g_state in state_groups.items(): From f8abbae99fe76409533518e828031de4933f38ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 15:45:50 +0000 Subject: [PATCH 7/7] Remove unnecessary logging --- synapse/state.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 64c58a393..98aaa2be5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -72,11 +72,7 @@ class StateHandler(object): self._state_cache = {} def f(): - logger.debug("Pruning") - try: - self._prune_cache() - except: - logger.exception("Prune") + self._prune_cache() self.clock.looping_call(f, 5*1000) @@ -400,7 +396,6 @@ class StateHandler(object): return sorted(events, key=key_func) def _prune_cache(self): - logger.debug("_prune_cache") logger.debug( "_prune_cache. before len: %d", len(self._state_cache.keys())