Improve comments (& avoid a duplicate query) in push actions processing. (#13455)

* Adds docstrings and inline comments.
* Formats SQL queries using triple-quoted strings.
* Minor formatting changes.
* Avoids fetching `event_push_summary_stream_ordering` multiple times
  in the same transaction (see the sketch below).
Patrick Cloke, 2022-08-04 15:24:44 -04:00 (committed by GitHub)
parent 96d92156d0
commit ec24813220
2 changed files with 159 additions and 124 deletions
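
The duplicate-query fix is the only behavioural change; condensed from the
hunks below, it amounts to the following (a sketch, not the verbatim code):

    # Fetch the previously summarised stream ordering once per transaction...
    old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
        txn,
        table="event_push_summary_stream_ordering",
        keyvalues={},
        retcol="stream_ordering",
    )
    ...
    # ...and pass it down to the helper, which previously re-ran the same
    # SELECT itself.
    self._rotate_notifs_before_txn(
        txn, old_rotate_stream_ordering, rotate_to_stream_ordering
    )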

changelog.d/13455.misc (new file)

@@ -0,0 +1 @@
+Add some comments about how event push actions are stored.

synapse/storage/databases/main/event_push_actions.py

@@ -265,7 +265,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
            counts.notify_count += row[1]
            counts.unread_count += row[2]

-        # Next we need to count highlights, which aren't summarized
+        # Next we need to count highlights, which aren't summarised
        sql = """
            SELECT COUNT(*) FROM event_push_actions
            WHERE user_id = ?
@@ -280,7 +280,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        # Finally we need to count push actions that aren't included in the
        # summary returned above, e.g. recent events that haven't been
-        # summarized yet, or the summary is empty due to a recent read receipt.
+        # summarised yet, or the summary is empty due to a recent read receipt.
        stream_ordering = max(stream_ordering, summary_stream_ordering)
        notify_count, unread_count = self._get_notif_unread_count_for_user_room(
            txn, room_id, user_id, stream_ordering
@@ -304,6 +304,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        Does not consult `event_push_summary` table, which may include push
        actions that have been deleted from `event_push_actions` table.
+
+        Args:
+            txn: The database transaction.
+            room_id: The room ID to get unread counts for.
+            user_id: The user ID to get unread counts for.
+            stream_ordering: The (exclusive) minimum stream ordering to consider.
+            max_stream_ordering: The (inclusive) maximum stream ordering to consider.
+                If this is not given, then no maximum is applied.
+
+        Return:
+            A tuple of the notif count and unread count in the given range.
        """
        # If there have been no events in the room since the stream ordering,
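
For example (a hypothetical call, using the bounds exactly as the new docstring
defines them), this would count notifications and unread events whose stream
orderings fall in the half-open range (10, 20], i.e. 11 through 20:

    notif_count, unread_count = self._get_notif_unread_count_for_user_room(
        txn, room_id, user_id, 10, 20
    )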
@@ -383,27 +394,27 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        ) -> List[Tuple[str, str, int, str, bool]]:
            # find rooms that have a read receipt in them and return the next
            # push actions
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight "
-                " FROM ("
-                "   SELECT room_id,"
-                "       MAX(stream_ordering) as stream_ordering"
-                "   FROM events"
-                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
-                "   WHERE receipt_type = 'm.read' AND user_id = ?"
-                "   GROUP BY room_id"
-                ") AS rl,"
-                " event_push_actions AS ep"
-                " WHERE"
-                "   ep.room_id = rl.room_id"
-                "   AND ep.stream_ordering > rl.stream_ordering"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering ASC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight
+                FROM (
+                    SELECT room_id,
+                        MAX(stream_ordering) as stream_ordering
+                    FROM events
+                    INNER JOIN receipts_linearized USING (room_id, event_id)
+                    WHERE receipt_type = 'm.read' AND user_id = ?
+                    GROUP BY room_id
+                ) AS rl,
+                event_push_actions AS ep
+                WHERE
+                    ep.room_id = rl.room_id
+                    AND ep.stream_ordering > rl.stream_ordering
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering ASC LIMIT ?
+            """
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
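
The SQL-reformatting hunks here and below are mechanical; the idiom, shown on a
smaller query from the same file, is:

    # Before: the query assembled from adjacent string literals, where a
    # missing leading space silently fuses two keywords together.
    sql = (
        "SELECT COUNT(*) FROM event_push_actions"
        " WHERE user_id = ?"
    )

    # After: one triple-quoted string; whitespace is preserved and the SQL
    # can be read (and copy-pasted) as-is.
    sql = """
        SELECT COUNT(*) FROM event_push_actions
        WHERE user_id = ?
    """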
@@ -418,23 +429,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        def get_no_receipt(
            txn: LoggingTransaction,
        ) -> List[Tuple[str, str, int, str, bool]]:
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight "
-                " FROM event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id NOT IN ("
-                "       SELECT room_id FROM receipts_linearized"
-                "       WHERE receipt_type = 'm.read' AND user_id = ?"
-                "       GROUP BY room_id"
-                "   )"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering ASC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight
+                FROM event_push_actions AS ep
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    ep.room_id NOT IN (
+                        SELECT room_id FROM receipts_linearized
+                        WHERE receipt_type = 'm.read' AND user_id = ?
+                        GROUP BY room_id
+                    )
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering ASC LIMIT ?
+            """
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -490,28 +501,28 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        def get_after_receipt(
            txn: LoggingTransaction,
        ) -> List[Tuple[str, str, int, str, bool, int]]:
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight, e.received_ts"
-                " FROM ("
-                "   SELECT room_id,"
-                "       MAX(stream_ordering) as stream_ordering"
-                "   FROM events"
-                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
-                "   WHERE receipt_type = 'm.read' AND user_id = ?"
-                "   GROUP BY room_id"
-                ") AS rl,"
-                " event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id = rl.room_id"
-                "   AND ep.stream_ordering > rl.stream_ordering"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight, e.received_ts
+                FROM (
+                    SELECT room_id,
+                        MAX(stream_ordering) as stream_ordering
+                    FROM events
+                    INNER JOIN receipts_linearized USING (room_id, event_id)
+                    WHERE receipt_type = 'm.read' AND user_id = ?
+                    GROUP BY room_id
+                ) AS rl,
+                event_push_actions AS ep
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    ep.room_id = rl.room_id
+                    AND ep.stream_ordering > rl.stream_ordering
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering DESC LIMIT ?
+            """
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -526,23 +537,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        def get_no_receipt(
            txn: LoggingTransaction,
        ) -> List[Tuple[str, str, int, str, bool, int]]:
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight, e.received_ts"
-                " FROM event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id NOT IN ("
-                "       SELECT room_id FROM receipts_linearized"
-                "       WHERE receipt_type = 'm.read' AND user_id = ?"
-                "       GROUP BY room_id"
-                "   )"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                "   AND ep.notif = 1"
-                " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            )
+            sql = """
+                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
+                    ep.highlight, e.received_ts
+                FROM event_push_actions AS ep
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    ep.room_id NOT IN (
+                        SELECT room_id FROM receipts_linearized
+                        WHERE receipt_type = 'm.read' AND user_id = ?
+                        GROUP BY room_id
+                    )
+                    AND ep.user_id = ?
+                    AND ep.stream_ordering > ?
+                    AND ep.stream_ordering <= ?
+                    AND ep.notif = 1
+                ORDER BY ep.stream_ordering DESC LIMIT ?
+            """
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -769,12 +780,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
            # [10, <none>, 20], we should treat this as being equivalent to
            # [10, 10, 20].
            #
-            sql = (
-                "SELECT received_ts FROM events"
-                " WHERE stream_ordering <= ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT 1"
-            )
+            sql = """
+                SELECT received_ts FROM events
+                WHERE stream_ordering <= ?
+                ORDER BY stream_ordering DESC
+                LIMIT 1
+            """

            while range_end - range_start > 0:
                middle = (range_end + range_start) // 2
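
For context, the loop that follows this query performs a binary search over
stream orderings; a condensed sketch of that search (plumbing simplified, and
the null-timestamp subtlety noted in the comment above elided):

    # Find the first stream ordering whose event was received at or after `ts`.
    while range_end - range_start > 0:
        middle = (range_end + range_start) // 2
        txn.execute(sql, (middle,))  # received_ts of the last event <= middle
        row = txn.fetchone()
        if row is None or row[0] < ts:
            # Nothing at or before `middle`, or it is too old: look higher.
            range_start = middle + 1
        else:
            # The ordering we want is at or before `middle`.
            range_end = middle
    return range_end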
@@ -802,14 +813,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        self, stream_ordering: int
    ) -> Optional[int]:
        def f(txn: LoggingTransaction) -> Optional[Tuple[int]]:
-            sql = (
-                "SELECT e.received_ts"
-                " FROM event_push_actions AS ep"
-                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
-                " WHERE ep.stream_ordering > ? AND notif = 1"
-                " ORDER BY ep.stream_ordering ASC"
-                " LIMIT 1"
-            )
+            sql = """
+                SELECT e.received_ts
+                FROM event_push_actions AS ep
+                JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id
+                WHERE ep.stream_ordering > ? AND notif = 1
+                ORDER BY ep.stream_ordering ASC
+                LIMIT 1
+            """
            txn.execute(sql, (stream_ordering,))
            return cast(Optional[Tuple[int]], txn.fetchone())
@@ -858,10 +869,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        Any push actions which predate the user's most recent read receipt are
        now redundant, so we can remove them from `event_push_actions` and
        update `event_push_summary`.
+
+        Returns true if all new receipts have been processed.
        """
        limit = 100

+        # The (inclusive) receipt stream ID that was previously processed.
        min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn(
            txn,
            table="event_push_summary_last_receipt_stream_id",
@@ -871,6 +885,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        max_receipts_stream_id = self._receipts_id_gen.get_current_token()

+        # The (inclusive) event stream ordering that was previously summarised.
+        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
        sql = """
            SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
            FROM receipts_linearized AS r
@@ -895,13 +917,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        )
        rows = txn.fetchall()

-        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
-        )
-
        # For each new read receipt we delete push actions from before it and
        # recalculate the summary.
        for _, room_id, user_id, stream_ordering in rows:
@@ -920,10 +935,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                (room_id, user_id, stream_ordering),
            )

+            # Fetch the notification counts between the stream ordering of the
+            # latest receipt and what was previously summarised.
            notif_count, unread_count = self._get_notif_unread_count_for_user_room(
                txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
            )

+            # Replace the previous summary with the new counts.
            self.db_pool.simple_upsert_txn(
                txn,
                table="event_push_summary",
@@ -956,10 +974,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        return len(rows) < limit

    def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
-        """Archives older notifications into event_push_summary. Returns whether
-        the archiving process has caught up or not.
+        """Archives older notifications (from event_push_actions) into event_push_summary.
+
+        Returns whether the archiving process has caught up or not.
        """

+        # The (inclusive) event stream ordering that was previously summarised.
        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
            txn,
            table="event_push_summary_stream_ordering",
@@ -993,19 +1013,31 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
        logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)

-        self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
+        self._rotate_notifs_before_txn(
+            txn, old_rotate_stream_ordering, rotate_to_stream_ordering
+        )

        return caught_up

    def _rotate_notifs_before_txn(
-        self, txn: LoggingTransaction, rotate_to_stream_ordering: int
+        self,
+        txn: LoggingTransaction,
+        old_rotate_stream_ordering: int,
+        rotate_to_stream_ordering: int,
    ) -> None:
-        old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
-        )
+        """Archives older notifications (from event_push_actions) into event_push_summary.
+
+        Any event_push_actions between old_rotate_stream_ordering (exclusive) and
+        rotate_to_stream_ordering (inclusive) will be added to the event_push_summary
+        table.
+
+        Args:
+            txn: The database transaction.
+            old_rotate_stream_ordering: The previous maximum event stream ordering.
+            rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
+
+        Returns whether the archiving process has caught up or not.
+        """

        # Calculate the new counts that should be upserted into event_push_summary
        sql = """
@@ -1093,9 +1125,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
    async def _remove_old_push_actions_that_have_rotated(
        self,
    ) -> None:
-        """Clear out old push actions that have been summarized."""
+        """Clear out old push actions that have been summarised."""

-        # We want to clear out anything that older than a day that *has* already
+        # We want to clear out anything that is older than a day that *has* already
        # been rotated.
        rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol(
            table="event_push_summary_stream_ordering",
@@ -1215,16 +1247,18 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
        # NB. This assumes event_ids are globally unique since
        # it makes the query easier to index
-        sql = (
-            "SELECT epa.event_id, epa.room_id,"
-            "   epa.stream_ordering, epa.topological_ordering,"
-            "   epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
-            " FROM event_push_actions epa, events e"
-            " WHERE epa.event_id = e.event_id"
-            "   AND epa.user_id = ? %s"
-            "   AND epa.notif = 1"
-            " ORDER BY epa.stream_ordering DESC"
-            " LIMIT ?" % (before_clause,)
-        )
+        sql = """
+            SELECT epa.event_id, epa.room_id,
+                epa.stream_ordering, epa.topological_ordering,
+                epa.actions, epa.highlight, epa.profile_tag, e.received_ts
+            FROM event_push_actions epa, events e
+            WHERE epa.event_id = e.event_id
+                AND epa.user_id = ? %s
+                AND epa.notif = 1
+            ORDER BY epa.stream_ordering DESC
+            LIMIT ?
+        """ % (
+            before_clause,
+        )
        txn.execute(sql, args)
        return cast(
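
The `%s` here splices in `before_clause`, a trusted SQL fragment built by the
surrounding code, while user-supplied values still travel through `?` bind
parameters; a hypothetical pairing (illustrative values, not the verbatim
caller):

    # Page backwards from a known stream ordering...
    before_clause = "AND epa.stream_ordering < ?"
    args = [user_id, before_stream_ordering, limit]

    # ...or fetch the most recent actions with no upper bound.
    before_clause = ""
    args = [user_id, limit]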