forked from MirrorHub/synapse
Iteratively encode JSON responses to avoid blocking the reactor. (#8013)
This commit is contained in:
parent
25e55d2598
commit
2f4d60a5ba
5 changed files with 94 additions and 13 deletions
1
changelog.d/8013.feature
Normal file
1
changelog.d/8013.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Iteratively encode JSON to avoid blocking the reactor.
|
|
@ -22,12 +22,13 @@ import types
|
||||||
import urllib
|
import urllib
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from typing import Any, Callable, Dict, Tuple, Union
|
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union
|
||||||
|
|
||||||
import jinja2
|
import jinja2
|
||||||
from canonicaljson import encode_canonical_json, encode_pretty_printed_json
|
from canonicaljson import iterencode_canonical_json, iterencode_pretty_printed_json
|
||||||
|
from zope.interface import implementer
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer, interfaces
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
from twisted.web import resource
|
from twisted.web import resource
|
||||||
from twisted.web.server import NOT_DONE_YET, Request
|
from twisted.web.server import NOT_DONE_YET, Request
|
||||||
|
@ -499,6 +500,78 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@implementer(interfaces.IPullProducer)
|
||||||
|
class _ByteProducer:
|
||||||
|
"""
|
||||||
|
Iteratively write bytes to the request.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# The minimum number of bytes for each chunk. Note that the last chunk will
|
||||||
|
# usually be smaller than this.
|
||||||
|
min_chunk_size = 1024
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, request: Request, iterator: Iterator[bytes],
|
||||||
|
):
|
||||||
|
self._request = request
|
||||||
|
self._iterator = iterator
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
self._request.registerProducer(self, False)
|
||||||
|
|
||||||
|
def _send_data(self, data: List[bytes]) -> None:
|
||||||
|
"""
|
||||||
|
Send a list of strings as a response to the request.
|
||||||
|
"""
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
self._request.write(b"".join(data))
|
||||||
|
|
||||||
|
def resumeProducing(self) -> None:
|
||||||
|
# We've stopped producing in the meantime (note that this might be
|
||||||
|
# re-entrant after calling write).
|
||||||
|
if not self._request:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get the next chunk and write it to the request.
|
||||||
|
#
|
||||||
|
# The output of the JSON encoder is coalesced until min_chunk_size is
|
||||||
|
# reached. (This is because JSON encoders produce a very small output
|
||||||
|
# per iteration.)
|
||||||
|
#
|
||||||
|
# Note that buffer stores a list of bytes (instead of appending to
|
||||||
|
# bytes) to hopefully avoid many allocations.
|
||||||
|
buffer = []
|
||||||
|
buffered_bytes = 0
|
||||||
|
while buffered_bytes < self.min_chunk_size:
|
||||||
|
try:
|
||||||
|
data = next(self._iterator)
|
||||||
|
buffer.append(data)
|
||||||
|
buffered_bytes += len(data)
|
||||||
|
except StopIteration:
|
||||||
|
# The entire JSON object has been serialized, write any
|
||||||
|
# remaining data, finalize the producer and the request, and
|
||||||
|
# clean-up any references.
|
||||||
|
self._send_data(buffer)
|
||||||
|
self._request.unregisterProducer()
|
||||||
|
self._request.finish()
|
||||||
|
self.stopProducing()
|
||||||
|
return
|
||||||
|
|
||||||
|
self._send_data(buffer)
|
||||||
|
|
||||||
|
def stopProducing(self) -> None:
|
||||||
|
self._request = None
|
||||||
|
|
||||||
|
|
||||||
|
def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
|
||||||
|
"""
|
||||||
|
Encode an object into JSON. Returns an iterator of bytes.
|
||||||
|
"""
|
||||||
|
for chunk in json_encoder.iterencode(json_object):
|
||||||
|
yield chunk.encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
def respond_with_json(
|
def respond_with_json(
|
||||||
request: Request,
|
request: Request,
|
||||||
code: int,
|
code: int,
|
||||||
|
@ -533,15 +606,23 @@ def respond_with_json(
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if pretty_print:
|
if pretty_print:
|
||||||
json_bytes = encode_pretty_printed_json(json_object) + b"\n"
|
encoder = iterencode_pretty_printed_json
|
||||||
else:
|
else:
|
||||||
if canonical_json or synapse.events.USE_FROZEN_DICTS:
|
if canonical_json or synapse.events.USE_FROZEN_DICTS:
|
||||||
# canonicaljson already encodes to bytes
|
encoder = iterencode_canonical_json
|
||||||
json_bytes = encode_canonical_json(json_object)
|
|
||||||
else:
|
else:
|
||||||
json_bytes = json_encoder.encode(json_object).encode("utf-8")
|
encoder = _encode_json_bytes
|
||||||
|
|
||||||
return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
|
request.setResponseCode(code)
|
||||||
|
request.setHeader(b"Content-Type", b"application/json")
|
||||||
|
request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
|
||||||
|
|
||||||
|
if send_cors:
|
||||||
|
set_cors_headers(request)
|
||||||
|
|
||||||
|
producer = _ByteProducer(request, encoder(json_object))
|
||||||
|
producer.start()
|
||||||
|
return NOT_DONE_YET
|
||||||
|
|
||||||
|
|
||||||
def respond_with_json_bytes(
|
def respond_with_json_bytes(
|
||||||
|
|
|
@ -43,7 +43,7 @@ REQUIREMENTS = [
|
||||||
"jsonschema>=2.5.1",
|
"jsonschema>=2.5.1",
|
||||||
"frozendict>=1",
|
"frozendict>=1",
|
||||||
"unpaddedbase64>=1.1.0",
|
"unpaddedbase64>=1.1.0",
|
||||||
"canonicaljson>=1.2.0",
|
"canonicaljson>=1.3.0",
|
||||||
# we use the type definitions added in signedjson 1.1.
|
# we use the type definitions added in signedjson 1.1.
|
||||||
"signedjson>=1.1.0",
|
"signedjson>=1.1.0",
|
||||||
"pynacl>=1.2.1",
|
"pynacl>=1.2.1",
|
||||||
|
|
|
@ -15,12 +15,12 @@
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Set
|
from typing import Dict, Set
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json, json
|
from canonicaljson import json
|
||||||
from signedjson.sign import sign_json
|
from signedjson.sign import sign_json
|
||||||
|
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.crypto.keyring import ServerKeyFetcher
|
from synapse.crypto.keyring import ServerKeyFetcher
|
||||||
from synapse.http.server import DirectServeJsonResource, respond_with_json_bytes
|
from synapse.http.server import DirectServeJsonResource, respond_with_json
|
||||||
from synapse.http.servlet import parse_integer, parse_json_object_from_request
|
from synapse.http.servlet import parse_integer, parse_json_object_from_request
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -223,4 +223,4 @@ class RemoteKey(DirectServeJsonResource):
|
||||||
|
|
||||||
results = {"server_keys": signed_keys}
|
results = {"server_keys": signed_keys}
|
||||||
|
|
||||||
respond_with_json_bytes(request, 200, encode_canonical_json(results))
|
respond_with_json(request, 200, results, canonical_json=True)
|
||||||
|
|
|
@ -178,7 +178,6 @@ class JsonResourceTests(unittest.TestCase):
|
||||||
|
|
||||||
self.assertEqual(channel.result["code"], b"200")
|
self.assertEqual(channel.result["code"], b"200")
|
||||||
self.assertNotIn("body", channel.result)
|
self.assertNotIn("body", channel.result)
|
||||||
self.assertEqual(channel.headers.getRawHeaders(b"Content-Length"), [b"15"])
|
|
||||||
|
|
||||||
|
|
||||||
class OptionsResourceTests(unittest.TestCase):
|
class OptionsResourceTests(unittest.TestCase):
|
||||||
|
|
Loading…
Reference in a new issue