Compare commits
11 commits
develop
...
uhoreg/e2e
Author | SHA1 | Date | |
---|---|---|---|
c915567517 | |||
3e180d8f14 | |||
199325bfd6 | |||
bc5f276ea9 | |||
72626e75e4 | |||
5be7ec5dc9 | |||
0d0ee82a6b | |||
bf679213c1 | |||
f405b1b58c | |||
aaaf086f6f | |||
c1606b01b1 |
1
changelog.d/4171.feature
Normal file
1
changelog.d/4171.feature
Normal file
|
@ -0,0 +1 @@
|
|||
add cross-signing support
|
|
@ -283,6 +283,20 @@ class DeviceHandler(BaseHandler):
|
|||
for host in hosts:
|
||||
self.federation_sender.send_device_messages(host)
|
||||
|
||||
@measure_func("notify_attestation_update")
|
||||
@defer.inlineCallbacks
|
||||
def notify_attestation_update(self, from_user_id, user_id):
|
||||
"""Notify a user that they have made new attestations.
|
||||
"""
|
||||
|
||||
position = yield self.store.add_attestation_change_to_streams(
|
||||
from_user_id, user_id
|
||||
)
|
||||
|
||||
yield self.notifier.on_new_event(
|
||||
"device_list_key", position, users=[from_user_id],
|
||||
)
|
||||
|
||||
@measure_func("device.get_user_ids_changed")
|
||||
@defer.inlineCallbacks
|
||||
def get_user_ids_changed(self, user_id, from_token):
|
||||
|
@ -299,7 +313,8 @@ class DeviceHandler(BaseHandler):
|
|||
|
||||
# First we check if any devices have changed
|
||||
changed = yield self.store.get_user_whose_devices_changed(
|
||||
from_token.device_list_key
|
||||
user_id,
|
||||
from_token.device_list_key,
|
||||
)
|
||||
|
||||
# Then work out if any users have since joined
|
||||
|
|
|
@ -46,7 +46,7 @@ class E2eKeysHandler(object):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def query_devices(self, query_body, timeout):
|
||||
def query_devices(self, req_user_id, query_body, timeout):
|
||||
""" Handle a device key query from a client
|
||||
|
||||
{
|
||||
|
@ -83,7 +83,7 @@ class E2eKeysHandler(object):
|
|||
failures = {}
|
||||
results = {}
|
||||
if local_query:
|
||||
local_result = yield self.query_local_devices(local_query)
|
||||
local_result = yield self.query_local_devices(local_query, req_user_id)
|
||||
for user_id, keys in local_result.items():
|
||||
if user_id in local_query:
|
||||
results[user_id] = keys
|
||||
|
@ -108,11 +108,14 @@ class E2eKeysHandler(object):
|
|||
for device_id, device in iteritems(devices):
|
||||
keys = device.get("keys", None)
|
||||
device_display_name = device.get("device_display_name", None)
|
||||
attestations = device.get("attestations", None)
|
||||
if keys:
|
||||
result = dict(keys)
|
||||
unsigned = result.setdefault("unsigned", {})
|
||||
if device_display_name:
|
||||
unsigned["device_display_name"] = device_display_name
|
||||
if attestations:
|
||||
unsigned["attestations"] = attestations
|
||||
user_devices[device_id] = result
|
||||
|
||||
for user_id in user_ids_not_in_cache:
|
||||
|
@ -148,12 +151,13 @@ class E2eKeysHandler(object):
|
|||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def query_local_devices(self, query):
|
||||
def query_local_devices(self, query, req_user_id=None):
|
||||
"""Get E2E device keys for local users
|
||||
|
||||
Args:
|
||||
query (dict[string, list[string]|None): map from user_id to a list
|
||||
of devices to query (None for all devices)
|
||||
req_user_id: the user requesting the devices
|
||||
|
||||
Returns:
|
||||
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
||||
|
@ -178,7 +182,9 @@ class E2eKeysHandler(object):
|
|||
# make sure that each queried user appears in the result dict
|
||||
result_dict[user_id] = {}
|
||||
|
||||
results = yield self.store.get_e2e_device_keys(local_query)
|
||||
results = yield self.store.get_e2e_device_keys(
|
||||
local_query, req_user_id=req_user_id,
|
||||
)
|
||||
|
||||
# Build the result structure, un-jsonify the results, and add the
|
||||
# "unsigned" section
|
||||
|
@ -187,8 +193,11 @@ class E2eKeysHandler(object):
|
|||
r = dict(device_info["keys"])
|
||||
r["unsigned"] = {}
|
||||
display_name = device_info["device_display_name"]
|
||||
attestations = device_info.get("attestations", None)
|
||||
if display_name is not None:
|
||||
r["unsigned"]["device_display_name"] = display_name
|
||||
if attestations is not None:
|
||||
r["unsigned"]["attestations"] = attestations
|
||||
result_dict[user_id][device_id] = r
|
||||
|
||||
defer.returnValue(result_dict)
|
||||
|
@ -286,6 +295,12 @@ class E2eKeysHandler(object):
|
|||
user_id, device_id, time_now, one_time_keys,
|
||||
)
|
||||
|
||||
attestations = keys.get("attestations", None)
|
||||
if attestations:
|
||||
yield self._upload_attestations_from_user(
|
||||
user_id, attestations,
|
||||
)
|
||||
|
||||
# the device should have been registered already, but it may have been
|
||||
# deleted due to a race with a DELETE request. Or we may be using an
|
||||
# old access_token without an associated device_id. Either way, we
|
||||
|
@ -297,6 +312,50 @@ class E2eKeysHandler(object):
|
|||
|
||||
defer.returnValue({"one_time_key_counts": result})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _upload_attestations_from_user(self, req_user_id, attestations):
|
||||
if not isinstance(attestations, list):
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Attestations is not an array."
|
||||
)
|
||||
|
||||
# only include attestations made by the user
|
||||
attestations = [
|
||||
{
|
||||
"user_id": x["user_id"],
|
||||
"device_id": x["device_id"],
|
||||
"keys": x["keys"],
|
||||
"state": x["state"],
|
||||
"signatures": {
|
||||
req_user_id: x["signatures"][req_user_id]
|
||||
}
|
||||
}
|
||||
for x in attestations if req_user_id in x["signatures"]]
|
||||
|
||||
yield self.store.add_e2e_attestations(
|
||||
attestations
|
||||
)
|
||||
|
||||
# notify about attestations
|
||||
attested_users = {}
|
||||
for attestation in attestations:
|
||||
user_id = attestation["user_id"]
|
||||
if user_id not in attested_users:
|
||||
attested_users[user_id] = []
|
||||
attested_users[user_id].append(attestation["device_id"])
|
||||
for user_id, devices in attested_users.items():
|
||||
if req_user_id == user_id:
|
||||
# when making an attestation on your own device, notify
|
||||
# everyone who shares a room with you. This is the same as the
|
||||
# users that are notified when your device list changes, so we
|
||||
# use the same stream.
|
||||
yield self.device_handler.notify_device_update(user_id, devices)
|
||||
else:
|
||||
# when making an attestation on someone else's device, notify
|
||||
# only your own devices.
|
||||
yield self.device_handler.notify_attestation_update(req_user_id, user_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _upload_one_time_keys_for_user(self, user_id, device_id, time_now,
|
||||
one_time_keys):
|
||||
|
|
|
@ -944,6 +944,7 @@ class SyncHandler(object):
|
|||
|
||||
device_lists = yield self._generate_sync_entry_for_device_list(
|
||||
sync_result_builder,
|
||||
user_id,
|
||||
newly_joined_rooms=newly_joined_rooms,
|
||||
newly_joined_users=newly_joined_users,
|
||||
newly_left_rooms=newly_left_rooms,
|
||||
|
@ -1020,7 +1021,7 @@ class SyncHandler(object):
|
|||
|
||||
@measure_func("_generate_sync_entry_for_device_list")
|
||||
@defer.inlineCallbacks
|
||||
def _generate_sync_entry_for_device_list(self, sync_result_builder,
|
||||
def _generate_sync_entry_for_device_list(self, sync_result_builder, user_id,
|
||||
newly_joined_rooms, newly_joined_users,
|
||||
newly_left_rooms, newly_left_users):
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
|
@ -1028,7 +1029,8 @@ class SyncHandler(object):
|
|||
|
||||
if since_token and since_token.device_list_key:
|
||||
changed = yield self.store.get_user_whose_devices_changed(
|
||||
since_token.device_list_key
|
||||
user_id,
|
||||
since_token.device_list_key,
|
||||
)
|
||||
|
||||
# TODO: Be more clever than this, i.e. remove users who we already
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -54,6 +55,19 @@ class KeyUploadServlet(RestServlet):
|
|||
"one_time_keys": {
|
||||
"<algorithm>:<key_id>": "<key_base64>"
|
||||
},
|
||||
"attestations": [
|
||||
{
|
||||
"user_id": "<user_id>",
|
||||
"device_id": "<device_id>",
|
||||
"keys": {
|
||||
"ed25519": "<key_base64>"
|
||||
},
|
||||
"state": "<verified or revoked>",
|
||||
"signatures": {
|
||||
"<user_id>": {
|
||||
"<algorithm>:<device_id>": "<signature_base64>"
|
||||
} } }
|
||||
]
|
||||
}
|
||||
"""
|
||||
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
@ -127,6 +141,22 @@ class KeyQueryServlet(RestServlet):
|
|||
// Must be signed by this server.
|
||||
"<server_name>": {
|
||||
"<algorithm>:<key_id>": "<signature_base64>"
|
||||
},
|
||||
"unsigned": {
|
||||
"device_display_name": "<device_name>"
|
||||
"attestations": [ // attestations on this device
|
||||
{
|
||||
"user_id": "<user_id>",
|
||||
"device_id": "<device_id>",
|
||||
"keys": {
|
||||
"<algorithm>:<key_id>": "<key_base64>"
|
||||
},
|
||||
"state": "<verified or revoked>",
|
||||
"signatures": {
|
||||
"<user_id>": { // user_id of the user making the attestation
|
||||
"<algorithm>:<device_id>": "<signature_base64>"
|
||||
} } }
|
||||
]
|
||||
} } } } } }
|
||||
"""
|
||||
|
||||
|
@ -143,10 +173,11 @@ class KeyQueryServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user_id = requester.user.to_string()
|
||||
timeout = parse_integer(request, "timeout", 10 * 1000)
|
||||
body = parse_json_object_from_request(request)
|
||||
result = yield self.e2e_keys_handler.query_devices(body, timeout)
|
||||
result = yield self.e2e_keys_handler.query_devices(user_id, body, timeout)
|
||||
defer.returnValue((200, result))
|
||||
|
||||
|
||||
|
|
|
@ -182,6 +182,9 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
self._device_list_stream_cache = StreamChangeCache(
|
||||
"DeviceListStreamChangeCache", device_list_max,
|
||||
)
|
||||
self._attestation_stream_cache = StreamChangeCache(
|
||||
"AttestationStreamChangeCache", device_list_max,
|
||||
)
|
||||
self._device_list_federation_stream_cache = StreamChangeCache(
|
||||
"DeviceListFederationStreamChangeCache", device_list_max,
|
||||
)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -479,6 +480,9 @@ class DeviceStore(BackgroundUpdateStore):
|
|||
device_display_name = device.get("device_display_name", None)
|
||||
if device_display_name:
|
||||
result["device_display_name"] = device_display_name
|
||||
attestations = device.get("attestations", None)
|
||||
if attestations:
|
||||
result["attestations"] = attestations
|
||||
else:
|
||||
result["deleted"] = True
|
||||
|
||||
|
@ -579,6 +583,9 @@ class DeviceStore(BackgroundUpdateStore):
|
|||
device_display_name = device.get("device_display_name", None)
|
||||
if device_display_name:
|
||||
result["device_display_name"] = device_display_name
|
||||
attestations = device.get("attestations", None)
|
||||
if attestations:
|
||||
result["attestations"] = attestations
|
||||
|
||||
results.append(result)
|
||||
|
||||
|
@ -634,19 +641,42 @@ class DeviceStore(BackgroundUpdateStore):
|
|||
txn.execute(sql, (destination, stream_id,))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_whose_devices_changed(self, from_key):
|
||||
def get_user_whose_devices_changed(self, user_id, from_key):
|
||||
"""Get set of users whose devices have changed since `from_key`.
|
||||
"""
|
||||
from_key = int(from_key)
|
||||
changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
|
||||
if changed is not None:
|
||||
defer.returnValue(set(changed))
|
||||
changed_set = set(changed)
|
||||
if self._attestation_stream_cache.has_entity_changed(user_id, from_key):
|
||||
# the user's attestations have changed, so fetch the new users
|
||||
# that have been attested
|
||||
sql = """
|
||||
SELECT DISTINCT user_id FROM attestations_stream
|
||||
WHERE from_user_id = ? AND stream_id > ?
|
||||
"""
|
||||
rows = yield self._execute(
|
||||
"get_user_whose_devices_were_attested", None, sql, user_id, from_key
|
||||
)
|
||||
changed_set.update(row[0] for row in rows)
|
||||
defer.returnValue(changed_set)
|
||||
|
||||
# get device updates
|
||||
sql = """
|
||||
SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
|
||||
"""
|
||||
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
|
||||
defer.returnValue(set(row[0] for row in rows))
|
||||
changed_set = set(row[0] for row in rows)
|
||||
# get attestations that the user has made
|
||||
sql = """
|
||||
SELECT DISTINCT user_id FROM attestations_stream
|
||||
WHERE from_user_id = ? AND stream_id > ?
|
||||
"""
|
||||
rows = yield self._execute(
|
||||
"get_user_whose_devices_were_attested", None, sql, user_id, from_key
|
||||
)
|
||||
changed_set.update(row[0] for row in rows)
|
||||
defer.returnValue(changed_set)
|
||||
|
||||
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
|
||||
"""Return a list of `(stream_id, user_id, destination)` which is the
|
||||
|
@ -732,6 +762,33 @@ class DeviceStore(BackgroundUpdateStore):
|
|||
]
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_attestation_change_to_streams(self, from_user_id, user_id):
|
||||
"""Persist that a user's attestations have been updated.
|
||||
"""
|
||||
|
||||
with self._device_list_id_gen.get_next() as stream_id:
|
||||
yield self.runInteraction(
|
||||
"add_attestation_change_to_streams", self._add_attestation_change_txn,
|
||||
from_user_id, user_id, stream_id,
|
||||
)
|
||||
defer.returnValue(stream_id)
|
||||
|
||||
def _add_attestation_change_txn(self, txn, from_user_id, user_id, stream_id):
|
||||
txn.call_after(
|
||||
self._attestation_stream_cache.entity_has_changed,
|
||||
from_user_id, stream_id,
|
||||
)
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"attestations_stream",
|
||||
values={
|
||||
"stream_id": stream_id,
|
||||
"from_user_id": from_user_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
def get_device_stream_token(self):
|
||||
return self._device_list_id_gen.get_current_token()
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -14,7 +15,7 @@
|
|||
# limitations under the License.
|
||||
from six import iteritems
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -69,7 +70,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
|||
@defer.inlineCallbacks
|
||||
def get_e2e_device_keys(
|
||||
self, query_list, include_all_devices=False,
|
||||
include_deleted_devices=False,
|
||||
include_deleted_devices=False, req_user_id=None,
|
||||
):
|
||||
"""Fetch a list of device keys.
|
||||
Args:
|
||||
|
@ -79,6 +80,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
|||
include_deleted_devices (bool): whether to include null entries for
|
||||
devices which no longer exist (but were in the query_list).
|
||||
This option only takes effect if include_all_devices is true.
|
||||
req_user_id: The user requesting the device list
|
||||
Returns:
|
||||
Dict mapping from user-id to dict mapping from device_id to
|
||||
dict containing "key_json", "device_display_name".
|
||||
|
@ -88,7 +90,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
|||
|
||||
results = yield self.runInteraction(
|
||||
"get_e2e_device_keys", self._get_e2e_device_keys_txn,
|
||||
query_list, include_all_devices, include_deleted_devices,
|
||||
query_list, include_all_devices, include_deleted_devices, req_user_id,
|
||||
)
|
||||
|
||||
for user_id, device_keys in iteritems(results):
|
||||
|
@ -99,7 +101,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
|||
|
||||
def _get_e2e_device_keys_txn(
|
||||
self, txn, query_list, include_all_devices=False,
|
||||
include_deleted_devices=False,
|
||||
include_deleted_devices=False, req_user_id=None,
|
||||
):
|
||||
query_clauses = []
|
||||
query_params = []
|
||||
|
@ -145,6 +147,17 @@ class EndToEndKeyStore(SQLBaseStore):
|
|||
for user_id, device_id in deleted_devices:
|
||||
result.setdefault(user_id, {})[device_id] = None
|
||||
|
||||
attestations = self._get_e2e_attestations_txn(txn, req_user_id, query_list)
|
||||
|
||||
for attestation in attestations:
|
||||
user_id = attestation["user_id"]
|
||||
device_id = attestation["device_id"]
|
||||
# FIXME: combine signatures of the same payload?
|
||||
if user_id in result and device_id in result[user_id] \
|
||||
and result[user_id][device_id]:
|
||||
result[user_id][device_id].setdefault("attestations", []) \
|
||||
.append(attestation)
|
||||
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -288,3 +301,60 @@ class EndToEndKeyStore(SQLBaseStore):
|
|||
return self.runInteraction(
|
||||
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
|
||||
)
|
||||
|
||||
def add_e2e_attestations(self, attestations):
|
||||
def add_e2e_attestations_txn(txn):
|
||||
self._simple_insert_many_txn(
|
||||
txn, table="e2e_attestations",
|
||||
values=[
|
||||
{
|
||||
"user_id": x["user_id"],
|
||||
"device_id": x["device_id"],
|
||||
"from_user_id": from_user_id,
|
||||
"attestation": json.dumps(x),
|
||||
}
|
||||
for x in attestations
|
||||
for from_user_id in x["signatures"].keys()
|
||||
],
|
||||
)
|
||||
return self.runInteraction(
|
||||
"add_e2e_attestations", add_e2e_attestations_txn
|
||||
)
|
||||
|
||||
def _get_e2e_attestations_txn(self, txn, from_user_id, query_list):
|
||||
|
||||
query_clauses = []
|
||||
query_params = []
|
||||
|
||||
for (user_id, device_id) in query_list:
|
||||
# Users can only see attestations made by themselves about the
|
||||
# target user's devices, or by the target user about their own
|
||||
# devices. If the requesting user is not specified, select only
|
||||
# the publicly visible attestations, that is, attestations made by
|
||||
# the target user's own devices.
|
||||
if from_user_id:
|
||||
query_clause = "(from_user_id = ? OR from_user_id = ?)"
|
||||
query_params.append(from_user_id)
|
||||
else:
|
||||
query_clause = "(from_user_id = ?)"
|
||||
query_params.append(user_id)
|
||||
|
||||
query_clause += " AND user_id = ?"
|
||||
query_params.append(user_id)
|
||||
|
||||
if device_id is not None:
|
||||
query_clause += " AND device_id = ?"
|
||||
query_params.append(device_id)
|
||||
|
||||
query_clauses.append(query_clause)
|
||||
|
||||
sql = (
|
||||
"SELECT attestation "
|
||||
" FROM e2e_attestations "
|
||||
" WHERE %s"
|
||||
) % (
|
||||
" OR ".join("(" + q + ")" for q in query_clauses)
|
||||
)
|
||||
|
||||
txn.execute(sql, query_params)
|
||||
return [json.loads(row[0]) for row in txn]
|
||||
|
|
33
synapse/storage/schema/delta/53/e2e_attestations.sql
Normal file
33
synapse/storage/schema/delta/53/e2e_attestations.sql
Normal file
|
@ -0,0 +1,33 @@
|
|||
/* Copyright 2018 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- attestations of e2e device keys
|
||||
CREATE TABLE e2e_attestations (
|
||||
user_id TEXT NOT NULL,
|
||||
device_id TEXT NOT NULL,
|
||||
from_user_id TEXT NOT NULL,
|
||||
attestation TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX e2e_attestations_idx ON e2e_attestations(user_id, from_user_id, device_id);
|
||||
|
||||
-- stream of attestation updates
|
||||
CREATE TABLE attestations_stream (
|
||||
stream_id BIGINT NOT NULL,
|
||||
from_user_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX attestations_stream_idx ON attestations_stream(stream_id, from_user_id);
|
Loading…
Reference in a new issue