0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-08-11 05:14:42 +02:00

Merge pull request #7429 from matrix-org/rav/upsert_for_device_list

use an upsert to update device_lists_outbound_last_success
This commit is contained in:
Richard van der Hoff 2020-05-06 11:53:18 +01:00 committed by GitHub
commit 79007a42b2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 48 deletions

1
changelog.d/7429.misc Normal file
View file

@ -0,0 +1 @@
Improve performance of `mark_as_sent_devices_by_remote`.

View file

@ -55,6 +55,10 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = (
"drop_device_lists_outbound_last_success_non_unique_idx"
)
class DeviceWorkerStore(SQLBaseStore): class DeviceWorkerStore(SQLBaseStore):
def get_device(self, user_id, device_id): def get_device(self, user_id, device_id):
@ -342,32 +346,23 @@ class DeviceWorkerStore(SQLBaseStore):
def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
# We update the device_lists_outbound_last_success with the successfully # We update the device_lists_outbound_last_success with the successfully
# poked users. We do the join to see which users need to be inserted and # poked users.
# which updated.
sql = """ sql = """
SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) SELECT user_id, coalesce(max(o.stream_id), 0)
FROM device_lists_outbound_pokes as o FROM device_lists_outbound_pokes as o
LEFT JOIN device_lists_outbound_last_success as s
USING (destination, user_id)
WHERE destination = ? AND o.stream_id <= ? WHERE destination = ? AND o.stream_id <= ?
GROUP BY user_id GROUP BY user_id
""" """
txn.execute(sql, (destination, stream_id)) txn.execute(sql, (destination, stream_id))
rows = txn.fetchall() rows = txn.fetchall()
sql = """ self.db.simple_upsert_many_txn(
UPDATE device_lists_outbound_last_success txn=txn,
SET stream_id = ? table="device_lists_outbound_last_success",
WHERE destination = ? AND user_id = ? key_names=("destination", "user_id"),
""" key_values=((destination, user_id) for user_id, _ in rows),
txn.executemany(sql, ((row[1], destination, row[0]) for row in rows if row[2])) value_names=("stream_id",),
value_values=((stream_id,) for _, stream_id in rows),
sql = """
INSERT INTO device_lists_outbound_last_success
(destination, user_id, stream_id) VALUES (?, ?, ?)
"""
txn.executemany(
sql, ((destination, row[0], row[1]) for row in rows if not row[2])
) )
# Delete all sent outbound pokes # Delete all sent outbound pokes
@ -725,6 +720,21 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes, BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
) )
# create a unique index on device_lists_outbound_last_success
self.db.updates.register_background_index_update(
"device_lists_outbound_last_success_unique_idx",
index_name="device_lists_outbound_last_success_unique_idx",
table="device_lists_outbound_last_success",
columns=["destination", "user_id"],
unique=True,
)
# once that completes, we can remove the old non-unique index.
self.db.updates.register_background_update_handler(
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX,
self._drop_device_lists_outbound_last_success_non_unique_idx,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn): def f(conn):
@ -799,6 +809,20 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
return rows return rows
async def _drop_device_lists_outbound_last_success_non_unique_idx(
self, progress, batch_size
):
def f(txn):
txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx")
await self.db.runInteraction(
"drop_device_lists_outbound_last_success_non_unique_idx", f,
)
await self.db.updates._end_background_update(
BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX
)
return 1
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs): def __init__(self, database: Database, db_conn, hs):

View file

@ -0,0 +1,28 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- register a background update which will create a unique index on
-- device_lists_outbound_last_success
INSERT into background_updates (ordering, update_name, progress_json)
VALUES (5804, 'device_lists_outbound_last_success_unique_idx', '{}');
-- once that completes, we can drop the old index.
INSERT into background_updates (ordering, update_name, progress_json, depends_on)
VALUES (
5804,
'drop_device_lists_outbound_last_success_non_unique_idx',
'{}',
'device_lists_outbound_last_success_unique_idx'
);

View file

@ -49,6 +49,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor from synapse.storage.types import Connection, Cursor
from synapse.types import Collection
from synapse.util.stringutils import exception_to_unicode from synapse.util.stringutils import exception_to_unicode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -78,6 +79,7 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
"device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx", "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
"device_lists_remote_cache": "device_lists_remote_cache_unique_idx", "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
"event_search": "event_search_event_id_idx", "event_search": "event_search_event_id_idx",
"device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx",
} }
@ -889,20 +891,24 @@ class Database(object):
txn.execute(sql, list(allvalues.values())) txn.execute(sql, list(allvalues.values()))
def simple_upsert_many_txn( def simple_upsert_many_txn(
self, txn, table, key_names, key_values, value_names, value_values self,
): txn: LoggingTransaction,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[str]],
) -> None:
""" """
Upsert, many times. Upsert, many times.
Args: Args:
table (str): The table to upsert into table: The table to upsert into
key_names (list[str]): The key column names. key_names: The key column names.
key_values (list[list]): A list of each row's key column values. key_values: A list of each row's key column values.
value_names (list[str]): The value column names. If empty, no value_names: The value column names
values will be used, even if value_values is provided. value_values: A list of each row's value column values.
value_values (list[list]): A list of each row's value column values. Ignored if value_names is empty.
Returns:
None
""" """
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert( return self.simple_upsert_many_txn_native_upsert(
@ -914,20 +920,24 @@ class Database(object):
) )
def simple_upsert_many_txn_emulated( def simple_upsert_many_txn_emulated(
self, txn, table, key_names, key_values, value_names, value_values self,
): txn: LoggingTransaction,
table: str,
key_names: Iterable[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[str]],
) -> None:
""" """
Upsert, many times, but without native UPSERT support or batching. Upsert, many times, but without native UPSERT support or batching.
Args: Args:
table (str): The table to upsert into table: The table to upsert into
key_names (list[str]): The key column names. key_names: The key column names.
key_values (list[list]): A list of each row's key column values. key_values: A list of each row's key column values.
value_names (list[str]): The value column names. If empty, no value_names: The value column names
values will be used, even if value_values is provided. value_values: A list of each row's value column values.
value_values (list[list]): A list of each row's value column values. Ignored if value_names is empty.
Returns:
None
""" """
# No value columns, therefore make a blank list so that the following # No value columns, therefore make a blank list so that the following
# zip() works correctly. # zip() works correctly.
@ -941,20 +951,24 @@ class Database(object):
self.simple_upsert_txn_emulated(txn, table, _keys, _vals) self.simple_upsert_txn_emulated(txn, table, _keys, _vals)
def simple_upsert_many_txn_native_upsert( def simple_upsert_many_txn_native_upsert(
self, txn, table, key_names, key_values, value_names, value_values self,
): txn: LoggingTransaction,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
) -> None:
""" """
Upsert, many times, using batching where possible. Upsert, many times, using batching where possible.
Args: Args:
table (str): The table to upsert into table: The table to upsert into
key_names (list[str]): The key column names. key_names: The key column names.
key_values (list[list]): A list of each row's key column values. key_values: A list of each row's key column values.
value_names (list[str]): The value column names. If empty, no value_names: The value column names
values will be used, even if value_values is provided. value_values: A list of each row's value column values.
value_values (list[list]): A list of each row's value column values. Ignored if value_names is empty.
Returns:
None
""" """
allnames = [] # type: List[str] allnames = [] # type: List[str]
allnames.extend(key_names) allnames.extend(key_names)