From b6b67715edc496ea95fd8005ee5b5685adcf2601 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 4 Jan 2017 13:34:35 +0000 Subject: [PATCH 1/6] Send ALL membership events to the server that was affected. Send all membership changes to the server that was affected. This ensures that if the last member of a room on a server was kicked or banned they get told about it. --- synapse/federation/transaction_queue.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 51b656d74..b00dda290 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,7 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn @@ -161,9 +161,11 @@ class TransactionQueue(object): get_domain_from_id(user_id) for user_id in users_in_room ) + # Send all membership changes to the server that was affected. + # This ensures that if the last member of a room on a server + # was kicked or banned they get told about it. if event.type == EventTypes.Member: - if event.content["membership"] == Membership.JOIN: - destinations.add(get_domain_from_id(event.state_key)) + destinations.add(get_domain_from_id(event.state_key)) logger.debug("Sending %s to %r", event, destinations) From e02bdaf08ba12a5c1ef15d721a7505ffd870c608 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 4 Jan 2017 15:13:40 +0000 Subject: [PATCH 2/6] Get the destinations from the state from before the event Rather than the state after then event. --- synapse/federation/transaction_queue.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index b00dda290..891c245bb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,7 +19,6 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.constants import EventTypes from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn @@ -153,20 +152,22 @@ class TransactionQueue(object): break for event in events: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. users_in_room = yield self.state.get_current_user_in_room( - event.room_id, latest_event_ids=[event.event_id], + event.room_id, latest_event_ids=[ + prev_id for prev_id, _ in event.prev_events + ], ) destinations = set( get_domain_from_id(user_id) for user_id in users_in_room ) - # Send all membership changes to the server that was affected. - # This ensures that if the last member of a room on a server - # was kicked or banned they get told about it. - if event.type == EventTypes.Member: - destinations.add(get_domain_from_id(event.state_key)) - logger.debug("Sending %s to %r", event, destinations) self._send_pdu(event, destinations) From f784980d2b2bd3827bfef94b0360582b2ef228ba Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 Jan 2017 11:26:30 +0000 Subject: [PATCH 3/6] Only send events that originate on this server. Or events that are sent via the federation "send_join" API. This should match the behaviour from before v0.18.5 and #1635 landed. --- synapse/events/__init__.py | 9 +++++++++ synapse/federation/transaction_queue.py | 12 ++++++++++++ synapse/handlers/federation.py | 4 ++++ 3 files changed, 25 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bcb8f33a5..8c71aeb5e 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -36,6 +36,15 @@ class _EventInternalMetadata(object): def is_invite_from_remote(self): return getattr(self, "invite_from_remote", False) + def get_send_on_behalf_of(self): + """Whether this server should send the event on behalf of another server. + This is used by the federation "send_join" API to forward the initial join + event for a server in the room. + + returns a str with the name of the server this event is sent on behalf of. + """ + return getattr(self, "get_send_on_behalf_of", None) + def _event_dict_property(key): def getter(self): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 891c245bb..7db7b806d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -61,6 +61,7 @@ class TransactionQueue(object): self.transport_layer = hs.get_federation_transport_client() self.clock = hs.get_clock() + self.is_mine_id = hs.is_mine_id # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -152,6 +153,12 @@ class TransactionQueue(object): break for event in events: + # Only send events for this server. + send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() + is_mine = self.is_mine_id(event.event_id) + if not is_mine and send_on_behalf_of is None: + continue + # Get the state from before the event. # We need to make sure that this is the state from before # the event and not from after it. @@ -167,6 +174,11 @@ class TransactionQueue(object): destinations = set( get_domain_from_id(user_id) for user_id in users_in_room ) + if send_on_behalf_of is not None: + # If we are sending the event on behalf of another server + # then it already has the event and there is no reason to + # send the event to it. + destinations.discard(send_on_behalf_of) logger.debug("Sending %s to %r", event, destinations) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1d07e4d02..8c93d6d39 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -790,6 +790,10 @@ class FederationHandler(BaseHandler): ) event.internal_metadata.outlier = False + # Send this event on behalf of the origin server since they may not + # have an up to data view of the state of the room at this event so + # will not know which servers to send the event to. + event.internal_metadata.send_on_behalf_of = origin context, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event From eedf400d05ba72c2c21b55a64f67104af54e90bd Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sat, 31 Dec 2016 15:21:37 +0000 Subject: [PATCH 4/6] limit total timeout for get_missing_events to 10s --- synapse/federation/federation_client.py | 4 +++- synapse/federation/federation_server.py | 5 +++++ synapse/federation/transport/client.py | 5 +++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6851f2376..b4bcec77e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -707,7 +707,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def get_missing_events(self, destination, room_id, earliest_events_ids, - latest_events, limit, min_depth): + latest_events, limit, min_depth, timeout): """Tries to fetch events we are missing. This is called when we receive an event without having received all of its ancestors. @@ -721,6 +721,7 @@ class FederationClient(FederationBase): have all previous events for. limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. + timeout (int): Max time to wait in ms """ try: content = yield self.transport_layer.get_missing_events( @@ -730,6 +731,7 @@ class FederationClient(FederationBase): latest_events=[e.event_id for e in latest_events], limit=limit, min_depth=min_depth, + timeout=timeout, ) events = [ diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index f4c60e67e..6d76e6f91 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -425,6 +425,7 @@ class FederationServer(FederationBase): " limit: %d, min_depth: %d", earliest_events, latest_events, limit, min_depth ) + missing_events = yield self.handler.on_get_missing_events( origin, room_id, earliest_events, latest_events, limit, min_depth ) @@ -567,6 +568,9 @@ class FederationServer(FederationBase): len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] ) + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733 + missing_events = yield self.get_missing_events( origin, pdu.room_id, @@ -574,6 +578,7 @@ class FederationServer(FederationBase): latest_events=[pdu], limit=10, min_depth=min_depth, + timeout=10000, ) # We want to sort these by depth so we process them and diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 491cdc29e..915af3440 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -386,7 +386,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def get_missing_events(self, destination, room_id, earliest_events, - latest_events, limit, min_depth): + latest_events, limit, min_depth, timeout): path = PREFIX + "/get_missing_events/%s" % (room_id,) content = yield self.client.post_json( @@ -397,7 +397,8 @@ class TransportLayerClient(object): "min_depth": int(min_depth), "earliest_events": earliest_events, "latest_events": latest_events, - } + }, + timeout=timeout, ) defer.returnValue(content) From 468749c9fca61fabc9dc5da5521ead84b4825783 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 5 Jan 2017 11:44:44 +0000 Subject: [PATCH 5/6] fix comment --- synapse/federation/federation_server.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6d76e6f91..800f04189 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -569,7 +569,23 @@ class FederationServer(FederationBase): ) # XXX: we set timeout to 10s to help workaround - # https://github.com/matrix-org/synapse/issues/1733 + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. + # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 missing_events = yield self.get_missing_events( origin, From dd3df11c5524441db3c9dd638ea59432d6f3d118 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 Jan 2017 12:32:47 +0000 Subject: [PATCH 6/6] More logging for the linearizer and for get_events --- synapse/storage/events.py | 2 ++ synapse/util/async.py | 13 +++++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ecb79c07e..04dbdac3f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1084,8 +1084,10 @@ class EventsStore(SQLBaseStore): self._do_fetch ) + logger.info("Loading %d events", len(events)) with PreserveLoggingContext(): rows = yield events_d + logger.info("Loaded %d events (%d rows)", len(events), len(rows)) if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] diff --git a/synapse/util/async.py b/synapse/util/async.py index 4280455cb..83875edc8 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -166,7 +166,11 @@ class Linearizer(object): # do some work. """ - def __init__(self): + def __init__(self, name=None): + if name is None: + self.name = id(self) + else: + self.name = name self.key_to_defer = {} @defer.inlineCallbacks @@ -185,15 +189,20 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - logger.info("Waiting to acquire linearizer lock for key %r", key) + logger.info( + "Waiting to acquire linearizer lock %r for key %r", self.name, key + ) with PreserveLoggingContext(): yield current_defer + logger.info("Acquired linearizer lock %r for key %r", self.name, key) + @contextmanager def _ctx_manager(): try: yield finally: + logger.info("Releasing linearizer lock %r for key %r", self.name, key) new_defer.callback(None) current_d = self.key_to_defer.get(key) if current_d is new_defer: