From 1df7c28661207df8575fd519ce9c23690b9156ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 13:36:38 +0100 Subject: [PATCH] Use callbacks to notify tcp replication rather than deferreds --- synapse/notifier.py | 17 +++++++++++------ synapse/replication/tcp/resource.py | 15 +-------------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index f9fcc0ca2..4fda184b7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -163,6 +163,8 @@ class Notifier(object): self.store = hs.get_datastore() self.pending_new_room_events = [] + self.replication_callbacks = [] + self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() @@ -202,6 +204,12 @@ class Notifier(object): lambda: len(self.user_to_user_stream), ) + def add_replication_callback(self, cb): + """Add a callback that will be called when some new data is available. + Callback is not given any arguments. + """ + self.replication_callbacks.append(cb) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): @@ -510,6 +518,9 @@ class Notifier(object): self.replication_deferred = ObservableDeferred(defer.Deferred()) deferred.callback(None) + for cb in self.replication_callbacks: + preserve_fn(cb)() + @defer.inlineCallbacks def wait_for_replication(self, callback, timeout): """Wait for an event to happen. @@ -550,9 +561,3 @@ class Notifier(object): break defer.returnValue(result) - - def wait_once_for_replication(self): - """Returns a deferred which resolves when there is new data for - replication to handle. - """ - return self.replication_deferred.observe() diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 243a81d48..b70fa7334 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -21,7 +21,6 @@ from twisted.internet.protocol import Factory from streams import STREAMS_MAP, FederationStream from protocol import ServerReplicationStreamProtocol -from synapse.util.logcontext import preserve_fn from synapse.util.metrics import Measure, measure_func import logging @@ -66,7 +65,6 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() @@ -101,8 +99,7 @@ class ReplicationStreamer(object): if not hs.config.send_federation: self.federation_sender = hs.get_federation_sender() - # Start listening for updates from the notifier - preserve_fn(self.notifier_listener)() + hs.get_notifier().add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -115,16 +112,6 @@ class ReplicationStreamer(object): for conn in self.connections: conn.send_error("server shutting down") - @defer.inlineCallbacks - def notifier_listener(self): - """Sits forever looping on the notifier waiting for new data. - """ - while True: - yield self.notifier.wait_once_for_replication() - logger.debug("Woken up by notifier") - # We need to call this each time we get woken up, as per docstring - preserve_fn(self.on_notifier_poke)() - @defer.inlineCallbacks def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the