diff --git a/changelog.d/17564.misc b/changelog.d/17564.misc new file mode 100644 index 000000000..cfa8089a8 --- /dev/null +++ b/changelog.d/17564.misc @@ -0,0 +1 @@ +Speed up responding to media requests. diff --git a/synapse/media/_base.py b/synapse/media/_base.py index ad80098e9..89dea3916 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -51,15 +51,15 @@ from synapse.api.errors import Codes, cs_error from synapse.http.server import finish_request, respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import ( - defer_to_thread, + defer_to_threadpool, make_deferred_yieldable, run_in_background, ) -from synapse.types import ISynapseReactor from synapse.util import Clock from synapse.util.stringutils import is_ascii if TYPE_CHECKING: + from synapse.server import HomeServer from synapse.storage.databases.main.media_repository import LocalMedia @@ -132,6 +132,7 @@ def respond_404(request: SynapseRequest) -> None: async def respond_with_file( + hs: "HomeServer", request: SynapseRequest, media_type: str, file_path: str, @@ -148,7 +149,7 @@ async def respond_with_file( add_file_headers(request, media_type, file_size, upload_name) with open(file_path, "rb") as f: - await ThreadedFileSender(request.reactor).beginFileTransfer(f, request) + await ThreadedFileSender(hs).beginFileTransfer(f, request) finish_request(request) else: @@ -632,8 +633,9 @@ class ThreadedFileSender: # read. TIMEOUT_SECONDS = 90.0 - def __init__(self, reactor: ISynapseReactor) -> None: - self.reactor = reactor + def __init__(self, hs: "HomeServer") -> None: + self.reactor = hs.get_reactor() + self.thread_pool = hs.get_media_sender_thread_pool() self.file: Optional[BinaryIO] = None self.deferred: "Deferred[None]" = Deferred() @@ -661,7 +663,12 @@ class ThreadedFileSender: # We set the wakeup signal as we should start producing immediately. self.wakeup_event.set() - run_in_background(defer_to_thread, self.reactor, self._on_thread_read_loop) + run_in_background( + defer_to_threadpool, + self.reactor, + self.thread_pool, + self._on_thread_read_loop, + ) return make_deferred_yieldable(self.deferred) diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index e06273c92..cf4208eb7 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -57,7 +57,7 @@ from synapse.media._base import ThreadedFileSender from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer -from ..types import ISynapseReactor, JsonDict +from ..types import JsonDict from ._base import FileInfo, Responder from .filepath import MediaFilePaths @@ -209,7 +209,7 @@ class MediaStorage: local_path = os.path.join(self.local_media_directory, path) if os.path.exists(local_path): logger.debug("responding with local file %s", local_path) - return FileResponder(self.reactor, open(local_path, "rb")) + return FileResponder(self.hs, open(local_path, "rb")) logger.debug("local file %s did not exist", local_path) for provider in self.storage_providers: @@ -332,14 +332,12 @@ class FileResponder(Responder): is closed when finished streaming. """ - def __init__(self, reactor: ISynapseReactor, open_file: BinaryIO): - self.reactor = reactor + def __init__(self, hs: "HomeServer", open_file: BinaryIO): + self.hs = hs self.open_file = open_file def write_to_consumer(self, consumer: IConsumer) -> Deferred: - return ThreadedFileSender(self.reactor).beginFileTransfer( - self.open_file, consumer - ) + return ThreadedFileSender(self.hs).beginFileTransfer(self.open_file, consumer) def __exit__( self, diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py index 355df999d..300952025 100644 --- a/synapse/media/storage_provider.py +++ b/synapse/media/storage_provider.py @@ -178,7 +178,7 @@ class FileStorageProviderBackend(StorageProvider): backup_fname = os.path.join(self.base_directory, path) if os.path.isfile(backup_fname): - return FileResponder(self.reactor, open(backup_fname, "rb")) + return FileResponder(self.hs, open(backup_fname, "rb")) return None diff --git a/synapse/media/thumbnailer.py b/synapse/media/thumbnailer.py index 3380315b2..042851021 100644 --- a/synapse/media/thumbnailer.py +++ b/synapse/media/thumbnailer.py @@ -374,11 +374,11 @@ class ThumbnailProvider: await respond_with_multipart_responder( self.hs.get_clock(), request, - FileResponder(self.reactor, open(file_path, "rb")), + FileResponder(self.hs, open(file_path, "rb")), media_info, ) else: - await respond_with_file(request, desired_type, file_path) + await respond_with_file(self.hs, request, desired_type, file_path) else: logger.warning("Failed to generate thumbnail") raise SynapseError(400, "Failed to generate thumbnail.") @@ -456,7 +456,7 @@ class ThumbnailProvider: ) if file_path: - await respond_with_file(request, desired_type, file_path) + await respond_with_file(self.hs, request, desired_type, file_path) else: logger.warning("Failed to generate thumbnail") raise SynapseError(400, "Failed to generate thumbnail.") diff --git a/synapse/server.py b/synapse/server.py index 46b9d83a0..8b07bb39a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -34,6 +34,7 @@ from typing_extensions import TypeAlias from twisted.internet.interfaces import IOpenSSLContextFactory from twisted.internet.tcp import Port +from twisted.python.threadpool import ThreadPool from twisted.web.iweb import IPolicyForHTTPS from twisted.web.resource import Resource @@ -941,3 +942,21 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_task_scheduler(self) -> TaskScheduler: return TaskScheduler(self) + + @cache_in_self + def get_media_sender_thread_pool(self) -> ThreadPool: + """Fetch the threadpool used to read files when responding to media + download requests.""" + + # We can choose a large threadpool size as these threads predominately + # do IO rather than CPU work. + media_threadpool = ThreadPool( + name="media_threadpool", minthreads=1, maxthreads=50 + ) + + media_threadpool.start() + self.get_reactor().addSystemEventTrigger( + "during", "shutdown", media_threadpool.stop + ) + + return media_threadpool diff --git a/tests/server.py b/tests/server.py index 3e377585c..95aff6f66 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1166,6 +1166,12 @@ def setup_test_homeserver( hs.get_auth_handler().validate_hash = validate_hash # type: ignore[assignment] + # We need to replace the media threadpool with the fake test threadpool. + def thread_pool() -> threadpool.ThreadPool: + return reactor.getThreadPool() + + hs.get_media_sender_thread_pool = thread_pool # type: ignore[method-assign] + # Load any configured modules into the homeserver module_api = hs.get_module_api() for module, module_config in hs.config.modules.loaded_modules: