Retry to sync out of sync device lists (#7453)
When a call to `user_device_resync` fails, we don't currently mark the remote user's device list as out of sync, nor do we retry syncing it. https://github.com/matrix-org/synapse/pull/6776 introduced some code infrastructure for marking device lists as stale/out of sync.

This commit uses that infrastructure to mark a device list as out of sync when processing an incoming device list update makes the device handler realise that the device list is out of sync but we can't resync right away. It also adds a looping call that retries all failed resyncs every 30s.

This shouldn't cause too much log spam, as this commit also removes the "Failed to handle device list update for..." warning logs when catching `NotRetryingDestination`.

Fixes #7418
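In outline, the mechanism added below is a periodic background task guarded by a re-entrancy flag. A minimal, illustrative sketch of that pattern (not Synapse's actual classes: `task.LoopingCall` stands in for Synapse's `clock.looping_call`/`run_as_background_process` wrapper, and the store and resync helpers are assumed):

from twisted.internet import defer, task


class RetryingDeviceListUpdater(object):
    """Illustrative sketch only, not Synapse's DeviceListUpdater."""

    def __init__(self, store, user_device_resync):
        self.store = store  # assumed: exposes the stale-device-lists query
        self.user_device_resync = user_device_resync  # assumed: resyncs one user
        self._resync_retry_in_progress = False
        # Fire every 30 seconds; Synapse routes the call through
        # run_as_background_process, which is elided here.
        self._retry_loop = task.LoopingCall(self._maybe_retry_device_resync)
        self._retry_loop.start(30, now=False)

    @defer.inlineCallbacks
    def _maybe_retry_device_resync(self):
        # Bail out if a previous pass is still running, so overlapping timer
        # ticks don't multiply the outbound federation requests.
        if self._resync_retry_in_progress:
            return
        try:
            self._resync_retry_in_progress = True
            need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
            for user_id in need_resync:
                # mark_failed_as_stale=False: the user is already in the resync
                # table, so a failed retry is simply picked up on a later tick.
                yield self.user_device_resync(user_id, mark_failed_as_stale=False)
        finally:
            self._resync_retry_in_progress = False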
parent 0bbbd10513
commit d1ae1015ec
4 changed files with 158 additions and 20 deletions
changelog.d/7453.bugfix (new file)
@@ -0,0 +1 @@
+Fix a bug that would cause Synapse not to resync out-of-sync device lists.
synapse/handlers/device.py

@@ -29,6 +29,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -535,6 +536,15 @@ class DeviceListUpdater(object):
             iterable=True,
         )
 
+        # Attempt to resync out of sync device lists every 30s.
+        self._resync_retry_in_progress = False
+        self.clock.looping_call(
+            run_as_background_process,
+            30 * 1000,
+            func=self._maybe_retry_device_resync,
+            desc="_maybe_retry_device_resync",
+        )
+
     @trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
@@ -679,11 +689,50 @@ class DeviceListUpdater(object):
             return False
 
     @defer.inlineCallbacks
-    def user_device_resync(self, user_id):
+    def _maybe_retry_device_resync(self):
+        """Retry to resync device lists that are out of sync, except if another retry is
+        in progress.
+        """
+        if self._resync_retry_in_progress:
+            return
+
+        try:
+            # Prevent another call of this function to retry resyncing device lists so
+            # we don't send too many requests.
+            self._resync_retry_in_progress = True
+            # Get all of the users that need resyncing.
+            need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
+            # Iterate over the set of user IDs.
+            for user_id in need_resync:
+                # Try to resync the current user's devices list. Exception handling
+                # isn't necessary here, since user_device_resync catches all instances
+                # of "Exception" that might be raised from the federation request. This
+                # means that if an exception is raised by this function, it must be
+                # because of a database issue, which means _maybe_retry_device_resync
+                # probably won't be able to go much further anyway.
+                result = yield self.user_device_resync(
+                    user_id=user_id, mark_failed_as_stale=False,
+                )
+                # user_device_resync only returns a result if it managed to successfully
+                # resync and update the database. Updating the table of users requiring
+                # resync isn't necessary here as user_device_resync already does it
+                # (through self.store.update_remote_device_list_cache).
+                if result:
+                    logger.debug(
+                        "Successfully resynced the device list for %s" % user_id,
+                    )
+        finally:
+            # Allow future calls to retry resyncing out of sync device lists.
+            self._resync_retry_in_progress = False
+
+    @defer.inlineCallbacks
+    def user_device_resync(self, user_id, mark_failed_as_stale=True):
         """Fetches all devices for a user and updates the device cache with them.
 
         Args:
             user_id (str): The user's id whose device_list will be updated.
+            mark_failed_as_stale (bool): Whether to mark the user's device list as stale
+                if the attempt to resync failed.
         Returns:
             Deferred[dict]: a dict with device info as under the "devices" in the result of this
             request:
@@ -694,10 +743,23 @@ class DeviceListUpdater(object):
         origin = get_domain_from_id(user_id)
         try:
             result = yield self.federation.query_user_devices(origin, user_id)
-        except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
-            # TODO: Remember that we are now out of sync and try again
-            # later
-            logger.warning("Failed to handle device list update for %s", user_id)
+        except NotRetryingDestination:
+            if mark_failed_as_stale:
+                # Mark the remote user's device list as stale so we know we need to retry
+                # it later.
+                yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
+            return
+        except (RequestSendFailed, HttpResponseException) as e:
+            logger.warning(
+                "Failed to handle device list update for %s: %s", user_id, e,
+            )
+
+            if mark_failed_as_stale:
+                # Mark the remote user's device list as stale so we know we need to retry
+                # it later.
+                yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
             # We abort on exceptions rather than accepting the update
             # as otherwise synapse will 'forget' that its device list
             # is out of date. If we bail then we will retry the resync
@@ -711,13 +773,17 @@ class DeviceListUpdater(object):
             logger.info(e)
             return
         except Exception as e:
-            # TODO: Remember that we are now out of sync and try again
-            # later
             set_tag("error", True)
             log_kv(
                 {"message": "Exception raised by federation request", "exception": e}
             )
             logger.exception("Failed to handle device list update for %s", user_id)
+
+            if mark_failed_as_stale:
+                # Mark the remote user's device list as stale so we know we need to retry
+                # it later.
+                yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
             return
         log_kv({"result": result})
         stream_id = result["stream_id"]
synapse/storage/data_stores/main/devices.py

@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List, Tuple
+from typing import List, Optional, Set, Tuple
 
 from six import iteritems
@@ -649,21 +649,31 @@ class DeviceWorkerStore(SQLBaseStore):
         return results
 
     @defer.inlineCallbacks
-    def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]):
+    def get_user_ids_requiring_device_list_resync(
+        self, user_ids: Optional[Collection[str]] = None,
+    ) -> Set[str]:
         """Given a list of remote users return the list of users that we
-        should resync the device lists for.
+        should resync the device lists for. If None is given instead of a list,
+        return every user that we should resync the device lists for.
 
         Returns:
-            Deferred[Set[str]]
+            The IDs of users whose device lists need resync.
         """
-
-        rows = yield self.db.simple_select_many_batch(
-            table="device_lists_remote_resync",
-            column="user_id",
-            iterable=user_ids,
-            retcols=("user_id",),
-            desc="get_user_ids_requiring_device_list_resync",
-        )
+        if user_ids:
+            rows = yield self.db.simple_select_many_batch(
+                table="device_lists_remote_resync",
+                column="user_id",
+                iterable=user_ids,
+                retcols=("user_id",),
+                desc="get_user_ids_requiring_device_list_resync_with_iterable",
+            )
+        else:
+            rows = yield self.db.simple_select_list(
+                table="device_lists_remote_resync",
+                keyvalues=None,
+                retcols=("user_id",),
+                desc="get_user_ids_requiring_device_list_resync",
+            )
 
         return {row["user_id"] for row in rows}
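For illustration only, a sketch of how a caller might exercise both modes of this method. The method name and keyword come from the diff above; the `store` object and the user IDs are assumptions, not part of the change:

from twisted.internet import defer


@defer.inlineCallbacks
def example(store):
    # `store` is assumed to be a DeviceWorkerStore; the user IDs are made up.
    # With an iterable, the result is restricted to the given users.
    subset = yield store.get_user_ids_requiring_device_list_resync(
        user_ids=["@alice:remote", "@bob:remote"],
    )
    # With no argument, every user whose device list is marked stale is
    # returned; this is the mode the new _maybe_retry_device_resync loop uses.
    everyone = yield store.get_user_ids_requiring_device_list_resync()
    return subset, everyone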
tests/test_federation.py

@@ -6,12 +6,13 @@ from synapse.events import make_event_from_dict
 from synapse.logging.context import LoggingContext
 from synapse.types import Requester, UserID
 from synapse.util import Clock
+from synapse.util.retryutils import NotRetryingDestination
 
 from tests import unittest
 from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
 
 
-class MessageAcceptTests(unittest.TestCase):
+class MessageAcceptTests(unittest.HomeserverTestCase):
     def setUp(self):
 
         self.http_client = Mock()
@@ -145,3 +146,63 @@ class MessageAcceptTests(unittest.TestCase):
         # Make sure the invalid event isn't there
         extrem = maybeDeferred(self.store.get_latest_event_ids_in_room, self.room_id)
         self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
+
+    def test_retry_device_list_resync(self):
+        """Tests that device lists are marked as stale if they couldn't be synced, and
+        that stale device lists are retried periodically.
+        """
+        remote_user_id = "@john:test_remote"
+        remote_origin = "test_remote"
+
+        # Track the number of attempts to resync the user's device list.
+        self.resync_attempts = 0
+
+        # When this function is called, increment the number of resync attempts (only if
+        # we're querying devices for the right user ID), then raise a
+        # NotRetryingDestination error to fail the resync gracefully.
+        def query_user_devices(destination, user_id):
+            if user_id == remote_user_id:
+                self.resync_attempts += 1
+
+            raise NotRetryingDestination(0, 0, destination)
+
+        # Register the mock on the federation client.
+        federation_client = self.homeserver.get_federation_client()
+        federation_client.query_user_devices = Mock(side_effect=query_user_devices)
+
+        # Register a mock on the store so that the incoming update doesn't fail because
+        # we don't share a room with the user.
+        store = self.homeserver.get_datastore()
+        store.get_rooms_for_user = Mock(return_value=["!someroom:test"])
+
+        # Manually inject a fake device list update. We need this update to include at
+        # least one prev_id so that the user's device list will need to be retried.
+        device_list_updater = self.homeserver.get_device_handler().device_list_updater
+        self.get_success(
+            device_list_updater.incoming_device_list_update(
+                origin=remote_origin,
+                edu_content={
+                    "deleted": False,
+                    "device_display_name": "Mobile",
+                    "device_id": "QBUAZIFURK",
+                    "prev_id": [5],
+                    "stream_id": 6,
+                    "user_id": remote_user_id,
+                },
+            )
+        )
+
+        # Check that there was one resync attempt.
+        self.assertEqual(self.resync_attempts, 1)
+
+        # Check that the resync attempt failed and caused the user's device list to be
+        # marked as stale.
+        need_resync = self.get_success(
+            store.get_user_ids_requiring_device_list_resync()
+        )
+        self.assertIn(remote_user_id, need_resync)
+
+        # Check that waiting for 30 seconds caused Synapse to retry resyncing the device
+        # list.
+        self.reactor.advance(30)
+        self.assertEqual(self.resync_attempts, 2)