forked from MirrorHub/synapse
Fix caching devices for remote servers in worker.
When the `/keys/query` API is hit on client_reader worker Synapse may decide that it needs to resync some remote deivces. Usually this happens on master, and then gets cached. However, that fails on workers and so it falls back to fetching devices from remotes directly, which may in turn fail if the remote is down.
This commit is contained in:
parent
4086002827
commit
c16e192e2f
3 changed files with 94 additions and 4 deletions
|
@ -30,6 +30,7 @@ from twisted.internet import defer
|
|||
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
|
||||
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
|
||||
from synapse.types import (
|
||||
UserID,
|
||||
get_domain_from_id,
|
||||
|
@ -53,6 +54,12 @@ class E2eKeysHandler(object):
|
|||
|
||||
self._edu_updater = SigningKeyEduUpdater(hs, self)
|
||||
|
||||
self._is_master = hs.config.worker_app is None
|
||||
if not self._is_master:
|
||||
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
|
||||
hs
|
||||
)
|
||||
|
||||
federation_registry = hs.get_federation_registry()
|
||||
|
||||
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
|
||||
|
@ -191,9 +198,15 @@ class E2eKeysHandler(object):
|
|||
# probably be tracking their device lists. However, we haven't
|
||||
# done an initial sync on the device list so we do it now.
|
||||
try:
|
||||
if self._is_master:
|
||||
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
|
||||
user_id
|
||||
)
|
||||
else:
|
||||
user_devices = yield self._user_device_resync_client(
|
||||
user_id=user_id
|
||||
)
|
||||
|
||||
user_devices = user_devices["devices"]
|
||||
for device in user_devices:
|
||||
results[user_id] = {device["device_id"]: device["keys"]}
|
||||
|
|
|
@ -14,7 +14,14 @@
|
|||
# limitations under the License.
|
||||
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.replication.http import federation, login, membership, register, send_event
|
||||
from synapse.replication.http import (
|
||||
devices,
|
||||
federation,
|
||||
login,
|
||||
membership,
|
||||
register,
|
||||
send_event,
|
||||
)
|
||||
|
||||
REPLICATION_PREFIX = "/_synapse/replication"
|
||||
|
||||
|
@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
|
|||
federation.register_servlets(hs, self)
|
||||
login.register_servlets(hs, self)
|
||||
register.register_servlets(hs, self)
|
||||
devices.register_servlets(hs, self)
|
||||
|
|
69
synapse/replication/http/devices.py
Normal file
69
synapse/replication/http/devices.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from synapse.replication.http._base import ReplicationEndpoint
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
|
||||
"""Notifies that a user has joined or left the room
|
||||
|
||||
Request format:
|
||||
|
||||
POST /_synapse/replication/user_device_resync/:user_id
|
||||
|
||||
{}
|
||||
|
||||
Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
|
||||
response, e.g.:
|
||||
|
||||
{
|
||||
"user_id": "@alice:example.org",
|
||||
"devices": [
|
||||
{
|
||||
"device_id": "JLAFKJWSCS",
|
||||
"keys": { ... },
|
||||
"device_display_name": "Alice's Mobile Phone"
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
NAME = "user_device_resync"
|
||||
PATH_ARGS = ("user_id",)
|
||||
CACHE = False
|
||||
|
||||
def __init__(self, hs):
|
||||
super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
|
||||
|
||||
self.device_list_updater = hs.get_device_handler().device_list_updater
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
@staticmethod
|
||||
def _serialize_payload(user_id):
|
||||
return {}
|
||||
|
||||
async def _handle_request(self, request, user_id):
|
||||
user_devices = await self.device_list_updater.user_device_resync(user_id)
|
||||
|
||||
return 200, user_devices
|
||||
|
||||
|
||||
def register_servlets(hs, http_server):
|
||||
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
|
Loading…
Reference in a new issue