Move back to the old get_backfill_events and simplify backfill.

We now rely on the marker events to backfill the base insertion event,
which records it as an insertion event extremity. This functionality was
already in place (see `handle_marker_event`) and was an easy transition.
This way, a remote federated homeserver will have the insertion extremity
to ask about in backfill and will go down the historical branch without a
problem because of the depth ordering, and the rest of the DAG navigation
happens as normal. Yay simplification!
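
The depth ordering works out because backfill explores the DAG with a
priority queue keyed on (-depth, -stream_ordering), so the deepest
(oldest) events -- including the historical branch behind an insertion
extremity -- are visited first. A minimal sketch of that traversal
pattern (not the actual Synapse code; `get_prev_events` is a hypothetical
stand-in for the storage lookups in the diff below):

```python
from queue import Empty, PriorityQueue


def walk_backwards(seed_events, get_prev_events, limit):
    """Explore the event DAG backwards, deepest (oldest) events first.

    `seed_events` and `get_prev_events(event_id)` both yield
    (depth, stream_ordering, event_id) tuples; they stand in for the
    real database lookups.
    """
    queue = PriorityQueue()
    for depth, stream_ordering, event_id in seed_events:
        # Negate so the largest depth (furthest back in time) pops first.
        queue.put((-depth, -stream_ordering, event_id))

    event_id_results = set()
    while not queue.empty() and len(event_id_results) < limit:
        try:
            _, _, event_id = queue.get_nowait()
        except Empty:
            break
        if event_id in event_id_results:
            continue
        event_id_results.add(event_id)

        for depth, stream_ordering, prev_id in get_prev_events(event_id):
            if prev_id not in event_id_results:
                queue.put((-depth, -stream_ordering, prev_id))

    return event_id_results
```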

The key breakthrough was discussing all the ways we can find connected insertion events.
https://docs.google.com/document/d/1KCEmpnGr4J-I8EeaVQ8QJZKBDu53ViI7V62y5BzfXr0/edit#bookmark=id.1hbt9acs963h

The three options we came up with were:

 - Find by insertion event prev_events (this is what we were doing before)
 - Find connected insertion events by depth
 - Find connected insertion events by the marker event
    - This made the most sense since we already backfill the insertion event
      when a marker event is processed (see `handle_marker_event`).
    - Gets rid of the extra insertion event lookup in backfill because we
      know it's already backfilled from the marker processing.
    - And gets rid of the extra federated lookup we added in this PR to
      ask whether the homeserver requesting backfill already has the
      insertion event (deciding whether we fork to the history branch
      before we go down the "live" DAG). See the sketch after this list
      for the marker flow this relies on.
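
For illustration, here is roughly why option 3 needs no extra lookup in
backfill: when a marker event is processed, the insertion event it points
at is already fetched and recorded as an insertion extremity, so a later
backfill request can fork into the historical branch from that extremity
like any other backwards extremity. A rough sketch of that flow, with
hypothetical helper names (`get_event_or_backfill`,
`insert_insertion_extremity`) rather than the real storage API:

```python
async def on_marker_event(store, origin, room_id, marker_event):
    """Sketch of the idea behind `handle_marker_event` (not the real code)."""
    # Stand-in for whichever content field the marker uses to reference its
    # insertion event (the exact key is defined by MSC2716, not shown here).
    insertion_event_id = marker_event.content.get("insertion_event_reference")
    if insertion_event_id is None:
        return

    # Hypothetical helper: return the event if we already have it, otherwise
    # backfill it from the remote homeserver that sent us the marker.
    insertion_event = await store.get_event_or_backfill(
        origin, room_id, insertion_event_id
    )

    # Recording the insertion extremity is what later lets a backfill request
    # fork off into the historical branch behind the insertion event.
    await store.insert_insertion_extremity(insertion_event.event_id, room_id)
```
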
Eric Eastwood 2021-11-02 17:27:04 -05:00
parent 3529449310
commit 321f9ea68b
2 changed files with 87 additions and 67 deletions


@@ -1265,25 +1265,9 @@ class FederationHandler:
# Synapse asks for 100 events per backfill request. Do not allow more.
limit = min(limit, 100)
# events = await self.store.get_backfill_events(room_id, pdu_list, limit)
# logger.info(
# "old implementation backfill events=%s",
# [
# "event_id=%s,depth=%d,body=%s,prevs=%s\n"
# % (
# event.event_id,
# event.depth,
# event.content.get("body", event.type),
# event.prev_event_ids(),
# )
# for event in events
# ],
# )
events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.info(
"new implementation backfill events(%d)=%s",
len(events),
"old implementation backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
@@ -1296,6 +1280,22 @@ class FederationHandler:
],
)
# events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
# logger.info(
# "new implementation backfill events(%d)=%s",
# len(events),
# [
# "event_id=%s,depth=%d,body=%s,prevs=%s\n"
# % (
# event.event_id,
# event.depth,
# event.content.get("body", event.type),
# event.prev_event_ids(),
# )
# for event in events
# ],
# )
events = await filter_events_for_server(self.storage, origin, events)
return events


@@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
_get_connected_prev_event_backfill_results_txn,
)
async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
async def get_backfill_events(
self, room_id: str, seed_event_id_list: list, limit: int
):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
including) the events in seed_event_id_list. Return a list of max size `limit`
Args:
room_id
event_list
seed_event_id_list
limit
"""
event_ids = await self.db_pool.runInteraction(
"get_backfill_events",
self._get_backfill_events,
room_id,
event_list,
seed_event_id_list,
limit,
)
events = await self.get_events_as_list(event_ids)
@@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
)
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
logger.info(
"_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
room_id,
seed_event_id_list,
limit,
)
event_results = set()
event_id_results = set()
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
@@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# going backwards in time. stream_ordering follows the same pattern.
queue = PriorityQueue()
for event_id in event_list:
for seed_event_id in seed_event_id_list:
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id, "room_id": room_id},
keyvalues={"event_id": seed_event_id, "room_id": room_id},
retcols=(
"type",
"depth",
@@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
allow_none=True,
)
logger.info(
"get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
room_id,
seed_event_id,
event_lookup_result["depth"],
event_lookup_result["stream_ordering"],
event_lookup_result["type"],
)
if event_lookup_result["depth"]:
queue.put(
(
-event_lookup_result["depth"],
-event_lookup_result["stream_ordering"],
event_id,
seed_event_id,
event_lookup_result["type"],
)
)
while not queue.empty() and len(event_results) < limit:
while not queue.empty() and len(event_id_results) < limit:
try:
_, _, event_id, event_type = queue.get_nowait()
except Empty:
break
if event_id in event_results:
if event_id in event_id_results:
continue
event_results.add(event_id)
event_id_results.add(event_id)
# Try and find any potential historical batches of message history.
if self.hs.config.experimental.msc2716_enabled:
# Try and find any potential historical batches of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we'll add them to the queue
# and navigate up the DAG like normal in the next iteration of the
# loop.
txn.execute(
connected_insertion_event_query,
(event_id, limit - len(event_results)),
)
connected_insertion_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: connected_insertion_event_query %s",
connected_insertion_event_id_results,
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event_stream_ordering = row[1]
connected_insertion_event_id = row[2]
connected_insertion_event_type = row[3]
if connected_insertion_event_id not in event_results:
queue.put(
(
-connected_insertion_event_depth,
-connected_insertion_event_stream_ordering,
connected_insertion_event_id,
connected_insertion_event_type,
)
)
# # First we look for an insertion event connected to the current
# # event (by prev_event). If we find any, we'll add them to the queue
# # and navigate up the DAG like normal in the next iteration of the
# # loop.
# txn.execute(
# connected_insertion_event_query,
# (event_id, limit - len(event_id_results)),
# )
# connected_insertion_event_id_results = txn.fetchall()
# logger.debug(
# "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
# room_id,
# connected_insertion_event_id_results,
# )
# for row in connected_insertion_event_id_results:
# connected_insertion_event_depth = row[0]
# connected_insertion_event_stream_ordering = row[1]
# connected_insertion_event_id = row[2]
# connected_insertion_event_type = row[3]
# if connected_insertion_event_id not in event_id_results:
# queue.put(
# (
# -connected_insertion_event_depth,
# -connected_insertion_event_stream_ordering,
# connected_insertion_event_id,
# connected_insertion_event_type,
# )
# )
# Second, we need to go and try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
@@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Find any batch connections for the given insertion event
txn.execute(
batch_connection_query,
(event_id, limit - len(event_results)),
(event_id, limit - len(event_id_results)),
)
batch_start_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: batch_start_event_id_results %s",
"_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
room_id,
batch_start_event_id_results,
)
for row in batch_start_event_id_results:
if row[2] not in event_results:
if row[2] not in event_id_results:
queue.put((-row[0], -row[1], row[2], row[3]))
# Now we just look up the DAG by prev_events as normal
txn.execute(
connected_prev_event_query,
(event_id, False, limit - len(event_results)),
(event_id, False, limit - len(event_id_results)),
)
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
"_get_backfill_events(room_id=%s): prev_event_ids %s",
room_id,
prev_event_id_results,
)
for row in prev_event_id_results:
if row[2] not in event_results:
if row[2] not in event_id_results:
queue.put((-row[0], -row[1], row[2], row[3]))
return event_results
return event_id_results
async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(