Remove topological sort when receiving backfill events

See https://github.com/matrix-org/synapse/pull/11114#discussion_r741517138
This commit is contained in:
Eric Eastwood 2021-11-03 01:13:19 -05:00
parent f3b7b3e882
commit 7f2105ad06

View file

@ -648,94 +648,6 @@ class FederationEventHandler:
logger.info("Got %d prev_events", len(missing_events))
await self._process_pulled_events(origin, missing_events, backfilled=False)
async def generateEventIdGraphFromEvents(
    self, events: Iterable[EventBase]
) -> Dict[str, Iterable[str]]:
    """Build an event ID graph suitable for topologically sorting `events`.

    Each event starts out with its real `prev_events` as edges. Extra "fake"
    edges are then added around MSC2716 historical batches so that a batch
    sorts next to the insertion event it hangs off of:

    * insertion event -> batch event, for the batch event whose batch ID
      matches the insertion event's next-batch ID;
    * successor -> insertion event, for every other event in `events` that
      shares a `prev_event` with the insertion event.

    Insertion events are looked up in `events` first and only fetched from
    `self._store` when they are not part of the given events.

    Args:
        events: The events to build the graph from.

    Returns:
        A map from event ID to the list of event IDs it has edges to. It may
        contain keys for insertion events that are not in `events`.
    """
    # `events` is walked several times below, so materialise it once; a
    # one-shot iterator would otherwise be exhausted after the first pass.
    events = list(events)

    event_map = {event.event_id: event for event in events}

    # Since the insertion event we try to reference later on might be in the
    # backfill chunk itself, we need to make it easy to lookup. Maps a given
    # batch_id to the insertion event.
    batch_id_map = {
        event.content.get(
            EventContentFields.MSC2716_NEXT_BATCH_ID, None
        ): event.event_id
        for event in events
        if event.type == EventTypes.MSC2716_INSERTION
    }

    # Map a given event to its successors (backwards prev_events)
    successor_event_id_map = {}
    for event in events:
        for prev_event_id in event.prev_event_ids():
            successor_event_id_map.setdefault(prev_event_id, []).append(
                event.event_id
            )

    event_id_graph = {}
    for event in events:
        # Assign the real edges to the graph. `extend` copies the IDs into
        # our own list, so the event's actual prev_events are never modified
        # when fake edges are appended below.
        event_id_graph.setdefault(event.event_id, []).extend(
            event.prev_event_ids()
        )

        # We need to make some fake edge connections from the batch event at
        # the bottom of the historical batch to the insertion event. This
        # way the historical batch topologically sorts in ahead-in-time of
        # the event we branched off of.
        batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None)
        if event.type != EventTypes.MSC2716_BATCH or not batch_id:
            continue

        # Maybe we can get lucky and save ourselves a lookup by checking the
        # events in the backfill first. Use `.get` so that a miss falls
        # through to the store instead of raising `KeyError`.
        insertion_event_id = batch_id_map.get(batch_id)
        if not insertion_event_id:
            insertion_event_id = (
                await self._store.get_insertion_event_id_by_batch_id(
                    event.room_id, batch_id
                )
            )
        if not insertion_event_id:
            continue

        # Connect the insertion event via a fake edge pointing to the
        # batch event so the historical batch topologically sorts
        # behind-in-time the insertion event.
        event_id_graph.setdefault(insertion_event_id, []).append(event.event_id)

        # Same again: prefer the copy of the insertion event in the chunk and
        # only hit the store when it is not there.
        insertion_event = event_map.get(insertion_event_id)
        if insertion_event is None:
            insertion_event = await self._store.get_event(
                insertion_event_id, allow_none=True
            )
        if insertion_event is None:
            continue

        # Connect the insertion events' `prev_event` successors via fake
        # edges pointing to the insertion event itself so the insertion
        # event sorts topologically behind-in-time the successor. Nestled
        # perfectly between the prev_event and the successor.
        for insertion_prev_event_id in insertion_event.prev_event_ids():
            # A prev_event may have no successors among `events` at all.
            successor_event_ids = successor_event_id_map.get(
                insertion_prev_event_id, ()
            )
            for successor_event_id in successor_event_ids:
                # Don't add itself back as a successor
                if successor_event_id != insertion_event_id:
                    # Fake edge to point the successor back at the
                    # insertion event
                    event_id_graph.setdefault(successor_event_id, []).append(
                        insertion_event_id
                    )

    # TODO: We also need to add fake edges to connect insertion events -> to
    # the base event in the "live" DAG we branched off of, see scenario 2
    # https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985

    return event_id_graph
async def _process_pulled_events(
self, origin: str, events: Iterable[EventBase], backfilled: bool
) -> None:
@ -774,33 +686,6 @@ class FederationEventHandler:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
# # We want to sort topologically so we process them and tell clients
# # about them in order.
# sorted_events = []
# event_ids = [event.event_id for event in events]
# event_map = {event.event_id: event for event in events}
# event_id_graph = await self.generateEventIdGraphFromEvents(events)
# for event_id in sorted_topologically(event_ids, event_id_graph):
# sorted_events.append(event_map[event_id])
# logger.info(
# "backfill sorted_events=%s",
# [
# "event_id=%s,depth=%d,body=%s,prevs=%s\n"
# % (
# event.event_id,
# event.depth,
# event.content.get("body", event.type),
# event.prev_event_ids(),
# )
# for event in reversed(sorted_events)
# ],
# )
# for ev in reversed(sorted_events):
# with nested_logging_context(ev.event_id):
# await self._process_pulled_event(origin, ev, backfilled=backfilled)
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None: