mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 21:53:50 +01:00
Limit the size of HTTP responses read over federation. (#9833)
This commit is contained in:
parent
c1ddbbde4f
commit
51a20914a8
4 changed files with 110 additions and 8 deletions
1
changelog.d/9833.bugfix
Normal file
1
changelog.d/9833.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Limit the size of HTTP responses read over federation.
|
|
@ -33,6 +33,7 @@ import treq
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
from netaddr import AddrFormatError, IPAddress, IPSet
|
from netaddr import AddrFormatError, IPAddress, IPSet
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
from typing_extensions import Protocol
|
||||||
from zope.interface import implementer, provider
|
from zope.interface import implementer, provider
|
||||||
|
|
||||||
from OpenSSL import SSL
|
from OpenSSL import SSL
|
||||||
|
@ -754,6 +755,16 @@ def _timeout_to_request_timed_out_error(f: Failure):
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
|
||||||
|
class ByteWriteable(Protocol):
|
||||||
|
"""The type of object which must be passed into read_body_with_max_size.
|
||||||
|
|
||||||
|
Typically this is a file object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def write(self, data: bytes) -> int:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class BodyExceededMaxSize(Exception):
|
class BodyExceededMaxSize(Exception):
|
||||||
"""The maximum allowed size of the HTTP body was exceeded."""
|
"""The maximum allowed size of the HTTP body was exceeded."""
|
||||||
|
|
||||||
|
@ -790,7 +801,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
|
||||||
transport = None # type: Optional[ITCPTransport]
|
transport = None # type: Optional[ITCPTransport]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
|
self, stream: ByteWriteable, deferred: defer.Deferred, max_size: Optional[int]
|
||||||
):
|
):
|
||||||
self.stream = stream
|
self.stream = stream
|
||||||
self.deferred = deferred
|
self.deferred = deferred
|
||||||
|
@ -830,7 +841,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
|
||||||
|
|
||||||
|
|
||||||
def read_body_with_max_size(
|
def read_body_with_max_size(
|
||||||
response: IResponse, stream: BinaryIO, max_size: Optional[int]
|
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
|
||||||
) -> defer.Deferred:
|
) -> defer.Deferred:
|
||||||
"""
|
"""
|
||||||
Read a HTTP response body to a file-object. Optionally enforcing a maximum file size.
|
Read a HTTP response body to a file-object. Optionally enforcing a maximum file size.
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
|
||||||
# Copyright 2018 New Vector Ltd
|
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -13,11 +12,13 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import cgi
|
import cgi
|
||||||
|
import codecs
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
import sys
|
import sys
|
||||||
|
import typing
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from io import BytesIO
|
from io import BytesIO, StringIO
|
||||||
from typing import Callable, Dict, List, Optional, Tuple, Union
|
from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -72,6 +73,9 @@ incoming_responses_counter = Counter(
|
||||||
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
|
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
|
||||||
|
# need a generous limit here.
|
||||||
|
MAX_RESPONSE_SIZE = 100 * 1024 * 1024
|
||||||
|
|
||||||
MAX_LONG_RETRIES = 10
|
MAX_LONG_RETRIES = 10
|
||||||
MAX_SHORT_RETRIES = 3
|
MAX_SHORT_RETRIES = 3
|
||||||
|
@ -167,12 +171,27 @@ async def _handle_json_response(
|
||||||
try:
|
try:
|
||||||
check_content_type_is_json(response.headers)
|
check_content_type_is_json(response.headers)
|
||||||
|
|
||||||
# Use the custom JSON decoder (partially re-implements treq.json_content).
|
buf = StringIO()
|
||||||
d = treq.text_content(response, encoding="utf-8")
|
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
|
||||||
d.addCallback(json_decoder.decode)
|
|
||||||
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
|
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
|
||||||
|
|
||||||
|
def parse(_len: int):
|
||||||
|
return json_decoder.decode(buf.getvalue())
|
||||||
|
|
||||||
|
d.addCallback(parse)
|
||||||
|
|
||||||
body = await make_deferred_yieldable(d)
|
body = await make_deferred_yieldable(d)
|
||||||
|
except BodyExceededMaxSize as e:
|
||||||
|
# The response was too big.
|
||||||
|
logger.warning(
|
||||||
|
"{%s} [%s] JSON response exceeded max size %i - %s %s",
|
||||||
|
request.txn_id,
|
||||||
|
request.destination,
|
||||||
|
MAX_RESPONSE_SIZE,
|
||||||
|
request.method,
|
||||||
|
request.uri.decode("ascii"),
|
||||||
|
)
|
||||||
|
raise RequestSendFailed(e, can_retry=False) from e
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
# The JSON content was invalid.
|
# The JSON content was invalid.
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
@ -218,6 +237,18 @@ async def _handle_json_response(
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
class BinaryIOWrapper:
|
||||||
|
"""A wrapper for a TextIO which converts from bytes on the fly."""
|
||||||
|
|
||||||
|
def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
|
||||||
|
self.decoder = codecs.getincrementaldecoder(encoding)(errors)
|
||||||
|
self.file = file
|
||||||
|
|
||||||
|
def write(self, b: Union[bytes, bytearray]) -> int:
|
||||||
|
self.file.write(self.decoder.decode(b))
|
||||||
|
return len(b)
|
||||||
|
|
||||||
|
|
||||||
class MatrixFederationHttpClient:
|
class MatrixFederationHttpClient:
|
||||||
"""HTTP client used to talk to other homeservers over the federation
|
"""HTTP client used to talk to other homeservers over the federation
|
||||||
protocol. Send client certificates and signs requests.
|
protocol. Send client certificates and signs requests.
|
||||||
|
|
|
@ -26,6 +26,7 @@ from twisted.web.http import HTTPChannel
|
||||||
|
|
||||||
from synapse.api.errors import RequestSendFailed
|
from synapse.api.errors import RequestSendFailed
|
||||||
from synapse.http.matrixfederationclient import (
|
from synapse.http.matrixfederationclient import (
|
||||||
|
MAX_RESPONSE_SIZE,
|
||||||
MatrixFederationHttpClient,
|
MatrixFederationHttpClient,
|
||||||
MatrixFederationRequest,
|
MatrixFederationRequest,
|
||||||
)
|
)
|
||||||
|
@ -560,3 +561,61 @@ class FederationClientTests(HomeserverTestCase):
|
||||||
|
|
||||||
f = self.failureResultOf(test_d)
|
f = self.failureResultOf(test_d)
|
||||||
self.assertIsInstance(f.value, RequestSendFailed)
|
self.assertIsInstance(f.value, RequestSendFailed)
|
||||||
|
|
||||||
|
def test_too_big(self):
|
||||||
|
"""
|
||||||
|
Test what happens if a huge response is returned from the remote endpoint.
|
||||||
|
"""
|
||||||
|
|
||||||
|
test_d = defer.ensureDeferred(self.cl.get_json("testserv:8008", "foo/bar"))
|
||||||
|
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
# Nothing happened yet
|
||||||
|
self.assertNoResult(test_d)
|
||||||
|
|
||||||
|
# Make sure treq is trying to connect
|
||||||
|
clients = self.reactor.tcpClients
|
||||||
|
self.assertEqual(len(clients), 1)
|
||||||
|
(host, port, factory, _timeout, _bindAddress) = clients[0]
|
||||||
|
self.assertEqual(host, "1.2.3.4")
|
||||||
|
self.assertEqual(port, 8008)
|
||||||
|
|
||||||
|
# complete the connection and wire it up to a fake transport
|
||||||
|
protocol = factory.buildProtocol(None)
|
||||||
|
transport = StringTransport()
|
||||||
|
protocol.makeConnection(transport)
|
||||||
|
|
||||||
|
# that should have made it send the request to the transport
|
||||||
|
self.assertRegex(transport.value(), b"^GET /foo/bar")
|
||||||
|
self.assertRegex(transport.value(), b"Host: testserv:8008")
|
||||||
|
|
||||||
|
# Deferred is still without a result
|
||||||
|
self.assertNoResult(test_d)
|
||||||
|
|
||||||
|
# Send it a huge HTTP response
|
||||||
|
protocol.dataReceived(
|
||||||
|
b"HTTP/1.1 200 OK\r\n"
|
||||||
|
b"Server: Fake\r\n"
|
||||||
|
b"Content-Type: application/json\r\n"
|
||||||
|
b"\r\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.pump()
|
||||||
|
|
||||||
|
# should still be waiting
|
||||||
|
self.assertNoResult(test_d)
|
||||||
|
|
||||||
|
sent = 0
|
||||||
|
chunk_size = 1024 * 512
|
||||||
|
while not test_d.called:
|
||||||
|
protocol.dataReceived(b"a" * chunk_size)
|
||||||
|
sent += chunk_size
|
||||||
|
self.assertLessEqual(sent, MAX_RESPONSE_SIZE)
|
||||||
|
|
||||||
|
self.assertEqual(sent, MAX_RESPONSE_SIZE)
|
||||||
|
|
||||||
|
f = self.failureResultOf(test_d)
|
||||||
|
self.assertIsInstance(f.value, RequestSendFailed)
|
||||||
|
|
||||||
|
self.assertTrue(transport.disconnecting)
|
||||||
|
|
Loading…
Reference in a new issue