mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 23:03:51 +01:00
Merge pull request #2218 from matrix-org/rav/event_search_index
Add an index to event_search
This commit is contained in:
commit
d648f65aaf
4 changed files with 63 additions and 12 deletions
|
@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
self._background_update_handlers[update_name] = update_handler
|
||||
|
||||
def register_background_index_update(self, update_name, index_name,
|
||||
table, columns, where_clause=None):
|
||||
table, columns, where_clause=None,
|
||||
unique=False,
|
||||
psql_only=False):
|
||||
"""Helper for store classes to do a background index addition
|
||||
|
||||
To use:
|
||||
|
@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
index_name (str): name of index to add
|
||||
table (str): table to add index to
|
||||
columns (list[str]): columns/expressions to include in index
|
||||
unique (bool): true to make a UNIQUE index
|
||||
psql_only: true to only create this index on psql databases (useful
|
||||
for virtual sqlite tables)
|
||||
"""
|
||||
|
||||
def create_index_psql(conn):
|
||||
|
@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
c.execute(sql)
|
||||
|
||||
sql = (
|
||||
"CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
|
||||
"CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
|
||||
" ON %(table)s"
|
||||
" (%(columns)s) %(where_clause)s"
|
||||
) % {
|
||||
"unique": "UNIQUE" if unique else "",
|
||||
"name": index_name,
|
||||
"table": table,
|
||||
"columns": ", ".join(columns),
|
||||
|
@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
# down at the wrong moment - hance we use IF NOT EXISTS. (SQLite
|
||||
# has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
|
||||
sql = (
|
||||
"CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
|
||||
"CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
|
||||
" (%(columns)s)"
|
||||
) % {
|
||||
"unique": "UNIQUE" if unique else "",
|
||||
"name": index_name,
|
||||
"table": table,
|
||||
"columns": ", ".join(columns),
|
||||
|
@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
|
||||
if isinstance(self.database_engine, engines.PostgresEngine):
|
||||
runner = create_index_psql
|
||||
elif psql_only:
|
||||
runner = None
|
||||
else:
|
||||
runner = create_index_sqlite
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def updater(progress, batch_size):
|
||||
logger.info("Adding index %s to %s", index_name, table)
|
||||
yield self.runWithConnection(runner)
|
||||
if runner is not None:
|
||||
logger.info("Adding index %s to %s", index_name, table)
|
||||
yield self.runWithConnection(runner)
|
||||
yield self._end_background_update(update_name)
|
||||
defer.returnValue(1)
|
||||
|
||||
|
|
|
@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore):
|
|||
where_clause="contains_url = true AND outlier = false",
|
||||
)
|
||||
|
||||
# an event_id index on event_search is useful for the purge_history
|
||||
# api. Plus it means we get to enforce some integrity with a UNIQUE
|
||||
# clause
|
||||
self.register_background_index_update(
|
||||
"event_search_event_id_idx",
|
||||
index_name="event_search_event_id_idx",
|
||||
table="event_search",
|
||||
columns=["event_id"],
|
||||
unique=True,
|
||||
psql_only=True,
|
||||
)
|
||||
|
||||
self._event_persist_queue = _EventPeristenceQueue()
|
||||
|
||||
def persist_events(self, events_and_contexts, backfilled=False):
|
||||
|
@ -2022,6 +2034,8 @@ class EventsStore(SQLBaseStore):
|
|||
400, "topological_ordering is greater than forward extremeties"
|
||||
)
|
||||
|
||||
logger.debug("[purge] looking for events to delete")
|
||||
|
||||
txn.execute(
|
||||
"SELECT event_id, state_key FROM events"
|
||||
" LEFT JOIN state_events USING (room_id, event_id)"
|
||||
|
@ -2030,6 +2044,14 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
event_rows = txn.fetchall()
|
||||
|
||||
to_delete = [
|
||||
(event_id,) for event_id, state_key in event_rows
|
||||
if state_key is None and not self.hs.is_mine_id(event_id)
|
||||
]
|
||||
logger.info(
|
||||
"[purge] found %i events before cutoff, of which %i are remote"
|
||||
" non-state events to delete", len(event_rows), len(to_delete))
|
||||
|
||||
for event_id, state_key in event_rows:
|
||||
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
|
||||
|
||||
|
@ -2080,6 +2102,7 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
state_rows = txn.fetchall()
|
||||
logger.debug("[purge] found %i redundant state groups", len(state_rows))
|
||||
|
||||
# make a set of the redundant state groups, so that we can look them up
|
||||
# efficiently
|
||||
|
@ -2173,10 +2196,6 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
# Delete all remote non-state events
|
||||
to_delete = [
|
||||
(event_id,) for event_id, state_key in event_rows
|
||||
if state_key is None and not self.hs.is_mine_id(event_id)
|
||||
]
|
||||
for table in (
|
||||
"events",
|
||||
"event_json",
|
||||
|
@ -2192,7 +2211,7 @@ class EventsStore(SQLBaseStore):
|
|||
"event_signatures",
|
||||
"rejections",
|
||||
):
|
||||
logger.debug("[purge] removing non-state events from %s", table)
|
||||
logger.debug("[purge] removing remote non-state events from %s", table)
|
||||
|
||||
txn.executemany(
|
||||
"DELETE FROM %s WHERE event_id = ?" % (table,),
|
||||
|
@ -2200,7 +2219,7 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
# Mark all state and own events as outliers
|
||||
logger.debug("[purge] marking events as outliers")
|
||||
logger.debug("[purge] marking remaining events as outliers")
|
||||
txn.executemany(
|
||||
"UPDATE events SET outlier = ?"
|
||||
" WHERE event_id = ?",
|
||||
|
@ -2210,7 +2229,7 @@ class EventsStore(SQLBaseStore):
|
|||
]
|
||||
)
|
||||
|
||||
logger.debug("[purge] done")
|
||||
logger.info("[purge] done")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def is_event_after(self, event_id1, event_id2):
|
||||
|
|
|
@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref;
|
|||
-- and is used incredibly rarely.
|
||||
DROP INDEX IF EXISTS events_order_topo_stream_room;
|
||||
|
||||
-- an equivalent index to this actually gets re-created in delta 41, because it
|
||||
-- turned out that deleting it wasn't a great plan :/. In any case, let's
|
||||
-- delete it here, and delta 41 will create a new one with an added UNIQUE
|
||||
-- constraint
|
||||
DROP INDEX IF EXISTS event_search_ev_idx;
|
||||
"""
|
||||
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
/* Copyright 2017 Vector Creations Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
INSERT into background_updates (update_name, progress_json)
|
||||
VALUES ('event_search_event_id_idx', '{}');
|
Loading…
Reference in a new issue