From 033a517febc434269eefc75e4d9646d015beae54 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 16:59:27 +0000 Subject: [PATCH 1/8] Indirect invalidations of _get_event_cache via a helper method to keep all uses of the cache lexically within one .py file --- synapse/storage/_base.py | 3 +++ synapse/storage/events.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6fa63f052..374db1a30 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -732,6 +732,9 @@ class SQLBaseStore(object): return [e for e in events if e] + def _invalidate_get_event_cache(self, event_id): + self._get_event_cache.pop(event_id) + def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a86230d92..2425f57f5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -94,7 +94,7 @@ class EventsStore(SQLBaseStore): current_state=None): # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) + self._invalidate_get_event_cache(event.event_id) # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table @@ -356,7 +356,7 @@ class EventsStore(SQLBaseStore): def _store_redaction(self, txn, event): # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) + self._invalidate_get_event_cache(event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) From 1b988b051b203ec17352b7422be141e622b4fa42 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 17:26:32 +0000 Subject: [PATCH 2/8] Store the rejected reason in (Frozen)Event structs --- synapse/events/__init__.py | 6 ++++-- synapse/storage/_base.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 64e08223b..e4495ccf1 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -46,9 +46,10 @@ def _event_dict_property(key): class EventBase(object): def __init__(self, event_dict, signatures={}, unsigned={}, - internal_metadata_dict={}): + internal_metadata_dict={}, rejected_reason=None): self.signatures = signatures self.unsigned = unsigned + self.rejected_reason = rejected_reason self._event_dict = event_dict @@ -109,7 +110,7 @@ class EventBase(object): class FrozenEvent(EventBase): - def __init__(self, event_dict, internal_metadata_dict={}): + def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): event_dict = dict(event_dict) # Signatures is a dict of dicts, and this is faster than doing a @@ -128,6 +129,7 @@ class FrozenEvent(EventBase): signatures=signatures, unsigned=unsigned, internal_metadata_dict=internal_metadata_dict, + rejected_reason=rejected_reason, ) @staticmethod diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 374db1a30..7f5ad9b0f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -784,6 +784,7 @@ class SQLBaseStore(object): txn, internal_metadata, js, redacted, check_redacted=check_redacted, get_prev_content=get_prev_content, + rejected_reason=rejected_reason, ) cache[(check_redacted, get_prev_content, allow_rejected)] = result return result @@ -791,7 +792,8 @@ class SQLBaseStore(object): return None def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False): + check_redacted=True, get_prev_content=False, + rejected_reason=None): start_time = time.time() * 1000 @@ -806,7 +808,11 @@ class SQLBaseStore(object): internal_metadata = json.loads(internal_metadata) start_time = update_counter("decode_internal", start_time) - ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: From f173d40a32cba919e088917fe42ac300a10e0ad2 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 17:33:26 +0000 Subject: [PATCH 3/8] Use FrozenEvent's reject_reason to decide whether to return it; don't include allow_rejected in the main getEvents cache key --- synapse/storage/_base.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7f5ad9b0f..919295eab 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -749,10 +749,13 @@ class SQLBaseStore(object): try: # Separate cache entries for each way to invoke _get_event_txn - ret = cache[(check_redacted, get_prev_content, allow_rejected)] - + ret = cache[(check_redacted, get_prev_content)] cache_counter.inc_hits("*getEvent*") - return ret + + if allow_rejected or not ret.rejected_reason: + return ret + else: + return None except KeyError: cache_counter.inc_misses("*getEvent*") pass @@ -779,14 +782,15 @@ class SQLBaseStore(object): start_time = update_counter("select_event", start_time) + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=rejected_reason, + ) + cache[(check_redacted, get_prev_content)] = result + if allow_rejected or not rejected_reason: - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=rejected_reason, - ) - cache[(check_redacted, get_prev_content, allow_rejected)] = result return result else: return None From 953e40f9dc086a47d811d1fe029734b3178266f3 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:12:16 +0000 Subject: [PATCH 4/8] Implement the main getEvent cache using Cache() instead of a custom application of LruCache; also unify its two-level structure into just one --- synapse/storage/_base.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 919295eab..5c7bd22e6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -241,10 +241,8 @@ class SQLBaseStore(object): self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() - self._get_event_cache = LruCache(hs.config.event_cache_size) - - # Pretend the getEventCache is just another named cache - caches_by_name["*getEvent*"] = self._get_event_cache + self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, + max_entries=hs.config.event_cache_size) def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -733,7 +731,9 @@ class SQLBaseStore(object): return [e for e in events if e] def _invalidate_get_event_cache(self, event_id): - self._get_event_cache.pop(event_id) + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, get_prev_content) def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): @@ -745,19 +745,14 @@ class SQLBaseStore(object): sql_getevents_timer.inc_by(curr_time - last_time, desc) return curr_time - cache = self._get_event_cache.setdefault(event_id, {}) - try: - # Separate cache entries for each way to invoke _get_event_txn - ret = cache[(check_redacted, get_prev_content)] - cache_counter.inc_hits("*getEvent*") + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) if allow_rejected or not ret.rejected_reason: return ret else: return None except KeyError: - cache_counter.inc_misses("*getEvent*") pass finally: start_time = update_counter("event_cache", start_time) @@ -788,7 +783,7 @@ class SQLBaseStore(object): get_prev_content=get_prev_content, rejected_reason=rejected_reason, ) - cache[(check_redacted, get_prev_content)] = result + self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) if allow_rejected or not rejected_reason: return result From a198894bf737566e368db546121aca026ed1fbeb Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 26 Mar 2015 11:53:58 +0000 Subject: [PATCH 5/8] Appease pep8 --- synapse/storage/_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5c7bd22e6..cf4c76d33 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -733,7 +733,8 @@ class SQLBaseStore(object): def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, get_prev_content) + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): From 6f9dea7483ed01d17522857c5b103971a0050d8f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:07:20 +0100 Subject: [PATCH 6/8] SYN-339: Cancel the notifier timeout when the notifier fires --- synapse/notifier.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 12573f3f5..0fa77d28c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -59,8 +59,8 @@ class _NotificationListener(object): self.limit = limit self.timeout = timeout self.deferred = deferred - self.rooms = rooms + self.timer = None def notified(self): return self.deferred.called @@ -93,6 +93,13 @@ class _NotificationListener(object): self.appservice, set() ).discard(self) + # Cancel the timeout for this notifer if one exists. + if self.timer is not None: + try: + notifier.clock.cancel_call_later(self.timer) + except: + logger.exception("Failed to cancel notifier timer") + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -325,14 +332,20 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + timer = [None] + if timeout: timed_out = [False] def _timeout_listener(): timed_out[0] = True + timer[0] = None listener[0].notify(self, [], from_token, from_token) - self.clock.call_later(timeout/1000., _timeout_listener) + # We create multiple notification listeners so we have to manage + # canceling the timeout ourselves. + timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) + while not result and not timed_out[0]: yield deferred deferred = defer.Deferred() @@ -347,6 +360,12 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + if timer[0] is not None: + try: + self.clock.cancel_call_later(timer[0]) + except: + logger.exception("Failed to cancel notifer timer") + defer.returnValue(result) def get_events_for(self, user, rooms, pagination_config, timeout): @@ -400,8 +419,11 @@ class Notifier(object): if not timeout: _timeout_listener() else: - self.clock.call_later(timeout/1000.0, _timeout_listener) - + # Only add the timer if the listener hasn't been notified + if not listener.notified(): + listener.timer = self.clock.call_later( + timeout/1000.0, _timeout_listener + ) return @log_function From 23d285ad57ca76e8ff2d33f1f6e476930689d9a7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:41:50 +0100 Subject: [PATCH 7/8] Unset the timer in the timeout callback so that we don't try to cancel it if it has been called --- synapse/notifier.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index 0fa77d28c..e6f37c373 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -404,6 +404,7 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + listener.timer = None listener.notify( self, [], From 1280a47fc671b718239e06030d469d99aa5ea513 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:42:21 +0100 Subject: [PATCH 8/8] Add comment --- synapse/notifier.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index e6f37c373..d750a6fcf 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -404,6 +404,7 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + # Remove the timer from the listener so we don't try to cancel it. listener.timer = None listener.notify( self,