Replace or_ignore in simple_insert with simple_upsert (#10442)

Now that we have `simple_upsert` that should be used in preference to
trying to insert and looking for an exception. The main benefit is that
we ERROR message don't get written to postgres logs.

We also have tidy up the return value on `simple_upsert`, rather than
having a tri-state of inserted/not-inserted/unknown.
This commit is contained in:
Erik Johnston 2021-07-22 12:39:50 +01:00 committed by GitHub
parent d8324b8238
commit 38b346a504
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 99 deletions

1
changelog.d/10442.misc Normal file
View file

@ -0,0 +1 @@
Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.

View file

@ -832,31 +832,16 @@ class DatabasePool:
self, self,
table: str, table: str,
values: Dict[str, Any], values: Dict[str, Any],
or_ignore: bool = False,
desc: str = "simple_insert", desc: str = "simple_insert",
) -> bool: ) -> None:
"""Executes an INSERT query on the named table. """Executes an INSERT query on the named table.
Args: Args:
table: string giving the table name table: string giving the table name
values: dict of new column names and values for them values: dict of new column names and values for them
or_ignore: bool stating whether an exception should be raised
when a conflicting row already exists. If True, False will be
returned by the function instead
desc: description of the transaction, for logging and metrics desc: description of the transaction, for logging and metrics
Returns:
Whether the row was inserted or not. Only useful when `or_ignore` is True
""" """
try: await self.runInteraction(desc, self.simple_insert_txn, table, values)
await self.runInteraction(desc, self.simple_insert_txn, table, values)
except self.engine.module.IntegrityError:
# We have to do or_ignore flag at this layer, since we can't reuse
# a cursor after we receive an error from the db.
if not or_ignore:
raise
return False
return True
@staticmethod @staticmethod
def simple_insert_txn( def simple_insert_txn(
@ -930,7 +915,7 @@ class DatabasePool:
insertion_values: Optional[Dict[str, Any]] = None, insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert", desc: str = "simple_upsert",
lock: bool = True, lock: bool = True,
) -> Optional[bool]: ) -> bool:
""" """
`lock` should generally be set to True (the default), but can be set `lock` should generally be set to True (the default), but can be set
@ -951,8 +936,8 @@ class DatabasePool:
desc: description of the transaction, for logging and metrics desc: description of the transaction, for logging and metrics
lock: True to lock the table when doing the upsert. lock: True to lock the table when doing the upsert.
Returns: Returns:
Native upserts always return None. Emulated upserts return True if a Returns True if a row was inserted or updated (i.e. if `values` is
new entry was created, False if an existing one was updated. not empty then this always returns True)
""" """
insertion_values = insertion_values or {} insertion_values = insertion_values or {}
@ -995,7 +980,7 @@ class DatabasePool:
values: Dict[str, Any], values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None, insertion_values: Optional[Dict[str, Any]] = None,
lock: bool = True, lock: bool = True,
) -> Optional[bool]: ) -> bool:
""" """
Pick the UPSERT method which works best on the platform. Either the Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method. native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
@ -1008,16 +993,15 @@ class DatabasePool:
insertion_values: additional key/values to use only when inserting insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert. lock: True to lock the table when doing the upsert.
Returns: Returns:
Native upserts always return None. Emulated upserts return True if a Returns True if a row was inserted or updated (i.e. if `values` is
new entry was created, False if an existing one was updated. not empty then this always returns True)
""" """
insertion_values = insertion_values or {} insertion_values = insertion_values or {}
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
self.simple_upsert_txn_native_upsert( return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values txn, table, keyvalues, values, insertion_values=insertion_values
) )
return None
else: else:
return self.simple_upsert_txn_emulated( return self.simple_upsert_txn_emulated(
txn, txn,
@ -1045,8 +1029,8 @@ class DatabasePool:
insertion_values: additional key/values to use only when inserting insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert. lock: True to lock the table when doing the upsert.
Returns: Returns:
Returns True if a new entry was created, False if an existing Returns True if a row was inserted or updated (i.e. if `values` is
one was updated. not empty then this always returns True)
""" """
insertion_values = insertion_values or {} insertion_values = insertion_values or {}
@ -1086,8 +1070,7 @@ class DatabasePool:
txn.execute(sql, sqlargs) txn.execute(sql, sqlargs)
if txn.rowcount > 0: if txn.rowcount > 0:
# successfully updated at least one row. return True
return False
# We didn't find any existing rows, so insert a new one # We didn't find any existing rows, so insert a new one
allvalues: Dict[str, Any] = {} allvalues: Dict[str, Any] = {}
@ -1111,15 +1094,19 @@ class DatabasePool:
keyvalues: Dict[str, Any], keyvalues: Dict[str, Any],
values: Dict[str, Any], values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None, insertion_values: Optional[Dict[str, Any]] = None,
) -> None: ) -> bool:
""" """
Use the native UPSERT functionality in recent PostgreSQL versions. Use the native UPSERT functionality in PostgreSQL.
Args: Args:
table: The table to upsert into table: The table to upsert into
keyvalues: The unique key tables and their new values keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting insertion_values: additional key/values to use only when inserting
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
""" """
allvalues: Dict[str, Any] = {} allvalues: Dict[str, Any] = {}
allvalues.update(keyvalues) allvalues.update(keyvalues)
@ -1140,6 +1127,8 @@ class DatabasePool:
) )
txn.execute(sql, list(allvalues.values())) txn.execute(sql, list(allvalues.values()))
return bool(txn.rowcount)
async def simple_upsert_many( async def simple_upsert_many(
self, self,
table: str, table: str,

View file

@ -1078,16 +1078,18 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
return False return False
try: try:
inserted = await self.db_pool.simple_insert( inserted = await self.db_pool.simple_upsert(
"devices", "devices",
values={ keyvalues={
"user_id": user_id, "user_id": user_id,
"device_id": device_id, "device_id": device_id,
},
values={},
insertion_values={
"display_name": initial_device_display_name, "display_name": initial_device_display_name,
"hidden": False, "hidden": False,
}, },
desc="store_device", desc="store_device",
or_ignore=True,
) )
if not inserted: if not inserted:
# if the device already exists, check if it's a real device, or # if the device already exists, check if it's a real device, or
@ -1099,6 +1101,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
) )
if hidden: if hidden:
raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN) raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
self.device_id_exists_cache.set(key, True) self.device_id_exists_cache.set(key, True)
return inserted return inserted
except StoreError: except StoreError:

