From 68e1a872fd3f7dc3bbd8bfcfcc5e6b88b1170a77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 10:58:46 +0100 Subject: [PATCH 01/27] Noop get_new_messages_for_device if token hasn't changed --- synapse/handlers/sync.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91934b0c81..8a3e04c28a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -571,15 +571,18 @@ class SyncHandler(object): user_id, device_id, since_stream_id ) - logger.debug("Getting messages up to %d", now_token.to_device_key) - messages, stream_id = yield self.store.get_new_messages_for_device( - user_id, device_id, now_token.to_device_key - ) - logger.debug("Got messages up to %d: %r", stream_id, messages) - sync_result_builder.now_token = now_token.copy_and_replace( - "to_device_key", stream_id - ) - sync_result_builder.to_device = messages + if since_stream_id and since_stream_id == int(now_token.to_device_key): + logger.debug("Getting messages up to %d", now_token.to_device_key) + messages, stream_id = yield self.store.get_new_messages_for_device( + user_id, device_id, now_token.to_device_key + ) + logger.debug("Got messages up to %d: %r", stream_id, messages) + sync_result_builder.now_token = now_token.copy_and_replace( + "to_device_key", stream_id + ) + sync_result_builder.to_device = messages + else: + sync_result_builder.to_device = [] @defer.inlineCallbacks def _generate_sync_entry_for_account_data(self, sync_result_builder): From 1ee6285905a9c7c038eba9149dbdd67c1945b2e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 11:17:46 +0100 Subject: [PATCH 02/27] Fix check --- synapse/handlers/sync.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8a3e04c28a..a8f8b9a75a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -565,13 +565,12 @@ class SyncHandler(object): if sync_result_builder.since_token is not None: since_stream_id = int(sync_result_builder.since_token.to_device_key) - if since_stream_id: + if since_stream_id != int(now_token.to_device_key): logger.debug("Deleting messages up to %d", since_stream_id) yield self.store.delete_messages_for_device( user_id, device_id, since_stream_id ) - if since_stream_id and since_stream_id == int(now_token.to_device_key): logger.debug("Getting messages up to %d", now_token.to_device_key) messages, stream_id = yield self.store.get_new_messages_for_device( user_id, device_id, now_token.to_device_key From 55e17d36975efbcc51fd88489da0d2681ccede8a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 10:54:30 +0100 Subject: [PATCH 03/27] Fix push room names for rooms with only an alias --- synapse/push/presentable_names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py index f90b789c05..277da3cd35 100644 --- a/synapse/push/presentable_names.py +++ b/synapse/push/presentable_names.py @@ -74,7 +74,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True alias_event = yield store.get_event( alias_id, allow_none=True ) - if alias_event and alias_event.content and alias_event.get("aliases"): + if alias_event and alias_event.content.get("aliases"): the_aliases = alias_event.content["aliases"] if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]): defer.returnValue(the_aliases[0]) From c8cd87b21bea2fc4a54353849d0b137f09654b18 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 11:23:26 +0100 Subject: [PATCH 04/27] Comment about message deletion --- synapse/handlers/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a8f8b9a75a..14f2032afa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -566,6 +566,9 @@ class SyncHandler(object): since_stream_id = int(sync_result_builder.since_token.to_device_key) if since_stream_id != int(now_token.to_device_key): + # We only delete messages when a new message comes in, but that's + # fine so long as we delete them at some point. + logger.debug("Deleting messages up to %d", since_stream_id) yield self.store.delete_messages_for_device( user_id, device_id, since_stream_id From b3be9e4376b976a5cc4b37d70c4985342d5c30cb Mon Sep 17 00:00:00 2001 From: Fabian Niepelt Date: Tue, 30 Aug 2016 15:03:03 +0200 Subject: [PATCH 05/27] Add prerequisites to install on openSUSE to README Signed-off-by: Fabian Niepelt --- README.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.rst b/README.rst index 323f5b8db7..358038ade0 100644 --- a/README.rst +++ b/README.rst @@ -134,6 +134,12 @@ Installing prerequisites on Raspbian:: sudo pip install --upgrade ndg-httpsclient sudo pip install --upgrade virtualenv +Installing prerequisites on openSUSE:: + + sudo zypper in -t pattern devel_basis + sudo zypper in python-pip python-setuptools sqlite3 python-virtualenv \ + python-devel libffi-devel libopenssl-devel libjpeg62-devel + To install the synapse homeserver run:: virtualenv -p python2.7 ~/.synapse From bc1a8b1f7acd6cddcd46110d37706f0163004ecf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 15:00:14 +0100 Subject: [PATCH 06/27] Don't notify for online -> online transitions. Specifically, if currently_active remains true then we should not notify if only the last active time changes. --- synapse/handlers/presence.py | 7 ++++- tests/handlers/test_presence.py | 47 +++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 73752b2f89..b5a3bcd660 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -922,7 +922,12 @@ def should_notify(old_state, new_state): if new_state.currently_active != old_state.currently_active: return True - if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: + if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # Only notify about last active bumps if we're not currently acive + if not (old_state.currently_active and new_state.currently_active): + return True + + elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. return True diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index b531ba8540..d9e8f634ae 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -115,6 +115,53 @@ class PresenceUpdateTestCase(unittest.TestCase): ), ], any_order=True) + def test_online_to_online_last_active_noop(self): + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10, + currently_active=True, + ) + + new_state = prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=now, + ) + + state, persist_and_notify, federation_ping = handle_update( + prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + ) + + self.assertFalse(persist_and_notify) + self.assertTrue(federation_ping) + self.assertTrue(state.currently_active) + self.assertEquals(new_state.state, state.state) + self.assertEquals(new_state.status_msg, state.status_msg) + self.assertEquals(state.last_federation_update_ts, now) + + self.assertEquals(wheel_timer.insert.call_count, 3) + wheel_timer.insert.assert_has_calls([ + call( + now=now, + obj=user_id, + then=new_state.last_active_ts + IDLE_TIMER + ), + call( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + ), + call( + now=now, + obj=user_id, + then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY + ), + ], any_order=True) + def test_online_to_online_last_active(self): wheel_timer = Mock() user_id = "@foo:bar" From 928d337c168582c902d0f664857855d41206a117 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 30 Aug 2016 15:33:05 +0100 Subject: [PATCH 07/27] Remove FUD over psql --- README.rst | 3 --- 1 file changed, 3 deletions(-) diff --git a/README.rst b/README.rst index 323f5b8db7..9eebed784b 100644 --- a/README.rst +++ b/README.rst @@ -230,9 +230,6 @@ The advantages of Postgres include: pointing at the same DB master, as well as enabling DB replication in synapse itself. -The only disadvantage is that the code is relatively new as of April 2015 and -may have a few regressions relative to SQLite. - For information on how to install and use PostgreSQL, please see `docs/postgres.rst `_. From 21b977ccfe11832aa1c7b448c6e657cf0df36ad4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 15:39:50 +0100 Subject: [PATCH 08/27] Occaisonally persist unpersisted presence updates --- synapse/handlers/presence.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 73752b2f89..a3c7952a4c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -191,6 +191,13 @@ class PresenceHandler(object): 5000, ) + self.clock.call_later( + 60, + self.clock.looping_call, + self._persist_unpersisted_changes, + 60 * 1000, + ) + metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer)) @defer.inlineCallbacks @@ -216,6 +223,27 @@ class PresenceHandler(object): ]) logger.info("Finished _on_shutdown") + @defer.inlineCallbacks + def _persist_unpersisted_changes(self): + """We periodically persist the unpersisted changes, as otherwise they + may stack up and slow down shutdown times. + """ + logger.info( + "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes", + len(self.user_to_current_state) + ) + + unpersisted = self.unpersisted_users_changes + self.unpersisted_users_changes = set() + + if self.unpersisted_users_changes: + yield self.store.update_presence([ + self.user_to_current_state[user_id] + for user_id in unpersisted + ]) + + logger.info("Finished _persist_unpersisted_changes") + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes From 097330bae8cd948ea34a882290efd2182431b51c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 15:50:20 +0100 Subject: [PATCH 09/27] Check correct variable --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a3c7952a4c..ab33952cd2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -236,7 +236,7 @@ class PresenceHandler(object): unpersisted = self.unpersisted_users_changes self.unpersisted_users_changes = set() - if self.unpersisted_users_changes: + if unpersisted: yield self.store.update_presence([ self.user_to_current_state[user_id] for user_id in unpersisted From e82247f990d8b47fc1d151d6151d82f9d802bebb Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 16:21:16 +0100 Subject: [PATCH 10/27] Allow application services to have an optional 'url' If 'url' is not specified, they will not be pushed for events or queries. This is useful for bots who simply wish to reserve large chunks of user/alias namespace, and don't care about being pushed for events. --- synapse/appservice/api.py | 11 +++++++++++ synapse/config/appservice.py | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 775417eb21..e596cb7376 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -67,6 +67,8 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_user(self, service, user_id): + if service.url == "": + defer.returnValue(False) uri = service.url + ("/users/%s" % urllib.quote(user_id)) response = None try: @@ -86,6 +88,8 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_alias(self, service, alias): + if service.url == "": + defer.returnValue(False) uri = service.url + ("/rooms/%s" % urllib.quote(alias)) response = None try: @@ -113,6 +117,8 @@ class ApplicationServiceApi(SimpleHttpClient): raise ValueError( "Unrecognised 'kind' argument %r to query_3pe()", kind ) + if service.url == "": + defer.returnValue([]) uri = "%s%s/thirdparty/%s/%s" % ( service.url, @@ -145,6 +151,8 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue([]) def get_3pe_protocol(self, service, protocol): + if service.url == "": + defer.returnValue({}) @defer.inlineCallbacks def _get(): uri = "%s%s/thirdparty/protocol/%s" % ( @@ -166,6 +174,9 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): + if service.url == "": + defer.returnValue(True) + events = self._serialize(events) if txn_id is None: diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index dfe43b0b4c..3488a28ff2 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -132,6 +132,12 @@ def _load_appservice(hostname, as_info, config_filename): for p in protocols: if not isinstance(p, str): raise KeyError("Bad value for 'protocols' item") + + if as_info["url"] == "": + logger.info( + "(%s) Explicitly empty 'url' provided. This application service will not receive events or queries.", + config_filename, + ) return ApplicationService( token=as_info["as_token"], url=as_info["url"], From 16b652f0a375a880175b54d5d439bde6a0604dd2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 16:30:12 +0100 Subject: [PATCH 11/27] Flake8 --- synapse/appservice/api.py | 1 + synapse/config/appservice.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e596cb7376..ee63f06359 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -153,6 +153,7 @@ class ApplicationServiceApi(SimpleHttpClient): def get_3pe_protocol(self, service, protocol): if service.url == "": defer.returnValue({}) + @defer.inlineCallbacks def _get(): uri = "%s%s/thirdparty/protocol/%s" % ( diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 3488a28ff2..5569c43b35 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -135,7 +135,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info["url"] == "": logger.info( - "(%s) Explicitly empty 'url' provided. This application service will not receive events or queries.", + "(%s) Explicitly empty 'url' provided. This application service " + + "will not receive events or queries.", config_filename, ) return ApplicationService( From 3e784eff7480ff3d90b1049cb0dabad899b6297f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 16:51:36 +0100 Subject: [PATCH 12/27] Remove state replication stream --- synapse/replication/resource.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 84993b33b3..817f50428f 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -40,7 +40,6 @@ STREAM_NAMES = ( ("backfill",), ("push_rules",), ("pushers",), - ("state",), ("caches",), ) @@ -130,7 +129,6 @@ class ReplicationResource(Resource): backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() - state_token = self.store.get_state_stream_token() caches_token = self.store.get_cache_stream_token() defer.returnValue(_ReplicationToken( @@ -142,7 +140,7 @@ class ReplicationResource(Resource): backfill_token, push_rules_token, pushers_token, - state_token, + 0, # State stream is no longer a thing caches_token, )) @@ -191,7 +189,6 @@ class ReplicationResource(Resource): yield self.receipts(writer, current_token, limit, request_streams) yield self.push_rules(writer, current_token, limit, request_streams) yield self.pushers(writer, current_token, limit, request_streams) - yield self.state(writer, current_token, limit, request_streams) yield self.caches(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) @@ -365,25 +362,6 @@ class ReplicationResource(Resource): "position", "user_id", "app_id", "pushkey" )) - @defer.inlineCallbacks - def state(self, writer, current_token, limit, request_streams): - current_position = current_token.state - - state = request_streams.get("state") - - if state is not None: - state_groups, state_group_state = ( - yield self.store.get_all_new_state_groups( - state, current_position, limit - ) - ) - writer.write_header_and_rows("state_groups", state_groups, ( - "position", "room_id", "event_id" - )) - writer.write_header_and_rows("state_group_state", state_group_state, ( - "position", "type", "state_key", "event_id" - )) - @defer.inlineCallbacks def caches(self, writer, current_token, limit, request_streams): current_position = current_token.caches From 5dc2a702cf2415a55bab8061053c0b47dc21dc93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 16:54:40 +0100 Subject: [PATCH 13/27] Make _state_groups_id_gen a normal IdGenerator --- synapse/storage/__init__.py | 2 +- synapse/storage/events.py | 81 +++++++++++++++++-------------------- synapse/storage/state.py | 3 -- 3 files changed, 39 insertions(+), 47 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4f4c723c5b..6c32773f25 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -115,7 +115,7 @@ class DataStore(RoomMemberStore, RoomStore, ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") - self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id") + self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5cbe8c5978..bc1bc97e19 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -271,39 +271,35 @@ class EventsStore(SQLBaseStore): len(events_and_contexts) ) - state_group_id_manager = self._state_groups_id_gen.get_next_mult( - len(events_and_contexts) - ) with stream_ordering_manager as stream_orderings: - with state_group_id_manager as state_group_ids: - for (event, context), stream, state_group_id in zip( - events_and_contexts, stream_orderings, state_group_ids - ): - event.internal_metadata.stream_ordering = stream - # Assign a state group_id in case a new id is needed for - # this context. In theory we only need to assign this - # for contexts that have current_state and aren't outliers - # but that make the code more complicated. Assigning an ID - # per event only causes the state_group_ids to grow as fast - # as the stream_ordering so in practise shouldn't be a problem. - context.new_state_group_id = state_group_id + for (event, context), stream, in zip( + events_and_contexts, stream_orderings + ): + event.internal_metadata.stream_ordering = stream + # Assign a state group_id in case a new id is needed for + # this context. In theory we only need to assign this + # for contexts that have current_state and aren't outliers + # but that make the code more complicated. Assigning an ID + # per event only causes the state_group_ids to grow as fast + # as the stream_ordering so in practise shouldn't be a problem. + context.new_state_group_id = self._state_groups_id_gen.get_next() - chunks = [ - events_and_contexts[x:x + 100] - for x in xrange(0, len(events_and_contexts), 100) - ] + chunks = [ + events_and_contexts[x:x + 100] + for x in xrange(0, len(events_and_contexts), 100) + ] - for chunk in chunks: - # We can't easily parallelize these since different chunks - # might contain the same event. :( - yield self.runInteraction( - "persist_events", - self._persist_events_txn, - events_and_contexts=chunk, - backfilled=backfilled, - delete_existing=delete_existing, - ) - persist_event_counter.inc_by(len(chunk)) + for chunk in chunks: + # We can't easily parallelize these since different chunks + # might contain the same event. :( + yield self.runInteraction( + "persist_events", + self._persist_events_txn, + events_and_contexts=chunk, + backfilled=backfilled, + delete_existing=delete_existing, + ) + persist_event_counter.inc_by(len(chunk)) @_retry_on_integrity_error @defer.inlineCallbacks @@ -312,19 +308,18 @@ class EventsStore(SQLBaseStore): delete_existing=False): try: with self._stream_id_gen.get_next() as stream_ordering: - with self._state_groups_id_gen.get_next() as state_group_id: - event.internal_metadata.stream_ordering = stream_ordering - context.new_state_group_id = state_group_id - yield self.runInteraction( - "persist_event", - self._persist_event_txn, - event=event, - context=context, - current_state=current_state, - backfilled=backfilled, - delete_existing=delete_existing, - ) - persist_event_counter.inc() + event.internal_metadata.stream_ordering = stream_ordering + context.new_state_group_id = self._state_groups_id_gen.get_next() + yield self.runInteraction( + "persist_event", + self._persist_event_txn, + event=event, + context=context, + current_state=current_state, + backfilled=backfilled, + delete_existing=delete_existing, + ) + persist_event_counter.inc() except _RollbackButIsFineException: pass diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b1d461fef5..0442353287 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -526,6 +526,3 @@ class StateStore(SQLBaseStore): return self.runInteraction( "get_all_new_state_groups", get_all_new_state_groups_txn ) - - def get_state_stream_token(self): - return self._state_groups_id_gen.get_current_token() From 572acde4832869c8f7f4daf555296bd1164364d0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 17:16:00 +0100 Subject: [PATCH 14/27] Use None instead of the empty string Change how we validate the 'url' field as a result. --- synapse/appservice/api.py | 10 +++++----- synapse/config/appservice.py | 15 +++++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index ee63f06359..cc4af23962 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -67,7 +67,7 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_user(self, service, user_id): - if service.url == "": + if service.url is None: defer.returnValue(False) uri = service.url + ("/users/%s" % urllib.quote(user_id)) response = None @@ -88,7 +88,7 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_alias(self, service, alias): - if service.url == "": + if service.url is None: defer.returnValue(False) uri = service.url + ("/rooms/%s" % urllib.quote(alias)) response = None @@ -117,7 +117,7 @@ class ApplicationServiceApi(SimpleHttpClient): raise ValueError( "Unrecognised 'kind' argument %r to query_3pe()", kind ) - if service.url == "": + if service.url is None: defer.returnValue([]) uri = "%s%s/thirdparty/%s/%s" % ( @@ -151,7 +151,7 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue([]) def get_3pe_protocol(self, service, protocol): - if service.url == "": + if service.url is None: defer.returnValue({}) @defer.inlineCallbacks @@ -175,7 +175,7 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): - if service.url == "": + if service.url is None: defer.returnValue(True) events = self._serialize(events) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 5569c43b35..759209a85b 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -86,7 +86,7 @@ def load_appservices(hostname, config_files): def _load_appservice(hostname, as_info, config_filename): required_string_fields = [ - "id", "url", "as_token", "hs_token", "sender_localpart" + "id", "as_token", "hs_token", "sender_localpart" ] for field in required_string_fields: if not isinstance(as_info.get(field), basestring): @@ -94,6 +94,13 @@ def _load_appservice(hostname, as_info, config_filename): field, config_filename, )) + # 'url' must either be a string or explicitly null, not missing + # to avoid accidentally turning off push for ASes. + if not isinstance(as_info.get("url"), basestring) and as_info.get("url", "") is not None: + raise KeyError( + "Required string field or explicit null: 'url' (%s)" % (config_filename,) + ) + localpart = as_info["sender_localpart"] if urllib.quote(localpart) != localpart: raise ValueError( @@ -133,10 +140,10 @@ def _load_appservice(hostname, as_info, config_filename): if not isinstance(p, str): raise KeyError("Bad value for 'protocols' item") - if as_info["url"] == "": + if as_info["url"] == None: logger.info( - "(%s) Explicitly empty 'url' provided. This application service " + - "will not receive events or queries.", + "(%s) Explicitly empty 'url' provided. This application service" + " will not receive events or queries.", config_filename, ) return ApplicationService( From c88278353512d778f460d8223a2ff173323e8f94 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 17:20:31 +0100 Subject: [PATCH 15/27] flake8 --- synapse/config/appservice.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 759209a85b..d7537e8d44 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -96,7 +96,8 @@ def _load_appservice(hostname, as_info, config_filename): # 'url' must either be a string or explicitly null, not missing # to avoid accidentally turning off push for ASes. - if not isinstance(as_info.get("url"), basestring) and as_info.get("url", "") is not None: + if (not isinstance(as_info.get("url"), basestring) and + as_info.get("url", "") is not None): raise KeyError( "Required string field or explicit null: 'url' (%s)" % (config_filename,) ) @@ -140,7 +141,7 @@ def _load_appservice(hostname, as_info, config_filename): if not isinstance(p, str): raise KeyError("Bad value for 'protocols' item") - if as_info["url"] == None: + if as_info["url"] is None: logger.info( "(%s) Explicitly empty 'url' provided. This application service" " will not receive events or queries.", From d80f64d370e2b7e7d56ef32bfa54c66fa1b1482a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 21:46:39 +0100 Subject: [PATCH 16/27] Fix email notifs by adding missing param --- synapse/push/mailer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 81b1af4a19..2cafcfd8f5 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -338,7 +338,7 @@ class Mailer(object): # want the generated-from-names one here otherwise we'll # end up with, "new message from Bob in the Bob room" room_name = yield calculate_room_name( - state_by_room[room_id], user_id, fallback_to_members=False + self.store, state_by_room[room_id], user_id, fallback_to_members=False ) my_member_event = state_by_room[room_id][("m.room.member", user_id)] From 1bb8ec296de4f6bf37b0b31f28bc0d7285f96ba0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 10:09:46 +0100 Subject: [PATCH 17/27] Generate state group ids in state layer --- synapse/state.py | 9 ++++++--- synapse/storage/events.py | 10 +--------- synapse/storage/state.py | 24 +++++++++++++++++------- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index daec983dc9..147416fd81 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -160,14 +160,14 @@ class StateHandler(object): else: context.current_state_ids = {} context.prev_state_events = [] - context.state_group = None + context.state_group = self.store.get_next_state_group() defer.returnValue(context) if old_state: context.current_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } - context.state_group = None + context.state_group = self.store.get_next_state_group() if event.is_state(): key = (event.type, event.state_key) @@ -193,7 +193,10 @@ class StateHandler(object): group, curr_state = ret context.current_state_ids = curr_state - context.state_group = group if not event.is_state() else None + if event.is_state() or group is None: + context.state_group = self.store.get_next_state_group() + else: + context.state_group = group if event.is_state(): key = (event.type, event.state_key) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bc1bc97e19..1a7d4c5199 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -276,13 +276,6 @@ class EventsStore(SQLBaseStore): events_and_contexts, stream_orderings ): event.internal_metadata.stream_ordering = stream - # Assign a state group_id in case a new id is needed for - # this context. In theory we only need to assign this - # for contexts that have current_state and aren't outliers - # but that make the code more complicated. Assigning an ID - # per event only causes the state_group_ids to grow as fast - # as the stream_ordering so in practise shouldn't be a problem. - context.new_state_group_id = self._state_groups_id_gen.get_next() chunks = [ events_and_contexts[x:x + 100] @@ -309,7 +302,6 @@ class EventsStore(SQLBaseStore): try: with self._stream_id_gen.get_next() as stream_ordering: event.internal_metadata.stream_ordering = stream_ordering - context.new_state_group_id = self._state_groups_id_gen.get_next() yield self.runInteraction( "persist_event", self._persist_event_txn, @@ -523,7 +515,7 @@ class EventsStore(SQLBaseStore): # Add an entry to the ex_outlier_stream table to replicate the # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering - state_group_id = context.state_group or context.new_state_group_id + state_group_id = context.state_group self._simple_insert_txn( txn, table="ex_outlier_stream", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0442353287..56bfdc0b55 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -83,6 +83,14 @@ class StateStore(SQLBaseStore): for group, event_id_map in group_to_ids.items() }) + def _have_persisted_state_group_txn(self, txn, state_group): + txn.execute( + "SELECT count(*) FROM state_groups_state WHERE state_group = ?", + (state_group,) + ) + row = txn.fetchone() + return row and row[0] + def _store_mult_state_groups_txn(self, txn, events_and_contexts): state_groups = {} for event, context in events_and_contexts: @@ -92,8 +100,10 @@ class StateStore(SQLBaseStore): if context.current_state_ids is None: continue - if context.state_group is not None: - state_groups[event.event_id] = context.state_group + state_groups[event.event_id] = context.state_group + + if self._have_persisted_state_group_txn(txn, context.state_group): + logger.info("Already persisted state_group: %r", context.state_group) continue state_event_ids = dict(context.current_state_ids) @@ -101,13 +111,11 @@ class StateStore(SQLBaseStore): if event.is_state(): state_event_ids[(event.type, event.state_key)] = event.event_id - state_group = context.new_state_group_id - self._simple_insert_txn( txn, table="state_groups", values={ - "id": state_group, + "id": context.state_group, "room_id": event.room_id, "event_id": event.event_id, }, @@ -118,7 +126,7 @@ class StateStore(SQLBaseStore): table="state_groups_state", values=[ { - "state_group": state_group, + "state_group": context.state_group, "room_id": event.room_id, "type": key[0], "state_key": key[1], @@ -127,7 +135,6 @@ class StateStore(SQLBaseStore): for key, state_id in state_event_ids.items() ], ) - state_groups[event.event_id] = state_group self._simple_insert_many_txn( txn, @@ -526,3 +533,6 @@ class StateStore(SQLBaseStore): return self.runInteraction( "get_all_new_state_groups", get_all_new_state_groups_txn ) + + def get_next_state_group(self): + return self._state_groups_id_gen.get_next() From ef0cc648cfe74a13268c0df8fc679e0c4f29ecbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 11:12:02 +0100 Subject: [PATCH 18/27] Clean up old sent transactions --- .../storage/schema/delta/34/sent_txn_purge.py | 32 +++++++++++++++++++ synapse/storage/transactions.py | 2 ++ 2 files changed, 34 insertions(+) create mode 100644 synapse/storage/schema/delta/34/sent_txn_purge.py diff --git a/synapse/storage/schema/delta/34/sent_txn_purge.py b/synapse/storage/schema/delta/34/sent_txn_purge.py new file mode 100644 index 0000000000..81948e3431 --- /dev/null +++ b/synapse/storage/schema/delta/34/sent_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE sent_transactions") + else: + cur.execute("DELETE FROM sent_transactions") + + cur.execute("CREATE INDEX sent_transactions_ts ON sent_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 58d4de4f1d..1c588bd46b 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -387,8 +387,10 @@ class TransactionStore(SQLBaseStore): def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 + six_hours_ago = now - 6 * 60 * 60 * 1000 def _cleanup_transactions_txn(txn): txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,)) return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) From c10cb581c6ce54e7dfa1f8a0f6449ee7f6d049d4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 13:55:02 +0100 Subject: [PATCH 19/27] Correctly handle the difference between prev and current state --- synapse/api/auth.py | 4 +- synapse/events/snapshot.py | 5 ++- synapse/handlers/federation.py | 31 ++++++++++----- synapse/handlers/message.py | 10 ++--- synapse/handlers/room_member.py | 6 +-- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/state.py | 31 +++++++++++---- synapse/storage/roommember.py | 27 +++++++++++-- synapse/storage/state.py | 3 -- .../replication/slave/storage/test_events.py | 4 +- tests/replication/test_resource.py | 10 +---- tests/test_state.py | 38 ++++++++----------- 12 files changed, 102 insertions(+), 69 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index f26e585623..fcf0b0d25f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -66,7 +66,7 @@ class Auth(object): @defer.inlineCallbacks def check_from_context(self, event, context, do_sig_check=True): auth_events_ids = yield self.compute_auth_events( - event, context.current_state_ids, for_verification=True, + event, context.prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -852,7 +852,7 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): - auth_ids = yield self.compute_auth_events(builder, context.current_state_ids) + auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids) auth_events_entries = yield self.store.add_event_hashes( auth_ids diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index c75afd02d8..e895b1c450 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,8 +15,9 @@ class EventContext(object): - def __init__(self, current_state_ids=None): - self.current_state_ids = current_state_ids + def __init__(self): + self.current_state_ids = None + self.prev_state_ids = None self.state_group = None self.rejected = False self.push_actions = [] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a7ea8fb98f..8e61d74b13 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -222,7 +222,7 @@ class FederationHandler(BaseHandler): # joined the room. Don't bother if the user is just # changing their profile info. newly_joined = True - prev_state_id = context.current_state_ids.get( + prev_state_id = context.prev_state_ids.get( (event.type, event.state_key) ) if prev_state_id: @@ -835,12 +835,12 @@ class FederationHandler(BaseHandler): self.replication_layer.send_pdu(new_pdu, destinations) - state_ids = context.current_state_ids.values() + state_ids = context.prev_state_ids.values() auth_chain = yield self.store.get_auth_chain(set( [event.event_id] + state_ids )) - state = yield self.store.get_events(context.current_state_ids.values()) + state = yield self.store.get_events(context.prev_state_ids.values()) defer.returnValue({ "state": state.values(), @@ -1333,7 +1333,7 @@ class FederationHandler(BaseHandler): if not auth_events: auth_events_ids = yield self.auth.compute_auth_events( - event, context.current_state_ids, for_verification=True, + event, context.prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -1432,6 +1432,11 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(e_id for e_id, _ in event.auth_events) + if event.is_state(): + event_key = (event.type, event.state_key) + else: + event_key = None + if event_auth_events - current_state: have_events = yield self.store.have_events( event_auth_events - current_state @@ -1537,8 +1542,12 @@ class FederationHandler(BaseHandler): context.current_state_ids.update({ k: a.event_id for k, a in auth_events.items() + if k != event_key }) - context.state_group = None + context.prev_state_ids.update({ + k: a.event_id for k, a in auth_events.items() + }) + context.state_group = self.store.get_next_state_group() if different_auth and not event.internal_metadata.is_outlier(): logger.info("Different auth after resolution: %s", different_auth) @@ -1560,7 +1569,7 @@ class FederationHandler(BaseHandler): if do_resolution: # 1. Get what we think is the auth chain. auth_ids = yield self.auth.compute_auth_events( - event, context.current_state_ids + event, context.prev_state_ids ) local_auth_chain = yield self.store.get_auth_chain(auth_ids) @@ -1618,8 +1627,12 @@ class FederationHandler(BaseHandler): context.current_state_ids.update({ k: a.event_id for k, a in auth_events.items() + if k != event_key }) - context.state_group = None + context.prev_state_ids.update({ + k: a.event_id for k, a in auth_events.items() + }) + context.state_group = self.store.get_next_state_group() try: self.auth.check(event, auth_events=auth_events) @@ -1855,7 +1868,7 @@ class FederationHandler(BaseHandler): event.content["third_party_invite"]["signed"]["token"] ) original_invite = None - original_invite_id = context.current_state_ids.get(key) + original_invite_id = context.prev_state_ids.get(key) if original_invite_id: original_invite = yield self.store.get_event( original_invite_id, allow_none=True @@ -1893,7 +1906,7 @@ class FederationHandler(BaseHandler): signed = event.content["third_party_invite"]["signed"] token = signed["token"] - invite_event_id = context.current_state_ids.get( + invite_event_id = context.prev_state_ids.get( (EventTypes.ThirdPartyInvite, token,) ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e2f4387f60..3577db0595 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -272,7 +272,7 @@ class MessageHandler(BaseHandler): If so, returns the version of the event in context. Otherwise, returns None. """ - prev_event_id = context.current_state_ids.get((event.type, event.state_key)) + prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) prev_event = yield self.store.get_event(prev_event_id, allow_none=True) if not prev_event: return @@ -808,8 +808,8 @@ class MessageHandler(BaseHandler): event = builder.build() logger.debug( - "Created event %s with current state: %s", - event.event_id, context.current_state_ids, + "Created event %s with state: %s", + event.event_id, context.prev_state_ids, ) defer.returnValue( @@ -904,7 +904,7 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.Redaction: auth_events_ids = yield self.auth.compute_auth_events( - event, context.current_state_ids, for_verification=True, + event, context.prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -924,7 +924,7 @@ class MessageHandler(BaseHandler): "You don't have permission to redact events" ) - if event.type == EventTypes.Create and context.current_state_ids: + if event.type == EventTypes.Create and context.prev_state_ids: raise AuthError( 403, "Changing the room create event is forbidden", diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index dd4b90ee24..3ba5335af7 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -93,7 +93,7 @@ class RoomMemberHandler(BaseHandler): ratelimit=ratelimit, ) - prev_member_event_id = context.current_state_ids.get( + prev_member_event_id = context.prev_state_ids.get( (EventTypes.Member, target.to_string()), None ) @@ -341,7 +341,7 @@ class RoomMemberHandler(BaseHandler): if event.membership == Membership.JOIN: if requester.is_guest: - guest_can_join = yield self._can_guest_join(context.current_state_ids) + guest_can_join = yield self._can_guest_join(context.prev_state_ids) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. @@ -355,7 +355,7 @@ class RoomMemberHandler(BaseHandler): ratelimit=ratelimit, ) - prev_member_event_id = context.current_state_ids.get( + prev_member_event_id = context.prev_state_ids.get( (EventTypes.Member, event.state_key), None ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 51cb21ee9d..6ff9a06de1 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -87,7 +87,7 @@ class BulkPushRuleEvaluator: ) room_members = yield self.store.get_joined_users_from_context( - event.room_id, context.state_group, context.current_state_ids + event, context ) evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) diff --git a/synapse/state.py b/synapse/state.py index 147416fd81..a0f807e3b9 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -128,7 +128,7 @@ class StateHandler(object): def get_current_user_in_room(self, room_id): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) group, state_ids = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_users = yield self.store.get_joined_users_from_context( + joined_users = yield self.store.get_joined_users_from_state( room_id, group, state_ids ) defer.returnValue(joined_users) @@ -154,27 +154,38 @@ class StateHandler(object): # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. if old_state: - context.current_state_ids = { + context.prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } + if event.is_state(): + context.current_state_events = dict(context.prev_state_ids) + key = (event.type, event.state_key) + context.current_state_events[key] = event.event_id + else: + context.current_state_events = context.prev_state_ids else: context.current_state_ids = {} + context.prev_state_ids = {} context.prev_state_events = [] context.state_group = self.store.get_next_state_group() defer.returnValue(context) if old_state: - context.current_state_ids = { + context.prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } context.state_group = self.store.get_next_state_group() if event.is_state(): key = (event.type, event.state_key) - if key in context.current_state_ids: - replaces = context.current_state_ids[key] + if key in context.prev_state_ids: + replaces = context.prev_state_ids[key] if replaces != event.event_id: # Paranoia check event.unsigned["replaces_state"] = replaces + context.current_state_ids = dict(context.prev_state_ids) + context.current_state_ids[key] = event.event_id + else: + context.current_state_ids = context.prev_state_ids context.prev_state_events = [] defer.returnValue(context) @@ -192,7 +203,7 @@ class StateHandler(object): group, curr_state = ret - context.current_state_ids = curr_state + context.prev_state_ids = curr_state if event.is_state() or group is None: context.state_group = self.store.get_next_state_group() else: @@ -200,9 +211,13 @@ class StateHandler(object): if event.is_state(): key = (event.type, event.state_key) - if key in context.current_state_ids: - replaces = context.current_state_ids[key] + if key in context.prev_state_ids: + replaces = context.prev_state_ids[key] event.unsigned["replaces_state"] = replaces + context.current_state_ids = dict(context.prev_state_ids) + context.current_state_ids[key] = event.event_id + else: + context.current_state_ids = context.prev_state_ids context.prev_state_events = [] defer.returnValue(context) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index cab1660830..6ab10db328 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -354,7 +354,8 @@ class RoomMemberStore(SQLBaseStore): desc="who_forgot" ) - def get_joined_users_from_context(self, room_id, state_group, state_ids): + def get_joined_users_from_context(self, event, context): + state_group = context.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -363,12 +364,24 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_users_from_context( - room_id, state_group, state_ids + event.room_id, state_group, context.current_state_ids, event=event, + ) + + def get_joined_users_from_state(self, room_id, state_group, state_ids): + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + return self._get_joined_users_from_context( + room_id, state_group, state_ids, ) @cachedInlineCallbacks(num_args=2, cache_context=True) def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, - cache_context): + cache_context, event=None): # We don't use `state_group`, its there so that we can cache based # on it. However, its important that its never None, since two current_state's # with a state_group of None are likely to be different. @@ -393,7 +406,13 @@ class RoomMemberStore(SQLBaseStore): desc="_get_joined_users_from_context", ) - defer.returnValue(set(row["user_id"] for row in rows)) + users_in_room = set(row["user_id"] for row in rows) + if event is not None and event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + if event.event_id in member_event_ids: + users_in_room.add(event.state_key) + + defer.returnValue(users_in_room) def is_host_joined(self, room_id, host, state_group, state_ids): if not state_group: diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 56bfdc0b55..dce5a2f135 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -108,9 +108,6 @@ class StateStore(SQLBaseStore): state_event_ids = dict(context.current_state_ids) - if event.is_state(): - state_event_ids[(event.type, event.state_key)] = event.event_id - self._simple_insert_txn( txn, table="state_groups", diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 218cb24889..44e859b5d1 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -312,7 +312,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): else: state_ids = None - context = EventContext(current_state_ids=state_ids) + context = EventContext() + context.current_state_ids = state_ids + context.prev_state_ids = state_ids context.push_actions = push_actions ordering = None diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index e70ac6f14d..b69832cc1b 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -60,8 +60,8 @@ class ReplicationResourceCase(unittest.TestCase): self.assertEquals(body, {}) @defer.inlineCallbacks - def test_events_and_state(self): - get = self.get(events="-1", state="-1", timeout="0") + def test_events(self): + get = self.get(events="-1", timeout="0") yield self.hs.get_handlers().room_creation_handler.create_room( synapse.types.create_requester(self.user), {} ) @@ -70,12 +70,6 @@ class ReplicationResourceCase(unittest.TestCase): self.assertEquals(body["events"]["field_names"], [ "position", "internal", "json", "state_group" ]) - self.assertEquals(body["state_groups"]["field_names"], [ - "position", "room_id", "event_id" - ]) - self.assertEquals(body["state_group_state"]["field_names"], [ - "position", "type", "state_key", "event_id" - ]) @defer.inlineCallbacks def test_presence(self): diff --git a/tests/test_state.py b/tests/test_state.py index de2d35145a..6454f994e3 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -86,17 +86,8 @@ class StateGroupStore(object): state_events = dict(context.current_state_ids) - if event.is_state(): - state_events[(event.type, event.state_key)] = event.event_id - - state_group = context.state_group - if not state_group: - state_group = self._next_group - self._next_group += 1 - - self._group_to_state[state_group] = state_events - - self._event_to_state_group[event.event_id] = state_group + self._group_to_state[context.state_group] = state_events + self._event_to_state_group[event.event_id] = context.state_group def get_events(self, event_ids, **kwargs): return { @@ -151,6 +142,7 @@ class StateTestCase(unittest.TestCase): "get_state_groups_ids", "add_event_hashes", "get_events", + "get_next_state_group", ] ) hs = Mock(spec_set=[ @@ -161,6 +153,8 @@ class StateTestCase(unittest.TestCase): hs.get_clock.return_value = MockClock() hs.get_auth.return_value = Auth(hs) + self.store.get_next_state_group.side_effect = Mock + self.state = StateHandler(hs) self.event_id = 0 @@ -209,7 +203,7 @@ class StateTestCase(unittest.TestCase): store.store_state_groups(event, context) context_store[event.event_id] = context - self.assertEqual(2, len(context_store["D"].current_state_ids)) + self.assertEqual(2, len(context_store["D"].prev_state_ids)) @defer.inlineCallbacks def test_branch_basic_conflict(self): @@ -265,7 +259,7 @@ class StateTestCase(unittest.TestCase): self.assertSetEqual( {"START", "A", "C"}, - {e_id for e_id in context_store["D"].current_state_ids.values()} + {e_id for e_id in context_store["D"].prev_state_ids.values()} ) @defer.inlineCallbacks @@ -331,7 +325,7 @@ class StateTestCase(unittest.TestCase): self.assertSetEqual( {"START", "A", "B", "C"}, - {e for e in context_store["E"].current_state_ids.values()} + {e for e in context_store["E"].prev_state_ids.values()} ) @defer.inlineCallbacks @@ -414,7 +408,7 @@ class StateTestCase(unittest.TestCase): self.assertSetEqual( {"A1", "A2", "A3", "A5", "B"}, - {e for e in context_store["D"].current_state_ids.values()} + {e for e in context_store["D"].prev_state_ids.values()} ) def _add_depths(self, nodes, edges): @@ -447,7 +441,7 @@ class StateTestCase(unittest.TestCase): set(e.event_id for e in old_state), set(context.current_state_ids.values()) ) - self.assertIsNone(context.state_group) + self.assertIsNotNone(context.state_group) @defer.inlineCallbacks def test_annotate_with_old_state(self): @@ -464,11 +458,9 @@ class StateTestCase(unittest.TestCase): ) self.assertEqual( - set(e.event_id for e in old_state), set(context.current_state_ids.values()) + set(e.event_id for e in old_state), set(context.prev_state_ids.values()) ) - self.assertIsNone(context.state_group) - @defer.inlineCallbacks def test_trivial_annotate_message(self): event = create_event(type="test_message", name="event") @@ -514,10 +506,10 @@ class StateTestCase(unittest.TestCase): self.assertEqual( set([e.event_id for e in old_state]), - set(context.current_state_ids.values()) + set(context.prev_state_ids.values()) ) - self.assertIsNone(context.state_group) + self.assertIsNotNone(context.state_group) @defer.inlineCallbacks def test_resolve_message_conflict(self): @@ -550,7 +542,7 @@ class StateTestCase(unittest.TestCase): self.assertEqual(len(context.current_state_ids), 6) - self.assertIsNone(context.state_group) + self.assertIsNotNone(context.state_group) @defer.inlineCallbacks def test_resolve_state_conflict(self): @@ -583,7 +575,7 @@ class StateTestCase(unittest.TestCase): self.assertEqual(len(context.current_state_ids), 6) - self.assertIsNone(context.state_group) + self.assertIsNotNone(context.state_group) @defer.inlineCallbacks def test_standard_depth_conflict(self): From 826ca61745e243e3214a07374fca14ce63d3033a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 14:45:04 +0100 Subject: [PATCH 20/27] Add storage function to SlaveStore --- synapse/replication/slave/storage/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index b14ed4d2d8..cbebd5b2f7 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -123,6 +123,7 @@ class SlavedEventStore(BaseSlavedStore): get_state_groups_ids = DataStore.get_state_groups_ids.__func__ get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__ get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__ + get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__ get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__ _get_joined_users_from_context = ( RoomMemberStore.__dict__["_get_joined_users_from_context"] From f51888530d25f4b4e4d40b337ee65bbace8c7d69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 14:58:11 +0100 Subject: [PATCH 21/27] Always specify state_group so that its in the cache --- synapse/state.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/state.py b/synapse/state.py index a0f807e3b9..4d48cc4605 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -302,6 +302,8 @@ class StateHandler(object): if new_state_event_ids == frozenset(e_id for e_id in events): state_group = sg break + if not state_group: + state_group = self.store.get_next_state_group() if self._state_cache is not None: cache = _StateCacheEntry( From ed7a703d4c61feaae437cd4bc11c2afea2dc4ad4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 15:53:19 +0100 Subject: [PATCH 22/27] Handle the fact that workers can't generate state groups --- synapse/api/auth.py | 6 ++-- synapse/state.py | 81 +++++++++++++++++++++++++++++++-------------- 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index fcf0b0d25f..dcda40863f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -281,11 +281,13 @@ class Auth(object): with Measure(self.clock, "check_host_in_room"): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - group, curr_state_ids = yield self.state.resolve_state_groups( + entry = yield self.state.resolve_state_groups( room_id, latest_event_ids ) - ret = yield self.store.is_host_joined(room_id, host, group, curr_state_ids) + ret = yield self.store.is_host_joined( + room_id, host, entry.state_group, entry.state + ) defer.returnValue(ret) def check_event_sender_in_room(self, event, auth_events): diff --git a/synapse/state.py b/synapse/state.py index 4d48cc4605..b31bbcdbd2 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -43,11 +43,35 @@ SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 +_NEXT_STATE_ID = 1 + + +def _gen_state_id(): + global _NEXT_STATE_ID + s = "X%d" % (_NEXT_STATE_ID,) + _NEXT_STATE_ID += 1 + return s + + class _StateCacheEntry(object): - def __init__(self, state, state_group, ts): + __slots__ = ["state", "state_group", "state_id"] + + def __init__(self, state, state_group): self.state = state self.state_group = state_group + # The `state_id` is a unique ID we generate that can be used as ID for + # this collection of state. Usually this would be the same as the + # state group, but on worker instances we can't generate a new state + # group each time we resolve state, so we generate a separate one that + # isn't persisted and is used solely for caches. + # `state_id` is either a state_group (and so an int) or a string. This + # ensures we don't accidentally persist a state_id as a stateg_group + if state_group: + self.state_id = state_group + else: + self.state_id = _gen_state_id() + class StateHandler(object): """ Responsible for doing state conflict resolution. @@ -93,7 +117,8 @@ class StateHandler(object): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - _, state = yield self.resolve_state_groups(room_id, latest_event_ids) + ret = yield self.resolve_state_groups(room_id, latest_event_ids) + state = ret.state if event_type: event_id = state.get((event_type, state_key)) @@ -116,7 +141,8 @@ class StateHandler(object): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - _, state = yield self.resolve_state_groups(room_id, latest_event_ids) + ret = yield self.resolve_state_groups(room_id, latest_event_ids) + state = ret.state if event_type: defer.returnValue(state.get((event_type, state_key))) @@ -127,9 +153,9 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_user_in_room(self, room_id): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - group, state_ids = yield self.resolve_state_groups(room_id, latest_event_ids) + entry = yield self.resolve_state_groups(room_id, latest_event_ids) joined_users = yield self.store.get_joined_users_from_state( - room_id, group, state_ids + room_id, entry.state_id, entry.state ) defer.returnValue(joined_users) @@ -191,23 +217,26 @@ class StateHandler(object): defer.returnValue(context) if event.is_state(): - ret = yield self.resolve_state_groups( + entry = yield self.resolve_state_groups( event.room_id, [e for e, _ in event.prev_events], event_type=event.type, state_key=event.state_key, ) else: - ret = yield self.resolve_state_groups( + entry = yield self.resolve_state_groups( event.room_id, [e for e, _ in event.prev_events], ) - group, curr_state = ret + curr_state = entry.state context.prev_state_ids = curr_state - if event.is_state() or group is None: + if event.is_state(): context.state_group = self.store.get_next_state_group() else: - context.state_group = group + if entry.state_group is None: + entry.state_group = self.store.get_next_state_group() + entry.state_id = entry.state_group + context.state_group = entry.state_group if event.is_state(): key = (event.type, event.state_key) @@ -249,16 +278,15 @@ class StateHandler(object): if len(group_names) == 1: name, state_list = state_groups_ids.items().pop() - defer.returnValue((name, state_list,)) + defer.returnValue(_StateCacheEntry( + state=state_list, + state_group=name, + )) if self._state_cache is not None: cache = self._state_cache.get(group_names, None) if cache: - cache.ts = self.clock.time_msec() - - defer.returnValue( - (cache.state_group, cache.state,) - ) + defer.returnValue(cache) logger.info( "Resolving state for %s with %d groups", room_id, len(state_groups_ids) @@ -302,19 +330,22 @@ class StateHandler(object): if new_state_event_ids == frozenset(e_id for e_id in events): state_group = sg break - if not state_group: - state_group = self.store.get_next_state_group() + if state_group is None: + # Worker instances don't have access to this method, but we want + # to set the state_group on the main instance to increase cache + # hits. + if hasattr(self.store, "get_next_state_group"): + state_group = self.store.get_next_state_group() + + cache = _StateCacheEntry( + state=new_state, + state_group=state_group, + ) if self._state_cache is not None: - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - ts=self.clock.time_msec() - ) - self._state_cache[group_names] = cache - defer.returnValue((state_group, new_state,)) + defer.returnValue(cache) def resolve_events(self, state_sets, event): logger.info( From 5405351b147cb5e1b62651fd0c5ad95c5e0569e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 16:19:36 +0100 Subject: [PATCH 23/27] Lower get_linearized_receipts_for_room cache size --- synapse/storage/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index ccc3811e84..9747a04a9a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -145,7 +145,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000, tree=True) + @cachedInlineCallbacks(num_args=3, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. From 0cfd6c316193895b6fd5c761253305cf8688cef1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 16:25:57 +0100 Subject: [PATCH 24/27] Use state_groups table to test existence --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dce5a2f135..ec551b0b4f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,7 +85,7 @@ class StateStore(SQLBaseStore): def _have_persisted_state_group_txn(self, txn, state_group): txn.execute( - "SELECT count(*) FROM state_groups_state WHERE state_group = ?", + "SELECT count(*) FROM state_groups WHERE id = ?", (state_group,) ) row = txn.fetchone() From 516a272aca16670961c3d337a152ce507371c0cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 10:55:02 +0100 Subject: [PATCH 25/27] Ensure we only return a validated pdu in get_pdu --- synapse/federation/federation_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f2b3aceb49..7212903eac 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -267,7 +267,7 @@ class FederationClient(FederationBase): pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) - pdu = None + signed_pdu = None for destination in destinations: now = self._clock.time_msec() last_attempt = pdu_attempts.get(destination, 0) @@ -297,7 +297,7 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - pdu = yield self._check_sigs_and_hashes([pdu])[0] + signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] break @@ -320,10 +320,10 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None and pdu: - self._get_pdu_cache[event_id] = pdu + if self._get_pdu_cache is not None and signed_pdu: + self._get_pdu_cache[event_id] = signed_pdu - defer.returnValue(pdu) + defer.returnValue(signed_pdu) @defer.inlineCallbacks @log_function From 265d847ffd3f9b7a123974a7c1cafd2487720c96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 14:50:06 +0100 Subject: [PATCH 26/27] Fix typo in log line --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d22adadc38..cf82a2336e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -230,7 +230,7 @@ class PresenceHandler(object): """ logger.info( "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes", - len(self.user_to_current_state) + len(self.unpersisted_users_changes) ) unpersisted = self.unpersisted_users_changes From 051a9ea92134b3bf8dec27cedf2e8410c731dfc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 14:55:03 +0100 Subject: [PATCH 27/27] Linearize state resolution to help caches --- synapse/state.py | 115 ++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 56 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index b31bbcdbd2..cd792afed1 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes from synapse.events.snapshot import EventContext +from synapse.util.async import Linearizer from collections import namedtuple @@ -84,6 +85,7 @@ class StateHandler(object): # dict of set of event_ids -> _StateCacheEntry. self._state_cache = None + self.resolve_linearizer = Linearizer() def start_caching(self): logger.debug("start_caching") @@ -283,69 +285,70 @@ class StateHandler(object): state_group=name, )) - if self._state_cache is not None: - cache = self._state_cache.get(group_names, None) - if cache: - defer.returnValue(cache) + with (yield self.resolve_linearizer.queue(group_names)): + if self._state_cache is not None: + cache = self._state_cache.get(group_names, None) + if cache: + defer.returnValue(cache) - logger.info( - "Resolving state for %s with %d groups", room_id, len(state_groups_ids) - ) - - state = {} - for st in state_groups_ids.values(): - for key, e_id in st.items(): - state.setdefault(key, set()).add(e_id) - - conflicted_state = { - k: list(v) - for k, v in state.items() - if len(v) > 1 - } - - if conflicted_state: - logger.info("Resolving conflicted state for %r", room_id) - state_map = yield self.store.get_events( - [e_id for st in state_groups_ids.values() for e_id in st.values()], - get_prev_content=False + logger.info( + "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) - state_sets = [ - [state_map[e_id] for key, e_id in st.items() if e_id in state_map] - for st in state_groups_ids.values() - ] - new_state, _ = self._resolve_events( - state_sets, event_type, state_key - ) - new_state = { - key: e.event_id for key, e in new_state.items() - } - else: - new_state = { - key: e_ids.pop() for key, e_ids in state.items() + + state = {} + for st in state_groups_ids.values(): + for key, e_id in st.items(): + state.setdefault(key, set()).add(e_id) + + conflicted_state = { + k: list(v) + for k, v in state.items() + if len(v) > 1 } - state_group = None - new_state_event_ids = frozenset(new_state.values()) - for sg, events in state_groups_ids.items(): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break - if state_group is None: - # Worker instances don't have access to this method, but we want - # to set the state_group on the main instance to increase cache - # hits. - if hasattr(self.store, "get_next_state_group"): - state_group = self.store.get_next_state_group() + if conflicted_state: + logger.info("Resolving conflicted state for %r", room_id) + state_map = yield self.store.get_events( + [e_id for st in state_groups_ids.values() for e_id in st.values()], + get_prev_content=False + ) + state_sets = [ + [state_map[e_id] for key, e_id in st.items() if e_id in state_map] + for st in state_groups_ids.values() + ] + new_state, _ = self._resolve_events( + state_sets, event_type, state_key + ) + new_state = { + key: e.event_id for key, e in new_state.items() + } + else: + new_state = { + key: e_ids.pop() for key, e_ids in state.items() + } - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - ) + state_group = None + new_state_event_ids = frozenset(new_state.values()) + for sg, events in state_groups_ids.items(): + if new_state_event_ids == frozenset(e_id for e_id in events): + state_group = sg + break + if state_group is None: + # Worker instances don't have access to this method, but we want + # to set the state_group on the main instance to increase cache + # hits. + if hasattr(self.store, "get_next_state_group"): + state_group = self.store.get_next_state_group() - if self._state_cache is not None: - self._state_cache[group_names] = cache + cache = _StateCacheEntry( + state=new_state, + state_group=state_group, + ) - defer.returnValue(cache) + if self._state_cache is not None: + self._state_cache[group_names] = cache + + defer.returnValue(cache) def resolve_events(self, state_sets, event): logger.info(