diff --git a/changelog.d/6141.bugfix b/changelog.d/6141.bugfix new file mode 100644 index 0000000000..c93920b7b5 --- /dev/null +++ b/changelog.d/6141.bugfix @@ -0,0 +1 @@ +Fix bad performance of censoring redactions background task. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ddf7ab6479..2e485c8644 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1389,6 +1389,18 @@ class EventsStore( ], ) + for event, _ in events_and_contexts: + if not event.internal_metadata.is_redacted(): + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + self._simple_update_txn( + txn, + table="redactions", + keyvalues={"redacts": event.event_id}, + updatevalues={"have_censored": False}, + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -1552,9 +1564,15 @@ class EventsStore( def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) - txn.execute( - "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts), + + self._simple_insert_txn( + txn, + table="redactions", + values={ + "event_id": event.event_id, + "redacts": event.redacts, + "received_ts": self._clock.time_msec(), + }, ) @defer.inlineCallbacks @@ -1571,36 +1589,29 @@ class EventsStore( if self.hs.config.redaction_retention_period is None: return - max_pos = yield self.find_first_stream_ordering_after_ts( - self._clock.time_msec() - self.hs.config.redaction_retention_period - ) + before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period # We fetch all redactions that: # 1. point to an event we have, - # 2. has a stream ordering from before the cut off, and + # 2. has a received_ts from before the cut off, and # 3. we haven't yet censored. # # This is limited to 100 events to ensure that we don't try and do too # much at once. We'll get called again so this should eventually catch # up. - # - # We use the range [-max_pos, max_pos] to handle backfilled events, - # which are given negative stream ordering. sql = """ - SELECT redact_event.event_id, redacts FROM redactions - INNER JOIN events AS redact_event USING (event_id) - INNER JOIN events AS original_event ON ( - redact_event.room_id = original_event.room_id - AND redacts = original_event.event_id + SELECT redactions.event_id, redacts FROM redactions + LEFT JOIN events AS original_event ON ( + redacts = original_event.event_id ) WHERE NOT have_censored - AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ? - ORDER BY redact_event.stream_ordering ASC + AND redactions.received_ts <= ? + ORDER BY redactions.received_ts ASC LIMIT ? """ rows = yield self._execute( - "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + "_censor_redactions_fetch", None, sql, before_ts, 100 ) updates = [] diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py index 6587f31e2b..5717baf48c 100644 --- a/synapse/storage/events_bg_updates.py +++ b/synapse/storage/events_bg_updates.py @@ -67,6 +67,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update ) + self.register_background_update_handler( + "redactions_received_ts", self._redactions_received_ts + ) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -397,3 +401,60 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): ) return num_handled + + @defer.inlineCallbacks + def _redactions_received_ts(self, progress, batch_size): + """Handles filling out the `received_ts` column in redactions. + """ + last_event_id = progress.get("last_event_id", "") + + def _redactions_received_ts_txn(txn): + # Fetch the set of event IDs that we want to update + sql = """ + SELECT event_id FROM redactions + WHERE event_id > ? + ORDER BY event_id ASC + LIMIT ? + """ + + txn.execute(sql, (last_event_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + upper_event_id, = rows[-1] + + # Update the redactions with the received_ts. + # + # Note: Not all events have an associated received_ts, so we + # fallback to using origin_server_ts. If we for some reason don't + # have an origin_server_ts, lets just use the current timestamp. + # + # We don't want to leave it null, as then we'll never try and + # censor those redactions. + sql = """ + UPDATE redactions + SET received_ts = ( + SELECT COALESCE(received_ts, origin_server_ts, ?) FROM events + WHERE events.event_id = redactions.event_id + ) + WHERE ? <= event_id AND event_id <= ? + """ + + txn.execute(sql, (self._clock.time_msec(), last_event_id, upper_event_id)) + + self._background_update_progress_txn( + txn, "redactions_received_ts", {"last_event_id": upper_event_id} + ) + + return len(rows) + + count = yield self.runInteraction( + "_redactions_received_ts", _redactions_received_ts_txn + ) + + if not count: + yield self._end_background_update("redactions_received_ts") + + return count diff --git a/synapse/storage/schema/delta/56/redaction_censor2.sql b/synapse/storage/schema/delta/56/redaction_censor2.sql new file mode 100644 index 0000000000..77a5eca499 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor2.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE redactions ADD COLUMN received_ts BIGINT; +CREATE INDEX redactions_have_censored_ts ON redactions(received_ts) WHERE not have_censored; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('redactions_received_ts', '{}');