Scratch debugging why events appear out of order on remote homeservers
This commit is contained in:
parent
daf498e099
commit
f30302db58
|
@ -283,13 +283,13 @@ def format_event_for_client_v1(d: JsonDict) -> JsonDict:
|
|||
|
||||
def format_event_for_client_v2(d: JsonDict) -> JsonDict:
|
||||
drop_keys = (
|
||||
"auth_events",
|
||||
"prev_events",
|
||||
"hashes",
|
||||
"signatures",
|
||||
"depth",
|
||||
"origin",
|
||||
"prev_state",
|
||||
# "auth_events",
|
||||
# "prev_events",
|
||||
# "hashes",
|
||||
# "signatures",
|
||||
# "depth",
|
||||
# "origin",
|
||||
# "prev_state",
|
||||
)
|
||||
for key in drop_keys:
|
||||
d.pop(key, None)
|
||||
|
@ -340,6 +340,9 @@ def serialize_event(
|
|||
|
||||
d["event_id"] = e.event_id
|
||||
|
||||
# TODO: Remove this debugging-only field; stream_ordering comes from internal_metadata
|
||||
d["stream_ordering"] = e.internal_metadata.stream_ordering
|
||||
|
||||
if "age_ts" in d["unsigned"]:
|
||||
d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
|
||||
del d["unsigned"]["age_ts"]
|
||||
|
|
|
@ -148,14 +148,14 @@ class FederationHandler:
|
|||
insertion_events_to_be_backfilled = (
|
||||
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
|
||||
)
|
||||
logger.debug(
|
||||
logger.info(
|
||||
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
|
||||
oldest_events_with_depth,
|
||||
insertion_events_to_be_backfilled,
|
||||
)
|
||||
|
||||
if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
|
||||
logger.debug("Not backfilling as no extremeties found.")
|
||||
logger.info("Not backfilling as no extremeties found.")
|
||||
return False
|
||||
|
||||
# We only want to paginate if we can actually see the events we'll get,
|
||||
|
@ -203,7 +203,7 @@ class FederationHandler:
|
|||
redact=False,
|
||||
check_history_visibility_only=True,
|
||||
)
|
||||
logger.debug(
|
||||
logger.info(
|
||||
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
|
||||
)
|
||||
|
||||
|
@ -230,7 +230,7 @@ class FederationHandler:
|
|||
# much larger factor will result in triggering a backfill request much
|
||||
# earlier than necessary.
|
||||
if current_depth - 2 * limit > max_depth:
|
||||
logger.debug(
|
||||
logger.info(
|
||||
"Not backfilling as we don't need to. %d < %d - 2 * %d",
|
||||
max_depth,
|
||||
current_depth,
|
||||
|
@ -249,7 +249,7 @@ class FederationHandler:
|
|||
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
|
||||
]
|
||||
|
||||
logger.debug(
|
||||
logger.info(
|
||||
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
|
||||
room_id,
|
||||
current_depth,
|
||||
|
|
|
@ -353,13 +353,20 @@ class RoomBatchHandler:
|
|||
# Events are sorted by (topological_ordering, stream_ordering)
|
||||
# where topological_ordering is just depth.
|
||||
for (event, context) in reversed(events_to_persist):
|
||||
await self.event_creation_handler.handle_new_client_event(
|
||||
result_event = await self.event_creation_handler.handle_new_client_event(
|
||||
await self.create_requester_for_user_id_from_app_service(
|
||||
event["sender"], app_service_requester.app_service
|
||||
),
|
||||
event=event,
|
||||
context=context,
|
||||
)
|
||||
logger.info(
|
||||
"result_event depth=%s stream_ordering=%s event_id=%s body=%s",
|
||||
result_event.depth,
|
||||
result_event.internal_metadata.stream_ordering,
|
||||
result_event.event_id,
|
||||
result_event.content.get("body", None),
|
||||
)
|
||||
|
||||
return event_ids
|
||||
|
||||
|
|
|
@ -1014,19 +1014,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
|
||||
# Look for the prev_event_id connected to the given event_id
|
||||
query = """
|
||||
SELECT depth, prev_event_id FROM event_edges
|
||||
/* Get the depth of the prev_event_id from the events table */
|
||||
SELECT depth, stream_ordering, prev_event_id FROM event_edges
|
||||
/* Get the depth and stream_ordering of the prev_event_id from the events table */
|
||||
INNER JOIN events
|
||||
ON prev_event_id = events.event_id
|
||||
/* Find an event which matches the given event_id */
|
||||
/* Look for an edge which matches the given event_id */
|
||||
WHERE event_edges.event_id = ?
|
||||
AND event_edges.is_state = ?
|
||||
/* Because we can have many events at the same depth,
|
||||
* we want to also tie-break and sort on stream_ordering */
|
||||
ORDER BY depth DESC, stream_ordering DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# Look for the "insertion" events connected to the given event_id
|
||||
connected_insertion_event_query = """
|
||||
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
|
||||
SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
|
||||
/* Get the depth of the insertion event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
/* Find an insertion event which points via prev_events to the given event_id */
|
||||
|
@ -1036,7 +1039,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
|
||||
# Find any batch connections of a given insertion event
|
||||
batch_connection_query = """
|
||||
SELECT e.depth, c.event_id FROM insertion_events AS i
|
||||
SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
|
||||
/* Find the batch that connects to the given insertion event */
|
||||
INNER JOIN batch_events AS c
|
||||
ON i.next_batch_id = c.batch_id
|
||||
|
@ -1055,26 +1058,68 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
queue = PriorityQueue()
|
||||
|
||||
for event_id in event_list:
|
||||
depth = self.db_pool.simple_select_one_onecol_txn(
|
||||
event_lookup_result = self.db_pool.simple_select_one_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"event_id": event_id, "room_id": room_id},
|
||||
retcol="depth",
|
||||
retcols=(
|
||||
"depth",
|
||||
"stream_ordering",
|
||||
),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if depth:
|
||||
queue.put((-depth, event_id))
|
||||
if event_lookup_result["depth"]:
|
||||
queue.put(
|
||||
(
|
||||
-event_lookup_result["depth"],
|
||||
-event_lookup_result["stream_ordering"],
|
||||
event_id,
|
||||
)
|
||||
)
|
||||
|
||||
while not queue.empty() and len(event_results) < limit:
|
||||
try:
|
||||
_, event_id = queue.get_nowait()
|
||||
_, _, event_id = queue.get_nowait()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
if event_id in event_results:
|
||||
continue
|
||||
|
||||
event_lookup_result = self.db_pool.simple_select_one_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"event_id": event_id},
|
||||
retcols=["type", "depth", "stream_ordering", "content"],
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
event_json_lookup_result = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="event_json",
|
||||
keyvalues={"event_id": event_id},
|
||||
retcol="json",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
ev = db_to_json(event_json_lookup_result)
|
||||
|
||||
if event_lookup_result:
|
||||
logger.info(
|
||||
"_get_backfill_events: event_results add event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
|
||||
event_id,
|
||||
ev["type"],
|
||||
ev["depth"],
|
||||
event_lookup_result["stream_ordering"],
|
||||
ev["content"].get("body", None),
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"_get_backfill_events: event_results event_id=%s failed to lookup",
|
||||
event_id,
|
||||
)
|
||||
|
||||
event_results.add(event_id)
|
||||
|
||||
# Try and find any potential historical batches of message history.
|
||||
|
@ -1094,8 +1139,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
)
|
||||
for row in connected_insertion_event_id_results:
|
||||
connected_insertion_event_depth = row[0]
|
||||
connected_insertion_event = row[1]
|
||||
queue.put((-connected_insertion_event_depth, connected_insertion_event))
|
||||
connected_insertion_event_stream_ordering = row[1]
|
||||
connected_insertion_event = row[2]
|
||||
queue.put(
|
||||
(
|
||||
-connected_insertion_event_depth,
|
||||
-connected_insertion_event_stream_ordering,
|
||||
connected_insertion_event,
|
||||
)
|
||||
)
|
||||
|
||||
# Find any batch connections for the given insertion event
|
||||
txn.execute(
|
||||
|
@ -1108,18 +1160,26 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
batch_start_event_id_results,
|
||||
)
|
||||
for row in batch_start_event_id_results:
|
||||
if row[1] not in event_results:
|
||||
queue.put((-row[0], row[1]))
|
||||
if row[2] not in event_results:
|
||||
queue.put((-row[0], -row[1], row[2]))
|
||||
|
||||
txn.execute(query, (event_id, False, limit - len(event_results)))
|
||||
prev_event_id_results = txn.fetchall()
|
||||
logger.debug(
|
||||
logger.info(
|
||||
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
|
||||
)
|
||||
|
||||
# TODO: Find out why stream_ordering is all out of order compared to
|
||||
# when we persisted the events
|
||||
|
||||
# TODO: We should probably skip adding the event itself if we
|
||||
# branched off onto the insertion event first above. Need to make this a
|
||||
# bit smarter so it doesn't skip over the event altogether if we're at
|
||||
# the end of the historical messages.
|
||||
|
||||
for row in prev_event_id_results:
|
||||
if row[1] not in event_results:
|
||||
queue.put((-row[0], row[1]))
|
||||
if row[2] not in event_results:
|
||||
queue.put((-row[0], -row[1], row[2]))
|
||||
|
||||
return event_results
|
||||
|
||||
|
|
Loading…
Reference in a new issue