forked from MirrorHub/synapse
Merge branch 'rav/refactor_device_query' into develop
This commit is contained in:
commit
e555bc6551
6 changed files with 177 additions and 92 deletions
|
@ -372,27 +372,9 @@ class FederationServer(FederationBase):
|
|||
(200, send_content)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_query_client_keys(self, origin, content):
|
||||
query = []
|
||||
for user_id, device_ids in content.get("device_keys", {}).items():
|
||||
if not device_ids:
|
||||
query.append((user_id, None))
|
||||
else:
|
||||
for device_id in device_ids:
|
||||
query.append((user_id, device_id))
|
||||
|
||||
results = yield self.store.get_e2e_device_keys(query)
|
||||
|
||||
json_result = {}
|
||||
for user_id, device_keys in results.items():
|
||||
for device_id, json_bytes in device_keys.items():
|
||||
json_result.setdefault(user_id, {})[device_id] = json.loads(
|
||||
json_bytes
|
||||
)
|
||||
|
||||
defer.returnValue({"device_keys": json_result})
|
||||
return self.on_query_request("client_keys", content)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
|
|
@ -367,10 +367,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
|
|||
class FederationClientKeysQueryServlet(BaseFederationServlet):
|
||||
PATH = "/user/keys/query"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, origin, content, query):
|
||||
response = yield self.handler.on_query_client_keys(origin, content)
|
||||
defer.returnValue((200, response))
|
||||
return self.handler.on_query_client_keys(origin, content)
|
||||
|
||||
|
||||
class FederationClientKeysClaimServlet(BaseFederationServlet):
|
||||
|
|
130
synapse/handlers/e2e_keys.py
Normal file
130
synapse/handlers/e2e_keys.py
Normal file
|
@ -0,0 +1,130 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api import errors
|
||||
import synapse.types
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class E2eKeysHandler(object):
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.federation = hs.get_replication_layer()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.server_name = hs.hostname
|
||||
|
||||
# doesn't really work as part of the generic query API, because the
|
||||
# query request requires an object POST, but we abuse the
|
||||
# "query handler" interface.
|
||||
self.federation.register_query_handler(
|
||||
"client_keys", self.on_federation_query_client_keys
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def query_devices(self, query_body):
|
||||
""" Handle a device key query from a client
|
||||
|
||||
{
|
||||
"device_keys": {
|
||||
"<user_id>": ["<device_id>"]
|
||||
}
|
||||
}
|
||||
->
|
||||
{
|
||||
"device_keys": {
|
||||
"<user_id>": {
|
||||
"<device_id>": {
|
||||
...
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
device_keys_query = query_body.get("device_keys", {})
|
||||
|
||||
# separate users by domain.
|
||||
# make a map from domain to user_id to device_ids
|
||||
queries_by_domain = collections.defaultdict(dict)
|
||||
for user_id, device_ids in device_keys_query.items():
|
||||
user = synapse.types.UserID.from_string(user_id)
|
||||
queries_by_domain[user.domain][user_id] = device_ids
|
||||
|
||||
# do the queries
|
||||
# TODO: do these in parallel
|
||||
results = {}
|
||||
for destination, destination_query in queries_by_domain.items():
|
||||
if destination == self.server_name:
|
||||
res = yield self.query_local_devices(destination_query)
|
||||
else:
|
||||
res = yield self.federation.query_client_keys(
|
||||
destination, {"device_keys": destination_query}
|
||||
)
|
||||
res = res["device_keys"]
|
||||
for user_id, keys in res.items():
|
||||
if user_id in destination_query:
|
||||
results[user_id] = keys
|
||||
|
||||
defer.returnValue((200, {"device_keys": results}))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def query_local_devices(self, query):
|
||||
"""Get E2E device keys for local users
|
||||
|
||||
Args:
|
||||
query (dict[string, list[string]|None): map from user_id to a list
|
||||
of devices to query (None for all devices)
|
||||
|
||||
Returns:
|
||||
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
||||
map from user_id -> device_id -> device details
|
||||
"""
|
||||
local_query = []
|
||||
|
||||
for user_id, device_ids in query.items():
|
||||
if not self.is_mine_id(user_id):
|
||||
logger.warning("Request for keys for non-local user %s",
|
||||
user_id)
|
||||
raise errors.SynapseError(400, "Not a user here")
|
||||
|
||||
if not device_ids:
|
||||
local_query.append((user_id, None))
|
||||
else:
|
||||
for device_id in device_ids:
|
||||
local_query.append((user_id, device_id))
|
||||
|
||||
results = yield self.store.get_e2e_device_keys(local_query)
|
||||
|
||||
# un-jsonify the results
|
||||
json_result = collections.defaultdict(dict)
|
||||
for user_id, device_keys in results.items():
|
||||
for device_id, json_bytes in device_keys.items():
|
||||
json_result[user_id][device_id] = json.loads(json_bytes)
|
||||
|
||||
defer.returnValue(json_result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_federation_query_client_keys(self, query_body):
|
||||
""" Handle a device key query from a federated server
|
||||
"""
|
||||
device_keys_query = query_body.get("device_keys", {})
|
||||
res = yield self.query_local_devices(device_keys_query)
|
||||
defer.returnValue({"device_keys": res})
|
|
@ -186,17 +186,19 @@ class KeyQueryServlet(RestServlet):
|
|||
)
|
||||
|
||||
def __init__(self, hs):
|
||||
"""
|
||||
Args:
|
||||
hs (synapse.server.HomeServer):
|
||||
"""
|
||||
super(KeyQueryServlet, self).__init__()
|
||||
self.store = hs.get_datastore()
|
||||
self.auth = hs.get_auth()
|
||||
self.federation = hs.get_replication_layer()
|
||||
self.is_mine = hs.is_mine
|
||||
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, user_id, device_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
body = parse_json_object_from_request(request)
|
||||
result = yield self.handle_request(body)
|
||||
result = yield self.e2e_keys_handler.query_devices(body)
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -205,45 +207,11 @@ class KeyQueryServlet(RestServlet):
|
|||
auth_user_id = requester.user.to_string()
|
||||
user_id = user_id if user_id else auth_user_id
|
||||
device_ids = [device_id] if device_id else []
|
||||
result = yield self.handle_request(
|
||||
result = yield self.e2e_keys_handler.query_devices(
|
||||
{"device_keys": {user_id: device_ids}}
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_request(self, body):
|
||||
local_query = []
|
||||
remote_queries = {}
|
||||
for user_id, device_ids in body.get("device_keys", {}).items():
|
||||
user = UserID.from_string(user_id)
|
||||
if self.is_mine(user):
|
||||
if not device_ids:
|
||||
local_query.append((user_id, None))
|
||||
else:
|
||||
for device_id in device_ids:
|
||||
local_query.append((user_id, device_id))
|
||||
else:
|
||||
remote_queries.setdefault(user.domain, {})[user_id] = list(
|
||||
device_ids
|
||||
)
|
||||
results = yield self.store.get_e2e_device_keys(local_query)
|
||||
|
||||
json_result = {}
|
||||
for user_id, device_keys in results.items():
|
||||
for device_id, json_bytes in device_keys.items():
|
||||
json_result.setdefault(user_id, {})[device_id] = json.loads(
|
||||
json_bytes
|
||||
)
|
||||
|
||||
for destination, device_keys in remote_queries.items():
|
||||
remote_result = yield self.federation.query_client_keys(
|
||||
destination, {"device_keys": device_keys}
|
||||
)
|
||||
for user_id, keys in remote_result["device_keys"].items():
|
||||
if user_id in device_keys:
|
||||
json_result[user_id] = keys
|
||||
defer.returnValue((200, {"device_keys": json_result}))
|
||||
|
||||
|
||||
class OneTimeKeyServlet(RestServlet):
|
||||
"""
|
||||
|
|
|
@ -19,39 +19,38 @@
|
|||
# partial one for unit test mocking.
|
||||
|
||||
# Imports required for the default HomeServer() implementation
|
||||
from twisted.web.client import BrowserLikePolicyForHTTPS
|
||||
from twisted.enterprise import adbapi
|
||||
|
||||
from synapse.appservice.scheduler import ApplicationServiceScheduler
|
||||
from synapse.appservice.api import ApplicationServiceApi
|
||||
from synapse.federation import initialize_http_replication
|
||||
from synapse.handlers.device import DeviceHandler
|
||||
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
|
||||
from synapse.notifier import Notifier
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.handlers import Handlers
|
||||
from synapse.handlers.presence import PresenceHandler
|
||||
from synapse.handlers.sync import SyncHandler
|
||||
from synapse.handlers.typing import TypingHandler
|
||||
from synapse.handlers.room import RoomListHandler
|
||||
from synapse.handlers.auth import AuthHandler
|
||||
from synapse.handlers.appservice import ApplicationServicesHandler
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage import DataStore
|
||||
from synapse.util import Clock
|
||||
from synapse.util.distributor import Distributor
|
||||
from synapse.streams.events import EventSources
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.crypto.keyring import Keyring
|
||||
from synapse.push.pusherpool import PusherPool
|
||||
from synapse.events.builder import EventBuilderFactory
|
||||
from synapse.api.filtering import Filtering
|
||||
from synapse.rest.media.v1.media_repository import MediaRepository
|
||||
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
|
||||
import logging
|
||||
|
||||
from twisted.enterprise import adbapi
|
||||
from twisted.web.client import BrowserLikePolicyForHTTPS
|
||||
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.api.filtering import Filtering
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.appservice.api import ApplicationServiceApi
|
||||
from synapse.appservice.scheduler import ApplicationServiceScheduler
|
||||
from synapse.crypto.keyring import Keyring
|
||||
from synapse.events.builder import EventBuilderFactory
|
||||
from synapse.federation import initialize_http_replication
|
||||
from synapse.handlers import Handlers
|
||||
from synapse.handlers.appservice import ApplicationServicesHandler
|
||||
from synapse.handlers.auth import AuthHandler
|
||||
from synapse.handlers.device import DeviceHandler
|
||||
from synapse.handlers.e2e_keys import E2eKeysHandler
|
||||
from synapse.handlers.presence import PresenceHandler
|
||||
from synapse.handlers.room import RoomListHandler
|
||||
from synapse.handlers.sync import SyncHandler
|
||||
from synapse.handlers.typing import TypingHandler
|
||||
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.notifier import Notifier
|
||||
from synapse.push.pusherpool import PusherPool
|
||||
from synapse.rest.media.v1.media_repository import MediaRepository
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage import DataStore
|
||||
from synapse.streams.events import EventSources
|
||||
from synapse.util import Clock
|
||||
from synapse.util.distributor import Distributor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -94,6 +93,7 @@ class HomeServer(object):
|
|||
'room_list_handler',
|
||||
'auth_handler',
|
||||
'device_handler',
|
||||
'e2e_keys_handler',
|
||||
'application_service_api',
|
||||
'application_service_scheduler',
|
||||
'application_service_handler',
|
||||
|
@ -202,6 +202,9 @@ class HomeServer(object):
|
|||
def build_device_handler(self):
|
||||
return DeviceHandler(self)
|
||||
|
||||
def build_e2e_keys_handler(self):
|
||||
return E2eKeysHandler(self)
|
||||
|
||||
def build_application_service_api(self):
|
||||
return ApplicationServiceApi(self)
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import synapse.handlers
|
||||
import synapse.handlers.auth
|
||||
import synapse.handlers.device
|
||||
import synapse.handlers.e2e_keys
|
||||
import synapse.storage
|
||||
import synapse.state
|
||||
|
||||
|
@ -14,6 +15,9 @@ class HomeServer(object):
|
|||
def get_device_handler(self) -> synapse.handlers.device.DeviceHandler:
|
||||
pass
|
||||
|
||||
def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler:
|
||||
pass
|
||||
|
||||
def get_handlers(self) -> synapse.handlers.Handlers:
|
||||
pass
|
||||
|
||||
|
|
Loading…
Reference in a new issue