forked from MirrorHub/synapse
Refactor the code to inject bundled relations during serialization. (#11408)
This commit is contained in:
parent
1035663833
commit
6a5dd485bd
8 changed files with 92 additions and 73 deletions
1
changelog.d/11408.misc
Normal file
1
changelog.d/11408.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Refactor including the bundled relations when serializing an event.
|
|
@ -1,4 +1,5 @@
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -392,15 +393,16 @@ class EventClientSerializer:
|
||||||
self,
|
self,
|
||||||
event: Union[JsonDict, EventBase],
|
event: Union[JsonDict, EventBase],
|
||||||
time_now: int,
|
time_now: int,
|
||||||
bundle_aggregations: bool = True,
|
bundle_relations: bool = True,
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> JsonDict:
|
) -> JsonDict:
|
||||||
"""Serializes a single event.
|
"""Serializes a single event.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event
|
event: The event being serialized.
|
||||||
time_now: The current time in milliseconds
|
time_now: The current time in milliseconds
|
||||||
bundle_aggregations: Whether to bundle in related events
|
bundle_relations: Whether to include the bundled relations for this
|
||||||
|
event.
|
||||||
**kwargs: Arguments to pass to `serialize_event`
|
**kwargs: Arguments to pass to `serialize_event`
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -410,77 +412,93 @@ class EventClientSerializer:
|
||||||
if not isinstance(event, EventBase):
|
if not isinstance(event, EventBase):
|
||||||
return event
|
return event
|
||||||
|
|
||||||
event_id = event.event_id
|
|
||||||
serialized_event = serialize_event(event, time_now, **kwargs)
|
serialized_event = serialize_event(event, time_now, **kwargs)
|
||||||
|
|
||||||
# If MSC1849 is enabled then we need to look if there are any relations
|
# If MSC1849 is enabled then we need to look if there are any relations
|
||||||
# we need to bundle in with the event.
|
# we need to bundle in with the event.
|
||||||
# Do not bundle relations if the event has been redacted
|
# Do not bundle relations if the event has been redacted
|
||||||
if not event.internal_metadata.is_redacted() and (
|
if not event.internal_metadata.is_redacted() and (
|
||||||
self._msc1849_enabled and bundle_aggregations
|
self._msc1849_enabled and bundle_relations
|
||||||
):
|
):
|
||||||
annotations = await self.store.get_aggregation_groups_for_event(event_id)
|
await self._injected_bundled_relations(event, time_now, serialized_event)
|
||||||
references = await self.store.get_relations_for_event(
|
|
||||||
event_id, RelationTypes.REFERENCE, direction="f"
|
|
||||||
)
|
|
||||||
|
|
||||||
if annotations.chunk:
|
|
||||||
r = serialized_event["unsigned"].setdefault("m.relations", {})
|
|
||||||
r[RelationTypes.ANNOTATION] = annotations.to_dict()
|
|
||||||
|
|
||||||
if references.chunk:
|
|
||||||
r = serialized_event["unsigned"].setdefault("m.relations", {})
|
|
||||||
r[RelationTypes.REFERENCE] = references.to_dict()
|
|
||||||
|
|
||||||
edit = None
|
|
||||||
if event.type == EventTypes.Message:
|
|
||||||
edit = await self.store.get_applicable_edit(event_id)
|
|
||||||
|
|
||||||
if edit:
|
|
||||||
# If there is an edit replace the content, preserving existing
|
|
||||||
# relations.
|
|
||||||
|
|
||||||
# Ensure we take copies of the edit content, otherwise we risk modifying
|
|
||||||
# the original event.
|
|
||||||
edit_content = edit.content.copy()
|
|
||||||
|
|
||||||
# Unfreeze the event content if necessary, so that we may modify it below
|
|
||||||
edit_content = unfreeze(edit_content)
|
|
||||||
serialized_event["content"] = edit_content.get("m.new_content", {})
|
|
||||||
|
|
||||||
# Check for existing relations
|
|
||||||
relations = event.content.get("m.relates_to")
|
|
||||||
if relations:
|
|
||||||
# Keep the relations, ensuring we use a dict copy of the original
|
|
||||||
serialized_event["content"]["m.relates_to"] = relations.copy()
|
|
||||||
else:
|
|
||||||
serialized_event["content"].pop("m.relates_to", None)
|
|
||||||
|
|
||||||
r = serialized_event["unsigned"].setdefault("m.relations", {})
|
|
||||||
r[RelationTypes.REPLACE] = {
|
|
||||||
"event_id": edit.event_id,
|
|
||||||
"origin_server_ts": edit.origin_server_ts,
|
|
||||||
"sender": edit.sender,
|
|
||||||
}
|
|
||||||
|
|
||||||
# If this event is the start of a thread, include a summary of the replies.
|
|
||||||
if self._msc3440_enabled:
|
|
||||||
(
|
|
||||||
thread_count,
|
|
||||||
latest_thread_event,
|
|
||||||
) = await self.store.get_thread_summary(event_id)
|
|
||||||
if latest_thread_event:
|
|
||||||
r = serialized_event["unsigned"].setdefault("m.relations", {})
|
|
||||||
r[RelationTypes.THREAD] = {
|
|
||||||
# Don't bundle aggregations as this could recurse forever.
|
|
||||||
"latest_event": await self.serialize_event(
|
|
||||||
latest_thread_event, time_now, bundle_aggregations=False
|
|
||||||
),
|
|
||||||
"count": thread_count,
|
|
||||||
}
|
|
||||||
|
|
||||||
return serialized_event
|
return serialized_event
|
||||||
|
|
||||||
|
async def _injected_bundled_relations(
|
||||||
|
self, event: EventBase, time_now: int, serialized_event: JsonDict
|
||||||
|
) -> None:
|
||||||
|
"""Potentially injects bundled relations into the unsigned portion of the serialized event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event: The event being serialized.
|
||||||
|
time_now: The current time in milliseconds
|
||||||
|
serialized_event: The serialized event which may be modified.
|
||||||
|
|
||||||
|
"""
|
||||||
|
event_id = event.event_id
|
||||||
|
|
||||||
|
# The bundled relations to include.
|
||||||
|
relations = {}
|
||||||
|
|
||||||
|
annotations = await self.store.get_aggregation_groups_for_event(event_id)
|
||||||
|
if annotations.chunk:
|
||||||
|
relations[RelationTypes.ANNOTATION] = annotations.to_dict()
|
||||||
|
|
||||||
|
references = await self.store.get_relations_for_event(
|
||||||
|
event_id, RelationTypes.REFERENCE, direction="f"
|
||||||
|
)
|
||||||
|
if references.chunk:
|
||||||
|
relations[RelationTypes.REFERENCE] = references.to_dict()
|
||||||
|
|
||||||
|
edit = None
|
||||||
|
if event.type == EventTypes.Message:
|
||||||
|
edit = await self.store.get_applicable_edit(event_id)
|
||||||
|
|
||||||
|
if edit:
|
||||||
|
# If there is an edit replace the content, preserving existing
|
||||||
|
# relations.
|
||||||
|
|
||||||
|
# Ensure we take copies of the edit content, otherwise we risk modifying
|
||||||
|
# the original event.
|
||||||
|
edit_content = edit.content.copy()
|
||||||
|
|
||||||
|
# Unfreeze the event content if necessary, so that we may modify it below
|
||||||
|
edit_content = unfreeze(edit_content)
|
||||||
|
serialized_event["content"] = edit_content.get("m.new_content", {})
|
||||||
|
|
||||||
|
# Check for existing relations
|
||||||
|
relates_to = event.content.get("m.relates_to")
|
||||||
|
if relates_to:
|
||||||
|
# Keep the relations, ensuring we use a dict copy of the original
|
||||||
|
serialized_event["content"]["m.relates_to"] = relates_to.copy()
|
||||||
|
else:
|
||||||
|
serialized_event["content"].pop("m.relates_to", None)
|
||||||
|
|
||||||
|
relations[RelationTypes.REPLACE] = {
|
||||||
|
"event_id": edit.event_id,
|
||||||
|
"origin_server_ts": edit.origin_server_ts,
|
||||||
|
"sender": edit.sender,
|
||||||
|
}
|
||||||
|
|
||||||
|
# If this event is the start of a thread, include a summary of the replies.
|
||||||
|
if self._msc3440_enabled:
|
||||||
|
(
|
||||||
|
thread_count,
|
||||||
|
latest_thread_event,
|
||||||
|
) = await self.store.get_thread_summary(event_id)
|
||||||
|
if latest_thread_event:
|
||||||
|
relations[RelationTypes.THREAD] = {
|
||||||
|
# Don't bundle relations as this could recurse forever.
|
||||||
|
"latest_event": await self.serialize_event(
|
||||||
|
latest_thread_event, time_now, bundle_relations=False
|
||||||
|
),
|
||||||
|
"count": thread_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
# If any bundled relations were found, include them.
|
||||||
|
if relations:
|
||||||
|
serialized_event["unsigned"].setdefault("m.relations", {}).update(relations)
|
||||||
|
|
||||||
async def serialize_events(
|
async def serialize_events(
|
||||||
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
|
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
|
||||||
) -> List[JsonDict]:
|
) -> List[JsonDict]:
|
||||||
|
|
|
@ -124,7 +124,7 @@ class EventStreamHandler:
|
||||||
as_client_event=as_client_event,
|
as_client_event=as_client_event,
|
||||||
# We don't bundle "live" events, as otherwise clients
|
# We don't bundle "live" events, as otherwise clients
|
||||||
# will end up double counting annotations.
|
# will end up double counting annotations.
|
||||||
bundle_aggregations=False,
|
bundle_relations=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
chunk = {
|
chunk = {
|
||||||
|
|
|
@ -252,7 +252,7 @@ class MessageHandler:
|
||||||
now,
|
now,
|
||||||
# We don't bother bundling aggregations in when asked for state
|
# We don't bother bundling aggregations in when asked for state
|
||||||
# events, as clients won't use them.
|
# events, as clients won't use them.
|
||||||
bundle_aggregations=False,
|
bundle_relations=False,
|
||||||
)
|
)
|
||||||
return events
|
return events
|
||||||
|
|
||||||
|
|
|
@ -448,7 +448,7 @@ class RoomStateRestServlet(RestServlet):
|
||||||
now,
|
now,
|
||||||
# We don't bother bundling aggregations in when asked for state
|
# We don't bother bundling aggregations in when asked for state
|
||||||
# events, as clients won't use them.
|
# events, as clients won't use them.
|
||||||
bundle_aggregations=False,
|
bundle_relations=False,
|
||||||
)
|
)
|
||||||
ret = {"state": room_state}
|
ret = {"state": room_state}
|
||||||
|
|
||||||
|
@ -778,7 +778,7 @@ class RoomEventContextServlet(RestServlet):
|
||||||
results["state"],
|
results["state"],
|
||||||
time_now,
|
time_now,
|
||||||
# No need to bundle aggregations for state events
|
# No need to bundle aggregations for state events
|
||||||
bundle_aggregations=False,
|
bundle_relations=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
return 200, results
|
return 200, results
|
||||||
|
|
|
@ -224,17 +224,17 @@ class RelationPaginationServlet(RestServlet):
|
||||||
)
|
)
|
||||||
|
|
||||||
now = self.clock.time_msec()
|
now = self.clock.time_msec()
|
||||||
# We set bundle_aggregations to False when retrieving the original
|
# We set bundle_relations to False when retrieving the original
|
||||||
# event because we want the content before relations were applied to
|
# event because we want the content before relations were applied to
|
||||||
# it.
|
# it.
|
||||||
original_event = await self._event_serializer.serialize_event(
|
original_event = await self._event_serializer.serialize_event(
|
||||||
event, now, bundle_aggregations=False
|
event, now, bundle_relations=False
|
||||||
)
|
)
|
||||||
# Similarly, we don't allow relations to be applied to relations, so we
|
# Similarly, we don't allow relations to be applied to relations, so we
|
||||||
# return the original relations without any aggregations on top of them
|
# return the original relations without any aggregations on top of them
|
||||||
# here.
|
# here.
|
||||||
serialized_events = await self._event_serializer.serialize_events(
|
serialized_events = await self._event_serializer.serialize_events(
|
||||||
events, now, bundle_aggregations=False
|
events, now, bundle_relations=False
|
||||||
)
|
)
|
||||||
|
|
||||||
return_value = pagination_chunk.to_dict()
|
return_value = pagination_chunk.to_dict()
|
||||||
|
|
|
@ -719,7 +719,7 @@ class RoomEventContextServlet(RestServlet):
|
||||||
results["state"],
|
results["state"],
|
||||||
time_now,
|
time_now,
|
||||||
# No need to bundle aggregations for state events
|
# No need to bundle aggregations for state events
|
||||||
bundle_aggregations=False,
|
bundle_relations=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
return 200, results
|
return 200, results
|
||||||
|
|
|
@ -522,7 +522,7 @@ class SyncRestServlet(RestServlet):
|
||||||
time_now=time_now,
|
time_now=time_now,
|
||||||
# We don't bundle "live" events, as otherwise clients
|
# We don't bundle "live" events, as otherwise clients
|
||||||
# will end up double counting annotations.
|
# will end up double counting annotations.
|
||||||
bundle_aggregations=False,
|
bundle_relations=False,
|
||||||
token_id=token_id,
|
token_id=token_id,
|
||||||
event_format=event_formatter,
|
event_format=event_formatter,
|
||||||
only_event_fields=only_fields,
|
only_event_fields=only_fields,
|
||||||
|
|
Loading…
Reference in a new issue