forked from MirrorHub/synapse
Add config options for media retention (#12732)
This commit is contained in:
parent
641908f72f
commit
2fc787c341
5 changed files with 353 additions and 2 deletions
1
changelog.d/12732.feature
Normal file
1
changelog.d/12732.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add new `media_retention` options to the homeserver config for routinely cleaning up non-recently accessed media.
|
|
@ -1459,7 +1459,7 @@ federation_rr_transactions_per_room_per_second: 40
|
||||||
```
|
```
|
||||||
---
|
---
|
||||||
## Media Store ##
|
## Media Store ##
|
||||||
Config options relating to Synapse media store.
|
Config options related to Synapse's media store.
|
||||||
|
|
||||||
---
|
---
|
||||||
Config option: `enable_media_repo`
|
Config option: `enable_media_repo`
|
||||||
|
@ -1563,6 +1563,33 @@ thumbnail_sizes:
|
||||||
height: 600
|
height: 600
|
||||||
method: scale
|
method: scale
|
||||||
```
|
```
|
||||||
|
---
|
||||||
|
Config option: `media_retention`
|
||||||
|
|
||||||
|
Controls whether local media and entries in the remote media cache
|
||||||
|
(media that is downloaded from other homeservers) should be removed
|
||||||
|
under certain conditions, typically for the purpose of saving space.
|
||||||
|
|
||||||
|
Purging media files will be the carried out by the media worker
|
||||||
|
(that is, the worker that has the `enable_media_repo` homeserver config
|
||||||
|
option set to 'true'). This may be the main process.
|
||||||
|
|
||||||
|
The `media_retention.local_media_lifetime` and
|
||||||
|
`media_retention.remote_media_lifetime` config options control whether
|
||||||
|
media will be purged if it has not been accessed in a given amount of
|
||||||
|
time. Note that media is 'accessed' when loaded in a room in a client, or
|
||||||
|
otherwise downloaded by a local or remote user. If the media has never
|
||||||
|
been accessed, the media's creation time is used instead. Both thumbnails
|
||||||
|
and the original media will be removed. If either of these options are unset,
|
||||||
|
then media of that type will not be purged.
|
||||||
|
|
||||||
|
Example configuration:
|
||||||
|
```yaml
|
||||||
|
media_retention:
|
||||||
|
local_media_lifetime: 90d
|
||||||
|
remote_media_lifetime: 14d
|
||||||
|
```
|
||||||
|
---
|
||||||
Config option: `url_preview_enabled`
|
Config option: `url_preview_enabled`
|
||||||
|
|
||||||
This setting determines whether the preview URL API is enabled.
|
This setting determines whether the preview URL API is enabled.
|
||||||
|
|
|
@ -223,6 +223,22 @@ class ContentRepositoryConfig(Config):
|
||||||
"url_preview_accept_language"
|
"url_preview_accept_language"
|
||||||
) or ["en"]
|
) or ["en"]
|
||||||
|
|
||||||
|
media_retention = config.get("media_retention") or {}
|
||||||
|
|
||||||
|
self.media_retention_local_media_lifetime_ms = None
|
||||||
|
local_media_lifetime = media_retention.get("local_media_lifetime")
|
||||||
|
if local_media_lifetime is not None:
|
||||||
|
self.media_retention_local_media_lifetime_ms = self.parse_duration(
|
||||||
|
local_media_lifetime
|
||||||
|
)
|
||||||
|
|
||||||
|
self.media_retention_remote_media_lifetime_ms = None
|
||||||
|
remote_media_lifetime = media_retention.get("remote_media_lifetime")
|
||||||
|
if remote_media_lifetime is not None:
|
||||||
|
self.media_retention_remote_media_lifetime_ms = self.parse_duration(
|
||||||
|
remote_media_lifetime
|
||||||
|
)
|
||||||
|
|
||||||
def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
|
def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
|
||||||
assert data_dir_path is not None
|
assert data_dir_path is not None
|
||||||
media_store = os.path.join(data_dir_path, "media_store")
|
media_store = os.path.join(data_dir_path, "media_store")
|
||||||
|
|
|
@ -65,7 +65,12 @@ if TYPE_CHECKING:
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
|
# How often to run the background job to update the "recently accessed"
|
||||||
|
# attribute of local and remote media.
|
||||||
|
UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 # 1 minute
|
||||||
|
# How often to run the background job to check for local and remote media
|
||||||
|
# that should be purged according to the configured media retention settings.
|
||||||
|
MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000 # 1 hour
|
||||||
|
|
||||||
|
|
||||||
class MediaRepository:
|
class MediaRepository:
|
||||||
|
@ -122,11 +127,36 @@ class MediaRepository:
|
||||||
self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS
|
self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Media retention configuration options
|
||||||
|
self._media_retention_local_media_lifetime_ms = (
|
||||||
|
hs.config.media.media_retention_local_media_lifetime_ms
|
||||||
|
)
|
||||||
|
self._media_retention_remote_media_lifetime_ms = (
|
||||||
|
hs.config.media.media_retention_remote_media_lifetime_ms
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check whether local or remote media retention is configured
|
||||||
|
if (
|
||||||
|
hs.config.media.media_retention_local_media_lifetime_ms is not None
|
||||||
|
or hs.config.media.media_retention_remote_media_lifetime_ms is not None
|
||||||
|
):
|
||||||
|
# Run the background job to apply media retention rules routinely,
|
||||||
|
# with the duration between runs dictated by the homeserver config.
|
||||||
|
self.clock.looping_call(
|
||||||
|
self._start_apply_media_retention_rules,
|
||||||
|
MEDIA_RETENTION_CHECK_PERIOD_MS,
|
||||||
|
)
|
||||||
|
|
||||||
def _start_update_recently_accessed(self) -> Deferred:
|
def _start_update_recently_accessed(self) -> Deferred:
|
||||||
return run_as_background_process(
|
return run_as_background_process(
|
||||||
"update_recently_accessed_media", self._update_recently_accessed
|
"update_recently_accessed_media", self._update_recently_accessed
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _start_apply_media_retention_rules(self) -> Deferred:
|
||||||
|
return run_as_background_process(
|
||||||
|
"apply_media_retention_rules", self._apply_media_retention_rules
|
||||||
|
)
|
||||||
|
|
||||||
async def _update_recently_accessed(self) -> None:
|
async def _update_recently_accessed(self) -> None:
|
||||||
remote_media = self.recently_accessed_remotes
|
remote_media = self.recently_accessed_remotes
|
||||||
self.recently_accessed_remotes = set()
|
self.recently_accessed_remotes = set()
|
||||||
|
@ -835,6 +865,45 @@ class MediaRepository:
|
||||||
|
|
||||||
return {"width": m_width, "height": m_height}
|
return {"width": m_width, "height": m_height}
|
||||||
|
|
||||||
|
async def _apply_media_retention_rules(self) -> None:
|
||||||
|
"""
|
||||||
|
Purge old local and remote media according to the media retention rules
|
||||||
|
defined in the homeserver config.
|
||||||
|
"""
|
||||||
|
# Purge remote media
|
||||||
|
if self._media_retention_remote_media_lifetime_ms is not None:
|
||||||
|
# Calculate a threshold timestamp derived from the configured lifetime. Any
|
||||||
|
# media that has not been accessed since this timestamp will be removed.
|
||||||
|
remote_media_threshold_timestamp_ms = (
|
||||||
|
self.clock.time_msec() - self._media_retention_remote_media_lifetime_ms
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Purging remote media last accessed before"
|
||||||
|
f" {remote_media_threshold_timestamp_ms}"
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.delete_old_remote_media(
|
||||||
|
before_ts=remote_media_threshold_timestamp_ms
|
||||||
|
)
|
||||||
|
|
||||||
|
# And now do the same for local media
|
||||||
|
if self._media_retention_local_media_lifetime_ms is not None:
|
||||||
|
# This works the same as the remote media threshold
|
||||||
|
local_media_threshold_timestamp_ms = (
|
||||||
|
self.clock.time_msec() - self._media_retention_local_media_lifetime_ms
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Purging local media last accessed before"
|
||||||
|
f" {local_media_threshold_timestamp_ms}"
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.delete_old_local_media(
|
||||||
|
before_ts=local_media_threshold_timestamp_ms,
|
||||||
|
keep_profiles=True,
|
||||||
|
)
|
||||||
|
|
||||||
async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]:
|
async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]:
|
||||||
old_media = await self.store.get_remote_media_before(before_ts)
|
old_media = await self.store.get_remote_media_before(before_ts)
|
||||||
|
|
||||||
|
|
238
tests/rest/media/test_media_retention.py
Normal file
238
tests/rest/media/test_media_retention.py
Normal file
|
@ -0,0 +1,238 @@
|
||||||
|
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import io
|
||||||
|
from typing import Iterable, Optional, Tuple
|
||||||
|
|
||||||
|
from twisted.test.proto_helpers import MemoryReactor
|
||||||
|
|
||||||
|
from synapse.rest import admin
|
||||||
|
from synapse.rest.client import login, register, room
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
from synapse.types import UserID
|
||||||
|
from synapse.util import Clock
|
||||||
|
|
||||||
|
from tests import unittest
|
||||||
|
from tests.unittest import override_config
|
||||||
|
from tests.utils import MockClock
|
||||||
|
|
||||||
|
|
||||||
|
class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
|
ONE_DAY_IN_MS = 24 * 60 * 60 * 1000
|
||||||
|
THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS
|
||||||
|
|
||||||
|
servlets = [
|
||||||
|
room.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
register.register_servlets,
|
||||||
|
admin.register_servlets_for_client_rest_resource,
|
||||||
|
]
|
||||||
|
|
||||||
|
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||||
|
# We need to be able to test advancing time in the homeserver, so we
|
||||||
|
# replace the test homeserver's default clock with a MockClock, which
|
||||||
|
# supports advancing time.
|
||||||
|
return self.setup_test_homeserver(clock=MockClock())
|
||||||
|
|
||||||
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||||
|
self.remote_server_name = "remote.homeserver"
|
||||||
|
self.store = hs.get_datastores().main
|
||||||
|
|
||||||
|
# Create a user to upload media with
|
||||||
|
test_user_id = self.register_user("alice", "password")
|
||||||
|
|
||||||
|
# Inject media (3 images each; recently accessed, old access, never accessed)
|
||||||
|
# into both the local store and the remote cache
|
||||||
|
media_repository = hs.get_media_repository()
|
||||||
|
test_media_content = b"example string"
|
||||||
|
|
||||||
|
def _create_media_and_set_last_accessed(
|
||||||
|
last_accessed_ms: Optional[int],
|
||||||
|
) -> str:
|
||||||
|
# "Upload" some media to the local media store
|
||||||
|
mxc_uri = self.get_success(
|
||||||
|
media_repository.create_content(
|
||||||
|
media_type="text/plain",
|
||||||
|
upload_name=None,
|
||||||
|
content=io.BytesIO(test_media_content),
|
||||||
|
content_length=len(test_media_content),
|
||||||
|
auth_user=UserID.from_string(test_user_id),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
media_id = mxc_uri.split("/")[-1]
|
||||||
|
|
||||||
|
# Set the last recently accessed time for this media
|
||||||
|
if last_accessed_ms is not None:
|
||||||
|
self.get_success(
|
||||||
|
self.store.update_cached_last_access_time(
|
||||||
|
local_media=(media_id,),
|
||||||
|
remote_media=(),
|
||||||
|
time_ms=last_accessed_ms,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return media_id
|
||||||
|
|
||||||
|
def _cache_remote_media_and_set_last_accessed(
|
||||||
|
media_id: str, last_accessed_ms: Optional[int]
|
||||||
|
) -> str:
|
||||||
|
# Pretend to cache some remote media
|
||||||
|
self.get_success(
|
||||||
|
self.store.store_cached_remote_media(
|
||||||
|
origin=self.remote_server_name,
|
||||||
|
media_id=media_id,
|
||||||
|
media_type="text/plain",
|
||||||
|
media_length=1,
|
||||||
|
time_now_ms=clock.time_msec(),
|
||||||
|
upload_name="testfile.txt",
|
||||||
|
filesystem_id="abcdefg12345",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set the last recently accessed time for this media
|
||||||
|
if last_accessed_ms is not None:
|
||||||
|
self.get_success(
|
||||||
|
hs.get_datastores().main.update_cached_last_access_time(
|
||||||
|
local_media=(),
|
||||||
|
remote_media=((self.remote_server_name, media_id),),
|
||||||
|
time_ms=last_accessed_ms,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return media_id
|
||||||
|
|
||||||
|
# Start with the local media store
|
||||||
|
self.local_recently_accessed_media = _create_media_and_set_last_accessed(
|
||||||
|
self.THIRTY_DAYS_IN_MS
|
||||||
|
)
|
||||||
|
self.local_not_recently_accessed_media = _create_media_and_set_last_accessed(
|
||||||
|
self.ONE_DAY_IN_MS
|
||||||
|
)
|
||||||
|
self.local_never_accessed_media = _create_media_and_set_last_accessed(None)
|
||||||
|
|
||||||
|
# And now the remote media store
|
||||||
|
self.remote_recently_accessed_media = _cache_remote_media_and_set_last_accessed(
|
||||||
|
"a", self.THIRTY_DAYS_IN_MS
|
||||||
|
)
|
||||||
|
self.remote_not_recently_accessed_media = (
|
||||||
|
_cache_remote_media_and_set_last_accessed("b", self.ONE_DAY_IN_MS)
|
||||||
|
)
|
||||||
|
# Remote media will always have a "last accessed" attribute, as it would not
|
||||||
|
# be fetched from the remote homeserver unless instigated by a user.
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{
|
||||||
|
"media_retention": {
|
||||||
|
# Enable retention for local media
|
||||||
|
"local_media_lifetime": "30d"
|
||||||
|
# Cached remote media should not be purged
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
def test_local_media_retention(self) -> None:
|
||||||
|
"""
|
||||||
|
Tests that local media that have not been accessed recently is purged, while
|
||||||
|
cached remote media is unaffected.
|
||||||
|
"""
|
||||||
|
# Advance 31 days (in seconds)
|
||||||
|
self.reactor.advance(31 * 24 * 60 * 60)
|
||||||
|
|
||||||
|
# Check that media has been correctly purged.
|
||||||
|
# Local media accessed <30 days ago should still exist.
|
||||||
|
# Remote media should be unaffected.
|
||||||
|
self._assert_if_mxc_uris_purged(
|
||||||
|
purged=[
|
||||||
|
(
|
||||||
|
self.hs.config.server.server_name,
|
||||||
|
self.local_not_recently_accessed_media,
|
||||||
|
),
|
||||||
|
(self.hs.config.server.server_name, self.local_never_accessed_media),
|
||||||
|
],
|
||||||
|
not_purged=[
|
||||||
|
(self.hs.config.server.server_name, self.local_recently_accessed_media),
|
||||||
|
(self.remote_server_name, self.remote_recently_accessed_media),
|
||||||
|
(self.remote_server_name, self.remote_not_recently_accessed_media),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{
|
||||||
|
"media_retention": {
|
||||||
|
# Enable retention for cached remote media
|
||||||
|
"remote_media_lifetime": "30d"
|
||||||
|
# Local media should not be purged
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
def test_remote_media_cache_retention(self) -> None:
|
||||||
|
"""
|
||||||
|
Tests that entries from the remote media cache that have not been accessed
|
||||||
|
recently is purged, while local media is unaffected.
|
||||||
|
"""
|
||||||
|
# Advance 31 days (in seconds)
|
||||||
|
self.reactor.advance(31 * 24 * 60 * 60)
|
||||||
|
|
||||||
|
# Check that media has been correctly purged.
|
||||||
|
# Local media should be unaffected.
|
||||||
|
# Remote media accessed <30 days ago should still exist.
|
||||||
|
self._assert_if_mxc_uris_purged(
|
||||||
|
purged=[
|
||||||
|
(self.remote_server_name, self.remote_not_recently_accessed_media),
|
||||||
|
],
|
||||||
|
not_purged=[
|
||||||
|
(self.remote_server_name, self.remote_recently_accessed_media),
|
||||||
|
(self.hs.config.server.server_name, self.local_recently_accessed_media),
|
||||||
|
(
|
||||||
|
self.hs.config.server.server_name,
|
||||||
|
self.local_not_recently_accessed_media,
|
||||||
|
),
|
||||||
|
(self.hs.config.server.server_name, self.local_never_accessed_media),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
def _assert_if_mxc_uris_purged(
|
||||||
|
self, purged: Iterable[Tuple[str, str]], not_purged: Iterable[Tuple[str, str]]
|
||||||
|
) -> None:
|
||||||
|
def _assert_mxc_uri_purge_state(
|
||||||
|
server_name: str, media_id: str, expect_purged: bool
|
||||||
|
) -> None:
|
||||||
|
"""Given an MXC URI, assert whether it has been purged or not."""
|
||||||
|
if server_name == self.hs.config.server.server_name:
|
||||||
|
found_media_dict = self.get_success(
|
||||||
|
self.store.get_local_media(media_id)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
found_media_dict = self.get_success(
|
||||||
|
self.store.get_cached_remote_media(server_name, media_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
mxc_uri = f"mxc://{server_name}/{media_id}"
|
||||||
|
|
||||||
|
if expect_purged:
|
||||||
|
self.assertIsNone(
|
||||||
|
found_media_dict, msg=f"{mxc_uri} unexpectedly not purged"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.assertIsNotNone(
|
||||||
|
found_media_dict,
|
||||||
|
msg=f"{mxc_uri} unexpectedly purged",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assert that the given MXC URIs have either been correctly purged or not.
|
||||||
|
for server_name, media_id in purged:
|
||||||
|
_assert_mxc_uri_purge_state(server_name, media_id, expect_purged=True)
|
||||||
|
for server_name, media_id in not_purged:
|
||||||
|
_assert_mxc_uri_purge_state(server_name, media_id, expect_purged=False)
|
Loading…
Reference in a new issue