0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-15 00:33:50 +01:00

Merge pull request #7428 from matrix-org/rav/cross_signing_keys_cache

Make get_e2e_cross_signing_key delegate to get_e2e_cross_signing_keys_bulk
This commit is contained in:
Richard van der Hoff 2020-05-06 12:00:01 +01:00 committed by GitHub
commit fa0b2bd28d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 77 deletions

1
changelog.d/7428.misc Normal file
View file

@ -0,0 +1 @@
Improve performance of `get_e2e_cross_signing_key`.

View file

@ -25,7 +25,9 @@ from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import make_in_list_sql_clause
from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.iterutils import batch_iter
class EndToEndKeyWorkerStore(SQLBaseStore): class EndToEndKeyWorkerStore(SQLBaseStore):
@ -268,53 +270,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"count_e2e_one_time_keys", _count_e2e_one_time_keys "count_e2e_one_time_keys", _count_e2e_one_time_keys
) )
def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None): @defer.inlineCallbacks
"""Returns a user's cross-signing key.
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
user_id (str): the user whose key is being requested
key_type (str): the type of key that is being requested: either 'master'
for a master key, 'self_signing' for a self-signing key, or
'user_signing' for a user-signing key
from_user_id (str): if specified, signatures made by this user on
the key will be included in the result
Returns:
dict of the key data or None if not found
"""
sql = (
"SELECT keydata "
" FROM e2e_cross_signing_keys "
" WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1"
)
txn.execute(sql, (user_id, key_type))
row = txn.fetchone()
if not row:
return None
key = json.loads(row[0])
device_id = None
for k in key["keys"].values():
device_id = k
if from_user_id is not None:
sql = (
"SELECT key_id, signature "
" FROM e2e_cross_signing_signatures "
" WHERE user_id = ? "
" AND target_user_id = ? "
" AND target_device_id = ? "
)
txn.execute(sql, (from_user_id, user_id, device_id))
row = txn.fetchone()
if row:
key.setdefault("signatures", {}).setdefault(from_user_id, {})[
row[0]
] = row[1]
return key
def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None): def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
"""Returns a user's cross-signing key. """Returns a user's cross-signing key.
@ -329,13 +285,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Returns: Returns:
dict of the key data or None if not found dict of the key data or None if not found
""" """
return self.db.runInteraction( res = yield self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
"get_e2e_cross_signing_key", user_keys = res.get(user_id)
self._get_e2e_cross_signing_key_txn, if not user_keys:
user_id, return None
key_type, return user_keys.get(key_type)
from_user_id,
)
@cached(num_args=1) @cached(num_args=1)
def _get_bare_e2e_cross_signing_keys(self, user_id): def _get_bare_e2e_cross_signing_keys(self, user_id):
@ -391,26 +345,24 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
""" """
result = {} result = {}
batch_size = 100 for user_chunk in batch_iter(user_ids, 100):
chunks = [ clause, params = make_in_list_sql_clause(
user_ids[i : i + batch_size] for i in range(0, len(user_ids), batch_size) txn.database_engine, "k.user_id", user_chunk
] )
for user_chunk in chunks: sql = (
sql = """ """
SELECT k.user_id, k.keytype, k.keydata, k.stream_id SELECT k.user_id, k.keytype, k.keydata, k.stream_id
FROM e2e_cross_signing_keys k FROM e2e_cross_signing_keys k
INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id
FROM e2e_cross_signing_keys FROM e2e_cross_signing_keys
GROUP BY user_id, keytype) s GROUP BY user_id, keytype) s
USING (user_id, stream_id, keytype) USING (user_id, stream_id, keytype)
WHERE k.user_id IN (%s) WHERE
""" % ( """
",".join("?" for u in user_chunk), + clause
) )
query_params = []
query_params.extend(user_chunk)
txn.execute(sql, query_params) txn.execute(sql, params)
rows = self.db.cursor_to_dict(txn) rows = self.db.cursor_to_dict(txn)
for row in rows: for row in rows:
@ -453,15 +405,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
device_id = k device_id = k
devices[(user_id, device_id)] = key_type devices[(user_id, device_id)] = key_type
device_list = list(devices) for batch in batch_iter(devices.keys(), size=100):
# split into batches
batch_size = 100
chunks = [
device_list[i : i + batch_size]
for i in range(0, len(device_list), batch_size)
]
for user_chunk in chunks:
sql = """ sql = """
SELECT target_user_id, target_device_id, key_id, signature SELECT target_user_id, target_device_id, key_id, signature
FROM e2e_cross_signing_signatures FROM e2e_cross_signing_signatures
@ -469,11 +413,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
AND (%s) AND (%s)
""" % ( """ % (
" OR ".join( " OR ".join(
"(target_user_id = ? AND target_device_id = ?)" for d in devices "(target_user_id = ? AND target_device_id = ?)" for _ in batch
) )
) )
query_params = [from_user_id] query_params = [from_user_id]
for item in devices: for item in batch:
# item is a (user_id, device_id) tuple # item is a (user_id, device_id) tuple
query_params.extend(item) query_params.extend(item)