From f878e6f8af9e80cfa4be717c03cc4f9853a93794 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 5 Oct 2017 14:02:28 +0100 Subject: [PATCH 1/9] Spam checking: add the invitee to user_may_invite --- synapse/events/spamcheck.py | 4 ++-- synapse/handlers/federation.py | 12 +++++++----- synapse/handlers/room_member.py | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 595b1760f..dccc579ea 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -46,7 +46,7 @@ class SpamChecker(object): return self.spam_checker.check_event_for_spam(event) - def user_may_invite(self, userid, room_id): + def user_may_invite(self, inviter_userid, invitee_userid, room_id): """Checks if a given user may send an invite If this method returns false, the invite will be rejected. @@ -60,7 +60,7 @@ class SpamChecker(object): if self.spam_checker is None: return True - return self.spam_checker.user_may_invite(userid, room_id) + return self.spam_checker.user_may_invite(inviter_userid, invitee_userid, room_id) def user_may_create_room(self, userid): """Checks if a given user may create a room diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 737fe518e..8fccf8bab 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1071,6 +1071,9 @@ class FederationHandler(BaseHandler): """ event = pdu + if event.state_key is None: + raise SynapseError(400, "The invite event did not have a state key") + is_blocked = yield self.store.is_room_blocked(event.room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") @@ -1078,9 +1081,11 @@ class FederationHandler(BaseHandler): if self.hs.config.block_non_admin_invites: raise SynapseError(403, "This server does not accept room invites") - if not self.spam_checker.user_may_invite(event.sender, event.room_id): + if not self.spam_checker.user_may_invite( + event.sender, event.state_key, event.room_id, + ): raise SynapseError( - 403, "This user is not permitted to send invites to this server" + 403, "This user is not permitted to send invites to this server/user" ) membership = event.content.get("membership") @@ -1091,9 +1096,6 @@ class FederationHandler(BaseHandler): if sender_domain != origin: raise SynapseError(400, "The invite event was not from the server sending it") - if event.state_key is None: - raise SynapseError(400, "The invite event did not have a state key") - if not self.is_mine_id(event.state_key): raise SynapseError(400, "The invite event must be for this server") diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 37985fa1f..36a8ef8ce 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -225,7 +225,7 @@ class RoomMemberHandler(BaseHandler): block_invite = True if not self.spam_checker.user_may_invite( - requester.user.to_string(), room_id, + requester.user.to_string(), target.to_string(), room_id, ): logger.info("Blocking invite due to spam checker") block_invite = True From 3ddda939d35951896faa48631a3fe023e89e13e1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 5 Oct 2017 14:58:17 +0100 Subject: [PATCH 2/9] some comments in the state res code --- synapse/state.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/synapse/state.py b/synapse/state.py index 390799fbd..dcdcdef65 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -288,6 +288,9 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) + # map from state group id to the state in that state group (where + # 'state' is a map from state key to event id) + # dict[int, dict[(str, str), str]] state_groups_ids = yield self.store.get_state_groups_ids( room_id, event_ids ) @@ -320,11 +323,15 @@ class StateHandler(object): "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) + # build a map from state key to the event_ids which set that state. + # dict[(str, str), set[str]) state = {} for st in state_groups_ids.values(): for key, e_id in st.items(): state.setdefault(key, set()).add(e_id) + # build a map from state key to the event_ids which set that state, + # including only those where there are state keys in conflict. conflicted_state = { k: list(v) for k, v in state.items() @@ -494,8 +501,14 @@ def _resolve_with_state_fac(unconflicted_state, conflicted_state, logger.info("Asking for %d conflicted events", len(needed_events)) + # dict[str, FrozenEvent]: a map from state event id to event. Only includes + # the state events which are in conflict. state_map = yield state_map_factory(needed_events) + # get the ids of the auth events which allow us to authenticate the + # conflicted state, picking only from the unconflicting state. + # + # dict[(str, str), str]: a map from state key to event id auth_events = _create_auth_events_from_maps( unconflicted_state, conflicted_state, state_map ) From c8f568ddf9e8827d8971e14137663fa8df5b57d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 22:14:24 +0100 Subject: [PATCH 3/9] Fix up deferred handling in federation.py * Avoid preserve_context_over_deferred, which is broken * set consumeErrors=True on defer.gatherResults, to avoid spurious "unhandled failure" erros --- synapse/handlers/federation.py | 45 ++++++++++++++++------------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8fccf8bab..63c56a4a3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,7 +14,6 @@ # limitations under the License. """Contains handlers for federation events.""" -import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -26,10 +25,7 @@ from synapse.api.errors import ( ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator -from synapse.util import unwrapFirstError -from synapse.util.logcontext import ( - preserve_fn, preserve_context_over_deferred -) +from synapse.util import unwrapFirstError, logcontext from synapse.util.metrics import measure_func from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor, Linearizer @@ -592,9 +588,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.replication_layer.get_pdu)( + logcontext.preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -786,10 +782,14 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") - states = yield preserve_context_over_deferred(defer.gatherResults([ - preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) - for e in event_ids - ])) + states = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.preserve_fn(self.state_handler.resolve_state_groups)( + room_id, [e] + ) + for e in event_ids + ], consumeErrors=True, + )) states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( @@ -942,9 +942,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( - room_queue - ) + logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) defer.returnValue(True) @@ -1438,7 +1436,7 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( event_stream_id, max_stream_id ) @@ -1451,16 +1449,16 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield preserve_context_over_deferred(defer.gatherResults( + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._prep_event)( + logcontext.preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) for ev_info in event_infos - ] + ], consumeErrors=True, )) yield self.store.persist_events( @@ -1768,18 +1766,17 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield preserve_context_over_deferred(defer.gatherResults( - [ - preserve_fn(self.store.get_event)( + different_events = yield logcontext.make_deferred_yieldable( + defer.gatherResults([ + logcontext.preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, ) for d in different_auth if d in have_events and not have_events[d] - ], - consumeErrors=True - )).addErrback(unwrapFirstError) + ], consumeErrors=True) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) From 148428ce763978583da2b1d3c435ec321df45855 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 22:24:28 +0100 Subject: [PATCH 4/9] Fix logcontext handling for concurrently_execute Avoid preserve_context_over_deferred, which is broken. --- synapse/util/async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/async.py b/synapse/util/async.py index 1453faf0e..bb252f75d 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from .logcontext import ( PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, ) -from synapse.util import unwrapFirstError +from synapse.util import logcontext, unwrapFirstError from contextlib import contextmanager @@ -155,7 +155,7 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return preserve_context_over_deferred(defer.gatherResults([ + return logcontext.make_deferred_yieldable(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) From 01bbacf3c49f4311fddb61ef1ff98ee4f55fc44b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 15:12:43 +0100 Subject: [PATCH 5/9] Fix up logcontext handling in (federation) TransactionQueue Avoid using preserve_context_over_function, which has problems with respect to logcontexts. --- synapse/federation/transaction_queue.py | 48 ++++++++++++++++--------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 003eaba89..7a3c9cbb7 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -20,8 +20,8 @@ from .persistence import TransactionActions from .units import Transaction, Edu from synapse.api.errors import HttpResponseException +from synapse.util import logcontext from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes @@ -231,11 +231,9 @@ class TransactionQueue(object): (pdu, order) ) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) - @preserve_fn # the caller should not yield on this + @logcontext.preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): """Send the new presence states to the appropriate destinations. @@ -299,7 +297,7 @@ class TransactionQueue(object): state.user_id: state for state in states }) - preserve_fn(self._attempt_new_transaction)(destination) + self._attempt_new_transaction(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( @@ -321,9 +319,7 @@ class TransactionQueue(object): else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) def send_failure(self, failure, destination): if destination == self.server_name or destination == "localhost": @@ -336,9 +332,7 @@ class TransactionQueue(object): destination, [] ).append(failure) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) def send_device_messages(self, destination): if destination == self.server_name or destination == "localhost": @@ -347,15 +341,24 @@ class TransactionQueue(object): if not self.can_send_to(destination): return - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) def get_current_token(self): return 0 - @defer.inlineCallbacks def _attempt_new_transaction(self, destination): + """Try to start a new transaction to this destination + + If there is already a transaction in progress to this destination, + returns immediately. Otherwise kicks off the process of sending a + transaction in the background. + + Args: + destination (str): + + Returns: + None + """ # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -368,6 +371,19 @@ class TransactionQueue(object): ) return + logger.debug("TX [%s] Starting transaction loop", destination) + + # Drop the logcontext before starting the transaction. It doesn't + # really make sense to log all the outbound transactions against + # whatever path led us to this point: that's pretty arbitrary really. + # + # (this also means we can fire off _perform_transaction without + # yielding) + with logcontext.PreserveLoggingContext(): + self._transaction_transmission_loop(destination) + + @defer.inlineCallbacks + def _transaction_transmission_loop(self, destination): pending_pdus = [] try: self.pending_transactions[destination] = 1 From a6e3222fe5abf5b65b53678d1208c4c58f97b391 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 14:24:06 +0100 Subject: [PATCH 6/9] Fed server: Move origin-check code to _handle_received_pdu The response-building code expects there to be an entry in the `results` list for each entry in the pdu_list, so the early `continue` was messing this up. That doesn't really matter, because all that the federation client does is log any errors, but it's pretty poor form. --- synapse/federation/federation_server.py | 48 ++++++++++++------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 51e3fdea0..e791a1266 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -143,30 +143,6 @@ class FederationServer(FederationBase): results = [] for pdu in pdu_list: - # check that it's actually being sent from a valid destination to - # workaround bug #1753 in 0.18.5 and 0.18.6 - if transaction.origin != get_domain_from_id(pdu.event_id): - # We continue to accept join events from any server; this is - # necessary for the federation join dance to work correctly. - # (When we join over federation, the "helper" server is - # responsible for sending out the join event, rather than the - # origin. See bug #1893). - if not ( - pdu.type == 'm.room.member' and - pdu.content and - pdu.content.get("membership", None) == 'join' - ): - logger.info( - "Discarding PDU %s from invalid origin %s", - pdu.event_id, transaction.origin - ) - continue - else: - logger.info( - "Accepting join PDU %s from %s", - pdu.event_id, transaction.origin - ) - try: yield self._handle_received_pdu(transaction.origin, pdu) results.append({}) @@ -520,6 +496,30 @@ class FederationServer(FederationBase): Returns (Deferred): completes with None Raises: FederationError if the signatures / hash do not match """ + # check that it's actually being sent from a valid destination to + # workaround bug #1753 in 0.18.5 and 0.18.6 + if origin != get_domain_from_id(pdu.event_id): + # We continue to accept join events from any server; this is + # necessary for the federation join dance to work correctly. + # (When we join over federation, the "helper" server is + # responsible for sending out the join event, rather than the + # origin. See bug #1893). + if not ( + pdu.type == 'm.room.member' and + pdu.content and + pdu.content.get("membership", None) == 'join' + ): + logger.info( + "Discarding PDU %s from invalid origin %s", + pdu.event_id, origin + ) + return + else: + logger.info( + "Accepting join PDU %s from %s", + pdu.event_id, origin + ) + # Check signature. try: pdu = yield self._check_sigs_and_hash(pdu) From ba5b9b80a56a449ffab44afaf4661d5b44277898 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 15:18:58 +0100 Subject: [PATCH 7/9] fed server: refactor on_incoming_transaction Move as much as possible to after the have_responded check, and reduce the number of times we iterate over the pdu list. --- synapse/federation/federation_server.py | 53 ++++++++++++++----------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e791a1266..fa4ec2ad3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -109,23 +109,12 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_incoming_transaction(self, transaction_data): + # keep this as early as possible to make the calculated origin ts as + # accurate as possible. + request_time = int(self._clock.time_msec()) + transaction = Transaction(**transaction_data) - received_pdus_counter.inc_by(len(transaction.pdus)) - - for p in transaction.pdus: - if "unsigned" in p: - unsigned = p["unsigned"] - if "age" in unsigned: - p["age"] = unsigned["age"] - if "age" in p: - p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) - del p["age"] - - pdu_list = [ - self.event_from_pdu_json(p) for p in transaction.pdus - ] - logger.debug("[%s] Got transaction", transaction.transaction_id) response = yield self.transaction_actions.have_responded(transaction) @@ -140,17 +129,35 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - results = [] + received_pdus_counter.inc_by(len(transaction.pdus)) + + pdu_list = [] + + for p in transaction.pdus: + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] + if "age" in p: + p["age_ts"] = request_time - int(p["age"]) + del p["age"] + + event = self.event_from_pdu_json(p) + pdu_list.append(event) + + pdu_results = {} for pdu in pdu_list: + event_id = pdu.event_id try: yield self._handle_received_pdu(transaction.origin, pdu) - results.append({}) + pdu_results[event_id] = {} except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) self.send_failure(e, transaction.origin) - results.append({"error": str(e)}) + pdu_results[event_id] = {"error": str(e)} except Exception as e: - results.append({"error": str(e)}) + pdu_results[event_id] = {"error": str(e)} logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): @@ -164,14 +171,12 @@ class FederationServer(FederationBase): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - logger.debug("Returning: %s", str(results)) - response = { - "pdus": dict(zip( - (p.event_id for p in pdu_list), results - )), + "pdus": pdu_results, } + logger.debug("Returning: %s", str(response)) + yield self.transaction_actions.set_response( transaction, 200, response From 4c7c4d4061ae298ce6df445c888b91d3e5791164 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 15:31:58 +0100 Subject: [PATCH 8/9] Fed server: use a linearizer for ongoing transactions We don't want to process the same transaction multiple times concurrently, so use a linearizer. --- synapse/federation/federation_server.py | 30 ++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fa4ec2ad3..b2dffa2c3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -53,6 +53,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self._server_linearizer = Linearizer("fed_server") + self._transaction_linearizer = Linearizer("fed_txn_handler") # We cache responses to state queries, as they take a while and often # come in waves. @@ -111,12 +112,39 @@ class FederationServer(FederationBase): def on_incoming_transaction(self, transaction_data): # keep this as early as possible to make the calculated origin ts as # accurate as possible. - request_time = int(self._clock.time_msec()) + request_time = self._clock.time_msec() transaction = Transaction(**transaction_data) + if not transaction.transaction_id: + raise Exception("Transaction missing transaction_id") + if not transaction.origin: + raise Exception("Transaction missing origin") + logger.debug("[%s] Got transaction", transaction.transaction_id) + # use a linearizer to ensure that we don't process the same transaction + # multiple times in parallel. + with (yield self._transaction_linearizer.queue( + (transaction.origin, transaction.transaction_id), + )): + result = yield self._handle_incoming_transaction( + transaction, request_time, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _handle_incoming_transaction(self, transaction, request_time): + """ Process an incoming transaction and return the HTTP response + + Args: + transaction (Transaction): incoming transaction + request_time (int): timestamp that the HTTP request arrived at + + Returns: + Deferred[(int, object)]: http response code and body + """ response = yield self.transaction_actions.have_responded(transaction) if response: From 6a6cc27aee16ee045b6909d2c401a9d4f6e54324 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 16:07:20 +0100 Subject: [PATCH 9/9] fed server: process PDUs for different rooms in parallel With luck, this will give a real-time improvement when there are many rooms and the server ends up calling out to fetch missing events. --- synapse/federation/federation_server.py | 53 ++++++++++++++++--------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b2dffa2c3..f00d59e70 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -12,14 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu -from synapse.util.async import Linearizer +from synapse.util import async from synapse.util.logutils import log_function from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent @@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature import simplejson as json import logging +# when processing incoming transactions, we try to handle multiple rooms in +# parallel, up to this limit. +TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) @@ -52,8 +53,8 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() - self._server_linearizer = Linearizer("fed_server") - self._transaction_linearizer = Linearizer("fed_txn_handler") + self._server_linearizer = async.Linearizer("fed_server") + self._transaction_linearizer = async.Linearizer("fed_txn_handler") # We cache responses to state queries, as they take a while and often # come in waves. @@ -159,7 +160,7 @@ class FederationServer(FederationBase): received_pdus_counter.inc_by(len(transaction.pdus)) - pdu_list = [] + pdus_by_room = {} for p in transaction.pdus: if "unsigned" in p: @@ -171,22 +172,36 @@ class FederationServer(FederationBase): del p["age"] event = self.event_from_pdu_json(p) - pdu_list.append(event) + room_id = event.room_id + pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} - for pdu in pdu_list: - event_id = pdu.event_id - try: - yield self._handle_received_pdu(transaction.origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warn("Error handling PDU %s: %s", event_id, e) - self.send_failure(e, transaction.origin) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - pdu_results[event_id] = {"error": str(e)} - logger.exception("Failed to handle PDU") + # we can process different rooms in parallel (which is useful if they + # require callouts to other servers to fetch missing events), but + # impose a limit to avoid going too crazy with ram/cpu. + @defer.inlineCallbacks + def process_pdus_for_room(room_id): + logger.debug("Processing PDUs for %s", room_id) + for pdu in pdus_by_room[room_id]: + event_id = pdu.event_id + try: + yield self._handle_received_pdu( + transaction.origin, pdu + ) + pdu_results[event_id] = {} + except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) + self.send_failure(e, transaction.origin) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + pdu_results[event_id] = {"error": str(e)} + logger.exception("Failed to handle PDU %s", event_id) + + yield async.concurrently_execute( + process_pdus_for_room, pdus_by_room.keys(), + TRANSACTION_CONCURRENCY_LIMIT, + ) if hasattr(transaction, "edus"): for edu in (Edu(**x) for x in transaction.edus):