View file

@ -297,17 +297,13 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
Args: Args:
txn (cursor): txn (cursor):
user_id (str): user to add/update user_id (str): user to add/update
Returns:
bool: True if a new entry was created, False if an
existing one was updated.
""" """
# Am consciously deciding to lock the table on the basis that is ought # Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple # never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity. # upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more # See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(
txn, txn,
table="monthly_active_users", table="monthly_active_users",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
@ -322,8 +318,6 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
txn, self.user_last_seen_monthly_active, (user_id,) txn, self.user_last_seen_monthly_active, (user_id,)
) )
return is_insert
async def populate_monthly_active_users(self, user_id): async def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally """Checks on the state of monthly active user limits and optionally
add the user to the monthly active tables add the user to the monthly active tables

View file

@ -134,16 +134,18 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
response_dict: The response, to be encoded into JSON. response_dict: The response, to be encoded into JSON.
""" """
await self.db_pool.simple_insert( await self.db_pool.simple_upsert(
table="received_transactions", table="received_transactions",
values={ keyvalues={
"transaction_id": transaction_id, "transaction_id": transaction_id,
"origin": origin, "origin": origin,
},
values={},
insertion_values={
"response_code": code, "response_code": code,
"response_json": db_binary_type(encode_canonical_json(response_dict)), "response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(), "ts": self._clock.time_msec(),
}, },
or_ignore=True,
desc="set_received_txn_response", desc="set_received_txn_response",
) )

View file

@ -377,7 +377,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
avatar_url = None avatar_url = None
def _update_profile_in_user_dir_txn(txn): def _update_profile_in_user_dir_txn(txn):
new_entry = self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(
txn, txn,
table="user_directory", table="user_directory",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
@ -388,8 +388,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally # We weight the localpart most highly, then display name and finally
# server name # server name
if self.database_engine.can_native_upsert: sql = """
sql = """
INSERT INTO user_directory_search(user_id, vector) INSERT INTO user_directory_search(user_id, vector)
VALUES (?, VALUES (?,
setweight(to_tsvector('simple', ?), 'A') setweight(to_tsvector('simple', ?), 'A')
@ -397,58 +396,15 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B') || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
""" """
txn.execute( txn.execute(
sql, sql,
( (
user_id, user_id,
get_localpart_from_id(user_id), get_localpart_from_id(user_id),
get_domain_from_id(user_id), get_domain_from_id(user_id),
display_name, display_name,
), ),
) )
else:
# TODO: Remove this code after we've bumped the minimum version
# of postgres to always support upserts, so we can get rid of
# `new_entry` usage
if new_entry is True:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
)
"""
txn.execute(
sql,
(
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
elif new_entry is False:
sql = """
UPDATE user_directory_search
SET vector = setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
WHERE user_id = ?
"""
txn.execute(
sql,
(
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
user_id,
),
)
else:
raise RuntimeError(
"upsert returned None when 'can_native_upsert' is False"
)
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name) if display_name else user_id value = "%s %s" % (user_id, display_name) if display_name else user_id
self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(