Compare commits
42 commits
develop
...
hawkowl/fe
Author | SHA1 | Date | |
---|---|---|---|
|
ecd4f2341c | ||
|
6dd1ad36e9 | ||
|
02e8e895aa | ||
|
0b95b49401 | ||
|
d7ac3a57ae | ||
|
f1467e5972 | ||
|
45c0117abf | ||
|
8a58b9272b | ||
|
30ed28417e | ||
|
11c8e23419 | ||
|
a82457c5a3 | ||
|
d08ce1b4bf | ||
|
e6ad608a95 | ||
|
3960220cf2 | ||
|
84cbc5019c | ||
|
535e8ea79f | ||
|
69490ad447 | ||
|
ad0d7d4c9f | ||
|
bdad7ffaff | ||
|
411e4d8f54 | ||
|
f591732f9b | ||
|
10c6a180b2 | ||
|
7d35b7603f | ||
|
04b5175fcf | ||
|
7bb35a3288 | ||
|
929b2e4e1e | ||
|
4da90de3b2 | ||
|
32eb3fca89 | ||
|
0bcdce2237 | ||
|
c59f43d325 | ||
|
9ab19d1ed7 | ||
|
79b380c37a | ||
|
3aa6008925 | ||
|
b67c8ede83 | ||
|
baef252b1e | ||
|
fd53a11451 | ||
|
436decd055 | ||
|
5e7abc1d8e | ||
|
ca3eca49d8 | ||
|
1241094993 | ||
|
40142497b7 | ||
|
9abaa52ac1 |
1
changelog.d/5556.misc
Normal file
1
changelog.d/5556.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Treat more outgoing federation connection failures (like refused connection, dead domains, and no route to host) as fatal and not able to be retried immediately.
|
52
docker/Dockerfile-arm64v8
Normal file
52
docker/Dockerfile-arm64v8
Normal file
|
@ -0,0 +1,52 @@
|
|||
|
||||
FROM alpine
|
||||
|
||||
# Enable HTTPS support in wget.
|
||||
RUN apk add --no-cache --update openssl su-exec
|
||||
|
||||
# Download Nix and install it into the system.
|
||||
RUN wget https://nixos.org/releases/nix/nix-2.2.1/nix-2.2.1-$(uname -m)-linux.tar.bz2 \
|
||||
&& tar xjf nix-*-$(uname -m)-linux.tar.bz2 \
|
||||
&& addgroup -g 30000 -S nixbld \
|
||||
&& for i in $(seq 1 30); do adduser -S -D -h /var/empty -g "Nix build user $i" -u $((30000 + i)) -G nixbld nixbld$i ; done \
|
||||
&& mkdir -m 0755 /etc/nix \
|
||||
&& echo 'sandbox = false' > /etc/nix/nix.conf \
|
||||
&& mkdir -m 0755 /nix && USER=root sh nix-*-$(uname -m)-linux/install \
|
||||
&& ln -s /nix/var/nix/profiles/default/etc/profile.d/nix.sh /etc/profile.d/ \
|
||||
&& rm -r /nix-*-$(uname -m)-linux* \
|
||||
&& rm -rf /var/cache/apk/* \
|
||||
&& /nix/var/nix/profiles/default/bin/nix-collect-garbage --delete-old \
|
||||
&& /nix/var/nix/profiles/default/bin/nix-store --optimise \
|
||||
&& /nix/var/nix/profiles/default/bin/nix-store --verify --check-contents
|
||||
|
||||
ENV \
|
||||
ENV=/etc/profile \
|
||||
USER=root \
|
||||
PATH=/nix/var/nix/profiles/default/bin:/nix/var/nix/profiles/default/sbin:/bin:/sbin:/usr/bin:/usr/sbin \
|
||||
GIT_SSL_CAINFO=/nix/var/nix/profiles/default/etc/ssl/certs/ca-bundle.crt \
|
||||
NIX_SSL_CERT_FILE=/nix/var/nix/profiles/default/etc/ssl/certs/ca-bundle.crt \
|
||||
NIX_PATH=/nix/var/nix/profiles/per-user/root/channels \
|
||||
PYTHONPATH=/nix/var/nix/profiles/default/lib/python3.7/site-packages/
|
||||
|
||||
RUN nix-channel --add https://nixos.org/channels/nixpkgs-unstable && nix-channel --update
|
||||
|
||||
COPY ./docker/synapse-deps.nix /synapse-deps.nix
|
||||
|
||||
RUN nix-env -if /synapse-deps.nix
|
||||
|
||||
# now install synapse and all of the python deps to /install.
|
||||
|
||||
COPY synapse /synapse/synapse/
|
||||
COPY scripts /synapse/scripts/
|
||||
COPY MANIFEST.in README.rst setup.py synctl /synapse/
|
||||
|
||||
RUN pip install --no-warn-script-location "/synapse"[all]
|
||||
|
||||
COPY ./docker/start.py /start.py
|
||||
COPY ./docker/conf /conf
|
||||
|
||||
VOLUME ["/data"]
|
||||
|
||||
EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
||||
|
||||
CMD ["/nix/var/nix/profiles/default/bin/python3", "/start.py"]
|
3
docker/synapse-deps.nix
Normal file
3
docker/synapse-deps.nix
Normal file
|
@ -0,0 +1,3 @@
|
|||
with import <nixpkgs> {};
|
||||
|
||||
python37.withPackages (ps: with ps; [ phonenumbers canonicaljson pymacaroons psycopg2 psutil parameterized pillow lxml pysaml2 bleach service-identity six frozendict pip idna pyasn1-modules netaddr jsonschema pynacl signedjson pyyaml pyopenssl mock bcrypt jinja2 msgpack attrs twisted pyasn1 treq sortedcontainers daemonize unpaddedbase64 ])
|
|
@ -18,6 +18,7 @@
|
|||
import logging
|
||||
import os.path
|
||||
|
||||
import attr
|
||||
from netaddr import IPSet
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
|
@ -324,6 +325,23 @@ class ServerConfig(Config):
|
|||
"cleanup_extremities_with_dummy_events", False
|
||||
)
|
||||
|
||||
@attr.s
|
||||
class FederationBackoffSettings(object):
|
||||
dns_resolution = attr.ib(default=False, type=bool)
|
||||
dns_servfail = attr.ib(default=False, type=bool)
|
||||
no_route_to_host = attr.ib(default=False, type=bool)
|
||||
refused_connection = attr.ib(default=False, type=bool)
|
||||
cannot_assign_address = attr.ib(default=False, type=bool)
|
||||
timeout_amount = attr.ib(default=60, type=int)
|
||||
on_timeout = attr.ib(default=False, type=bool)
|
||||
invalid_tls = attr.ib(default=True, type=bool)
|
||||
|
||||
federation_backoff_settings = config.get("federation_backoff", {})
|
||||
|
||||
self.federation_backoff_settings = FederationBackoffSettings(
|
||||
**federation_backoff_settings
|
||||
)
|
||||
|
||||
def has_tls_listener(self):
|
||||
return any(l["tls"] for l in self.listeners)
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ from synapse.federation.units import Edu
|
|||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.metrics import sent_transactions_counter
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.storage import UserPresenceState
|
||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
|
||||
|
@ -65,6 +66,7 @@ class PerDestinationQueue(object):
|
|||
self._server_name = hs.hostname
|
||||
self._clock = hs.get_clock()
|
||||
self._store = hs.get_datastore()
|
||||
self._reactor = hs.get_reactor()
|
||||
self._transaction_manager = transaction_manager
|
||||
|
||||
self._destination = destination
|
||||
|
@ -178,6 +180,7 @@ class PerDestinationQueue(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _transaction_transmission_loop(self):
|
||||
acquired_lock = None
|
||||
pending_pdus = []
|
||||
try:
|
||||
self.transmission_loop_running = True
|
||||
|
@ -202,6 +205,9 @@ class PerDestinationQueue(object):
|
|||
yield self._get_to_device_message_edus(limit)
|
||||
)
|
||||
|
||||
# Lock before we start doing work
|
||||
acquired_lock = yield make_deferred_yieldable(self._transaction_manager.limiter.acquire())
|
||||
|
||||
pending_edus = device_update_edus + to_device_edus
|
||||
|
||||
# BEGIN CRITICAL SECTION
|
||||
|
@ -270,6 +276,7 @@ class PerDestinationQueue(object):
|
|||
success = yield self._transaction_manager.send_new_transaction(
|
||||
self._destination, pending_pdus, pending_edus
|
||||
)
|
||||
|
||||
if success:
|
||||
sent_transactions_counter.inc()
|
||||
sent_edus_counter.inc(len(pending_edus))
|
||||
|
@ -295,6 +302,10 @@ class PerDestinationQueue(object):
|
|||
self._last_device_list_stream_id = dev_list_id
|
||||
else:
|
||||
break
|
||||
|
||||
# Release the lock after all the work is done
|
||||
self._reactor.callLater(0.0, acquired_lock.release)
|
||||
acquired_lock = None
|
||||
except NotRetryingDestination as e:
|
||||
logger.debug(
|
||||
"TX [%s] not ready for retry yet (next retry at %s) - "
|
||||
|
@ -331,6 +342,8 @@ class PerDestinationQueue(object):
|
|||
finally:
|
||||
# We want to be *very* sure we clear this after we stop processing
|
||||
self.transmission_loop_running = False
|
||||
if acquired_lock:
|
||||
self._reactor.callLater(0.0, acquired_lock.release)
|
||||
|
||||
def _get_rr_edus(self, force_flush):
|
||||
if not self._pending_rrs:
|
||||
|
|
|
@ -23,6 +23,8 @@ from synapse.util.metrics import measure_func
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from synapse.metrics import LaterGauge
|
||||
|
||||
|
||||
class TransactionManager(object):
|
||||
"""Helper class which handles building and sending transactions
|
||||
|
@ -40,6 +42,21 @@ class TransactionManager(object):
|
|||
# HACK to get unique tx id
|
||||
self._next_txn_id = int(self.clock.time_msec())
|
||||
|
||||
self.limiter = defer.DeferredSemaphore(50)
|
||||
|
||||
LaterGauge(
|
||||
"synapse_federation_transaction_client_concurrency",
|
||||
"",
|
||||
[],
|
||||
lambda: self.limiter.limit - self.limiter.tokens,
|
||||
)
|
||||
LaterGauge(
|
||||
"synapse_federation_transaction_client_concurrency_queue",
|
||||
"",
|
||||
[],
|
||||
lambda: len(self.limiter.waiting),
|
||||
)
|
||||
|
||||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
|
|
@ -33,6 +33,7 @@ class TransportLayerClient(object):
|
|||
def __init__(self, hs):
|
||||
self.server_name = hs.hostname
|
||||
self.client = hs.get_http_client()
|
||||
self.backoff_settings = hs.config.federation_backoff_settings
|
||||
|
||||
@log_function
|
||||
def get_room_state(self, destination, room_id, event_id):
|
||||
|
@ -181,6 +182,7 @@ class TransportLayerClient(object):
|
|||
long_retries=True,
|
||||
backoff_on_404=True, # If we get a 404 the other side has gone
|
||||
try_trailing_slash_on_400=True,
|
||||
retry_on_dns_fail=not self.backoff_settings.dns_resolution,
|
||||
)
|
||||
|
||||
defer.returnValue(response)
|
||||
|
|
|
@ -57,6 +57,10 @@ class QuieterFileBodyProducer(FileBodyProducer):
|
|||
"""
|
||||
|
||||
def stopProducing(self):
|
||||
if not hasattr(self, "_task"):
|
||||
# We haven't started producing yet
|
||||
return
|
||||
|
||||
try:
|
||||
FileBodyProducer.stopProducing(self)
|
||||
except task.TaskStopped:
|
||||
|
|
|
@ -19,7 +19,10 @@ import random
|
|||
import sys
|
||||
from io import BytesIO
|
||||
|
||||
from OpenSSL import SSL
|
||||
|
||||
from six import PY3, raise_from, string_types
|
||||
from service_identity.exceptions import VerificationError
|
||||
from six.moves import urllib
|
||||
|
||||
import attr
|
||||
|
@ -29,11 +32,23 @@ from prometheus_client import Counter
|
|||
from signedjson.sign import sign_json
|
||||
from zope.interface import implementer
|
||||
|
||||
from OpenSSL import SSL
|
||||
from twisted.internet import defer, protocol
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.internet.error import (
|
||||
ConnectError,
|
||||
ConnectionRefusedError,
|
||||
DNSLookupError,
|
||||
TimeoutError,
|
||||
ConnectingCancelledError,
|
||||
)
|
||||
from twisted.internet.interfaces import IReactorPluggableNameResolver
|
||||
from twisted.internet.task import _EPSILON, Cooperator
|
||||
from twisted.web._newclient import ResponseDone
|
||||
from twisted.names.error import DNSServerError
|
||||
from twisted.web._newclient import (
|
||||
RequestTransmissionFailed,
|
||||
ResponseDone,
|
||||
ResponseNeverReceived,
|
||||
)
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
import synapse.metrics
|
||||
|
@ -173,6 +188,7 @@ class MatrixFederationHttpClient(object):
|
|||
self.hs = hs
|
||||
self.signing_key = hs.config.signing_key[0]
|
||||
self.server_name = hs.hostname
|
||||
self.backoff_settings = hs.config.federation_backoff_settings
|
||||
|
||||
real_reactor = hs.get_reactor()
|
||||
|
||||
|
@ -205,7 +221,7 @@ class MatrixFederationHttpClient(object):
|
|||
self.clock = hs.get_clock()
|
||||
self._store = hs.get_datastore()
|
||||
self.version_string_bytes = hs.version_string.encode("ascii")
|
||||
self.default_timeout = 60
|
||||
self.default_timeout = self.backoff_settings.timeout_amount
|
||||
|
||||
def schedule(x):
|
||||
self.reactor.callLater(_EPSILON, x)
|
||||
|
@ -407,10 +423,65 @@ class MatrixFederationHttpClient(object):
|
|||
response = yield request_deferred
|
||||
except DNSLookupError as e:
|
||||
raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
|
||||
except Exception as e:
|
||||
except DNSServerError as e:
|
||||
# Their domain's nameserver is busted and can't give us a result
|
||||
raise_from(
|
||||
RequestSendFailed(
|
||||
e, can_retry=not self.backoff_settings.dns_servfail
|
||||
),
|
||||
e,
|
||||
)
|
||||
except (RequestTransmissionFailed, ResponseNeverReceived) as e:
|
||||
for i in e.reasons:
|
||||
# If it's an OpenSSL error, they probably don't have
|
||||
# a valid certificate or something else very bad went on.
|
||||
if i.check(SSL.Error) or i.check(VerificationError):
|
||||
if self.backoff_settings.invalid_tls:
|
||||
raise_from(RequestSendFailed(e, can_retry=False), e)
|
||||
|
||||
elif i.check(TimeoutError, defer.CancelledError):
|
||||
# If we backoff hard on timeout, raise it here.
|
||||
if self.backoff_settings.on_timeout:
|
||||
raise_from(RequestSendFailed(e, can_retry=False), e)
|
||||
|
||||
# If it's not that, raise it normally.
|
||||
logger.info("Failed to send request: %s", e)
|
||||
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||
|
||||
except (TimeoutError, ConnectingCancelledError) as e:
|
||||
# Handle timeouts
|
||||
if self.backoff_settings.on_timeout:
|
||||
raise_from(RequestSendFailed(e, can_retry=False), e)
|
||||
|
||||
# If it's not that, raise it normally.
|
||||
logger.info("Failed to send request: %s", e)
|
||||
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||
|
||||
except (ConnectError, ConnectionRefusedError) as e:
|
||||
if e.osError == 113 and self.backoff_settings.no_route_to_host:
|
||||
# No route to host -- they're gone
|
||||
raise_from(RequestSendFailed(e, can_retry=False), e)
|
||||
elif (
|
||||
e.osError == 111
|
||||
and self.backoff_settings.refused_connection
|
||||
):
|
||||
# Refused connection -- they're gone
|
||||
raise_from(RequestSendFailed(e, can_retry=False), e)
|
||||
elif (
|
||||
e.osError == 99
|
||||
and self.backoff_settings.cannot_assign_address
|
||||
):
|
||||
# Cannot assign address -- don't try?
|
||||
raise_from(RequestSendFailed(e, can_retry=False), e)
|
||||
|
||||
# Some other socket error, try retrying
|
||||
logger.info("Failed to send request: %s", e)
|
||||
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||
|
||||
except Exception as e:
|
||||
logger.info("Failed to send request for unhandled reason: %s", e)
|
||||
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||
|
||||
logger.info(
|
||||
"{%s} [%s] Got response headers: %d %s",
|
||||
request.txn_id,
|
||||
|
@ -456,9 +527,10 @@ class MatrixFederationHttpClient(object):
|
|||
break
|
||||
except RequestSendFailed as e:
|
||||
logger.warn(
|
||||
"{%s} [%s] Request failed: %s %s: %s",
|
||||
"{%s} [%s] Request failed, %s: %s %s: %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
"retrying" if e.can_retry else "not retrying",
|
||||
request.method,
|
||||
url_str,
|
||||
_flatten_response_never_received(e.inner_exception),
|
||||
|
@ -557,6 +629,7 @@ class MatrixFederationHttpClient(object):
|
|||
ignore_backoff=False,
|
||||
backoff_on_404=False,
|
||||
try_trailing_slash_on_400=False,
|
||||
retry_on_dns_fail=True,
|
||||
):
|
||||
""" Sends the specifed json data using PUT
|
||||
|
||||
|
@ -618,6 +691,7 @@ class MatrixFederationHttpClient(object):
|
|||
ignore_backoff=ignore_backoff,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
)
|
||||
|
||||
body = yield _handle_json_response(
|
||||
|
|
|
@ -22,6 +22,7 @@ them.
|
|||
See doc/log_contexts.rst for details on how this works.
|
||||
"""
|
||||
|
||||
import types
|
||||
import logging
|
||||
import threading
|
||||
|
||||
|
@ -544,6 +545,9 @@ def run_in_background(f, *args, **kwargs):
|
|||
# by synchronous exceptions, so let's turn them into Failures.
|
||||
return defer.fail()
|
||||
|
||||
if isinstance(res, types.CoroutineType):
|
||||
res = defer.ensureDeferred(res)
|
||||
|
||||
if not isinstance(res, defer.Deferred):
|
||||
return res
|
||||
|
||||
|
|
|
@ -190,6 +190,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
|||
json_data_callback=ANY,
|
||||
long_retries=True,
|
||||
backoff_on_404=True,
|
||||
retry_on_dns_fail=False,
|
||||
try_trailing_slash_on_400=True,
|
||||
)
|
||||
|
||||
|
@ -263,6 +264,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
|||
),
|
||||
json_data_callback=ANY,
|
||||
long_retries=True,
|
||||
retry_on_dns_fail=False,
|
||||
backoff_on_404=True,
|
||||
try_trailing_slash_on_400=True,
|
||||
)
|
||||
|
|
|
@ -25,7 +25,7 @@ from twisted.test.proto_helpers import AccumulatingProtocol
|
|||
from twisted.web._newclient import ResponseDone
|
||||
|
||||
from tests import unittest
|
||||
from tests.server import FakeTransport
|
||||
from tests.server import FakeTransport, wait_until_result
|
||||
|
||||
|
||||
@attr.s
|
||||
|
@ -143,7 +143,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
|
|||
+ self.end_content
|
||||
)
|
||||
|
||||
self.pump()
|
||||
wait_until_result(self.reactor, request)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(
|
||||
channel.json_body, {"og:title": "~matrix~", "og:description": "hi"}
|
||||
|
@ -154,9 +154,9 @@ class URLPreviewTests(unittest.HomeserverTestCase):
|
|||
"GET", "url_preview?url=http://matrix.org", shorthand=False
|
||||
)
|
||||
request.render(self.preview_url)
|
||||
self.pump()
|
||||
|
||||
# Check the cache response has the same content
|
||||
wait_until_result(self.reactor, request)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(
|
||||
channel.json_body, {"og:title": "~matrix~", "og:description": "hi"}
|
||||
|
@ -172,9 +172,9 @@ class URLPreviewTests(unittest.HomeserverTestCase):
|
|||
"GET", "url_preview?url=http://matrix.org", shorthand=False
|
||||
)
|
||||
request.render(self.preview_url)
|
||||
self.pump()
|
||||
|
||||
# Check the cache response has the same content
|
||||
wait_until_result(self.reactor, request)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(
|
||||
channel.json_body, {"og:title": "~matrix~", "og:description": "hi"}
|
||||
|
|
|
@ -205,6 +205,7 @@ def wait_until_result(clock, request, timeout=100):
|
|||
|
||||
# If there's a producer, tell it to resume producing so we get content
|
||||
if request._channel._producer:
|
||||
print(request._channel._producer)
|
||||
request._channel._producer.resumeProducing()
|
||||
|
||||
x += 1
|
||||
|
|
Loading…
Reference in a new issue