forked from MirrorHub/synapse
Implementation of state rollback in /sync
Implementation of SPEC-254: roll back the state dictionary to how it looked at the start of the timeline. Merged PR https://github.com/matrix-org/synapse/pull/373
This commit is contained in:
parent
fddedd51d9
commit
e4d622aaaf
2 changed files with 69 additions and 4 deletions
|
@ -20,6 +20,7 @@ from synapse.http.servlet import (
|
||||||
)
|
)
|
||||||
from synapse.handlers.sync import SyncConfig
|
from synapse.handlers.sync import SyncConfig
|
||||||
from synapse.types import StreamToken
|
from synapse.types import StreamToken
|
||||||
|
from synapse.events import FrozenEvent
|
||||||
from synapse.events.utils import (
|
from synapse.events.utils import (
|
||||||
serialize_event, format_event_for_client_v2_without_event_id,
|
serialize_event, format_event_for_client_v2_without_event_id,
|
||||||
)
|
)
|
||||||
|
@ -256,7 +257,13 @@ class SyncRestServlet(RestServlet):
|
||||||
:rtype: dict[str, object]
|
:rtype: dict[str, object]
|
||||||
"""
|
"""
|
||||||
event_map = {}
|
event_map = {}
|
||||||
state_events = filter.filter_room_state(room.state.values())
|
state_dict = room.state
|
||||||
|
timeline_events = filter.filter_room_timeline(room.timeline.events)
|
||||||
|
|
||||||
|
state_dict = SyncRestServlet._rollback_state_for_timeline(
|
||||||
|
state_dict, timeline_events)
|
||||||
|
|
||||||
|
state_events = filter.filter_room_state(state_dict.values())
|
||||||
state_event_ids = []
|
state_event_ids = []
|
||||||
for event in state_events:
|
for event in state_events:
|
||||||
# TODO(mjark): Respect formatting requirements in the filter.
|
# TODO(mjark): Respect formatting requirements in the filter.
|
||||||
|
@ -266,7 +273,6 @@ class SyncRestServlet(RestServlet):
|
||||||
)
|
)
|
||||||
state_event_ids.append(event.event_id)
|
state_event_ids.append(event.event_id)
|
||||||
|
|
||||||
timeline_events = filter.filter_room_timeline(room.timeline.events)
|
|
||||||
timeline_event_ids = []
|
timeline_event_ids = []
|
||||||
for event in timeline_events:
|
for event in timeline_events:
|
||||||
# TODO(mjark): Respect formatting requirements in the filter.
|
# TODO(mjark): Respect formatting requirements in the filter.
|
||||||
|
@ -297,6 +303,63 @@ class SyncRestServlet(RestServlet):
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _rollback_state_for_timeline(state, timeline):
|
||||||
|
"""
|
||||||
|
Wind the state dictionary backwards, so that it represents the
|
||||||
|
state at the start of the timeline, rather than at the end.
|
||||||
|
|
||||||
|
:param dict[(str, str), synapse.events.EventBase] state: the
|
||||||
|
state dictionary. Will be updated to the state before the timeline.
|
||||||
|
:param list[synapse.events.EventBase] timeline: the event timeline
|
||||||
|
:return: updated state dictionary
|
||||||
|
"""
|
||||||
|
logger.debug("Processing state dict %r; timeline %r", state,
|
||||||
|
[e.get_dict() for e in timeline])
|
||||||
|
|
||||||
|
result = state.copy()
|
||||||
|
|
||||||
|
for timeline_event in reversed(timeline):
|
||||||
|
if not timeline_event.is_state():
|
||||||
|
continue
|
||||||
|
|
||||||
|
event_key = (timeline_event.type, timeline_event.state_key)
|
||||||
|
|
||||||
|
logger.debug("Considering %s for removal", event_key)
|
||||||
|
|
||||||
|
state_event = result.get(event_key)
|
||||||
|
if (state_event is None or
|
||||||
|
state_event.event_id != timeline_event.event_id):
|
||||||
|
# the event in the timeline isn't present in the state
|
||||||
|
# dictionary.
|
||||||
|
#
|
||||||
|
# the most likely cause for this is that there was a fork in
|
||||||
|
# the event graph, and the state is no longer valid. Really,
|
||||||
|
# the event shouldn't be in the timeline. We're going to ignore
|
||||||
|
# it for now, however.
|
||||||
|
logger.warn("Found state event %r in timeline which doesn't "
|
||||||
|
"match state dictionary", timeline_event)
|
||||||
|
continue
|
||||||
|
|
||||||
|
prev_event_id = timeline_event.unsigned.get("replaces_state", None)
|
||||||
|
logger.debug("Replacing %s with %s in state dict",
|
||||||
|
timeline_event.event_id, prev_event_id)
|
||||||
|
|
||||||
|
if prev_event_id is None:
|
||||||
|
del result[event_key]
|
||||||
|
else:
|
||||||
|
result[event_key] = FrozenEvent({
|
||||||
|
"type": timeline_event.type,
|
||||||
|
"state_key": timeline_event.state_key,
|
||||||
|
"content": timeline_event.unsigned['prev_content'],
|
||||||
|
"sender": timeline_event.unsigned['prev_sender'],
|
||||||
|
"event_id": prev_event_id,
|
||||||
|
"room_id": timeline_event.room_id,
|
||||||
|
})
|
||||||
|
logger.debug("New value: %r", result.get(event_key))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def register_servlets(hs, http_server):
|
def register_servlets(hs, http_server):
|
||||||
SyncRestServlet(hs).register(http_server)
|
SyncRestServlet(hs).register(http_server)
|
||||||
|
|
|
@ -831,7 +831,8 @@ class EventsStore(SQLBaseStore):
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
)
|
)
|
||||||
if prev:
|
if prev:
|
||||||
ev.unsigned["prev_content"] = prev.get_dict()["content"]
|
ev.unsigned["prev_content"] = prev.content
|
||||||
|
ev.unsigned["prev_sender"] = prev.sender
|
||||||
|
|
||||||
self._get_event_cache.prefill(
|
self._get_event_cache.prefill(
|
||||||
(ev.event_id, check_redacted, get_prev_content), ev
|
(ev.event_id, check_redacted, get_prev_content), ev
|
||||||
|
@ -888,7 +889,8 @@ class EventsStore(SQLBaseStore):
|
||||||
get_prev_content=False,
|
get_prev_content=False,
|
||||||
)
|
)
|
||||||
if prev:
|
if prev:
|
||||||
ev.unsigned["prev_content"] = prev.get_dict()["content"]
|
ev.unsigned["prev_content"] = prev.content
|
||||||
|
ev.unsigned["prev_sender"] = prev.sender
|
||||||
|
|
||||||
self._get_event_cache.prefill(
|
self._get_event_cache.prefill(
|
||||||
(ev.event_id, check_redacted, get_prev_content), ev
|
(ev.event_id, check_redacted, get_prev_content), ev
|
||||||
|
|
Loading…
Reference in a new issue