0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-22 09:43:57 +01:00

Refactor MSC3030 /timestamp_to_event to move away from our snowflake pull from destination pattern (#14096)

1. `federation_client.timestamp_to_event(...)` now handles all `destination` looping and uses our generic `_try_destination_list(...)` helper.
 2. Consistently handling `NotRetryingDestination` and `FederationDeniedError` across `get_pdu` , backfill, and the generic `_try_destination_list` which is used for many places we use this pattern.
 3. `get_pdu(...)` now returns `PulledPduInfo` so we know which `destination` we ended up pulling the PDU from
This commit is contained in:
Eric Eastwood 2022-10-26 16:10:55 -05:00 committed by GitHub
parent 0d59ae706a
commit 40fa8294e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 192 additions and 127 deletions

1
changelog.d/14096.misc Normal file
View file

@ -0,0 +1 @@
Refactor [MSC3030](https://github.com/matrix-org/matrix-spec-proposals/pull/3030) `/timestamp_to_event` endpoint to loop over federation destinations with standard pattern and error handling.

View file

@ -80,6 +80,18 @@ PDU_RETRY_TIME_MS = 1 * 60 * 1000
T = TypeVar("T") T = TypeVar("T")
@attr.s(frozen=True, slots=True, auto_attribs=True)
class PulledPduInfo:
"""
A result object that stores the PDU and info about it like which homeserver we
pulled it from (`pull_origin`)
"""
pdu: EventBase
# Which homeserver we pulled the PDU from
pull_origin: str
class InvalidResponseError(RuntimeError): class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response """Helper for _try_destination_list: indicates that the server returned a response
we couldn't parse we couldn't parse
@ -114,7 +126,9 @@ class FederationClient(FederationBase):
self.hostname = hs.hostname self.hostname = hs.hostname
self.signing_key = hs.signing_key self.signing_key = hs.signing_key
self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache( # Cache mapping `event_id` to a tuple of the event itself and the `pull_origin`
# (which server we pulled the event from)
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
cache_name="get_pdu_cache", cache_name="get_pdu_cache",
clock=self._clock, clock=self._clock,
max_len=1000, max_len=1000,
@ -352,11 +366,11 @@ class FederationClient(FederationBase):
@tag_args @tag_args
async def get_pdu( async def get_pdu(
self, self,
destinations: Iterable[str], destinations: Collection[str],
event_id: str, event_id: str,
room_version: RoomVersion, room_version: RoomVersion,
timeout: Optional[int] = None, timeout: Optional[int] = None,
) -> Optional[EventBase]: ) -> Optional[PulledPduInfo]:
"""Requests the PDU with given origin and ID from the remote home """Requests the PDU with given origin and ID from the remote home
servers. servers.
@ -371,11 +385,11 @@ class FederationClient(FederationBase):
moving to the next destination. None indicates no timeout. moving to the next destination. None indicates no timeout.
Returns: Returns:
The requested PDU, or None if we were unable to find it. The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it.
""" """
logger.debug( logger.debug(
"get_pdu: event_id=%s from destinations=%s", event_id, destinations "get_pdu(event_id=%s): from destinations=%s", event_id, destinations
) )
# TODO: Rate limit the number of times we try and get the same event. # TODO: Rate limit the number of times we try and get the same event.
@ -384,19 +398,25 @@ class FederationClient(FederationBase):
# it gets persisted to the database), so we cache the results of the lookup. # it gets persisted to the database), so we cache the results of the lookup.
# Note that this is separate to the regular get_event cache which caches # Note that this is separate to the regular get_event cache which caches
# events once they have been persisted. # events once they have been persisted.
event = self._get_pdu_cache.get(event_id) get_pdu_cache_entry = self._get_pdu_cache.get(event_id)
event = None
pull_origin = None
if get_pdu_cache_entry:
event, pull_origin = get_pdu_cache_entry
# If we don't see the event in the cache, go try to fetch it from the # If we don't see the event in the cache, go try to fetch it from the
# provided remote federated destinations # provided remote federated destinations
if not event: else:
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
# TODO: We can probably refactor this to use `_try_destination_list`
for destination in destinations: for destination in destinations:
now = self._clock.time_msec() now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0) last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now: if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug( logger.debug(
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", "get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
event_id,
destination, destination,
last_attempt, last_attempt,
PDU_RETRY_TIME_MS, PDU_RETRY_TIME_MS,
@ -411,43 +431,48 @@ class FederationClient(FederationBase):
room_version=room_version, room_version=room_version,
timeout=timeout, timeout=timeout,
) )
pull_origin = destination
pdu_attempts[destination] = now pdu_attempts[destination] = now
if event: if event:
# Prime the cache # Prime the cache
self._get_pdu_cache[event.event_id] = event self._get_pdu_cache[event.event_id] = (event, pull_origin)
# Now that we have an event, we can break out of this # Now that we have an event, we can break out of this
# loop and stop asking other destinations. # loop and stop asking other destinations.
break break
except NotRetryingDestination as e:
logger.info("get_pdu(event_id=%s): %s", event_id, e)
continue
except FederationDeniedError:
logger.info(
"get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist",
event_id,
destination,
)
continue
except SynapseError as e: except SynapseError as e:
logger.info( logger.info(
"Failed to get PDU %s from %s because %s", "get_pdu(event_id=%s): Failed to get PDU from %s because %s",
event_id, event_id,
destination, destination,
e, e,
) )
continue continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e: except Exception as e:
pdu_attempts[destination] = now pdu_attempts[destination] = now
logger.info( logger.info(
"Failed to get PDU %s from %s because %s", "get_pdu(event_id=): Failed to get PDU from %s because %s",
event_id, event_id,
destination, destination,
e, e,
) )
continue continue
if not event: if not event or not pull_origin:
return None return None
# `event` now refers to an object stored in `get_pdu_cache`. Our # `event` now refers to an object stored in `get_pdu_cache`. Our
@ -459,7 +484,7 @@ class FederationClient(FederationBase):
event.room_version, event.room_version,
) )
return event_copy return PulledPduInfo(event_copy, pull_origin)
@trace @trace
@tag_args @tag_args
@ -699,12 +724,14 @@ class FederationClient(FederationBase):
pdu_origin = get_domain_from_id(pdu.sender) pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin: if not res and pdu_origin != origin:
try: try:
res = await self.get_pdu( pulled_pdu_info = await self.get_pdu(
destinations=[pdu_origin], destinations=[pdu_origin],
event_id=pdu.event_id, event_id=pdu.event_id,
room_version=room_version, room_version=room_version,
timeout=10000, timeout=10000,
) )
if pulled_pdu_info is not None:
res = pulled_pdu_info.pdu
except SynapseError: except SynapseError:
pass pass
@ -806,6 +833,7 @@ class FederationClient(FederationBase):
) )
for destination in destinations: for destination in destinations:
# We don't want to ask our own server for information we don't have
if destination == self.server_name: if destination == self.server_name:
continue continue
@ -814,9 +842,21 @@ class FederationClient(FederationBase):
except ( except (
RequestSendFailed, RequestSendFailed,
InvalidResponseError, InvalidResponseError,
NotRetryingDestination,
) as e: ) as e:
logger.warning("Failed to %s via %s: %s", description, destination, e) logger.warning("Failed to %s via %s: %s", description, destination, e)
# Skip to the next homeserver in the list to try.
continue
except NotRetryingDestination as e:
logger.info("%s: %s", description, e)
continue
except FederationDeniedError:
logger.info(
"%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist",
description,
description,
destination,
)
continue
except UnsupportedRoomVersionError: except UnsupportedRoomVersionError:
raise raise
except HttpResponseException as e: except HttpResponseException as e:
@ -1609,6 +1649,54 @@ class FederationClient(FederationBase):
return result return result
async def timestamp_to_event( async def timestamp_to_event(
self, *, destinations: List[str], room_id: str, timestamp: int, direction: str
) -> Optional["TimestampToEventResponse"]:
"""
Calls each remote federating server from `destinations` asking for their closest
event to the given timestamp in the given direction until we get a response.
Also validates the response to always return the expected keys or raises an
error.
Args:
destinations: The domains of homeservers to try fetching from
room_id: Room to fetch the event from
timestamp: The point in time (inclusive) we should navigate from in
the given direction to find the closest event.
direction: ["f"|"b"] to indicate whether we should navigate forward
or backward from the given timestamp to find the closest event.
Returns:
A parsed TimestampToEventResponse including the closest event_id
and origin_server_ts or None if no destination has a response.
"""
async def _timestamp_to_event_from_destination(
destination: str,
) -> TimestampToEventResponse:
return await self._timestamp_to_event_from_destination(
destination, room_id, timestamp, direction
)
try:
# Loop through each homeserver candidate until we get a succesful response
timestamp_to_event_response = await self._try_destination_list(
"timestamp_to_event",
destinations,
# TODO: The requested timestamp may lie in a part of the
# event graph that the remote server *also* didn't have,
# in which case they will have returned another event
# which may be nowhere near the requested timestamp. In
# the future, we may need to reconcile that gap and ask
# other homeservers, and/or extend `/timestamp_to_event`
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
_timestamp_to_event_from_destination,
)
return timestamp_to_event_response
except SynapseError:
return None
async def _timestamp_to_event_from_destination(
self, destination: str, room_id: str, timestamp: int, direction: str self, destination: str, room_id: str, timestamp: int, direction: str
) -> "TimestampToEventResponse": ) -> "TimestampToEventResponse":
""" """

View file

@ -442,6 +442,15 @@ class FederationHandler:
# appropriate stuff. # appropriate stuff.
# TODO: We can probably do something more intelligent here. # TODO: We can probably do something more intelligent here.
return True return True
except NotRetryingDestination as e:
logger.info("_maybe_backfill_inner: %s", e)
continue
except FederationDeniedError:
logger.info(
"_maybe_backfill_inner: Not attempting to backfill from %s because the homeserver is not on our federation whitelist",
dom,
)
continue
except (SynapseError, InvalidResponseError) as e: except (SynapseError, InvalidResponseError) as e:
logger.info("Failed to backfill from %s because %s", dom, e) logger.info("Failed to backfill from %s because %s", dom, e)
continue continue
@ -477,15 +486,9 @@ class FederationHandler:
logger.info("Failed to backfill from %s because %s", dom, e) logger.info("Failed to backfill from %s because %s", dom, e)
continue continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except RequestSendFailed as e: except RequestSendFailed as e:
logger.info("Failed to get backfill from %s because %s", dom, e) logger.info("Failed to get backfill from %s because %s", dom, e)
continue continue
except FederationDeniedError as e:
logger.info(e)
continue
except Exception as e: except Exception as e:
logger.exception("Failed to backfill from %s because %s", dom, e) logger.exception("Failed to backfill from %s because %s", dom, e)
continue continue

View file

@ -58,7 +58,7 @@ from synapse.event_auth import (
) )
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
from synapse.logging.context import nested_logging_context from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import ( from synapse.logging.opentracing import (
SynapseTags, SynapseTags,
@ -1517,8 +1517,8 @@ class FederationEventHandler:
) )
async def backfill_event_id( async def backfill_event_id(
self, destination: str, room_id: str, event_id: str self, destinations: List[str], room_id: str, event_id: str
) -> EventBase: ) -> PulledPduInfo:
"""Backfill a single event and persist it as a non-outlier which means """Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it. we also pull in all of the state and auth events necessary for it.
@ -1530,24 +1530,21 @@ class FederationEventHandler:
Raises: Raises:
FederationError if we are unable to find the event from the destination FederationError if we are unable to find the event from the destination
""" """
logger.info( logger.info("backfill_event_id: event_id=%s", event_id)
"backfill_event_id: event_id=%s from destination=%s", event_id, destination
)
room_version = await self._store.get_room_version(room_id) room_version = await self._store.get_room_version(room_id)
event_from_response = await self._federation_client.get_pdu( pulled_pdu_info = await self._federation_client.get_pdu(
[destination], destinations,
event_id, event_id,
room_version, room_version,
) )
if not event_from_response: if not pulled_pdu_info:
raise FederationError( raise FederationError(
"ERROR", "ERROR",
404, 404,
"Unable to find event_id=%s from destination=%s to backfill." f"Unable to find event_id={event_id} from remote servers to backfill.",
% (event_id, destination),
affected=event_id, affected=event_id,
) )
@ -1555,13 +1552,13 @@ class FederationEventHandler:
# and auth events to de-outlier it. This also sets up the necessary # and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event. # `state_groups` for the event.
await self._process_pulled_events( await self._process_pulled_events(
destination, pulled_pdu_info.pull_origin,
[event_from_response], [pulled_pdu_info.pdu],
# Prevent notifications going to clients # Prevent notifications going to clients
backfilled=True, backfilled=True,
) )
return event_from_response return pulled_pdu_info
@trace @trace
@tag_args @tag_args
@ -1584,19 +1581,19 @@ class FederationEventHandler:
async def get_event(event_id: str) -> None: async def get_event(event_id: str) -> None:
with nested_logging_context(event_id): with nested_logging_context(event_id):
try: try:
event = await self._federation_client.get_pdu( pulled_pdu_info = await self._federation_client.get_pdu(
[destination], [destination],
event_id, event_id,
room_version, room_version,
) )
if event is None: if pulled_pdu_info is None:
logger.warning( logger.warning(
"Server %s didn't return event %s", "Server %s didn't return event %s",
destination, destination,
event_id, event_id,
) )
return return
events.append(event) events.append(pulled_pdu_info.pdu)
except Exception as e: except Exception as e:
logger.warning( logger.warning(

View file

@ -49,7 +49,6 @@ from synapse.api.constants import (
from synapse.api.errors import ( from synapse.api.errors import (
AuthError, AuthError,
Codes, Codes,
HttpResponseException,
LimitExceededError, LimitExceededError,
NotFoundError, NotFoundError,
StoreError, StoreError,
@ -60,7 +59,6 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.event_auth import validate_event_for_room_version from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.utils import copy_and_fixup_power_levels_contents from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers.relations import BundledAggregations from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin from synapse.rest.admin._base import assert_user_is_admin
@ -1472,7 +1470,12 @@ class TimestampLookupHandler:
Raises: Raises:
SynapseError if unable to find any event locally in the given direction SynapseError if unable to find any event locally in the given direction
""" """
logger.debug(
"get_event_for_timestamp(room_id=%s, timestamp=%s, direction=%s) Finding closest event...",
room_id,
timestamp,
direction,
)
local_event_id = await self.store.get_event_id_for_timestamp( local_event_id = await self.store.get_event_id_for_timestamp(
room_id, timestamp, direction room_id, timestamp, direction
) )
@ -1524,85 +1527,54 @@ class TimestampLookupHandler:
) )
) )
# Loop through each homeserver candidate until we get a succesful response remote_response = await self.federation_client.timestamp_to_event(
for domain in likely_domains: destinations=likely_domains,
# We don't want to ask our own server for information we don't have room_id=room_id,
if domain == self.server_name: timestamp=timestamp,
continue direction=direction,
)
if remote_response is not None:
logger.debug(
"get_event_for_timestamp: remote_response=%s",
remote_response,
)
try: remote_event_id = remote_response.event_id
remote_response = await self.federation_client.timestamp_to_event( remote_origin_server_ts = remote_response.origin_server_ts
domain, room_id, timestamp, direction
) # Backfill this event so we can get a pagination token for
logger.debug( # it with `/context` and paginate `/messages` from this
"get_event_for_timestamp: response from domain(%s)=%s", # point.
domain, pulled_pdu_info = await self.federation_event_handler.backfill_event_id(
remote_response, likely_domains, room_id, remote_event_id
)
remote_event = pulled_pdu_info.pdu
# XXX: When we see that the remote server is not trustworthy,
# maybe we should not ask them first in the future.
if remote_origin_server_ts != remote_event.origin_server_ts:
logger.info(
"get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
pulled_pdu_info.pull_origin,
remote_event_id,
remote_origin_server_ts,
remote_event.origin_server_ts,
) )
remote_event_id = remote_response.event_id # Only return the remote event if it's closer than the local event
remote_origin_server_ts = remote_response.origin_server_ts if not local_event or (
abs(remote_event.origin_server_ts - timestamp)
# Backfill this event so we can get a pagination token for < abs(local_event.origin_server_ts - timestamp)
# it with `/context` and paginate `/messages` from this ):
# point. logger.info(
# "get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
# TODO: The requested timestamp may lie in a part of the remote_event_id,
# event graph that the remote server *also* didn't have, remote_event.origin_server_ts,
# in which case they will have returned another event timestamp,
# which may be nowhere near the requested timestamp. In local_event.event_id if local_event else None,
# the future, we may need to reconcile that gap and ask local_event.origin_server_ts if local_event else None,
# other homeservers, and/or extend `/timestamp_to_event`
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
remote_event = (
await self.federation_event_handler.backfill_event_id(
domain, room_id, remote_event_id
)
)
# XXX: When we see that the remote server is not trustworthy,
# maybe we should not ask them first in the future.
if remote_origin_server_ts != remote_event.origin_server_ts:
logger.info(
"get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
domain,
remote_event_id,
remote_origin_server_ts,
remote_event.origin_server_ts,
)
# Only return the remote event if it's closer than the local event
if not local_event or (
abs(remote_event.origin_server_ts - timestamp)
< abs(local_event.origin_server_ts - timestamp)
):
logger.info(
"get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
remote_event_id,
remote_event.origin_server_ts,
timestamp,
local_event.event_id if local_event else None,
local_event.origin_server_ts if local_event else None,
)
return remote_event_id, remote_origin_server_ts
except (HttpResponseException, InvalidResponseError) as ex:
# Let's not put a high priority on some other homeserver
# failing to respond or giving a random response
logger.debug(
"get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
domain,
type(ex).__name__,
ex,
ex.args,
)
except Exception:
# But we do want to see some exceptions in our code
logger.warning(
"get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception",
domain,
exc_info=True,
) )
return remote_event_id, remote_origin_server_ts
# To appease mypy, we have to add both of these conditions to check for # To appease mypy, we have to add both of these conditions to check for
# `None`. We only expect `local_event` to be `None` when # `None`. We only expect `local_event` to be `None` when

View file

@ -51,7 +51,7 @@ class NotRetryingDestination(Exception):
destination: the domain in question destination: the domain in question
""" """
msg = "Not retrying server %s." % (destination,) msg = f"Not retrying server {destination} because we tried it recently retry_last_ts={retry_last_ts} and we won't check for another retry_interval={retry_interval}ms."
super().__init__(msg) super().__init__(msg)
self.retry_last_ts = retry_last_ts self.retry_last_ts = retry_last_ts

View file

@ -142,14 +142,14 @@ class FederationClientTest(FederatingHomeserverTestCase):
def test_get_pdu_returns_nothing_when_event_does_not_exist(self): def test_get_pdu_returns_nothing_when_event_does_not_exist(self):
"""No event should be returned when the event does not exist""" """No event should be returned when the event does not exist"""
remote_pdu = self.get_success( pulled_pdu_info = self.get_success(
self.hs.get_federation_client().get_pdu( self.hs.get_federation_client().get_pdu(
["yet.another.server"], ["yet.another.server"],
"event_should_not_exist", "event_should_not_exist",
RoomVersions.V9, RoomVersions.V9,
) )
) )
self.assertEqual(remote_pdu, None) self.assertEqual(pulled_pdu_info, None)
def test_get_pdu(self): def test_get_pdu(self):
"""Test to make sure an event is returned by `get_pdu()`""" """Test to make sure an event is returned by `get_pdu()`"""
@ -169,13 +169,15 @@ class FederationClientTest(FederatingHomeserverTestCase):
remote_pdu.internal_metadata.outlier = True remote_pdu.internal_metadata.outlier = True
# Get the event again. This time it should read it from cache. # Get the event again. This time it should read it from cache.
remote_pdu2 = self.get_success( pulled_pdu_info2 = self.get_success(
self.hs.get_federation_client().get_pdu( self.hs.get_federation_client().get_pdu(
["yet.another.server"], ["yet.another.server"],
remote_pdu.event_id, remote_pdu.event_id,
RoomVersions.V9, RoomVersions.V9,
) )
) )
self.assertIsNotNone(pulled_pdu_info2)
remote_pdu2 = pulled_pdu_info2.pdu
# Sanity check that we are working against the same event # Sanity check that we are working against the same event
self.assertEqual(remote_pdu.event_id, remote_pdu2.event_id) self.assertEqual(remote_pdu.event_id, remote_pdu2.event_id)
@ -215,13 +217,15 @@ class FederationClientTest(FederatingHomeserverTestCase):
) )
) )
remote_pdu = self.get_success( pulled_pdu_info = self.get_success(
self.hs.get_federation_client().get_pdu( self.hs.get_federation_client().get_pdu(
["yet.another.server"], ["yet.another.server"],
"event_id", "event_id",
RoomVersions.V9, RoomVersions.V9,
) )
) )
self.assertIsNotNone(pulled_pdu_info)
remote_pdu = pulled_pdu_info.pdu
# check the right call got made to the agent # check the right call got made to the agent
self._mock_agent.request.assert_called_once_with( self._mock_agent.request.assert_called_once_with(