forked from MirrorHub/synapse
Merge pull request #2517 from matrix-org/rav/fed_server_refactor
fed server: refactor on_incoming_transaction
This commit is contained in:
commit
4d24becf7f
1 changed files with 29 additions and 24 deletions
|
@ -109,23 +109,12 @@ class FederationServer(FederationBase):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_incoming_transaction(self, transaction_data):
|
||||
# keep this as early as possible to make the calculated origin ts as
|
||||
# accurate as possible.
|
||||
request_time = int(self._clock.time_msec())
|
||||
|
||||
transaction = Transaction(**transaction_data)
|
||||
|
||||
received_pdus_counter.inc_by(len(transaction.pdus))
|
||||
|
||||
for p in transaction.pdus:
|
||||
if "unsigned" in p:
|
||||
unsigned = p["unsigned"]
|
||||
if "age" in unsigned:
|
||||
p["age"] = unsigned["age"]
|
||||
if "age" in p:
|
||||
p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
|
||||
del p["age"]
|
||||
|
||||
pdu_list = [
|
||||
self.event_from_pdu_json(p) for p in transaction.pdus
|
||||
]
|
||||
|
||||
logger.debug("[%s] Got transaction", transaction.transaction_id)
|
||||
|
||||
response = yield self.transaction_actions.have_responded(transaction)
|
||||
|
@ -140,17 +129,35 @@ class FederationServer(FederationBase):
|
|||
|
||||
logger.debug("[%s] Transaction is new", transaction.transaction_id)
|
||||
|
||||
results = []
|
||||
received_pdus_counter.inc_by(len(transaction.pdus))
|
||||
|
||||
pdu_list = []
|
||||
|
||||
for p in transaction.pdus:
|
||||
if "unsigned" in p:
|
||||
unsigned = p["unsigned"]
|
||||
if "age" in unsigned:
|
||||
p["age"] = unsigned["age"]
|
||||
if "age" in p:
|
||||
p["age_ts"] = request_time - int(p["age"])
|
||||
del p["age"]
|
||||
|
||||
event = self.event_from_pdu_json(p)
|
||||
pdu_list.append(event)
|
||||
|
||||
pdu_results = {}
|
||||
|
||||
for pdu in pdu_list:
|
||||
event_id = pdu.event_id
|
||||
try:
|
||||
yield self._handle_received_pdu(transaction.origin, pdu)
|
||||
results.append({})
|
||||
pdu_results[event_id] = {}
|
||||
except FederationError as e:
|
||||
logger.warn("Error handling PDU %s: %s", event_id, e)
|
||||
self.send_failure(e, transaction.origin)
|
||||
results.append({"error": str(e)})
|
||||
pdu_results[event_id] = {"error": str(e)}
|
||||
except Exception as e:
|
||||
results.append({"error": str(e)})
|
||||
pdu_results[event_id] = {"error": str(e)}
|
||||
logger.exception("Failed to handle PDU")
|
||||
|
||||
if hasattr(transaction, "edus"):
|
||||
|
@ -164,14 +171,12 @@ class FederationServer(FederationBase):
|
|||
for failure in getattr(transaction, "pdu_failures", []):
|
||||
logger.info("Got failure %r", failure)
|
||||
|
||||
logger.debug("Returning: %s", str(results))
|
||||
|
||||
response = {
|
||||
"pdus": dict(zip(
|
||||
(p.event_id for p in pdu_list), results
|
||||
)),
|
||||
"pdus": pdu_results,
|
||||
}
|
||||
|
||||
logger.debug("Returning: %s", str(response))
|
||||
|
||||
yield self.transaction_actions.set_response(
|
||||
transaction,
|
||||
200, response
|
||||
|
|
Loading…
Reference in a new issue