From a51daffba5e58489f93f76a074aa7d6f73533226 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Aug 2024 12:41:53 +0100 Subject: [PATCH] Reduce concurrent thread usage in media (#17567) Follow on from #17558 Basically, we want to reduce the number of threads we want to use at a time, i.e. reduce the number of threads that are paused/blocked. We do this by returning from the thread when the consumer pauses the producer, rather than pausing in the thread. --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/17567.misc | 1 + synapse/media/_base.py | 91 +++++++++++++++++++---------------- synapse/util/async_helpers.py | 43 +++++++++++++++++ 3 files changed, 93 insertions(+), 42 deletions(-) create mode 100644 changelog.d/17567.misc diff --git a/changelog.d/17567.misc b/changelog.d/17567.misc new file mode 100644 index 000000000..cfa8089a8 --- /dev/null +++ b/changelog.d/17567.misc @@ -0,0 +1 @@ +Speed up responding to media requests. diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 89dea3916..fdbbe2947 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -22,7 +22,6 @@ import logging import os -import threading import urllib from abc import ABC, abstractmethod from types import TracebackType @@ -56,6 +55,7 @@ from synapse.logging.context import ( run_in_background, ) from synapse.util import Clock +from synapse.util.async_helpers import DeferredEvent from synapse.util.stringutils import is_ascii if TYPE_CHECKING: @@ -620,10 +620,13 @@ class ThreadedFileSender: A producer that sends the contents of a file to a consumer, reading from the file on a thread. - This works by spawning a loop in a threadpool that repeatedly reads from the - file and sends it to the consumer. The main thread communicates with the - loop via two `threading.Event`, which controls when to start/pause reading - and when to terminate. + This works by having a loop in a threadpool repeatedly reading from the + file, until the consumer pauses the producer. There is then a loop in the + main thread that waits until the consumer resumes the producer and then + starts reading in the threadpool again. + + This is done to ensure that we're never waiting in the threadpool, as + otherwise its easy to starve it of threads. """ # How much data to read in one go. @@ -643,12 +646,11 @@ class ThreadedFileSender: # Signals if the thread should keep reading/sending data. Set means # continue, clear means pause. - self.wakeup_event = threading.Event() + self.wakeup_event = DeferredEvent(self.reactor) # Signals if the thread should terminate, e.g. because the consumer has - # gone away. Both this and `wakeup_event` should be set to terminate the - # loop (otherwise the thread will block on `wakeup_event`). - self.stop_event = threading.Event() + # gone away. + self.stop_writing = False def beginFileTransfer( self, file: BinaryIO, consumer: interfaces.IConsumer @@ -663,12 +665,7 @@ class ThreadedFileSender: # We set the wakeup signal as we should start producing immediately. self.wakeup_event.set() - run_in_background( - defer_to_threadpool, - self.reactor, - self.thread_pool, - self._on_thread_read_loop, - ) + run_in_background(self.start_read_loop) return make_deferred_yieldable(self.deferred) @@ -686,42 +683,52 @@ class ThreadedFileSender: # Unregister the consumer so we don't try and interact with it again. self.consumer = None - # Terminate the thread loop. + # Terminate the loop. + self.stop_writing = True self.wakeup_event.set() - self.stop_event.set() if not self.deferred.called: self.deferred.errback(Exception("Consumer asked us to stop producing")) - def _on_thread_read_loop(self) -> None: - """This is the loop that happens on a thread.""" - + async def start_read_loop(self) -> None: + """This is the loop that drives reading/writing""" try: - while not self.stop_event.is_set(): - # We wait for the producer to signal that the consumer wants - # more data (or we should abort) + while not self.stop_writing: + # Start the loop in the threadpool to read data. + more_data = await defer_to_threadpool( + self.reactor, self.thread_pool, self._on_thread_read_loop + ) + if not more_data: + # Reached EOF, we can just return. + return + if not self.wakeup_event.is_set(): - ret = self.wakeup_event.wait(self.TIMEOUT_SECONDS) + ret = await self.wakeup_event.wait(self.TIMEOUT_SECONDS) if not ret: raise Exception("Timed out waiting to resume") - - # Check if we were woken up so that we abort the download - if self.stop_event.is_set(): - return - - # The file should always have been set before we get here. - assert self.file is not None - - chunk = self.file.read(self.CHUNK_SIZE) - if not chunk: - return - - self.reactor.callFromThread(self._write, chunk) - except Exception: - self.reactor.callFromThread(self._error, Failure()) + self._error(Failure()) finally: - self.reactor.callFromThread(self._finish) + self._finish() + + def _on_thread_read_loop(self) -> bool: + """This is the loop that happens on a thread. + + Returns: + Whether there is more data to send. + """ + + while not self.stop_writing and self.wakeup_event.is_set(): + # The file should always have been set before we get here. + assert self.file is not None + + chunk = self.file.read(self.CHUNK_SIZE) + if not chunk: + return False + + self.reactor.callFromThread(self._write, chunk) + + return True def _write(self, chunk: bytes) -> None: """Called from the thread to write a chunk of data""" @@ -729,7 +736,7 @@ class ThreadedFileSender: self.consumer.write(chunk) def _error(self, failure: Failure) -> None: - """Called from the thread when there was a fatal error""" + """Called when there was a fatal error""" if self.consumer: self.consumer.unregisterProducer() self.consumer = None @@ -738,7 +745,7 @@ class ThreadedFileSender: self.deferred.errback(failure) def _finish(self) -> None: - """Called from the thread when it finishes (either on success or + """Called when we have finished writing (either on success or failure).""" if self.file: self.file.close() diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 70139beef..8618bb065 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -885,3 +885,46 @@ class AwakenableSleeper: # Cancel the sleep if we were woken up if call.active(): call.cancel() + + +class DeferredEvent: + """Like threading.Event but for async code""" + + def __init__(self, reactor: IReactorTime) -> None: + self._reactor = reactor + self._deferred: "defer.Deferred[None]" = defer.Deferred() + + def set(self) -> None: + if not self._deferred.called: + self._deferred.callback(None) + + def clear(self) -> None: + if self._deferred.called: + self._deferred = defer.Deferred() + + def is_set(self) -> bool: + return self._deferred.called + + async def wait(self, timeout_seconds: float) -> bool: + if self.is_set(): + return True + + # Create a deferred that gets called in N seconds + sleep_deferred: "defer.Deferred[None]" = defer.Deferred() + call = self._reactor.callLater(timeout_seconds, sleep_deferred.callback, None) + + try: + await make_deferred_yieldable( + defer.DeferredList( + [sleep_deferred, self._deferred], + fireOnOneCallback=True, + fireOnOneErrback=True, + consumeErrors=True, + ) + ) + finally: + # Cancel the sleep if we were woken up + if call.active(): + call.cancel() + + return self.is_set()