Put MSC2716 backfill logic behind experimental feature flag

Eric Eastwood 2021-10-21 04:31:10 -05:00
parent 69dfa16dcb
commit 83474d9158
2 changed files with 60 additions and 50 deletions
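
The change is mechanical: each MSC2716 code path now checks
`self.hs.config.experimental.msc2716_enabled` and falls back to the
pre-MSC2716 behaviour when the flag is off. A minimal standalone sketch of
the gating pattern, assuming a made-up config class (this is not Synapse's
real config machinery):

    from dataclasses import dataclass

    @dataclass
    class ExperimentalConfig:
        # Hypothetical stand-in for the experimental-features section of
        # the homeserver config; experimental MSCs default to off.
        msc2716_enabled: bool = False

    def backfill_extremities(config, oldest_events, insertion_events):
        # With the flag off, behave exactly as though no MSC2716
        # insertion extremities exist.
        extremities = list(oldest_events)
        if config.msc2716_enabled:
            extremities.extend(insertion_events)
        return extremities

    print(backfill_extremities(ExperimentalConfig(), ["$a:hs"], ["$ins:hs"]))
    # -> ['$a:hs']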

synapse/handlers/federation.py

@@ -145,9 +145,14 @@ class FederationHandler:
         oldest_events_with_depth = (
             await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
         )
-        insertion_events_to_be_backfilled = (
-            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
-        )
+        insertion_events_to_be_backfilled = []
+        if self.hs.config.experimental.msc2716_enabled:
+            insertion_events_to_be_backfilled = (
+                await self.store.get_insertion_event_backwards_extremities_in_room(
+                    room_id
+                )
+            )
         logger.debug(
             "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
             oldest_events_with_depth,
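
Note the fallback value: `insertion_events_to_be_backfilled` is still bound
(to an empty list) when the flag is off, so the `logger.debug` call and the
extremity handling below it run unchanged; the only thing skipped is the
extra storage query.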

synapse/storage/databases/main/event_federation.py

@@ -14,7 +14,7 @@
 import itertools
 import logging
 from queue import Empty, PriorityQueue
-from typing import Collection, Dict, Iterable, List, Optional, Set, OrderedDict, Tuple
+from typing import Collection, Dict, Iterable, List, Optional, OrderedDict, Set, Tuple
 
 from prometheus_client import Counter, Gauge
@@ -62,6 +62,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
+        self.hs = hs
+
         if hs.config.worker.run_background_tasks:
             hs.get_clock().looping_call(
                 self._delete_old_forward_extrem_cache, 60 * 60 * 1000
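
The new `self.hs = hs` assignment keeps a reference to the homeserver on
the store so that `_get_backfill_events` below can consult
`self.hs.config.experimental.msc2716_enabled` at query time.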
@@ -1053,10 +1055,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         # In a PriorityQueue, the lowest valued entries are retrieved first.
-        # We're using depth as the priority in the queue.
-        # Depth is lowest at the oldest-in-time message and highest and
-        # newest-in-time message. We add events to the queue with a negative depth so that
-        # we process the newest-in-time messages first going backwards in time.
+        # We're using depth as the priority in the queue and tie-break based on
+        # stream_ordering. Depth is lowest at the oldest-in-time message and
+        # highest and newest-in-time message. We add events to the queue with a
+        # negative depth so that we process the newest-in-time messages first
+        # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
         for event_id in event_list:
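
A quick standalone demonstration of the ordering that comment describes
(toy depths, stream orderings and event IDs; not Synapse code):

    from queue import PriorityQueue

    # Tuples compare element-wise, so negating depth and stream_ordering
    # makes the queue yield the deepest (newest-in-time) events first,
    # with stream_ordering breaking ties at equal depth.
    queue = PriorityQueue()
    queue.put((-1, -10, "$oldest", "m.room.message"))
    queue.put((-3, -25, "$older_at_same_depth", "m.room.message"))
    queue.put((-3, -30, "$newest", "m.room.message"))

    while not queue.empty():
        print(queue.get()[2])
    # -> $newest, $older_at_same_depth, $oldest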
@@ -1093,53 +1096,55 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             event_results.add(event_id)
 
-            # Try and find any potential historical batches of message history.
-            #
-            # First we look for an insertion event connected to the current
-            # event (by prev_event). If we find any, we'll add them to the queue
-            # and navigate up the DAG like normal in the next iteration of the
-            # loop.
-            txn.execute(
-                connected_insertion_event_query, (event_id, limit - len(event_results))
-            )
-            connected_insertion_event_id_results = txn.fetchall()
-            logger.debug(
-                "_get_backfill_events: connected_insertion_event_query %s",
-                connected_insertion_event_id_results,
-            )
-            for row in connected_insertion_event_id_results:
-                connected_insertion_event_depth = row[0]
-                connected_insertion_event_stream_ordering = row[1]
-                connected_insertion_event_id = row[2]
-                connected_insertion_event_type = row[3]
-
-                if connected_insertion_event_id not in event_results:
-                    queue.put(
-                        (
-                            -connected_insertion_event_depth,
-                            -connected_insertion_event_stream_ordering,
-                            connected_insertion_event_id,
-                            connected_insertion_event_type,
-                        )
-                    )
-
-            # Second, we need to go and try to find any batch events connected
-            # to a given insertion event (by batch_id). If we find any, we'll
-            # add them to the queue and navigate up the DAG like normal in the
-            # next iteration of the loop.
-            if event_type == EventTypes.MSC2716_INSERTION:
-                # Find any batch connections for the given insertion event
-                txn.execute(
-                    batch_connection_query,
-                    (event_id, limit - len(event_results)),
-                )
-                batch_start_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: batch_start_event_id_results %s",
-                    batch_start_event_id_results,
-                )
-                for row in batch_start_event_id_results:
-                    if row[2] not in event_results:
-                        queue.put((-row[0], -row[1], row[2], row[3]))
+            if self.hs.config.experimental.msc2716_enabled:
+                # Try and find any potential historical batches of message history.
+                #
+                # First we look for an insertion event connected to the current
+                # event (by prev_event). If we find any, we'll add them to the queue
+                # and navigate up the DAG like normal in the next iteration of the
+                # loop.
+                txn.execute(
+                    connected_insertion_event_query,
+                    (event_id, limit - len(event_results)),
+                )
+                connected_insertion_event_id_results = txn.fetchall()
+                logger.debug(
+                    "_get_backfill_events: connected_insertion_event_query %s",
+                    connected_insertion_event_id_results,
+                )
+                for row in connected_insertion_event_id_results:
+                    connected_insertion_event_depth = row[0]
+                    connected_insertion_event_stream_ordering = row[1]
+                    connected_insertion_event_id = row[2]
+                    connected_insertion_event_type = row[3]
+
+                    if connected_insertion_event_id not in event_results:
+                        queue.put(
+                            (
+                                -connected_insertion_event_depth,
+                                -connected_insertion_event_stream_ordering,
+                                connected_insertion_event_id,
+                                connected_insertion_event_type,
+                            )
+                        )
+
+                # Second, we need to go and try to find any batch events connected
+                # to a given insertion event (by batch_id). If we find any, we'll
+                # add them to the queue and navigate up the DAG like normal in the
+                # next iteration of the loop.
+                if event_type == EventTypes.MSC2716_INSERTION:
+                    # Find any batch connections for the given insertion event
+                    txn.execute(
+                        batch_connection_query,
+                        (event_id, limit - len(event_results)),
+                    )
+                    batch_start_event_id_results = txn.fetchall()
+                    logger.debug(
+                        "_get_backfill_events: batch_start_event_id_results %s",
+                        batch_start_event_id_results,
+                    )
+                    for row in batch_start_event_id_results:
+                        if row[2] not in event_results:
+                            queue.put((-row[0], -row[1], row[2], row[3]))
 
             txn.execute(
                 connected_prev_event_query,
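
For orientation, the two MSC2716 hops inside the new `if` block can be
sketched as follows. The lookup tables stand in for the
connected_insertion_event_query and batch_connection_query SQL, and the
helper shape and data are assumptions for illustration, not Synapse code:

    from queue import PriorityQueue

    # Hypothetical indexes: insertion events keyed by the event they point
    # at via prev_event, and batch events keyed by the insertion event
    # whose batch_id they share.
    INSERTIONS_BY_PREV_EVENT = {
        "$msg:hs": [(5, 50, "$insertion:hs", "org.matrix.msc2716.insertion")],
    }
    BATCHES_BY_INSERTION_EVENT = {
        "$insertion:hs": [(5, 49, "$batch:hs", "org.matrix.msc2716.batch")],
    }

    def enqueue_historical(event_id, event_type, msc2716_enabled, queue, seen):
        if not msc2716_enabled:
            return
        # Hop 1: insertion events that list this event as a prev_event.
        for depth, ordering, eid, etype in INSERTIONS_BY_PREV_EVENT.get(event_id, []):
            if eid not in seen:
                queue.put((-depth, -ordering, eid, etype))
        # Hop 2: from an insertion event to its batch event (by batch_id).
        if event_type == "org.matrix.msc2716.insertion":
            for depth, ordering, eid, etype in BATCHES_BY_INSERTION_EVENT.get(event_id, []):
                if eid not in seen:
                    queue.put((-depth, -ordering, eid, etype))

    q = PriorityQueue()
    enqueue_historical("$msg:hs", "m.room.message", True, q, set())
    print(q.get())  # (-5, -50, '$insertion:hs', 'org.matrix.msc2716.insertion')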