Compare commits

...

42 commits

Author SHA1 Message Date
Amber H. Brown ecd4f2341c Merge branch 'hawkowl/fix-media-repo' into hawkowl/federation-limiter 2019-07-02 22:21:45 +10:00
Amber H. Brown 6dd1ad36e9 Merge remote-tracking branch 'origin/develop' into hawkowl/federation-limiter 2019-07-02 22:21:26 +10:00
Amber H. Brown 02e8e895aa try this? 2019-07-02 22:21:11 +10:00
Amber H. Brown 0b95b49401 try this? 2019-07-02 21:49:55 +10:00
Amber H. Brown d7ac3a57ae Merge branch 'hawkowl/more-aggressive-no-retry' into hawkowl/federation-limiter 2019-07-01 22:11:24 +10:00
Amber H. Brown f1467e5972 handle more things 2019-07-01 22:11:15 +10:00
Amber H. Brown 45c0117abf cancel in connect 2019-07-01 22:07:19 +10:00
Amber H. Brown 8a58b9272b Merge branch 'hawkowl/more-aggressive-no-retry' into hawkowl/federation-limiter 2019-07-01 21:49:52 +10:00
Amber H. Brown 30ed28417e make logging a little better 2019-07-01 21:47:17 +10:00
Amber H. Brown 11c8e23419 Merge branch 'hawkowl/more-aggressive-no-retry' into hawkowl/federation-limiter 2019-07-01 21:37:59 +10:00
Amber H. Brown a82457c5a3 make logging a little better 2019-07-01 21:36:38 +10:00
Amber H. Brown d08ce1b4bf make logging a little better 2019-07-01 21:35:25 +10:00
Amber H. Brown e6ad608a95 fix small bug 2019-07-01 21:29:29 +10:00
Amber H. Brown 3960220cf2 Merge branch 'hawkowl/more-aggressive-no-retry' into hawkowl/federation-limiter 2019-07-01 21:04:57 +10:00
Amber H. Brown 84cbc5019c make the aggressiveness more configurable 2019-07-01 20:57:27 +10:00
Amber H. Brown 535e8ea79f fix some logs 2019-06-27 19:09:59 +10:00
Amber H. Brown 69490ad447 lint 2019-06-26 15:27:28 +10:00
Amber H. Brown ad0d7d4c9f changelog 2019-06-26 15:25:27 +10:00
Amber H. Brown bdad7ffaff fixes 2019-06-26 15:25:20 +10:00
Amber H. Brown 411e4d8f54 be a bit more aggressive with not retrying likely-dead hosts 2019-06-26 15:21:21 +10:00
Amber H. Brown f591732f9b revert the well known and just use the loop 2019-06-24 11:52:30 +10:00
Amber H. Brown 10c6a180b2 try and fix DNS and socket errors from being retried 2019-06-24 11:16:46 +10:00
Amber H. Brown 7d35b7603f try and fix DNS and socket errors from being retried 2019-06-24 11:12:12 +10:00
Amber H. Brown 04b5175fcf try and fix DNS and socket errors from being retried 2019-06-24 11:11:32 +10:00
Amber H. Brown 7bb35a3288 try and fix DNS and socket errors from being retried 2019-06-24 11:10:14 +10:00
Amber H. Brown 929b2e4e1e try and fix DNS and socket errors from being retried 2019-06-24 10:56:24 +10:00
Amber H. Brown 4da90de3b2 try and fix DNS and socket errors from being retried 2019-06-24 10:52:45 +10:00
Amber H. Brown 32eb3fca89 try and fix DNS and socket errors from being retried 2019-06-24 10:42:16 +10:00
Amber H. Brown 0bcdce2237 try and fix DNS and socket errors from being retried 2019-06-24 10:36:10 +10:00
Amber H. Brown c59f43d325 try and fix DNS and socket errors from being retried 2019-06-24 09:51:02 +10:00
Amber H. Brown 9ab19d1ed7 try and fix DNS and socket errors from being retried 2019-06-24 09:31:47 +10:00
Amber H. Brown 79b380c37a try and fix DNS and socket errors from being retried 2019-06-24 09:25:22 +10:00
Amber H. Brown 3aa6008925 try and fix DNS and socket errors from being retried 2019-06-24 08:54:39 +10:00
Amber H. Brown b67c8ede83 try and fix DNS and socket errors from being retried 2019-06-24 08:53:25 +10:00
Amber H. Brown baef252b1e try and fix DNS and socket errors from being retried 2019-06-24 08:51:34 +10:00
Amber H. Brown fd53a11451 nix based image 2019-06-24 08:27:51 +10:00
Amber H. Brown 436decd055 nix based image 2019-06-24 08:26:12 +10:00
Amber H. Brown 5e7abc1d8e nix based image 2019-06-24 08:19:40 +10:00
Amber H. Brown ca3eca49d8 track this a bit better 2019-06-24 08:19:30 +10:00
Amber H. Brown 1241094993 try this? 2019-06-24 06:38:26 +10:00
Amber H. Brown 40142497b7 try this? 2019-06-24 06:22:19 +10:00
Amber H. Brown 9abaa52ac1 add some limiting 2019-06-24 04:24:32 +10:00
13 changed files with 200 additions and 9 deletions

1
changelog.d/5556.misc Normal file
View file

@@ -0,0 +1 @@
Treat more outgoing federation connection failures (like refused connection, dead domains, and no route to host) as fatal and not able to be retried immediately.

52
docker/Dockerfile-arm64v8 Normal file
View file

@@ -0,0 +1,52 @@
FROM alpine
# Enable HTTPS support in wget.
RUN apk add --no-cache --update openssl su-exec
# Download Nix and install it into the system.
RUN wget https://nixos.org/releases/nix/nix-2.2.1/nix-2.2.1-$(uname -m)-linux.tar.bz2 \
&& tar xjf nix-*-$(uname -m)-linux.tar.bz2 \
&& addgroup -g 30000 -S nixbld \
&& for i in $(seq 1 30); do adduser -S -D -h /var/empty -g "Nix build user $i" -u $((30000 + i)) -G nixbld nixbld$i ; done \
&& mkdir -m 0755 /etc/nix \
&& echo 'sandbox = false' > /etc/nix/nix.conf \
&& mkdir -m 0755 /nix && USER=root sh nix-*-$(uname -m)-linux/install \
&& ln -s /nix/var/nix/profiles/default/etc/profile.d/nix.sh /etc/profile.d/ \
&& rm -r /nix-*-$(uname -m)-linux* \
&& rm -rf /var/cache/apk/* \
&& /nix/var/nix/profiles/default/bin/nix-collect-garbage --delete-old \
&& /nix/var/nix/profiles/default/bin/nix-store --optimise \
&& /nix/var/nix/profiles/default/bin/nix-store --verify --check-contents
ENV \
ENV=/etc/profile \
USER=root \
PATH=/nix/var/nix/profiles/default/bin:/nix/var/nix/profiles/default/sbin:/bin:/sbin:/usr/bin:/usr/sbin \
GIT_SSL_CAINFO=/nix/var/nix/profiles/default/etc/ssl/certs/ca-bundle.crt \
NIX_SSL_CERT_FILE=/nix/var/nix/profiles/default/etc/ssl/certs/ca-bundle.crt \
NIX_PATH=/nix/var/nix/profiles/per-user/root/channels \
PYTHONPATH=/nix/var/nix/profiles/default/lib/python3.7/site-packages/
RUN nix-channel --add https://nixos.org/channels/nixpkgs-unstable && nix-channel --update
COPY ./docker/synapse-deps.nix /synapse-deps.nix
RUN nix-env -if /synapse-deps.nix
# now install synapse and all of the python deps to /install.
COPY synapse /synapse/synapse/
COPY scripts /synapse/scripts/
COPY MANIFEST.in README.rst setup.py synctl /synapse/
RUN pip install --no-warn-script-location "/synapse"[all]
COPY ./docker/start.py /start.py
COPY ./docker/conf /conf
VOLUME ["/data"]
EXPOSE 8008/tcp 8009/tcp 8448/tcp
CMD ["/nix/var/nix/profiles/default/bin/python3", "/start.py"]

3
docker/synapse-deps.nix Normal file
View file

@@ -0,0 +1,3 @@
with import <nixpkgs> {};
python37.withPackages (ps: with ps; [ phonenumbers canonicaljson pymacaroons psycopg2 psutil parameterized pillow lxml pysaml2 bleach service-identity six frozendict pip idna pyasn1-modules netaddr jsonschema pynacl signedjson pyyaml pyopenssl mock bcrypt jinja2 msgpack attrs twisted pyasn1 treq sortedcontainers daemonize unpaddedbase64 ])

View file

@@ -18,6 +18,7 @@
import logging
import os.path
import attr
from netaddr import IPSet
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -324,6 +325,23 @@ class ServerConfig(Config):
"cleanup_extremities_with_dummy_events", False
)
@attr.s
class FederationBackoffSettings(object):
dns_resolution = attr.ib(default=False, type=bool)
dns_servfail = attr.ib(default=False, type=bool)
no_route_to_host = attr.ib(default=False, type=bool)
refused_connection = attr.ib(default=False, type=bool)
cannot_assign_address = attr.ib(default=False, type=bool)
timeout_amount = attr.ib(default=60, type=int)
on_timeout = attr.ib(default=False, type=bool)
invalid_tls = attr.ib(default=True, type=bool)
federation_backoff_settings = config.get("federation_backoff", {})
self.federation_backoff_settings = FederationBackoffSettings(
**federation_backoff_settings
)
def has_tls_listener(self):
return any(l["tls"] for l in self.listeners)
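 
The new FederationBackoffSettings block above is populated straight from an optional "federation_backoff" section of the homeserver config, with every field defaulting to the values shown. A minimal illustration (not part of the diff) of how a partial config interacts with those defaults, assuming the config keys mirror the attr.ib names:

# Hypothetical config fragment; only some keys are set, the rest keep
# the defaults declared on FederationBackoffSettings above.
config = {
    "federation_backoff": {
        "refused_connection": True,   # ECONNREFUSED becomes non-retryable
        "no_route_to_host": True,     # EHOSTUNREACH becomes non-retryable
        "timeout_amount": 30,         # per-request timeout in seconds
    }
}

settings = FederationBackoffSettings(**config.get("federation_backoff", {}))
assert settings.refused_connection is True
assert settings.dns_resolution is False   # default
assert settings.invalid_tls is True       # default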

View file

@@ -30,6 +30,7 @@ from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable
from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -65,6 +66,7 @@ class PerDestinationQueue(object):
self._server_name = hs.hostname
self._clock = hs.get_clock()
self._store = hs.get_datastore()
self._reactor = hs.get_reactor()
self._transaction_manager = transaction_manager
self._destination = destination
@@ -178,6 +180,7 @@ class PerDestinationQueue(object):
@defer.inlineCallbacks
def _transaction_transmission_loop(self):
acquired_lock = None
pending_pdus = []
try:
self.transmission_loop_running = True
@@ -202,6 +205,9 @@
yield self._get_to_device_message_edus(limit)
)
# Lock before we start doing work
acquired_lock = yield make_deferred_yieldable(self._transaction_manager.limiter.acquire())
pending_edus = device_update_edus + to_device_edus
# BEGIN CRITICAL SECTION
@@ -270,6 +276,7 @@
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
@@ -295,6 +302,10 @@
self._last_device_list_stream_id = dev_list_id
else:
break
# Release the lock after all the work is done
self._reactor.callLater(0.0, acquired_lock.release)
acquired_lock = None
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
@@ -331,6 +342,8 @@
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
if acquired_lock:
self._reactor.callLater(0.0, acquired_lock.release)
def _get_rr_edus(self, force_flush):
if not self._pending_rrs:
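 
The limiter usage above follows a careful acquire/release shape: take a slot from the shared DeferredSemaphore just before the critical section, release it on the next reactor tick once the work is done, clear the local reference, and let the finally block release it only if the loop bailed out early. Roughly, in isolation (illustrative sketch with simplified names, not the diff's code):

from twisted.internet import defer, reactor

limiter = defer.DeferredSemaphore(50)

@defer.inlineCallbacks
def transmission_loop(send_transaction):
    acquired_lock = None
    try:
        # Wait here until one of the 50 concurrency slots is free;
        # acquire() fires with the semaphore itself.
        acquired_lock = yield limiter.acquire()
        yield send_transaction()
        # Release on the next reactor tick and drop the reference so the
        # finally clause below cannot release the same slot twice.
        reactor.callLater(0.0, acquired_lock.release)
        acquired_lock = None
    finally:
        if acquired_lock:
            # We errored out while still holding a slot -- give it back.
            reactor.callLater(0.0, acquired_lock.release)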

View file

@@ -23,6 +23,8 @@ from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
from synapse.metrics import LaterGauge
class TransactionManager(object):
"""Helper class which handles building and sending transactions
@@ -40,6 +42,21 @@ class TransactionManager(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
self.limiter = defer.DeferredSemaphore(50)
LaterGauge(
"synapse_federation_transaction_client_concurrency",
"",
[],
lambda: self.limiter.limit - self.limiter.tokens,
)
LaterGauge(
"synapse_federation_transaction_client_concurrency_queue",
"",
[],
lambda: len(self.limiter.waiting),
)
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
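 
The two LaterGauges read the limiter lazily rather than counting by hand: DeferredSemaphore exposes limit, tokens (free slots) and waiting (queued acquisitions), so in-flight transactions are limit - tokens and the backlog is len(waiting). A standalone sketch of what those gauge callables report:

from twisted.internet import defer

limiter = defer.DeferredSemaphore(50)

def in_flight():
    # Slots currently held by running transactions.
    return limiter.limit - limiter.tokens

def queued():
    # Acquisitions still waiting for a free slot.
    return len(limiter.waiting)

limiter.acquire()                 # a free slot exists, so this fires at once
print(in_flight(), queued())      # -> 1 0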

View file

@@ -33,6 +33,7 @@ class TransportLayerClient(object):
def __init__(self, hs):
self.server_name = hs.hostname
self.client = hs.get_http_client()
self.backoff_settings = hs.config.federation_backoff_settings
@log_function
def get_room_state(self, destination, room_id, event_id):
@@ -181,6 +182,7 @@
long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
try_trailing_slash_on_400=True,
retry_on_dns_fail=not self.backoff_settings.dns_resolution,
)
defer.returnValue(response)

View file

@@ -57,6 +57,10 @@ class QuieterFileBodyProducer(FileBodyProducer):
"""
def stopProducing(self):
if not hasattr(self, "_task"):
# We haven't started producing yet
return
try:
FileBodyProducer.stopProducing(self)
except task.TaskStopped:

View file

@@ -19,7 +19,10 @@ import random
import sys
from io import BytesIO
from OpenSSL import SSL
from six import PY3, raise_from, string_types
from service_identity.exceptions import VerificationError
from six.moves import urllib
import attr
@@ -29,11 +32,23 @@ from prometheus_client import Counter
from signedjson.sign import sign_json
from zope.interface import implementer
from OpenSSL import SSL
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.internet.error import (
ConnectError,
ConnectionRefusedError,
DNSLookupError,
TimeoutError,
ConnectingCancelledError,
)
from twisted.internet.interfaces import IReactorPluggableNameResolver
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
from twisted.names.error import DNSServerError
from twisted.web._newclient import (
RequestTransmissionFailed,
ResponseDone,
ResponseNeverReceived,
)
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -173,6 +188,7 @@ class MatrixFederationHttpClient(object):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.backoff_settings = hs.config.federation_backoff_settings
real_reactor = hs.get_reactor()
@@ -205,7 +221,7 @@
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout = 60
self.default_timeout = self.backoff_settings.timeout_amount
def schedule(x):
self.reactor.callLater(_EPSILON, x)
@@ -407,10 +423,65 @@
response = yield request_deferred
except DNSLookupError as e:
raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
except Exception as e:
except DNSServerError as e:
# Their domain's nameserver is busted and can't give us a result
raise_from(
RequestSendFailed(
e, can_retry=not self.backoff_settings.dns_servfail
),
e,
)
except (RequestTransmissionFailed, ResponseNeverReceived) as e:
for i in e.reasons:
# If it's an OpenSSL error, they probably don't have
# a valid certificate or something else very bad went on.
if i.check(SSL.Error) or i.check(VerificationError):
if self.backoff_settings.invalid_tls:
raise_from(RequestSendFailed(e, can_retry=False), e)
elif i.check(TimeoutError, defer.CancelledError):
# If we backoff hard on timeout, raise it here.
if self.backoff_settings.on_timeout:
raise_from(RequestSendFailed(e, can_retry=False), e)
# If it's not that, raise it normally.
logger.info("Failed to send request: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e)
except (TimeoutError, ConnectingCancelledError) as e:
# Handle timeouts
if self.backoff_settings.on_timeout:
raise_from(RequestSendFailed(e, can_retry=False), e)
# If it's not that, raise it normally.
logger.info("Failed to send request: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e)
except (ConnectError, ConnectionRefusedError) as e:
if e.osError == 113 and self.backoff_settings.no_route_to_host:
# No route to host -- they're gone
raise_from(RequestSendFailed(e, can_retry=False), e)
elif (
e.osError == 111
and self.backoff_settings.refused_connection
):
# Refused connection -- they're gone
raise_from(RequestSendFailed(e, can_retry=False), e)
elif (
e.osError == 99
and self.backoff_settings.cannot_assign_address
):
# Cannot assign address -- don't try?
raise_from(RequestSendFailed(e, can_retry=False), e)
# Some other socket error, try retrying
logger.info("Failed to send request: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e)
except Exception as e:
logger.info("Failed to send request for unhandled reason: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e)
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
@@ -456,9 +527,10 @@ class MatrixFederationHttpClient(object):
break
except RequestSendFailed as e:
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
"{%s} [%s] Request failed, %s: %s %s: %s",
request.txn_id,
request.destination,
"retrying" if e.can_retry else "not retrying",
request.method,
url_str,
_flatten_response_never_received(e.inner_exception),
@@ -557,6 +629,7 @@
ignore_backoff=False,
backoff_on_404=False,
try_trailing_slash_on_400=False,
retry_on_dns_fail=True,
):
""" Sends the specifed json data using PUT
@@ -618,6 +691,7 @@
ignore_backoff=ignore_backoff,
long_retries=long_retries,
timeout=timeout,
retry_on_dns_fail=retry_on_dns_fail,
)
body = yield _handle_json_response(
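 
The ConnectError branch earlier in this file compares raw os error numbers (113, 111, 99). As a readability aside (not what the diff itself does), the same mapping can be expressed with the errno module; the names below correspond to the Linux values the code checks, and the is_fatal helper is hypothetical:

import errno

# errno names for the integers tested above (Linux values):
#   113 EHOSTUNREACH, 111 ECONNREFUSED, 99 EADDRNOTAVAIL
FATAL_OS_ERRORS = {
    errno.EHOSTUNREACH: "no_route_to_host",
    errno.ECONNREFUSED: "refused_connection",
    errno.EADDRNOTAVAIL: "cannot_assign_address",
}

def is_fatal(os_error, backoff_settings):
    """Return True when this socket error should not be retried,
    according to the matching federation_backoff flag."""
    flag = FATAL_OS_ERRORS.get(os_error)
    return flag is not None and getattr(backoff_settings, flag, False)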

View file

@@ -22,6 +22,7 @@ them.
See doc/log_contexts.rst for details on how this works.
"""
import types
import logging
import threading
@@ -544,6 +545,9 @@ def run_in_background(f, *args, **kwargs):
# by synchronous exceptions, so let's turn them into Failures.
return defer.fail()
if isinstance(res, types.CoroutineType):
res = defer.ensureDeferred(res)
if not isinstance(res, defer.Deferred):
return res
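 
The logcontext change lets run_in_background accept a native coroutine by converting it to a Deferred with defer.ensureDeferred before the existing Deferred checks run. A small usage sketch of that conversion on its own (the fetch_something coroutine is made up for illustration):

import types

from twisted.internet import defer

async def fetch_something():
    # A native coroutine: calling it returns a coroutine object, not a Deferred.
    return 42

res = fetch_something()
if isinstance(res, types.CoroutineType):   # the same check the patch adds
    res = defer.ensureDeferred(res)        # now a Deferred we can chain on
res.addCallback(print)                     # prints 42 once it resolves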

View file

@@ -190,6 +190,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
json_data_callback=ANY,
long_retries=True,
backoff_on_404=True,
retry_on_dns_fail=False,
try_trailing_slash_on_400=True,
)
@@ -263,6 +264,7 @@
),
json_data_callback=ANY,
long_retries=True,
retry_on_dns_fail=False,
backoff_on_404=True,
try_trailing_slash_on_400=True,
)

View file

@@ -25,7 +25,7 @@ from twisted.test.proto_helpers import AccumulatingProtocol
from twisted.web._newclient import ResponseDone
from tests import unittest
from tests.server import FakeTransport
from tests.server import FakeTransport, wait_until_result
@attr.s
@@ -143,7 +143,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
+ self.end_content
)
self.pump()
wait_until_result(self.reactor, request)
self.assertEqual(channel.code, 200)
self.assertEqual(
channel.json_body, {"og:title": "~matrix~", "og:description": "hi"}
@@ -154,9 +154,9 @@
"GET", "url_preview?url=http://matrix.org", shorthand=False
)
request.render(self.preview_url)
self.pump()
# Check the cache response has the same content
wait_until_result(self.reactor, request)
self.assertEqual(channel.code, 200)
self.assertEqual(
channel.json_body, {"og:title": "~matrix~", "og:description": "hi"}
@@ -172,9 +172,9 @@
"GET", "url_preview?url=http://matrix.org", shorthand=False
)
request.render(self.preview_url)
self.pump()
# Check the cache response has the same content
wait_until_result(self.reactor, request)
self.assertEqual(channel.code, 200)
self.assertEqual(
channel.json_body, {"og:title": "~matrix~", "og:description": "hi"}

View file

@@ -205,6 +205,7 @@ def wait_until_result(clock, request, timeout=100):
# If there's a producer, tell it to resume producing so we get content
if request._channel._producer:
print(request._channel._producer)
request._channel._producer.resumeProducing()
x += 1