forked from MirrorHub/synapse
Factor _get_missing_events_for_pdu out of _handle_new_pdu
This should be functionally identical: it just seeks to improve readability by reducing indentation.
This commit is contained in:
parent
45d173a59a
commit
3406333a58
1 changed files with 82 additions and 62 deletions
|
@ -574,68 +574,9 @@ class FederationServer(FederationBase):
|
||||||
pdu.room_id, len(prevs - seen),
|
pdu.room_id, len(prevs - seen),
|
||||||
)
|
)
|
||||||
|
|
||||||
# We recalculate seen, since it may have changed.
|
yield self._get_missing_events_for_pdu(
|
||||||
have_seen = yield self.store.have_events(prevs)
|
origin, pdu, prevs, min_depth
|
||||||
seen = set(have_seen.keys())
|
)
|
||||||
|
|
||||||
if prevs - seen:
|
|
||||||
latest = yield self.store.get_latest_event_ids_in_room(
|
|
||||||
pdu.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# We add the prev events that we have seen to the latest
|
|
||||||
# list to ensure the remote server doesn't give them to us
|
|
||||||
latest = set(latest)
|
|
||||||
latest |= seen
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Missing %d events for room %r: %r...",
|
|
||||||
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: we set timeout to 10s to help workaround
|
|
||||||
# https://github.com/matrix-org/synapse/issues/1733.
|
|
||||||
# The reason is to avoid holding the linearizer lock
|
|
||||||
# whilst processing inbound /send transactions, causing
|
|
||||||
# FDs to stack up and block other inbound transactions
|
|
||||||
# which empirically can currently take up to 30 minutes.
|
|
||||||
#
|
|
||||||
# N.B. this explicitly disables retry attempts.
|
|
||||||
#
|
|
||||||
# N.B. this also increases our chances of falling back to
|
|
||||||
# fetching fresh state for the room if the missing event
|
|
||||||
# can't be found, which slightly reduces our security.
|
|
||||||
# it may also increase our DAG extremity count for the room,
|
|
||||||
# causing additional state resolution? See #1760.
|
|
||||||
# However, fetching state doesn't hold the linearizer lock
|
|
||||||
# apparently.
|
|
||||||
#
|
|
||||||
# see https://github.com/matrix-org/synapse/pull/1744
|
|
||||||
|
|
||||||
missing_events = yield self.get_missing_events(
|
|
||||||
origin,
|
|
||||||
pdu.room_id,
|
|
||||||
earliest_events_ids=list(latest),
|
|
||||||
latest_events=[pdu],
|
|
||||||
limit=10,
|
|
||||||
min_depth=min_depth,
|
|
||||||
timeout=10000,
|
|
||||||
)
|
|
||||||
|
|
||||||
# We want to sort these by depth so we process them and
|
|
||||||
# tell clients about them in order.
|
|
||||||
missing_events.sort(key=lambda x: x.depth)
|
|
||||||
|
|
||||||
for e in missing_events:
|
|
||||||
yield self._handle_new_pdu(
|
|
||||||
origin,
|
|
||||||
e,
|
|
||||||
get_missing=False
|
|
||||||
)
|
|
||||||
|
|
||||||
have_seen = yield self.store.have_events(
|
|
||||||
[ev for ev, _ in pdu.prev_events]
|
|
||||||
)
|
|
||||||
|
|
||||||
prevs = {e_id for e_id, _ in pdu.prev_events}
|
prevs = {e_id for e_id, _ in pdu.prev_events}
|
||||||
seen = set(have_seen.keys())
|
seen = set(have_seen.keys())
|
||||||
|
@ -667,6 +608,85 @@ class FederationServer(FederationBase):
|
||||||
auth_chain=auth_chain,
|
auth_chain=auth_chain,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
origin (str): Origin of the pdu. Will be called to get the missing events
|
||||||
|
pdu: received pdu
|
||||||
|
prevs (str[]): List of event ids which we are missing
|
||||||
|
min_depth (int): Minimum depth of events to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred<dict(str, str?)>: updated have_seen dictionary
|
||||||
|
"""
|
||||||
|
# We recalculate seen, since it may have changed.
|
||||||
|
have_seen = yield self.store.have_events(prevs)
|
||||||
|
seen = set(have_seen.keys())
|
||||||
|
|
||||||
|
if not prevs - seen:
|
||||||
|
# nothing left to do
|
||||||
|
defer.returnValue(have_seen)
|
||||||
|
|
||||||
|
latest = yield self.store.get_latest_event_ids_in_room(
|
||||||
|
pdu.room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# We add the prev events that we have seen to the latest
|
||||||
|
# list to ensure the remote server doesn't give them to us
|
||||||
|
latest = set(latest)
|
||||||
|
latest |= seen
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Missing %d events for room %r: %r...",
|
||||||
|
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: we set timeout to 10s to help workaround
|
||||||
|
# https://github.com/matrix-org/synapse/issues/1733.
|
||||||
|
# The reason is to avoid holding the linearizer lock
|
||||||
|
# whilst processing inbound /send transactions, causing
|
||||||
|
# FDs to stack up and block other inbound transactions
|
||||||
|
# which empirically can currently take up to 30 minutes.
|
||||||
|
#
|
||||||
|
# N.B. this explicitly disables retry attempts.
|
||||||
|
#
|
||||||
|
# N.B. this also increases our chances of falling back to
|
||||||
|
# fetching fresh state for the room if the missing event
|
||||||
|
# can't be found, which slightly reduces our security.
|
||||||
|
# it may also increase our DAG extremity count for the room,
|
||||||
|
# causing additional state resolution? See #1760.
|
||||||
|
# However, fetching state doesn't hold the linearizer lock
|
||||||
|
# apparently.
|
||||||
|
#
|
||||||
|
# see https://github.com/matrix-org/synapse/pull/1744
|
||||||
|
|
||||||
|
missing_events = yield self.get_missing_events(
|
||||||
|
origin,
|
||||||
|
pdu.room_id,
|
||||||
|
earliest_events_ids=list(latest),
|
||||||
|
latest_events=[pdu],
|
||||||
|
limit=10,
|
||||||
|
min_depth=min_depth,
|
||||||
|
timeout=10000,
|
||||||
|
)
|
||||||
|
|
||||||
|
# We want to sort these by depth so we process them and
|
||||||
|
# tell clients about them in order.
|
||||||
|
missing_events.sort(key=lambda x: x.depth)
|
||||||
|
|
||||||
|
for e in missing_events:
|
||||||
|
yield self._handle_new_pdu(
|
||||||
|
origin,
|
||||||
|
e,
|
||||||
|
get_missing=False
|
||||||
|
)
|
||||||
|
|
||||||
|
have_seen = yield self.store.have_events(
|
||||||
|
[ev for ev, _ in pdu.prev_events]
|
||||||
|
)
|
||||||
|
defer.returnValue(have_seen)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "<ReplicationLayer(%s)>" % self.server_name
|
return "<ReplicationLayer(%s)>" % self.server_name
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue