0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-11 20:42:23 +01:00

Update the device list cache when keys/query is called (#5693)

This commit is contained in:
Jorik Schellekens 2019-07-29 16:34:44 +01:00 committed by Erik Johnston
parent 3b476f5767
commit 85b0bd8fe0
3 changed files with 137 additions and 74 deletions

1
changelog.d/5693.bugfix Normal file
View file

@ -0,0 +1 @@
Fix UISIs during homeserver outage.

View file

@ -209,12 +209,12 @@ class DeviceHandler(DeviceWorkerHandler):
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self._edu_updater = DeviceListEduUpdater(hs, self) self.device_list_updater = DeviceListUpdater(hs, self)
federation_registry = hs.get_federation_registry() federation_registry = hs.get_federation_registry()
federation_registry.register_edu_handler( federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update "m.device_list_update", self.device_list_updater.incoming_device_list_update
) )
federation_registry.register_query_handler( federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices "user_devices", self.on_federation_query_user_devices
@ -426,7 +426,7 @@ def _update_device_from_client_ips(device, client_ips):
device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")}) device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
class DeviceListEduUpdater(object): class DeviceListUpdater(object):
"Handles incoming device list updates from federation and updates the DB" "Handles incoming device list updates from federation and updates the DB"
def __init__(self, hs, device_handler): def __init__(self, hs, device_handler):
@ -519,75 +519,7 @@ class DeviceListEduUpdater(object):
logger.debug("Need to re-sync devices for %r? %r", user_id, resync) logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
if resync: if resync:
# Fetch all devices for the user. yield self.user_device_resync(user_id)
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
except (
NotRetryingDestination,
RequestSendFailed,
HttpResponseException,
):
# TODO: Remember that we are now out of sync and try again
# later
logger.warn("Failed to handle device list update for %s", user_id)
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
# next time we get a device list update for this user_id.
# This makes it more likely that the device lists will
# eventually become consistent.
return
except FederationDeniedError as e:
logger.info(e)
return
except Exception:
# TODO: Remember that we are now out of sync and try again
# later
logger.exception(
"Failed to handle device list update for %s", user_id
)
return
stream_id = result["stream_id"]
devices = result["devices"]
# If the remote server has more than ~1000 devices for this user
# we assume that something is going horribly wrong (e.g. a bot
# that logs in and creates a new device every time it tries to
# send a message). Maintaining lots of devices per user in the
# cache can cause serious performance issues as if this request
# takes more than 60s to complete, internal replication from the
# inbound federation worker to the synapse master may time out
# causing the inbound federation to fail and causing the remote
# server to retry, causing a DoS. So in this scenario we give
# up on storing the total list of devices and only handle the
# delta instead.
if len(devices) > 1000:
logger.warn(
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id,
len(devices),
)
devices = []
for device in devices:
logger.debug(
"Handling resync update %r/%r, ID: %r",
user_id,
device["device_id"],
stream_id,
)
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
# We clobber the seen updates since we've re-synced from a given
# point.
self._seen_updates[user_id] = set([stream_id])
else: else:
# Simply update the single device, since we know that is the only # Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache) # change (because of the single prev_id matching the current cache)
@ -634,3 +566,77 @@ class DeviceListEduUpdater(object):
stream_id_in_updates.add(stream_id) stream_id_in_updates.add(stream_id)
return False return False
@defer.inlineCallbacks
def user_device_resync(self, user_id):
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id (str): The user's id whose device_list will be updated.
Returns:
Deferred[dict]: a dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
# TODO: Remember that we are now out of sync and try again
# later
logger.warn("Failed to handle device list update for %s", user_id)
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
# next time we get a device list update for this user_id.
# This makes it more likely that the device lists will
# eventually become consistent.
return
except FederationDeniedError as e:
logger.info(e)
return
except Exception:
# TODO: Remember that we are now out of sync and try again
# later
logger.exception("Failed to handle device list update for %s", user_id)
return
stream_id = result["stream_id"]
devices = result["devices"]
# If the remote server has more than ~1000 devices for this user
# we assume that something is going horribly wrong (e.g. a bot
# that logs in and creates a new device every time it tries to
# send a message). Maintaining lots of devices per user in the
# cache can cause serious performance issues as if this request
# takes more than 60s to complete, internal replication from the
# inbound federation worker to the synapse master may time out
# causing the inbound federation to fail and causing the remote
# server to retry, causing a DoS. So in this scenario we give
# up on storing the total list of devices and only handle the
# delta instead.
if len(devices) > 1000:
logger.warn(
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id,
len(devices),
)
devices = []
for device in devices:
logger.debug(
"Handling resync update %r/%r, ID: %r",
user_id,
device["device_id"],
stream_id,
)
yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
# We clobber the seen updates since we've re-synced from a given
# point.
self._seen_updates[user_id] = set([stream_id])
defer.returnValue(result)

View file

@ -65,6 +65,7 @@ class E2eKeysHandler(object):
} }
} }
""" """
device_keys_query = query_body.get("device_keys", {}) device_keys_query = query_body.get("device_keys", {})
# separate users by domain. # separate users by domain.
@ -121,7 +122,58 @@ class E2eKeysHandler(object):
# Now fetch any devices that we don't have in our cache # Now fetch any devices that we don't have in our cache
@defer.inlineCallbacks @defer.inlineCallbacks
def do_remote_query(destination): def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
a remote homeserver and their device list is not in the device list
cache. If we share a room with this user and we're not querying for
specific user we will update the cache
with their device list."""
destination_query = remote_queries_not_in_cache[destination] destination_query = remote_queries_not_in_cache[destination]
# We first consider whether we wish to update the device list cache with
# the users device list. We want to track a user's devices when the
# authenticated user shares a room with the queried user and the query
# has not specified a particular device.
# If we update the cache for the queried user we remove them from further
# queries. We use the more efficient batched query_client_keys for all
# remaining users
user_ids_updated = []
for (user_id, device_list) in destination_query.items():
if user_id in user_ids_updated:
continue
if device_list:
continue
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
continue
# We've decided we're sharing a room with this user and should
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
user_ids_updated.append(user_id)
except Exception as e:
failures[destination] = failures.get(destination, []).append(
_exception_to_failure(e)
)
if len(destination_query) == len(user_ids_updated):
# We've updated all the users in the query and we do not need to
# make any further remote calls.
return
# Remove all the users from the query which we have updated
for user_id in user_ids_updated:
destination_query.pop(user_id)
try: try:
remote_result = yield self.federation.query_client_keys( remote_result = yield self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout destination, {"device_keys": destination_query}, timeout=timeout
@ -132,7 +184,8 @@ class E2eKeysHandler(object):
results[user_id] = keys results[user_id] = keys
except Exception as e: except Exception as e:
failures[destination] = _exception_to_failure(e) failure = _exception_to_failure(e)
failures[destination] = failure
yield make_deferred_yieldable( yield make_deferred_yieldable(
defer.gatherResults( defer.gatherResults(
@ -234,8 +287,10 @@ class E2eKeysHandler(object):
for user_id, keys in remote_result["one_time_keys"].items(): for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys: if user_id in device_keys:
json_result[user_id] = keys json_result[user_id] = keys
except Exception as e: except Exception as e:
failures[destination] = _exception_to_failure(e) failure = _exception_to_failure(e)
failures[destination] = failure
yield make_deferred_yieldable( yield make_deferred_yieldable(
defer.gatherResults( defer.gatherResults(
@ -263,6 +318,7 @@ class E2eKeysHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def upload_keys_for_user(self, user_id, device_id, keys): def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys. # TODO: Validate the JSON to make sure it has the right keys.