From 52b2318777ac334480316b8a8ac2778367dcf53d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 15:59:08 +0100 Subject: [PATCH 01/10] Clobber EDUs in send queue --- synapse/federation/federation_client.py | 8 +++-- synapse/federation/transaction_queue.py | 48 +++++++++++++++++++++++-- synapse/handlers/presence.py | 20 +++-------- synapse/handlers/receipts.py | 1 + synapse/handlers/typing.py | 1 + 5 files changed, 58 insertions(+), 20 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 78719eed25..3395c9e41e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -122,8 +122,12 @@ class FederationClient(FederationBase): pdu.event_id ) + def send_presence(self, destination, states): + if destination != self.server_name: + self._transaction_queue.enqueue_presence(destination, states) + @log_function - def send_edu(self, destination, edu_type, content): + def send_edu(self, destination, edu_type, content, key=None): edu = Edu( origin=self.server_name, destination=destination, @@ -134,7 +138,7 @@ class FederationClient(FederationBase): sent_edus_counter.inc() # TODO, add errback, etc. - self._transaction_queue.enqueue_edu(edu) + self._transaction_queue.enqueue_edu(edu, key=key) return defer.succeed(None) @log_function diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1ac569b305..bd2a04af9e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.handlers.presence import format_user_presence_state import synapse.metrics import logging @@ -69,13 +70,20 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + self.pending_presence_by_dest = presence = {} + self.pending_edus_keyed_by_dest = edus_keyed = {} + metrics.register_callback( "pending_pdus", lambda: sum(map(len, pdus.values())), ) metrics.register_callback( "pending_edus", - lambda: sum(map(len, edus.values())), + lambda: ( + sum(map(len, edus.values())) + + sum(map(len, presence.values())) + + sum(map(len, edus_keyed.values())) + ), ) # destination -> list of tuple(failure, deferred) @@ -130,13 +138,25 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_edu(self, edu): + def enqueue_presence(self, destination, states): + self.pending_presence_by_dest.setdefault(destination, {}).update({ + state.user_id: state for state in states + }) + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + + def enqueue_edu(self, edu, key=None): destination = edu.destination if not self.can_send_to(destination): return - self.pending_edus_by_dest.setdefault(destination, []).append(edu) + if key: + self.pending_edus_keyed_by_dest.setdefault(destination, {})[key] = edu + else: + self.pending_edus_by_dest.setdefault(destination, []).append(edu) preserve_context_over_fn( self._attempt_new_transaction, destination @@ -190,8 +210,13 @@ class TransactionQueue(object): while True: pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_failures = self.pending_failures_by_dest.pop(destination, []) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + limiter = yield get_retry_limiter( destination, self.clock, @@ -203,6 +228,23 @@ class TransactionQueue(object): ) pending_edus.extend(device_message_edus) + logger.info("Sending presence: %r", pending_presence) + if pending_presence: + pending_edus.append( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, + ) + ) if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 16dbddee03..a949e39bda 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -625,18 +625,8 @@ class PresenceHandler(object): Args: hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` """ - now = self.clock.time_msec() for host, states in hosts_to_states.items(): - self.federation.send_edu( - destination=host, - edu_type="m.presence", - content={ - "push": [ - _format_user_presence_state(state, now) - for state in states - ] - } - ) + self.federation.send_presence(host, states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -723,13 +713,13 @@ class PresenceHandler(object): defer.returnValue([ { "type": "m.presence", - "content": _format_user_presence_state(state, now), + "content": format_user_presence_state(state, now), } for state in updates ]) else: defer.returnValue([ - _format_user_presence_state(state, now) for state in updates + format_user_presence_state(state, now) for state in updates ]) @defer.inlineCallbacks @@ -988,7 +978,7 @@ def should_notify(old_state, new_state): return False -def _format_user_presence_state(state, now): +def format_user_presence_state(state, now): """Convert UserPresenceState to a format that can be sent down to clients and to other servers. """ @@ -1101,7 +1091,7 @@ class PresenceEventSource(object): defer.returnValue(([ { "type": "m.presence", - "content": _format_user_presence_state(s, now), + "content": format_user_presence_state(s, now), } for s in updates.values() if include_offline or s.state != PresenceState.OFFLINE diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 726f7308d2..e536a909d0 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -156,6 +156,7 @@ class ReceiptsHandler(BaseHandler): } }, }, + key=(room_id, receipt_type, user_id), ) @defer.inlineCallbacks diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b687957dd..0548b81c34 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -187,6 +187,7 @@ class TypingHandler(object): "user_id": user_id, "typing": typing, }, + key=(room_id, user_id), )) yield preserve_context_over_deferred( From a4339de9de417394b170b608491183374e1c09bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 16:44:26 +0100 Subject: [PATCH 02/10] Correctly handle typing stream id resetting --- synapse/replication/resource.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 857bc9795c..299e9419a4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -274,11 +274,18 @@ class ReplicationResource(Resource): @defer.inlineCallbacks def typing(self, writer, current_token, request_streams): - current_position = current_token.presence + current_position = current_token.typing request_typing = request_streams.get("typing") if request_typing is not None: + # If they have a higher token than current max, we can assume that + # they had been talking to a previous instance of the master. Since + # we reset the token on restart, the best (but hacky) thing we can + # do is to simply resend down all the typing notifications. + if request_typing > current_position: + request_typing = 0 + typing_rows = yield self.typing_handler.get_all_typing_updates( request_typing, current_position ) From 327425764e44ea299ea4d85859035f3052c7b8b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:13:30 +0100 Subject: [PATCH 03/10] Add edu.type as part of key. Remove debug logging --- synapse/federation/transaction_queue.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bd2a04af9e..4f8315e59d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -154,7 +154,9 @@ class TransactionQueue(object): return if key: - self.pending_edus_keyed_by_dest.setdefault(destination, {})[key] = edu + self.pending_edus_keyed_by_dest.setdefault( + destination, {} + )[(edu.type, key)] = edu else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) @@ -228,7 +230,6 @@ class TransactionQueue(object): ) pending_edus.extend(device_message_edus) - logger.info("Sending presence: %r", pending_presence) if pending_presence: pending_edus.append( Edu( From 464ffd1b5efd30e59ee3d0adef0fa1541130781f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:17:23 +0100 Subject: [PATCH 04/10] Comment --- synapse/federation/transaction_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4f8315e59d..1898e4b44b 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -70,6 +70,7 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + # Presence needs to be separate as we send single aggragate EDUs self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} From 44330a21e91746999995f7491d5a5e502e8e5908 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:22:07 +0100 Subject: [PATCH 05/10] Comment --- synapse/app/synchrotron.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 07d3d047c6..dbaa48035d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -242,6 +242,9 @@ class SynchrotronTyping(object): self._room_typing = {} def stream_positions(self): + # We must update this typing token from the response of the previous + # sync. In particular, the stream id may "reset" back to zero/a low + # value which we *must* use for the next replication request. return {"typing": self._latest_room_serial} def process_replication(self, result): From af4701b311f60e6410d98ff8526ff16db5d22142 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:36:56 +0100 Subject: [PATCH 06/10] Fix incorrect attribute name --- synapse/federation/transaction_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1898e4b44b..f8ca93e4c3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,7 +157,7 @@ class TransactionQueue(object): if key: self.pending_edus_keyed_by_dest.setdefault( destination, {} - )[(edu.type, key)] = edu + )[(edu.edu_type, key)] = edu else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) From 29205e959639ce145070b75df70cc4424d6e258a Mon Sep 17 00:00:00 2001 From: Shell Turner Date: Fri, 9 Sep 2016 19:39:30 +0100 Subject: [PATCH 07/10] Conform better to the CAS protocol specification Redirect to CAS's /login endpoint properly, and don't require an element. Signed-off-by: Shell Turner --- synapse/rest/client/v1/login.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 6c0eec8fb3..345018a8fc 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -318,7 +318,7 @@ class CasRedirectServlet(ClientV1RestServlet): service_param = urllib.urlencode({ "service": "%s?%s" % (hs_redirect_url, client_redirect_url_param) }) - request.redirect("%s?%s" % (self.cas_server_url, service_param)) + request.redirect("%s/login?%s" % (self.cas_server_url, service_param)) finish_request(request) @@ -385,7 +385,7 @@ class CasTicketServlet(ClientV1RestServlet): def parse_cas_response(self, cas_response_body): user = None - attributes = None + attributes = {} try: root = ET.fromstring(cas_response_body) if not root.tag.endswith("serviceResponse"): @@ -395,7 +395,6 @@ class CasTicketServlet(ClientV1RestServlet): if child.tag.endswith("user"): user = child.text if child.tag.endswith("attributes"): - attributes = {} for attribute in child: # ElementTree library expands the namespace in # attribute tags to the full URL of the namespace. @@ -407,8 +406,6 @@ class CasTicketServlet(ClientV1RestServlet): attributes[tag] = attribute.text if user is None: raise Exception("CAS response does not contain user") - if attributes is None: - raise Exception("CAS response does not contain attributes") except Exception: logger.error("Error parsing CAS response", exc_info=1) raise LoginError(401, "Invalid CAS response", From 897d57bc58579b5dd253b3294f31bedd43edf0f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 10:05:07 +0100 Subject: [PATCH 08/10] Change state fetch query for postgres to be faster It turns out that postgres doesn't like doing a list of OR's and is about 1000x slower, so we just issue a query for each specific type seperately. --- synapse/storage/state.py | 52 ++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0cff0a0cda..f98d5d53ee 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -306,13 +306,6 @@ class StateStore(SQLBaseStore): defer.returnValue(results) def _get_state_groups_from_groups_txn(self, txn, groups, types=None): - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -342,20 +335,43 @@ class StateStore(SQLBaseStore): WHERE state_group IN ( SELECT state_group FROM state ) - %s; - """) % (where_clause,) + %s + """) - for group in groups: - args = [group] - if types is not None: - args.extend([i for typ in types for i in typ]) + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) + for etype, state_key in types + ] + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] + for where_clause, where_args in clause_to_args: + for group in groups: + args = [group] + args.extend(where_args) + + txn.execute(sql % (where_clause,), args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] else: + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: From 54417999b692a8dd0f8f4edd62598c80835a4212 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Sep 2016 10:39:55 +0100 Subject: [PATCH 09/10] Revert "Add index to event_push_actions" --- synapse/storage/event_push_actions.py | 4 +--- .../delta/35/event_push_actions_index.sql | 18 ------------------ 2 files changed, 1 insertion(+), 21 deletions(-) delete mode 100644 synapse/storage/schema/delta/35/event_push_actions_index.sql diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a87d90741a..10e9305f7b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -353,14 +353,12 @@ class EventPushActionsStore(SQLBaseStore): before_clause += " " before_clause += "AND epa.highlight = 1" - # NB. This assumes event_ids are globally unique since - # it makes the query easier to index sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," " epa.actions, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" - " WHERE epa.event_id = e.event_id" + " WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id" " AND epa.user_id = ? %s" " ORDER BY epa.stream_ordering DESC" " LIMIT ?" diff --git a/synapse/storage/schema/delta/35/event_push_actions_index.sql b/synapse/storage/schema/delta/35/event_push_actions_index.sql deleted file mode 100644 index 4fc32c351a..0000000000 --- a/synapse/storage/schema/delta/35/event_push_actions_index.sql +++ /dev/null @@ -1,18 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - CREATE INDEX event_push_actions_user_id_highlight_stream_ordering on event_push_actions( - user_id, highlight, stream_ordering - ); From 31f85f9db9d78ce2e201e7adb7128131377b376d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 12 Sep 2016 11:00:26 +0100 Subject: [PATCH 10/10] Add comments to existing schema deltas that used "CREATE INDEX" directly --- synapse/storage/schema/delta/22/receipts_index.sql | 4 ++++ synapse/storage/schema/delta/28/events_room_stream.sql | 4 ++++ synapse/storage/schema/delta/28/public_roms_index.sql | 4 ++++ synapse/storage/schema/delta/28/receipts_user_id_index.sql | 4 ++++ synapse/storage/schema/delta/29/push_actions.sql | 4 ++++ synapse/storage/schema/delta/31/pushers_index.sql | 4 ++++ 6 files changed, 24 insertions(+) diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql index 7bc061dff6..bfc0b3bcaa 100644 --- a/synapse/storage/schema/delta/22/receipts_index.sql +++ b/synapse/storage/schema/delta/22/receipts_index.sql @@ -13,6 +13,10 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); diff --git a/synapse/storage/schema/delta/28/events_room_stream.sql b/synapse/storage/schema/delta/28/events_room_stream.sql index 200c35e6e2..36609475f1 100644 --- a/synapse/storage/schema/delta/28/events_room_stream.sql +++ b/synapse/storage/schema/delta/28/events_room_stream.sql @@ -13,4 +13,8 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX events_room_stream on events(room_id, stream_ordering); diff --git a/synapse/storage/schema/delta/28/public_roms_index.sql b/synapse/storage/schema/delta/28/public_roms_index.sql index ba62a974a4..6c1fd68c5b 100644 --- a/synapse/storage/schema/delta/28/public_roms_index.sql +++ b/synapse/storage/schema/delta/28/public_roms_index.sql @@ -13,4 +13,8 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX public_room_index on rooms(is_public); diff --git a/synapse/storage/schema/delta/28/receipts_user_id_index.sql b/synapse/storage/schema/delta/28/receipts_user_id_index.sql index 452a1b3c6c..cb84c69baa 100644 --- a/synapse/storage/schema/delta/28/receipts_user_id_index.sql +++ b/synapse/storage/schema/delta/28/receipts_user_id_index.sql @@ -13,6 +13,10 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); diff --git a/synapse/storage/schema/delta/29/push_actions.sql b/synapse/storage/schema/delta/29/push_actions.sql index 7e7b09820a..84b21cf813 100644 --- a/synapse/storage/schema/delta/29/push_actions.sql +++ b/synapse/storage/schema/delta/29/push_actions.sql @@ -26,6 +26,10 @@ UPDATE event_push_actions SET stream_ordering = ( UPDATE event_push_actions SET notif = 1, highlight = 0; +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); diff --git a/synapse/storage/schema/delta/31/pushers_index.sql b/synapse/storage/schema/delta/31/pushers_index.sql index 9027bccc69..a82add88fd 100644 --- a/synapse/storage/schema/delta/31/pushers_index.sql +++ b/synapse/storage/schema/delta/31/pushers_index.sql @@ -13,6 +13,10 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );