forked from MirrorHub/synapse
parent
c9f11d09fc
commit
9a6f2be572
7 changed files with 142 additions and 7 deletions
1
changelog.d/5855.misc
Normal file
1
changelog.d/5855.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Opentracing for room and e2e keys.
|
|
@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
|
||||||
from synapse.federation.units import Edu, Transaction
|
from synapse.federation.units import Edu, Transaction
|
||||||
from synapse.http.endpoint import parse_server_name
|
from synapse.http.endpoint import parse_server_name
|
||||||
from synapse.logging.context import nested_logging_context
|
from synapse.logging.context import nested_logging_context
|
||||||
|
from synapse.logging.opentracing import log_kv, trace
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
from synapse.replication.http.federation import (
|
from synapse.replication.http.federation import (
|
||||||
ReplicationFederationSendEduRestServlet,
|
ReplicationFederationSendEduRestServlet,
|
||||||
|
@ -507,6 +508,7 @@ class FederationServer(FederationBase):
|
||||||
def on_query_user_devices(self, origin, user_id):
|
def on_query_user_devices(self, origin, user_id):
|
||||||
return self.on_query_request("user_devices", user_id)
|
return self.on_query_request("user_devices", user_id)
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_claim_client_keys(self, origin, content):
|
def on_claim_client_keys(self, origin, content):
|
||||||
|
@ -515,6 +517,7 @@ class FederationServer(FederationBase):
|
||||||
for device_id, algorithm in device_keys.items():
|
for device_id, algorithm in device_keys.items():
|
||||||
query.append((user_id, device_id, algorithm))
|
query.append((user_id, device_id, algorithm))
|
||||||
|
|
||||||
|
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
|
||||||
results = yield self.store.claim_e2e_one_time_keys(query)
|
results = yield self.store.claim_e2e_one_time_keys(query)
|
||||||
|
|
||||||
json_result = {}
|
json_result = {}
|
||||||
|
|
|
@ -24,6 +24,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import CodeMessageException, SynapseError
|
from synapse.api.errors import CodeMessageException, SynapseError
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
|
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
|
||||||
from synapse.types import UserID, get_domain_from_id
|
from synapse.types import UserID, get_domain_from_id
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import unwrapFirstError
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
@ -46,6 +47,7 @@ class E2eKeysHandler(object):
|
||||||
"client_keys", self.on_federation_query_client_keys
|
"client_keys", self.on_federation_query_client_keys
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def query_devices(self, query_body, timeout):
|
def query_devices(self, query_body, timeout):
|
||||||
""" Handle a device key query from a client
|
""" Handle a device key query from a client
|
||||||
|
@ -81,6 +83,9 @@ class E2eKeysHandler(object):
|
||||||
else:
|
else:
|
||||||
remote_queries[user_id] = device_ids
|
remote_queries[user_id] = device_ids
|
||||||
|
|
||||||
|
set_tag("local_key_query", local_query)
|
||||||
|
set_tag("remote_key_query", remote_queries)
|
||||||
|
|
||||||
# First get local devices.
|
# First get local devices.
|
||||||
failures = {}
|
failures = {}
|
||||||
results = {}
|
results = {}
|
||||||
|
@ -121,6 +126,7 @@ class E2eKeysHandler(object):
|
||||||
r[user_id] = remote_queries[user_id]
|
r[user_id] = remote_queries[user_id]
|
||||||
|
|
||||||
# Now fetch any devices that we don't have in our cache
|
# Now fetch any devices that we don't have in our cache
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def do_remote_query(destination):
|
def do_remote_query(destination):
|
||||||
"""This is called when we are querying the device list of a user on
|
"""This is called when we are querying the device list of a user on
|
||||||
|
@ -185,6 +191,8 @@ class E2eKeysHandler(object):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
failure = _exception_to_failure(e)
|
failure = _exception_to_failure(e)
|
||||||
failures[destination] = failure
|
failures[destination] = failure
|
||||||
|
set_tag("error", True)
|
||||||
|
set_tag("reason", failure)
|
||||||
|
|
||||||
yield make_deferred_yieldable(
|
yield make_deferred_yieldable(
|
||||||
defer.gatherResults(
|
defer.gatherResults(
|
||||||
|
@ -198,6 +206,7 @@ class E2eKeysHandler(object):
|
||||||
|
|
||||||
return {"device_keys": results, "failures": failures}
|
return {"device_keys": results, "failures": failures}
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def query_local_devices(self, query):
|
def query_local_devices(self, query):
|
||||||
"""Get E2E device keys for local users
|
"""Get E2E device keys for local users
|
||||||
|
@ -210,6 +219,7 @@ class E2eKeysHandler(object):
|
||||||
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
||||||
map from user_id -> device_id -> device details
|
map from user_id -> device_id -> device details
|
||||||
"""
|
"""
|
||||||
|
set_tag("local_query", query)
|
||||||
local_query = []
|
local_query = []
|
||||||
|
|
||||||
result_dict = {}
|
result_dict = {}
|
||||||
|
@ -217,6 +227,14 @@ class E2eKeysHandler(object):
|
||||||
# we use UserID.from_string to catch invalid user ids
|
# we use UserID.from_string to catch invalid user ids
|
||||||
if not self.is_mine(UserID.from_string(user_id)):
|
if not self.is_mine(UserID.from_string(user_id)):
|
||||||
logger.warning("Request for keys for non-local user %s", user_id)
|
logger.warning("Request for keys for non-local user %s", user_id)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Requested a local key for a user which"
|
||||||
|
" was not local to the homeserver",
|
||||||
|
"user_id": user_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
set_tag("error", True)
|
||||||
raise SynapseError(400, "Not a user here")
|
raise SynapseError(400, "Not a user here")
|
||||||
|
|
||||||
if not device_ids:
|
if not device_ids:
|
||||||
|
@ -241,6 +259,7 @@ class E2eKeysHandler(object):
|
||||||
r["unsigned"]["device_display_name"] = display_name
|
r["unsigned"]["device_display_name"] = display_name
|
||||||
result_dict[user_id][device_id] = r
|
result_dict[user_id][device_id] = r
|
||||||
|
|
||||||
|
log_kv(results)
|
||||||
return result_dict
|
return result_dict
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -251,6 +270,7 @@ class E2eKeysHandler(object):
|
||||||
res = yield self.query_local_devices(device_keys_query)
|
res = yield self.query_local_devices(device_keys_query)
|
||||||
return {"device_keys": res}
|
return {"device_keys": res}
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def claim_one_time_keys(self, query, timeout):
|
def claim_one_time_keys(self, query, timeout):
|
||||||
local_query = []
|
local_query = []
|
||||||
|
@ -265,6 +285,9 @@ class E2eKeysHandler(object):
|
||||||
domain = get_domain_from_id(user_id)
|
domain = get_domain_from_id(user_id)
|
||||||
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
||||||
|
|
||||||
|
set_tag("local_key_query", local_query)
|
||||||
|
set_tag("remote_key_query", remote_queries)
|
||||||
|
|
||||||
results = yield self.store.claim_e2e_one_time_keys(local_query)
|
results = yield self.store.claim_e2e_one_time_keys(local_query)
|
||||||
|
|
||||||
json_result = {}
|
json_result = {}
|
||||||
|
@ -276,8 +299,10 @@ class E2eKeysHandler(object):
|
||||||
key_id: json.loads(json_bytes)
|
key_id: json.loads(json_bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def claim_client_keys(destination):
|
def claim_client_keys(destination):
|
||||||
|
set_tag("destination", destination)
|
||||||
device_keys = remote_queries[destination]
|
device_keys = remote_queries[destination]
|
||||||
try:
|
try:
|
||||||
remote_result = yield self.federation.claim_client_keys(
|
remote_result = yield self.federation.claim_client_keys(
|
||||||
|
@ -290,6 +315,8 @@ class E2eKeysHandler(object):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
failure = _exception_to_failure(e)
|
failure = _exception_to_failure(e)
|
||||||
failures[destination] = failure
|
failures[destination] = failure
|
||||||
|
set_tag("error", True)
|
||||||
|
set_tag("reason", failure)
|
||||||
|
|
||||||
yield make_deferred_yieldable(
|
yield make_deferred_yieldable(
|
||||||
defer.gatherResults(
|
defer.gatherResults(
|
||||||
|
@ -313,9 +340,11 @@ class E2eKeysHandler(object):
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log_kv({"one_time_keys": json_result, "failures": failures})
|
||||||
return {"one_time_keys": json_result, "failures": failures}
|
return {"one_time_keys": json_result, "failures": failures}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@tag_args
|
||||||
def upload_keys_for_user(self, user_id, device_id, keys):
|
def upload_keys_for_user(self, user_id, device_id, keys):
|
||||||
|
|
||||||
time_now = self.clock.time_msec()
|
time_now = self.clock.time_msec()
|
||||||
|
@ -329,6 +358,13 @@ class E2eKeysHandler(object):
|
||||||
user_id,
|
user_id,
|
||||||
time_now,
|
time_now,
|
||||||
)
|
)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Updating device_keys for user.",
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": device_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
# TODO: Sign the JSON with the server key
|
# TODO: Sign the JSON with the server key
|
||||||
changed = yield self.store.set_e2e_device_keys(
|
changed = yield self.store.set_e2e_device_keys(
|
||||||
user_id, device_id, time_now, device_keys
|
user_id, device_id, time_now, device_keys
|
||||||
|
@ -336,12 +372,24 @@ class E2eKeysHandler(object):
|
||||||
if changed:
|
if changed:
|
||||||
# Only notify about device updates *if* the keys actually changed
|
# Only notify about device updates *if* the keys actually changed
|
||||||
yield self.device_handler.notify_device_update(user_id, [device_id])
|
yield self.device_handler.notify_device_update(user_id, [device_id])
|
||||||
|
else:
|
||||||
|
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
|
||||||
one_time_keys = keys.get("one_time_keys", None)
|
one_time_keys = keys.get("one_time_keys", None)
|
||||||
if one_time_keys:
|
if one_time_keys:
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Updating one_time_keys for device.",
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": device_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
yield self._upload_one_time_keys_for_user(
|
yield self._upload_one_time_keys_for_user(
|
||||||
user_id, device_id, time_now, one_time_keys
|
user_id, device_id, time_now, one_time_keys
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
log_kv(
|
||||||
|
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||||
|
)
|
||||||
|
|
||||||
# the device should have been registered already, but it may have been
|
# the device should have been registered already, but it may have been
|
||||||
# deleted due to a race with a DELETE request. Or we may be using an
|
# deleted due to a race with a DELETE request. Or we may be using an
|
||||||
|
@ -352,6 +400,7 @@ class E2eKeysHandler(object):
|
||||||
|
|
||||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||||
|
|
||||||
|
set_tag("one_time_key_counts", result)
|
||||||
return {"one_time_key_counts": result}
|
return {"one_time_key_counts": result}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -395,6 +444,7 @@ class E2eKeysHandler(object):
|
||||||
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
|
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
|
||||||
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ from synapse.api.errors import (
|
||||||
StoreError,
|
StoreError,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
|
from synapse.logging.opentracing import log_kv, trace
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
|
||||||
# changed.
|
# changed.
|
||||||
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
|
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||||
|
@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
|
||||||
user_id, version, room_id, session_id
|
user_id, version, room_id, session_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log_kv(results)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||||
|
@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
|
||||||
with (yield self._upload_linearizer.queue(user_id)):
|
with (yield self._upload_linearizer.queue(user_id)):
|
||||||
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def upload_room_keys(self, user_id, version, room_keys):
|
def upload_room_keys(self, user_id, version, room_keys):
|
||||||
"""Bulk upload a list of room keys into a given backup version, asserting
|
"""Bulk upload a list of room keys into a given backup version, asserting
|
||||||
|
@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
|
||||||
session_id(str): the session whose room_key we're setting
|
session_id(str): the session whose room_key we're setting
|
||||||
room_key(dict): the room_key being set
|
room_key(dict): the room_key being set
|
||||||
"""
|
"""
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Trying to upload room key",
|
||||||
|
"room_id": room_id,
|
||||||
|
"session_id": session_id,
|
||||||
|
"user_id": user_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
# get the room_key for this particular row
|
# get the room_key for this particular row
|
||||||
current_room_key = None
|
current_room_key = None
|
||||||
try:
|
try:
|
||||||
|
@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
|
||||||
)
|
)
|
||||||
except StoreError as e:
|
except StoreError as e:
|
||||||
if e.code == 404:
|
if e.code == 404:
|
||||||
pass
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Room key not found.",
|
||||||
|
"room_id": room_id,
|
||||||
|
"user_id": user_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if self._should_replace_room_key(current_room_key, room_key):
|
if self._should_replace_room_key(current_room_key, room_key):
|
||||||
|
log_kv({"message": "Replacing room key."})
|
||||||
yield self.store.set_e2e_room_key(
|
yield self.store.set_e2e_room_key(
|
||||||
user_id, version, room_id, session_id, room_key
|
user_id, version, room_id, session_id, room_key
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
log_kv({"message": "Not replacing room_key."})
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _should_replace_room_key(current_room_key, room_key):
|
def _should_replace_room_key(current_room_key, room_key):
|
||||||
|
@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_version(self, user_id, version_info):
|
def create_version(self, user_id, version_info):
|
||||||
"""Create a new backup version. This automatically becomes the new
|
"""Create a new backup version. This automatically becomes the new
|
||||||
|
@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
|
||||||
raise
|
raise
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_version(self, user_id, version=None):
|
def delete_version(self, user_id, version=None):
|
||||||
"""Deletes a given version of the user's e2e_room_keys backup
|
"""Deletes a given version of the user's e2e_room_keys backup
|
||||||
|
@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update_version(self, user_id, version, version_info):
|
def update_version(self, user_id, version, version_info):
|
||||||
"""Update the info about a given version of the user's backup
|
"""Update the info about a given version of the user's backup
|
||||||
|
|
|
@ -24,6 +24,7 @@ from synapse.http.servlet import (
|
||||||
parse_json_object_from_request,
|
parse_json_object_from_request,
|
||||||
parse_string,
|
parse_string,
|
||||||
)
|
)
|
||||||
|
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
|
||||||
from synapse.types import StreamToken
|
from synapse.types import StreamToken
|
||||||
|
|
||||||
from ._base import client_patterns
|
from ._base import client_patterns
|
||||||
|
@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
||||||
|
|
||||||
|
@trace_using_operation_name("upload_keys")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, request, device_id):
|
def on_POST(self, request, device_id):
|
||||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||||
|
@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
|
||||||
# passing the device_id here is deprecated; however, we allow it
|
# passing the device_id here is deprecated; however, we allow it
|
||||||
# for now for compatibility with older clients.
|
# for now for compatibility with older clients.
|
||||||
if requester.device_id is not None and device_id != requester.device_id:
|
if requester.device_id is not None and device_id != requester.device_id:
|
||||||
|
set_tag("error", True)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Client uploading keys for a different device",
|
||||||
|
"logged_in_id": requester.device_id,
|
||||||
|
"key_being_uploaded": device_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Client uploading keys for a different device "
|
"Client uploading keys for a different device "
|
||||||
"(logged in as %s, uploading for %s)",
|
"(logged in as %s, uploading for %s)",
|
||||||
|
@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
|
||||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||||
|
|
||||||
from_token_string = parse_string(request, "from")
|
from_token_string = parse_string(request, "from")
|
||||||
|
set_tag("from", from_token_string)
|
||||||
|
|
||||||
# We want to enforce they do pass us one, but we ignore it and return
|
# We want to enforce they do pass us one, but we ignore it and return
|
||||||
# changes after the "to" as well as before.
|
# changes after the "to" as well as before.
|
||||||
parse_string(request, "to")
|
set_tag("to", parse_string(request, "to"))
|
||||||
|
|
||||||
from_token = StreamToken.from_string(from_token_string)
|
from_token = StreamToken.from_string(from_token_string)
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ import json
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
|
from synapse.logging.opentracing import log_kv, trace
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
},
|
},
|
||||||
lock=False,
|
lock=False,
|
||||||
)
|
)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Set room key",
|
||||||
|
"room_id": room_id,
|
||||||
|
"session_id": session_id,
|
||||||
|
"room_key": room_key,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||||
|
@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
|
|
||||||
return sessions
|
return sessions
|
||||||
|
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||||
|
@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
|
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
def create_e2e_room_keys_version(self, user_id, info):
|
def create_e2e_room_keys_version(self, user_id, info):
|
||||||
"""Atomically creates a new version of this user's e2e_room_keys store
|
"""Atomically creates a new version of this user's e2e_room_keys store
|
||||||
with the given version info.
|
with the given version info.
|
||||||
|
@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
|
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
def update_e2e_room_keys_version(self, user_id, version, info):
|
def update_e2e_room_keys_version(self, user_id, version, info):
|
||||||
"""Update a given backup version
|
"""Update a given backup version
|
||||||
|
|
||||||
|
@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||||
desc="update_e2e_room_keys_version",
|
desc="update_e2e_room_keys_version",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
def delete_e2e_room_keys_version(self, user_id, version=None):
|
def delete_e2e_room_keys_version(self, user_id, version=None):
|
||||||
"""Delete a given backup version of the user's room keys.
|
"""Delete a given backup version of the user's room keys.
|
||||||
Doesn't delete their actual key data.
|
Doesn't delete their actual key data.
|
||||||
|
|
|
@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
|
|
||||||
from ._base import SQLBaseStore, db_to_json
|
from ._base import SQLBaseStore, db_to_json
|
||||||
|
|
||||||
|
|
||||||
class EndToEndKeyWorkerStore(SQLBaseStore):
|
class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||||
|
@trace
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_e2e_device_keys(
|
def get_e2e_device_keys(
|
||||||
self, query_list, include_all_devices=False, include_deleted_devices=False
|
self, query_list, include_all_devices=False, include_deleted_devices=False
|
||||||
|
@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||||
Dict mapping from user-id to dict mapping from device_id to
|
Dict mapping from user-id to dict mapping from device_id to
|
||||||
dict containing "key_json", "device_display_name".
|
dict containing "key_json", "device_display_name".
|
||||||
"""
|
"""
|
||||||
|
set_tag("query_list", query_list)
|
||||||
if not query_list:
|
if not query_list:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@trace
|
||||||
def _get_e2e_device_keys_txn(
|
def _get_e2e_device_keys_txn(
|
||||||
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
|
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
|
||||||
):
|
):
|
||||||
|
set_tag("include_all_devices", include_all_devices)
|
||||||
|
set_tag("include_deleted_devices", include_deleted_devices)
|
||||||
|
|
||||||
query_clauses = []
|
query_clauses = []
|
||||||
query_params = []
|
query_params = []
|
||||||
|
|
||||||
|
@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||||
for user_id, device_id in deleted_devices:
|
for user_id, device_id in deleted_devices:
|
||||||
result.setdefault(user_id, {})[device_id] = None
|
result.setdefault(user_id, {})[device_id] = None
|
||||||
|
|
||||||
|
log_kv(result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||||
keyvalues={"user_id": user_id, "device_id": device_id},
|
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||||
desc="add_e2e_one_time_keys_check",
|
desc="add_e2e_one_time_keys_check",
|
||||||
)
|
)
|
||||||
|
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||||
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
|
||||||
|
return result
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
|
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
|
||||||
|
@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _add_e2e_one_time_keys(txn):
|
def _add_e2e_one_time_keys(txn):
|
||||||
|
set_tag("user_id", user_id)
|
||||||
|
set_tag("device_id", device_id)
|
||||||
|
set_tag("new_keys", new_keys)
|
||||||
# We are protected from race between lookup and insertion due to
|
# We are protected from race between lookup and insertion due to
|
||||||
# a unique constraint. If there is a race of two calls to
|
# a unique constraint. If there is a race of two calls to
|
||||||
# `add_e2e_one_time_keys` then they'll conflict and we will only
|
# `add_e2e_one_time_keys` then they'll conflict and we will only
|
||||||
|
@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _set_e2e_device_keys_txn(txn):
|
def _set_e2e_device_keys_txn(txn):
|
||||||
|
set_tag("user_id", user_id)
|
||||||
|
set_tag("device_id", device_id)
|
||||||
|
set_tag("time_now", time_now)
|
||||||
|
set_tag("device_keys", device_keys)
|
||||||
|
|
||||||
old_key_json = self._simple_select_one_onecol_txn(
|
old_key_json = self._simple_select_one_onecol_txn(
|
||||||
txn,
|
txn,
|
||||||
table="e2e_device_keys_json",
|
table="e2e_device_keys_json",
|
||||||
|
@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
|
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
|
||||||
|
|
||||||
if old_key_json == new_key_json:
|
if old_key_json == new_key_json:
|
||||||
|
log_kv({"Message": "Device key already stored."})
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self._simple_upsert_txn(
|
self._simple_upsert_txn(
|
||||||
|
@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
keyvalues={"user_id": user_id, "device_id": device_id},
|
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||||
values={"ts_added_ms": time_now, "key_json": new_key_json},
|
values={"ts_added_ms": time_now, "key_json": new_key_json},
|
||||||
)
|
)
|
||||||
|
log_kv({"message": "Device keys stored."})
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
|
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
|
||||||
|
@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
def claim_e2e_one_time_keys(self, query_list):
|
def claim_e2e_one_time_keys(self, query_list):
|
||||||
"""Take a list of one time keys out of the database"""
|
"""Take a list of one time keys out of the database"""
|
||||||
|
|
||||||
|
@trace
|
||||||
def _claim_e2e_one_time_keys(txn):
|
def _claim_e2e_one_time_keys(txn):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
|
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
|
||||||
|
@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
" AND key_id = ?"
|
" AND key_id = ?"
|
||||||
)
|
)
|
||||||
for user_id, device_id, algorithm, key_id in delete:
|
for user_id, device_id, algorithm, key_id in delete:
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Executing claim e2e_one_time_keys transaction on database."
|
||||||
|
}
|
||||||
|
)
|
||||||
txn.execute(sql, (user_id, device_id, algorithm, key_id))
|
txn.execute(sql, (user_id, device_id, algorithm, key_id))
|
||||||
|
log_kv({"message": "finished executing and invalidating cache"})
|
||||||
self._invalidate_cache_and_stream(
|
self._invalidate_cache_and_stream(
|
||||||
txn, self.count_e2e_one_time_keys, (user_id, device_id)
|
txn, self.count_e2e_one_time_keys, (user_id, device_id)
|
||||||
)
|
)
|
||||||
|
@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
def delete_e2e_keys_by_device(self, user_id, device_id):
|
def delete_e2e_keys_by_device(self, user_id, device_id):
|
||||||
def delete_e2e_keys_by_device_txn(txn):
|
def delete_e2e_keys_by_device_txn(txn):
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Deleting keys for device",
|
||||||
|
"device_id": device_id,
|
||||||
|
"user_id": user_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
self._simple_delete_txn(
|
self._simple_delete_txn(
|
||||||
txn,
|
txn,
|
||||||
table="e2e_device_keys_json",
|
table="e2e_device_keys_json",
|
||||||
|
|
Loading…
Reference in a new issue