diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 52d97dfbf..63e633548 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,7 +43,6 @@ from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination -from synapse.push.action_generator import ActionGenerator from synapse.util.distributor import user_joined_room from twisted.internet import defer @@ -75,6 +74,7 @@ class FederationHandler(BaseHandler): self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() + self.action_generator = hs.get_action_generator() self.replication_layer.set_handler(self) @@ -1389,8 +1389,7 @@ class FederationHandler(BaseHandler): ) if not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( + yield self.action_generator.handle_push_actions_for_event( event, context ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ba8776f28..a04f634c5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.push.action_generator import ActionGenerator from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) @@ -54,7 +53,7 @@ class MessageHandler(BaseHandler): # This is to stop us from diverging history *too* much. self.limiter = Limiter(max_count=5) - self.action_generator = ActionGenerator(self.hs) + self.action_generator = hs.get_action_generator() @defer.inlineCallbacks def purge_history(self, room_id, event_id): diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 0658497d9..fe09d50d5 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -24,7 +24,7 @@ import logging logger = logging.getLogger(__name__) -class ActionGenerator: +class ActionGenerator(object): def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 015802691..760d567ca 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -24,6 +24,8 @@ from synapse.api.constants import EventTypes, Membership from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer +from collections import namedtuple + logger = logging.getLogger(__name__) @@ -31,7 +33,7 @@ logger = logging.getLogger(__name__) rules_by_room = {} -class BulkPushRuleEvaluator: +class BulkPushRuleEvaluator(object): """Calculates the outcome of push rules for an event for all users in the room at once. """ @@ -204,12 +206,7 @@ class RulesForRoom(object): # To get around this we pass a function that on invalidations looks ups # the RoomsForUser entry in the cache, rather than keeping a reference # to self around in the callback. - def invalidate_all_cb(): - rules = rules_for_room_cache.get(room_id, update_metrics=False) - if rules: - rules.invalidate_all() - - self.invalidate_all_cb = invalidate_all_cb + self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id) @defer.inlineCallbacks def get_rules(self, context): @@ -347,3 +344,15 @@ class RulesForRoom(object): self.member_map.update(members) self.rules_by_user = rules_by_user self.state_group = state_group + + +class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))): + # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, + # which namedtuple does for us (i.e. two _CacheContext are the same if + # their caches and keys match). This is important in particular to + # dedupe when we add callbacks to lru cache nodes, otherwise the number + # of callbacks would grow. + def __call__(self): + rules = self.cache.get(self.room_id, None, update_metrics=False) + if rules: + rules.invalidate_all() diff --git a/synapse/server.py b/synapse/server.py index 12754c89a..e400e278c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -52,6 +52,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier +from synapse.push.action_generator import ActionGenerator from synapse.push.pusherpool import PusherPool from synapse.rest.media.v1.media_repository import MediaRepository from synapse.state import StateHandler @@ -135,6 +136,7 @@ class HomeServer(object): 'macaroon_generator', 'tcp_replication', 'read_marker_handler', + 'action_generator', ] def __init__(self, hostname, **kwargs): @@ -299,6 +301,9 @@ class HomeServer(object): def build_tcp_replication(self): raise NotImplementedError() + def build_action_generator(self): + return ActionGenerator(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)