mirror of https://mau.dev/maunium/synapse.git (synced 2024-11-12 04:52:26 +01:00)

commit a9fc1fd112 (parent 6a11bdf01d)
Use a larger, dedicated threadpool for media sending (#17564)

7 changed files with 48 additions and 17 deletions

changelog.d/17564.misc (new file, 1 line)
@@ -0,0 +1 @@
+Speed up responding to media requests.
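Note: the core of this change is to stop borrowing Twisted's shared reactor threadpool (via defer_to_thread) for media file reads and to dispatch them to a dedicated, larger pool instead. The following sketch is illustrative only and is not code from this commit; it shows the underlying pattern using plain Twisted APIs, and the read_chunk/read_off_reactor helpers are hypothetical names.

# Illustrative sketch only: a dedicated Twisted threadpool for blocking file IO.
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool

# A large maxthreads is acceptable because the work is almost entirely
# blocking disk IO rather than CPU-bound work.
media_threadpool = ThreadPool(name="media_threadpool", minthreads=1, maxthreads=50)
media_threadpool.start()
reactor.addSystemEventTrigger("during", "shutdown", media_threadpool.stop)


def read_chunk(f, size=64 * 1024):
    # Blocking read; runs on a threadpool thread, keeping the reactor free.
    return f.read(size)


def read_off_reactor(f):
    # Returns a Deferred that fires with the chunk once the worker thread is done.
    return deferToThreadPool(reactor, media_threadpool, read_chunk, f)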
@@ -51,15 +51,15 @@ from synapse.api.errors import Codes, cs_error
 from synapse.http.server import finish_request, respond_with_json
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import (
-    defer_to_thread,
+    defer_to_threadpool,
     make_deferred_yieldable,
     run_in_background,
 )
-from synapse.types import ISynapseReactor
 from synapse.util import Clock
 from synapse.util.stringutils import is_ascii

 if TYPE_CHECKING:
+    from synapse.server import HomeServer
     from synapse.storage.databases.main.media_repository import LocalMedia


@@ -132,6 +132,7 @@ def respond_404(request: SynapseRequest) -> None:


 async def respond_with_file(
+    hs: "HomeServer",
     request: SynapseRequest,
     media_type: str,
     file_path: str,
@@ -148,7 +149,7 @@ async def respond_with_file(
         add_file_headers(request, media_type, file_size, upload_name)

         with open(file_path, "rb") as f:
-            await ThreadedFileSender(request.reactor).beginFileTransfer(f, request)
+            await ThreadedFileSender(hs).beginFileTransfer(f, request)

         finish_request(request)
     else:
@@ -632,8 +633,9 @@ class ThreadedFileSender:
     # read.
     TIMEOUT_SECONDS = 90.0

-    def __init__(self, reactor: ISynapseReactor) -> None:
-        self.reactor = reactor
+    def __init__(self, hs: "HomeServer") -> None:
+        self.reactor = hs.get_reactor()
+        self.thread_pool = hs.get_media_sender_thread_pool()

         self.file: Optional[BinaryIO] = None
         self.deferred: "Deferred[None]" = Deferred()
@@ -661,7 +663,12 @@ class ThreadedFileSender:

         # We set the wakeup signal as we should start producing immediately.
         self.wakeup_event.set()
-        run_in_background(defer_to_thread, self.reactor, self._on_thread_read_loop)
+        run_in_background(
+            defer_to_threadpool,
+            self.reactor,
+            self.thread_pool,
+            self._on_thread_read_loop,
+        )

         return make_deferred_yieldable(self.deferred)
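Note: the dispatch change above is where the new pool takes effect: ThreadedFileSender now hands its read loop to defer_to_threadpool on the dedicated pool instead of defer_to_thread on the reactor's pool. As a rough, hypothetical illustration of how a caller can await work on that pool with logging contexts preserved (read_on_media_pool and blocking_read are placeholder names, not part of this commit; the commit itself fires the loop via run_in_background rather than awaiting it directly):

# Minimal sketch; `hs` is a HomeServer, `blocking_read` any blocking callable.
from synapse.logging.context import defer_to_threadpool, make_deferred_yieldable


async def read_on_media_pool(hs, blocking_read, *args):
    # defer_to_threadpool runs the callable on the given threadpool and returns
    # a Deferred, keeping the Synapse logging context intact across threads.
    d = defer_to_threadpool(
        hs.get_reactor(),
        hs.get_media_sender_thread_pool(),
        blocking_read,
        *args,
    )
    # make_deferred_yieldable lets an async function await the Deferred safely
    # with respect to logcontexts.
    return await make_deferred_yieldable(d)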
@@ -57,7 +57,7 @@ from synapse.media._base import ThreadedFileSender
 from synapse.util import Clock
 from synapse.util.file_consumer import BackgroundFileConsumer

-from ..types import ISynapseReactor, JsonDict
+from ..types import JsonDict
 from ._base import FileInfo, Responder
 from .filepath import MediaFilePaths

@@ -209,7 +209,7 @@ class MediaStorage:
         local_path = os.path.join(self.local_media_directory, path)
         if os.path.exists(local_path):
             logger.debug("responding with local file %s", local_path)
-            return FileResponder(self.reactor, open(local_path, "rb"))
+            return FileResponder(self.hs, open(local_path, "rb"))
         logger.debug("local file %s did not exist", local_path)

         for provider in self.storage_providers:
@@ -332,14 +332,12 @@ class FileResponder(Responder):
     is closed when finished streaming.
     """

-    def __init__(self, reactor: ISynapseReactor, open_file: BinaryIO):
-        self.reactor = reactor
+    def __init__(self, hs: "HomeServer", open_file: BinaryIO):
+        self.hs = hs
         self.open_file = open_file

     def write_to_consumer(self, consumer: IConsumer) -> Deferred:
-        return ThreadedFileSender(self.reactor).beginFileTransfer(
-            self.open_file, consumer
-        )
+        return ThreadedFileSender(self.hs).beginFileTransfer(self.open_file, consumer)

     def __exit__(
         self,
@@ -178,7 +178,7 @@ class FileStorageProviderBackend(StorageProvider):

         backup_fname = os.path.join(self.base_directory, path)
         if os.path.isfile(backup_fname):
-            return FileResponder(self.reactor, open(backup_fname, "rb"))
+            return FileResponder(self.hs, open(backup_fname, "rb"))

         return None

@@ -374,11 +374,11 @@ class ThumbnailProvider:
                 await respond_with_multipart_responder(
                     self.hs.get_clock(),
                     request,
-                    FileResponder(self.reactor, open(file_path, "rb")),
+                    FileResponder(self.hs, open(file_path, "rb")),
                     media_info,
                 )
             else:
-                await respond_with_file(request, desired_type, file_path)
+                await respond_with_file(self.hs, request, desired_type, file_path)
         else:
             logger.warning("Failed to generate thumbnail")
             raise SynapseError(400, "Failed to generate thumbnail.")
@@ -456,7 +456,7 @@ class ThumbnailProvider:
             )

             if file_path:
-                await respond_with_file(request, desired_type, file_path)
+                await respond_with_file(self.hs, request, desired_type, file_path)
             else:
                 logger.warning("Failed to generate thumbnail")
                 raise SynapseError(400, "Failed to generate thumbnail.")
@@ -34,6 +34,7 @@ from typing_extensions import TypeAlias

 from twisted.internet.interfaces import IOpenSSLContextFactory
 from twisted.internet.tcp import Port
+from twisted.python.threadpool import ThreadPool
 from twisted.web.iweb import IPolicyForHTTPS
 from twisted.web.resource import Resource

@@ -941,3 +942,21 @@ class HomeServer(metaclass=abc.ABCMeta):
     @cache_in_self
     def get_task_scheduler(self) -> TaskScheduler:
         return TaskScheduler(self)
+
+    @cache_in_self
+    def get_media_sender_thread_pool(self) -> ThreadPool:
+        """Fetch the threadpool used to read files when responding to media
+        download requests."""
+
+        # We can choose a large threadpool size as these threads predominately
+        # do IO rather than CPU work.
+        media_threadpool = ThreadPool(
+            name="media_threadpool", minthreads=1, maxthreads=50
+        )
+
+        media_threadpool.start()
+        self.get_reactor().addSystemEventTrigger(
+            "during", "shutdown", media_threadpool.stop
+        )
+
+        return media_threadpool
@@ -1166,6 +1166,12 @@ def setup_test_homeserver(

     hs.get_auth_handler().validate_hash = validate_hash  # type: ignore[assignment]

+    # We need to replace the media threadpool with the fake test threadpool.
+    def thread_pool() -> threadpool.ThreadPool:
+        return reactor.getThreadPool()
+
+    hs.get_media_sender_thread_pool = thread_pool  # type: ignore[method-assign]
+
     # Load any configured modules into the homeserver
     module_api = hs.get_module_api()
     for module, module_config in hs.config.modules.loaded_modules: