forked from MirrorHub/synapse
Allow retrieving the relations of a redacted event. (#12130)
This is allowed per MSC2675, although the original implementation did not allow for it and would return an empty chunk / not bundle aggregations. The main thing to improve is that the various caches get cleared properly when an event is redacted, and that edits must not leak if the original event is redacted (as that would presumably leak something similar to the original event content).
This commit is contained in:
parent
3e4af36bc8
commit
88cd6f9378
8 changed files with 119 additions and 80 deletions
1
changelog.d/12130.bugfix
Normal file
1
changelog.d/12130.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug when redacting events with relations.
|
1
changelog.d/12189.bugfix
Normal file
1
changelog.d/12189.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug when redacting events with relations.
|
|
@ -1 +0,0 @@
|
||||||
Support skipping some arguments when generating cache keys.
|
|
|
@ -27,7 +27,7 @@ from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import RestServlet, parse_integer, parse_string
|
from synapse.http.servlet import RestServlet, parse_integer, parse_string
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.rest.client._base import client_patterns
|
from synapse.rest.client._base import client_patterns
|
||||||
from synapse.storage.relations import AggregationPaginationToken, PaginationChunk
|
from synapse.storage.relations import AggregationPaginationToken
|
||||||
from synapse.types import JsonDict, StreamToken
|
from synapse.types import JsonDict, StreamToken
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -82,28 +82,25 @@ class RelationPaginationServlet(RestServlet):
|
||||||
from_token_str = parse_string(request, "from")
|
from_token_str = parse_string(request, "from")
|
||||||
to_token_str = parse_string(request, "to")
|
to_token_str = parse_string(request, "to")
|
||||||
|
|
||||||
if event.internal_metadata.is_redacted():
|
# Return the relations
|
||||||
# If the event is redacted, return an empty list of relations
|
from_token = None
|
||||||
pagination_chunk = PaginationChunk(chunk=[])
|
if from_token_str:
|
||||||
else:
|
from_token = await StreamToken.from_string(self.store, from_token_str)
|
||||||
# Return the relations
|
to_token = None
|
||||||
from_token = None
|
if to_token_str:
|
||||||
if from_token_str:
|
to_token = await StreamToken.from_string(self.store, to_token_str)
|
||||||
from_token = await StreamToken.from_string(self.store, from_token_str)
|
|
||||||
to_token = None
|
|
||||||
if to_token_str:
|
|
||||||
to_token = await StreamToken.from_string(self.store, to_token_str)
|
|
||||||
|
|
||||||
pagination_chunk = await self.store.get_relations_for_event(
|
pagination_chunk = await self.store.get_relations_for_event(
|
||||||
event_id=parent_id,
|
event_id=parent_id,
|
||||||
room_id=room_id,
|
event=event,
|
||||||
relation_type=relation_type,
|
room_id=room_id,
|
||||||
event_type=event_type,
|
relation_type=relation_type,
|
||||||
limit=limit,
|
event_type=event_type,
|
||||||
direction=direction,
|
limit=limit,
|
||||||
from_token=from_token,
|
direction=direction,
|
||||||
to_token=to_token,
|
from_token=from_token,
|
||||||
)
|
to_token=to_token,
|
||||||
|
)
|
||||||
|
|
||||||
events = await self.store.get_events_as_list(
|
events = await self.store.get_events_as_list(
|
||||||
[c["event_id"] for c in pagination_chunk.chunk]
|
[c["event_id"] for c in pagination_chunk.chunk]
|
||||||
|
@ -193,27 +190,23 @@ class RelationAggregationPaginationServlet(RestServlet):
|
||||||
from_token_str = parse_string(request, "from")
|
from_token_str = parse_string(request, "from")
|
||||||
to_token_str = parse_string(request, "to")
|
to_token_str = parse_string(request, "to")
|
||||||
|
|
||||||
if event.internal_metadata.is_redacted():
|
# Return the relations
|
||||||
# If the event is redacted, return an empty list of relations
|
from_token = None
|
||||||
pagination_chunk = PaginationChunk(chunk=[])
|
if from_token_str:
|
||||||
else:
|
from_token = AggregationPaginationToken.from_string(from_token_str)
|
||||||
# Return the relations
|
|
||||||
from_token = None
|
|
||||||
if from_token_str:
|
|
||||||
from_token = AggregationPaginationToken.from_string(from_token_str)
|
|
||||||
|
|
||||||
to_token = None
|
to_token = None
|
||||||
if to_token_str:
|
if to_token_str:
|
||||||
to_token = AggregationPaginationToken.from_string(to_token_str)
|
to_token = AggregationPaginationToken.from_string(to_token_str)
|
||||||
|
|
||||||
pagination_chunk = await self.store.get_aggregation_groups_for_event(
|
pagination_chunk = await self.store.get_aggregation_groups_for_event(
|
||||||
event_id=parent_id,
|
event_id=parent_id,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
event_type=event_type,
|
event_type=event_type,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
from_token=from_token,
|
from_token=from_token,
|
||||||
to_token=to_token,
|
to_token=to_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
return 200, await pagination_chunk.to_dict(self.store)
|
return 200, await pagination_chunk.to_dict(self.store)
|
||||||
|
|
||||||
|
@ -295,6 +288,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet):
|
||||||
|
|
||||||
result = await self.store.get_relations_for_event(
|
result = await self.store.get_relations_for_event(
|
||||||
event_id=parent_id,
|
event_id=parent_id,
|
||||||
|
event=event,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
relation_type=relation_type,
|
relation_type=relation_type,
|
||||||
event_type=event_type,
|
event_type=event_type,
|
||||||
|
|
|
@ -191,6 +191,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
if redacts:
|
if redacts:
|
||||||
self._invalidate_get_event_cache(redacts)
|
self._invalidate_get_event_cache(redacts)
|
||||||
|
# Caches which might leak edits must be invalidated for the event being
|
||||||
|
# redacted.
|
||||||
|
self.get_relations_for_event.invalidate((redacts,))
|
||||||
|
self.get_applicable_edit.invalidate((redacts,))
|
||||||
|
|
||||||
if etype == EventTypes.Member:
|
if etype == EventTypes.Member:
|
||||||
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
|
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
|
||||||
|
|
|
@ -1619,9 +1619,12 @@ class PersistEventsStore:
|
||||||
|
|
||||||
txn.call_after(prefill)
|
txn.call_after(prefill)
|
||||||
|
|
||||||
def _store_redaction(self, txn, event):
|
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
|
||||||
# invalidate the cache for the redacted event
|
# Invalidate the caches for the redacted event, note that these caches
|
||||||
|
# are also cleared as part of event replication in _invalidate_caches_for_event.
|
||||||
txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
|
txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
|
||||||
|
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
|
||||||
|
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
|
||||||
|
|
||||||
self.db_pool.simple_upsert_txn(
|
self.db_pool.simple_upsert_txn(
|
||||||
txn,
|
txn,
|
||||||
|
@ -1812,9 +1815,7 @@ class PersistEventsStore:
|
||||||
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
||||||
|
|
||||||
if rel_type == RelationTypes.THREAD:
|
if rel_type == RelationTypes.THREAD:
|
||||||
txn.call_after(
|
txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
|
||||||
self.store.get_thread_summary.invalidate, (parent_id, event.room_id)
|
|
||||||
)
|
|
||||||
# It should be safe to only invalidate the cache if the user has not
|
# It should be safe to only invalidate the cache if the user has not
|
||||||
# previously participated in the thread, but that's difficult (and
|
# previously participated in the thread, but that's difficult (and
|
||||||
# potentially error-prone) so it is always invalidated.
|
# potentially error-prone) so it is always invalidated.
|
||||||
|
|
|
@ -91,10 +91,11 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
self._msc3440_enabled = hs.config.experimental.msc3440_enabled
|
self._msc3440_enabled = hs.config.experimental.msc3440_enabled
|
||||||
|
|
||||||
@cached(tree=True)
|
@cached(uncached_args=("event",), tree=True)
|
||||||
async def get_relations_for_event(
|
async def get_relations_for_event(
|
||||||
self,
|
self,
|
||||||
event_id: str,
|
event_id: str,
|
||||||
|
event: EventBase,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
relation_type: Optional[str] = None,
|
relation_type: Optional[str] = None,
|
||||||
event_type: Optional[str] = None,
|
event_type: Optional[str] = None,
|
||||||
|
@ -108,6 +109,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event_id: Fetch events that relate to this event ID.
|
event_id: Fetch events that relate to this event ID.
|
||||||
|
event: The matching EventBase to event_id.
|
||||||
room_id: The room the event belongs to.
|
room_id: The room the event belongs to.
|
||||||
relation_type: Only fetch events with this relation type, if given.
|
relation_type: Only fetch events with this relation type, if given.
|
||||||
event_type: Only fetch events with this event type, if given.
|
event_type: Only fetch events with this event type, if given.
|
||||||
|
@ -122,9 +124,13 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
List of event IDs that match relations requested. The rows are of
|
List of event IDs that match relations requested. The rows are of
|
||||||
the form `{"event_id": "..."}`.
|
the form `{"event_id": "..."}`.
|
||||||
"""
|
"""
|
||||||
|
# We don't use `event_id`, it's there so that we can cache based on
|
||||||
|
# it. The `event_id` must match the `event.event_id`.
|
||||||
|
assert event.event_id == event_id
|
||||||
|
|
||||||
where_clause = ["relates_to_id = ?", "room_id = ?"]
|
where_clause = ["relates_to_id = ?", "room_id = ?"]
|
||||||
where_args: List[Union[str, int]] = [event_id, room_id]
|
where_args: List[Union[str, int]] = [event.event_id, room_id]
|
||||||
|
is_redacted = event.internal_metadata.is_redacted()
|
||||||
|
|
||||||
if relation_type is not None:
|
if relation_type is not None:
|
||||||
where_clause.append("relation_type = ?")
|
where_clause.append("relation_type = ?")
|
||||||
|
@ -157,7 +163,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
order = "ASC"
|
order = "ASC"
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT event_id, topological_ordering, stream_ordering
|
SELECT event_id, relation_type, topological_ordering, stream_ordering
|
||||||
FROM event_relations
|
FROM event_relations
|
||||||
INNER JOIN events USING (event_id)
|
INNER JOIN events USING (event_id)
|
||||||
WHERE %s
|
WHERE %s
|
||||||
|
@ -178,9 +184,12 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
last_stream_id = None
|
last_stream_id = None
|
||||||
events = []
|
events = []
|
||||||
for row in txn:
|
for row in txn:
|
||||||
events.append({"event_id": row[0]})
|
# Do not include edits for redacted events as they leak event
|
||||||
last_topo_id = row[1]
|
# content.
|
||||||
last_stream_id = row[2]
|
if not is_redacted or row[1] != RelationTypes.REPLACE:
|
||||||
|
events.append({"event_id": row[0]})
|
||||||
|
last_topo_id = row[2]
|
||||||
|
last_stream_id = row[3]
|
||||||
|
|
||||||
# If there are more events, generate the next pagination key.
|
# If there are more events, generate the next pagination key.
|
||||||
next_token = None
|
next_token = None
|
||||||
|
@ -776,7 +785,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
references = await self.get_relations_for_event(
|
references = await self.get_relations_for_event(
|
||||||
event_id, room_id, RelationTypes.REFERENCE, direction="f"
|
event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
|
||||||
)
|
)
|
||||||
if references.chunk:
|
if references.chunk:
|
||||||
aggregations.references = await references.to_dict(cast("DataStore", self))
|
aggregations.references = await references.to_dict(cast("DataStore", self))
|
||||||
|
@ -797,41 +806,36 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
A map of event ID to the bundled aggregation for the event. Not all
|
A map of event ID to the bundled aggregation for the event. Not all
|
||||||
events may have bundled aggregations in the results.
|
events may have bundled aggregations in the results.
|
||||||
"""
|
"""
|
||||||
# The already processed event IDs. Tracked separately from the result
|
# De-duplicate events by ID to handle the same event requested multiple times.
|
||||||
# since the result omits events which do not have bundled aggregations.
|
#
|
||||||
seen_event_ids = set()
|
# State events do not get bundled aggregations.
|
||||||
|
events_by_id = {
|
||||||
# State events and redacted events do not get bundled aggregations.
|
event.event_id: event for event in events if not event.is_state()
|
||||||
events = [
|
}
|
||||||
event
|
|
||||||
for event in events
|
|
||||||
if not event.is_state() and not event.internal_metadata.is_redacted()
|
|
||||||
]
|
|
||||||
|
|
||||||
# event ID -> bundled aggregation in non-serialized form.
|
# event ID -> bundled aggregation in non-serialized form.
|
||||||
results: Dict[str, BundledAggregations] = {}
|
results: Dict[str, BundledAggregations] = {}
|
||||||
|
|
||||||
# Fetch other relations per event.
|
# Fetch other relations per event.
|
||||||
for event in events:
|
for event in events_by_id.values():
|
||||||
# De-duplicate events by ID to handle the same event requested multiple
|
|
||||||
# times. The caches that _get_bundled_aggregation_for_event use should
|
|
||||||
# capture this, but best to reduce work.
|
|
||||||
if event.event_id in seen_event_ids:
|
|
||||||
continue
|
|
||||||
seen_event_ids.add(event.event_id)
|
|
||||||
|
|
||||||
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
|
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
|
||||||
if event_result:
|
if event_result:
|
||||||
results[event.event_id] = event_result
|
results[event.event_id] = event_result
|
||||||
|
|
||||||
# Fetch any edits.
|
# Fetch any edits (but not for redacted events).
|
||||||
edits = await self._get_applicable_edits(seen_event_ids)
|
edits = await self._get_applicable_edits(
|
||||||
|
[
|
||||||
|
event_id
|
||||||
|
for event_id, event in events_by_id.items()
|
||||||
|
if not event.internal_metadata.is_redacted()
|
||||||
|
]
|
||||||
|
)
|
||||||
for event_id, edit in edits.items():
|
for event_id, edit in edits.items():
|
||||||
results.setdefault(event_id, BundledAggregations()).replace = edit
|
results.setdefault(event_id, BundledAggregations()).replace = edit
|
||||||
|
|
||||||
# Fetch thread summaries.
|
# Fetch thread summaries.
|
||||||
if self._msc3440_enabled:
|
if self._msc3440_enabled:
|
||||||
summaries = await self._get_thread_summaries(seen_event_ids)
|
summaries = await self._get_thread_summaries(events_by_id.keys())
|
||||||
# Only fetch participated for a limited selection based on what had
|
# Only fetch participated for a limited selection based on what had
|
||||||
# summaries.
|
# summaries.
|
||||||
participated = await self._get_threads_participated(
|
participated = await self._get_threads_participated(
|
||||||
|
|
|
@ -1475,12 +1475,13 @@ class RelationRedactionTestCase(BaseRelationsTestCase):
|
||||||
self.assertEqual(relations, {})
|
self.assertEqual(relations, {})
|
||||||
|
|
||||||
def test_redact_parent_annotation(self) -> None:
|
def test_redact_parent_annotation(self) -> None:
|
||||||
"""Test that annotations of an event are redacted when the original event
|
"""Test that annotations of an event are viewable when the original event
|
||||||
is redacted.
|
is redacted.
|
||||||
"""
|
"""
|
||||||
# Add a relation
|
# Add a relation
|
||||||
channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍")
|
channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍")
|
||||||
self.assertEqual(200, channel.code, channel.json_body)
|
self.assertEqual(200, channel.code, channel.json_body)
|
||||||
|
related_event_id = channel.json_body["event_id"]
|
||||||
|
|
||||||
# The relations should exist.
|
# The relations should exist.
|
||||||
event_ids, relations = self._make_relation_requests()
|
event_ids, relations = self._make_relation_requests()
|
||||||
|
@ -1494,11 +1495,45 @@ class RelationRedactionTestCase(BaseRelationsTestCase):
|
||||||
# Redact the original event.
|
# Redact the original event.
|
||||||
self._redact(self.parent_id)
|
self._redact(self.parent_id)
|
||||||
|
|
||||||
# The relations are not returned.
|
# The relations are returned.
|
||||||
event_ids, relations = self._make_relation_requests()
|
event_ids, relations = self._make_relation_requests()
|
||||||
self.assertEqual(event_ids, [])
|
self.assertEquals(event_ids, [related_event_id])
|
||||||
self.assertEqual(relations, {})
|
self.assertEquals(
|
||||||
|
relations["m.annotation"],
|
||||||
|
{"chunk": [{"type": "m.reaction", "key": "👍", "count": 1}]},
|
||||||
|
)
|
||||||
|
|
||||||
# There's nothing to aggregate.
|
# There's nothing to aggregate.
|
||||||
chunk = self._get_aggregations()
|
chunk = self._get_aggregations()
|
||||||
self.assertEqual(chunk, [])
|
self.assertEqual(chunk, [{"count": 1, "key": "👍", "type": "m.reaction"}])
|
||||||
|
|
||||||
|
@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
|
||||||
|
def test_redact_parent_thread(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that thread replies are still available when the root event is redacted.
|
||||||
|
"""
|
||||||
|
channel = self._send_relation(
|
||||||
|
RelationTypes.THREAD,
|
||||||
|
EventTypes.Message,
|
||||||
|
content={"body": "reply 1", "msgtype": "m.text"},
|
||||||
|
)
|
||||||
|
self.assertEqual(200, channel.code, channel.json_body)
|
||||||
|
related_event_id = channel.json_body["event_id"]
|
||||||
|
|
||||||
|
# Redact one of the reactions.
|
||||||
|
self._redact(self.parent_id)
|
||||||
|
|
||||||
|
# The unredacted relation should still exist.
|
||||||
|
event_ids, relations = self._make_relation_requests()
|
||||||
|
self.assertEquals(len(event_ids), 1)
|
||||||
|
self.assertDictContainsSubset(
|
||||||
|
{
|
||||||
|
"count": 1,
|
||||||
|
"current_user_participated": True,
|
||||||
|
},
|
||||||
|
relations[RelationTypes.THREAD],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
relations[RelationTypes.THREAD]["latest_event"]["event_id"],
|
||||||
|
related_event_id,
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue