forked from MirrorHub/synapse
Clear out old rows from event_push_actions_staging
(#14020)
On matrix.org we have ~5 million stale rows in `event_push_actions_staging`, let's add a background job to make sure we clear them out.
This commit is contained in:
parent
b381701f8c
commit
5a6d025246
5 changed files with 105 additions and 1 deletions
1
changelog.d/14020.misc
Normal file
1
changelog.d/14020.misc
Normal file
|
@@ -0,0 +1 @@
|
||||||
|
Clear out stale entries in `event_push_actions_staging` table.
|
|
@@ -205,6 +205,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||||
):
|
):
|
||||||
super().__init__(database, db_conn, hs)
|
super().__init__(database, db_conn, hs)
|
||||||
|
|
||||||
|
# Track when the process started.
|
||||||
|
self._started_ts = self._clock.time_msec()
|
||||||
|
|
||||||
# These get correctly set by _find_stream_orderings_for_times_txn
|
# These get correctly set by _find_stream_orderings_for_times_txn
|
||||||
self.stream_ordering_month_ago: Optional[int] = None
|
self.stream_ordering_month_ago: Optional[int] = None
|
||||||
self.stream_ordering_day_ago: Optional[int] = None
|
self.stream_ordering_day_ago: Optional[int] = None
|
||||||
|
@@ -224,6 +227,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||||
self._rotate_notifs, 30 * 1000
|
self._rotate_notifs, 30 * 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._clear_old_staging_loop = self._clock.looping_call(
|
||||||
|
self._clear_old_push_actions_staging, 30 * 60 * 1000
|
||||||
|
)
|
||||||
|
|
||||||
self.db_pool.updates.register_background_index_update(
|
self.db_pool.updates.register_background_index_update(
|
||||||
"event_push_summary_unique_index",
|
"event_push_summary_unique_index",
|
||||||
index_name="event_push_summary_unique_index",
|
index_name="event_push_summary_unique_index",
|
||||||
|
@@ -791,7 +798,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||||
# can be used to insert into the `event_push_actions_staging` table.
|
# can be used to insert into the `event_push_actions_staging` table.
|
||||||
def _gen_entry(
|
def _gen_entry(
|
||||||
user_id: str, actions: Collection[Union[Mapping, str]]
|
user_id: str, actions: Collection[Union[Mapping, str]]
|
||||||
) -> Tuple[str, str, str, int, int, int, str]:
|
) -> Tuple[str, str, str, int, int, int, str, int]:
|
||||||
is_highlight = 1 if _action_has_highlight(actions) else 0
|
is_highlight = 1 if _action_has_highlight(actions) else 0
|
||||||
notif = 1 if "notify" in actions else 0
|
notif = 1 if "notify" in actions else 0
|
||||||
return (
|
return (
|
||||||
|
@@ -802,6 +809,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||||
is_highlight, # highlight column
|
is_highlight, # highlight column
|
||||||
int(count_as_unread), # unread column
|
int(count_as_unread), # unread column
|
||||||
thread_id, # thread_id column
|
thread_id, # thread_id column
|
||||||
|
self._clock.time_msec(), # inserted_ts column
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.db_pool.simple_insert_many(
|
await self.db_pool.simple_insert_many(
|
||||||
|
@@ -814,6 +822,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||||
"highlight",
|
"highlight",
|
||||||
"unread",
|
"unread",
|
||||||
"thread_id",
|
"thread_id",
|
||||||
|
"inserted_ts",
|
||||||
),
|
),
|
||||||
values=[
|
values=[
|
||||||
_gen_entry(user_id, actions)
|
_gen_entry(user_id, actions)
|
||||||
|
@@ -1340,6 +1349,53 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||||
if done:
|
if done:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@wrap_as_background_process("_clear_old_push_actions_staging")
|
||||||
|
async def _clear_old_push_actions_staging(self) -> None:
|
||||||
|
"""Clear out any old event push actions from the staging table for
|
||||||
|
events that we failed to persist.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# We delete anything more than an hour old, on the assumption that we'll
|
||||||
|
# never take more than an hour to persist an event.
|
||||||
|
delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000
|
||||||
|
|
||||||
|
if self._started_ts > delete_before_ts:
|
||||||
|
            # We need to wait at least an hour after startup before deleting,
|
||||||
|
# so that we know it's safe to delete rows with NULL `inserted_ts`.
|
||||||
|
return
|
||||||
|
|
||||||
|
# We don't have an index on `inserted_ts`, instead we assume that the
|
||||||
|
# number of "live" rows in `event_push_actions_staging` is small enough
|
||||||
|
# that an infrequent periodic scan won't cause a problem.
|
||||||
|
#
|
||||||
|
        # Note: we also delete any rows with NULL `inserted_ts`, this is safe
|
||||||
|
# as we added a default value to new rows and so they must be at least
|
||||||
|
# an hour old.
|
||||||
|
limit = 1000
|
||||||
|
sql = """
|
||||||
|
DELETE FROM event_push_actions_staging WHERE event_id IN (
|
||||||
|
SELECT event_id FROM event_push_actions_staging WHERE
|
||||||
|
inserted_ts < ? OR inserted_ts IS NULL
|
||||||
|
LIMIT ?
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool:
|
||||||
|
txn.execute(sql, (delete_before_ts, limit))
|
||||||
|
return txn.rowcount >= limit
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Returns true if we have more stuff to delete from the table.
|
||||||
|
deleted = await self.db_pool.runInteraction(
|
||||||
|
"_clear_old_push_actions_staging", _clear_old_push_actions_staging_txn
|
||||||
|
)
|
||||||
|
|
||||||
|
if not deleted:
|
||||||
|
return
|
||||||
|
|
||||||
|
# We sleep to ensure that we don't overwhelm the DB.
|
||||||
|
await self._clock.sleep(1.0)
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsStore(EventPushActionsWorkerStore):
|
class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||||
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
||||||
|
|
|
@@ -85,6 +85,7 @@ Changes in SCHEMA_VERSION = 73;
|
||||||
events over federation.
|
events over federation.
|
||||||
- Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`,
|
- Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`,
|
||||||
`batch_events`) to make it easy to delete all associated rows when purging a room.
|
`batch_events`) to make it easy to delete all associated rows when purging a room.
|
||||||
|
- `inserted_ts` column is added to `event_push_actions_staging` table.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@@ -0,0 +1,22 @@
|
||||||
|
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Add a column so that we know when a push action was inserted, to make it
|
||||||
|
-- easier to clear out old ones.
|
||||||
|
ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT;
|
||||||
|
|
||||||
|
-- We now add a default for *new* rows. We don't do this above as we don't want
|
||||||
|
-- to have to update every row with the new default.
|
||||||
|
ALTER TABLE event_push_actions_staging ALTER COLUMN inserted_ts SET DEFAULT extract(epoch from now()) * 1000;
|
|
@@ -0,0 +1,24 @@
|
||||||
|
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- On SQLite we must be in monolith mode and updating the database from Synapse,
|
||||||
|
-- so it's safe to assume that `event_push_actions_staging` should be empty (as
|
||||||
|
-- across a restart an event must either have been fully persisted or we'll
|
||||||
|
-- recalculate the push actions)
|
||||||
|
DELETE FROM event_push_actions_staging;
|
||||||
|
|
||||||
|
-- Add a column so that we know when a push action was inserted, to make it
|
||||||
|
-- easier to clear out old ones.
|
||||||
|
ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT;
|
Loading…
Reference in a new issue