WIP: Don't include the event we branch from

We want to backfill all of the history before adding the base event.

But then there is a problem of how do we add the base event after
exhausting all of the historical messages. Backfill will give
us that extremity again but the current code will always
choose the historical branch over and over and never move past it.

I wish we could ask the federated homeserver if it already
has the insertion event locally but we can't make any requests
in the store code here :/
This commit is contained in:
Eric Eastwood 2021-10-28 20:24:00 -05:00
parent 4a12304cf7
commit 9a6d8faafe

View file

@ -1094,8 +1094,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
if event_id in event_results:
continue
event_results.add(event_id)
found_connected_historical_messages = False
if self.hs.config.experimental.msc2716_enabled:
# Try and find any potential historical batches of message history.
#
@ -1117,7 +1116,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
connected_insertion_event_stream_ordering = row[1]
connected_insertion_event_id = row[2]
connected_insertion_event_type = row[3]
if connected_insertion_event_id not in event_results:
found_connected_historical_messages = True
queue.put(
(
-connected_insertion_event_depth,
@ -1146,18 +1147,26 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2], row[3]))
txn.execute(
connected_prev_event_query,
(event_id, False, limit - len(event_results)),
)
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)
# Only add the event_result itself if we didn't branch off on the history first
# TODO: How can we not branch off to the historical batch if
# the federated homeserver already has it backfilled? We
# can't make any requests here (no async stuff and should
# really only be database calls)
if not found_connected_historical_messages:
event_results.add(event_id)
for row in prev_event_id_results:
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2], row[3]))
txn.execute(
connected_prev_event_query,
(event_id, False, limit - len(event_results)),
)
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)
for row in prev_event_id_results:
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2], row[3]))
return event_results