mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-13 21:41:30 +01:00
Reduce concurrent thread usage in media (#17567)
Follow on from #17558 Basically, we want to reduce the number of threads we want to use at a time, i.e. reduce the number of threads that are paused/blocked. We do this by returning from the thread when the consumer pauses the producer, rather than pausing in the thread. --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
parent
b05b2e14bb
commit
a51daffba5
3 changed files with 93 additions and 42 deletions
1
changelog.d/17567.misc
Normal file
1
changelog.d/17567.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Speed up responding to media requests.
|
|
@ -22,7 +22,6 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import threading
|
|
||||||
import urllib
|
import urllib
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
|
@ -56,6 +55,7 @@ from synapse.logging.context import (
|
||||||
run_in_background,
|
run_in_background,
|
||||||
)
|
)
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
from synapse.util.async_helpers import DeferredEvent
|
||||||
from synapse.util.stringutils import is_ascii
|
from synapse.util.stringutils import is_ascii
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -620,10 +620,13 @@ class ThreadedFileSender:
|
||||||
A producer that sends the contents of a file to a consumer, reading from the
|
A producer that sends the contents of a file to a consumer, reading from the
|
||||||
file on a thread.
|
file on a thread.
|
||||||
|
|
||||||
This works by spawning a loop in a threadpool that repeatedly reads from the
|
This works by having a loop in a threadpool repeatedly reading from the
|
||||||
file and sends it to the consumer. The main thread communicates with the
|
file, until the consumer pauses the producer. There is then a loop in the
|
||||||
loop via two `threading.Event`, which controls when to start/pause reading
|
main thread that waits until the consumer resumes the producer and then
|
||||||
and when to terminate.
|
starts reading in the threadpool again.
|
||||||
|
|
||||||
|
This is done to ensure that we're never waiting in the threadpool, as
|
||||||
|
otherwise its easy to starve it of threads.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# How much data to read in one go.
|
# How much data to read in one go.
|
||||||
|
@ -643,12 +646,11 @@ class ThreadedFileSender:
|
||||||
|
|
||||||
# Signals if the thread should keep reading/sending data. Set means
|
# Signals if the thread should keep reading/sending data. Set means
|
||||||
# continue, clear means pause.
|
# continue, clear means pause.
|
||||||
self.wakeup_event = threading.Event()
|
self.wakeup_event = DeferredEvent(self.reactor)
|
||||||
|
|
||||||
# Signals if the thread should terminate, e.g. because the consumer has
|
# Signals if the thread should terminate, e.g. because the consumer has
|
||||||
# gone away. Both this and `wakeup_event` should be set to terminate the
|
# gone away.
|
||||||
# loop (otherwise the thread will block on `wakeup_event`).
|
self.stop_writing = False
|
||||||
self.stop_event = threading.Event()
|
|
||||||
|
|
||||||
def beginFileTransfer(
|
def beginFileTransfer(
|
||||||
self, file: BinaryIO, consumer: interfaces.IConsumer
|
self, file: BinaryIO, consumer: interfaces.IConsumer
|
||||||
|
@ -663,12 +665,7 @@ class ThreadedFileSender:
|
||||||
|
|
||||||
# We set the wakeup signal as we should start producing immediately.
|
# We set the wakeup signal as we should start producing immediately.
|
||||||
self.wakeup_event.set()
|
self.wakeup_event.set()
|
||||||
run_in_background(
|
run_in_background(self.start_read_loop)
|
||||||
defer_to_threadpool,
|
|
||||||
self.reactor,
|
|
||||||
self.thread_pool,
|
|
||||||
self._on_thread_read_loop,
|
|
||||||
)
|
|
||||||
|
|
||||||
return make_deferred_yieldable(self.deferred)
|
return make_deferred_yieldable(self.deferred)
|
||||||
|
|
||||||
|
@ -686,42 +683,52 @@ class ThreadedFileSender:
|
||||||
# Unregister the consumer so we don't try and interact with it again.
|
# Unregister the consumer so we don't try and interact with it again.
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
|
|
||||||
# Terminate the thread loop.
|
# Terminate the loop.
|
||||||
|
self.stop_writing = True
|
||||||
self.wakeup_event.set()
|
self.wakeup_event.set()
|
||||||
self.stop_event.set()
|
|
||||||
|
|
||||||
if not self.deferred.called:
|
if not self.deferred.called:
|
||||||
self.deferred.errback(Exception("Consumer asked us to stop producing"))
|
self.deferred.errback(Exception("Consumer asked us to stop producing"))
|
||||||
|
|
||||||
def _on_thread_read_loop(self) -> None:
|
async def start_read_loop(self) -> None:
|
||||||
"""This is the loop that happens on a thread."""
|
"""This is the loop that drives reading/writing"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while not self.stop_event.is_set():
|
while not self.stop_writing:
|
||||||
# We wait for the producer to signal that the consumer wants
|
# Start the loop in the threadpool to read data.
|
||||||
# more data (or we should abort)
|
more_data = await defer_to_threadpool(
|
||||||
|
self.reactor, self.thread_pool, self._on_thread_read_loop
|
||||||
|
)
|
||||||
|
if not more_data:
|
||||||
|
# Reached EOF, we can just return.
|
||||||
|
return
|
||||||
|
|
||||||
if not self.wakeup_event.is_set():
|
if not self.wakeup_event.is_set():
|
||||||
ret = self.wakeup_event.wait(self.TIMEOUT_SECONDS)
|
ret = await self.wakeup_event.wait(self.TIMEOUT_SECONDS)
|
||||||
if not ret:
|
if not ret:
|
||||||
raise Exception("Timed out waiting to resume")
|
raise Exception("Timed out waiting to resume")
|
||||||
|
|
||||||
# Check if we were woken up so that we abort the download
|
|
||||||
if self.stop_event.is_set():
|
|
||||||
return
|
|
||||||
|
|
||||||
# The file should always have been set before we get here.
|
|
||||||
assert self.file is not None
|
|
||||||
|
|
||||||
chunk = self.file.read(self.CHUNK_SIZE)
|
|
||||||
if not chunk:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.reactor.callFromThread(self._write, chunk)
|
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
self.reactor.callFromThread(self._error, Failure())
|
self._error(Failure())
|
||||||
finally:
|
finally:
|
||||||
self.reactor.callFromThread(self._finish)
|
self._finish()
|
||||||
|
|
||||||
|
def _on_thread_read_loop(self) -> bool:
|
||||||
|
"""This is the loop that happens on a thread.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Whether there is more data to send.
|
||||||
|
"""
|
||||||
|
|
||||||
|
while not self.stop_writing and self.wakeup_event.is_set():
|
||||||
|
# The file should always have been set before we get here.
|
||||||
|
assert self.file is not None
|
||||||
|
|
||||||
|
chunk = self.file.read(self.CHUNK_SIZE)
|
||||||
|
if not chunk:
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.reactor.callFromThread(self._write, chunk)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def _write(self, chunk: bytes) -> None:
|
def _write(self, chunk: bytes) -> None:
|
||||||
"""Called from the thread to write a chunk of data"""
|
"""Called from the thread to write a chunk of data"""
|
||||||
|
@ -729,7 +736,7 @@ class ThreadedFileSender:
|
||||||
self.consumer.write(chunk)
|
self.consumer.write(chunk)
|
||||||
|
|
||||||
def _error(self, failure: Failure) -> None:
|
def _error(self, failure: Failure) -> None:
|
||||||
"""Called from the thread when there was a fatal error"""
|
"""Called when there was a fatal error"""
|
||||||
if self.consumer:
|
if self.consumer:
|
||||||
self.consumer.unregisterProducer()
|
self.consumer.unregisterProducer()
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
|
@ -738,7 +745,7 @@ class ThreadedFileSender:
|
||||||
self.deferred.errback(failure)
|
self.deferred.errback(failure)
|
||||||
|
|
||||||
def _finish(self) -> None:
|
def _finish(self) -> None:
|
||||||
"""Called from the thread when it finishes (either on success or
|
"""Called when we have finished writing (either on success or
|
||||||
failure)."""
|
failure)."""
|
||||||
if self.file:
|
if self.file:
|
||||||
self.file.close()
|
self.file.close()
|
||||||
|
|
|
@ -885,3 +885,46 @@ class AwakenableSleeper:
|
||||||
# Cancel the sleep if we were woken up
|
# Cancel the sleep if we were woken up
|
||||||
if call.active():
|
if call.active():
|
||||||
call.cancel()
|
call.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
class DeferredEvent:
|
||||||
|
"""Like threading.Event but for async code"""
|
||||||
|
|
||||||
|
def __init__(self, reactor: IReactorTime) -> None:
|
||||||
|
self._reactor = reactor
|
||||||
|
self._deferred: "defer.Deferred[None]" = defer.Deferred()
|
||||||
|
|
||||||
|
def set(self) -> None:
|
||||||
|
if not self._deferred.called:
|
||||||
|
self._deferred.callback(None)
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
if self._deferred.called:
|
||||||
|
self._deferred = defer.Deferred()
|
||||||
|
|
||||||
|
def is_set(self) -> bool:
|
||||||
|
return self._deferred.called
|
||||||
|
|
||||||
|
async def wait(self, timeout_seconds: float) -> bool:
|
||||||
|
if self.is_set():
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Create a deferred that gets called in N seconds
|
||||||
|
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
|
||||||
|
call = self._reactor.callLater(timeout_seconds, sleep_deferred.callback, None)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await make_deferred_yieldable(
|
||||||
|
defer.DeferredList(
|
||||||
|
[sleep_deferred, self._deferred],
|
||||||
|
fireOnOneCallback=True,
|
||||||
|
fireOnOneErrback=True,
|
||||||
|
consumeErrors=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
# Cancel the sleep if we were woken up
|
||||||
|
if call.active():
|
||||||
|
call.cancel()
|
||||||
|
|
||||||
|
return self.is_set()
|
||||||
|
|
Loading…
Reference in a new issue