From b3be9e4376b976a5cc4b37d70c4985342d5c30cb Mon Sep 17 00:00:00 2001 From: Fabian Niepelt Date: Tue, 30 Aug 2016 15:03:03 +0200 Subject: [PATCH 01/13] Add prerequisites to install on openSUSE to README Signed-off-by: Fabian Niepelt --- README.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.rst b/README.rst index 323f5b8db7..358038ade0 100644 --- a/README.rst +++ b/README.rst @@ -134,6 +134,12 @@ Installing prerequisites on Raspbian:: sudo pip install --upgrade ndg-httpsclient sudo pip install --upgrade virtualenv +Installing prerequisites on openSUSE:: + + sudo zypper in -t pattern devel_basis + sudo zypper in python-pip python-setuptools sqlite3 python-virtualenv \ + python-devel libffi-devel libopenssl-devel libjpeg62-devel + To install the synapse homeserver run:: virtualenv -p python2.7 ~/.synapse From e82247f990d8b47fc1d151d6151d82f9d802bebb Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 16:21:16 +0100 Subject: [PATCH 02/13] Allow application services to have an optional 'url' If 'url' is not specified, they will not be pushed for events or queries. This is useful for bots who simply wish to reserve large chunks of user/alias namespace, and don't care about being pushed for events. --- synapse/appservice/api.py | 11 +++++++++++ synapse/config/appservice.py | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 775417eb21..e596cb7376 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -67,6 +67,8 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_user(self, service, user_id): + if service.url == "": + defer.returnValue(False) uri = service.url + ("/users/%s" % urllib.quote(user_id)) response = None try: @@ -86,6 +88,8 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_alias(self, service, alias): + if service.url == "": + defer.returnValue(False) uri = service.url + ("/rooms/%s" % urllib.quote(alias)) response = None try: @@ -113,6 +117,8 @@ class ApplicationServiceApi(SimpleHttpClient): raise ValueError( "Unrecognised 'kind' argument %r to query_3pe()", kind ) + if service.url == "": + defer.returnValue([]) uri = "%s%s/thirdparty/%s/%s" % ( service.url, @@ -145,6 +151,8 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue([]) def get_3pe_protocol(self, service, protocol): + if service.url == "": + defer.returnValue({}) @defer.inlineCallbacks def _get(): uri = "%s%s/thirdparty/protocol/%s" % ( @@ -166,6 +174,9 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): + if service.url == "": + defer.returnValue(True) + events = self._serialize(events) if txn_id is None: diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index dfe43b0b4c..3488a28ff2 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -132,6 +132,12 @@ def _load_appservice(hostname, as_info, config_filename): for p in protocols: if not isinstance(p, str): raise KeyError("Bad value for 'protocols' item") + + if as_info["url"] == "": + logger.info( + "(%s) Explicitly empty 'url' provided. This application service will not receive events or queries.", + config_filename, + ) return ApplicationService( token=as_info["as_token"], url=as_info["url"], From 16b652f0a375a880175b54d5d439bde6a0604dd2 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 16:30:12 +0100 Subject: [PATCH 03/13] Flake8 --- synapse/appservice/api.py | 1 + synapse/config/appservice.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e596cb7376..ee63f06359 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -153,6 +153,7 @@ class ApplicationServiceApi(SimpleHttpClient): def get_3pe_protocol(self, service, protocol): if service.url == "": defer.returnValue({}) + @defer.inlineCallbacks def _get(): uri = "%s%s/thirdparty/protocol/%s" % ( diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 3488a28ff2..5569c43b35 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -135,7 +135,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info["url"] == "": logger.info( - "(%s) Explicitly empty 'url' provided. This application service will not receive events or queries.", + "(%s) Explicitly empty 'url' provided. This application service " + + "will not receive events or queries.", config_filename, ) return ApplicationService( From 572acde4832869c8f7f4daf555296bd1164364d0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 17:16:00 +0100 Subject: [PATCH 04/13] Use None instead of the empty string Change how we validate the 'url' field as a result. --- synapse/appservice/api.py | 10 +++++----- synapse/config/appservice.py | 15 +++++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index ee63f06359..cc4af23962 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -67,7 +67,7 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_user(self, service, user_id): - if service.url == "": + if service.url is None: defer.returnValue(False) uri = service.url + ("/users/%s" % urllib.quote(user_id)) response = None @@ -88,7 +88,7 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def query_alias(self, service, alias): - if service.url == "": + if service.url is None: defer.returnValue(False) uri = service.url + ("/rooms/%s" % urllib.quote(alias)) response = None @@ -117,7 +117,7 @@ class ApplicationServiceApi(SimpleHttpClient): raise ValueError( "Unrecognised 'kind' argument %r to query_3pe()", kind ) - if service.url == "": + if service.url is None: defer.returnValue([]) uri = "%s%s/thirdparty/%s/%s" % ( @@ -151,7 +151,7 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue([]) def get_3pe_protocol(self, service, protocol): - if service.url == "": + if service.url is None: defer.returnValue({}) @defer.inlineCallbacks @@ -175,7 +175,7 @@ class ApplicationServiceApi(SimpleHttpClient): @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): - if service.url == "": + if service.url is None: defer.returnValue(True) events = self._serialize(events) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 5569c43b35..759209a85b 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -86,7 +86,7 @@ def load_appservices(hostname, config_files): def _load_appservice(hostname, as_info, config_filename): required_string_fields = [ - "id", "url", "as_token", "hs_token", "sender_localpart" + "id", "as_token", "hs_token", "sender_localpart" ] for field in required_string_fields: if not isinstance(as_info.get(field), basestring): @@ -94,6 +94,13 @@ def _load_appservice(hostname, as_info, config_filename): field, config_filename, )) + # 'url' must either be a string or explicitly null, not missing + # to avoid accidentally turning off push for ASes. + if not isinstance(as_info.get("url"), basestring) and as_info.get("url", "") is not None: + raise KeyError( + "Required string field or explicit null: 'url' (%s)" % (config_filename,) + ) + localpart = as_info["sender_localpart"] if urllib.quote(localpart) != localpart: raise ValueError( @@ -133,10 +140,10 @@ def _load_appservice(hostname, as_info, config_filename): if not isinstance(p, str): raise KeyError("Bad value for 'protocols' item") - if as_info["url"] == "": + if as_info["url"] == None: logger.info( - "(%s) Explicitly empty 'url' provided. This application service " + - "will not receive events or queries.", + "(%s) Explicitly empty 'url' provided. This application service" + " will not receive events or queries.", config_filename, ) return ApplicationService( From c88278353512d778f460d8223a2ff173323e8f94 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 30 Aug 2016 17:20:31 +0100 Subject: [PATCH 05/13] flake8 --- synapse/config/appservice.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 759209a85b..d7537e8d44 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -96,7 +96,8 @@ def _load_appservice(hostname, as_info, config_filename): # 'url' must either be a string or explicitly null, not missing # to avoid accidentally turning off push for ASes. - if not isinstance(as_info.get("url"), basestring) and as_info.get("url", "") is not None: + if (not isinstance(as_info.get("url"), basestring) and + as_info.get("url", "") is not None): raise KeyError( "Required string field or explicit null: 'url' (%s)" % (config_filename,) ) @@ -140,7 +141,7 @@ def _load_appservice(hostname, as_info, config_filename): if not isinstance(p, str): raise KeyError("Bad value for 'protocols' item") - if as_info["url"] == None: + if as_info["url"] is None: logger.info( "(%s) Explicitly empty 'url' provided. This application service" " will not receive events or queries.", From d80f64d370e2b7e7d56ef32bfa54c66fa1b1482a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Aug 2016 21:46:39 +0100 Subject: [PATCH 06/13] Fix email notifs by adding missing param --- synapse/push/mailer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 81b1af4a19..2cafcfd8f5 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -338,7 +338,7 @@ class Mailer(object): # want the generated-from-names one here otherwise we'll # end up with, "new message from Bob in the Bob room" room_name = yield calculate_room_name( - state_by_room[room_id], user_id, fallback_to_members=False + self.store, state_by_room[room_id], user_id, fallback_to_members=False ) my_member_event = state_by_room[room_id][("m.room.member", user_id)] From 1aa3e1d2874370fbb6fa9f4fd2b8a110d81981fc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 31 Aug 2016 10:38:58 +0100 Subject: [PATCH 07/13] Add a replication stream for direct to device messages --- synapse/replication/resource.py | 19 ++++++++- .../replication/slave/storage/deviceinbox.py | 12 ++++++ synapse/rest/client/v2_alpha/sendtodevice.py | 10 ++++- synapse/storage/deviceinbox.py | 39 +++++++++++++++++++ 4 files changed, 77 insertions(+), 3 deletions(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 84993b33b3..533676814a 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -42,6 +42,7 @@ STREAM_NAMES = ( ("pushers",), ("state",), ("caches",), + ("to_device",), ) @@ -144,6 +145,7 @@ class ReplicationResource(Resource): pushers_token, state_token, caches_token, + int(stream_token.to_device_key), )) @request_handler() @@ -193,6 +195,7 @@ class ReplicationResource(Resource): yield self.pushers(writer, current_token, limit, request_streams) yield self.state(writer, current_token, limit, request_streams) yield self.caches(writer, current_token, limit, request_streams) + yield self.to_device(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.info("Replicated %d rows", writer.total) @@ -398,6 +401,20 @@ class ReplicationResource(Resource): "position", "cache_func", "keys", "invalidation_ts" )) + @defer.inlineCallbacks + def to_device(self, writer, current_token, limit, request_streams): + current_position = current_token.to_device + + to_device = request_streams.get("to_device") + + if to_device is not None: + to_device_rows = yield self.store.get_all_new_device_messages( + to_device, current_position, limit + ) + writer.write_header_and_rows("to_device", to_device_rows, ( + "position", "user_id", "device_id", "message_json" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -426,7 +443,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers", "state", "caches", + "push_rules", "pushers", "state", "caches", "to_device", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 7583d23708..64d8eb2af1 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -28,3 +28,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore): get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ delete_messages_for_device = DataStore.delete_messages_for_device.__func__ + + def stream_positions(self): + result = super(SlavedDeviceInboxStore, self).stream_positions() + result["to_device"] = self._device_inbox_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("to_device") + if stream: + self._device_inbox_id_gen.advance(int(stream["position"])) + + return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 00533741af..7c0991ca55 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -40,6 +40,7 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.hs = hs self.auth = hs.get_auth() self.store = hs.get_datastore() + self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id self.txns = HttpTransactionStore() @@ -71,9 +72,14 @@ class SendToDeviceRestServlet(servlet.RestServlet): } for device_id, message_content in by_device.items() } - local_messages[user_id] = messages_by_device + if messages_by_device: + local_messages[user_id] = messages_by_device - yield self.store.add_messages_to_device_inbox(local_messages) + stream_id = yield self.store.add_messages_to_device_inbox(local_messages) + + self.notifier.on_new_event( + "to_device", stream_id, users=local_messages.keys() + ) response = (200, {}) self.txns.store_client_transaction(request, txn_id, response) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index e682fe1bd1..27ed1004da 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -136,5 +136,44 @@ class DeviceInboxStore(SQLBaseStore): "delete_messages_for_device", delete_messages_for_device_txn ) + def get_all_new_device_messages(self, last_pos, current_pos, limit): + """ + Args: + last_pos(int): + current_pos(int): + limit(int): + Returns: + A deferred list of rows from the device inbox + """ + if last_pos == current_pos: + return defer.succeed([]) + + def get_all_new_device_messages_txn(txn): + sql = ( + "SELECT stream_id FROM device_inbox" + " WHERE ? < stream_id AND stream_id <= ?" + " GROUP BY stream_id" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_pos, current_pos, limit)) + stream_ids = txn.fetchall() + if not stream_ids: + return [] + max_stream_id_in_limit = stream_ids[-1] + + sql = ( + "SELECT stream_id, user_id, device_id, message_json" + " FROM device_inbox" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + ) + txn.execute(sql, (last_pos, max_stream_id_in_limit)) + return txn.fetchall() + + return self.runInteraction( + "get_all_new_device_messages", get_all_new_device_messages_txn + ) + def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() From a66225275896c00287e4f627ebbacd6f7b34e6ba Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 31 Aug 2016 10:42:52 +0100 Subject: [PATCH 08/13] Return the current stream position from add_messages_to_device_inbox --- synapse/storage/deviceinbox.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 27ed1004da..2fa0a218b9 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -33,7 +33,8 @@ class DeviceInboxStore(SQLBaseStore): messages_by_user_and_device(dict): Dictionary of user_id to device_id to message. Returns: - A deferred that resolves when the messages have been inserted. + A deferred stream_id that resolves when the messages have been + inserted. """ def select_devices_txn(txn, user_id, devices): @@ -81,6 +82,8 @@ class DeviceInboxStore(SQLBaseStore): stream_id ) + defer.returnValue(self._device_inbox_id_gen.get_current_token()) + def get_new_messages_for_device( self, user_id, device_id, current_stream_id, limit=100 ): From ef0cc648cfe74a13268c0df8fc679e0c4f29ecbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 11:12:02 +0100 Subject: [PATCH 09/13] Clean up old sent transactions --- .../storage/schema/delta/34/sent_txn_purge.py | 32 +++++++++++++++++++ synapse/storage/transactions.py | 2 ++ 2 files changed, 34 insertions(+) create mode 100644 synapse/storage/schema/delta/34/sent_txn_purge.py diff --git a/synapse/storage/schema/delta/34/sent_txn_purge.py b/synapse/storage/schema/delta/34/sent_txn_purge.py new file mode 100644 index 0000000000..81948e3431 --- /dev/null +++ b/synapse/storage/schema/delta/34/sent_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE sent_transactions") + else: + cur.execute("DELETE FROM sent_transactions") + + cur.execute("CREATE INDEX sent_transactions_ts ON sent_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 58d4de4f1d..1c588bd46b 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -387,8 +387,10 @@ class TransactionStore(SQLBaseStore): def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 + six_hours_ago = now - 6 * 60 * 60 * 1000 def _cleanup_transactions_txn(txn): txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,)) return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) From 5405351b147cb5e1b62651fd0c5ad95c5e0569e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 Aug 2016 16:19:36 +0100 Subject: [PATCH 10/13] Lower get_linearized_receipts_for_room cache size --- synapse/storage/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index ccc3811e84..9747a04a9a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -145,7 +145,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000, tree=True) + @cachedInlineCallbacks(num_args=3, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. From 516a272aca16670961c3d337a152ce507371c0cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 10:55:02 +0100 Subject: [PATCH 11/13] Ensure we only return a validated pdu in get_pdu --- synapse/federation/federation_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f2b3aceb49..7212903eac 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -267,7 +267,7 @@ class FederationClient(FederationBase): pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) - pdu = None + signed_pdu = None for destination in destinations: now = self._clock.time_msec() last_attempt = pdu_attempts.get(destination, 0) @@ -297,7 +297,7 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - pdu = yield self._check_sigs_and_hashes([pdu])[0] + signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] break @@ -320,10 +320,10 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None and pdu: - self._get_pdu_cache[event_id] = pdu + if self._get_pdu_cache is not None and signed_pdu: + self._get_pdu_cache[event_id] = signed_pdu - defer.returnValue(pdu) + defer.returnValue(signed_pdu) @defer.inlineCallbacks @log_function From 265d847ffd3f9b7a123974a7c1cafd2487720c96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 14:50:06 +0100 Subject: [PATCH 12/13] Fix typo in log line --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d22adadc38..cf82a2336e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -230,7 +230,7 @@ class PresenceHandler(object): """ logger.info( "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes", - len(self.user_to_current_state) + len(self.unpersisted_users_changes) ) unpersisted = self.unpersisted_users_changes From 051a9ea92134b3bf8dec27cedf2e8410c731dfc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 14:55:03 +0100 Subject: [PATCH 13/13] Linearize state resolution to help caches --- synapse/state.py | 115 ++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 56 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index b31bbcdbd2..cd792afed1 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes from synapse.events.snapshot import EventContext +from synapse.util.async import Linearizer from collections import namedtuple @@ -84,6 +85,7 @@ class StateHandler(object): # dict of set of event_ids -> _StateCacheEntry. self._state_cache = None + self.resolve_linearizer = Linearizer() def start_caching(self): logger.debug("start_caching") @@ -283,69 +285,70 @@ class StateHandler(object): state_group=name, )) - if self._state_cache is not None: - cache = self._state_cache.get(group_names, None) - if cache: - defer.returnValue(cache) + with (yield self.resolve_linearizer.queue(group_names)): + if self._state_cache is not None: + cache = self._state_cache.get(group_names, None) + if cache: + defer.returnValue(cache) - logger.info( - "Resolving state for %s with %d groups", room_id, len(state_groups_ids) - ) - - state = {} - for st in state_groups_ids.values(): - for key, e_id in st.items(): - state.setdefault(key, set()).add(e_id) - - conflicted_state = { - k: list(v) - for k, v in state.items() - if len(v) > 1 - } - - if conflicted_state: - logger.info("Resolving conflicted state for %r", room_id) - state_map = yield self.store.get_events( - [e_id for st in state_groups_ids.values() for e_id in st.values()], - get_prev_content=False + logger.info( + "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) - state_sets = [ - [state_map[e_id] for key, e_id in st.items() if e_id in state_map] - for st in state_groups_ids.values() - ] - new_state, _ = self._resolve_events( - state_sets, event_type, state_key - ) - new_state = { - key: e.event_id for key, e in new_state.items() - } - else: - new_state = { - key: e_ids.pop() for key, e_ids in state.items() + + state = {} + for st in state_groups_ids.values(): + for key, e_id in st.items(): + state.setdefault(key, set()).add(e_id) + + conflicted_state = { + k: list(v) + for k, v in state.items() + if len(v) > 1 } - state_group = None - new_state_event_ids = frozenset(new_state.values()) - for sg, events in state_groups_ids.items(): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break - if state_group is None: - # Worker instances don't have access to this method, but we want - # to set the state_group on the main instance to increase cache - # hits. - if hasattr(self.store, "get_next_state_group"): - state_group = self.store.get_next_state_group() + if conflicted_state: + logger.info("Resolving conflicted state for %r", room_id) + state_map = yield self.store.get_events( + [e_id for st in state_groups_ids.values() for e_id in st.values()], + get_prev_content=False + ) + state_sets = [ + [state_map[e_id] for key, e_id in st.items() if e_id in state_map] + for st in state_groups_ids.values() + ] + new_state, _ = self._resolve_events( + state_sets, event_type, state_key + ) + new_state = { + key: e.event_id for key, e in new_state.items() + } + else: + new_state = { + key: e_ids.pop() for key, e_ids in state.items() + } - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - ) + state_group = None + new_state_event_ids = frozenset(new_state.values()) + for sg, events in state_groups_ids.items(): + if new_state_event_ids == frozenset(e_id for e_id in events): + state_group = sg + break + if state_group is None: + # Worker instances don't have access to this method, but we want + # to set the state_group on the main instance to increase cache + # hits. + if hasattr(self.store, "get_next_state_group"): + state_group = self.store.get_next_state_group() - if self._state_cache is not None: - self._state_cache[group_names] = cache + cache = _StateCacheEntry( + state=new_state, + state_group=state_group, + ) - defer.returnValue(cache) + if self._state_cache is not None: + self._state_cache[group_names] = cache + + defer.returnValue(cache) def resolve_events(self, state_sets, event): logger.info(