forked from MirrorHub/synapse
Limit in flight DNS requests
This is to work around a bug in twisted where a large number of concurrent DNS requests cause it to tight loop forever. c.f. https://twistedmatrix.com/trac/ticket/9620#ticket
This commit is contained in:
parent
bfc8fdf1fc
commit
a0fc256d65
1 changed files with 82 additions and 1 deletions
|
@ -22,13 +22,14 @@ import traceback
|
||||||
import psutil
|
import psutil
|
||||||
from daemonize import Daemonize
|
from daemonize import Daemonize
|
||||||
|
|
||||||
from twisted.internet import error, reactor
|
from twisted.internet import defer, error, reactor
|
||||||
from twisted.protocols.tls import TLSMemoryBIOFactory
|
from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
from synapse.app import check_bind_error
|
from synapse.app import check_bind_error
|
||||||
from synapse.crypto import context_factory
|
from synapse.crypto import context_factory
|
||||||
from synapse.util import PreserveLoggingContext
|
from synapse.util import PreserveLoggingContext
|
||||||
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.rlimit import change_resource_limit
|
from synapse.util.rlimit import change_resource_limit
|
||||||
from synapse.util.versionstring import get_version_string
|
from synapse.util.versionstring import get_version_string
|
||||||
|
|
||||||
|
@ -99,6 +100,8 @@ def start_reactor(
|
||||||
logger (logging.Logger): logger instance to pass to Daemonize
|
logger (logging.Logger): logger instance to pass to Daemonize
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
install_dns_limiter(reactor)
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
# make sure that we run the reactor with the sentinel log context,
|
# make sure that we run the reactor with the sentinel log context,
|
||||||
# otherwise other PreserveLoggingContext instances will get confused
|
# otherwise other PreserveLoggingContext instances will get confused
|
||||||
|
@ -312,3 +315,81 @@ def setup_sentry(hs):
|
||||||
name = hs.config.worker_name if hs.config.worker_name else "master"
|
name = hs.config.worker_name if hs.config.worker_name else "master"
|
||||||
scope.set_tag("worker_app", app)
|
scope.set_tag("worker_app", app)
|
||||||
scope.set_tag("worker_name", name)
|
scope.set_tag("worker_name", name)
|
||||||
|
|
||||||
|
|
||||||
|
def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
|
||||||
|
"""Replaces the resolver with one that limits the number of in flight DNS
|
||||||
|
requests.
|
||||||
|
|
||||||
|
This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we
|
||||||
|
can run out of file descriptors and infinite loop if we attempt to do too
|
||||||
|
many DNS queries at once
|
||||||
|
"""
|
||||||
|
new_resolver = _LimitedHostnameResolver(
|
||||||
|
reactor.nameResolver, max_dns_requests_in_flight,
|
||||||
|
)
|
||||||
|
|
||||||
|
reactor.installNameResolver(new_resolver)
|
||||||
|
|
||||||
|
|
||||||
|
class _LimitedHostnameResolver(object):
|
||||||
|
"""Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, resolver, max_dns_requests_in_flight):
|
||||||
|
self._resolver = resolver
|
||||||
|
self._limiter = Linearizer(
|
||||||
|
name="dns_client_limiter", max_count=max_dns_requests_in_flight,
|
||||||
|
)
|
||||||
|
|
||||||
|
def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
|
||||||
|
addressTypes=None, transportSemantics='TCP'):
|
||||||
|
# Note this is happening deep within the reactor, so we don't need to
|
||||||
|
# worry about log contexts.
|
||||||
|
|
||||||
|
# We need this function to return `resolutionReceiver` so we do all the
|
||||||
|
# actual logic involving deferreds in a separate function.
|
||||||
|
self._resolve(
|
||||||
|
resolutionReceiver, hostName, portNumber,
|
||||||
|
addressTypes, transportSemantics,
|
||||||
|
)
|
||||||
|
|
||||||
|
return resolutionReceiver
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _resolve(self, resolutionReceiver, hostName, portNumber=0,
|
||||||
|
addressTypes=None, transportSemantics='TCP'):
|
||||||
|
|
||||||
|
with (yield self._limiter.queue(())):
|
||||||
|
# resolveHostName doesn't return a Deferred, so we need to hook into
|
||||||
|
# the receiver interface to get told when resolution has finished.
|
||||||
|
|
||||||
|
deferred = defer.Deferred()
|
||||||
|
receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
|
||||||
|
|
||||||
|
self._resolver.resolveHostName(
|
||||||
|
receiver, hostName, portNumber,
|
||||||
|
addressTypes, transportSemantics,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield deferred
|
||||||
|
|
||||||
|
|
||||||
|
class _DeferredResolutionReceiver(object):
|
||||||
|
"""Wraps a IResolutionReceiver and simply resolves the given deferred when
|
||||||
|
resolution is complete
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, receiver, deferred):
|
||||||
|
self._receiver = receiver
|
||||||
|
self._deferred = deferred
|
||||||
|
|
||||||
|
def resolutionBegan(self, resolutionInProgress):
|
||||||
|
self._receiver.resolutionBegan(resolutionInProgress)
|
||||||
|
|
||||||
|
def addressResolved(self, address):
|
||||||
|
self._receiver.addressResolved(address)
|
||||||
|
|
||||||
|
def resolutionComplete(self):
|
||||||
|
self._deferred.callback(())
|
||||||
|
self._receiver.resolutionComplete()
|
||||||
|
|
Loading…
Reference in a new issue