forked from MirrorHub/synapse
Add MSC3030 experimental client and federation API endpoints to get the closest event to a given timestamp (#9445)
MSC3030: https://github.com/matrix-org/matrix-doc/pull/3030 Client API endpoint. This will also go and fetch from the federation API endpoint if unable to find an event locally or we found an extremity with possibly a closer event we don't know about. ``` GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction> { "event_id": ... "origin_server_ts": ... } ``` Federation API endpoint: ``` GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction> { "event_id": ... "origin_server_ts": ... } ``` Co-authored-by: Erik Johnston <erik@matrix.org>
This commit is contained in:
parent
84dc50e160
commit
a6f1a3abec
13 changed files with 674 additions and 31 deletions
1
changelog.d/9445.feature
Normal file
1
changelog.d/9445.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) experimental client and federation API endpoints to get the closest event to a given timestamp.
|
|
@ -46,3 +46,6 @@ class ExperimentalConfig(Config):
|
||||||
|
|
||||||
# MSC3266 (room summary api)
|
# MSC3266 (room summary api)
|
||||||
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
|
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)
|
||||||
|
|
||||||
|
# MSC3030 (Jump to date API endpoint)
|
||||||
|
self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
|
||||||
|
|
|
@ -1517,6 +1517,83 @@ class FederationClient(FederationBase):
|
||||||
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
|
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
async def timestamp_to_event(
|
||||||
|
self, destination: str, room_id: str, timestamp: int, direction: str
|
||||||
|
) -> "TimestampToEventResponse":
|
||||||
|
"""
|
||||||
|
Calls a remote federating server at `destination` asking for their
|
||||||
|
closest event to the given timestamp in the given direction. Also
|
||||||
|
validates the response to always return the expected keys or raises an
|
||||||
|
error.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination: Domain name of the remote homeserver
|
||||||
|
room_id: Room to fetch the event from
|
||||||
|
timestamp: The point in time (inclusive) we should navigate from in
|
||||||
|
the given direction to find the closest event.
|
||||||
|
direction: ["f"|"b"] to indicate whether we should navigate forward
|
||||||
|
or backward from the given timestamp to find the closest event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A parsed TimestampToEventResponse including the closest event_id
|
||||||
|
and origin_server_ts
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Various exceptions when the request fails
|
||||||
|
InvalidResponseError when the response does not have the correct
|
||||||
|
keys or wrong types
|
||||||
|
"""
|
||||||
|
remote_response = await self.transport_layer.timestamp_to_event(
|
||||||
|
destination, room_id, timestamp, direction
|
||||||
|
)
|
||||||
|
|
||||||
|
if not isinstance(remote_response, dict):
|
||||||
|
raise InvalidResponseError(
|
||||||
|
"Response must be a JSON dictionary but received %r" % remote_response
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return TimestampToEventResponse.from_json_dict(remote_response)
|
||||||
|
except ValueError as e:
|
||||||
|
raise InvalidResponseError(str(e))
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||||
|
class TimestampToEventResponse:
|
||||||
|
"""Typed response dictionary for the federation /timestamp_to_event endpoint"""
|
||||||
|
|
||||||
|
event_id: str
|
||||||
|
origin_server_ts: int
|
||||||
|
|
||||||
|
# the raw data, including the above keys
|
||||||
|
data: JsonDict
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse":
|
||||||
|
"""Parsed response from the federation /timestamp_to_event endpoint
|
||||||
|
|
||||||
|
Args:
|
||||||
|
d: JSON object response to be parsed
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError if d does not the correct keys or they are the wrong types
|
||||||
|
"""
|
||||||
|
|
||||||
|
event_id = d.get("event_id")
|
||||||
|
if not isinstance(event_id, str):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid response: 'event_id' must be a str but received %r" % event_id
|
||||||
|
)
|
||||||
|
|
||||||
|
origin_server_ts = d.get("origin_server_ts")
|
||||||
|
if not isinstance(origin_server_ts, int):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid response: 'origin_server_ts' must be a int but received %r"
|
||||||
|
% origin_server_ts
|
||||||
|
)
|
||||||
|
|
||||||
|
return cls(event_id, origin_server_ts, d)
|
||||||
|
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||||
class FederationSpaceSummaryEventResult:
|
class FederationSpaceSummaryEventResult:
|
||||||
|
|
|
@ -110,6 +110,7 @@ class FederationServer(FederationBase):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
self.handler = hs.get_federation_handler()
|
self.handler = hs.get_federation_handler()
|
||||||
|
self.storage = hs.get_storage()
|
||||||
self._federation_event_handler = hs.get_federation_event_handler()
|
self._federation_event_handler = hs.get_federation_event_handler()
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self._event_auth_handler = hs.get_event_auth_handler()
|
self._event_auth_handler = hs.get_event_auth_handler()
|
||||||
|
@ -200,6 +201,48 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
return 200, res
|
return 200, res
|
||||||
|
|
||||||
|
async def on_timestamp_to_event_request(
|
||||||
|
self, origin: str, room_id: str, timestamp: int, direction: str
|
||||||
|
) -> Tuple[int, Dict[str, Any]]:
|
||||||
|
"""When we receive a federated `/timestamp_to_event` request,
|
||||||
|
handle all of the logic for validating and fetching the event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
origin: The server we received the event from
|
||||||
|
room_id: Room to fetch the event from
|
||||||
|
timestamp: The point in time (inclusive) we should navigate from in
|
||||||
|
the given direction to find the closest event.
|
||||||
|
direction: ["f"|"b"] to indicate whether we should navigate forward
|
||||||
|
or backward from the given timestamp to find the closest event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple indicating the response status code and dictionary response
|
||||||
|
body including `event_id`.
|
||||||
|
"""
|
||||||
|
with (await self._server_linearizer.queue((origin, room_id))):
|
||||||
|
origin_host, _ = parse_server_name(origin)
|
||||||
|
await self.check_server_matches_acl(origin_host, room_id)
|
||||||
|
|
||||||
|
# We only try to fetch data from the local database
|
||||||
|
event_id = await self.store.get_event_id_for_timestamp(
|
||||||
|
room_id, timestamp, direction
|
||||||
|
)
|
||||||
|
if event_id:
|
||||||
|
event = await self.store.get_event(
|
||||||
|
event_id, allow_none=False, allow_rejected=False
|
||||||
|
)
|
||||||
|
|
||||||
|
return 200, {
|
||||||
|
"event_id": event_id,
|
||||||
|
"origin_server_ts": event.origin_server_ts,
|
||||||
|
}
|
||||||
|
|
||||||
|
raise SynapseError(
|
||||||
|
404,
|
||||||
|
"Unable to find event from %s in direction %s" % (timestamp, direction),
|
||||||
|
errcode=Codes.NOT_FOUND,
|
||||||
|
)
|
||||||
|
|
||||||
async def on_incoming_transaction(
|
async def on_incoming_transaction(
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
|
|
|
@ -148,6 +148,42 @@ class TransportLayerClient:
|
||||||
destination, path=path, args=args, try_trailing_slash_on_400=True
|
destination, path=path, args=args, try_trailing_slash_on_400=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@log_function
|
||||||
|
async def timestamp_to_event(
|
||||||
|
self, destination: str, room_id: str, timestamp: int, direction: str
|
||||||
|
) -> Union[JsonDict, List]:
|
||||||
|
"""
|
||||||
|
Calls a remote federating server at `destination` asking for their
|
||||||
|
closest event to the given timestamp in the given direction.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination: Domain name of the remote homeserver
|
||||||
|
room_id: Room to fetch the event from
|
||||||
|
timestamp: The point in time (inclusive) we should navigate from in
|
||||||
|
the given direction to find the closest event.
|
||||||
|
direction: ["f"|"b"] to indicate whether we should navigate forward
|
||||||
|
or backward from the given timestamp to find the closest event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response dict received from the remote homeserver.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Various exceptions when the request fails
|
||||||
|
"""
|
||||||
|
path = _create_path(
|
||||||
|
FEDERATION_UNSTABLE_PREFIX,
|
||||||
|
"/org.matrix.msc3030/timestamp_to_event/%s",
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
args = {"ts": [str(timestamp)], "dir": [direction]}
|
||||||
|
|
||||||
|
remote_response = await self.client.get_json(
|
||||||
|
destination, path=path, args=args, try_trailing_slash_on_400=True
|
||||||
|
)
|
||||||
|
|
||||||
|
return remote_response
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
async def send_transaction(
|
async def send_transaction(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -22,7 +22,10 @@ from synapse.federation.transport.server._base import (
|
||||||
Authenticator,
|
Authenticator,
|
||||||
BaseFederationServlet,
|
BaseFederationServlet,
|
||||||
)
|
)
|
||||||
from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES
|
from synapse.federation.transport.server.federation import (
|
||||||
|
FEDERATION_SERVLET_CLASSES,
|
||||||
|
FederationTimestampLookupServlet,
|
||||||
|
)
|
||||||
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
|
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
|
||||||
from synapse.federation.transport.server.groups_server import (
|
from synapse.federation.transport.server.groups_server import (
|
||||||
GROUP_SERVER_SERVLET_CLASSES,
|
GROUP_SERVER_SERVLET_CLASSES,
|
||||||
|
@ -324,6 +327,13 @@ def register_servlets(
|
||||||
)
|
)
|
||||||
|
|
||||||
for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
|
for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
|
||||||
|
# Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
|
||||||
|
if (
|
||||||
|
servletclass == FederationTimestampLookupServlet
|
||||||
|
and not hs.config.experimental.msc3030_enabled
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
servletclass(
|
servletclass(
|
||||||
hs=hs,
|
hs=hs,
|
||||||
authenticator=authenticator,
|
authenticator=authenticator,
|
||||||
|
|
|
@ -174,6 +174,46 @@ class FederationBackfillServlet(BaseFederationServerServlet):
|
||||||
return await self.handler.on_backfill_request(origin, room_id, versions, limit)
|
return await self.handler.on_backfill_request(origin, room_id, versions, limit)
|
||||||
|
|
||||||
|
|
||||||
|
class FederationTimestampLookupServlet(BaseFederationServerServlet):
|
||||||
|
"""
|
||||||
|
API endpoint to fetch the `event_id` of the closest event to the given
|
||||||
|
timestamp (`ts` query parameter) in the given direction (`dir` query
|
||||||
|
parameter).
|
||||||
|
|
||||||
|
Useful for other homeservers when they're unable to find an event locally.
|
||||||
|
|
||||||
|
`ts` is a timestamp in milliseconds where we will find the closest event in
|
||||||
|
the given direction.
|
||||||
|
|
||||||
|
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
|
||||||
|
given timestamp.
|
||||||
|
|
||||||
|
GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
|
||||||
|
{
|
||||||
|
"event_id": ...
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
|
||||||
|
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
|
||||||
|
|
||||||
|
async def on_GET(
|
||||||
|
self,
|
||||||
|
origin: str,
|
||||||
|
content: Literal[None],
|
||||||
|
query: Dict[bytes, List[bytes]],
|
||||||
|
room_id: str,
|
||||||
|
) -> Tuple[int, JsonDict]:
|
||||||
|
timestamp = parse_integer_from_args(query, "ts", required=True)
|
||||||
|
direction = parse_string_from_args(
|
||||||
|
query, "dir", default="f", allowed_values=["f", "b"], required=True
|
||||||
|
)
|
||||||
|
|
||||||
|
return await self.handler.on_timestamp_to_event_request(
|
||||||
|
origin, room_id, timestamp, direction
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class FederationQueryServlet(BaseFederationServerServlet):
|
class FederationQueryServlet(BaseFederationServerServlet):
|
||||||
PATH = "/query/(?P<query_type>[^/]*)"
|
PATH = "/query/(?P<query_type>[^/]*)"
|
||||||
|
|
||||||
|
@ -683,6 +723,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
|
||||||
FederationStateV1Servlet,
|
FederationStateV1Servlet,
|
||||||
FederationStateIdsServlet,
|
FederationStateIdsServlet,
|
||||||
FederationBackfillServlet,
|
FederationBackfillServlet,
|
||||||
|
FederationTimestampLookupServlet,
|
||||||
FederationQueryServlet,
|
FederationQueryServlet,
|
||||||
FederationMakeJoinServlet,
|
FederationMakeJoinServlet,
|
||||||
FederationMakeLeaveServlet,
|
FederationMakeLeaveServlet,
|
||||||
|
|
|
@ -68,6 +68,37 @@ if TYPE_CHECKING:
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
|
||||||
|
"""Get joined domains from state
|
||||||
|
|
||||||
|
Args:
|
||||||
|
state: State map from type/state key to event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Returns a list of servers with the lowest depth of their joins.
|
||||||
|
Sorted by lowest depth first.
|
||||||
|
"""
|
||||||
|
joined_users = [
|
||||||
|
(state_key, int(event.depth))
|
||||||
|
for (e_type, state_key), event in state.items()
|
||||||
|
if e_type == EventTypes.Member and event.membership == Membership.JOIN
|
||||||
|
]
|
||||||
|
|
||||||
|
joined_domains: Dict[str, int] = {}
|
||||||
|
for u, d in joined_users:
|
||||||
|
try:
|
||||||
|
dom = get_domain_from_id(u)
|
||||||
|
old_d = joined_domains.get(dom)
|
||||||
|
if old_d:
|
||||||
|
joined_domains[dom] = min(d, old_d)
|
||||||
|
else:
|
||||||
|
joined_domains[dom] = d
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return sorted(joined_domains.items(), key=lambda d: d[1])
|
||||||
|
|
||||||
|
|
||||||
class FederationHandler:
|
class FederationHandler:
|
||||||
"""Handles general incoming federation requests
|
"""Handles general incoming federation requests
|
||||||
|
|
||||||
|
@ -268,36 +299,6 @@ class FederationHandler:
|
||||||
|
|
||||||
curr_state = await self.state_handler.get_current_state(room_id)
|
curr_state = await self.state_handler.get_current_state(room_id)
|
||||||
|
|
||||||
def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
|
|
||||||
"""Get joined domains from state
|
|
||||||
|
|
||||||
Args:
|
|
||||||
state: State map from type/state key to event.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Returns a list of servers with the lowest depth of their joins.
|
|
||||||
Sorted by lowest depth first.
|
|
||||||
"""
|
|
||||||
joined_users = [
|
|
||||||
(state_key, int(event.depth))
|
|
||||||
for (e_type, state_key), event in state.items()
|
|
||||||
if e_type == EventTypes.Member and event.membership == Membership.JOIN
|
|
||||||
]
|
|
||||||
|
|
||||||
joined_domains: Dict[str, int] = {}
|
|
||||||
for u, d in joined_users:
|
|
||||||
try:
|
|
||||||
dom = get_domain_from_id(u)
|
|
||||||
old_d = joined_domains.get(dom)
|
|
||||||
if old_d:
|
|
||||||
joined_domains[dom] = min(d, old_d)
|
|
||||||
else:
|
|
||||||
joined_domains[dom] = d
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return sorted(joined_domains.items(), key=lambda d: d[1])
|
|
||||||
|
|
||||||
curr_domains = get_domains_from_state(curr_state)
|
curr_domains = get_domains_from_state(curr_state)
|
||||||
|
|
||||||
likely_domains = [
|
likely_domains = [
|
||||||
|
|
|
@ -46,6 +46,7 @@ from synapse.api.constants import (
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
AuthError,
|
AuthError,
|
||||||
Codes,
|
Codes,
|
||||||
|
HttpResponseException,
|
||||||
LimitExceededError,
|
LimitExceededError,
|
||||||
NotFoundError,
|
NotFoundError,
|
||||||
StoreError,
|
StoreError,
|
||||||
|
@ -56,6 +57,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||||
from synapse.event_auth import validate_event_for_room_version
|
from synapse.event_auth import validate_event_for_room_version
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.events.utils import copy_power_levels_contents
|
from synapse.events.utils import copy_power_levels_contents
|
||||||
|
from synapse.federation.federation_client import InvalidResponseError
|
||||||
|
from synapse.handlers.federation import get_domains_from_state
|
||||||
from synapse.rest.admin._base import assert_user_is_admin
|
from synapse.rest.admin._base import assert_user_is_admin
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.streams import EventSource
|
from synapse.streams import EventSource
|
||||||
|
@ -1220,6 +1223,147 @@ class RoomContextHandler:
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
class TimestampLookupHandler:
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.server_name = hs.hostname
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
self.state_handler = hs.get_state_handler()
|
||||||
|
self.federation_client = hs.get_federation_client()
|
||||||
|
|
||||||
|
async def get_event_for_timestamp(
|
||||||
|
self,
|
||||||
|
requester: Requester,
|
||||||
|
room_id: str,
|
||||||
|
timestamp: int,
|
||||||
|
direction: str,
|
||||||
|
) -> Tuple[str, int]:
|
||||||
|
"""Find the closest event to the given timestamp in the given direction.
|
||||||
|
If we can't find an event locally or the event we have locally is next to a gap,
|
||||||
|
it will ask other federated homeservers for an event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
requester: The user making the request according to the access token
|
||||||
|
room_id: Room to fetch the event from
|
||||||
|
timestamp: The point in time (inclusive) we should navigate from in
|
||||||
|
the given direction to find the closest event.
|
||||||
|
direction: ["f"|"b"] to indicate whether we should navigate forward
|
||||||
|
or backward from the given timestamp to find the closest event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A tuple containing the `event_id` closest to the given timestamp in
|
||||||
|
the given direction and the `origin_server_ts`.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SynapseError if unable to find any event locally in the given direction
|
||||||
|
"""
|
||||||
|
|
||||||
|
local_event_id = await self.store.get_event_id_for_timestamp(
|
||||||
|
room_id, timestamp, direction
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
"get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s",
|
||||||
|
local_event_id,
|
||||||
|
timestamp,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check for gaps in the history where events could be hiding in between
|
||||||
|
# the timestamp given and the event we were able to find locally
|
||||||
|
is_event_next_to_backward_gap = False
|
||||||
|
is_event_next_to_forward_gap = False
|
||||||
|
if local_event_id:
|
||||||
|
local_event = await self.store.get_event(
|
||||||
|
local_event_id, allow_none=False, allow_rejected=False
|
||||||
|
)
|
||||||
|
|
||||||
|
if direction == "f":
|
||||||
|
# We only need to check for a backward gap if we're looking forwards
|
||||||
|
# to ensure there is nothing in between.
|
||||||
|
is_event_next_to_backward_gap = (
|
||||||
|
await self.store.is_event_next_to_backward_gap(local_event)
|
||||||
|
)
|
||||||
|
elif direction == "b":
|
||||||
|
# We only need to check for a forward gap if we're looking backwards
|
||||||
|
# to ensure there is nothing in between
|
||||||
|
is_event_next_to_forward_gap = (
|
||||||
|
await self.store.is_event_next_to_forward_gap(local_event)
|
||||||
|
)
|
||||||
|
|
||||||
|
# If we found a gap, we should probably ask another homeserver first
|
||||||
|
# about more history in between
|
||||||
|
if (
|
||||||
|
not local_event_id
|
||||||
|
or is_event_next_to_backward_gap
|
||||||
|
or is_event_next_to_forward_gap
|
||||||
|
):
|
||||||
|
logger.debug(
|
||||||
|
"get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first",
|
||||||
|
local_event_id,
|
||||||
|
timestamp,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Find other homeservers from the given state in the room
|
||||||
|
curr_state = await self.state_handler.get_current_state(room_id)
|
||||||
|
curr_domains = get_domains_from_state(curr_state)
|
||||||
|
likely_domains = [
|
||||||
|
domain for domain, depth in curr_domains if domain != self.server_name
|
||||||
|
]
|
||||||
|
|
||||||
|
# Loop through each homeserver candidate until we get a succesful response
|
||||||
|
for domain in likely_domains:
|
||||||
|
try:
|
||||||
|
remote_response = await self.federation_client.timestamp_to_event(
|
||||||
|
domain, room_id, timestamp, direction
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
"get_event_for_timestamp: response from domain(%s)=%s",
|
||||||
|
domain,
|
||||||
|
remote_response,
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: Do we want to persist this as an extremity?
|
||||||
|
# TODO: I think ideally, we would try to backfill from
|
||||||
|
# this event and run this whole
|
||||||
|
# `get_event_for_timestamp` function again to make sure
|
||||||
|
# they didn't give us an event from their gappy history.
|
||||||
|
remote_event_id = remote_response.event_id
|
||||||
|
origin_server_ts = remote_response.origin_server_ts
|
||||||
|
|
||||||
|
# Only return the remote event if it's closer than the local event
|
||||||
|
if not local_event or (
|
||||||
|
abs(origin_server_ts - timestamp)
|
||||||
|
< abs(local_event.origin_server_ts - timestamp)
|
||||||
|
):
|
||||||
|
return remote_event_id, origin_server_ts
|
||||||
|
except (HttpResponseException, InvalidResponseError) as ex:
|
||||||
|
# Let's not put a high priority on some other homeserver
|
||||||
|
# failing to respond or giving a random response
|
||||||
|
logger.debug(
|
||||||
|
"Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
|
||||||
|
domain,
|
||||||
|
type(ex).__name__,
|
||||||
|
ex,
|
||||||
|
ex.args,
|
||||||
|
)
|
||||||
|
except Exception as ex:
|
||||||
|
# But we do want to see some exceptions in our code
|
||||||
|
logger.warning(
|
||||||
|
"Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
|
||||||
|
domain,
|
||||||
|
type(ex).__name__,
|
||||||
|
ex,
|
||||||
|
ex.args,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not local_event_id:
|
||||||
|
raise SynapseError(
|
||||||
|
404,
|
||||||
|
"Unable to find event from %s in direction %s" % (timestamp, direction),
|
||||||
|
errcode=Codes.NOT_FOUND,
|
||||||
|
)
|
||||||
|
|
||||||
|
return local_event_id, local_event.origin_server_ts
|
||||||
|
|
||||||
|
|
||||||
class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
|
class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
|
|
@ -79,6 +79,35 @@ def parse_integer(
|
||||||
return parse_integer_from_args(args, name, default, required)
|
return parse_integer_from_args(args, name, default, required)
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def parse_integer_from_args(
|
||||||
|
args: Mapping[bytes, Sequence[bytes]],
|
||||||
|
name: str,
|
||||||
|
default: Optional[int] = None,
|
||||||
|
) -> Optional[int]:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def parse_integer_from_args(
|
||||||
|
args: Mapping[bytes, Sequence[bytes]],
|
||||||
|
name: str,
|
||||||
|
*,
|
||||||
|
required: Literal[True],
|
||||||
|
) -> int:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def parse_integer_from_args(
|
||||||
|
args: Mapping[bytes, Sequence[bytes]],
|
||||||
|
name: str,
|
||||||
|
default: Optional[int] = None,
|
||||||
|
required: bool = False,
|
||||||
|
) -> Optional[int]:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
def parse_integer_from_args(
|
def parse_integer_from_args(
|
||||||
args: Mapping[bytes, Sequence[bytes]],
|
args: Mapping[bytes, Sequence[bytes]],
|
||||||
name: str,
|
name: str,
|
||||||
|
|
|
@ -1070,6 +1070,62 @@ def register_txn_path(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TimestampLookupRestServlet(RestServlet):
|
||||||
|
"""
|
||||||
|
API endpoint to fetch the `event_id` of the closest event to the given
|
||||||
|
timestamp (`ts` query parameter) in the given direction (`dir` query
|
||||||
|
parameter).
|
||||||
|
|
||||||
|
Useful for cases like jump to date so you can start paginating messages from
|
||||||
|
a given date in the archive.
|
||||||
|
|
||||||
|
`ts` is a timestamp in milliseconds where we will find the closest event in
|
||||||
|
the given direction.
|
||||||
|
|
||||||
|
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
|
||||||
|
given timestamp.
|
||||||
|
|
||||||
|
GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
|
||||||
|
{
|
||||||
|
"event_id": ...
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
PATTERNS = (
|
||||||
|
re.compile(
|
||||||
|
"^/_matrix/client/unstable/org.matrix.msc3030"
|
||||||
|
"/rooms/(?P<room_id>[^/]*)/timestamp_to_event$"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
super().__init__()
|
||||||
|
self._auth = hs.get_auth()
|
||||||
|
self._store = hs.get_datastore()
|
||||||
|
self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler()
|
||||||
|
|
||||||
|
async def on_GET(
|
||||||
|
self, request: SynapseRequest, room_id: str
|
||||||
|
) -> Tuple[int, JsonDict]:
|
||||||
|
requester = await self._auth.get_user_by_req(request)
|
||||||
|
await self._auth.check_user_in_room(room_id, requester.user.to_string())
|
||||||
|
|
||||||
|
timestamp = parse_integer(request, "ts", required=True)
|
||||||
|
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
|
||||||
|
|
||||||
|
(
|
||||||
|
event_id,
|
||||||
|
origin_server_ts,
|
||||||
|
) = await self.timestamp_lookup_handler.get_event_for_timestamp(
|
||||||
|
requester, room_id, timestamp, direction
|
||||||
|
)
|
||||||
|
|
||||||
|
return 200, {
|
||||||
|
"event_id": event_id,
|
||||||
|
"origin_server_ts": origin_server_ts,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class RoomSpaceSummaryRestServlet(RestServlet):
|
class RoomSpaceSummaryRestServlet(RestServlet):
|
||||||
PATTERNS = (
|
PATTERNS = (
|
||||||
re.compile(
|
re.compile(
|
||||||
|
@ -1239,6 +1295,8 @@ def register_servlets(
|
||||||
RoomAliasListServlet(hs).register(http_server)
|
RoomAliasListServlet(hs).register(http_server)
|
||||||
SearchRestServlet(hs).register(http_server)
|
SearchRestServlet(hs).register(http_server)
|
||||||
RoomCreateRestServlet(hs).register(http_server)
|
RoomCreateRestServlet(hs).register(http_server)
|
||||||
|
if hs.config.experimental.msc3030_enabled:
|
||||||
|
TimestampLookupRestServlet(hs).register(http_server)
|
||||||
|
|
||||||
# Some servlets only get registered for the main process.
|
# Some servlets only get registered for the main process.
|
||||||
if not is_worker:
|
if not is_worker:
|
||||||
|
|
|
@ -97,6 +97,7 @@ from synapse.handlers.room import (
|
||||||
RoomContextHandler,
|
RoomContextHandler,
|
||||||
RoomCreationHandler,
|
RoomCreationHandler,
|
||||||
RoomShutdownHandler,
|
RoomShutdownHandler,
|
||||||
|
TimestampLookupHandler,
|
||||||
)
|
)
|
||||||
from synapse.handlers.room_batch import RoomBatchHandler
|
from synapse.handlers.room_batch import RoomBatchHandler
|
||||||
from synapse.handlers.room_list import RoomListHandler
|
from synapse.handlers.room_list import RoomListHandler
|
||||||
|
@ -728,6 +729,10 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||||
def get_room_context_handler(self) -> RoomContextHandler:
|
def get_room_context_handler(self) -> RoomContextHandler:
|
||||||
return RoomContextHandler(self)
|
return RoomContextHandler(self)
|
||||||
|
|
||||||
|
@cache_in_self
|
||||||
|
def get_timestamp_lookup_handler(self) -> TimestampLookupHandler:
|
||||||
|
return TimestampLookupHandler(self)
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_registration_handler(self) -> RegistrationHandler:
|
def get_registration_handler(self) -> RegistrationHandler:
|
||||||
return RegistrationHandler(self)
|
return RegistrationHandler(self)
|
||||||
|
|
|
@ -1762,3 +1762,198 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
"_cleanup_old_transaction_ids",
|
"_cleanup_old_transaction_ids",
|
||||||
_cleanup_old_transaction_ids_txn,
|
_cleanup_old_transaction_ids_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def is_event_next_to_backward_gap(self, event: EventBase) -> bool:
|
||||||
|
"""Check if the given event is next to a backward gap of missing events.
|
||||||
|
<latest messages> A(False)--->B(False)--->C(True)---> <gap, unknown events> <oldest messages>
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: room where the event lives
|
||||||
|
event_id: event to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Boolean indicating whether it's an extremity
|
||||||
|
"""
|
||||||
|
|
||||||
|
def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool:
|
||||||
|
# If the event in question has any of its prev_events listed as a
|
||||||
|
# backward extremity, it's next to a gap.
|
||||||
|
#
|
||||||
|
# We can't just check the backward edges in `event_edges` because
|
||||||
|
# when we persist events, we will also record the prev_events as
|
||||||
|
# edges to the event in question regardless of whether we have those
|
||||||
|
# prev_events yet. We need to check whether those prev_events are
|
||||||
|
# backward extremities, also known as gaps, that need to be
|
||||||
|
# backfilled.
|
||||||
|
backward_extremity_query = """
|
||||||
|
SELECT 1 FROM event_backward_extremities
|
||||||
|
WHERE
|
||||||
|
room_id = ?
|
||||||
|
AND %s
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
# If the event in question is a backward extremity or has any of its
|
||||||
|
# prev_events listed as a backward extremity, it's next to a
|
||||||
|
# backward gap.
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine,
|
||||||
|
"event_id",
|
||||||
|
[event.event_id] + list(event.prev_event_ids()),
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.execute(backward_extremity_query % (clause,), [event.room_id] + args)
|
||||||
|
backward_extremities = txn.fetchall()
|
||||||
|
|
||||||
|
# We consider any backward extremity as a backward gap
|
||||||
|
if len(backward_extremities):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"is_event_next_to_backward_gap_txn",
|
||||||
|
is_event_next_to_backward_gap_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def is_event_next_to_forward_gap(self, event: EventBase) -> bool:
|
||||||
|
"""Check if the given event is next to a forward gap of missing events.
|
||||||
|
The gap in front of the latest events is not considered a gap.
|
||||||
|
<latest messages> A(False)--->B(False)--->C(False)---> <gap, unknown events> <oldest messages>
|
||||||
|
<latest messages> A(False)--->B(False)---> <gap, unknown events> --->D(True)--->E(False) <oldest messages>
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: room where the event lives
|
||||||
|
event_id: event to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Boolean indicating whether it's an extremity
|
||||||
|
"""
|
||||||
|
|
||||||
|
def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool:
|
||||||
|
# If the event in question is a forward extremity, we will just
|
||||||
|
# consider any potential forward gap as not a gap since it's one of
|
||||||
|
# the latest events in the room.
|
||||||
|
#
|
||||||
|
# `event_forward_extremities` does not include backfilled or outlier
|
||||||
|
# events so we can't rely on it to find forward gaps. We can only
|
||||||
|
# use it to determine whether a message is the latest in the room.
|
||||||
|
#
|
||||||
|
# We can't combine this query with the `forward_edge_query` below
|
||||||
|
# because if the event in question has no forward edges (isn't
|
||||||
|
# referenced by any other event's prev_events) but is in
|
||||||
|
# `event_forward_extremities`, we don't want to return 0 rows and
|
||||||
|
# say it's next to a gap.
|
||||||
|
forward_extremity_query = """
|
||||||
|
SELECT 1 FROM event_forward_extremities
|
||||||
|
WHERE
|
||||||
|
room_id = ?
|
||||||
|
AND event_id = ?
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Check to see whether the event in question is already referenced
|
||||||
|
# by another event. If we don't see any edges, we're next to a
|
||||||
|
# forward gap.
|
||||||
|
forward_edge_query = """
|
||||||
|
SELECT 1 FROM event_edges
|
||||||
|
/* Check to make sure the event referencing our event in question is not rejected */
|
||||||
|
LEFT JOIN rejections ON event_edges.event_id == rejections.event_id
|
||||||
|
WHERE
|
||||||
|
event_edges.room_id = ?
|
||||||
|
AND event_edges.prev_event_id = ?
|
||||||
|
/* It's not a valid edge if the event referencing our event in
|
||||||
|
* question is rejected.
|
||||||
|
*/
|
||||||
|
AND rejections.event_id IS NULL
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
# We consider any forward extremity as the latest in the room and
|
||||||
|
# not a forward gap.
|
||||||
|
#
|
||||||
|
# To expand, even though there is technically a gap at the front of
|
||||||
|
# the room where the forward extremities are, we consider those the
|
||||||
|
# latest messages in the room so asking other homeservers for more
|
||||||
|
# is useless. The new latest messages will just be federated as
|
||||||
|
# usual.
|
||||||
|
txn.execute(forward_extremity_query, (event.room_id, event.event_id))
|
||||||
|
forward_extremities = txn.fetchall()
|
||||||
|
if len(forward_extremities):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# If there are no forward edges to the event in question (another
|
||||||
|
# event hasn't referenced this event in their prev_events), then we
|
||||||
|
# assume there is a forward gap in the history.
|
||||||
|
txn.execute(forward_edge_query, (event.room_id, event.event_id))
|
||||||
|
forward_edges = txn.fetchall()
|
||||||
|
if not len(forward_edges):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"is_event_next_to_gap_txn",
|
||||||
|
is_event_next_to_gap_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_event_id_for_timestamp(
|
||||||
|
self, room_id: str, timestamp: int, direction: str
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Find the closest event to the given timestamp in the given direction.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room to fetch the event from
|
||||||
|
timestamp: The point in time (inclusive) we should navigate from in
|
||||||
|
the given direction to find the closest event.
|
||||||
|
direction: ["f"|"b"] to indicate whether we should navigate forward
|
||||||
|
or backward from the given timestamp to find the closest event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The closest event_id otherwise None if we can't find any event in
|
||||||
|
the given direction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
sql_template = """
|
||||||
|
SELECT event_id FROM events
|
||||||
|
LEFT JOIN rejections USING (event_id)
|
||||||
|
WHERE
|
||||||
|
origin_server_ts %s ?
|
||||||
|
AND room_id = ?
|
||||||
|
/* Make sure event is not rejected */
|
||||||
|
AND rejections.event_id IS NULL
|
||||||
|
ORDER BY origin_server_ts %s
|
||||||
|
LIMIT 1;
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
|
||||||
|
if direction == "b":
|
||||||
|
# Find closest event *before* a given timestamp. We use descending
|
||||||
|
# (which gives values largest to smallest) because we want the
|
||||||
|
# largest possible timestamp *before* the given timestamp.
|
||||||
|
comparison_operator = "<="
|
||||||
|
order = "DESC"
|
||||||
|
else:
|
||||||
|
# Find closest event *after* a given timestamp. We use ascending
|
||||||
|
# (which gives values smallest to largest) because we want the
|
||||||
|
# closest possible timestamp *after* the given timestamp.
|
||||||
|
comparison_operator = ">="
|
||||||
|
order = "ASC"
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
sql_template % (comparison_operator, order), (timestamp, room_id)
|
||||||
|
)
|
||||||
|
row = txn.fetchone()
|
||||||
|
if row:
|
||||||
|
(event_id,) = row
|
||||||
|
return event_id
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
if direction not in ("f", "b"):
|
||||||
|
raise ValueError("Unknown direction: %s" % (direction,))
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_event_id_for_timestamp_txn",
|
||||||
|
get_event_id_for_timestamp_txn,
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue