Spaces summary: call out to other servers ()

When we hit an unknown room in the space tree, see if there are other servers that we might be able to poll to get the data.

Fixes: 
This commit is contained in:
Richard van der Hoff 2021-03-24 12:45:39 +00:00 committed by GitHub
parent 4655d2221e
commit c73cc2c2ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 324 additions and 27 deletions
changelog.d
synapse

1
changelog.d/9653.feature Normal file
View file

@ -0,0 +1 @@
Add initial experimental support for a "space summary" API.

View file

@ -27,11 +27,13 @@ from typing import (
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
import attr
from prometheus_client import Counter
from twisted.internet import defer
@ -455,6 +457,7 @@ class FederationClient(FederationBase):
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
failover_on_unknown_endpoint: bool = False,
) -> T:
"""Try an operation on a series of servers, until it succeeds
@ -474,6 +477,10 @@ class FederationClient(FederationBase):
next server tried. Normally the stacktrace is logged but this is
suppressed if the exception is an InvalidResponseError.
failover_on_unknown_endpoint: if True, we will try other servers if it looks
like a server doesn't support the endpoint. This is typically useful
if the endpoint in question is new or experimental.
Returns:
The result of callback, if it succeeds
@ -493,16 +500,31 @@ class FederationClient(FederationBase):
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
if not 500 <= e.code < 600:
raise e.to_synapse_error()
else:
logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
e.code,
e.args[0],
)
synapse_error = e.to_synapse_error()
failover = False
if 500 <= e.code < 600:
failover = True
elif failover_on_unknown_endpoint:
# there is no good way to detect an "unknown" endpoint. Dendrite
# returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
if e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
):
failover = True
if not failover:
raise synapse_error from e
logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
e.code,
e.args[0],
)
except Exception:
logger.warning(
"Failed to %s via %s", description, destination, exc_info=True
@ -1042,3 +1064,141 @@ class FederationClient(FederationBase):
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
return None
async def get_space_summary(
self,
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> "FederationSpaceSummaryResult":
"""
Call other servers to get a summary of the given space
Args:
destinations: The remote servers. We will try them in turn, omitting any
that have been blacklisted.
room_id: ID of the space to be queried
suggested_only: If true, ask the remote server to only return children
with the "suggested" flag set
max_rooms_per_space: A limit on the number of children to return for each
space
exclude_rooms: A list of room IDs to tell the remote server to skip
Returns:
a parsed FederationSpaceSummaryResult
Raises:
SynapseError if we were unable to get a valid summary from any of the
remote servers
"""
async def send_request(destination: str) -> FederationSpaceSummaryResult:
res = await self.transport_layer.get_space_summary(
destination=destination,
room_id=room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_rooms_per_space,
exclude_rooms=exclude_rooms,
)
try:
return FederationSpaceSummaryResult.from_json_dict(res)
except ValueError as e:
raise InvalidResponseError(str(e))
return await self._try_destination_list(
"fetch space summary",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)
@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryEventResult:
"""Represents a single event in the result of a successful get_space_summary call.
It's essentially just a serialised event object, but we do a bit of parsing and
validation in `from_json_dict` and store some of the validated properties in
object attributes.
"""
event_type = attr.ib(type=str)
state_key = attr.ib(type=str)
via = attr.ib(type=Sequence[str])
# the raw data, including the above keys
data = attr.ib(type=JsonDict)
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
"""Parse an event within the result of a /spaces/ request
Args:
d: json object to be parsed
Raises:
ValueError if d is not a valid event
"""
event_type = d.get("type")
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
content = d.get("content")
if not isinstance(content, dict):
raise ValueError("Invalid event: 'content' must be a dict")
via = content.get("via")
if not isinstance(via, Sequence):
raise ValueError("Invalid event: 'via' must be a list")
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")
return cls(event_type, state_key, via, d)
@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryResult:
"""Represents the data returned by a successful get_space_summary call."""
rooms = attr.ib(type=Sequence[JsonDict])
events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
"""Parse the result of a /spaces/ request
Args:
d: json object to be parsed
Raises:
ValueError if d is not a valid /spaces/ response
"""
rooms = d.get("rooms")
if not isinstance(rooms, Sequence):
raise ValueError("'rooms' must be a list")
if any(not isinstance(r, dict) for r in rooms):
raise ValueError("Invalid room in 'rooms' list")
events = d.get("events")
if not isinstance(events, Sequence):
raise ValueError("'events' must be a list")
if any(not isinstance(e, dict) for e in events):
raise ValueError("Invalid event in 'events' list")
parsed_events = [
FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
]
return cls(rooms, parsed_events)

View file

@ -16,7 +16,7 @@
import logging
import urllib
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
@ -26,6 +26,7 @@ from synapse.api.urls import (
FEDERATION_V2_PREFIX,
)
from synapse.logging.utils import log_function
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -978,6 +979,38 @@ class TransportLayerClient:
return self.client.get_json(destination=destination, path=path)
async def get_space_summary(
self,
destination: str,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> JsonDict:
"""
Args:
destination: The remote server
room_id: The room ID to ask about.
suggested_only: if True, only suggested rooms will be returned
max_rooms_per_space: an optional limit to the number of children to be
returned per space
exclude_rooms: a list of any rooms we can skip
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)
params = {
"suggested_only": suggested_only,
"exclude_rooms": exclude_rooms,
}
if max_rooms_per_space is not None:
params["max_rooms_per_space"] = max_rooms_per_space
return await self.client.post_json(
destination=destination, path=path, data=params
)
def _create_path(federation_prefix, path, *args):
"""

View file

@ -16,7 +16,7 @@
import itertools
import logging
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast
import attr
@ -38,6 +38,9 @@ MAX_ROOMS = 50
# max number of events to return per room.
MAX_ROOMS_PER_SPACE = 50
# max number of federation servers to hit per room
MAX_SERVERS_PER_SPACE = 3
class SpaceSummaryHandler:
def __init__(self, hs: "HomeServer"):
@ -47,6 +50,8 @@ class SpaceSummaryHandler:
self._state_handler = hs.get_state_handler()
self._store = hs.get_datastore()
self._event_serializer = hs.get_event_client_serializer()
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
async def get_space_summary(
self,
@ -78,35 +83,81 @@ class SpaceSummaryHandler:
await self._auth.check_user_in_room_or_world_readable(room_id, requester)
# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(room_id),))
room_queue = deque((_RoomQueueEntry(room_id, ()),))
# rooms we have already processed
processed_rooms = set() # type: Set[str]
# events we have already processed. We don't necessarily have their event ids,
# so instead we key on (room id, state key)
processed_events = set() # type: Set[Tuple[str, str]]
rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict]
while room_queue and len(rooms_result) < MAX_ROOMS:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
if room_id in processed_rooms:
# already done this room
continue
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)
is_in_room = await self._store.is_host_joined(room_id, self._server_name)
# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing.
max_children = max_rooms_per_space if processed_rooms else None
rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
if is_in_room:
rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
)
else:
rooms, events = await self._summarize_remote_room(
queue_entry,
suggested_only,
max_children,
exclude_rooms=processed_rooms,
)
logger.debug(
"Query of %s returned rooms %s, events %s",
queue_entry.room_id,
[room.get("room_id") for room in rooms],
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
rooms_result.extend(rooms)
events_result.extend(events)
# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(_RoomQueueEntry(edge_event["state_key"]))
# any rooms returned don't need visiting again
processed_rooms.update(cast(str, room.get("room_id")) for room in rooms)
# the room we queried may or may not have been returned, but don't process
# it again, anyway.
processed_rooms.add(room_id)
# XXX: is it ok that we blindly iterate through any events returned by
# a remote server, whether or not they actually link to any rooms in our
# tree?
for ev in events:
# remote servers might return events we have already processed
# (eg, Dendrite returns inward pointers as well as outward ones), so
# we need to filter them out, to avoid returning duplicate links to the
# client.
ev_key = (ev["room_id"], ev["state_key"])
if ev_key in processed_events:
continue
events_result.append(ev)
# add the child to the queue. we have already validated
# that the vias are a list of server names.
room_queue.append(
_RoomQueueEntry(ev["state_key"], ev["content"]["via"])
)
processed_events.add(ev_key)
return {"rooms": rooms_result, "events": events_result}
@ -149,20 +200,23 @@ class SpaceSummaryHandler:
while room_queue and len(rooms_result) < MAX_ROOMS:
room_id = room_queue.popleft()
if room_id in processed_rooms:
# already done this room
continue
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)
rooms, events = await self._summarize_local_room(
None, room_id, suggested_only, max_rooms_per_space
)
processed_rooms.add(room_id)
rooms_result.extend(rooms)
events_result.extend(events)
# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(edge_event["state_key"])
# add any children to the queue
room_queue.extend(edge_event["state_key"] for edge_event in events)
return {"rooms": rooms_result, "events": events_result}
@ -200,6 +254,43 @@ class SpaceSummaryHandler:
)
return (room_entry,), events_result
async def _summarize_remote_room(
self,
room: "_RoomQueueEntry",
suggested_only: bool,
max_children: Optional[int],
exclude_rooms: Iterable[str],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)
# we need to make the exclusion list json-serialisable
exclude_rooms = list(exclude_rooms)
via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE)
try:
res = await self._federation_client.get_space_summary(
via,
room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_children,
exclude_rooms=exclude_rooms,
)
except Exception as e:
logger.warning(
"Unable to get summary of %s via federation: %s",
room_id,
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
return (), ()
return res.rooms, tuple(
ev.data
for ev in res.events
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
)
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
# if we have an authenticated requesting user, first check if they are in the
# room
@ -276,12 +367,24 @@ class SpaceSummaryHandler:
)
# filter out any events without a "via" (which implies it has been redacted)
return (e for e in events if e.content.get("via"))
return (e for e in events if _has_valid_via(e))
@attr.s(frozen=True, slots=True)
class _RoomQueueEntry:
room_id = attr.ib(type=str)
via = attr.ib(type=Sequence[str])
def _has_valid_via(e: EventBase) -> bool:
via = e.content.get("via")
if not via or not isinstance(via, Sequence):
return False
for v in via:
if not isinstance(v, str):
logger.debug("Ignoring edge event %s with invalid via entry", e.event_id)
return False
return True
def _is_suggested_child_event(edge_event: EventBase) -> bool: