mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-18 16:04:35 +01:00
Merge pull request #4798 from matrix-org/rav/rr_debug
Add some debug about processing read receipts.
This commit is contained in:
commit
48583cef7e
2 changed files with 21 additions and 6 deletions
1
changelog.d/4798.misc
Normal file
1
changelog.d/4798.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add some debug about processing read receipts.
|
|
@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
|
|
||||||
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
|
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
|
||||||
user_id, event_id, data, stream_id):
|
user_id, event_id, data, stream_id):
|
||||||
|
"""Inserts a read-receipt into the database if it's newer than the current RR
|
||||||
|
|
||||||
|
Returns: int|None
|
||||||
|
None if the RR is older than the current RR
|
||||||
|
otherwise, the rx timestamp of the event that the RR corresponds to
|
||||||
|
(or 0 if the event is unknown)
|
||||||
|
"""
|
||||||
res = self._simple_select_one_txn(
|
res = self._simple_select_one_txn(
|
||||||
txn,
|
txn,
|
||||||
table="events",
|
table="events",
|
||||||
retcols=["topological_ordering", "stream_ordering"],
|
retcols=["stream_ordering", "received_ts"],
|
||||||
keyvalues={"event_id": event_id},
|
keyvalues={"event_id": event_id},
|
||||||
allow_none=True
|
allow_none=True
|
||||||
)
|
)
|
||||||
|
|
||||||
stream_ordering = int(res["stream_ordering"]) if res else None
|
stream_ordering = int(res["stream_ordering"]) if res else None
|
||||||
|
rx_ts = res["received_ts"] if res else 0
|
||||||
|
|
||||||
# We don't want to clobber receipts for more recent events, so we
|
# We don't want to clobber receipts for more recent events, so we
|
||||||
# have to compare orderings of existing receipts
|
# have to compare orderings of existing receipts
|
||||||
|
@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
"one for later event %s",
|
"one for later event %s",
|
||||||
event_id, eid,
|
event_id, eid,
|
||||||
)
|
)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
|
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
|
||||||
|
@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
stream_ordering=stream_ordering,
|
stream_ordering=stream_ordering,
|
||||||
)
|
)
|
||||||
|
|
||||||
return True
|
return rx_ts
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data):
|
def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data):
|
||||||
|
@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
|
|
||||||
stream_id_manager = self._receipts_id_gen.get_next()
|
stream_id_manager = self._receipts_id_gen.get_next()
|
||||||
with stream_id_manager as stream_id:
|
with stream_id_manager as stream_id:
|
||||||
have_persisted = yield self.runInteraction(
|
event_ts = yield self.runInteraction(
|
||||||
"insert_linearized_receipt",
|
"insert_linearized_receipt",
|
||||||
self.insert_linearized_receipt_txn,
|
self.insert_linearized_receipt_txn,
|
||||||
room_id, receipt_type, user_id, linearized_event_id,
|
room_id, receipt_type, user_id, linearized_event_id,
|
||||||
|
@ -474,9 +482,15 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
stream_id=stream_id,
|
stream_id=stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
if not have_persisted:
|
if event_ts is None:
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
|
now = self._clock.time_msec()
|
||||||
|
logger.debug(
|
||||||
|
"RR for event %s in %s (%i ms old)",
|
||||||
|
linearized_event_id, room_id, now - event_ts,
|
||||||
|
)
|
||||||
|
|
||||||
yield self.insert_graph_receipt(
|
yield self.insert_graph_receipt(
|
||||||
room_id, receipt_type, user_id, event_ids, data
|
room_id, receipt_type, user_id, event_ids, data
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue