Use execute_values more in PostgreSQL (#10754)

`execute_values` is a faster version of `execute_batch`.
This commit is contained in:
Erik Johnston 2021-09-03 16:35:49 +01:00 committed by GitHub
parent 2cb85bdf75
commit 0eae330a26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 19 deletions

1
changelog.d/10754.misc Normal file
View file

@ -0,0 +1 @@
Minor speed ups when joining large rooms over federation.

View file

@ -280,18 +280,18 @@ class LoggingTransaction:
else: else:
self.executemany(sql, args) self.executemany(sql, args)
def execute_values(self, sql: str, *args: Any) -> List[Tuple]: def execute_values(self, sql: str, *args: Any, fetch: bool = True) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when """Corresponds to psycopg2.extras.execute_values. Only available when
using postgres. using postgres.
Always sets fetch=True when caling `execute_values`, so will return the The `fetch` parameter must be set to False if the query does not return
results. rows (e.g. INSERTs).
""" """
assert isinstance(self.database_engine, PostgresEngine) assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values # type: ignore from psycopg2.extras import execute_values # type: ignore
return self._do_execute( return self._do_execute(
lambda *x: execute_values(self.txn, *x, fetch=True), sql, *args lambda *x: execute_values(self.txn, *x, fetch=fetch), sql, *args
) )
def execute(self, sql: str, *args: Any) -> None: def execute(self, sql: str, *args: Any) -> None:
@ -920,6 +920,16 @@ class DatabasePool:
if k != keys[0]: if k != keys[0]:
raise RuntimeError("All items must have the same keys") raise RuntimeError("All items must have the same keys")
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ?" % (
table,
", ".join(k for k in keys[0]),
)
txn.execute_values(sql, vals, fetch=False)
else:
sql = "INSERT INTO %s (%s) VALUES(%s)" % ( sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table, table,
", ".join(k for k in keys[0]), ", ".join(k for k in keys[0]),
@ -1281,6 +1291,24 @@ class DatabasePool:
k + "=EXCLUDED." + k for k in value_names k + "=EXCLUDED." + k for k in value_names
) )
args = []
for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join(key_names),
latter,
)
txn.execute_values(sql, args, fetch=False)
else:
sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % ( sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table, table,
", ".join(k for k in allnames), ", ".join(k for k in allnames),
@ -1289,11 +1317,6 @@ class DatabasePool:
latter, latter,
) )
args = []
for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))
return txn.execute_batch(sql, args) return txn.execute_batch(sql, args)
@overload @overload