From 9e982750ee5d0872c2157a444070878f2e3a6e4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Mar 2016 13:24:31 +0000 Subject: [PATCH 01/59] Persist rejection of invites over federation --- synapse/handlers/federation.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 27f2b40bf..86ed37e9f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -813,7 +813,23 @@ class FederationHandler(BaseHandler): target_hosts, signed_event ) - defer.returnValue(None) + + context = yield self.state_handler.compute_event_context(event) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=False, + ) + + target_user = UserID.from_string(event.state_key) + with PreserveLoggingContext(): + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) + + defer.returnValue(event) @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, From e5f0e5893127b9474ed8ea38827a9d143cbff1e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Mar 2016 13:48:40 +0000 Subject: [PATCH 02/59] Remove needless PreserveLoggingContext --- synapse/handlers/federation.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 86ed37e9f..f599e817a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -823,11 +823,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) defer.returnValue(event) From 
b6e8420aeed9921ba7d0fd4c8ebaf1b64d5f677c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:01:43 +0000 Subject: [PATCH 03/59] Add replication stream for pushers --- synapse/replication/resource.py | 25 +++++++- synapse/storage/__init__.py | 5 +- synapse/storage/pusher.py | 63 ++++++++++++++----- .../schema/delta/30/deleted_pushers.sql | 24 +++++++ synapse/storage/util/id_generators.py | 7 ++- tests/replication/test_resource.py | 1 + 6 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 synapse/storage/schema/delta/30/deleted_pushers.sql diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index adc1eb1d0..8c1ae0fbc 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -37,6 +37,7 @@ STREAM_NAMES = ( ("user_account_data", "room_account_data", "tag_account_data",), ("backfill",), ("push_rules",), + ("pushers",), ) @@ -65,6 +66,7 @@ class ReplicationResource(Resource): * "tag_account_data": Per room per user tags. * "backfill": Old events that have been backfilled from other servers. * "push_rules": Per user changes to push rules. + * "pushers": Per user changes to their pushers. 
The API takes two additional query parameters: @@ -120,6 +122,7 @@ class ReplicationResource(Resource): stream_token = yield self.sources.get_current_token() backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() + pushers_token = self.store.get_pushers_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -129,6 +132,7 @@ class ReplicationResource(Resource): int(stream_token.account_data_key), backfill_token, push_rules_token, + pushers_token, )) @request_handler @@ -151,6 +155,7 @@ class ReplicationResource(Resource): yield self.typing(writer, current_token) # TODO: implement limit yield self.receipts(writer, current_token, limit) yield self.push_rules(writer, current_token, limit) + yield self.pushers(writer, current_token, limit) self.streams(writer, current_token) logger.info("Replicated %d rows", writer.total) @@ -297,6 +302,24 @@ class ReplicationResource(Resource): "priority_class", "priority", "conditions", "actions" )) + @defer.inlineCallbacks + def pushers(self, writer, current_token, limit): + current_position = current_token.pushers + + pushers = parse_integer(writer.request, "pushers") + if pushers is not None: + updated, deleted = yield self.store.get_all_updated_pushers( + pushers, current_position, limit + ) + writer.write_header_and_rows("pushers", updated, ( + "position", "user_id", "access_token", "profile_tag", "kind", + "app_id", "app_display_name", "device_display_name", "pushkey", + "ts", "lang", "data" + )) + writer.write_header_and_rows("deleted", deleted, ( + "position", "user_id", "app_id", "pushkey" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -327,7 +350,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules" + "push_rules", "pushers" ))): 
__slots__ = [] diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 168eb27b0..250ba536e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -119,12 +119,15 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") - self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" ) + self._pushers_id_gen = StreamIdGenerator( + db_conn, "pushers", "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) events_max = self._stream_id_gen.get_max_token() event_cache_prefill, min_event_val = self._get_cache_dict( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 7693ab908..29da3bbd1 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -16,8 +16,6 @@ from ._base import SQLBaseStore from twisted.internet import defer -from synapse.api.errors import StoreError - from canonicaljson import encode_canonical_json import logging @@ -79,12 +77,41 @@ class PusherStore(SQLBaseStore): rows = yield self.runInteraction("get_all_pushers", get_pushers) defer.returnValue(rows) + def get_pushers_stream_token(self): + return self._pushers_id_gen.get_max_token() + + def get_all_updated_pushers(self, last_id, current_id, limit): + def get_all_updated_pushers_txn(txn): + sql = ( + "SELECT id, user_name, access_token, profile_tag, kind," + " app_id, app_display_name, device_display_name, pushkey, ts," + " lang, data" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" 
+ ) + txn.execute(sql, (last_id, current_id, limit)) + updated = txn.fetchall() + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + deleted = txn.fetchall() + + return (updated, deleted) + return self.runInteraction( + "get_all_updated_pushers", get_all_updated_pushers_txn + ) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data, profile_tag=""): - try: - next_id = self._pushers_id_gen.get_next() + with self._pushers_id_gen.get_next() as stream_id: yield self._simple_upsert( "pushers", dict( @@ -101,23 +128,29 @@ class PusherStore(SQLBaseStore): lang=lang, data=encode_canonical_json(data), profile_tag=profile_tag, - ), - insertion_values=dict( - id=next_id, + id=stream_id, ), desc="add_pusher", ) - except Exception as e: - logger.error("create_pusher with failed: %s", e) - raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): - yield self._simple_delete_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, 'user_name': user_id}, - desc="delete_pusher_by_app_id_pushkey_user_id", - ) + def delete_pusher_txn(txn, stream_id): + self._simple_delete_one( + txn, + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} + ) + self._simple_upsert_txn( + txn, + "deleted_pushers", + {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, + {"stream_id", stream_id}, + ) + with self._pushers_id_gen.get_next() as stream_id: + yield self.runInteraction( + "delete_pusher", delete_pusher_txn, stream_id + ) @defer.inlineCallbacks def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): diff --git a/synapse/storage/schema/delta/30/deleted_pushers.sql 
b/synapse/storage/schema/delta/30/deleted_pushers.sql new file mode 100644 index 000000000..cdcf79ac8 --- /dev/null +++ b/synapse/storage/schema/delta/30/deleted_pushers.sql @@ -0,0 +1,24 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS deleted_pushers( + stream_id BIGINT NOT NULL, + app_id TEXT NOT NULL, + pushkey TEXT NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (app_id, pushkey, user_id) +); + +CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 610ddad42..a02dfc7d5 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -49,9 +49,14 @@ class StreamIdGenerator(object): with stream_id_gen.get_next() as stream_id: # ... persist event ... 
""" - def __init__(self, db_conn, table, column): + def __init__(self, db_conn, table, column, extra_tables=[]): self._lock = threading.Lock() self._current_max = _load_max_id(db_conn, table, column) + for table, column in extra_tables: + self._current_max = max( + self._current_max, + _load_max_id(db_conn, table, column) + ) self._unfinished_ids = deque() def get_next(self): diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 4a42eb336..f4b5fb332 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -131,6 +131,7 @@ class ReplicationResourceCase(unittest.TestCase): test_timeout_tag_account_data = _test_timeout("tag_account_data") test_timeout_backfill = _test_timeout("backfill") test_timeout_push_rules = _test_timeout("push_rules") + test_timeout_pushers = _test_timeout("pushers") @defer.inlineCallbacks def send_text_message(self, room_id, message): From 12904932c41c73714543b817157f09073fcc2625 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:41:06 +0000 Subject: [PATCH 04/59] Hook up adding a pusher to the notifier for replication. 
--- synapse/notifier.py | 6 ++++++ synapse/rest/client/v1/pusher.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index 9b69b0333..f00cd8c58 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -282,6 +282,12 @@ class Notifier(object): self.notify_replication() + def on_new_replication_data(self): + """Used to inform replication listeners that something has happend + without waking up any of the normal user event streams""" + with PreserveLoggingContext(): + self.notify_replication() + @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START): diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index ee029b4f7..9881f068c 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -29,6 +29,10 @@ logger = logging.getLogger(__name__) class PusherRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/pushers/set$") + def __init__(self, hs): + super(PusherRestServlet, self).__init__(hs) + self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_POST(self, request): requester = yield self.auth.get_user_by_req(request) @@ -87,6 +91,8 @@ class PusherRestServlet(ClientV1RestServlet): raise SynapseError(400, "Config Error: " + pce.message, errcode=Codes.MISSING_PARAM) + self.notifier.on_new_replication_data() + defer.returnValue((200, {})) def on_OPTIONS(self, _): From ee32d622cec56f2ab7b11577d15e4b805477d13f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:47:36 +0000 Subject: [PATCH 05/59] Fix a couple of errors when deleting pushers --- synapse/storage/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 29da3bbd1..87b2ac577 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -136,7 +136,7 @@ class PusherStore(SQLBaseStore): 
@defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): - self._simple_delete_one( + self._simple_delete_one_txn( txn, "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} @@ -145,7 +145,7 @@ class PusherStore(SQLBaseStore): txn, "deleted_pushers", {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, - {"stream_id", stream_id}, + {"stream_id": stream_id}, ) with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( From a877209c8b0c7c476ee6676c6d00c4cacdc83207 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 16 Mar 2016 09:45:37 +0000 Subject: [PATCH 06/59] Password reset docs and script Replace the bash/perl gen_password script with a python one, and write a note on how to use it. --- README.rst | 20 ++++++++++++++++++++ scripts/gen_password | 1 - scripts/hash_password | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) delete mode 100644 scripts/gen_password create mode 100755 scripts/hash_password diff --git a/README.rst b/README.rst index 8a745259b..a48a0802b 100644 --- a/README.rst +++ b/README.rst @@ -525,6 +525,26 @@ Logging In To An Existing Account Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into the form and click the Login button. +Password reset +============== + +Synapse does not yet support a password-reset function (see +https://matrix.org/jira/browse/SYN-11). In the meantime, it is possible to +manually reset a user's password via direct database access. 
+ +First calculate the hash of the new password: + + $ source ~/.synapse/bin/activate + $ ./scripts/hash_password + Password: + Confirm password: + $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +Then update the `users` table in the database: + + UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' + WHERE name='@test:test.com'; + Identity Servers ================ diff --git a/scripts/gen_password b/scripts/gen_password deleted file mode 100644 index 7afd3a5df..000000000 --- a/scripts/gen_password +++ /dev/null @@ -1 +0,0 @@ -perl -MCrypt::Random -MCrypt::Eksblowfish::Bcrypt -e 'print Crypt::Eksblowfish::Bcrypt::bcrypt("secret", "\$2\$12\$" . Crypt::Eksblowfish::Bcrypt::en_base64(Crypt::Random::makerandom_octet(Length=>16)))."\n"' diff --git a/scripts/hash_password b/scripts/hash_password new file mode 100755 index 000000000..e78460098 --- /dev/null +++ b/scripts/hash_password @@ -0,0 +1,39 @@ +#!/usr/bin/env python + +import argparse +import bcrypt +import getpass + +bcrypt_rounds=12 + +def prompt_for_pass(): + password = getpass.getpass("Password: ") + + if not password: + raise Exception("Password cannot be blank.") + + confirm_password = getpass.getpass("Confirm password: ") + + if password != confirm_password: + raise Exception("Passwords do not match.") + + return password + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Calculate the hash of a new password, so that passwords" + " can be reset") + parser.add_argument( + "-p", "--password", + default=None, + help="New password for user. 
Will prompt if omitted.", + ) + + args = parser.parse_args() + password = args.password + + if not password: + password = prompt_for_pass() + + print bcrypt.hashpw(password, bcrypt.gensalt(bcrypt_rounds)) + From ba660ecde20544ac1cfc163a5586f4f202627afa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 16 Mar 2016 10:35:00 +0000 Subject: [PATCH 07/59] Add a comment to offer a hint to an explanation for why we have a unique constraint on (app_id, pushkey, user_id) --- synapse/storage/schema/delta/30/deleted_pushers.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/schema/delta/30/deleted_pushers.sql b/synapse/storage/schema/delta/30/deleted_pushers.sql index cdcf79ac8..712c454aa 100644 --- a/synapse/storage/schema/delta/30/deleted_pushers.sql +++ b/synapse/storage/schema/delta/30/deleted_pushers.sql @@ -18,6 +18,7 @@ CREATE TABLE IF NOT EXISTS deleted_pushers( app_id TEXT NOT NULL, pushkey TEXT NOT NULL, user_id TEXT NOT NULL, + /* We only track the most recent delete for each app_id, pushkey and user_id. */ UNIQUE (app_id, pushkey, user_id) ); From 660ae8e0f3c7f667b3a24b02f095d60c2b09531f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 16 Mar 2016 10:40:38 +0000 Subject: [PATCH 08/59] Clarify that we do have reset functionality via the IS --- README.rst | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/README.rst b/README.rst index a48a0802b..285fc5aa8 100644 --- a/README.rst +++ b/README.rst @@ -525,27 +525,6 @@ Logging In To An Existing Account Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into the form and click the Login button. -Password reset -============== - -Synapse does not yet support a password-reset function (see -https://matrix.org/jira/browse/SYN-11). In the meantime, it is possible to -manually reset a user's password via direct database access. 
- -First calculate the hash of the new password: - - $ source ~/.synapse/bin/activate - $ ./scripts/hash_password - Password: - Confirm password: - $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx - -Then update the `users` table in the database: - - UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' - WHERE name='@test:test.com'; - - Identity Servers ================ @@ -565,6 +544,26 @@ as the primary means of identity and E2E encryption is not complete. As such, we are running a single identity server (https://matrix.org) at the current time. +Password reset +============== + +If a user has registered an email address to their account using an identity +server, they can request a password-reset token via clients such as Vector. + +A manual password reset can be done via direct database access as follows. + +First calculate the hash of the new password: + + $ source ~/.synapse/bin/activate + $ ./scripts/hash_password + Password: + Confirm password: + $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +Then update the `users` table in the database: + + UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' + WHERE name='@test:test.com'; Where's the spec?! ================== From c12b9d719a3cf1eeb9c4c8d354dbaecab5e76233 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 11:56:24 +0000 Subject: [PATCH 09/59] Make registration idempotent: if you specify the same session, make it give you an access token for the user that was registered on previous uses of that session. Tweak the UI auth layer to not delete sessions when their auth has completed and hence expire them so they don't hang around until server restart. Allow server-side data to be associated with UI auth sessions. 
--- synapse/handlers/auth.py | 58 +++++++++++++++++++----- synapse/rest/client/v2_alpha/register.py | 27 ++++++++++- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 5c0ea636b..5dc9d9175 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -27,6 +27,7 @@ import logging import bcrypt import pymacaroons import simplejson +import time import synapse.util.stringutils as stringutils @@ -35,6 +36,7 @@ logger = logging.getLogger(__name__) class AuthHandler(BaseHandler): + SESSION_EXPIRE_SECS = 48 * 60 * 60 def __init__(self, hs): super(AuthHandler, self).__init__(hs) @@ -66,15 +68,18 @@ class AuthHandler(BaseHandler): 'auth' key: this method prompts for auth if none is sent. clientip (str): The IP address of the client. Returns: - A tuple of (authed, dict, dict) where authed is true if the client - has successfully completed an auth flow. If it is true, the first - dict contains the authenticated credentials of each stage. + A tuple of (authed, dict, dict, session_id) where authed is true if + the client has successfully completed an auth flow. If it is true + the first dict contains the authenticated credentials of each stage. If authed is false, the first dictionary is the server response to the login request and should be passed back to the client. In either case, the second dict contains the parameters for this request (which may have been given only in a previous call). 
+ + session_id is the ID of this session, either passed in by the client + or assigned by the call to check_auth """ authdict = None @@ -103,7 +108,10 @@ class AuthHandler(BaseHandler): if not authdict: defer.returnValue( - (False, self._auth_dict_for_flows(flows, session), clientdict) + ( + False, self._auth_dict_for_flows(flows, session), + clientdict, session['id'] + ) ) if 'creds' not in session: @@ -122,12 +130,11 @@ class AuthHandler(BaseHandler): for f in flows: if len(set(f) - set(creds.keys())) == 0: logger.info("Auth completed with creds: %r", creds) - self._remove_session(session) - defer.returnValue((True, creds, clientdict)) + defer.returnValue((True, creds, clientdict, session['id'])) ret = self._auth_dict_for_flows(flows, session) ret['completed'] = creds.keys() - defer.returnValue((False, ret, clientdict)) + defer.returnValue((False, ret, clientdict, session['id'])) @defer.inlineCallbacks def add_oob_auth(self, stagetype, authdict, clientip): @@ -154,6 +161,29 @@ class AuthHandler(BaseHandler): defer.returnValue(True) defer.returnValue(False) + def set_session_data(self, session_id, key, value): + """ + Store a key-value pair into the sessions data associated with this + request. This data is stored server-side and cannot be modified by + the client. 
+ :param session_id: (string) The ID of this session as returned from check_auth + :param key: (string) The key to store the data under + :param value: (any) The data to store + """ + sess = self._get_session_info(session_id) + sess.setdefault('serverdict', {})[key] = value + self._save_session(sess) + + def get_session_data(self, session_id, key, default=None): + """ + Retrieve data stored with set_session_data + :param session_id: (string) The ID of this session as returned from check_auth + :param key: (string) The key to store the data under + :param default: (any) Value to return if the key has not been set + """ + sess = self._get_session_info(session_id) + return sess.setdefault('serverdict', {}).get(key, default) + @defer.inlineCallbacks def _check_password_auth(self, authdict, _): if "user" not in authdict or "password" not in authdict: @@ -263,7 +293,7 @@ class AuthHandler(BaseHandler): if not session_id: # create a new session while session_id is None or session_id in self.sessions: - session_id = stringutils.random_string(24) + session_id = stringutils.random_string_with_symbols(24) self.sessions[session_id] = { "id": session_id, } @@ -455,11 +485,17 @@ class AuthHandler(BaseHandler): def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) + session["last_used"] = time.time() self.sessions[session["id"]] = session + self._prune_sessions() - def _remove_session(self, session): - logger.debug("Removing session %s", session) - del self.sessions[session["id"]] + def _prune_sessions(self): + for sid,sess in self.sessions.items(): + last_used = 0 + if 'last_used' in sess: + last_used = sess['last_used'] + if last_used < time.time() - AuthHandler.SESSION_EXPIRE_SECS: + del self.sessions[sid] def hash(self, password): """Computes a secure hash of password. 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 533ff136e..649491bdf 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -139,7 +139,7 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] - authed, result, params = yield self.auth_handler.check_auth( + authed, result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) @@ -147,6 +147,24 @@ class RegisterRestServlet(RestServlet): defer.returnValue((401, result)) return + # have we already registered a user for this session + registered_user_id = self.auth_handler.get_session_data( + session_id, "registered_user_id", None + ) + if registered_user_id is not None: + logger.info( + "Already registered user ID %r for this session", + registered_user_id + ) + access_token = yield self.auth_handler.issue_access_token(registered_user_id) + refresh_token = yield self.auth_handler.issue_refresh_token(registered_user_id) + defer.returnValue((200, { + "user_id": registered_user_id, + "access_token": access_token, + "home_server": self.hs.hostname, + "refresh_token": refresh_token, + })) + # NB: This may be from the auth handler and NOT from the POST if 'password' not in params: raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM) @@ -161,6 +179,13 @@ class RegisterRestServlet(RestServlet): guest_access_token=guest_access_token, ) + # remember that we've now registered that user account, and with what + # user ID (since the user may not have specified) + logger.info("%r", body) + self.auth_handler.set_session_data( + session_id, "registered_user_id", user_id + ) + if result and LoginType.EMAIL_IDENTITY in result: threepid = result[LoginType.EMAIL_IDENTITY] From 99797947aa5a7cdf8fe12043b4f25a155bcf4555 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 12:51:34 +0000 Subject: [PATCH 10/59] pep8 & remove debug logging --- 
synapse/handlers/auth.py | 2 +- synapse/rest/client/v2_alpha/register.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 5dc9d9175..a9f5e3710 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -490,7 +490,7 @@ class AuthHandler(BaseHandler): self._prune_sessions() def _prune_sessions(self): - for sid,sess in self.sessions.items(): + for sid, sess in self.sessions.items(): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 649491bdf..c440430e2 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -149,7 +149,7 @@ class RegisterRestServlet(RestServlet): # have we already registered a user for this session registered_user_id = self.auth_handler.get_session_data( - session_id, "registered_user_id", None + session_id, "registered_user_id", None ) if registered_user_id is not None: logger.info( @@ -157,7 +157,9 @@ class RegisterRestServlet(RestServlet): registered_user_id ) access_token = yield self.auth_handler.issue_access_token(registered_user_id) - refresh_token = yield self.auth_handler.issue_refresh_token(registered_user_id) + refresh_token = yield self.auth_handler.issue_refresh_token( + registered_user_id + ) defer.returnValue((200, { "user_id": registered_user_id, "access_token": access_token, @@ -181,9 +183,8 @@ class RegisterRestServlet(RestServlet): # remember that we've now registered that user account, and with what # user ID (since the user may not have specified) - logger.info("%r", body) self.auth_handler.set_session_data( - session_id, "registered_user_id", user_id + session_id, "registered_user_id", user_id ) if result and LoginType.EMAIL_IDENTITY in result: From ff7d3dc3a03538b08f36342b15492a348b2b0391 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 14:25:14 
+0000 Subject: [PATCH 11/59] Fix tests --- tests/rest/client/v2_alpha/test_register.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 9a202e9dd..affd42c01 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -22,9 +22,10 @@ class RegisterRestServletTestCase(unittest.TestCase): side_effect=lambda x: defer.succeed(self.appservice)) ) - self.auth_result = (False, None, None) + self.auth_result = (False, None, None, None) self.auth_handler = Mock( - check_auth=Mock(side_effect=lambda x, y, z: self.auth_result) + check_auth=Mock(side_effect=lambda x, y, z: self.auth_result), + get_session_data=Mock(return_value=None) ) self.registration_handler = Mock() self.identity_handler = Mock() @@ -112,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.auth_result = (True, None, { "username": "kermit", "password": "monkey" - }) + }, None) self.registration_handler.register = Mock(return_value=(user_id, token)) (code, result) = yield self.servlet.on_POST(self.request) @@ -135,7 +136,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.auth_result = (True, None, { "username": "kermit", "password": "monkey" - }) + }, None) self.registration_handler.register = Mock(return_value=("@user:id", "t")) d = self.servlet.on_POST(self.request) return self.assertFailure(d, SynapseError) From f5e90422f5d70afaf9bdf97cc620b563cf31a8eb Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 14:33:19 +0000 Subject: [PATCH 12/59] take extra return val from check_auth in account too --- synapse/rest/client/v2_alpha/account.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index dd4ea4558..7f8a6a4cf 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ 
b/synapse/rest/client/v2_alpha/account.py @@ -43,7 +43,7 @@ class PasswordRestServlet(RestServlet): body = parse_json_object_from_request(request) - authed, result, params = yield self.auth_handler.check_auth([ + authed, result, params, _ = yield self.auth_handler.check_auth([ [LoginType.PASSWORD], [LoginType.EMAIL_IDENTITY] ], body, self.hs.get_ip_from_request(request)) From 742b6c6d158f46a71724821ce13b8ad535df08bc Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 15:42:35 +0000 Subject: [PATCH 13/59] Use hs get_clock instead of time.time() --- synapse/handlers/auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a9f5e3710..dba6c76df 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) class AuthHandler(BaseHandler): - SESSION_EXPIRE_SECS = 48 * 60 * 60 + SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000 def __init__(self, hs): super(AuthHandler, self).__init__(hs) @@ -494,7 +494,7 @@ class AuthHandler(BaseHandler): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] - if last_used < time.time() - AuthHandler.SESSION_EXPIRE_SECS: + if last_used < self.hs.get_clock().time() - AuthHandler.SESSION_EXPIRE_MS: del self.sessions[sid] def hash(self, password): From 9671e6750c1756c7f76888b87eedfe7516ef748b Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 15:51:28 +0000 Subject: [PATCH 14/59] Replace other time.time(). 
--- synapse/handlers/auth.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dba6c76df..9fa834709 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -27,7 +27,6 @@ import logging import bcrypt import pymacaroons import simplejson -import time import synapse.util.stringutils as stringutils @@ -485,7 +484,7 @@ class AuthHandler(BaseHandler): def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) - session["last_used"] = time.time() + session["last_used"] = self.hs.get_clock().time_msec() self.sessions[session["id"]] = session self._prune_sessions() From 3176aebf9d827eeb939438deea49e12ceddc5b3e Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 15:55:49 +0000 Subject: [PATCH 15/59] string with symbols is a bit too symboly. --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 9fa834709..85f9b8271 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -292,7 +292,7 @@ class AuthHandler(BaseHandler): if not session_id: # create a new session while session_id is None or session_id in self.sessions: - session_id = stringutils.random_string_with_symbols(24) + session_id = stringutils.random_string(24) self.sessions[session_id] = { "id": session_id, } From 3ee7d7dc7f1793beefee433f780af81a64dfa590 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 16:18:52 +0000 Subject: [PATCH 16/59] time_msec() --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 85f9b8271..cdf9e9000 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -493,7 +493,7 @@ class AuthHandler(BaseHandler): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] - if last_used < 
self.hs.get_clock().time() - AuthHandler.SESSION_EXPIRE_MS: + if last_used < self.hs.get_clock().time_msec() - AuthHandler.SESSION_EXPIRE_MS: del self.sessions[sid] def hash(self, password): From b58d10a87595bd64305f46c2ac86252a67d7b0e4 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 16:22:20 +0000 Subject: [PATCH 17/59] pep8 --- synapse/handlers/auth.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index cdf9e9000..d7233cd0d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -493,7 +493,8 @@ class AuthHandler(BaseHandler): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] - if last_used < self.hs.get_clock().time_msec() - AuthHandler.SESSION_EXPIRE_MS: + now = self.hs.get_clock().time_msec() + if last_used < now - AuthHandler.SESSION_EXPIRE_MS: del self.sessions[sid] def hash(self, password): From a7daa5ae131cc860769d859cf03b48cefdc0500a Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 19:36:57 +0000 Subject: [PATCH 18/59] Make registration idempotent, part 2: be idempotent if the client specifies a username. --- synapse/handlers/auth.py | 14 ++++++++++++++ synapse/handlers/register.py | 12 +++++++++++- synapse/rest/client/v2_alpha/register.py | 22 +++++++++++++++++----- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d7233cd0d..82d458b42 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -160,6 +160,20 @@ class AuthHandler(BaseHandler): defer.returnValue(True) defer.returnValue(False) + def get_session_id(self, clientdict): + """ + Gets the session ID for a client given the client dictionary + :param clientdict: The dictionary sent by the client in the request + :return: The string session ID the client sent. If the client did not + send a session ID, returns None. 
+ """ + sid = None + if clientdict and 'auth' in clientdict: + authdict = clientdict['auth'] + if 'session' in authdict: + sid = authdict['session'] + return sid + def set_session_data(self, session_id, key, value): """ Store a key-value pair into the sessions data associated with this diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6ffb8c0da..f287ee247 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -47,7 +47,8 @@ class RegistrationHandler(BaseHandler): self._next_generated_user_id = None @defer.inlineCallbacks - def check_username(self, localpart, guest_access_token=None): + def check_username(self, localpart, guest_access_token=None, + assigned_user_id=None): yield run_on_reactor() if urllib.quote(localpart.encode('utf-8')) != localpart: @@ -60,6 +61,15 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() + if assigned_user_id: + if user_id == assigned_user_id: + return + else: + raise SynapseError( + 400, + "A different user ID has already been registered for this session", + ) + yield self.check_user_id_not_appservice_exclusive(user_id) users = yield self.store.get_users_by_id_case_insensitive(user_id) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c440430e2..b8590560d 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.constants import LoginType +from synapse.types import UserID from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -122,10 +123,25 @@ class RegisterRestServlet(RestServlet): guest_access_token = body.get("guest_access_token", None) + session_id = self.auth_handler.get_session_id(body) + logger.error("session id: %r", session_id) + registered_user_id = 
None + if session_id: + # if we get a registered user id out of here, it means we previously + # registered a user for this session, so we could just return the + # user here. We carry on and go through the auth checks though, + # for paranoia. + registered_user_id = self.auth_handler.get_session_data( + session_id, "registered_user_id", None + ) + logger.error("already regged: %r", registered_user_id) + logger.error("check: %r", desired_username) + if desired_username is not None: yield self.registration_handler.check_username( desired_username, - guest_access_token=guest_access_token + guest_access_token=guest_access_token, + assigned_user_id=registered_user_id, ) if self.hs.config.enable_registration_captcha: @@ -147,10 +163,6 @@ class RegisterRestServlet(RestServlet): defer.returnValue((401, result)) return - # have we already registered a user for this session - registered_user_id = self.auth_handler.get_session_data( - session_id, "registered_user_id", None - ) if registered_user_id is not None: logger.info( "Already registered user ID %r for this session", From f984decd6636baa4974a136e2ce8d4fecab3146f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 19:40:48 +0000 Subject: [PATCH 19/59] Unused import --- synapse/rest/client/v2_alpha/register.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b8590560d..d3e66740a 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -16,7 +16,6 @@ from twisted.internet import defer from synapse.api.constants import LoginType -from synapse.types import UserID from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import RestServlet, parse_json_object_from_request From 5670205e2a0e4b87005be743eb6cdfd817fe89ae Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 19:49:42 +0000 Subject: [PATCH 20/59] remove debug 
logging --- synapse/rest/client/v2_alpha/register.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index d3e66740a..d32c06c88 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -123,7 +123,6 @@ class RegisterRestServlet(RestServlet): guest_access_token = body.get("guest_access_token", None) session_id = self.auth_handler.get_session_id(body) - logger.error("session id: %r", session_id) registered_user_id = None if session_id: # if we get a registered user id out of here, it means we previously @@ -133,8 +132,6 @@ class RegisterRestServlet(RestServlet): registered_user_id = self.auth_handler.get_session_data( session_id, "registered_user_id", None ) - logger.error("already regged: %r", registered_user_id) - logger.error("check: %r", desired_username) if desired_username is not None: yield self.registration_handler.check_username( From 4ebb688f4ffcef2784df0aa7f5b088e72b28c07b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 17 Mar 2016 10:59:12 +0000 Subject: [PATCH 21/59] Add option to definitions.py to search for functions a function refers to --- scripts-dev/definitions.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py index 8340c7261..47dac7772 100755 --- a/scripts-dev/definitions.py +++ b/scripts-dev/definitions.py @@ -86,9 +86,12 @@ def used_names(prefix, item, defs, names): for name, funcs in defs.get('class', {}).items(): used_names(prefix + name + ".", name, funcs, names) + path = prefix.rstrip('.') for used in defs.get('uses', ()): if used in names: - names[used].setdefault('used', {}).setdefault(item, []).append(prefix.rstrip('.')) + if item: + names[item].setdefault('uses', []).append(used) + names[used].setdefault('used', {}).setdefault(item, []).append(path) if __name__ == '__main__': @@ -113,6 +116,10 @@ if 
__name__ == '__main__': "--referrers", default=0, type=int, help="Include referrers up to the given depth" ) + parser.add_argument( + "--referred", default=0, type=int, + help="Include referred down to the given depth" + ) parser.add_argument( "--format", default="yaml", help="Output format, one of 'yaml' or 'dot'" @@ -161,6 +168,20 @@ if __name__ == '__main__': continue result[name] = definition + referred_depth = args.referred + referred = set() + while referred_depth: + referred_depth -= 1 + for entry in result.values(): + for uses in entry.get("uses", ()): + referred.add(uses) + for name, definition in names.items(): + if not name in referred: + continue + if ignore and any(pattern.match(name) for pattern in ignore): + continue + result[name] = definition + if args.format == 'yaml': yaml.dump(result, sys.stdout, default_flow_style=False) elif args.format == 'dot': From 673c96ce97052126f5bfd11c7dcc19880614ec25 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 17 Mar 2016 11:01:28 +0000 Subject: [PATCH 22/59] Remove dead code left over from presence changes --- synapse/handlers/events.py | 70 -------------------------------- synapse/handlers/presence.py | 4 -- synapse/storage/roommember.py | 24 ----------- tests/storage/test_roommember.py | 10 ----- 4 files changed, 108 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 72a31a975..f25a25252 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,7 +18,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event -from synapse.util.logcontext import preserve_context_over_fn from synapse.api.constants import Membership, EventTypes from synapse.events import EventBase @@ -31,20 +30,6 @@ import random logger = logging.getLogger(__name__) -def started_user_eventstream(distributor, user): - return preserve_context_over_fn( - distributor.fire, - 
"started_user_eventstream", user - ) - - -def stopped_user_eventstream(distributor, user): - return preserve_context_over_fn( - distributor.fire, - "stopped_user_eventstream", user - ) - - class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -63,61 +48,6 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() - @defer.inlineCallbacks - def started_stream(self, user): - """Tells the presence handler that we have started an eventstream for - the user: - - Args: - user (User): The user who started a stream. - Returns: - A deferred that completes once their presence has been updated. - """ - if user not in self._streams_per_user: - # Make sure we set the streams per user to 1 here rather than - # setting it to zero and incrementing the value below. - # Otherwise this may race with stopped_stream causing the - # user to be erased from the map before we have a chance - # to increment it. - self._streams_per_user[user] = 1 - if user in self._stop_timer_per_user: - try: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(user) - ) - except: - logger.exception("Failed to cancel event timer") - else: - yield started_user_eventstream(self.distributor, user) - else: - self._streams_per_user[user] += 1 - - def stopped_stream(self, user): - """If there are no streams for a user this starts a timer that will - notify the presence handler that we haven't got an event stream for - the user unless the user starts a new stream in 30 seconds. - - Args: - user (User): The user who stopped a stream. 
- """ - self._streams_per_user[user] -= 1 - if not self._streams_per_user[user]: - del self._streams_per_user[user] - - # 30 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - logger.debug("_later stopped_user_eventstream %s", user) - - self._stop_timer_per_user.pop(user, None) - - return stopped_user_eventstream(self.distributor, user) - - logger.debug("Scheduling _later: for %s", user) - self._stop_timer_per_user[user] = ( - self.clock.call_later(30, _later) - ) - @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f6cf34317..cfbcf2d32 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -73,10 +73,6 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -def user_presence_changed(distributor, user, statuscache): - return distributor.fire("user_presence_changed", user, statuscache) - - def collect_presencelike_data(distributor, user, content): return distributor.fire("collect_presencelike_data", user, content) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3065b0c1a..0cd89260f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -251,30 +251,6 @@ class RoomMemberStore(SQLBaseStore): user_id, membership_list=[Membership.JOIN], ) - @defer.inlineCallbacks - def user_rooms_intersect(self, user_id_list): - """ Checks whether all the users whose IDs are given in a list share a - room. - - This is a "hot path" function that's called a lot, e.g. by presence for - generating the event stream. As such, it is implemented locally by - wrapping logic around heavily-cached database queries. 
- """ - if len(user_id_list) < 2: - defer.returnValue(True) - - deferreds = [self.get_rooms_for_user(u) for u in user_id_list] - - results = yield defer.DeferredList(deferreds, consumeErrors=True) - - # A list of sets of strings giving room IDs for each user - room_id_lists = [set([r.room_id for r in result[1]]) for result in results] - - # There isn't a setintersection(*list_of_sets) - ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0 - - defer.returnValue(ret) - @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 677d11f68..b029ff058 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -91,11 +91,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): ) )] ) - self.assertFalse( - (yield self.store.user_rooms_intersect( - [self.u_alice.to_string(), self.u_bob.to_string()] - )) - ) @defer.inlineCallbacks def test_two_members(self): @@ -108,11 +103,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): yield self.store.get_room_members(self.room.to_string()) )} ) - self.assertTrue(( - yield self.store.user_rooms_intersect([ - self.u_alice.to_string(), self.u_bob.to_string() - ]) - )) @defer.inlineCallbacks def test_room_hosts(self): From 2cd9260500efa82713edd365f54d491ac0328fb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Mar 2016 11:09:03 +0000 Subject: [PATCH 23/59] Update aliases event after deletion Attempt to update the appropriate `m.room.aliases` event after deleting an alias. This may fail due to the deleter not being in the room. Will also check if the canonical alias of the event is set to the deleted alias, and if so will attempt to delete it. 
--- synapse/handlers/directory.py | 52 +++++++++++++++++++++++++---- synapse/rest/client/v1/directory.py | 3 +- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index c4aaa1191..be9f2a21b 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -32,6 +32,8 @@ class DirectoryHandler(BaseHandler): def __init__(self, hs): super(DirectoryHandler, self).__init__(hs) + self.state = hs.get_state_handler() + self.federation = hs.get_replication_layer() self.federation.register_query_handler( "directory", self.on_directory_query @@ -93,7 +95,7 @@ class DirectoryHandler(BaseHandler): yield self._create_association(room_alias, room_id, servers) @defer.inlineCallbacks - def delete_association(self, user_id, room_alias): + def delete_association(self, requester, user_id, room_alias): # association deletion for human users can_delete = yield self._user_can_delete_alias(room_alias, user_id) @@ -112,7 +114,25 @@ class DirectoryHandler(BaseHandler): errcode=Codes.EXCLUSIVE ) - yield self._delete_association(room_alias) + room_id = yield self._delete_association(room_alias) + + try: + yield self.send_room_alias_update_event( + requester, + requester.user.to_string(), + room_id + ) + + yield self._update_canonical_alias( + requester, + requester.user.to_string(), + room_id, + room_alias, + ) + except AuthError as e: + logger.info("Failed to update alias events: %s", e) + + defer.returnValue(room_id) @defer.inlineCallbacks def delete_appservice_association(self, service, room_alias): @@ -129,11 +149,9 @@ class DirectoryHandler(BaseHandler): if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") - yield self.store.delete_room_alias(room_alias) + room_id = yield self.store.delete_room_alias(room_alias) - # TODO - Looks like _update_room_alias_event has never been implemented - # if room_id: - # yield self._update_room_alias_events(user_id, room_id) + 
defer.returnValue(room_id) @defer.inlineCallbacks def get_association(self, room_alias): @@ -233,6 +251,28 @@ class DirectoryHandler(BaseHandler): ratelimit=False ) + @defer.inlineCallbacks + def _update_canonical_alias(self, requester, user_id, room_id, room_alias): + alias_event = yield self.state.get_current_state( + room_id, EventTypes.CanonicalAlias, "" + ) + + if alias_event.content.get("alias", "") != room_alias.to_string(): + return + + msg_handler = self.hs.get_handlers().message_handler + yield msg_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": room_id, + "sender": user_id, + "content": {}, + }, + ratelimit=False + ) + @defer.inlineCallbacks def get_association_from_room_alias(self, room_alias): result = yield self.store.get_association_from_room_alias( diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 60c5ec77a..59a23d6cb 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -127,8 +127,9 @@ class ClientDirectoryServer(ClientV1RestServlet): room_alias = RoomAlias.from_string(room_alias) yield dir_handler.delete_association( - user.to_string(), room_alias + requester, user.to_string(), room_alias ) + logger.info( "User %s deleted alias %s", user.to_string(), From 7a386126206d4fea2b2c561dde6577e5cff107f3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 17 Mar 2016 11:54:19 +0000 Subject: [PATCH 24/59] Remove another unused function from presence --- synapse/handlers/presence.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cfbcf2d32..d0c8f1328 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -73,10 +73,6 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -def collect_presencelike_data(distributor, user, content): - return 
distributor.fire("collect_presencelike_data", user, content) - - class PresenceHandler(BaseHandler): def __init__(self, hs): From 56aa4e7a9a6846a72e9031e29555b05ed119e679 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Mar 2016 15:24:19 +0000 Subject: [PATCH 25/59] Check canonical alias event exists --- synapse/handlers/directory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index be9f2a21b..6bcc5a5e2 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -257,7 +257,8 @@ class DirectoryHandler(BaseHandler): room_id, EventTypes.CanonicalAlias, "" ) - if alias_event.content.get("alias", "") != room_alias.to_string(): + alias_str = room_alias.to_string() + if not alias_event or alias_event.content.get("alias", "") != alias_str: return msg_handler = self.hs.get_handlers().message_handler From 3c5f25507b7a62165a782420add4f9dedcec0b3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 13:55:16 +0000 Subject: [PATCH 26/59] Yield on EDU handling --- synapse/federation/federation_server.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e8bfbe7cb..a961b17ae 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -137,8 +137,8 @@ class FederationServer(FederationBase): logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( + for edu in (Edu(**x) for x in transaction.edus): + yield self.received_edu( transaction.origin, edu.edu_type, edu.content @@ -161,11 +161,12 @@ class FederationServer(FederationBase): ) defer.returnValue((200, response)) + @defer.inlineCallbacks def received_edu(self, origin, edu_type, content): received_edus_counter.inc() if edu_type in self.edu_handlers: - 
self.edu_handlers[edu_type](origin, content) + yield self.edu_handlers[edu_type](origin, content) else: logger.warn("Received EDU of type %s with no handler", edu_type) From 67ed8065dba960055c2e3d1740af12229b7d19a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 14:31:31 +0000 Subject: [PATCH 27/59] Dedupe requested event list in _get_events --- synapse/storage/events.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 552e7ca35..285c586cf 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -526,6 +526,9 @@ class EventsStore(SQLBaseStore): if not event_ids: defer.returnValue([]) + event_id_list = event_ids + event_ids = set(event_ids) + event_map = self._get_events_from_cache( event_ids, check_redacted=check_redacted, @@ -535,23 +538,18 @@ class EventsStore(SQLBaseStore): missing_events_ids = [e for e in event_ids if e not in event_map] - if not missing_events_ids: - defer.returnValue([ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ]) + if missing_events_ids: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - event_map.update(missing_events) + event_map.update(missing_events) defer.returnValue([ - event_map[e_id] for e_id in event_ids + event_map[e_id] for e_id in event_id_list if e_id in event_map and event_map[e_id] ]) From 58e207cd77b3b68b1908078418c9c9e9c3830ea5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 14:31:44 +0000 Subject: [PATCH 28/59] Don't assume existence of event_id in __str__ --- synapse/events/__init__.py | 4 +++- 1 file changed, 3 
insertions(+), 1 deletion(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bbfa5a726..abed6b5e6 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -168,5 +168,7 @@ class FrozenEvent(EventBase): def __repr__(self): return "" % ( - self.event_id, self.type, self.get("state_key", None), + self.get("event_id", None), + self.get("type", None), + self.get("state_key", None), ) From 9adf0e92bc3dbe4305beaf602406fc5ca51ea37e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 15:12:50 +0000 Subject: [PATCH 29/59] Catch exceptions from EDU handling --- synapse/federation/federation_server.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a961b17ae..76820b924 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -166,7 +166,12 @@ class FederationServer(FederationBase): received_edus_counter.inc() if edu_type in self.edu_handlers: - yield self.edu_handlers[edu_type](origin, content) + try: + yield self.edu_handlers[edu_type](origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception as e: + logger.exception("Failed to handle edu %r", edu_type, e) else: logger.warn("Received EDU of type %s with no handler", edu_type) From 58f8226c7f2aaf9ebe39703edc91ad5cf1b01112 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Mar 2016 14:20:34 +0000 Subject: [PATCH 30/59] remove unused current_state variable from on_receive_pdu --- synapse/handlers/federation.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f599e817a..c172877bd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -123,7 +123,6 @@ class FederationHandler(BaseHandler): # FIXME (erikj): Awful hack to make the case where we are 
not currently # in the room work - current_state = None is_in_room = yield self.auth.check_host_in_room( event.room_id, self.server_name @@ -187,7 +186,6 @@ class FederationHandler(BaseHandler): event, state=state, backfilled=backfilled, - current_state=current_state, ) except AuthError as e: raise FederationError( From 3e7fac0d56dca5b389ef7a671c1cd6b0795724c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Mar 2016 14:03:20 +0000 Subject: [PATCH 31/59] Add published room list edit API --- synapse/api/auth.py | 54 ++++++++++++++++++++++++++--- synapse/handlers/directory.py | 16 +++++++++ synapse/rest/client/v1/directory.py | 42 ++++++++++++++++++++++ synapse/storage/room.py | 8 +++++ 4 files changed, 116 insertions(+), 4 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3038df4ab..4f9c3c9db 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -814,17 +814,16 @@ class Auth(object): return auth_ids - @log_function - def _can_send_event(self, event, auth_events): + def _get_send_level(self, etype, state_key, auth_events): key = (EventTypes.PowerLevels, "", ) send_level_event = auth_events.get(key) send_level = None if send_level_event: send_level = send_level_event.content.get("events", {}).get( - event.type + etype ) if send_level is None: - if hasattr(event, "state_key"): + if state_key is not None: send_level = send_level_event.content.get( "state_default", 50 ) @@ -838,6 +837,13 @@ class Auth(object): else: send_level = 0 + return send_level + + @log_function + def _can_send_event(self, event, auth_events): + send_level = self._get_send_level( + event.type, event.get("state_key", None), auth_events + ) user_level = self._get_user_power_level(event.user_id, auth_events) if user_level < send_level: @@ -982,3 +988,43 @@ class Auth(object): "You don't have permission to add ops level greater " "than your own" ) + + @defer.inlineCallbacks + def check_can_change_room_list(self, room_id, user): + """Check if the user is allowed 
to edit the room's entry in the + published room list. + + Args: + room_id (str) + user (UserID) + """ + + is_admin = yield self.is_server_admin(user) + if is_admin: + defer.returnValue(True) + + user_id = user.to_string() + yield self.check_joined_room(room_id, user_id) + + # We currently require the user is a "moderator" in the room. We do this + # by checking if they would (theoretically) be able to change the + # m.room.aliases events + power_level_event = yield self.state.get_current_state( + room_id, EventTypes.PowerLevels, "" + ) + + auth_events = {} + if power_level_event: + auth_events[(EventTypes.PowerLevels, "")] = power_level_event + + send_level = self._get_send_level( + EventTypes.Aliases, "", auth_events + ) + user_level = self._get_user_power_level(user_id, auth_events) + + if user_level < send_level: + raise AuthError( + 403, + "This server requires you to be a moderator in the room to" + " edit its room list entry" + ) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 6bcc5a5e2..b2617c889 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -317,3 +317,19 @@ class DirectoryHandler(BaseHandler): is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) defer.returnValue(is_admin) + + @defer.inlineCallbacks + def edit_published_room_list(self, requester, room_id, visibility): + if requester.is_guest: + raise AuthError(403, "Guests cannot edit the published room list") + + if visibility not in ["public", "private"]: + raise SynapseError(400, "Invalide visibility setting") + + room = yield self.store.get_room(room_id) + if room is None: + raise SynapseError(400, "Unknown room") + + yield self.auth.check_can_change_room_list(room_id, requester.user) + + yield self.store.set_room_is_public(room_id, visibility == "public") diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 59a23d6cb..8ac09419d 100644 --- 
a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -30,6 +30,7 @@ logger = logging.getLogger(__name__) def register_servlets(hs, http_server): ClientDirectoryServer(hs).register(http_server) + ClientDirectoryListServer(hs).register(http_server) class ClientDirectoryServer(ClientV1RestServlet): @@ -137,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet): ) defer.returnValue((200, {})) + + +class ClientDirectoryListServer(ClientV1RestServlet): + PATTERNS = client_path_patterns("/directory/list/room/(?P[^/]*)$") + + def __init__(self, hs): + super(ClientDirectoryListServer, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + room = yield self.store.get_room(room_id) + if room is None: + raise SynapseError(400, "Unknown room") + + defer.returnValue((200, { + "visibility": "public" if room["is_public"] else "private" + })) + + @defer.inlineCallbacks + def on_PUT(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + + content = parse_json_object_from_request(request) + visibility = content.get("visibility", "public") + + yield self.handlers.directory_handler.edit_published_room_list( + requester, room_id, visibility, + ) + + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_DELETE(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + + yield self.handlers.directory_handler.edit_published_room_list( + requester, room_id, "private", + ) + + defer.returnValue((200, {})) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 46ab38a31..9be977f38 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore): allow_none=True, ) + def set_room_is_public(self, room_id, is_public): + return self._simple_update_one( + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"is_public": is_public}, + desc="set_room_is_public", + 
) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", From 5244c0b48ebb86273bbb79a1935cf5893aa6f310 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Mar 2016 18:03:08 +0000 Subject: [PATCH 32/59] Remove unused backfilled parameter from persist_event --- synapse/federation/federation_server.py | 1 - synapse/handlers/federation.py | 38 ++++++++++--------------- synapse/storage/events.py | 22 +++----------- 3 files changed, 19 insertions(+), 42 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 76820b924..429ab6dde 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -531,7 +531,6 @@ class FederationServer(FederationBase): yield self.handler.on_receive_pdu( origin, pdu, - backfilled=False, state=state, auth_chain=auth_chain, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c172877bd..267fedf11 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -102,7 +102,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, backfilled, state=None, + def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. 
@@ -185,7 +185,6 @@ class FederationHandler(BaseHandler): origin, event, state=state, - backfilled=backfilled, ) except AuthError as e: raise FederationError( @@ -214,18 +213,17 @@ class FederationHandler(BaseHandler): except StoreError: logger.exception("Failed to store room.") - if not backfilled: - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + with PreserveLoggingContext(): + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) if event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -645,7 +643,7 @@ class FederationHandler(BaseHandler): continue try: - self.on_receive_pdu(origin, p, backfilled=False) + self.on_receive_pdu(origin, p) except: logger.exception("Couldn't handle pdu") @@ -777,7 +775,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=False, ) target_user = UserID.from_string(event.state_key) @@ -817,7 +814,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=False, ) target_user = UserID.from_string(event.state_key) @@ -1072,8 +1068,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, backfilled=False, - current_state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() @@ -1083,7 
+1078,7 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not backfilled and not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( event, context, self @@ -1092,9 +1087,7 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=backfilled, - is_new_state=(not outlier and not backfilled), - current_state=current_state, + is_new_state=not outlier, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1192,7 +1185,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, new_event_context, - backfilled=False, is_new_state=True, current_state=state, ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 285c586cf..e444b64ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, backfilled=False, + def persist_event(self, event, context, is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - self.min_stream_token -= 1 - stream_ordering = self.min_stream_token - - if stream_ordering is None: - stream_ordering_manager = self._stream_id_gen.get_next() - else: - @contextmanager - def stream_ordering_manager(): - yield stream_ordering - stream_ordering_manager = stream_ordering_manager() - try: - with stream_ordering_manager as stream_ordering: + with self._stream_id_gen.get_next() as stream_ordering: event.internal_metadata.stream_ordering = stream_ordering yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, context=context, - backfilled=backfilled, is_new_state=is_new_state, current_state=current_state, ) @@ -166,7 +152,7 @@ class 
EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) @log_function - def _persist_event_txn(self, txn, event, context, backfilled, + def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table @@ -198,7 +184,7 @@ class EventsStore(SQLBaseStore): return self._persist_events_txn( txn, [(event, context)], - backfilled=backfilled, + backfilled=False, is_new_state=is_new_state, ) From d3654694d09ccbc672c8a2373f09b4a3a24442b8 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 22 Mar 2016 00:52:31 +0000 Subject: [PATCH 33/59] an invalide is something else... --- synapse/handlers/directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index b2617c889..f6143521f 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -324,7 +324,7 @@ class DirectoryHandler(BaseHandler): raise AuthError(403, "Guests cannot edit the published room list") if visibility not in ["public", "private"]: - raise SynapseError(400, "Invalide visibility setting") + raise SynapseError(400, "Invalid visibility setting") room = yield self.store.get_room(room_id) if room is None: From b5912776207b328ad1c2722ef01f836b54e5383e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 10:32:50 +0000 Subject: [PATCH 34/59] Make stateGroupCache honour CACHE_SIZE_FACTOR --- synapse/storage/_base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7dc67ecd5..583b77a83 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -26,6 +26,10 @@ from twisted.internet import defer import sys import time import threading +import os + + +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) logger = 
logging.getLogger(__name__) @@ -163,7 +167,9 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000) + self._state_group_cache = DictionaryCache( + "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR + ) self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] From 97785bfc0fe42619183e73432b897d2740fa74f8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 10:41:44 +0000 Subject: [PATCH 35/59] Doc string --- synapse/handlers/directory.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f6143521f..8eeb22581 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -320,6 +320,12 @@ class DirectoryHandler(BaseHandler): @defer.inlineCallbacks def edit_published_room_list(self, requester, room_id, visibility): + """Edit the entry of the room in the published room list. + + requester + room_id (str) + visibility (str): "public" or "private" + """ if requester.is_guest: raise AuthError(403, "Guests cannot edit the published room list") From 2c86187a1bd61901ecf4adca3dcced9f68cf26de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 11:59:31 +0000 Subject: [PATCH 36/59] Don't cache events in _state_group_cache Instead, simply cache the event ids, relying on the event cache to cache the actual events. The problem was that while the state groups cache was limited in the number of groups it could hold, each individual group could consist of thousands of events. 
--- synapse/storage/state.py | 108 +++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 60 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 8ed8a21b0..f06c734c4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -172,7 +172,7 @@ class StateStore(SQLBaseStore): defer.returnValue(events) def _get_state_groups_from_groups(self, groups, types): - """Returns dictionary state_group -> state event ids + """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ def f(txn, groups): if types is not None: @@ -183,7 +183,8 @@ class StateStore(SQLBaseStore): where_clause = "" sql = ( - "SELECT state_group, event_id FROM state_groups_state WHERE" + "SELECT state_group, event_id, type, state_key" + " FROM state_groups_state WHERE" " state_group IN (%s) %s" % ( ",".join("?" for _ in groups), where_clause, @@ -199,7 +200,8 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - results.setdefault(row["state_group"], []).append(row["event_id"]) + key = (row["type"], row["state_key"]) + results.setdefault(row["state_group"], {})[key] = row["event_id"] return results chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] @@ -296,7 +298,7 @@ class StateStore(SQLBaseStore): where a `state_key` of `None` matches all state_keys for the `type`. 
""" - is_all, state_dict = self._state_group_cache.get(group) + is_all, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} missing_types = set() @@ -308,7 +310,7 @@ class StateStore(SQLBaseStore): if type_to_key.get(typ, object()) is not None: type_to_key.setdefault(typ, set()).add(state_key) - if (typ, state_key) not in state_dict: + if (typ, state_key) not in state_dict_ids: missing_types.add((typ, state_key)) sentinel = object() @@ -326,7 +328,7 @@ class StateStore(SQLBaseStore): got_all = not (missing_types or types is None) return { - k: v for k, v in state_dict.items() + k: v for k, v in state_dict_ids.items() if include(k[0], k[1]) }, missing_types, got_all @@ -340,8 +342,9 @@ class StateStore(SQLBaseStore): Args: group: The state group to lookup """ - is_all, state_dict = self._state_group_cache.get(group) - return state_dict, is_all + is_all, state_dict_ids = self._state_group_cache.get(group) + + return state_dict_ids, is_all @defer.inlineCallbacks def _get_state_for_groups(self, groups, types=None): @@ -354,84 +357,69 @@ class StateStore(SQLBaseStore): missing_groups = [] if types is not None: for group in set(groups): - state_dict, missing_types, got_all = self._get_some_state_from_cache( + state_dict_ids, missing_types, got_all = self._get_some_state_from_cache( group, types ) - results[group] = state_dict + results[group] = state_dict_ids if not got_all: missing_groups.append(group) else: for group in set(groups): - state_dict, got_all = self._get_all_state_from_cache( + state_dict_ids, got_all = self._get_all_state_from_cache( group ) - results[group] = state_dict + + results[group] = state_dict_ids if not got_all: missing_groups.append(group) - if not missing_groups: - defer.returnValue({ - group: { - type_tuple: event - for type_tuple, event in state.items() - if event - } - for group, state in results.items() - }) + if missing_groups: + # Okay, so we have some missing_types, lets fetch them. 
+ cache_seq_num = self._state_group_cache.sequence - # Okay, so we have some missing_types, lets fetch them. - cache_seq_num = self._state_group_cache.sequence + group_to_state_dict = yield self._get_state_groups_from_groups( + missing_groups, types + ) - group_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types - ) + # Now we want to update the cache with all the things we fetched + # from the database. + for group, group_state_dict in group_to_state_dict.items(): + if types: + # We delibrately put key -> None mappings into the cache to + # cache absence of the key, on the assumption that if we've + # explicitly asked for some types then we will probably ask + # for them again. + state_dict = {key: None for key in types} + state_dict.update(results[group]) + results[group] = state_dict + else: + state_dict = results[group] + + state_dict.update(group_state_dict) + + self._state_group_cache.update( + cache_seq_num, + key=group, + value=state_dict, + full=(types is None), + ) state_events = yield self._get_events( - [e_id for l in group_state_dict.values() for e_id in l], + [ev_id for sd in results.values() for ev_id in sd.values()], get_prev_content=False ) state_events = {e.event_id: e for e in state_events} - # Now we want to update the cache with all the things we fetched - # from the database. - for group, state_ids in group_state_dict.items(): - if types: - # We delibrately put key -> None mappings into the cache to - # cache absence of the key, on the assumption that if we've - # explicitly asked for some types then we will probably ask - # for them again. - state_dict = {key: None for key in types} - state_dict.update(results[group]) - results[group] = state_dict - else: - state_dict = results[group] - - for event_id in state_ids: - try: - state_event = state_events[event_id] - state_dict[(state_event.type, state_event.state_key)] = state_event - except KeyError: - # Hmm. So we do don't have that state event? Interesting. 
- logger.warn( - "Can't find state event %r for state group %r", - event_id, group, - ) - - self._state_group_cache.update( - cache_seq_num, - key=group, - value=state_dict, - full=(types is None), - ) - # Remove all the entries with None values. The None values were just # used for bookkeeping in the cache. for group, state_dict in results.items(): results[group] = { - key: event for key, event in state_dict.items() if event + key: state_events[event_id] + for key, event_id in state_dict.items() + if event_id and event_id in state_events } defer.returnValue(results) From 5defb25ac622d979c13d9e9d311e69c2ef7c15a5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Mar 2016 13:52:45 +0000 Subject: [PATCH 37/59] Use get_users_in_room to count the number of room members rather than using read_receipts --- synapse/push/bulk_push_rule_evaluator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 87d5061fb..76d7eb7ce 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -107,7 +107,9 @@ class BulkPushRuleEvaluator: users_dict.items(), [event], {event.event_id: current_state} ) - evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room)) + room_members = yield self.store.get_users_in_room(self.room_id) + + evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) condition_cache = {} From 76d18a577661257abe0d047129383dbd09bbe9b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:08:13 +0000 Subject: [PATCH 38/59] Bump get_aliases_for_room cache --- synapse/storage/directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 012a0b414..ef231a04d 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore): return room_id - 
@cached() + @cached(max_entries=5000) def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", From 6cf0ba14663c263ea5f423a4b24b3915b8349e36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:18:21 +0000 Subject: [PATCH 39/59] Bump get_unread_event_push_actions_by_room_for_user cache --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5820539a9..dc5830450 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore): ) self._simple_insert_many_txn(txn, "event_push_actions", values) - @cachedInlineCallbacks(num_args=3, lru=True, tree=True) + @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): From d787e41b2046572d42c13b6aa9b4636f98f7f9e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:44:48 +0000 Subject: [PATCH 40/59] Measure StateHandler._resolve_events --- synapse/state.py | 74 +++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index b9a138752..e09632972 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes @@ -263,48 +264,49 @@ class StateHandler(object): from (type, state_key) to event. prev_states is a list of event_ids. 
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str]) """ - state = {} - for st in state_sets: - for e in st: - state.setdefault( - (e.type, e.state_key), - {} - )[e.event_id] = e + with Measure(self.clock, "state._resolve_events"): + state = {} + for st in state_sets: + for e in st: + state.setdefault( + (e.type, e.state_key), + {} + )[e.event_id] = e - unconflicted_state = { - k: v.values()[0] for k, v in state.items() - if len(v.values()) == 1 - } + unconflicted_state = { + k: v.values()[0] for k, v in state.items() + if len(v.values()) == 1 + } - conflicted_state = { - k: v.values() - for k, v in state.items() - if len(v.values()) > 1 - } + conflicted_state = { + k: v.values() + for k, v in state.items() + if len(v.values()) > 1 + } - if event_type: - prev_states_events = conflicted_state.get( - (event_type, state_key), [] - ) - prev_states = [s.event_id for s in prev_states_events] - else: - prev_states = [] + if event_type: + prev_states_events = conflicted_state.get( + (event_type, state_key), [] + ) + prev_states = [s.event_id for s in prev_states_events] + else: + prev_states = [] - auth_events = { - k: e for k, e in unconflicted_state.items() - if k[0] in AuthEventTypes - } + auth_events = { + k: e for k, e in unconflicted_state.items() + if k[0] in AuthEventTypes + } - try: - resolved_state = self._resolve_state_events( - conflicted_state, auth_events - ) - except: - logger.exception("Failed to resolve state") - raise + try: + resolved_state = self._resolve_state_events( + conflicted_state, auth_events + ) + except: + logger.exception("Failed to resolve state") + raise - new_state = unconflicted_state - new_state.update(resolved_state) + new_state = unconflicted_state + new_state.update(resolved_state) return new_state, prev_states From 99f929f36b396b7152b3840c11e8debc5505f673 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 15:31:13 +0000 Subject: [PATCH 41/59] Make StateHandler._state_cache only store event_ids. 
--- synapse/state.py | 24 +++++++++++++++++------- synapse/storage/events.py | 25 +++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index e09632972..9d90a437d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -28,6 +28,7 @@ from collections import namedtuple import logging import hashlib +import os logger = logging.getLogger(__name__) @@ -35,8 +36,11 @@ logger = logging.getLogger(__name__) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -SIZE_OF_CACHE = 1000 -EVICTION_TIMEOUT_SECONDS = 20 +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + +SIZE_OF_CACHE = int(5000 * CACHE_SIZE_FACTOR) +EVICTION_TIMEOUT_SECONDS = 60 * 60 class _StateCacheEntry(object): @@ -92,7 +96,9 @@ class StateHandler(object): if cache: cache.ts = self.clock.time_msec() - state = cache.state + + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} else: res = yield self.resolve_state_groups(room_id, event_ids) state = res[1] @@ -191,14 +197,18 @@ class StateHandler(object): cache = self._state_cache.get(frozenset(event_ids), None) if cache and cache.state_group: cache.ts = self.clock.time_msec() - prev_state = cache.state.get((event_type, state_key), None) + + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} + + prev_state = state.get((event_type, state_key), None) if prev_state: prev_state = prev_state.event_id prev_states = [prev_state] else: prev_states = [] defer.returnValue( - (cache.state_group, cache.state, prev_states) + (cache.state_group, state, prev_states) ) state_groups = yield self.store.get_state_groups( @@ -226,7 +236,7 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=state, + state={key: event.event_id for key, event in state.items()}, 
state_group=name, ts=self.clock.time_msec() ) @@ -241,7 +251,7 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=new_state, + state={key: event.event_id for key, event in new_state.items()}, state_group=None, ts=self.clock.time_msec() ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64ce..584e659d4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -151,6 +151,31 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) + @defer.inlineCallbacks + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred : Dict from event_id to event. 
+ """ + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + defer.returnValue({e.event_id: e for e in events}) + @log_function def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): From c4a8cbd15a471d2a658de96abcc3254fc95de1bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 16:06:21 +0000 Subject: [PATCH 42/59] Make LruCache use a dedicated _Node class --- synapse/util/caches/lrucache.py | 73 ++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index f7423f2fa..f9df445a8 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -29,6 +29,16 @@ def enumerate_leaves(node, depth): yield m +class _Node(object): + __slots__ = ["prev_node", "next_node", "key", "value"] + + def __init__(self, prev_node, next_node, key, value): + self.prev_node = prev_node + self.next_node = next_node + self.key = key + self.value = value + + class LruCache(object): """ Least-recently-used cache. @@ -38,10 +48,9 @@ class LruCache(object): def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() self.cache = cache # Used for introspection. 
- list_root = [] - list_root[:] = [list_root, list_root, None, None] - - PREV, NEXT, KEY, VALUE = 0, 1, 2, 3 + list_root = _Node(None, None, None, None) + list_root.next_node = list_root + list_root.prev_node = list_root lock = threading.Lock() @@ -55,36 +64,36 @@ class LruCache(object): def add_node(key, value): prev_node = list_root - next_node = prev_node[NEXT] - node = [prev_node, next_node, key, value] - prev_node[NEXT] = node - next_node[PREV] = node + next_node = prev_node.next_node + node = _Node(prev_node, next_node, key, value) + prev_node.next_node = node + next_node.prev_node = node cache[key] = node def move_node_to_front(node): - prev_node = node[PREV] - next_node = node[NEXT] - prev_node[NEXT] = next_node - next_node[PREV] = prev_node + prev_node = node.prev_node + next_node = node.next_node + prev_node.next_node = next_node + next_node.prev_node = prev_node prev_node = list_root - next_node = prev_node[NEXT] - node[PREV] = prev_node - node[NEXT] = next_node - prev_node[NEXT] = node - next_node[PREV] = node + next_node = prev_node.next_node + node.prev_node = prev_node + node.next_node = next_node + prev_node.next_node = node + next_node.prev_node = node def delete_node(node): - prev_node = node[PREV] - next_node = node[NEXT] - prev_node[NEXT] = next_node - next_node[PREV] = prev_node + prev_node = node.prev_node + next_node = node.next_node + prev_node.next_node = next_node + next_node.prev_node = prev_node @synchronized def cache_get(key, default=None): node = cache.get(key, None) if node is not None: move_node_to_front(node) - return node[VALUE] + return node.value else: return default @@ -93,25 +102,25 @@ class LruCache(object): node = cache.get(key, None) if node is not None: move_node_to_front(node) - node[VALUE] = value + node.value = value else: add_node(key, value) if len(cache) > max_size: - todelete = list_root[PREV] + todelete = list_root.prev_node delete_node(todelete) - cache.pop(todelete[KEY], None) + cache.pop(todelete.key, None) 
@synchronized def cache_set_default(key, value): node = cache.get(key, None) if node is not None: - return node[VALUE] + return node.value else: add_node(key, value) if len(cache) > max_size: - todelete = list_root[PREV] + todelete = list_root.prev_node delete_node(todelete) - cache.pop(todelete[KEY], None) + cache.pop(todelete.key, None) return value @synchronized @@ -119,8 +128,8 @@ class LruCache(object): node = cache.get(key, None) if node: delete_node(node) - cache.pop(node[KEY], None) - return node[VALUE] + cache.pop(node.key, None) + return node.value else: return default @@ -137,8 +146,8 @@ class LruCache(object): @synchronized def cache_clear(): - list_root[NEXT] = list_root - list_root[PREV] = list_root + list_root.next_node = list_root + list_root.prev_node = list_root cache.clear() @synchronized From d531ebcb57de61bad0ac2e4231280d41d8db4404 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 18:02:36 +0000 Subject: [PATCH 43/59] Key StateHandler._state_cache off of state groups --- synapse/state.py | 77 ++++++++++++++++++------------------------------ 1 file changed, 29 insertions(+), 48 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 9d90a437d..14c043001 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -90,18 +90,8 @@ class StateHandler(object): """ event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - cache = None - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - - if cache: - cache.ts = self.clock.time_msec() - - event_dict = yield self.store.get_events(cache.state.values()) - state = {(e.type, e.state_key): e for e in event_dict.values()} - else: - res = yield self.resolve_state_groups(room_id, event_ids) - state = res[1] + res = yield self.resolve_state_groups(room_id, event_ids) + state = res[1] if event_type: defer.returnValue(state.get((event_type, state_key))) @@ -193,8 +183,33 @@ class StateHandler(object): """ 
logger.debug("resolve_state_groups event_ids %s", event_ids) + state_groups = yield self.store.get_state_groups( + room_id, event_ids + ) + + logger.debug( + "resolve_state_groups state_groups %s", + state_groups.keys() + ) + + group_names = frozenset(state_groups.keys()) + if len(group_names) == 1: + name, state_list = state_groups.items().pop() + state = { + (e.type, e.state_key): e + for e in state_list + } + prev_state = state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + + defer.returnValue((name, state, prev_states)) + if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) + cache = self._state_cache.get(group_names, None) if cache and cache.state_group: cache.ts = self.clock.time_msec() @@ -211,40 +226,6 @@ class StateHandler(object): (cache.state_group, state, prev_states) ) - state_groups = yield self.store.get_state_groups( - room_id, event_ids - ) - - logger.debug( - "resolve_state_groups state_groups %s", - state_groups.keys() - ) - - group_names = set(state_groups.keys()) - if len(group_names) == 1: - name, state_list = state_groups.items().pop() - state = { - (e.type, e.state_key): e - for e in state_list - } - prev_state = state.get((event_type, state_key), None) - if prev_state: - prev_state = prev_state.event_id - prev_states = [prev_state] - else: - prev_states = [] - - if self._state_cache is not None: - cache = _StateCacheEntry( - state={key: event.event_id for key, event in state.items()}, - state_group=name, - ts=self.clock.time_msec() - ) - - self._state_cache[frozenset(event_ids)] = cache - - defer.returnValue((name, state, prev_states)) - new_state, prev_states = self._resolve_events( state_groups.values(), event_type, state_key ) @@ -256,7 +237,7 @@ class StateHandler(object): ts=self.clock.time_msec() ) - self._state_cache[frozenset(event_ids)] = cache + self._state_cache[group_names] = cache 
defer.returnValue((None, new_state, prev_states)) From 9e2e994395327956f846113566fd18c01f12441a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 09:28:07 +0000 Subject: [PATCH 44/59] Reduce cache size --- synapse/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/state.py b/synapse/state.py index 14c043001..41d32e664 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -39,7 +39,7 @@ KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -SIZE_OF_CACHE = int(5000 * CACHE_SIZE_FACTOR) +SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 From b6507869cdd5da18117dbbd0fbf78f4bdd4391f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 10:32:10 +0000 Subject: [PATCH 45/59] Make get_invites return RoomsForUser --- synapse/push/__init__.py | 2 +- synapse/storage/roommember.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 65ef1b68a..296c4447e 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -317,7 +317,7 @@ class Pusher(object): @defer.inlineCallbacks def _get_badge_count(self): invites, joins = yield defer.gatherResults([ - self.store.get_invites_for_user(self.user_id), + self.store.get_invited_rooms_for_user(self.user_id), self.store.get_rooms_for_user(self.user_id), ], consumeErrors=True) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0cd89260f..430b49c12 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore): ).addCallback(self._get_events) @cached() - def get_invites_for_user(self, user_id): - """ Get all the invite events for a user + def get_invited_rooms_for_user(self, user_id): + """ Get all the rooms the user is invited to Args: user_id (str): The 
user ID. Returns: - A deferred list of event objects. + A deferred list of RoomsForUser. """ return self.get_rooms_for_user_where_membership_is( user_id, [Membership.INVITE] - ).addCallback(lambda invites: self._get_events([ - invite.event_id for invite in invites - ])) + ) def get_leave_and_ban_events_for_user(self, user_id): """ Get all the leave events for a user From 34473a9c7f3e42db5154d2558e737fcab2546a81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 10:42:19 +0000 Subject: [PATCH 46/59] Don't require alias in public room list. Rooms now no longer require an alias to be published. Also, changes the way we pull out state of each room to not require fetching all state events. --- synapse/handlers/room.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 051468989..a07c0ee43 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -946,53 +946,52 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def handle_room(room_id): aliases = yield self.store.get_aliases_for_room(room_id) - if not aliases: - defer.returnValue(None) - state = yield self.state_handler.get_current_state(room_id) + def get_state(etype, state_key): + return self.state_handler.get_current_state(room_id, etype, state_key) - result = {"aliases": aliases, "room_id": room_id} + result = {"room_id": room_id} + if aliases: + result["aliases"] = aliases - name_event = state.get((EventTypes.Name, ""), None) + name_event = yield get_state(EventTypes.Name, "") if name_event: name = name_event.content.get("name", None) if name: result["name"] = name - topic_event = state.get((EventTypes.Topic, ""), None) + topic_event = yield get_state(EventTypes.Topic, "") if topic_event: topic = topic_event.content.get("topic", None) if topic: result["topic"] = topic - canonical_event = state.get((EventTypes.CanonicalAlias, ""), None) + canonical_event = yield 
get_state(EventTypes.CanonicalAlias, "") if canonical_event: canonical_alias = canonical_event.content.get("alias", None) if canonical_alias: result["canonical_alias"] = canonical_alias - visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) + visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "") visibility = None if visibility_event: visibility = visibility_event.content.get("history_visibility", None) result["world_readable"] = visibility == "world_readable" - guest_event = state.get((EventTypes.GuestAccess, ""), None) + guest_event = yield get_state(EventTypes.GuestAccess, "") guest = None if guest_event: guest = guest_event.content.get("guest_access", None) result["guest_can_join"] = guest == "can_join" - avatar_event = state.get(("m.room.avatar", ""), None) + avatar_event = yield get_state("m.room.avatar", "") if avatar_event: avatar_url = avatar_event.content.get("url", None) if avatar_url: result["avatar_url"] = avatar_url - result["num_joined_members"] = sum( - 1 for (event_type, _), ev in state.items() - if event_type == EventTypes.Member and ev.membership == Membership.JOIN - ) + joined_users = yield self.store.get_users_in_room(room_id) + result["num_joined_members"] = len(joined_users) defer.returnValue(result) From 8b0dfc9fc4e98ea9c029c5f7b49efaf011ce1979 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 11:37:58 +0000 Subject: [PATCH 47/59] Don't cache events in get_current_state_for_key --- synapse/storage/events.py | 4 ++-- synapse/storage/state.py | 16 +++++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64ce..7bcddf1f3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -157,7 +157,7 @@ class EventsStore(SQLBaseStore): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - 
txn.call_after(self.get_current_state_for_key.invalidate_all) + txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) @@ -441,7 +441,7 @@ class EventsStore(SQLBaseStore): for event, _ in state_events_and_contexts: if not context.rejected: txn.call_after( - self.get_current_state_for_key.invalidate, + self._get_current_state_for_key.invalidate, (event.room_id, event.type, event.state_key,) ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f06c734c4..eab2c5a8c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,9 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import ( - cached, cachedInlineCallbacks, cachedList -) +from synapse.util.caches.descriptors import cached, cachedList from twisted.internet import defer @@ -155,8 +153,14 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cachedInlineCallbacks(num_args=3) + @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) + events = yield self._get_events(event_ids, get_prev_content=False) + defer.returnValue(events) + + @cached(num_args=3) + def _get_current_state_for_key(self, room_id, event_type, state_key): def f(txn): sql = ( "SELECT event_id FROM current_state_events" @@ -167,9 +171,7 @@ class StateStore(SQLBaseStore): txn.execute(sql, args) results = txn.fetchall() return [r[0] for r in results] - event_ids = yield self.runInteraction("get_current_state_for_key", f) - events = yield self._get_events(event_ids, get_prev_content=False) - defer.returnValue(events) + return 
self.runInteraction("get_current_state_for_key", f) def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) From d87a846ebceb81f78660c4900126eff6e3998b8a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 11:42:50 +0000 Subject: [PATCH 48/59] Don't cache events in get_recent_events_for_room --- synapse/storage/stream.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7f4a82752..cf84938be 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,7 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn @@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) - @cachedInlineCallbacks(num_args=4) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): + rows, token = yield self.get_recent_event_ids_for_room( + room_id, limit, end_token, from_token + ) + logger.debug("stream before") + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + logger.debug("stream after") + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + + @cached(num_args=4) + def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): end_token = RoomStreamToken.parse_stream_token(end_token) if from_token is None: @@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore): return rows, token - rows, token = yield self.runInteraction( + return self.runInteraction( "get_recent_events_for_room", 
get_recent_events_for_room_txn ) - logger.debug("stream before") - events = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True - ) - logger.debug("stream after") - - self._set_before_and_after(events, rows) - - defer.returnValue((events, token)) - @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token() From 0677fc1c4e48fb0b91a2f91b348d16e5ce676125 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 13:24:27 +0000 Subject: [PATCH 49/59] Comment --- synapse/handlers/room.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a07c0ee43..25225ea1c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -947,6 +947,9 @@ class RoomListHandler(BaseHandler): def handle_room(room_id): aliases = yield self.store.get_aliases_for_room(room_id) + # We pull each bit of state out individually to avoid pulling the + # full state into memory. Due to how the caching works this should + # be fairly quick, even if not originally in the cache. def get_state(etype, state_key): return self.state_handler.get_current_state(room_id, etype, state_key) From b2802a1351cf5dbfb68d0e0f96ad9fb16df98fe8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 13:45:57 +0000 Subject: [PATCH 50/59] Ensure published rooms have public join rules --- synapse/handlers/room.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 25225ea1c..7062414ad 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -953,6 +953,13 @@ class RoomListHandler(BaseHandler): def get_state(etype, state_key): return self.state_handler.get_current_state(room_id, etype, state_key) + # Double check that this is actually a public room.
+ join_rules_event = yield get_state(EventTypes.JoinRules, "") + if join_rules_event: + join_rule = join_rules_event.content.get("join_rule", None) + if join_rule and join_rule != JoinRules.PUBLIC: + defer.returnValue(None) + result = {"room_id": room_id} if aliases: result["aliases"] = aliases From 84afeb41f32acdab22036b8c1efbb402eef31cd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 13:49:10 +0000 Subject: [PATCH 51/59] Ensure all old public rooms have aliases --- synapse/handlers/room.py | 9 ++++---- .../storage/schema/delta/30/public_rooms.sql | 21 +++++++++++++++++++ 2 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/30/public_rooms.sql diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7062414ad..d5c56ce0d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -119,7 +119,8 @@ class RoomCreationHandler(BaseHandler): invite_3pid_list = config.get("invite_3pid", []) - is_public = config.get("visibility", None) == "public" + visibility = config.get("visibility", None) + is_public = visibility == "public" # autogen room IDs and try to create it. We may clash, so just # try a few times till one goes through, giving up eventually. 
@@ -155,9 +156,9 @@ class RoomCreationHandler(BaseHandler): preset_config = config.get( "preset", - RoomCreationPreset.PUBLIC_CHAT - if is_public - else RoomCreationPreset.PRIVATE_CHAT + RoomCreationPreset.PRIVATE_CHAT + if visibility == "private" + else RoomCreationPreset.PUBLIC_CHAT ) raw_initial_state = config.get("initial_state", []) diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql new file mode 100644 index 000000000..a48604faa --- /dev/null +++ b/synapse/storage/schema/delta/30/public_rooms.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +/* This release removes the restriction that published rooms must have an alias, + * so we go back and ensure the only 'public' rooms are ones with an alias.*/ +UPDATE rooms SET is_public = 0 WHERE is_public = 1 AND room_id not in ( + SELECT room_id FROM room_aliases +); From 0c1a27b7877c08643ced4bdabe9843d69b0bcea1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 14:10:49 +0000 Subject: [PATCH 52/59] SQLite and postgres doesn't share a true literal --- synapse/storage/schema/delta/30/public_rooms.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql index a48604faa..3400898ed 100644 --- a/synapse/storage/schema/delta/30/public_rooms.sql +++ b/synapse/storage/schema/delta/30/public_rooms.sql @@ -16,6 +16,6 @@ /* This release removes the restriction that published rooms must have an alias, * so we go back and ensure the only 'public' rooms are ones with an alias.*/ -UPDATE rooms SET is_public = 0 WHERE is_public = 1 AND room_id not in ( +UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in ( SELECT room_id FROM room_aliases ); From b2757655455fb2bf485c66affeb5a294eb9459c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 14:15:32 +0000 Subject: [PATCH 53/59] Comment about weird SQL --- synapse/storage/schema/delta/30/public_rooms.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql index 3400898ed..f09db4faa 100644 --- a/synapse/storage/schema/delta/30/public_rooms.sql +++ b/synapse/storage/schema/delta/30/public_rooms.sql @@ -15,7 +15,9 @@ /* This release removes the restriction that published rooms must have an alias, - * so we go back and ensure the only 'public' rooms are ones with an alias.*/ + * so we go back and ensure the only 'public' rooms are ones 
with an alias. + * We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite + */ UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in ( SELECT room_id FROM room_aliases ); From 75daede92f041500347a5f446229be5ca50c2b8e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 18:22:52 +0000 Subject: [PATCH 54/59] String intern --- synapse/storage/state.py | 12 +++++++++--- synapse/util/caches/__init__.py | 8 ++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index eab2c5a8c..1982b1c60 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches import intern_string from twisted.internet import defer @@ -155,7 +156,9 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) + event_ids = yield self._get_current_state_for_key( + room_id, intern_string(event_type), intern_string(state_key) + ) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -202,7 +205,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (row["type"], row["state_key"]) + key = (intern_string(row["type"]), intern_string(row["state_key"])) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -393,7 +396,10 @@ class StateStore(SQLBaseStore): # cache absence of the key, on the assumption that if we've # explicitly asked for some types then we will probably ask # for them again. 
- state_dict = {key: None for key in types} + state_dict = { + (intern_string(etype), intern_string(state_key)): None + for (etype, state_key) in types + } state_dict.update(results[group]) results[group] = state_dict else: diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 1a1490419..9d450fade 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import synapse.metrics +from lrucache import LruCache DEBUG_CACHES = False @@ -25,3 +26,10 @@ cache_counter = metrics.register_cache( lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, labels=["name"], ) + +_string_cache = LruCache(5000) +caches_by_name["string_cache"] = _string_cache + + +def intern_string(string): + return _string_cache.setdefault(string, string) From fe9794706ab817fcedc17f99693eb0823b339d93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 14:58:08 +0000 Subject: [PATCH 55/59] Intern type and state_key on events --- synapse/events/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index abed6b5e6..2ceac19ad 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.util.frozenutils import freeze +from synapse.util.caches import intern_string # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -140,6 +141,12 @@ class FrozenEvent(EventBase): unsigned = dict(event_dict.pop("unsigned", {})) + # We intern these strings because they turn up a lot (especially when + # caching). 
+ event_dict["type"] = intern_string(event_dict["type"]) + if "state_key" in event_dict: + event_dict["state_key"] = intern_string(event_dict["state_key"]) + if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) else: From f96526ffc23fdd99ab47abda67fb579a1ad764f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 15:01:05 +0000 Subject: [PATCH 56/59] Intern sender, event_id and room_id in events --- synapse/events/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 2ceac19ad..63004eaf0 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -146,6 +146,11 @@ class FrozenEvent(EventBase): event_dict["type"] = intern_string(event_dict["type"]) if "state_key" in event_dict: event_dict["state_key"] = intern_string(event_dict["state_key"]) + if "sender" in event_dict: + event_dict["sender"] = intern_string(event_dict["sender"]) + + event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) + event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) From acdfef7b1443a8260c43e31e9944b74dfdf286dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:13:05 +0000 Subject: [PATCH 57/59] Intern all the things --- synapse/events/__init__.py | 11 +---- synapse/federation/federation_client.py | 1 + synapse/federation/transport/server.py | 28 ++++++------ synapse/http/server.py | 10 +++-- synapse/storage/_base.py | 3 +- synapse/storage/receipts.py | 21 +++++---- synapse/storage/state.py | 10 ++--- synapse/util/caches/__init__.py | 58 ++++++++++++++++++++++++- 8 files changed, 97 insertions(+), 45 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 63004eaf0..23f8b612a 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. 
from synapse.util.frozenutils import freeze -from synapse.util.caches import intern_string +from synapse.util.caches import intern_dict # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -143,14 +143,7 @@ class FrozenEvent(EventBase): # We intern these strings because they turn up a lot (especially when # caching). - event_dict["type"] = intern_string(event_dict["type"]) - if "state_key" in event_dict: - event_dict["state_key"] = intern_string(event_dict["state_key"]) - if "sender" in event_dict: - event_dict["sender"] = intern_string(event_dict["sender"]) - - event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) - event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) + event_dict = intern_dict(event_dict) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83c1f4658..37ee469fa 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -418,6 +418,7 @@ class FederationClient(FederationBase): "Failed to make_%s via %s: %s", membership, destination, e.message ) + raise raise RuntimeError("Failed to send to any server.") diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 208bff8d4..d65a7893d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -175,7 +175,7 @@ class BaseFederationServlet(object): class FederationSendServlet(BaseFederationServlet): - PATH = "/send/([^/]*)/" + PATH = "/send/(?P<transaction_id>[^/]*)/" def __init__(self, handler, server_name, **kwargs): super(FederationSendServlet, self).__init__( @@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet): - PATH = "/event/([^/]*)/" + PATH = "/event/(?P<event_id>[^/]*)/" # This is when someone asks for a data item for a given server data_id pair.
def on_GET(self, origin, content, query, event_id): @@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet): - PATH = "/state/([^/]*)/" + PATH = "/state/(?P<context>[^/]*)/" # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): @@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet): - PATH = "/backfill/([^/]*)/" + PATH = "/backfill/(?P<context>[^/]*)/" def on_GET(self, origin, content, query, context): versions = query["v"] @@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet): class FederationQueryServlet(BaseFederationServlet): - PATH = "/query/([^/]*)" + PATH = "/query/(?P<query_type>[^/]*)" # This is when we receive a server-server Query def on_GET(self, origin, content, query, query_type): @@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet): - PATH = "/make_join/([^/]*)/([^/]*)" + PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet): class FederationMakeLeaveServlet(BaseFederationServlet): - PATH = "/make_leave/([^/]*)/([^/]*)" + PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet): class FederationSendLeaveServlet(BaseFederationServlet): - PATH = "/send_leave/([^/]*)/([^/]*)" + PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id, txid): @@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth/([^/]*)/([^/]*)" + PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) class FederationSendJoinServlet(BaseFederationServlet): - PATH = "/send_join/([^/]*)/([^/]*)" + PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet): class FederationInviteServlet(BaseFederationServlet): - PATH = "/invite/([^/]*)/([^/]*)" + PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet): class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): - PATH = "/exchange_third_party_invite/([^/]*)" + PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id): @@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet): class FederationQueryAuthServlet(BaseFederationServlet): - PATH = "/query_auth/([^/]*)/([^/]*)" + PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): @@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): class FederationGetMissingEventsServlet(BaseFederationServlet): # TODO(paul): Why does this path alone end with "/?" optional? - PATH = "/get_missing_events/([^/]*)/?" + PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
@defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): diff --git a/synapse/http/server.py b/synapse/http/server.py index b17b190ee..b82196fd5 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes ) from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.caches import intern_dict import synapse.metrics import synapse.events @@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource): else: servlet_classname = "%r" % callback - args = [ - urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups() - ] + kwargs = intern_dict({ + name: urllib.unquote(value).decode("UTF-8") if value else value + for name, value in m.groupdict().items() + }) - callback_return = yield callback(request, *args) + callback_return = yield callback(request, **kwargs) if callback_return is not None: code, response = callback_return self._send_response(request, code, response) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 583b77a83..b75b79df3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache +from synapse.util.caches import intern_dict import synapse.metrics @@ -350,7 +351,7 @@ class SQLBaseStore(object): """ col_headers = list(column[0] for column in cursor.description) results = list( - dict(zip(col_headers, row)) for row in cursor.fetchall() + intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall() ) return results diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index dbc074d6b..6b9d848ea 100644 --- a/synapse/storage/receipts.py +++ 
b/synapse/storage/receipts.py @@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): - def f(txn): - sql = ( - "SELECT room_id,event_id " - "FROM receipts_linearized " - "WHERE user_id = ? AND receipt_type = ? " - ) - txn.execute(sql, (user_id, receipt_type)) - return txn.fetchall() + rows = yield self._simple_select_list( + table="receipts_linearized", + keyvalues={ + "user_id": user_id, + "receipt_type": receipt_type, + }, + retcols=("room_id", "event_id"), + desc="get_receipts_for_user", + ) - defer.returnValue(dict( - (yield self.runInteraction("get_receipts_for_user", f)) - )) + defer.returnValue({row["room_id"]: row["event_id"] for row in rows}) @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1982b1c60..03eecbbbb 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -156,9 +156,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key( - room_id, intern_string(event_type), intern_string(state_key) - ) + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -205,7 +203,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (intern_string(row["type"]), intern_string(row["state_key"])) + key = (row["type"], row["state_key"]) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -286,7 +284,9 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) + defer.returnValue({ + intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows + }) 
def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 9d450fade..838cec45f 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -15,6 +15,9 @@ import synapse.metrics from lrucache import LruCache +import os + +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) DEBUG_CACHES = False @@ -27,9 +30,62 @@ cache_counter = metrics.register_cache( labels=["name"], ) -_string_cache = LruCache(5000) +_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache +KNOWN_KEYS = { + key: key for key in + ( + "auth_events", + "content", + "depth", + "event_id", + "hashes", + "origin", + "origin_server_ts", + "prev_events", + "room_id", + "sender", + "signatures", + "state_key", + "type", + "unsigned", + "user_id", + ) +} + + def intern_string(string): + """Takes a (potentially) unicode string and interns using custom cache + """ return _string_cache.setdefault(string, string) + + +def intern_dict(dictionary): + """Takes a dictionary and interns well known keys and their values + """ + return _intern_known_values({ + _intern_key(key): value for key, value in dictionary.items() + }) + + +def _intern_known_values(dictionary): + intern_str_keys = ("event_id", "room_id") + intern_unicode_keys = ("sender", "user_id", "type", "state_key") + + for key in intern_str_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern(val.encode('ascii')) + + for key in intern_unicode_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern_string(val) + + return dictionary + + +def _intern_key(key): + return KNOWN_KEYS.get(key, key) From 2f0180b09e2a2afeed418a5840ae6b4fffcb4be4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:29:46 +0000 Subject: [PATCH 58/59] Don't bother interning 
keys that are already interned --- synapse/storage/state.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 03eecbbbb..02cefdff2 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -284,9 +284,7 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({ - intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows - }) + defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` From 8122ad7bab74b1a52188e350ca605033a9eca28e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:34:59 +0000 Subject: [PATCH 59/59] Simplify intern_dict --- synapse/util/caches/__init__.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 838cec45f..d53569ca4 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -65,27 +65,20 @@ def intern_string(string): def intern_dict(dictionary): """Takes a dictionary and interns well known keys and their values """ - return _intern_known_values({ - _intern_key(key): value for key, value in dictionary.items() - }) + return { + KNOWN_KEYS.get(key, key): _intern_known_values(key, value) + for key, value in dictionary.items() + } -def _intern_known_values(dictionary): +def _intern_known_values(key, value): intern_str_keys = ("event_id", "room_id") intern_unicode_keys = ("sender", "user_id", "type", "state_key") - for key in intern_str_keys: - val = dictionary.get(key, None) - if val is not None: - dictionary[key] = intern(val.encode('ascii')) + if key in intern_str_keys: + return intern(value.encode('ascii')) - for key in intern_unicode_keys: - val = dictionary.get(key, None) - if val is not None: - dictionary[key] = 
intern_string(val) + if key in intern_unicode_keys: + return intern_string(value) - return dictionary - - -def _intern_key(key): - return KNOWN_KEYS.get(key, key) + return value