diff --git a/changelog.d/9391.bugfix b/changelog.d/9391.bugfix new file mode 100644 index 0000000000..b5e68e2ac7 --- /dev/null +++ b/changelog.d/9391.bugfix @@ -0,0 +1 @@ +Fix bug where Synapse would occaisonally stop reconnecting after the connection was lost. diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index fdd087683b..89f8af0f36 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -15,8 +15,9 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING, Optional, Type, cast +from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast +import attr import txredisapi from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable @@ -42,6 +43,24 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +T = TypeVar("T") +V = TypeVar("V") + + +@attr.s +class ConstantProperty(Generic[T, V]): + """A descriptor that returns the given constant, ignoring attempts to set + it. + """ + + constant = attr.ib() # type: V + + def __get__(self, obj: Optional[T], objtype: Type[T] = None) -> V: + return self.constant + + def __set__(self, obj: Optional[T], value: V): + pass + class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): """Connection to redis subscribed to replication stream. @@ -195,6 +214,10 @@ class SynapseRedisFactory(txredisapi.RedisFactory): we detect dead connections. """ + # We want to *always* retry connecting, txredisapi will stop if there is a + # failure during certain operations, e.g. during AUTH. + continueTrying = cast(bool, ConstantProperty(True)) + def __init__( self, hs: "HomeServer", @@ -243,7 +266,6 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): """ maxDelay = 5 - continueTrying = True protocol = RedisSubscriber def __init__(