diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21f1717dd2..afa19bf653 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -47,21 +47,9 @@ class MessageHandler(BaseHandler): self.hs = hs self.state = hs.get_state_handler() self.clock = hs.get_clock() - self.validator = EventValidator() - self.profile_handler = hs.get_profile_handler() self.pagination_lock = ReadWriteLock() - self.pusher_pool = hs.get_pusherpool() - - # We arbitrarily limit concurrent event creation for a room to 5. - # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) - - self.action_generator = hs.get_action_generator() - - self.spam_checker = hs.get_spam_checker() - @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -182,166 +170,6 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) - @defer.inlineCallbacks - def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): - """ - Given a dict from a client, create a new event. - - Creates an FrozenEvent object, filling out auth_events, prev_events, - etc. - - Adds display names to Join membership events. - - Args: - requester - event_dict (dict): An entire event - token_id (str) - txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event - - Returns: - Tuple of created event (FrozenEvent), Context - """ - builder = self.event_builder_factory.new(event_dict) - - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) - - if token_id is not None: - builder.internal_metadata.token_id = token_id - - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id - - event, context = yield self._create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) - - defer.returnValue((event, context)) - - @defer.inlineCallbacks - def send_nonmember_event(self, requester, event, context, ratelimit=True): - """ - Persists and notifies local clients and federation of an event. - - Args: - event (FrozenEvent) the event to send. - context (Context) the context of the event. - ratelimit (bool): Whether to rate limit this send. - is_guest (bool): Whether the sender is a guest. - """ - if event.type == EventTypes.Member: - raise SynapseError( - 500, - "Tried to send member event through non-member codepath" - ) - - # We check here if we are currently being rate limited, so that we - # don't do unnecessary work. We check again just before we actually - # send the event. - yield self.ratelimit(requester, update=False) - - user = UserID.from_string(event.sender) - - assert self.hs.is_mine(user), "User must be our own: %s" % (user,) - - if event.is_state(): - prev_state = yield self.deduplicate_state_event(event, context) - if prev_state is not None: - defer.returnValue(prev_state) - - yield self.handle_new_client_event( - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - ) - - if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() - # We don't want to block sending messages on any presence code. This - # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(user) - - @defer.inlineCallbacks - def deduplicate_state_event(self, event, context): - """ - Checks whether event is in the latest resolved state in context. - - If so, returns the version of the event in context. - Otherwise, returns None. - """ - prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) - prev_event = yield self.store.get_event(prev_event_id, allow_none=True) - if not prev_event: - return - - if prev_event and event.user_id == prev_event.user_id: - prev_content = encode_canonical_json(prev_event.content) - next_content = encode_canonical_json(event.content) - if prev_content == next_content: - defer.returnValue(prev_event) - return - - @defer.inlineCallbacks - def create_and_send_nonmember_event( - self, - requester, - event_dict, - ratelimit=True, - txn_id=None - ): - """ - Creates an event, then sends it. - - See self.create_event and self.send_nonmember_event. - """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN - ) - - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) - defer.returnValue(event) - @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, event_type=None, state_key="", is_guest=False): @@ -470,6 +298,194 @@ class MessageHandler(BaseHandler): for user_id, profile in users_with_profile.iteritems() }) + +class EventCreationHandler(object): + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.clock = hs.get_clock() + self.validator = EventValidator() + self.profile_handler = hs.get_profile_handler() + self.event_builder_factory = hs.get_event_builder_factory() + self.server_name = hs.hostname + self.ratelimiter = hs.get_ratelimiter() + self.notifier = hs.get_notifier() + + # This is only used to get at ratelimit function, and maybe_kick_guest_users + self.base_handler = BaseHandler(hs) + + self.pusher_pool = hs.get_pusherpool() + + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. + self.limiter = Limiter(max_count=5) + + self.action_generator = hs.get_action_generator() + + self.spam_checker = hs.get_spam_checker() + + @defer.inlineCallbacks + def create_event(self, requester, event_dict, token_id=None, txn_id=None, + prev_event_ids=None): + """ + Given a dict from a client, create a new event. + + Creates an FrozenEvent object, filling out auth_events, prev_events, + etc. + + Adds display names to Join membership events. + + Args: + requester + event_dict (dict): An entire event + token_id (str) + txn_id (str) + prev_event_ids (list): The prev event ids to use when creating the event + + Returns: + Tuple of created event (FrozenEvent), Context + """ + builder = self.event_builder_factory.new(event_dict) + + with (yield self.limiter.queue(builder.room_id)): + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) + + if token_id is not None: + builder.internal_metadata.token_id = token_id + + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id + + event, context = yield self._create_new_client_event( + builder=builder, + requester=requester, + prev_event_ids=prev_event_ids, + ) + + defer.returnValue((event, context)) + + @defer.inlineCallbacks + def send_nonmember_event(self, requester, event, context, ratelimit=True): + """ + Persists and notifies local clients and federation of an event. + + Args: + event (FrozenEvent) the event to send. + context (Context) the context of the event. + ratelimit (bool): Whether to rate limit this send. + is_guest (bool): Whether the sender is a guest. + """ + if event.type == EventTypes.Member: + raise SynapseError( + 500, + "Tried to send member event through non-member codepath" + ) + + # We check here if we are currently being rate limited, so that we + # don't do unnecessary work. We check again just before we actually + # send the event. + yield self.base_handler.ratelimit(requester, update=False) + + user = UserID.from_string(event.sender) + + assert self.hs.is_mine(user), "User must be our own: %s" % (user,) + + if event.is_state(): + prev_state = yield self.deduplicate_state_event(event, context) + if prev_state is not None: + defer.returnValue(prev_state) + + yield self.handle_new_client_event( + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + ) + + if event.type == EventTypes.Message: + presence = self.hs.get_presence_handler() + # We don't want to block sending messages on any presence code. This + # matters as sometimes presence code can take a while. + preserve_fn(presence.bump_presence_active_time)(user) + + @defer.inlineCallbacks + def deduplicate_state_event(self, event, context): + """ + Checks whether event is in the latest resolved state in context. + + If so, returns the version of the event in context. + Otherwise, returns None. + """ + prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + if not prev_event: + return + + if prev_event and event.user_id == prev_event.user_id: + prev_content = encode_canonical_json(prev_event.content) + next_content = encode_canonical_json(event.content) + if prev_content == next_content: + defer.returnValue(prev_event) + return + + @defer.inlineCallbacks + def create_and_send_nonmember_event( + self, + requester, + event_dict, + ratelimit=True, + txn_id=None + ): + """ + Creates an event, then sends it. + + See self.create_event and self.send_nonmember_event. + """ + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id + ) + + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, basestring): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) + defer.returnValue(event) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): @@ -509,9 +525,7 @@ class MessageHandler(BaseHandler): builder.prev_events = prev_events builder.depth = depth - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) + context = yield self.state.compute_event_context(builder) if requester: context.app_service = requester.app_service @@ -551,7 +565,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - yield self.ratelimit(requester) + yield self.base_handler.ratelimit(requester) try: yield self.auth.check_from_context(event, context) @@ -567,7 +581,7 @@ class MessageHandler(BaseHandler): logger.exception("Failed to encode content: %r", event.content) raise - yield self.maybe_kick_guest_users(event, context) + yield self.base_handler.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: # Check the alias is acually valid (at this time at least)