Always process marker events regardless if backfilled

Before, we could rely on the `connected_insertion_event_query`
to navigate the and find the historical branch. But now
we solely rely on the marker event to point out the
historical branch. So we need to make sure to add
the insertion event extremeties whenever we see a marker event.
Whether it be a live event or backfilled.
This commit is contained in:
Eric Eastwood 2021-11-02 21:26:52 -05:00
parent 15c3282be7
commit 5db717ab85
3 changed files with 15 additions and 15 deletions

View file

@ -142,7 +142,7 @@ class FederationHandler:
insertion_events_to_be_backfilled: Dict[str, int] = {}
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(
await self.store.get_insertion_event_backward_extremities_in_room(
room_id
)
)
@ -1055,8 +1055,8 @@ class FederationHandler:
limit = min(limit, 100)
events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.info(
"old implementation backfill events=%s",
logger.debug(
"on_backfill_request: backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (

View file

@ -752,9 +752,9 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
logger.info(
"backfill events=%s",
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
@ -1146,6 +1146,8 @@ class FederationEventHandler:
await self._run_push_actions_and_persist_event(event, context, backfilled)
await self._handle_marker_event(origin, event)
if backfilled:
return
@ -1223,8 +1225,6 @@ class FederationEventHandler:
event.sender,
)
await self._handle_marker_event(origin, event)
async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.

View file

@ -735,7 +735,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id,
)
async def get_insertion_event_backwards_extremities_in_room(
async def get_insertion_event_backward_extremities_in_room(
self, room_id
) -> Dict[str, int]:
"""Get the insertion events we know about that we haven't backfilled yet.
@ -752,7 +752,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
Map from event_id to depth
"""
def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
@ -768,8 +768,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return dict(txn)
return await self.db_pool.runInteraction(
"get_insertion_event_backwards_extremities_in_room",
get_insertion_event_backwards_extremities_in_room_txn,
"get_insertion_event_backward_extremities_in_room",
get_insertion_event_backward_extremities_in_room_txn,
room_id,
)
@ -1089,7 +1089,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
We also handle navigating historical branches of history connected by
insertion and batch events.
"""
logger.info(
logger.debug(
"_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
room_id,
seed_event_id_list,
@ -1119,8 +1119,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
allow_none=True,
)
logger.info(
"get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
logger.debug(
"_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
room_id,
seed_event_id,
event_lookup_result["depth"],