diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 7b80444f7..f9445bef1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from frozendict import frozendict @@ -77,16 +78,30 @@ class EventContext(object): self.app_service = None - def serialize(self): + def serialize(self, event): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` + Args: + event (FrozenEvent): The event that this context relates to + Returns: dict """ + + # We don't serialize the full state dicts, instead they get pulled out + # of the DB on the other side. However, the other side can't figure out + # the prev_state_ids, so if we're a state event we include the event + # id that we replaced in the state. + if event.is_state(): + prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) + else: + prev_state_id = None + return { - "current_state_ids": _encode_state_dict(self.current_state_ids), - "prev_state_ids": _encode_state_dict(self.prev_state_ids), + "prev_state_id": prev_state_id, + "event_type": event.type, + "event_state_key": event.state_key if event.is_state() else None, "state_group": self.state_group, "rejected": self.rejected, "push_actions": self.push_actions, @@ -97,6 +112,7 @@ class EventContext(object): } @staticmethod + @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a EventContext. @@ -109,8 +125,6 @@ class EventContext(object): EventContext """ context = EventContext() - context.current_state_ids = _decode_state_dict(input["current_state_ids"]) - context.prev_state_ids = _decode_state_dict(input["prev_state_ids"]) context.state_group = input["state_group"] context.rejected = input["rejected"] context.push_actions = input["push_actions"] @@ -118,11 +132,26 @@ class EventContext(object): context.delta_ids = _decode_state_dict(input["delta_ids"]) context.prev_state_events = input["prev_state_events"] + # We use the state_group and prev_state_id stuff to pull the + # current_state_ids out of the DB and construct prev_state_ids. + prev_state_id = input["prev_state_id"] + event_type = input["event_type"] + event_state_key = input["event_state_key"] + + context.current_state_ids = yield store.get_state_ids_for_group( + context.state_group, + ) + if prev_state_id and event_state_key: + context.prev_state_ids = dict(context.current_state_ids) + context.prev_state_ids[(event_type, event_state_key)] = prev_state_id + else: + context.prev_state_ids = context.current_state_ids + app_service_id = input["app_service_id"] if app_service_id: context.app_service = store.get_app_service_by_id(app_service_id) - return context + defer.returnValue(context) def _encode_state_dict(state_dict): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 7b21a2213..468f4b68f 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -46,7 +46,7 @@ def send_event_to_master(client, host, port, requester, event, context): "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(), + "context": context.serialize(event), "requester": requester.serialize(), } @@ -96,7 +96,7 @@ class ReplicationSendEventRestServlet(RestServlet): event = FrozenEvent(event_dict, internal_metadata, rejected_reason) requester = Requester.deserialize(self.store, content["requester"]) - context = EventContext.deserialize(self.store, content["context"]) + context = yield EventContext.deserialize(self.store, content["context"]) if requester.user: request.authenticated_entity = requester.user.to_string() diff --git a/synapse/storage/state.py b/synapse/storage/state.py index d0a840456..2b325e1c1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -139,6 +139,20 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue(group_to_state) + @defer.inlineCallbacks + def get_state_ids_for_group(self, state_group): + """Get the state IDs for the given state group + + Args: + state_group (int) + + Returns: + Deferred[dict]: Resolves to a map of (type, state_key) -> event_id + """ + group_to_state = yield self._get_state_for_groups((state_group,)) + + defer.returnValue(group_to_state[state_group]) + @defer.inlineCallbacks def get_state_groups(self, room_id, event_ids): """ Get the state groups for the given list of event_ids