forked from MirrorHub/synapse
Persist auth events before the events that rely on them (#10771)
If we're persisting an event E which has auth_events A1, A2, then we ought to make sure that we correctly auth and persist A1 and A2, before we blindly accept E. This PR does part of that - it persists the auth events first - but it does not fully solve the problem, because we still don't check that the auth events weren't rejected.
This commit is contained in:
parent
857b000996
commit
5724883ac2
2 changed files with 66 additions and 36 deletions
1
changelog.d/10771.misc
Normal file
1
changelog.d/10771.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Clean up some of the federation event authentication code for clarity.
|
|
@ -818,7 +818,7 @@ class FederationEventHandler:
|
|||
missing_events = missing_desired_events | missing_auth_events
|
||||
logger.debug("Fetching %i events from remote", len(missing_events))
|
||||
await self._get_events_and_persist(
|
||||
destination=destination, room_id=room_id, events=missing_events
|
||||
destination=destination, room_id=room_id, event_ids=missing_events
|
||||
)
|
||||
|
||||
# we need to make sure we re-load from the database to get the rejected
|
||||
|
@ -1085,12 +1085,12 @@ class FederationEventHandler:
|
|||
)
|
||||
|
||||
async def _get_events_and_persist(
|
||||
self, destination: str, room_id: str, events: Iterable[str]
|
||||
self, destination: str, room_id: str, event_ids: Collection[str]
|
||||
) -> None:
|
||||
"""Fetch the given events from a server, and persist them as outliers.
|
||||
|
||||
This function *does not* recursively get missing auth events of the
|
||||
newly fetched events. Callers must include in the `events` argument
|
||||
newly fetched events. Callers must include in the `event_ids` argument
|
||||
any missing events from the auth chain.
|
||||
|
||||
Logs a warning if we can't find the given event.
|
||||
|
@ -1127,28 +1127,78 @@ class FederationEventHandler:
|
|||
e,
|
||||
)
|
||||
|
||||
await concurrently_execute(get_event, events, 5)
|
||||
await concurrently_execute(get_event, event_ids, 5)
|
||||
logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids))
|
||||
|
||||
# Make a map of auth events for each event. We do this after fetching
|
||||
# all the events as some of the events' auth events will be in the list
|
||||
# of requested events.
|
||||
# we now need to auth the events in an order which ensures that each event's
|
||||
# auth_events are authed before the event itself.
|
||||
#
|
||||
# XXX: it might be possible to kick this process off in parallel with fetching
|
||||
# the events.
|
||||
while event_map:
|
||||
# build a list of events whose auth events are not in the queue.
|
||||
roots = tuple(
|
||||
ev
|
||||
for ev in event_map.values()
|
||||
if not any(aid in event_map for aid in ev.auth_event_ids())
|
||||
)
|
||||
|
||||
auth_events = [
|
||||
aid
|
||||
for event in event_map.values()
|
||||
for aid in event.auth_event_ids()
|
||||
if aid not in event_map
|
||||
]
|
||||
if not roots:
|
||||
# if *none* of the remaining events are ready, that means
|
||||
# we have a loop. This either means a bug in our logic, or that
|
||||
# somebody has managed to create a loop (which requires finding a
|
||||
# hash collision in room v2 and later).
|
||||
logger.warning(
|
||||
"Loop found in auth events while fetching missing state/auth "
|
||||
"events: %s",
|
||||
shortstr(event_map.keys()),
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Persisting %i of %i remaining events", len(roots), len(event_map)
|
||||
)
|
||||
|
||||
await self._auth_and_persist_fetched_events(destination, room_id, roots)
|
||||
|
||||
for ev in roots:
|
||||
del event_map[ev.event_id]
|
||||
|
||||
async def _auth_and_persist_fetched_events(
|
||||
self, origin: str, room_id: str, fetched_events: Collection[EventBase]
|
||||
) -> None:
|
||||
"""Persist the events fetched by _get_events_and_persist.
|
||||
|
||||
The events should not depend on one another, e.g. this should be used to persist
|
||||
a bunch of outliers, but not a chunk of individual events that depend
|
||||
on each other for state calculations.
|
||||
|
||||
We also assume that all of the auth events for all of the events have already
|
||||
been persisted.
|
||||
|
||||
Notifies about the events where appropriate.
|
||||
|
||||
Params:
|
||||
origin: where the events came from
|
||||
room_id: the room that the events are meant to be in (though this has
|
||||
not yet been checked)
|
||||
event_id: map from event_id -> event for the fetched events
|
||||
"""
|
||||
# get all the auth events for all the events in this batch. By now, they should
|
||||
# have been persisted.
|
||||
auth_events = {
|
||||
aid for event in fetched_events for aid in event.auth_event_ids()
|
||||
}
|
||||
persisted_events = await self._store.get_events(
|
||||
auth_events,
|
||||
allow_rejected=True,
|
||||
)
|
||||
|
||||
event_infos = []
|
||||
for event in event_map.values():
|
||||
for event in fetched_events:
|
||||
auth = {}
|
||||
for auth_event_id in event.auth_event_ids():
|
||||
ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
|
||||
ae = persisted_events.get(auth_event_id)
|
||||
if ae:
|
||||
auth[(ae.type, ae.state_key)] = ae
|
||||
else:
|
||||
|
@ -1156,27 +1206,6 @@ class FederationEventHandler:
|
|||
|
||||
event_infos.append(_NewEventInfo(event, auth))
|
||||
|
||||
if event_infos:
|
||||
await self._auth_and_persist_events(
|
||||
destination,
|
||||
room_id,
|
||||
event_infos,
|
||||
)
|
||||
|
||||
async def _auth_and_persist_events(
|
||||
self,
|
||||
origin: str,
|
||||
room_id: str,
|
||||
event_infos: Collection[_NewEventInfo],
|
||||
) -> None:
|
||||
"""Creates the appropriate contexts and persists events. The events
|
||||
should not depend on one another, e.g. this should be used to persist
|
||||
a bunch of outliers, but not a chunk of individual events that depend
|
||||
on each other for state calculations.
|
||||
|
||||
Notifies about the events where appropriate.
|
||||
"""
|
||||
|
||||
if not event_infos:
|
||||
return
|
||||
|
||||
|
|
Loading…
Reference in a new issue