Compare commits

...

11 commits

Author SHA1 Message Date
Hubert Chathi c915567517 don't try to modify a non-existant result 2018-12-07 22:42:28 -05:00
Hubert Chathi 3e180d8f14 Merge branch 'develop' into e2e_cross-signing 2018-12-05 17:50:49 -05:00
Hubert Chathi 199325bfd6 update comment on keys/query and copyright header 2018-11-26 11:11:51 -05:00
Hubert Chathi bc5f276ea9 add changelog 2018-11-21 14:58:16 -05:00
Hubert Chathi 72626e75e4 pass attestations on to other servers 2018-11-21 14:57:26 -05:00
Hubert Chathi 5be7ec5dc9 add a comment explaining part of the query 2018-11-21 14:56:40 -05:00
Hubert Chathi 0d0ee82a6b put attestations in the right place 2018-11-19 11:15:44 -05:00
Hubert Chathi bf679213c1 work in Python 3 2018-11-14 21:40:52 -05:00
Hubert Chathi f405b1b58c limit notifications about attestations
when a user makes an attestation about another user's device, notify only the
user who made the attestation
2018-11-09 21:57:05 -05:00
Hubert Chathi aaaf086f6f fix variable name collision 2018-11-09 14:59:09 -05:00
Hubert Chathi c1606b01b1 initial work on storing and retreiving attestations for cross-signing 2018-11-05 22:40:06 -05:00
9 changed files with 287 additions and 16 deletions

1
changelog.d/4171.feature Normal file
View file

@ -0,0 +1 @@
add cross-signing support

View file

@ -283,6 +283,20 @@ class DeviceHandler(BaseHandler):
for host in hosts:
self.federation_sender.send_device_messages(host)
@measure_func("notify_attestation_update")
@defer.inlineCallbacks
def notify_attestation_update(self, from_user_id, user_id):
"""Notify a user that they have made new attestations.
"""
position = yield self.store.add_attestation_change_to_streams(
from_user_id, user_id
)
yield self.notifier.on_new_event(
"device_list_key", position, users=[from_user_id],
)
@measure_func("device.get_user_ids_changed")
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
@ -299,7 +313,8 @@ class DeviceHandler(BaseHandler):
# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
from_token.device_list_key
user_id,
from_token.device_list_key,
)
# Then work out if any users have since joined

View file

