Merge branch 'develop' into erikj/client_apis_move

This commit is contained in:
Erik Johnston 2018-07-24 09:57:05 +01:00 committed by GitHub
commit 536bc63a4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 352 additions and 144 deletions

0
changelog.d/3577.misc Normal file
View file

1
changelog.d/3579.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3581.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3582.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3584.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

View file

@ -65,8 +65,9 @@ class Auth(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True): def check_from_context(self, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events( auth_events_ids = yield self.compute_auth_events(
event, context.prev_state_ids, for_verification=True, event, prev_state_ids, for_verification=True,
) )
auth_events = yield self.store.get_events(auth_events_ids) auth_events = yield self.store.get_events(auth_events_ids)
auth_events = { auth_events = {
@ -544,7 +545,8 @@ class Auth(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def add_auth_events(self, builder, context): def add_auth_events(self, builder, context):
auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids) prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes( auth_events_entries = yield self.store.add_event_hashes(
auth_ids auth_ids

View file

@ -19,18 +19,12 @@ from frozendict import frozendict
from twisted.internet import defer from twisted.internet import defer
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
class EventContext(object): class EventContext(object):
""" """
Attributes: Attributes:
current_state_ids (dict[(str, str), str]):
The current state map including the current event.
(type, state_key) -> event_id
prev_state_ids (dict[(str, str), str]):
The current state map excluding the current event.
(type, state_key) -> event_id
state_group (int|None): state group id, if the state has been stored state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is as a state group. This is usually only None if e.g. the event is
an outlier. an outlier.
@ -47,38 +41,77 @@ class EventContext(object):
prev_state_events (?): XXX: is this ever set to anything other than prev_state_events (?): XXX: is this ever set to anything other than
the empty list? the empty list?
_current_state_ids (dict[(str, str), str]|None):
The current state map including the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id
_prev_state_ids (dict[(str, str), str]|None):
The current state map excluding the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
been calculated. None if we haven't started calculating yet
_event_type (str): The type of the event the context is associated with.
Only set when state has not been fetched yet.
_event_state_key (str|None): The state_key of the event the context is
associated with. Only set when state has not been fetched yet.
_prev_state_id (str|None): If the event associated with the context is
a state event, then `_prev_state_id` is the event_id of the state
that was replaced.
Only set when state has not been fetched yet.
""" """
__slots__ = [ __slots__ = [
"current_state_ids",
"prev_state_ids",
"state_group", "state_group",
"rejected", "rejected",
"prev_group", "prev_group",
"delta_ids", "delta_ids",
"prev_state_events", "prev_state_events",
"app_service", "app_service",
"_current_state_ids",
"_prev_state_ids",
"_prev_state_id",
"_event_type",
"_event_state_key",
"_fetching_state_deferred",
] ]
def __init__(self): def __init__(self):
# The current state including the current event self.prev_state_events = []
self.current_state_ids = None
# The current state excluding the current event
self.prev_state_ids = None
self.state_group = None
self.rejected = False self.rejected = False
self.app_service = None
@staticmethod
def with_state(state_group, current_state_ids, prev_state_ids,
prev_group=None, delta_ids=None):
context = EventContext()
# The current state including the current event
context._current_state_ids = current_state_ids
# The current state excluding the current event
context._prev_state_ids = prev_state_ids
context.state_group = state_group
context._prev_state_id = None
context._event_type = None
context._event_state_key = None
context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that # A previously persisted state group and a delta between that
# and this state. # and this state.
self.prev_group = None context.prev_group = prev_group
self.delta_ids = None context.delta_ids = delta_ids
self.prev_state_events = None return context
self.app_service = None @defer.inlineCallbacks
def serialize(self, event, store):
def serialize(self, event):
"""Converts self to a type that can be serialized as JSON, and then """Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize` deserialized by `deserialize`
@ -94,11 +127,12 @@ class EventContext(object):
# the prev_state_ids, so if we're a state event we include the event # the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state. # id that we replaced in the state.
if event.is_state(): if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else: else:
prev_state_id = None prev_state_id = None
return { defer.returnValue({
"prev_state_id": prev_state_id, "prev_state_id": prev_state_id,
"event_type": event.type, "event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None, "event_state_key": event.state_key if event.is_state() else None,
@ -108,10 +142,9 @@ class EventContext(object):
"delta_ids": _encode_state_dict(self.delta_ids), "delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events, "prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None "app_service_id": self.app_service.id if self.app_service else None
} })
@staticmethod @staticmethod
@defer.inlineCallbacks
def deserialize(store, input): def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a """Converts a dict that was produced by `serialize` back into a
EventContext. EventContext.
@ -124,32 +157,114 @@ class EventContext(object):
EventContext EventContext
""" """
context = EventContext() context = EventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]
# We use the state_group and prev_state_id stuff to pull the # We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids. # current_state_ids out of the DB and construct prev_state_ids.
prev_state_id = input["prev_state_id"] context._prev_state_id = input["prev_state_id"]
event_type = input["event_type"] context._event_type = input["event_type"]
event_state_key = input["event_state_key"] context._event_state_key = input["event_state_key"]
context.current_state_ids = yield store.get_state_ids_for_group( context._current_state_ids = None
context.state_group, context._prev_state_ids = None
) context._fetching_state_deferred = None
if prev_state_id and event_state_key:
context.prev_state_ids = dict(context.current_state_ids) context.state_group = input["state_group"]
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id context.prev_group = input["prev_group"]
else: context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_ids = context.current_state_ids
context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
app_service_id = input["app_service_id"] app_service_id = input["app_service_id"]
if app_service_id: if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id) context.app_service = store.get_app_service_by_id(app_service_id)
defer.returnValue(context) return context
@defer.inlineCallbacks
def get_current_state_ids(self, store):
"""Gets the current state IDs
Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier.
"""
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store,
)
yield make_deferred_yieldable(self._fetching_state_deferred)
defer.returnValue(self._current_state_ids)
@defer.inlineCallbacks
def get_prev_state_ids(self, store):
"""Gets the prev state IDs
Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier.
"""
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store,
)
yield make_deferred_yieldable(self._fetching_state_deferred)
defer.returnValue(self._prev_state_ids)
def get_cached_current_state_ids(self):
"""Gets the current state IDs if we have them already cached.
Returns:
dict[(str, str), str]|None: Returns None if we haven't cached the
state or if state_group is None, which happens when the associated
event is an outlier.
"""
return self._current_state_ids
@defer.inlineCallbacks
def _fill_out_state(self, store):
"""Called to populate the _current_state_ids and _prev_state_ids
attributes by loading from the database.
"""
if self.state_group is None:
return
self._current_state_ids = yield store.get_state_ids_for_group(
self.state_group,
)
if self._prev_state_id and self._event_state_key is not None:
self._prev_state_ids = dict(self._current_state_ids)
key = (self._event_type, self._event_state_key)
self._prev_state_ids[key] = self._prev_state_id
else:
self._prev_state_ids = self._current_state_ids
@defer.inlineCallbacks
def update_state(self, state_group, prev_state_ids, current_state_ids,
delta_ids):
"""Replace the state in the context
"""
# We need to make sure we wait for any ongoing fetching of state
# to complete so that the updated state doesn't get clobbered
if self._fetching_state_deferred:
yield make_deferred_yieldable(self._fetching_state_deferred)
self.state_group = state_group
self._prev_state_ids = prev_state_ids
self._current_state_ids = current_state_ids
self.delta_ids = delta_ids
# We need to ensure that that we've marked as having fetched the state
self._fetching_state_deferred = defer.succeed(None)
def _encode_state_dict(state_dict): def _encode_state_dict(state_dict):

View file

@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden") guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join": if guest_access != "can_join":
if context: if context:
current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events( current_state = yield self.store.get_events(
list(context.current_state_ids.values()) list(current_state_ids.values())
) )
else: else:
current_state = yield self.state_handler.get_current_state( current_state = yield self.state_handler.get_current_state(

View file

@ -486,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just # joined the room. Don't bother if the user is just
# changing their profile info. # changing their profile info.
newly_joined = True newly_joined = True
prev_state_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_id = prev_state_ids.get(
(event.type, event.state_key) (event.type, event.state_key)
) )
if prev_state_id: if prev_state_id:
@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key) user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id) yield user_joined_room(self.distributor, user, event.room_id)
state_ids = list(context.prev_state_ids.values()) prev_state_ids = yield context.get_prev_state_ids(self.store)
state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids) auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(list(context.prev_state_ids.values())) state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({ defer.returnValue({
"state": list(state.values()), "state": list(state.values()),
@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler):
) )
if not auth_events: if not auth_events:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events( auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=True, event, prev_state_ids, for_verification=True,
) )
auth_events = yield self.store.get_events(auth_events_ids) auth_events = yield self.store.get_events(auth_events_ids)
auth_events = { auth_events = {
@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler):
break break
if do_resolution: if do_resolution:
prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain. # 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events( auth_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids event, prev_state_ids
) )
local_auth_chain = yield self.store.get_auth_chain( local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True auth_ids, include_given=True
@ -1968,21 +1975,35 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events) k: a.event_id for k, a in iteritems(auth_events)
if k != event_key if k != event_key
} }
context.current_state_ids = dict(context.current_state_ids) current_state_ids = yield context.get_current_state_ids(self.store)
context.current_state_ids.update(state_updates) current_state_ids = dict(current_state_ids)
current_state_ids.update(state_updates)
if context.delta_ids is not None: if context.delta_ids is not None:
context.delta_ids = dict(context.delta_ids) delta_ids = dict(context.delta_ids)
context.delta_ids.update(state_updates) delta_ids.update(state_updates)
context.prev_state_ids = dict(context.prev_state_ids)
context.prev_state_ids.update({ prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = dict(prev_state_ids)
prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events) k: a.event_id for k, a in iteritems(auth_events)
}) })
context.state_group = yield self.store.store_state_group(
state_group = yield self.store.store_state_group(
event.event_id, event.event_id,
event.room_id, event.room_id,
prev_group=context.prev_group, prev_group=context.prev_group,
delta_ids=context.delta_ids, delta_ids=delta_ids,
current_state_ids=context.current_state_ids, current_state_ids=current_state_ids,
)
yield context.update_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
delta_ids=delta_ids,
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -2222,7 +2243,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"] event.content["third_party_invite"]["signed"]["token"]
) )
original_invite = None original_invite = None
original_invite_id = context.prev_state_ids.get(key) prev_state_ids = yield context.get_prev_state_ids(self.store)
original_invite_id = prev_state_ids.get(key)
if original_invite_id: if original_invite_id:
original_invite = yield self.store.get_event( original_invite = yield self.store.get_event(
original_invite_id, allow_none=True original_invite_id, allow_none=True
@ -2264,7 +2286,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"] signed = event.content["third_party_invite"]["signed"]
token = signed["token"] token = signed["token"]
invite_event_id = context.prev_state_ids.get( prev_state_ids = yield context.get_prev_state_ids(self.store)
invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,) (EventTypes.ThirdPartyInvite, token,)
) )

View file

@ -383,7 +383,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context. If so, returns the version of the event in context.
Otherwise, returns None. Otherwise, returns None.
""" """
prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True) prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event: if not prev_event:
return return
@ -505,8 +506,8 @@ class EventCreationHandler(object):
event = builder.build() event = builder.build()
logger.debug( logger.debug(
"Created event %s with state: %s", "Created event %s",
event.event_id, context.prev_state_ids, event.event_id,
) )
defer.returnValue( defer.returnValue(
@ -559,8 +560,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master. # If we're a worker we need to hit out to the master.
if self.config.worker_app: if self.config.worker_app:
yield send_event_to_master( yield send_event_to_master(
self.hs.get_clock(), clock=self.hs.get_clock(),
self.http_client, store=self.store,
client=self.http_client,
host=self.config.worker_replication_host, host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port, port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
@ -637,9 +639,11 @@ class EventCreationHandler(object):
e.sender == event.sender e.sender == event.sender
) )
current_state_ids = yield context.get_current_state_ids(self.store)
state_to_include_ids = [ state_to_include_ids = [
e_id e_id
for k, e_id in iteritems(context.current_state_ids) for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender) or k == (EventTypes.Member, event.sender)
] ]
@ -675,8 +679,9 @@ class EventCreationHandler(object):
) )
if event.type == EventTypes.Redaction: if event.type == EventTypes.Redaction:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events( auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=True, event, prev_state_ids, for_verification=True,
) )
auth_events = yield self.store.get_events(auth_events_ids) auth_events = yield self.store.get_events(auth_events_ids)
auth_events = { auth_events = {
@ -696,11 +701,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events" "You don't have permission to redact events"
) )
if event.type == EventTypes.Create and context.prev_state_ids: if event.type == EventTypes.Create:
raise AuthError( prev_state_ids = yield context.get_prev_state_ids(self.store)
403, if prev_state_ids:
"Changing the room create event is forbidden", raise AuthError(
) 403,
"Changing the room create event is forbidden",
)
(event_stream_id, max_stream_id) = yield self.store.persist_event( (event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context event, context=context

View file

@ -201,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit, ratelimit=ratelimit,
) )
prev_member_event_id = context.prev_state_ids.get( prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()), (EventTypes.Member, target.to_string()),
None None
) )
@ -496,9 +498,10 @@ class RoomMemberHandler(object):
if prev_event is not None: if prev_event is not None:
return return
prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
if requester.is_guest: if requester.is_guest:
guest_can_join = yield self._can_guest_join(context.prev_state_ids) guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join: if not guest_can_join:
# This should be an auth check, but guests are a local concept, # This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process. # so don't really fit into the general auth process.
@ -517,7 +520,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit, ratelimit=ratelimit,
) )
prev_member_event_id = context.prev_state_ids.get( prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key), (EventTypes.Member, event.state_key),
None None
) )

View file

@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context): def _get_power_levels_and_sender_level(self, event, context):
pl_event_id = context.prev_state_ids.get(POWER_KEY) prev_state_ids = yield context.get_prev_state_ids(self.store)
pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id: if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and # fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case # not having a power level event is an extreme edge case
@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
auth_events = {POWER_KEY: pl_event} auth_events = {POWER_KEY: pl_event}
else: else:
auth_events_ids = yield self.auth.compute_auth_events( auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=False, event, prev_state_ids, for_verification=False,
) )
auth_events = yield self.store.get_events(auth_events_ids) auth_events = yield self.store.get_events(auth_events_ids)
auth_events = { auth_events = {
@ -304,7 +305,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits() push_rules_delta_state_cache_metric.inc_hits()
else: else:
current_state_ids = context.current_state_ids current_state_ids = yield context.get_current_state_ids(self.store)
push_rules_delta_state_cache_metric.inc_misses() push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids)) push_rules_state_size_counter.inc(len(current_state_ids))

View file

@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks @defer.inlineCallbacks
def send_event_to_master(clock, client, host, port, requester, event, context, def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users): ratelimit, extra_users):
"""Send event to be handled on the master """Send event to be handled on the master
Args: Args:
clock (synapse.util.Clock) clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient) client (SimpleHttpClient)
host (str): host of master host (str): host of master
port (int): port on master listening for HTTP replication port (int): port on master listening for HTTP replication
@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id, host, port, event.event_id,
) )
serialized_context = yield context.serialize(event, store)
payload = { payload = {
"event": event.get_pdu_json(), "event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(), "internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason, "rejected_reason": event.rejected_reason,
"context": context.serialize(event), "context": serialized_context,
"requester": requester.serialize(), "requester": requester.serialize(),
"ratelimit": ratelimit, "ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users], "extra_users": [u.to_string() for u in extra_users],

View file

@ -203,25 +203,27 @@ class StateHandler(object):
# If this is an outlier, then we know it shouldn't have any current # If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and # state. Certainly store.get_current_state won't return any, and
# persisting the event won't store the state group. # persisting the event won't store the state group.
context = EventContext()
if old_state: if old_state:
context.prev_state_ids = { prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state (s.type, s.state_key): s.event_id for s in old_state
} }
if event.is_state(): if event.is_state():
context.current_state_ids = dict(context.prev_state_ids) current_state_ids = dict(prev_state_ids)
key = (event.type, event.state_key) key = (event.type, event.state_key)
context.current_state_ids[key] = event.event_id current_state_ids[key] = event.event_id
else: else:
context.current_state_ids = context.prev_state_ids current_state_ids = prev_state_ids
else: else:
context.current_state_ids = {} current_state_ids = {}
context.prev_state_ids = {} prev_state_ids = {}
context.prev_state_events = []
# We don't store state for outliers, so we don't generate a state # We don't store state for outliers, so we don't generate a state
# froup for it. # group for it.
context.state_group = None context = EventContext.with_state(
state_group=None,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
)
defer.returnValue(context) defer.returnValue(context)
@ -230,31 +232,35 @@ class StateHandler(object):
# Let's just correctly fill out the context and create a # Let's just correctly fill out the context and create a
# new state group for it. # new state group for it.
context = EventContext() prev_state_ids = {
context.prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state (s.type, s.state_key): s.event_id for s in old_state
} }
if event.is_state(): if event.is_state():
key = (event.type, event.state_key) key = (event.type, event.state_key)
if key in context.prev_state_ids: if key in prev_state_ids:
replaces = context.prev_state_ids[key] replaces = prev_state_ids[key]
if replaces != event.event_id: # Paranoia check if replaces != event.event_id: # Paranoia check
event.unsigned["replaces_state"] = replaces event.unsigned["replaces_state"] = replaces
context.current_state_ids = dict(context.prev_state_ids) current_state_ids = dict(prev_state_ids)
context.current_state_ids[key] = event.event_id current_state_ids[key] = event.event_id
else: else:
context.current_state_ids = context.prev_state_ids current_state_ids = prev_state_ids
context.state_group = yield self.store.store_state_group( state_group = yield self.store.store_state_group(
event.event_id, event.event_id,
event.room_id, event.room_id,
prev_group=None, prev_group=None,
delta_ids=None, delta_ids=None,
current_state_ids=context.current_state_ids, current_state_ids=current_state_ids,
)
context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
) )
context.prev_state_events = []
defer.returnValue(context) defer.returnValue(context)
logger.debug("calling resolve_state_groups from compute_event_context") logger.debug("calling resolve_state_groups from compute_event_context")
@ -262,47 +268,47 @@ class StateHandler(object):
event.room_id, [e for e, _ in event.prev_events], event.room_id, [e for e, _ in event.prev_events],
) )
curr_state = entry.state prev_state_ids = entry.state
prev_group = None
delta_ids = None
context = EventContext()
context.prev_state_ids = curr_state
if event.is_state(): if event.is_state():
# If this is a state event then we need to create a new state # If this is a state event then we need to create a new state
# group for the state after this event. # group for the state after this event.
key = (event.type, event.state_key) key = (event.type, event.state_key)
if key in context.prev_state_ids: if key in prev_state_ids:
replaces = context.prev_state_ids[key] replaces = prev_state_ids[key]
event.unsigned["replaces_state"] = replaces event.unsigned["replaces_state"] = replaces
context.current_state_ids = dict(context.prev_state_ids) current_state_ids = dict(prev_state_ids)
context.current_state_ids[key] = event.event_id current_state_ids[key] = event.event_id
if entry.state_group: if entry.state_group:
# If the state at the event has a state group assigned then # If the state at the event has a state group assigned then
# we can use that as the prev group # we can use that as the prev group
context.prev_group = entry.state_group prev_group = entry.state_group
context.delta_ids = { delta_ids = {
key: event.event_id key: event.event_id
} }
elif entry.prev_group: elif entry.prev_group:
# If the state at the event only has a prev group, then we can # If the state at the event only has a prev group, then we can
# use that as a prev group too. # use that as a prev group too.
context.prev_group = entry.prev_group prev_group = entry.prev_group
context.delta_ids = dict(entry.delta_ids) delta_ids = dict(entry.delta_ids)
context.delta_ids[key] = event.event_id delta_ids[key] = event.event_id
context.state_group = yield self.store.store_state_group( state_group = yield self.store.store_state_group(
event.event_id, event.event_id,
event.room_id, event.room_id,
prev_group=context.prev_group, prev_group=prev_group,
delta_ids=context.delta_ids, delta_ids=delta_ids,
current_state_ids=context.current_state_ids, current_state_ids=current_state_ids,
) )
else: else:
context.current_state_ids = context.prev_state_ids current_state_ids = prev_state_ids
context.prev_group = entry.prev_group prev_group = entry.prev_group
context.delta_ids = entry.delta_ids delta_ids = entry.delta_ids
if entry.state_group is None: if entry.state_group is None:
entry.state_group = yield self.store.store_state_group( entry.state_group = yield self.store.store_state_group(
@ -310,13 +316,20 @@ class StateHandler(object):
event.room_id, event.room_id,
prev_group=entry.prev_group, prev_group=entry.prev_group,
delta_ids=entry.delta_ids, delta_ids=entry.delta_ids,
current_state_ids=context.current_state_ids, current_state_ids=current_state_ids,
) )
entry.state_id = entry.state_group entry.state_id = entry.state_group
context.state_group = entry.state_group state_group = entry.state_group
context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
prev_group=prev_group,
delta_ids=delta_ids,
)
context.prev_state_events = []
defer.returnValue(context) defer.returnValue(context)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -549,7 +549,12 @@ class EventsStore(EventsWorkerStore):
if ctx.state_group in state_groups_map: if ctx.state_group in state_groups_map:
continue continue
state_groups_map[ctx.state_group] = ctx.current_state_ids # We're only interested in pulling out state that has already
# been cached in the context. We'll pull stuff out of the DB later
# if necessary.
current_state_ids = ctx.get_cached_current_state_ids()
if current_state_ids is not None:
state_groups_map[ctx.state_group] = current_state_ids
# We need to map the event_ids to their state groups. First, let's # We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can # check if the event is one we're persisting, in which case we can

View file

@ -186,6 +186,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks
def bulk_get_push_rules_for_room(self, event, context): def bulk_get_push_rules_for_room(self, event, context):
state_group = context.state_group state_group = context.state_group
if not state_group: if not state_group:
@ -195,9 +196,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
# To do this we set the state_group to a new object as object() != object() # To do this we set the state_group to a new object as object() != object()
state_group = object() state_group = object()
return self._bulk_get_push_rules_for_room( current_state_ids = yield context.get_current_state_ids(self)
event.room_id, state_group, context.current_state_ids, event=event result = yield self._bulk_get_push_rules_for_room(
event.room_id, state_group, current_state_ids, event=event
) )
defer.returnValue(result)
@cachedInlineCallbacks(num_args=2, cache_context=True) @cachedInlineCallbacks(num_args=2, cache_context=True)
def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids, def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,

View file

@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(user_who_share_room) defer.returnValue(user_who_share_room)
@defer.inlineCallbacks
def get_joined_users_from_context(self, event, context): def get_joined_users_from_context(self, event, context):
state_group = context.state_group state_group = context.state_group
if not state_group: if not state_group:
@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object() # To do this we set the state_group to a new object as object() != object()
state_group = object() state_group = object()
return self._get_joined_users_from_context( current_state_ids = yield context.get_current_state_ids(self)
event.room_id, state_group, context.current_state_ids, result = yield self._get_joined_users_from_context(
event.room_id, state_group, current_state_ids,
event=event, event=event,
context=context, context=context,
) )
defer.returnValue(result)
def get_joined_users_from_state(self, room_id, state_entry): def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group state_group = state_entry.state_group

View file

@ -222,9 +222,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_ids = { state_ids = {
key: e.event_id for key, e in state.items() key: e.event_id for key, e in state.items()
} }
context = EventContext() context = EventContext.with_state(
context.current_state_ids = state_ids state_group=None,
context.prev_state_ids = state_ids current_state_ids=state_ids,
prev_state_ids=state_ids
)
else: else:
state_handler = self.hs.get_state_handler() state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event) context = yield state_handler.compute_event_context(event)

View file

@ -204,7 +204,8 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context) self.store.register_event_context(event, context)
context_store[event.event_id] = context context_store[event.event_id] = context
self.assertEqual(2, len(context_store["D"].prev_state_ids)) prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
self.assertEqual(2, len(prev_state_ids))
@defer.inlineCallbacks @defer.inlineCallbacks
def test_branch_basic_conflict(self): def test_branch_basic_conflict(self):
@ -255,9 +256,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context) self.store.register_event_context(event, context)
context_store[event.event_id] = context context_store[event.event_id] = context
prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
self.assertSetEqual( self.assertSetEqual(
{"START", "A", "C"}, {"START", "A", "C"},
{e_id for e_id in context_store["D"].prev_state_ids.values()} {e_id for e_id in prev_state_ids.values()}
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -318,9 +321,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context) self.store.register_event_context(event, context)
context_store[event.event_id] = context context_store[event.event_id] = context
prev_state_ids = yield context_store["E"].get_prev_state_ids(self.store)
self.assertSetEqual( self.assertSetEqual(
{"START", "A", "B", "C"}, {"START", "A", "B", "C"},
{e for e in context_store["E"].prev_state_ids.values()} {e for e in prev_state_ids.values()}
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -398,9 +403,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context) self.store.register_event_context(event, context)
context_store[event.event_id] = context context_store[event.event_id] = context
prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
self.assertSetEqual( self.assertSetEqual(
{"A1", "A2", "A3", "A5", "B"}, {"A1", "A2", "A3", "A5", "B"},
{e for e in context_store["D"].prev_state_ids.values()} {e for e in prev_state_ids.values()}
) )
def _add_depths(self, nodes, edges): def _add_depths(self, nodes, edges):
@ -429,8 +436,10 @@ class StateTestCase(unittest.TestCase):
event, old_state=old_state event, old_state=old_state
) )
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual( self.assertEqual(
set(e.event_id for e in old_state), set(context.current_state_ids.values()) set(e.event_id for e in old_state), set(current_state_ids.values())
) )
self.assertIsNotNone(context.state_group) self.assertIsNotNone(context.state_group)
@ -449,8 +458,10 @@ class StateTestCase(unittest.TestCase):
event, old_state=old_state event, old_state=old_state
) )
prev_state_ids = yield context.get_prev_state_ids(self.store)
self.assertEqual( self.assertEqual(
set(e.event_id for e in old_state), set(context.prev_state_ids.values()) set(e.event_id for e in old_state), set(prev_state_ids.values())
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -475,9 +486,11 @@ class StateTestCase(unittest.TestCase):
context = yield self.state.compute_event_context(event) context = yield self.state.compute_event_context(event)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual( self.assertEqual(
set([e.event_id for e in old_state]), set([e.event_id for e in old_state]),
set(context.current_state_ids.values()) set(current_state_ids.values())
) )
self.assertEqual(group_name, context.state_group) self.assertEqual(group_name, context.state_group)
@ -504,9 +517,11 @@ class StateTestCase(unittest.TestCase):
context = yield self.state.compute_event_context(event) context = yield self.state.compute_event_context(event)
prev_state_ids = yield context.get_prev_state_ids(self.store)
self.assertEqual( self.assertEqual(
set([e.event_id for e in old_state]), set([e.event_id for e in old_state]),
set(context.prev_state_ids.values()) set(prev_state_ids.values())
) )
self.assertIsNotNone(context.state_group) self.assertIsNotNone(context.state_group)
@ -545,7 +560,9 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
) )
self.assertEqual(len(context.current_state_ids), 6) current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(len(current_state_ids), 6)
self.assertIsNotNone(context.state_group) self.assertIsNotNone(context.state_group)
@ -585,7 +602,9 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
) )
self.assertEqual(len(context.current_state_ids), 6) current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(len(current_state_ids), 6)
self.assertIsNotNone(context.state_group) self.assertIsNotNone(context.state_group)
@ -642,8 +661,10 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
) )
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual( self.assertEqual(
old_state_2[3].event_id, context.current_state_ids[("test1", "1")] old_state_2[3].event_id, current_state_ids[("test1", "1")]
) )
# Reverse the depth to make sure we are actually using the depths # Reverse the depth to make sure we are actually using the depths
@ -670,8 +691,10 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
) )
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual( self.assertEqual(
old_state_1[3].event_id, context.current_state_ids[("test1", "1")] old_state_1[3].event_id, current_state_ids[("test1", "1")]
) )
def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2, def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2,