forked from MirrorHub/synapse
Merge pull request #3924 from matrix-org/rav/clean_up_on_receive_pdu
Comments and interface cleanup for on_receive_pdu
This commit is contained in:
commit
c2185f14d7
3 changed files with 48 additions and 24 deletions
1
changelog.d/3924.misc
Normal file
1
changelog.d/3924.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Comments and interface cleanup for on_receive_pdu
|
|
@ -618,7 +618,7 @@ class FederationServer(FederationBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.handler.on_receive_pdu(
|
yield self.handler.on_receive_pdu(
|
||||||
origin, pdu, get_missing=True, sent_to_us_directly=True,
|
origin, pdu, sent_to_us_directly=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
|
|
@ -136,7 +136,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_receive_pdu(
|
def on_receive_pdu(
|
||||||
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
|
self, origin, pdu, sent_to_us_directly=False,
|
||||||
):
|
):
|
||||||
""" Process a PDU received via a federation /send/ transaction, or
|
""" Process a PDU received via a federation /send/ transaction, or
|
||||||
via backfill of missing prev_events
|
via backfill of missing prev_events
|
||||||
|
@ -145,7 +145,8 @@ class FederationHandler(BaseHandler):
|
||||||
origin (str): server which initiated the /send/ transaction. Will
|
origin (str): server which initiated the /send/ transaction. Will
|
||||||
be used to fetch missing events or state.
|
be used to fetch missing events or state.
|
||||||
pdu (FrozenEvent): received PDU
|
pdu (FrozenEvent): received PDU
|
||||||
get_missing (bool): True if we should fetch missing prev_events
|
sent_to_us_directly (bool): True if this event was pushed to us; False if
|
||||||
|
we pulled it as the result of a missing prev_event.
|
||||||
|
|
||||||
Returns (Deferred): completes with None
|
Returns (Deferred): completes with None
|
||||||
"""
|
"""
|
||||||
|
@ -250,7 +251,7 @@ class FederationHandler(BaseHandler):
|
||||||
pdu.internal_metadata.outlier = True
|
pdu.internal_metadata.outlier = True
|
||||||
elif min_depth and pdu.depth > min_depth:
|
elif min_depth and pdu.depth > min_depth:
|
||||||
missing_prevs = prevs - seen
|
missing_prevs = prevs - seen
|
||||||
if get_missing and missing_prevs:
|
if sent_to_us_directly and missing_prevs:
|
||||||
# If we're missing stuff, ensure we only fetch stuff one
|
# If we're missing stuff, ensure we only fetch stuff one
|
||||||
# at a time.
|
# at a time.
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -282,24 +283,46 @@ class FederationHandler(BaseHandler):
|
||||||
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
|
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
|
||||||
)
|
)
|
||||||
|
|
||||||
if sent_to_us_directly and prevs - seen:
|
if prevs - seen:
|
||||||
# If they have sent it to us directly, and the server
|
# We've still not been able to get all of the prev_events for this event.
|
||||||
# isn't telling us about the auth events that it's
|
#
|
||||||
# made a message referencing, we explode
|
# In this case, we need to fall back to asking another server in the
|
||||||
logger.warn(
|
# federation for the state at this event. That's ok provided we then
|
||||||
"[%s %s] Failed to fetch %d prev events: rejecting",
|
# resolve the state against other bits of the DAG before using it (which
|
||||||
room_id, event_id, len(prevs - seen),
|
# will ensure that you can't just take over a room by sending an event,
|
||||||
)
|
# withholding its prev_events, and declaring yourself to be an admin in
|
||||||
raise FederationError(
|
# the subsequent state request).
|
||||||
"ERROR",
|
#
|
||||||
403,
|
# Now, if we're pulling this event as a missing prev_event, then clearly
|
||||||
(
|
# this event is not going to become the only forward-extremity and we are
|
||||||
"Your server isn't divulging details about prev_events "
|
# guaranteed to resolve its state against our existing forward
|
||||||
"referenced in this event."
|
# extremities, so that should be fine.
|
||||||
),
|
#
|
||||||
affected=pdu.event_id,
|
# On the other hand, if this event was pushed to us, it is possible for
|
||||||
)
|
# it to become the only forward-extremity in the room, and we would then
|
||||||
elif prevs - seen:
|
# trust its state to be the state for the whole room. This is very bad.
|
||||||
|
# Further, if the event was pushed to us, there is no excuse for us not to
|
||||||
|
# have all the prev_events. We therefore reject any such events.
|
||||||
|
#
|
||||||
|
# XXX this really feels like it could/should be merged with the above,
|
||||||
|
# but there is an interaction with min_depth that I'm not really
|
||||||
|
# following.
|
||||||
|
|
||||||
|
if sent_to_us_directly:
|
||||||
|
logger.warn(
|
||||||
|
"[%s %s] Failed to fetch %d prev events: rejecting",
|
||||||
|
room_id, event_id, len(prevs - seen),
|
||||||
|
)
|
||||||
|
raise FederationError(
|
||||||
|
"ERROR",
|
||||||
|
403,
|
||||||
|
(
|
||||||
|
"Your server isn't divulging details about prev_events "
|
||||||
|
"referenced in this event."
|
||||||
|
),
|
||||||
|
affected=pdu.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
# Calculate the state of the previous events, and
|
# Calculate the state of the previous events, and
|
||||||
# de-conflict them to find the current state.
|
# de-conflict them to find the current state.
|
||||||
state_groups = []
|
state_groups = []
|
||||||
|
@ -464,7 +487,7 @@ class FederationHandler(BaseHandler):
|
||||||
yield self.on_receive_pdu(
|
yield self.on_receive_pdu(
|
||||||
origin,
|
origin,
|
||||||
ev,
|
ev,
|
||||||
get_missing=False
|
sent_to_us_directly=False,
|
||||||
)
|
)
|
||||||
except FederationError as e:
|
except FederationError as e:
|
||||||
if e.code == 403:
|
if e.code == 403:
|
||||||
|
@ -1112,7 +1135,7 @@ class FederationHandler(BaseHandler):
|
||||||
try:
|
try:
|
||||||
logger.info("Processing queued PDU %s which was received "
|
logger.info("Processing queued PDU %s which was received "
|
||||||
"while we were joining %s", p.event_id, p.room_id)
|
"while we were joining %s", p.event_id, p.room_id)
|
||||||
yield self.on_receive_pdu(origin, p)
|
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Error handling queued PDU %s from %s: %s",
|
"Error handling queued PDU %s from %s: %s",
|
||||||
|
|
Loading…
Reference in a new issue