Merge pull request #3621 from matrix-org/erikj/split_fed_store

Split out DB writes in federation handler
This commit is contained in:
Erik Johnston 2018-08-02 10:41:42 +01:00 committed by GitHub
commit 40c1c59cf4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 89 deletions

1
changelog.d/3621.misc Normal file
View file

@ -0,0 +1 @@
Refactor FederationHandler to move DB writes into separate functions

View file

@ -400,7 +400,7 @@ class FederationHandler(BaseHandler):
) )
try: try:
event_stream_id, max_stream_id = yield self._persist_auth_tree( yield self._persist_auth_tree(
origin, auth_chain, state, event origin, auth_chain, state, event
) )
except AuthError as e: except AuthError as e:
@ -444,7 +444,7 @@ class FederationHandler(BaseHandler):
yield self._handle_new_events(origin, event_infos) yield self._handle_new_events(origin, event_infos)
try: try:
context, event_stream_id, max_stream_id = yield self._handle_new_event( context = yield self._handle_new_event(
origin, origin,
event, event,
state=state, state=state,
@ -469,17 +469,6 @@ class FederationHandler(BaseHandler):
except StoreError: except StoreError:
logger.exception("Failed to store room.") logger.exception("Failed to store room.")
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally # Only fire user_joined_room if the user has acutally
@ -501,7 +490,7 @@ class FederationHandler(BaseHandler):
if newly_joined: if newly_joined:
user = UserID.from_string(event.state_key) user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id) yield self.user_joined_room(user, event.room_id)
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
@ -942,7 +931,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = [] self.room_queues[room_id] = []
yield self.store.clean_room_for_join(room_id) yield self._clean_room_for_join(room_id)
handled_events = set() handled_events = set()
@ -981,15 +970,10 @@ class FederationHandler(BaseHandler):
# FIXME # FIXME
pass pass
event_stream_id, max_stream_id = yield self._persist_auth_tree( yield self._persist_auth_tree(
origin, auth_chain, state, event origin, auth_chain, state, event
) )
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[joinee]
)
logger.debug("Finished joining %s to %s", joinee, room_id) logger.debug("Finished joining %s to %s", joinee, room_id)
finally: finally:
room_queue = self.room_queues[room_id] room_queue = self.room_queues[room_id]
@ -1084,7 +1068,7 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems. # would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin event.internal_metadata.send_on_behalf_of = origin
context, event_stream_id, max_stream_id = yield self._handle_new_event( context = yield self._handle_new_event(
origin, event origin, event
) )
@ -1094,20 +1078,10 @@ class FederationHandler(BaseHandler):
event.signatures, event.signatures,
) )
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN: if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key) user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id) yield self.user_joined_room(user, event.room_id)
prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = yield context.get_prev_state_ids(self.store)
@ -1176,17 +1150,7 @@ class FederationHandler(BaseHandler):
) )
context = yield self.state_handler.compute_event_context(event) context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
)
target_user = UserID.from_string(event.state_key)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
defer.returnValue(event) defer.returnValue(event)
@ -1217,17 +1181,7 @@ class FederationHandler(BaseHandler):
) )
context = yield self.state_handler.compute_event_context(event) context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
)
target_user = UserID.from_string(event.state_key)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
defer.returnValue(event) defer.returnValue(event)
@ -1318,7 +1272,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False event.internal_metadata.outlier = False
context, event_stream_id, max_stream_id = yield self._handle_new_event( yield self._handle_new_event(
origin, event origin, event
) )
@ -1328,16 +1282,6 @@ class FederationHandler(BaseHandler):
event.signatures, event.signatures,
) )
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -1472,9 +1416,8 @@ class FederationHandler(BaseHandler):
event, context event, context
) )
event_stream_id, max_stream_id = yield self.store.persist_event( yield self._persist_events(
event, [(event, context)],
context=context,
backfilled=backfilled, backfilled=backfilled,
) )
except: # noqa: E722, as we reraise the exception this is fine. except: # noqa: E722, as we reraise the exception this is fine.
@ -1487,15 +1430,7 @@ class FederationHandler(BaseHandler):
six.reraise(tp, value, tb) six.reraise(tp, value, tb)
if not backfilled: defer.returnValue(context)
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False): def _handle_new_events(self, origin, event_infos, backfilled=False):
@ -1503,6 +1438,8 @@ class FederationHandler(BaseHandler):
should not depend on one another, e.g. this should be used to persist should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations. on each other for state calculations.
Notifies about the events where appropriate.
""" """
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
@ -1517,7 +1454,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True, ], consumeErrors=True,
)) ))
yield self.store.persist_events( yield self._persist_events(
[ [
(ev_info["event"], context) (ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts) for ev_info, context in zip(event_infos, contexts)
@ -1529,7 +1466,8 @@ class FederationHandler(BaseHandler):
def _persist_auth_tree(self, origin, auth_events, state, event): def _persist_auth_tree(self, origin, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the """Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically. state and event. Then persists the auth chain and state atomically.
Persists the event seperately. Persists the event separately. Notifies about the persisted events
where appropriate.
Will attempt to fetch missing auth events. Will attempt to fetch missing auth events.
@ -1540,8 +1478,7 @@ class FederationHandler(BaseHandler):
event (Event) event (Event)
Returns: Returns:
2-tuple of (event_stream_id, max_stream_id) from the persist_event Deferred
call for `event`
""" """
events_to_context = {} events_to_context = {}
for e in itertools.chain(auth_events, state): for e in itertools.chain(auth_events, state):
@ -1605,7 +1542,7 @@ class FederationHandler(BaseHandler):
raise raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self.store.persist_events( yield self._persist_events(
[ [
(e, events_to_context[e.event_id]) (e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state) for e in itertools.chain(auth_events, state)
@ -1616,12 +1553,10 @@ class FederationHandler(BaseHandler):
event, old_state=state event, old_state=state
) )
event_stream_id, max_stream_id = yield self.store.persist_event( yield self._persist_events(
event, new_event_context, [(event, new_event_context)],
) )
defer.returnValue((event_stream_id, max_stream_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None): def _prep_event(self, origin, event, state=None, auth_events=None):
""" """
@ -2347,3 +2282,69 @@ class FederationHandler(BaseHandler):
) )
if "valid" not in response or not response["valid"]: if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid") raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
Args:
event_and_contexts(list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether these events are a result of
backfilling or not
Returns:
Deferred
"""
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
event or not.
Args:
event (FrozenEvent)
max_stream_id (int): The max_stream_id returned by persist_events
"""
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
# We notify for memberships if its an invite for one of our
# users
if event.internal_metadata.is_outlier():
if event.membership != Membership.INVITE:
if not self.is_mine_id(target_user_id):
return
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
elif event.internal_metadata.is_outlier():
return
event_stream_id = event.internal_metadata.stream_ordering
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)
def _clean_room_for_join(self, room_id):
return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
return user_joined_room(self.distributor, user, room_id)

View file

@ -241,12 +241,18 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
self._state_resolution_handler = hs.get_state_resolution_handler() self._state_resolution_handler = hs.get_state_resolution_handler()
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False): def persist_events(self, events_and_contexts, backfilled=False):
""" """
Write events to the database Write events to the database
Args: Args:
events_and_contexts: list of tuples of (event, context) events_and_contexts: list of tuples of (event, context)
backfilled: ? backfilled (bool): Whether the results are retrieved from federation
via backfill or not. Used to determine if they're "new" events
which might update the current state etc.
Returns:
Deferred[int]: the stream ordering of the latest persisted event
""" """
partitioned = {} partitioned = {}
for event, ctx in events_and_contexts: for event, ctx in events_and_contexts:
@ -263,10 +269,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
for room_id in partitioned: for room_id in partitioned:
self._maybe_start_persisting(room_id) self._maybe_start_persisting(room_id)
return make_deferred_yieldable( yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True) defer.gatherResults(deferreds, consumeErrors=True)
) )
max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue(max_persisted_id)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, backfilled=False): def persist_event(self, event, context, backfilled=False):