From f164fd922024308e702269a881328f7de980e9eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 14:07:27 +0100 Subject: [PATCH] Move _bulk_get_push_rules_for_room to storage layer --- synapse/push/action_generator.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 41 ++++------------- synapse/storage/push_rule.py | 56 ++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 34 deletions(-) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index b2c94bfaa..ed2ccc4df 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ class ActionGenerator: def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "evaluator_for_event"): bulk_evaluator = yield evaluator_for_event( - event, self.hs, self.store, context.current_state + event, self.hs, self.store, context.state_group, context.current_state ) with Measure(self.clock, "action_for_event_by_user"): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 756e5da51..004eded61 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -36,35 +36,11 @@ def _get_rules(room_id, user_ids, store): @defer.inlineCallbacks -def evaluator_for_event(event, hs, store, current_state): - room_id = event.room_id - # We also will want to generate notifs for other people in the room so - # their unread countss are correct in the event stream, but to avoid - # generating them for bot / AS users etc, we only do so for people who've - # sent a read receipt into the room. - - local_users_in_room = set( - e.state_key for e in current_state.values() - if e.type == EventTypes.Member and e.membership == Membership.JOIN - and hs.is_mine_id(e.state_key) +def evaluator_for_event(event, hs, store, state_group, current_state): + rules_by_user = yield store.bulk_get_push_rules_for_room( + event.room_id, state_group, current_state ) - # users in the room who have pushers need to get push rules run because - # that's how their pushers work - if_users_with_pushers = yield store.get_if_users_have_pushers( - local_users_in_room - ) - user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher - ) - - users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id) - - # any users with pushers must be ours: they have pushers - for uid in users_with_receipts: - if uid in local_users_in_room: - user_ids.add(uid) - # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited if event.type == 'm.room.member' and event.content['membership'] == 'invite': @@ -72,12 +48,12 @@ def evaluator_for_event(event, hs, store, current_state): if invited_user and hs.is_mine_id(invited_user): has_pusher = yield store.user_has_pusher(invited_user) if has_pusher: - user_ids.add(invited_user) - - rules_by_user = yield _get_rules(room_id, user_ids, store) + rules_by_user[invited_user] = yield store.get_push_rules_for_user( + invited_user + ) defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, user_ids, store + event.room_id, rules_by_user, store )) @@ -90,10 +66,9 @@ class BulkPushRuleEvaluator: the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ - def __init__(self, room_id, rules_by_user, users_in_room, store): + def __init__(self, room_id, rules_by_user, store): self.room_id = room_id self.rules_by_user = rules_by_user - self.users_in_room = users_in_room self.store = store @defer.inlineCallbacks diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 86e4a3a81..ca929bc23 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.push.baserules import list_with_base_rules +from synapse.api.constants import EventTypes, Membership from twisted.internet import defer import logging @@ -123,6 +124,61 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(results) + def bulk_get_push_rules_for_room(self, room_id, state_group, current_state): + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + return self._bulk_get_push_rules_for_room(room_id, state_group, current_state) + + @cachedInlineCallbacks(num_args=2) + def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state, + cache_context): + # We don't use `state_group`, its there so that we can cache based + # on it. However, its important that its never None, since two current_state's + # with a state_group of None are likely to be different. + # See bulk_get_push_rules_for_room for how we work around this. + assert state_group is not None + + # We also will want to generate notifs for other people in the room so + # their unread countss are correct in the event stream, but to avoid + # generating them for bot / AS users etc, we only do so for people who've + # sent a read receipt into the room. + local_users_in_room = set( + e.state_key for e in current_state.values() + if e.type == EventTypes.Member and e.membership == Membership.JOIN + and self.hs.is_mine_id(e.state_key) + ) + + # users in the room who have pushers need to get push rules run because + # that's how their pushers work + if_users_with_pushers = yield self.get_if_users_have_pushers( + local_users_in_room, cache_context=cache_context, + ) + user_ids = set( + uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher + ) + + users_with_receipts = yield self.get_users_with_read_receipts_in_room( + room_id, cache_context=cache_context, + ) + + # any users with pushers must be ours: they have pushers + for uid in users_with_receipts: + if uid in local_users_in_room: + user_ids.add(uid) + + rules_by_user = yield self.bulk_get_push_rules( + user_ids, cache_context=cache_context + ) + + rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + + defer.returnValue(rules_by_user) + @cachedList(cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules_enabled(self, user_ids):