forked from MirrorHub/synapse
Don't serialize current state over replication
This commit is contained in:
parent
5fb347fc41
commit
106906a65e
3 changed files with 51 additions and 8 deletions
|
@ -13,6 +13,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
from frozendict import frozendict
|
from frozendict import frozendict
|
||||||
|
|
||||||
|
@ -77,16 +78,30 @@ class EventContext(object):
|
||||||
|
|
||||||
self.app_service = None
|
self.app_service = None
|
||||||
|
|
||||||
def serialize(self):
|
def serialize(self, event):
|
||||||
"""Converts self to a type that can be serialized as JSON, and then
|
"""Converts self to a type that can be serialized as JSON, and then
|
||||||
deserialized by `deserialize`
|
deserialized by `deserialize`
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event (FrozenEvent): The event that this context relates to
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict
|
dict
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# We don't serialize the full state dicts, instead they get pulled out
|
||||||
|
# of the DB on the other side. However, the other side can't figure out
|
||||||
|
# the prev_state_ids, so if we're a state event we include the event
|
||||||
|
# id that we replaced in the state.
|
||||||
|
if event.is_state():
|
||||||
|
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
|
||||||
|
else:
|
||||||
|
prev_state_id = None
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"current_state_ids": _encode_state_dict(self.current_state_ids),
|
"prev_state_id": prev_state_id,
|
||||||
"prev_state_ids": _encode_state_dict(self.prev_state_ids),
|
"event_type": event.type,
|
||||||
|
"event_state_key": event.state_key if event.is_state() else None,
|
||||||
"state_group": self.state_group,
|
"state_group": self.state_group,
|
||||||
"rejected": self.rejected,
|
"rejected": self.rejected,
|
||||||
"push_actions": self.push_actions,
|
"push_actions": self.push_actions,
|
||||||
|
@ -97,6 +112,7 @@ class EventContext(object):
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@defer.inlineCallbacks
|
||||||
def deserialize(store, input):
|
def deserialize(store, input):
|
||||||
"""Converts a dict that was produced by `serialize` back into a
|
"""Converts a dict that was produced by `serialize` back into a
|
||||||
EventContext.
|
EventContext.
|
||||||
|
@ -109,8 +125,6 @@ class EventContext(object):
|
||||||
EventContext
|
EventContext
|
||||||
"""
|
"""
|
||||||
context = EventContext()
|
context = EventContext()
|
||||||
context.current_state_ids = _decode_state_dict(input["current_state_ids"])
|
|
||||||
context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
|
|
||||||
context.state_group = input["state_group"]
|
context.state_group = input["state_group"]
|
||||||
context.rejected = input["rejected"]
|
context.rejected = input["rejected"]
|
||||||
context.push_actions = input["push_actions"]
|
context.push_actions = input["push_actions"]
|
||||||
|
@ -118,11 +132,26 @@ class EventContext(object):
|
||||||
context.delta_ids = _decode_state_dict(input["delta_ids"])
|
context.delta_ids = _decode_state_dict(input["delta_ids"])
|
||||||
context.prev_state_events = input["prev_state_events"]
|
context.prev_state_events = input["prev_state_events"]
|
||||||
|
|
||||||
|
# We use the state_group and prev_state_id stuff to pull the
|
||||||
|
# current_state_ids out of the DB and construct prev_state_ids.
|
||||||
|
prev_state_id = input["prev_state_id"]
|
||||||
|
event_type = input["event_type"]
|
||||||
|
event_state_key = input["event_state_key"]
|
||||||
|
|
||||||
|
context.current_state_ids = yield store.get_state_ids_for_group(
|
||||||
|
context.state_group,
|
||||||
|
)
|
||||||
|
if prev_state_id and event_state_key:
|
||||||
|
context.prev_state_ids = dict(context.current_state_ids)
|
||||||
|
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
|
||||||
|
else:
|
||||||
|
context.prev_state_ids = context.current_state_ids
|
||||||
|
|
||||||
app_service_id = input["app_service_id"]
|
app_service_id = input["app_service_id"]
|
||||||
if app_service_id:
|
if app_service_id:
|
||||||
context.app_service = store.get_app_service_by_id(app_service_id)
|
context.app_service = store.get_app_service_by_id(app_service_id)
|
||||||
|
|
||||||
return context
|
defer.returnValue(context)
|
||||||
|
|
||||||
|
|
||||||
def _encode_state_dict(state_dict):
|
def _encode_state_dict(state_dict):
|
||||||
|
|
|
@ -46,7 +46,7 @@ def send_event_to_master(client, host, port, requester, event, context):
|
||||||
"event": event.get_pdu_json(),
|
"event": event.get_pdu_json(),
|
||||||
"internal_metadata": event.internal_metadata.get_dict(),
|
"internal_metadata": event.internal_metadata.get_dict(),
|
||||||
"rejected_reason": event.rejected_reason,
|
"rejected_reason": event.rejected_reason,
|
||||||
"context": context.serialize(),
|
"context": context.serialize(event),
|
||||||
"requester": requester.serialize(),
|
"requester": requester.serialize(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ class ReplicationSendEventRestServlet(RestServlet):
|
||||||
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
|
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
|
||||||
|
|
||||||
requester = Requester.deserialize(self.store, content["requester"])
|
requester = Requester.deserialize(self.store, content["requester"])
|
||||||
context = EventContext.deserialize(self.store, content["context"])
|
context = yield EventContext.deserialize(self.store, content["context"])
|
||||||
|
|
||||||
if requester.user:
|
if requester.user:
|
||||||
request.authenticated_entity = requester.user.to_string()
|
request.authenticated_entity = requester.user.to_string()
|
||||||
|
|
|
@ -139,6 +139,20 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
defer.returnValue(group_to_state)
|
defer.returnValue(group_to_state)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_state_ids_for_group(self, state_group):
|
||||||
|
"""Get the state IDs for the given state group
|
||||||
|
|
||||||
|
Args:
|
||||||
|
state_group (int)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[dict]: Resolves to a map of (type, state_key) -> event_id
|
||||||
|
"""
|
||||||
|
group_to_state = yield self._get_state_for_groups((state_group,))
|
||||||
|
|
||||||
|
defer.returnValue(group_to_state[state_group])
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_state_groups(self, room_id, event_ids):
|
def get_state_groups(self, room_id, event_ids):
|
||||||
""" Get the state groups for the given list of event_ids
|
""" Get the state groups for the given list of event_ids
|
||||||
|
|
Loading…
Reference in a new issue