@ -46,7 +46,7 @@ class E2eKeysHandler(object):
)
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
def query_devices(self, req_user_id, query_body, timeout):
""" Handle a device key query from a client
{
@ -83,7 +83,7 @@ class E2eKeysHandler(object):
failures = {}
results = {}
if local_query:
local_result = yield self.query_local_devices(local_query)
local_result = yield self.query_local_devices(local_query, req_user_id)
for user_id, keys in local_result.items():
if user_id in local_query:
results[user_id] = keys
@ -108,11 +108,14 @@ class E2eKeysHandler(object):
for device_id, device in iteritems(devices):
keys = device.get("keys", None)
device_display_name = device.get("device_display_name", None)
attestations = device.get("attestations", None)
if keys:
result = dict(keys)
unsigned = result.setdefault("unsigned", {})
if device_display_name:
unsigned["device_display_name"] = device_display_name
if attestations:
unsigned["attestations"] = attestations
user_devices[device_id] = result
for user_id in user_ids_not_in_cache:
@ -148,12 +151,13 @@ class E2eKeysHandler(object):
})
@defer.inlineCallbacks
def query_local_devices(self, query):
def query_local_devices(self, query, req_user_id=None):
"""Get E2E device keys for local users
Args:
query (dict[string, list[string]|None): map from user_id to a list
of devices to query (None for all devices)
req_user_id: the user requesting the devices
Returns:
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
@ -178,7 +182,9 @@ class E2eKeysHandler(object):
# make sure that each queried user appears in the result dict
result_dict[user_id] = {}
results = yield self.store.get_e2e_device_keys(local_query)
results = yield self.store.get_e2e_device_keys(
local_query, req_user_id=req_user_id,
)
# Build the result structure, un-jsonify the results, and add the
# "unsigned" section
@ -187,8 +193,11 @@ class E2eKeysHandler(object):
r = dict(device_info["keys"])
r["unsigned"] = {}
display_name = device_info["device_display_name"]
attestations = device_info.get("attestations", None)
if display_name is not None:
r["unsigned"]["device_display_name"] = display_name
if attestations is not None:
r["unsigned"]["attestations"] = attestations
result_dict[user_id][device_id] = r
defer.returnValue(result_dict)
@ -286,6 +295,12 @@ class E2eKeysHandler(object):
user_id, device_id, time_now, one_time_keys,
)
attestations = keys.get("attestations", None)
if attestations:
yield self._upload_attestations_from_user(
user_id, attestations,
)
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
# old access_token without an associated device_id. Either way, we
@ -297,6 +312,50 @@ class E2eKeysHandler(object):
defer.returnValue({"one_time_key_counts": result})
@defer.inlineCallbacks
def _upload_attestations_from_user(self, req_user_id, attestations):
if not isinstance(attestations, list):
raise SynapseError(
400,
"Attestations is not an array."
)
# only include attestations made by the user
attestations = [
{
"user_id": x["user_id"],
"device_id": x["device_id"],
"keys": x["keys"],
"state": x["state"],
"signatures": {
req_user_id: x["signatures"][req_user_id]
}
}
for x in attestations if req_user_id in x["signatures"]]
yield self.store.add_e2e_attestations(
attestations
)
# notify about attestations
attested_users = {}
for attestation in attestations:
user_id = attestation["user_id"]
if user_id not in attested_users:
attested_users[user_id] = []
attested_users[user_id].append(attestation["device_id"])
for user_id, devices in attested_users.items():
if req_user_id == user_id:
# when making an attestation on your own device, notify
# everyone who shares a room with you. This is the same as the
# users that are notified when your device list changes, so we
# use the same stream.
yield self.device_handler.notify_device_update(user_id, devices)
else:
# when making an attestation on someone else's device, notify
# only your own devices.
yield self.device_handler.notify_attestation_update(req_user_id, user_id)
@defer.inlineCallbacks
def _upload_one_time_keys_for_user(self, user_id, device_id, time_now,
one_time_keys):

View file

@ -944,6 +944,7 @@ class SyncHandler(object):
device_lists = yield self._generate_sync_entry_for_device_list(
sync_result_builder,
user_id,
newly_joined_rooms=newly_joined_rooms,
newly_joined_users=newly_joined_users,
newly_left_rooms=newly_left_rooms,
@ -1020,7 +1021,7 @@ class SyncHandler(object):
@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(self, sync_result_builder,
def _generate_sync_entry_for_device_list(self, sync_result_builder, user_id,
newly_joined_rooms, newly_joined_users,
newly_left_rooms, newly_left_users):
user_id = sync_result_builder.sync_config.user.to_string()
@ -1028,7 +1029,8 @@ class SyncHandler(object):
if since_token and since_token.device_list_key:
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
user_id,
since_token.device_list_key,
)
# TODO: Be more clever than this, i.e. remove users who we already

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -54,6 +55,19 @@ class KeyUploadServlet(RestServlet):
"one_time_keys": {
"<algorithm>:<key_id>": "<key_base64>"
},
"attestations": [
{
"user_id": "<user_id>",
"device_id": "<device_id>",
"keys": {
"ed25519": "<key_base64>"
},
"state": "<verified or revoked>",
"signatures": {
"<user_id>": {
"<algorithm>:<device_id>": "<signature_base64>"
} } }
]
}
"""
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@ -127,6 +141,22 @@ class KeyQueryServlet(RestServlet):
// Must be signed by this server.
"<server_name>": {
"<algorithm>:<key_id>": "<signature_base64>"
},
"unsigned": {
"device_display_name": "<device_name>"
"attestations": [ // attestations on this device
{
"user_id": "<user_id>",
"device_id": "<device_id>",
"keys": {
"<algorithm>:<key_id>": "<key_base64>"
},
"state": "<verified or revoked>",
"signatures": {
"<user_id>": { // user_id of the user making the attestation
"<algorithm>:<device_id>": "<signature_base64>"
} } }
]
} } } } } }
"""
@ -143,10 +173,11 @@ class KeyQueryServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
yield self.auth.get_user_by_req(request, allow_guest=True)
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
user_id = requester.user.to_string()
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
result = yield self.e2e_keys_handler.query_devices(body, timeout)
result = yield self.e2e_keys_handler.query_devices(user_id, body, timeout)
defer.returnValue((200, result))

