forked from MirrorHub/synapse
Move well known lookup into a separate clas
This commit is contained in:
parent
af9f1c0764
commit
107ad133fc
3 changed files with 216 additions and 173 deletions
|
@ -12,10 +12,8 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
import attr
|
||||
from netaddr import IPAddress
|
||||
|
@ -24,34 +22,16 @@ from zope.interface import implementer
|
|||
from twisted.internet import defer
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.internet.interfaces import IStreamClientEndpoint
|
||||
from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
|
||||
from twisted.web.http import stringToDatetime
|
||||
from twisted.web.client import URI, Agent, HTTPConnectionPool
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import IAgent
|
||||
|
||||
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
|
||||
from synapse.http.federation.well_known_resolver import WellKnownResolver
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import Clock
|
||||
from synapse.util.caches.ttlcache import TTLCache
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
# period to cache .well-known results for by default
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
|
||||
|
||||
# jitter to add to the .well-known default cache ttl
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
|
||||
|
||||
# period to cache failure to fetch .well-known for
|
||||
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
|
||||
|
||||
# cap for .well-known cache period
|
||||
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
|
||||
|
||||
# lower bound for .well-known cache period
|
||||
WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
well_known_cache = TTLCache("well-known")
|
||||
|
||||
|
||||
@implementer(IAgent)
|
||||
|
@ -81,7 +61,7 @@ class MatrixFederationAgent(object):
|
|||
reactor,
|
||||
tls_client_options_factory,
|
||||
_srv_resolver=None,
|
||||
_well_known_cache=well_known_cache,
|
||||
_well_known_cache=None,
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
@ -96,20 +76,15 @@ class MatrixFederationAgent(object):
|
|||
self._pool.maxPersistentPerHost = 5
|
||||
self._pool.cachedConnectionTimeout = 2 * 60
|
||||
|
||||
_well_known_agent = RedirectAgent(
|
||||
Agent(
|
||||
self._well_known_resolver = WellKnownResolver(
|
||||
self._reactor,
|
||||
agent=Agent(
|
||||
self._reactor,
|
||||
pool=self._pool,
|
||||
contextFactory=tls_client_options_factory,
|
||||
),
|
||||
well_known_cache=_well_known_cache,
|
||||
)
|
||||
)
|
||||
self._well_known_agent = _well_known_agent
|
||||
|
||||
# our cache of .well-known lookup results, mapping from server name
|
||||
# to delegated name. The values can be:
|
||||
# `bytes`: a valid server-name
|
||||
# `None`: there is no (valid) .well-known here
|
||||
self._well_known_cache = _well_known_cache
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def request(self, method, uri, headers=None, bodyProducer=None):
|
||||
|
@ -220,7 +195,10 @@ class MatrixFederationAgent(object):
|
|||
|
||||
if lookup_well_known:
|
||||
# try a .well-known lookup
|
||||
well_known_server = yield self._get_well_known(parsed_uri.host)
|
||||
well_known_result = yield self._well_known_resolver.get_well_known(
|
||||
parsed_uri.host
|
||||
)
|
||||
well_known_server = well_known_result.delegated_server
|
||||
|
||||
if well_known_server:
|
||||
# if we found a .well-known, start again, but don't do another
|
||||
|
@ -283,86 +261,6 @@ class MatrixFederationAgent(object):
|
|||
target_port=port,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_well_known(self, server_name):
|
||||
"""Attempt to fetch and parse a .well-known file for the given server
|
||||
|
||||
Args:
|
||||
server_name (bytes): name of the server, from the requested url
|
||||
|
||||
Returns:
|
||||
Deferred[bytes|None]: either the new server name, from the .well-known, or
|
||||
None if there was no .well-known file.
|
||||
"""
|
||||
try:
|
||||
result = self._well_known_cache[server_name]
|
||||
except KeyError:
|
||||
# TODO: should we linearise so that we don't end up doing two .well-known
|
||||
# requests for the same server in parallel?
|
||||
with Measure(self._clock, "get_well_known"):
|
||||
result, cache_period = yield self._do_get_well_known(server_name)
|
||||
|
||||
if cache_period > 0:
|
||||
self._well_known_cache.set(server_name, result, cache_period)
|
||||
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_get_well_known(self, server_name):
|
||||
"""Actually fetch and parse a .well-known, without checking the cache
|
||||
|
||||
Args:
|
||||
server_name (bytes): name of the server, from the requested url
|
||||
|
||||
Returns:
|
||||
Deferred[Tuple[bytes|None|object],int]:
|
||||
result, cache period, where result is one of:
|
||||
- the new server name from the .well-known (as a `bytes`)
|
||||
- None if there was no .well-known file.
|
||||
- INVALID_WELL_KNOWN if the .well-known was invalid
|
||||
"""
|
||||
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
|
||||
uri_str = uri.decode("ascii")
|
||||
logger.info("Fetching %s", uri_str)
|
||||
try:
|
||||
response = yield make_deferred_yieldable(
|
||||
self._well_known_agent.request(b"GET", uri)
|
||||
)
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
if response.code != 200:
|
||||
raise Exception("Non-200 response %s" % (response.code,))
|
||||
|
||||
parsed_body = json.loads(body.decode("utf-8"))
|
||||
logger.info("Response from .well-known: %s", parsed_body)
|
||||
if not isinstance(parsed_body, dict):
|
||||
raise Exception("not a dict")
|
||||
if "m.server" not in parsed_body:
|
||||
raise Exception("Missing key 'm.server'")
|
||||
except Exception as e:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
|
||||
# add some randomness to the TTL to avoid a stampeding herd every hour
|
||||
# after startup
|
||||
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
return (None, cache_period)
|
||||
|
||||
result = parsed_body["m.server"].encode("ascii")
|
||||
|
||||
cache_period = _cache_period_from_headers(
|
||||
response.headers, time_now=self._reactor.seconds
|
||||
)
|
||||
if cache_period is None:
|
||||
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
|
||||
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
|
||||
# after startup
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
else:
|
||||
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
|
||||
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
|
||||
|
||||
return (result, cache_period)
|
||||
|
||||
|
||||
@implementer(IStreamClientEndpoint)
|
||||
class LoggingHostnameEndpoint(object):
|
||||
|
@ -378,44 +276,6 @@ class LoggingHostnameEndpoint(object):
|
|||
return self.ep.connect(protocol_factory)
|
||||
|
||||
|
||||
def _cache_period_from_headers(headers, time_now=time.time):
|
||||
cache_controls = _parse_cache_control(headers)
|
||||
|
||||
if b"no-store" in cache_controls:
|
||||
return 0
|
||||
|
||||
if b"max-age" in cache_controls:
|
||||
try:
|
||||
max_age = int(cache_controls[b"max-age"])
|
||||
return max_age
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
expires = headers.getRawHeaders(b"expires")
|
||||
if expires is not None:
|
||||
try:
|
||||
expires_date = stringToDatetime(expires[-1])
|
||||
return expires_date - time_now()
|
||||
except ValueError:
|
||||
# RFC7234 says 'A cache recipient MUST interpret invalid date formats,
|
||||
# especially the value "0", as representing a time in the past (i.e.,
|
||||
# "already expired").
|
||||
return 0
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _parse_cache_control(headers):
|
||||
cache_controls = {}
|
||||
for hdr in headers.getRawHeaders(b"cache-control", []):
|
||||
for directive in hdr.split(b","):
|
||||
splits = [x.strip() for x in directive.split(b"=", 1)]
|
||||
k = splits[0].lower()
|
||||
v = splits[1] if len(splits) > 1 else None
|
||||
cache_controls[k] = v
|
||||
return cache_controls
|
||||
|
||||
|
||||
@attr.s
|
||||
class _RoutingResult(object):
|
||||
"""The result returned by `_route_matrix_uri`.
|
||||
|
|
184
synapse/http/federation/well_known_resolver.py
Normal file
184
synapse/http/federation/well_known_resolver.py
Normal file
|
@ -0,0 +1,184 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
import attr
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.web.client import RedirectAgent, readBody
|
||||
from twisted.web.http import stringToDatetime
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import Clock
|
||||
from synapse.util.caches.ttlcache import TTLCache
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
# period to cache .well-known results for by default
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
|
||||
|
||||
# jitter to add to the .well-known default cache ttl
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
|
||||
|
||||
# period to cache failure to fetch .well-known for
|
||||
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
|
||||
|
||||
# cap for .well-known cache period
|
||||
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
|
||||
|
||||
# lower bound for .well-known cache period
|
||||
WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class WellKnownLookupResult(object):
|
||||
delegated_server = attr.ib()
|
||||
|
||||
|
||||
class WellKnownResolver(object):
|
||||
"""Handles well-known lookups for matrix servers.
|
||||
"""
|
||||
|
||||
def __init__(self, reactor, agent, well_known_cache=None):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
||||
if well_known_cache is None:
|
||||
well_known_cache = TTLCache("well-known")
|
||||
|
||||
self._well_known_cache = well_known_cache
|
||||
self._well_known_agent = RedirectAgent(agent)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_well_known(self, server_name):
|
||||
"""Attempt to fetch and parse a .well-known file for the given server
|
||||
|
||||
Args:
|
||||
server_name (bytes): name of the server, from the requested url
|
||||
|
||||
Returns:
|
||||
Deferred[WellKnownLookupResult]: The result of the lookup
|
||||
"""
|
||||
try:
|
||||
result = self._well_known_cache[server_name]
|
||||
except KeyError:
|
||||
# TODO: should we linearise so that we don't end up doing two .well-known
|
||||
# requests for the same server in parallel?
|
||||
with Measure(self._clock, "get_well_known"):
|
||||
result, cache_period = yield self._do_get_well_known(server_name)
|
||||
|
||||
if cache_period > 0:
|
||||
self._well_known_cache.set(server_name, result, cache_period)
|
||||
|
||||
return WellKnownLookupResult(delegated_server=result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_get_well_known(self, server_name):
|
||||
"""Actually fetch and parse a .well-known, without checking the cache
|
||||
|
||||
Args:
|
||||
server_name (bytes): name of the server, from the requested url
|
||||
|
||||
Returns:
|
||||
Deferred[Tuple[bytes|None|object],int]:
|
||||
result, cache period, where result is one of:
|
||||
- the new server name from the .well-known (as a `bytes`)
|
||||
- None if there was no .well-known file.
|
||||
- INVALID_WELL_KNOWN if the .well-known was invalid
|
||||
"""
|
||||
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
|
||||
uri_str = uri.decode("ascii")
|
||||
logger.info("Fetching %s", uri_str)
|
||||
try:
|
||||
response = yield make_deferred_yieldable(
|
||||
self._well_known_agent.request(b"GET", uri)
|
||||
)
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
if response.code != 200:
|
||||
raise Exception("Non-200 response %s" % (response.code,))
|
||||
|
||||
parsed_body = json.loads(body.decode("utf-8"))
|
||||
logger.info("Response from .well-known: %s", parsed_body)
|
||||
if not isinstance(parsed_body, dict):
|
||||
raise Exception("not a dict")
|
||||
if "m.server" not in parsed_body:
|
||||
raise Exception("Missing key 'm.server'")
|
||||
except Exception as e:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
|
||||
# add some randomness to the TTL to avoid a stampeding herd every hour
|
||||
# after startup
|
||||
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
return (None, cache_period)
|
||||
|
||||
result = parsed_body["m.server"].encode("ascii")
|
||||
|
||||
cache_period = _cache_period_from_headers(
|
||||
response.headers, time_now=self._reactor.seconds
|
||||
)
|
||||
if cache_period is None:
|
||||
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
|
||||
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
|
||||
# after startup
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
else:
|
||||
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
|
||||
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
|
||||
|
||||
return (result, cache_period)
|
||||
|
||||
|
||||
def _cache_period_from_headers(headers, time_now=time.time):
|
||||
cache_controls = _parse_cache_control(headers)
|
||||
|
||||
if b"no-store" in cache_controls:
|
||||
return 0
|
||||
|
||||
if b"max-age" in cache_controls:
|
||||
try:
|
||||
max_age = int(cache_controls[b"max-age"])
|
||||
return max_age
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
expires = headers.getRawHeaders(b"expires")
|
||||
if expires is not None:
|
||||
try:
|
||||
expires_date = stringToDatetime(expires[-1])
|
||||
return expires_date - time_now()
|
||||
except ValueError:
|
||||
# RFC7234 says 'A cache recipient MUST interpret invalid date formats,
|
||||
# especially the value "0", as representing a time in the past (i.e.,
|
||||
# "already expired").
|
||||
return 0
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _parse_cache_control(headers):
|
||||
cache_controls = {}
|
||||
for hdr in headers.getRawHeaders(b"cache-control", []):
|
||||
for directive in hdr.split(b","):
|
||||
splits = [x.strip() for x in directive.split(b"=", 1)]
|
||||
k = splits[0].lower()
|
||||
v = splits[1] if len(splits) > 1 else None
|
||||
cache_controls[k] = v
|
||||
return cache_controls
|
|
@ -25,17 +25,19 @@ from twisted.internet._sslverify import ClientTLSOptions, OpenSSLCertificateOpti
|
|||
from twisted.internet.protocol import Factory
|
||||
from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||
from twisted.web._newclient import ResponseNeverReceived
|
||||
from twisted.web.client import Agent
|
||||
from twisted.web.http import HTTPChannel
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import IPolicyForHTTPS
|
||||
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.crypto.context_factory import ClientTLSOptionsFactory
|
||||
from synapse.http.federation.matrix_federation_agent import (
|
||||
MatrixFederationAgent,
|
||||
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
||||
from synapse.http.federation.srv_resolver import Server
|
||||
from synapse.http.federation.well_known_resolver import (
|
||||
WellKnownResolver,
|
||||
_cache_period_from_headers,
|
||||
)
|
||||
from synapse.http.federation.srv_resolver import Server
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.util.caches.ttlcache import TTLCache
|
||||
|
||||
|
@ -79,9 +81,10 @@ class MatrixFederationAgentTests(TestCase):
|
|||
self._config = config = HomeServerConfig()
|
||||
config.parse_config_dict(config_dict, "", "")
|
||||
|
||||
self.tls_factory = ClientTLSOptionsFactory(config)
|
||||
self.agent = MatrixFederationAgent(
|
||||
reactor=self.reactor,
|
||||
tls_client_options_factory=ClientTLSOptionsFactory(config),
|
||||
tls_client_options_factory=self.tls_factory,
|
||||
_srv_resolver=self.mock_resolver,
|
||||
_well_known_cache=self.well_known_cache,
|
||||
)
|
||||
|
@ -928,20 +931,16 @@ class MatrixFederationAgentTests(TestCase):
|
|||
self.reactor.pump((0.1,))
|
||||
self.successResultOf(test_d)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_get_well_known(self, serv):
|
||||
try:
|
||||
result = yield self.agent._get_well_known(serv)
|
||||
logger.info("Result from well-known fetch: %s", result)
|
||||
except Exception as e:
|
||||
logger.warning("Error fetching well-known: %s", e)
|
||||
raise
|
||||
return result
|
||||
|
||||
def test_well_known_cache(self):
|
||||
well_known_resolver = WellKnownResolver(
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||
well_known_cache=self.well_known_cache,
|
||||
)
|
||||
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
|
||||
fetch_d = self.do_get_well_known(b"testserv")
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
# there should be an attempt to connect on port 443 for the .well-known
|
||||
clients = self.reactor.tcpClients
|
||||
|
@ -958,21 +957,21 @@ class MatrixFederationAgentTests(TestCase):
|
|||
)
|
||||
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r, b"target-server")
|
||||
self.assertEqual(r.delegated_server, b"target-server")
|
||||
|
||||
# close the tcp connection
|
||||
well_known_server.loseConnection()
|
||||
|
||||
# repeat the request: it should hit the cache
|
||||
fetch_d = self.do_get_well_known(b"testserv")
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r, b"target-server")
|
||||
self.assertEqual(r.delegated_server, b"target-server")
|
||||
|
||||
# expire the cache
|
||||
self.reactor.pump((1000.0,))
|
||||
|
||||
# now it should connect again
|
||||
fetch_d = self.do_get_well_known(b"testserv")
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
|
@ -986,7 +985,7 @@ class MatrixFederationAgentTests(TestCase):
|
|||
)
|
||||
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r, b"other-server")
|
||||
self.assertEqual(r.delegated_server, b"other-server")
|
||||
|
||||
|
||||
class TestCachePeriodFromHeaders(TestCase):
|
||||
|
|
Loading…
Reference in a new issue