From 261d809a4779b03c81ada52ed3893b2ad8782a96 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 14:08:03 +0100 Subject: [PATCH 01/13] Sequence the modifications to the cache so that selects don't race with inserts --- synapse/storage/_base.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c328b5274..7f5477dee 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -31,6 +31,7 @@ import functools import simplejson as json import sys import time +import threading logger = logging.getLogger(__name__) @@ -68,9 +69,20 @@ class Cache(object): self.name = name self.keylen = keylen - + self.sequence = 0 + self.thread = None caches_by_name[name] = self.cache + def check_thread(self): + expected_thread = self.thread + if expected_thread is None: + self.thread = threading.current_thread() + else: + if expected_thread is not threading.current_thread(): + raise ValueError( + "Cache objects can only be accessed from the main thread" + ) + def get(self, *keyargs): if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) @@ -82,6 +94,11 @@ class Cache(object): cache_counter.inc_misses(self.name) raise KeyError() + def update(self, sequence, *args): + self.check_thread() + if self.sequence == sequence: + self.prefill(*args) + def prefill(self, *args): # because I can't *keyargs, value keyargs = args[:-1] value = args[-1] @@ -96,9 +113,10 @@ class Cache(object): self.cache[keyargs] = value def invalidate(self, *keyargs): + self.check_thread() if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) - + self.sequence += 1 self.cache.pop(keyargs, None) @@ -130,9 +148,11 @@ def cached(max_entries=1000, num_args=1, lru=False): try: defer.returnValue(cache.get(*keyargs)) except KeyError: + sequence = cache.sequence + ret = yield orig(self, *keyargs) - cache.prefill(*keyargs + (ret,)) + cache.update(sequence, *keyargs + (ret,)) defer.returnValue(ret) From a9aea68fd568182185e8d0ae478c56df8ac6be49 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 14:57:08 +0100 Subject: [PATCH 02/13] Invalidate the caches from the correct thread --- synapse/storage/event_federation.py | 10 +++++--- synapse/storage/events.py | 39 +++++++++++++++++++---------- synapse/storage/room.py | 4 +-- synapse/storage/roommember.py | 8 +++--- synapse/storage/signatures.py | 12 ++++----- synapse/storage/state.py | 2 +- 6 files changed, 46 insertions(+), 29 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 68f39bd68..3cd3fbdc9 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, room_id, depth): + def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) do_insert = depth < min_depth if min_depth else True @@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, outlier, event_id, prev_events, - room_id): + def _handle_prev_events(self, txn, invalidates, outlier, event_id, + prev_events, room_id): """ For the given event, update the event edges table and forward and backward extremities tables. @@ -330,7 +330,9 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - self.get_latest_event_ids_in_room.invalidate(room_id) + invalidates.append(( + self.get_latest_event_ids_in_room.invalidate, room_id + )) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3c260ddc..b2ab4b02f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_token try: - yield self.runInteraction( + invalidates = yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, @@ -52,6 +52,11 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) + for invalidated in invalidates: + invalidated_callback = invalidated[0] + invalidated_args = invalidated[1:] + invalidated_callback(*invalidated_args) + except _RollbackButIsFineException: pass @@ -91,9 +96,10 @@ class EventsStore(SQLBaseStore): def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): + invalidates = [] # Remove the any existing cache entries for the event_id - self._invalidate_get_event_cache(event.event_id) + invalidates.append((self._invalidate_get_event_cache, event.event_id)) if stream_ordering is None: with self._stream_id_gen.get_next_txn(txn) as stream_ordering: @@ -150,10 +156,11 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, event, context) + self._store_state_groups_txn(txn, invalidates, event, context) self._update_min_depth_for_room_txn( txn, + invalidates, event.room_id, event.depth ) @@ -199,6 +206,7 @@ class EventsStore(SQLBaseStore): self._handle_prev_events( txn, + invalidates, outlier=outlier, event_id=event.event_id, prev_events=event.prev_events, @@ -206,13 +214,13 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) + self._store_room_member_txn(txn, invalidates, event) elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) + self._store_room_name_txn(txn, invalidates, event) elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) + self._store_room_topic_txn(txn, invalidates, event) elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) + self._store_redaction(txn, invalidates, event) event_dict = { k: v @@ -281,19 +289,22 @@ class EventsStore(SQLBaseStore): ) if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) + self._store_rejections_txn( + txn, invalidates, event.event_id, context.rejected + ) for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, + txn, invalidates, event.event_id, hash_alg, hash_bytes, ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes + txn, invalidates, event.event_id, prev_event_id, alg, + hash_bytes ) for auth_id, _ in event.auth_events: @@ -309,7 +320,7 @@ class EventsStore(SQLBaseStore): (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes + txn, invalidates, event.event_id, ref_alg, ref_hash_bytes ) if event.is_state(): @@ -356,9 +367,11 @@ class EventsStore(SQLBaseStore): } ) - def _store_redaction(self, txn, event): + return invalidates + + def _store_redaction(self, txn, invalidates, event): # invalidate the cache for the redacted event - self._invalidate_get_event_cache(event.redacts) + invalidates.append((self._invalidate_get_event_cache, event.redacts)) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index f95637763..d42d7ff0e 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic_txn(self, txn, event): + def _store_room_topic_txn(self, txn, invalidates, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( txn, @@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore): }, ) - def _store_room_name_txn(self, txn, event): + def _store_room_name_txn(self, txn, invalidates, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 09fb77a19..117da817b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, event): + def _store_room_member_txn(self, txn, invalidates, event): """Store a room member in the database. """ try: @@ -64,8 +64,10 @@ class RoomMemberStore(SQLBaseStore): } ) - self.get_rooms_for_user.invalidate(target_user_id) - self.get_joined_hosts_for_room.invalidate(event.room_id) + invalidates.extend([ + (self.get_rooms_for_user.invalidate, target_user_id), + (self.get_joined_hosts_for_room.invalidate, event.room_id), + ]) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index f05182863..e3979846e 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return dict(txn.fetchall()) - def _store_event_content_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_content_hash_txn(self, txn, invalidates, event_id, + algorithm, hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_reference_hash_txn(self, txn, invalidates, event_id, + algorithm, hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore): hashes[algorithm] = hash_bytes return results - def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, - algorithm, hash_bytes): + def _store_prev_event_hash_txn(self, txn, invalidates, event_id, + prev_event_id, algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7e55e8bed..35d11c27c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): f, ) - def _store_state_groups_txn(self, txn, event, context): + def _store_state_groups_txn(self, txn, invalidates, event, context): if context.current_state is None: return From d0fece8d3c4e9db3652785e41176e2a4241eebe1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 15:39:09 +0100 Subject: [PATCH 03/13] Missing return for when the event was already persisted --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b2ab4b02f..16359e876 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -202,7 +202,7 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - return + return invalidates self._handle_prev_events( txn, From bfa4a7f8b023d91f93d4a5f0e8bd592400a2e166 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 15:43:49 +0100 Subject: [PATCH 04/13] Invalidate the room_member cache if the current state events updates --- synapse/storage/events.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 16359e876..7dc49ceed 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -120,6 +120,11 @@ class EventsStore(SQLBaseStore): ) for s in current_state: + if s.type == EventTypes.Member: + invalidates.extend([ + (self.get_rooms_for_user.invalidate, s.state_key), + (self.get_joined_hosts_for_room.invalidate, s.room_id), + ]) self._simple_insert_txn( txn, "current_state_events", From 63075118a528d1abf0b146a961ec5c571bf058b2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 16:24:04 +0100 Subject: [PATCH 05/13] Add debug flag in synapse/storage/_base.py for debugging the cache logic by comparing what is in the cache with what was in the database on every access --- synapse/storage/_base.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7f5477dee..840a4994b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -33,6 +33,7 @@ import sys import time import threading +DEBUG_CACHES = False logger = logging.getLogger(__name__) @@ -146,7 +147,17 @@ def cached(max_entries=1000, num_args=1, lru=False): @defer.inlineCallbacks def wrapped(self, *keyargs): try: - defer.returnValue(cache.get(*keyargs)) + cached_result = cache.get(*keyargs) + if DEBUG_CACHES: + actual_result = yield orig(self, *keyargs) + if actual_result != cached_result: + logger.error( + "Stale cache entry %s%r: cached: %r, actual %r", + orig.__name__, keyargs, + cached_result, actual_result, + ) + raise ValueError("Stale cache entry") + defer.returnValue(cached_result) except KeyError: sequence = cache.sequence From 041b6cba612f5640fe490859a54f0ef140e29d33 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 16:32:44 +0100 Subject: [PATCH 06/13] SYN-369: Add comments to the sequence number logic in the cache --- synapse/storage/_base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 840a4994b..579ed5637 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -98,6 +98,8 @@ class Cache(object): def update(self, sequence, *args): self.check_thread() if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) self.prefill(*args) def prefill(self, *args): # because I can't *keyargs, value @@ -117,6 +119,8 @@ class Cache(object): self.check_thread() if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) + # Increment the sequence number so that any SELECT statements that + # raced with the INSERT don't update the cache (SYN-369) self.sequence += 1 self.cache.pop(keyargs, None) @@ -159,6 +163,9 @@ def cached(max_entries=1000, num_args=1, lru=False): raise ValueError("Stale cache entry") defer.returnValue(cached_result) except KeyError: + # Get the sequence number of the cache before reading from the + # database so that we can tell if the cache is invalidated + # while the SELECT is executing (SYN-369) sequence = cache.sequence ret = yield orig(self, *keyargs) From 995154239358af589146ab4697e7cb4f100e2d84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 17:06:55 +0100 Subject: [PATCH 07/13] Add a comment about the zip(*[zip(sorted(...),...)]) --- synapse/storage/_base.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b7c3cf03c..94946587f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -453,6 +453,14 @@ class SQLBaseStore(object): if not values: return + # This is a *slight* abomination to get a list of tuples of key names + # and a list of tuples of value names. + # + # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}] + # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)] + # + # The sort is to ensure that we don't rely on dictionary iteration + # order. keys, vals = zip(*[ zip( *(sorted(i.items(), key=lambda kv: kv[0])) From d18f37e026a02b4e899bc96e600850007a613189 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:32:21 +0100 Subject: [PATCH 08/13] Collect the invalidate callbacks on the transaction object rather than passing around a separate list --- synapse/storage/_base.py | 18 ++++++++--- synapse/storage/event_federation.py | 10 +++--- synapse/storage/events.py | 48 +++++++++++++---------------- synapse/storage/room.py | 4 +-- synapse/storage/roommember.py | 8 ++--- synapse/storage/signatures.py | 12 ++++---- synapse/storage/state.py | 2 +- 7 files changed, 51 insertions(+), 51 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 579ed5637..ccf9697fa 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -185,12 +185,16 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name", "database_engine"] + __slots__ = ["txn", "name", "database_engine", "after_callbacks"] - def __init__(self, txn, name, database_engine): + def __init__(self, txn, name, database_engine, after_callbacks): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) object.__setattr__(self, "database_engine", database_engine) + object.__setattr__(self, "after_callbacks", after_callbacks) + + def call_after(self, callback, *args): + self.after_callbacks.append((callback, args)) def __getattr__(self, name): return getattr(self.txn, name) @@ -336,6 +340,8 @@ class SQLBaseStore(object): start_time = time.time() * 1000 + after_callbacks = [] + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: if self.database_engine.is_connection_closed(conn): @@ -360,10 +366,10 @@ class SQLBaseStore(object): while True: try: txn = conn.cursor() - return func( - LoggingTransaction(txn, name, self.database_engine), - *args, **kwargs + txn = LoggingTransaction( + txn, name, self.database_engine, after_callbacks ) + return func(txn, *args, **kwargs) except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. @@ -412,6 +418,8 @@ class SQLBaseStore(object): result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) def cursor_to_dict(self, cursor): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3cd3fbdc9..893344eff 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth): + def _update_min_depth_for_room_txn(self, txn, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) do_insert = depth < min_depth if min_depth else True @@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, invalidates, outlier, event_id, - prev_events, room_id): + def _handle_prev_events(self, txn, outlier, event_id, prev_events, + room_id): """ For the given event, update the event edges table and forward and backward extremities tables. @@ -330,9 +330,9 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - invalidates.append(( + txn.call_after( self.get_latest_event_ids_in_room.invalidate, room_id - )) + ) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7dc49ceed..17f9d2728 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_token try: - invalidates = yield self.runInteraction( + yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, @@ -52,11 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - for invalidated in invalidates: - invalidated_callback = invalidated[0] - invalidated_args = invalidated[1:] - invalidated_callback(*invalidated_args) - except _RollbackButIsFineException: pass @@ -96,10 +91,9 @@ class EventsStore(SQLBaseStore): def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): - invalidates = [] # Remove the any existing cache entries for the event_id - invalidates.append((self._invalidate_get_event_cache, event.event_id)) + txn.call_after(self._invalidate_get_event_cache, event.event_id) if stream_ordering is None: with self._stream_id_gen.get_next_txn(txn) as stream_ordering: @@ -121,10 +115,12 @@ class EventsStore(SQLBaseStore): for s in current_state: if s.type == EventTypes.Member: - invalidates.extend([ - (self.get_rooms_for_user.invalidate, s.state_key), - (self.get_joined_hosts_for_room.invalidate, s.room_id), - ]) + txn.call_after( + self.get_rooms_for_user.invalidate, s.state_key + ) + txn.call_after( + self.get_joined_hosts_for_room.invalidate, s.room_id + ) self._simple_insert_txn( txn, "current_state_events", @@ -161,11 +157,10 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, invalidates, event, context) + self._store_state_groups_txn(txn, event, context) self._update_min_depth_for_room_txn( txn, - invalidates, event.room_id, event.depth ) @@ -207,11 +202,10 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - return invalidates + return self._handle_prev_events( txn, - invalidates, outlier=outlier, event_id=event.event_id, prev_events=event.prev_events, @@ -219,13 +213,13 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Member: - self._store_room_member_txn(txn, invalidates, event) + self._store_room_member_txn(txn, event) elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, invalidates, event) + self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, invalidates, event) + self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Redaction: - self._store_redaction(txn, invalidates, event) + self._store_redaction(txn, event) event_dict = { k: v @@ -295,20 +289,20 @@ class EventsStore(SQLBaseStore): if context.rejected: self._store_rejections_txn( - txn, invalidates, event.event_id, context.rejected + txn, event.event_id, context.rejected ) for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( - txn, invalidates, event.event_id, hash_alg, hash_bytes, + txn, event.event_id, hash_alg, hash_bytes, ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_prev_event_hash_txn( - txn, invalidates, event.event_id, prev_event_id, alg, + txn, event.event_id, prev_event_id, alg, hash_bytes ) @@ -325,7 +319,7 @@ class EventsStore(SQLBaseStore): (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( - txn, invalidates, event.event_id, ref_alg, ref_hash_bytes + txn, event.event_id, ref_alg, ref_hash_bytes ) if event.is_state(): @@ -372,11 +366,11 @@ class EventsStore(SQLBaseStore): } ) - return invalidates + return - def _store_redaction(self, txn, invalidates, event): + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event - invalidates.append((self._invalidate_get_event_cache, event.redacts)) + txn.call_after(self._invalidate_get_event_cache, event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index d42d7ff0e..f95637763 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic_txn(self, txn, invalidates, event): + def _store_room_topic_txn(self, txn, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( txn, @@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore): }, ) - def _store_room_name_txn(self, txn, invalidates, event): + def _store_room_name_txn(self, txn, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 117da817b..839c74f63 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, invalidates, event): + def _store_room_member_txn(self, txn, event): """Store a room member in the database. """ try: @@ -64,10 +64,8 @@ class RoomMemberStore(SQLBaseStore): } ) - invalidates.extend([ - (self.get_rooms_for_user.invalidate, target_user_id), - (self.get_joined_hosts_for_room.invalidate, event.room_id), - ]) + txn.call_after(self.get_rooms_for_user.invalidate, target_user_id) + txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index e3979846e..f05182863 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return dict(txn.fetchall()) - def _store_event_content_hash_txn(self, txn, invalidates, event_id, - algorithm, hash_bytes): + def _store_event_content_hash_txn(self, txn, event_id, algorithm, + hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, invalidates, event_id, - algorithm, hash_bytes): + def _store_event_reference_hash_txn(self, txn, event_id, algorithm, + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore): hashes[algorithm] = hash_bytes return results - def _store_prev_event_hash_txn(self, txn, invalidates, event_id, - prev_event_id, algorithm, hash_bytes): + def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, + algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 35d11c27c..7e55e8bed 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): f, ) - def _store_state_groups_txn(self, txn, invalidates, event, context): + def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return From 3d5a955e08c21c076c55806c3c1e78a19c09ad4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 17:36:57 +0100 Subject: [PATCH 09/13] Missed events are not outliers --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6811a0e3d..904c7c094 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -491,7 +491,7 @@ class FederationClient(FederationBase): ] signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=True + destination, events, outlier=False ) have_gotten_all_from_destination = True From e45b05647e9242ba543562e3ad2bb4141e85ab8c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:38:10 +0100 Subject: [PATCH 10/13] Fix the --help option for synapse --- synapse/config/_base.py | 47 +++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index cd4bd28e8..2807abbc9 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -144,16 +144,17 @@ class Config(object): ) config_args, remaining_args = config_parser.parse_known_args(argv) - if not config_args.config_path: - config_parser.error( - "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" - " -c CONFIG-FILE\"" - ) - - config_dir_path = os.path.dirname(config_args.config_path[0]) - config_dir_path = os.path.abspath(config_dir_path) if config_args.generate_config: + if not config_args.config_path: + config_parser.error( + "Must supply a config file.\nA config file can be automatically" + " generated using \"--generate-config -h SERVER_NAME" + " -c CONFIG-FILE\"" + ) + + config_dir_path = os.path.dirname(config_args.config_path[0]) + config_dir_path = os.path.abspath(config_dir_path) + server_name = config_args.server_name if not server_name: print "Most specify a server_name to a generate config for." @@ -196,6 +197,25 @@ class Config(object): ) sys.exit(0) + parser = argparse.ArgumentParser( + parents=[config_parser], + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + obj.invoke_all("add_arguments", parser) + args = parser.parse_args(remaining_args) + + if not config_args.config_path: + config_parser.error( + "Must supply a config file.\nA config file can be automatically" + " generated using \"--generate-config -h SERVER_NAME" + " -c CONFIG-FILE\"" + ) + + config_dir_path = os.path.dirname(config_args.config_path[0]) + config_dir_path = os.path.abspath(config_dir_path) + specified_config = {} for config_path in config_args.config_path: yaml_config = cls.read_config_file(config_path) @@ -208,15 +228,6 @@ class Config(object): obj.invoke_all("read_config", config) - parser = argparse.ArgumentParser( - parents=[config_parser], - description=description, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - obj.invoke_all("add_arguments", parser) - args = parser.parse_args(remaining_args) - obj.invoke_all("read_arguments", args) return obj From deb0237166afe280847b625260620d8fb675f7d7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:45:11 +0100 Subject: [PATCH 11/13] Add some doc-string --- synapse/storage/_base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ccf9697fa..dbef179b2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -194,6 +194,10 @@ class LoggingTransaction(object): object.__setattr__(self, "after_callbacks", after_callbacks) def call_after(self, callback, *args): + """Call the given callback on the main twisted thread after the + transaction has finished. Used to invalidate the caches on the + correct thread. + """ self.after_callbacks.append((callback, args)) def __getattr__(self, name): From 977338a7afa5e95dba1ce230ba253daf2b239fb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 18:12:44 +0100 Subject: [PATCH 12/13] Use buffer(...) when inserting into bytea column --- synapse/federation/persistence.py | 4 +--- synapse/storage/transactions.py | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 76a9dcd77..865766eb2 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,8 +23,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from syutil.jsonutil import encode_canonical_json - import logging @@ -71,7 +69,7 @@ class TransactionActions(object): transaction.transaction_id, transaction.origin, code, - encode_canonical_json(response) + response, ) @defer.inlineCallbacks diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 89dd7d894..b5b21a9b1 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached from collections import namedtuple +from syutil.jsonutil import encode_canonical_json import logging logger = logging.getLogger(__name__) @@ -82,7 +83,7 @@ class TransactionStore(SQLBaseStore): "transaction_id": transaction_id, "origin": origin, "response_code": code, - "response_json": response_dict, + "response_json": buffer(encode_canonical_json(response_dict)), }, or_ignore=True, desc="set_received_txn_response", From 0cf7e480b442f9f893b782ab1a437b556c1bbb54 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 18:20:01 +0100 Subject: [PATCH 13/13] And use buffer(...) there as well --- synapse/federation/persistence.py | 2 +- synapse/storage/transactions.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 865766eb2..1a7cc02f9 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -99,5 +99,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - encode_canonical_json(response_dict) + response_dict, ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b5b21a9b1..624da4a9d 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -162,7 +162,8 @@ class TransactionStore(SQLBaseStore): return self.runInteraction( "delivered_txn", self._delivered_txn, - transaction_id, destination, code, response_dict + transaction_id, destination, code, + buffer(encode_canonical_json(response_dict)), ) def _delivered_txn(self, txn, transaction_id, destination,