View file

@ -182,6 +182,9 @@ class DataStore(RoomMemberStore, RoomStore,
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max,
)
self._attestation_stream_cache = StreamChangeCache(
"AttestationStreamChangeCache", device_list_max,
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache", device_list_max,
)

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -479,6 +480,9 @@ class DeviceStore(BackgroundUpdateStore):
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
attestations = device.get("attestations", None)
if attestations:
result["attestations"] = attestations
else:
result["deleted"] = True
@ -579,6 +583,9 @@ class DeviceStore(BackgroundUpdateStore):
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
attestations = device.get("attestations", None)
if attestations:
result["attestations"] = attestations
results.append(result)
@ -634,19 +641,42 @@ class DeviceStore(BackgroundUpdateStore):
txn.execute(sql, (destination, stream_id,))
@defer.inlineCallbacks
def get_user_whose_devices_changed(self, from_key):
def get_user_whose_devices_changed(self, user_id, from_key):
"""Get set of users whose devices have changed since `from_key`.
"""
from_key = int(from_key)
changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
if changed is not None:
defer.returnValue(set(changed))
changed_set = set(changed)
if self._attestation_stream_cache.has_entity_changed(user_id, from_key):
# the user's attestations have changed, so fetch the new users
# that have been attested
sql = """
SELECT DISTINCT user_id FROM attestations_stream
WHERE from_user_id = ? AND stream_id > ?
"""
rows = yield self._execute(
"get_user_whose_devices_were_attested", None, sql, user_id, from_key
)
changed_set.update(row[0] for row in rows)
defer.returnValue(changed_set)
# get device updates
sql = """
SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
"""
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
defer.returnValue(set(row[0] for row in rows))
changed_set = set(row[0] for row in rows)
# get attestations that the user has made
sql = """
SELECT DISTINCT user_id FROM attestations_stream
WHERE from_user_id = ? AND stream_id > ?
"""
rows = yield self._execute(
"get_user_whose_devices_were_attested", None, sql, user_id, from_key
)
changed_set.update(row[0] for row in rows)
defer.returnValue(changed_set)
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
@ -732,6 +762,33 @@ class DeviceStore(BackgroundUpdateStore):
]
)
@defer.inlineCallbacks
def add_attestation_change_to_streams(self, from_user_id, user_id):
"""Persist that a user's attestations have been updated.
"""
with self._device_list_id_gen.get_next() as stream_id:
yield self.runInteraction(
"add_attestation_change_to_streams", self._add_attestation_change_txn,
from_user_id, user_id, stream_id,
)
defer.returnValue(stream_id)
def _add_attestation_change_txn(self, txn, from_user_id, user_id, stream_id):
txn.call_after(
self._attestation_stream_cache.entity_has_changed,
from_user_id, stream_id,
)
self._simple_insert_txn(
txn,
"attestations_stream",
values={
"stream_id": stream_id,
"from_user_id": from_user_id,
"user_id": user_id,
},
)
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,7 +15,7 @@
# limitations under the License.
from six import iteritems
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
@ -69,7 +70,7 @@ class EndToEndKeyStore(SQLBaseStore):
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False,
include_deleted_devices=False,
include_deleted_devices=False, req_user_id=None,
):
"""Fetch a list of device keys.
Args:
@ -79,6 +80,7 @@ class EndToEndKeyStore(SQLBaseStore):
include_deleted_devices (bool): whether to include null entries for
devices which no longer exist (but were in the query_list).
This option only takes effect if include_all_devices is true.
req_user_id: The user requesting the device list
Returns:
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
@ -88,7 +90,7 @@ class EndToEndKeyStore(SQLBaseStore):
results = yield self.runInteraction(
"get_e2e_device_keys", self._get_e2e_device_keys_txn,
query_list, include_all_devices, include_deleted_devices,
query_list, include_all_devices, include_deleted_devices, req_user_id,
)
for user_id, device_keys in iteritems(results):
@ -99,7 +101,7 @@ class EndToEndKeyStore(SQLBaseStore):
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False,
include_deleted_devices=False,
include_deleted_devices=False, req_user_id=None,
):
query_clauses = []
query_params = []
@ -145,6 +147,17 @@ class EndToEndKeyStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
attestations = self._get_e2e_attestations_txn(txn, req_user_id, query_list)
for attestation in attestations:
user_id = attestation["user_id"]
device_id = attestation["device_id"]
# FIXME: combine signatures of the same payload?
if user_id in result and device_id in result[user_id] \
and result[user_id][device_id]:
result[user_id][device_id].setdefault("attestations", []) \
.append(attestation)
return result
@defer.inlineCallbacks
@ -288,3 +301,60 @@ class EndToEndKeyStore(SQLBaseStore):
return self.runInteraction(
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
)
def add_e2e_attestations(self, attestations):
def add_e2e_attestations_txn(txn):
self._simple_insert_many_txn(
txn, table="e2e_attestations",
values=[
{
"user_id": x["user_id"],
"device_id": x["device_id"],
"from_user_id": from_user_id,
"attestation": json.dumps(x),
}
for x in attestations
for from_user_id in x["signatures"].keys()
],
)
return self.runInteraction(
"add_e2e_attestations", add_e2e_attestations_txn
)
def _get_e2e_attestations_txn(self, txn, from_user_id, query_list):
query_clauses = []
query_params = []
for (user_id, device_id) in query_list:
# Users can only see attestations made by themselves about the
# target user's devices, or by the target user about their own
# devices. If the requesting user is not specified, select only
# the publicly visible attestations, that is, attestations made by
# the target user's own devices.
if from_user_id:
query_clause = "(from_user_id = ? OR from_user_id = ?)"
query_params.append(from_user_id)
else:
query_clause = "(from_user_id = ?)"
query_params.append(user_id)
query_clause += " AND user_id = ?"
query_params.append(user_id)
if device_id is not None:
query_clause += " AND device_id = ?"
query_params.append(device_id)
query_clauses.append(query_clause)
sql = (
"SELECT attestation "
" FROM e2e_attestations "
" WHERE %s"
) % (
" OR ".join("(" + q + ")" for q in query_clauses)
)
txn.execute(sql, query_params)
return [json.loads(row[0]) for row in txn]

View file

@ -0,0 +1,33 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- attestations of e2e device keys
CREATE TABLE e2e_attestations (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
from_user_id TEXT NOT NULL,
attestation TEXT NOT NULL
);
CREATE INDEX e2e_attestations_idx ON e2e_attestations(user_id, from_user_id, device_id);
-- stream of attestation updates
CREATE TABLE attestations_stream (
stream_id BIGINT NOT NULL,
from_user_id TEXT NOT NULL,
user_id TEXT NOT NULL
);
CREATE INDEX attestations_stream_idx ON attestations_stream(stream_id, from_user_id);