Remove all global reactor imports & pass it around explicitly (#3424)

This commit is contained in:
Amber Brown 2018-06-25 14:08:28 +01:00 committed by GitHub
parent 1d009013b3
commit 07cad26d65
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 29 additions and 22 deletions

View file

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer, reactor from twisted.internet import defer
from ._base import BaseHandler from ._base import BaseHandler
from synapse.types import UserID, create_requester from synapse.types import UserID, create_requester
@ -39,7 +39,7 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where # Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do). # it left off (if it has work left to do).
reactor.callWhenRunning(self._start_user_parting) hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks @defer.inlineCallbacks
def deactivate_account(self, user_id): def deactivate_account(self, user_id):

View file

@ -20,7 +20,7 @@ import sys
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
import six import six
from six import string_types, itervalues, iteritems from six import string_types, itervalues, iteritems
from twisted.internet import defer, reactor from twisted.internet import defer
from twisted.internet.defer import succeed from twisted.internet.defer import succeed
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -157,7 +157,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes # remove the purge from the list 24 hours after it completes
def clear_purge(): def clear_purge():
del self._purges_by_id[purge_id] del self._purges_by_id[purge_id]
reactor.callLater(24 * 3600, clear_purge) self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id): def get_purge_status(self, purge_id):
"""Get the current status of an active purge """Get the current status of an active purge

View file

@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify - should_notify
""" """
from twisted.internet import defer, reactor from twisted.internet import defer
from contextlib import contextmanager from contextlib import contextmanager
from six import itervalues, iteritems from six import itervalues, iteritems
@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted # have not yet been persisted
self.unpersisted_users_changes = set() self.unpersisted_users_changes = set()
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {} self.serial_to_user = {}
self._next_serial = 1 self._next_serial = 1

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor from twisted.internet import defer
from twisted.internet.error import ConnectError from twisted.internet.error import ConnectError
from twisted.names import client, dns from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError from twisted.names.error import DNSNameError, DomainError
@ -78,17 +78,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
else: else:
return _WrappingEndpointFac(transport_endpoint( return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args reactor, domain, port, **endpoint_kw_args
)) ), reactor)
class _WrappingEndpointFac(object): class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac): def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac self.endpoint_fac = endpoint_fac
self.reactor = reactor
@defer.inlineCallbacks @defer.inlineCallbacks
def connect(self, protocolFactory): def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory) conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn) conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn) defer.returnValue(conn)
@ -98,9 +99,10 @@ class _WrappedConnection(object):
""" """
__slots__ = ["conn", "last_request"] __slots__ = ["conn", "last_request"]
def __init__(self, conn): def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn) object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time()) object.__setattr__(self, "last_request", time.time())
self._reactor = reactor
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.conn, name) return getattr(self.conn, name)
@ -131,14 +133,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last # Time this connection out if we haven't send a request in the last
# N minutes # N minutes
# TODO: Cancel the previous callLater? # TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe) self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request) d = self.conn.request(request)
def update_request_time(res): def update_request_time(res):
self.last_request = time.time() self.last_request = time.time()
# TODO: Cancel the previous callLater? # TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe) self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res return res
d.addCallback(update_request_time) d.addCallback(update_request_time)

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer, reactor from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging import logging
@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None self.timed_call = None
if soonest_due_at is not None: if soonest_due_at is not None:
self.timed_call = reactor.callLater( self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer self.seconds_until(soonest_due_at), self.on_timer
) )

View file

@ -15,7 +15,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from twisted.internet import defer, reactor from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator from . import push_rule_evaluator
@ -220,7 +220,9 @@ class HttpPusher(object):
) )
else: else:
logger.info("Push failed: delaying for %ds", self.backoff_delay) logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) self.timed_call = self.hs.get_reactor().callLater(
self.backoff_delay, self.on_timer
)
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break break

View file

@ -15,7 +15,7 @@
"""A replication client for use by synapse workers. """A replication client for use by synapse workers.
""" """
from twisted.internet import reactor, defer from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet.protocol import ReconnectingClientFactory
from .commands import ( from .commands import (
@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class self._clock = hs.get_clock() # As self.clock is defined in super class
reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying) hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector): def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination()) logger.info("Connecting to replication: %r", connector.getDestination())
@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self) factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host host = hs.config.worker_replication_host
port = hs.config.worker_replication_port port = hs.config.worker_replication_port
reactor.connectTCP(host, port, factory) hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows): def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes """Called when we get new replication data. By default this just pokes

View file

@ -15,7 +15,7 @@
"""The server side of the replication stream. """The server side of the replication stream.
""" """
from twisted.internet import defer, reactor from twisted.internet import defer
from twisted.internet.protocol import Factory from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream from .streams import STREAMS_MAP, FederationStream
@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False self.is_looping = False
self.pending_updates = False self.pending_updates = False
reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown) hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self): def on_shutdown(self):
# close all connections on shutdown # close all connections on shutdown

View file

@ -34,6 +34,9 @@ def unwrapFirstError(failure):
class Clock(object): class Clock(object):
""" """
A Clock wraps a Twisted reactor and provides utilities on top of it. A Clock wraps a Twisted reactor and provides utilities on top of it.
Args:
reactor: The Twisted reactor to use.
""" """
_reactor = attr.ib() _reactor = attr.ib()