Avoid constant missing prev_event fetching while backfilling
Persist the backfilled event response from oldest -> newest to avoid having to fetch missing prev_events, which de-outliers every other event and screws up the stream_ordering. Missing prev_events aren't fetched as "backfilled", so the stream_ordering was incrementing in the wrong direction. This also helps in MSC2716 land, where we can more easily assign a stream_ordering similar to the one the originating homeserver has.
This commit is contained in:
parent
438e2226cc
commit
4983739156
|
@ -441,7 +441,37 @@ class FederationEventHandler:
|
|||
f"room {ev.room_id}, when we were backfilling in {room_id}"
|
||||
)
|
||||
|
||||
await self._process_pulled_events(dest, events, backfilled=True)
|
||||
await self._process_pulled_events(
|
||||
dest,
|
||||
# The /backfill response should start from `?v` and include the
|
||||
# events that preceded it (so the list will be newest -> oldest). We
|
||||
# reverse that order so the messages are oldest -> newest and we can
|
||||
# persist the backfilled events without constantly having to go fetch
|
||||
# missing prev_events which are probably included in the same
|
||||
# backfill chunk.
|
||||
reversed(events),
|
||||
backfilled=True,
|
||||
)
|
||||
|
||||
for ev in events:
|
||||
event_after_persisted = await self._store.get_event(
|
||||
ev.event_id, allow_none=True
|
||||
)
|
||||
|
||||
if event_after_persisted:
|
||||
logger.info(
|
||||
"from remote server: processed backfilled event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
|
||||
ev.event_id,
|
||||
event_after_persisted["type"],
|
||||
event_after_persisted["depth"],
|
||||
event_after_persisted.internal_metadata.stream_ordering,
|
||||
event_after_persisted["content"].get("body", None),
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"from remote server: processed backfilled event_id=%s failed to lookup",
|
||||
ev.event_id,
|
||||
)
|
||||
|
||||
async def _get_missing_events_for_pdu(
|
||||
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
|
||||
|
|
|
@ -561,6 +561,7 @@ class RoomMessageListRestServlet(RestServlet):
|
|||
pagination_config = await PaginationConfig.from_request(
|
||||
self.store, request, default_limit=10
|
||||
)
|
||||
logger.info("/messages rest start pagination_config=%s", pagination_config)
|
||||
# Twisted will have processed the args by now.
|
||||
assert request.args is not None
|
||||
as_client_event = b"raw" not in request.args
|
||||
|
@ -585,6 +586,7 @@ class RoomMessageListRestServlet(RestServlet):
|
|||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
logger.info("/messages rest end msgs=%s", msgs)
|
||||
return 200, msgs
|
||||
|
||||
|
||||
|
|
|
@ -1141,13 +1141,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
connected_insertion_event_depth = row[0]
|
||||
connected_insertion_event_stream_ordering = row[1]
|
||||
connected_insertion_event = row[2]
|
||||
queue.put(
|
||||
(
|
||||
-connected_insertion_event_depth,
|
||||
-connected_insertion_event_stream_ordering,
|
||||
connected_insertion_event,
|
||||
if connected_insertion_event not in event_results:
|
||||
queue.put(
|
||||
(
|
||||
-connected_insertion_event_depth,
|
||||
-connected_insertion_event_stream_ordering,
|
||||
connected_insertion_event,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# Find any batch connections for the given insertion event
|
||||
txn.execute(
|
||||
|
@ -1169,9 +1170,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
|
||||
)
|
||||
|
||||
# TODO: Find out why stream_ordering is all out of order compared to
|
||||
# when we persisted the events
|
||||
|
||||
# TODO: We should probably skip adding the event itself if we
|
||||
# branched off onto the insertion event first above. Need to make this a
|
||||
# bit smart so it doesn't skip over the event altogether if we're at
|
||||
|
|
|
@ -169,6 +169,14 @@ class PersistEventsStore:
|
|||
|
||||
async with stream_ordering_manager as stream_orderings:
|
||||
for (event, _), stream in zip(events_and_contexts, stream_orderings):
|
||||
logger.info(
|
||||
"_persist_events_and_state_updates backfilled=%s event_id=%s depth=%s stream_ordering=%s content=%s",
|
||||
backfilled,
|
||||
event.event_id,
|
||||
event.depth,
|
||||
stream,
|
||||
event["content"].get("body", None),
|
||||
)
|
||||
event.internal_metadata.stream_ordering = stream
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
|
|
|
@ -1166,6 +1166,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
|||
"order": order,
|
||||
}
|
||||
|
||||
logger.info("stream: getting events sql=%s args=%s", sql, args)
|
||||
txn.execute(sql, args)
|
||||
|
||||
# Filter the result set.
|
||||
|
@ -1236,6 +1237,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
|||
event_filter,
|
||||
)
|
||||
|
||||
logger.info("paginate_room_events event_ids(%d)=%s", len(rows), [r.event_id for r in rows])
|
||||
events = await self.get_events_as_list(
|
||||
[r.event_id for r in rows], get_prev_content=True
|
||||
)
|
||||
|
|
|
@ -309,6 +309,11 @@ class EventsPersistenceStorage:
|
|||
matched the transaction ID; the existing event is returned in such
|
||||
a case.
|
||||
"""
|
||||
# logger.info(
|
||||
# "persist_events backfilled=%s events_and_contexts=%s",
|
||||
# backfilled,
|
||||
# events_and_contexts,
|
||||
# )
|
||||
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
||||
for event, ctx in events_and_contexts:
|
||||
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
||||
|
|
Loading…
Reference in a new issue