From f30302db588c0865cba8d1bc966939ab3bac4185 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 18 Oct 2021 15:24:19 -0500
Subject: [PATCH] Scratch debugging why events appear out of order on remote
 homeservers

---
 synapse/events/utils.py                       | 17 ++--
 synapse/handlers/federation.py                | 10 +-
 synapse/handlers/room_batch.py                |  9 +-
 .../databases/main/event_federation.py        | 94 +++++++++++++++----
 4 files changed, 100 insertions(+), 30 deletions(-)

diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 23bd24d963..895835abee 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -283,13 +283,13 @@ def format_event_for_client_v1(d: JsonDict) -> JsonDict:
 
 def format_event_for_client_v2(d: JsonDict) -> JsonDict:
     drop_keys = (
-        "auth_events",
-        "prev_events",
-        "hashes",
-        "signatures",
-        "depth",
-        "origin",
-        "prev_state",
+        # "auth_events",
+        # "prev_events",
+        # "hashes",
+        # "signatures",
+        # "depth",
+        # "origin",
+        # "prev_state",
     )
     for key in drop_keys:
         d.pop(key, None)
@@ -340,6 +340,9 @@ def serialize_event(
 
     d["event_id"] = e.event_id
 
+    # TODO: Remove
+    d["stream_ordering"] = e.internal_metadata.stream_ordering
+
     if "age_ts" in d["unsigned"]:
         d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
         del d["unsigned"]["age_ts"]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e072efad16..355291ff45 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -148,14 +148,14 @@ class FederationHandler:
         insertion_events_to_be_backfilled = (
             await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
         )
-        logger.debug(
+        logger.info(
             "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
             oldest_events_with_depth,
             insertion_events_to_be_backfilled,
         )
 
         if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
-            logger.debug("Not backfilling as no extremeties found.")
+            logger.info("Not backfilling as no extremeties found.")
             return False
 
         # We only want to paginate if we can actually see the events we'll get,
@@ -203,7 +203,7 @@ class FederationHandler:
             redact=False,
             check_history_visibility_only=True,
         )
-        logger.debug(
+        logger.info(
             "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
         )
 
@@ -230,7 +230,7 @@ class FederationHandler:
         # much larger factor will result in triggering a backfill request much
         # earlier than necessary.
         if current_depth - 2 * limit > max_depth:
-            logger.debug(
+            logger.info(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
                 max_depth,
                 current_depth,
@@ -249,7 +249,7 @@ class FederationHandler:
             t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
         ]
 
-        logger.debug(
+        logger.info(
             "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
             room_id,
             current_depth,
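Scratch note on the backfill guard the hunks above instrument: the handler skips
backfilling while the requester's current pagination depth is still far above the
deepest known backwards extremity, with a 2x-limit buffer so a federation request
isn't triggered earlier than necessary. A minimal standalone sketch of just that
comparison (plain Python; the function name and numbers are made up for
illustration, not Synapse's API):

    # Mirrors the `if current_depth - 2 * limit > max_depth: ... return False`
    # guard above: only backfill once the pagination depth gets close to the
    # deepest backwards extremity.
    def should_backfill(current_depth: int, limit: int, max_depth: int) -> bool:
        return not (current_depth - 2 * limit > max_depth)

    print(should_backfill(current_depth=500, limit=10, max_depth=400))  # False: extremity still far away
    print(should_backfill(current_depth=410, limit=10, max_depth=400))  # True: close enough to backfill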
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 2f5a3e4d19..88c7d4b001 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -353,13 +353,20 @@ class RoomBatchHandler:
         # Events are sorted by (topological_ordering, stream_ordering)
         # where topological_ordering is just depth.
         for (event, context) in reversed(events_to_persist):
-            await self.event_creation_handler.handle_new_client_event(
+            result_event = await self.event_creation_handler.handle_new_client_event(
                 await self.create_requester_for_user_id_from_app_service(
                     event["sender"], app_service_requester.app_service
                 ),
                 event=event,
                 context=context,
             )
+            logger.info(
+                "result_event depth=%s stream_ordering=%s event_id=%s body=%s",
+                result_event.depth,
+                result_event.internal_metadata.stream_ordering,
+                result_event.event_id,
+                result_event.content.get("body", None),
+            )
 
         return event_ids
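Scratch note on what the new logging is meant to surface: depth is baked into each
historical event when it is created, but stream_ordering is only assigned at
persist time, so walking the batch with `reversed(...)` can leave the two orderings
running in opposite directions. A toy sketch of that effect (made-up event ids and
a stand-in stream counter, assuming `events_to_persist` is ordered by depth as the
comment above says; this is not Synapse's persister):

    # Each persisted event picks up the next stream_ordering, so reversing
    # the batch gives the deepest event the lowest stream_ordering -- the
    # kind of depth/stream mismatch being debugged here.
    events_to_persist = [("$hist1", 3), ("$hist2", 4), ("$hist3", 5)]  # (event_id, depth)
    next_stream_ordering = 100  # stand-in for the real stream id generator
    for event_id, depth in reversed(events_to_persist):
        next_stream_ordering += 1
        print(f"{event_id} depth={depth} stream_ordering={next_stream_ordering}")
    # $hist3 depth=5 stream_ordering=101
    # $hist2 depth=4 stream_ordering=102
    # $hist1 depth=3 stream_ordering=103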
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 10184d6ae7..c857158648 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1014,19 +1014,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Look for the prev_event_id connected to the given event_id
         query = """
-            SELECT depth, prev_event_id FROM event_edges
-            /* Get the depth of the prev_event_id from the events table */
+            SELECT depth, stream_ordering, prev_event_id FROM event_edges
+            /* Get the depth and stream_ordering of the prev_event_id from the events table */
             INNER JOIN events
             ON prev_event_id = events.event_id
-            /* Find an event which matches the given event_id */
+            /* Look for an edge which matches the given event_id */
             WHERE event_edges.event_id = ?
             AND event_edges.is_state = ?
+            /* Because we can have many events at the same depth,
+             * we want to also tie-break and sort on stream_ordering */
+            ORDER BY depth DESC, stream_ordering DESC
             LIMIT ?
         """
 
         # Look for the "insertion" events connected to the given event_id
         connected_insertion_event_query = """
-            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+            SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
             /* Get the depth of the insertion event from the events table */
             INNER JOIN events AS e USING (event_id)
             /* Find an insertion event which points via prev_events to the given event_id */
@@ -1036,7 +1039,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Find any batch connections of a given insertion event
         batch_connection_query = """
-            SELECT e.depth, c.event_id FROM insertion_events AS i
+            SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
             /* Find the batch that connects to the given insertion event */
             INNER JOIN batch_events AS c
             ON i.next_batch_id = c.batch_id
@@ -1055,26 +1058,68 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         queue = PriorityQueue()
 
         for event_id in event_list:
-            depth = self.db_pool.simple_select_one_onecol_txn(
+            event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
                 keyvalues={"event_id": event_id, "room_id": room_id},
-                retcol="depth",
+                retcols=(
+                    "depth",
+                    "stream_ordering",
+                ),
                 allow_none=True,
             )
 
-            if depth:
-                queue.put((-depth, event_id))
+            if event_lookup_result["depth"]:
+                queue.put(
+                    (
+                        -event_lookup_result["depth"],
+                        -event_lookup_result["stream_ordering"],
+                        event_id,
+                    )
+                )
 
         while not queue.empty() and len(event_results) < limit:
             try:
-                _, event_id = queue.get_nowait()
+                _, _, event_id = queue.get_nowait()
             except Empty:
                 break
 
             if event_id in event_results:
                 continue
 
+            event_lookup_result = self.db_pool.simple_select_one_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": event_id},
+                retcols=["type", "depth", "stream_ordering", "content"],
+                allow_none=True,
+            )
+
+            event_json_lookup_result = self.db_pool.simple_select_one_onecol_txn(
+                txn,
+                table="event_json",
+                keyvalues={"event_id": event_id},
+                retcol="json",
+                allow_none=True,
+            )
+
+            ev = db_to_json(event_json_lookup_result)
+
+            if event_lookup_result:
+                logger.info(
+                    "_get_backfill_events: event_results add event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
+                    event_id,
+                    ev["type"],
+                    ev["depth"],
+                    event_lookup_result["stream_ordering"],
+                    ev["content"].get("body", None),
+                )
+            else:
+                logger.info(
+                    "_get_backfill_events: event_results event_id=%s failed to lookup",
+                    event_id,
+                )
+
             event_results.add(event_id)
 
             # Try and find any potential historical batches of message history.
@@ -1094,8 +1139,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             )
             for row in connected_insertion_event_id_results:
                 connected_insertion_event_depth = row[0]
-                connected_insertion_event = row[1]
-                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+                connected_insertion_event_stream_ordering = row[1]
+                connected_insertion_event = row[2]
+                queue.put(
+                    (
+                        -connected_insertion_event_depth,
+                        -connected_insertion_event_stream_ordering,
+                        connected_insertion_event,
+                    )
+                )
 
             # Find any batch connections for the given insertion event
             txn.execute(
@@ -1108,18 +1160,26 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 batch_start_event_id_results,
             )
             for row in batch_start_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
+                if row[2] not in event_results:
+                    queue.put((-row[0], -row[1], row[2]))
 
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
-            logger.debug(
+            logger.info(
                 "_get_backfill_events: prev_event_ids %s", prev_event_id_results
             )
 
+            # TODO: Find out why stream_ordering is all out of order compared to
+            # when we persisted the events
+
+            # TODO: We should probably skip adding the event itself if we
+            # branched off onto the insertion event first above. Need to make this a
+            # bit smart so it doesn't skip over the event altogether if we're at
+            # the end of the historical messages.
+
             for row in prev_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
+                if row[2] not in event_results:
+                    queue.put((-row[0], -row[1], row[2]))
 
         return event_results
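For reference, the tie-break this patch threads through the backfill walk can be
sanity-checked on its own: Python tuples compare element by element, so negating
both depth and stream_ordering makes the PriorityQueue pop the deepest event
first and, at equal depth, the one with the highest stream_ordering. A standalone
sketch with toy values (event ids and orderings invented, not real rows):

    from queue import PriorityQueue

    queue: PriorityQueue = PriorityQueue()
    # (depth, stream_ordering, event_id); three events share depth 5 on purpose.
    for depth, stream_ordering, event_id in [
        (5, 101, "$a"),
        (5, 103, "$b"),
        (6, 102, "$c"),
        (5, 102, "$d"),
    ]:
        queue.put((-depth, -stream_ordering, event_id))

    while not queue.empty():
        neg_depth, neg_stream_ordering, event_id = queue.get_nowait()
        print(event_id, -neg_depth, -neg_stream_ordering)
    # $c 6 102 -- deepest event first
    # $b 5 103 -- then, within depth 5, highest stream_ordering first
    # $d 5 102
    # $a 5 101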