Copy everything to backup

This commit is contained in:
Erik Johnston 2017-10-12 17:31:24 +01:00
parent b77a13812c
commit e283b555b1
5 changed files with 151 additions and 77 deletions

View file

@ -75,7 +75,9 @@ class ContentRepositoryConfig(Config):
self.backup_media_store_path = config.get("backup_media_store_path")
if self.backup_media_store_path:
self.ensure_directory(self.backup_media_store_path)
self.backup_media_store_path = self.ensure_directory(
self.backup_media_store_path
)
self.synchronous_backup_media_store = config.get(
"synchronous_backup_media_store", False

View file

@ -15,102 +15,133 @@
import os
import re
import functools
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
def _wrap_in_base_path(func):
"""Takes a function that returns a relative path and turns it into an
absolute path based on the location of the primary media store
"""
@functools.wraps(func)
def _wrapped(self, *args, **kwargs):
path = func(self, *args, **kwargs)
return os.path.join(self.primary_base_path, path)
return _wrapped
class MediaFilePaths(object):
"""Describes where files are stored on disk.
def __init__(self, base_path):
self.base_path = base_path
Most of the functions have a `*_rel` variant which returns a file path that
is relative to the base media store path. This is mainly used when we want
to write to the backup media store (when one is configured)
"""
def default_thumbnail(self, default_top_level, default_sub_type, width,
def __init__(self, primary_base_path):
self.primary_base_path = primary_base_path
def default_thumbnail_rel(self, default_top_level, default_sub_type, width,
height, content_type, method):
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (
width, height, top_level_type, sub_type, method
)
return os.path.join(
self.base_path, "default_thumbnails", default_top_level,
"default_thumbnails", default_top_level,
default_sub_type, file_name
)
def local_media_filepath(self, media_id):
default_thumbnail = _wrap_in_base_path(default_thumbnail_rel)
def local_media_filepath_rel(self, media_id):
return os.path.join(
self.base_path, "local_content",
"local_content",
media_id[0:2], media_id[2:4], media_id[4:]
)
def local_media_thumbnail(self, media_id, width, height, content_type,
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
def local_media_thumbnail_rel(self, media_id, width, height, content_type,
method):
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (
width, height, top_level_type, sub_type, method
)
return os.path.join(
self.base_path, "local_thumbnails",
"local_thumbnails",
media_id[0:2], media_id[2:4], media_id[4:],
file_name
)
def remote_media_filepath(self, server_name, file_id):
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
def remote_media_filepath_rel(self, server_name, file_id):
return os.path.join(
self.base_path, "remote_content", server_name,
"remote_content", server_name,
file_id[0:2], file_id[2:4], file_id[4:]
)
def remote_media_thumbnail(self, server_name, file_id, width, height,
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
def remote_media_thumbnail_rel(self, server_name, file_id, width, height,
content_type, method):
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
return os.path.join(
self.base_path, "remote_thumbnail", server_name,
"remote_thumbnail", server_name,
file_id[0:2], file_id[2:4], file_id[4:],
file_name
)
remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
def remote_media_thumbnail_dir(self, server_name, file_id):
return os.path.join(
self.base_path, "remote_thumbnail", server_name,
"remote_thumbnail", server_name,
file_id[0:2], file_id[2:4], file_id[4:],
)
def url_cache_filepath(self, media_id):
def url_cache_filepath_rel(self, media_id):
if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
return os.path.join(
self.base_path, "url_cache",
"url_cache",
media_id[:10], media_id[11:]
)
else:
return os.path.join(
self.base_path, "url_cache",
"url_cache",
media_id[0:2], media_id[2:4], media_id[4:],
)
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
def url_cache_filepath_dirs_to_delete(self, media_id):
"The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id):
return [
os.path.join(
self.base_path, "url_cache",
"url_cache",
media_id[:10],
),
]
else:
return [
os.path.join(
self.base_path, "url_cache",
"url_cache",
media_id[0:2], media_id[2:4],
),
os.path.join(
self.base_path, "url_cache",
"url_cache",
media_id[0:2],
),
]
def url_cache_thumbnail(self, media_id, width, height, content_type,
def url_cache_thumbnail_rel(self, media_id, width, height, content_type,
method):
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
@ -122,29 +153,31 @@ class MediaFilePaths(object):
if NEW_FORMAT_ID_RE.match(media_id):
return os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[:10], media_id[11:],
file_name
)
else:
return os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[0:2], media_id[2:4], media_id[4:],
file_name
)
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
def url_cache_thumbnail_directory(self, media_id):
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
if NEW_FORMAT_ID_RE.match(media_id):
return os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[:10], media_id[11:],
)
else:
return os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[0:2], media_id[2:4], media_id[4:],
)
@ -155,26 +188,26 @@ class MediaFilePaths(object):
if NEW_FORMAT_ID_RE.match(media_id):
return [
os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[:10], media_id[11:],
),
os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[:10],
),
]
else:
return [
os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[0:2], media_id[2:4], media_id[4:],
),
os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[0:2], media_id[2:4],
),
os.path.join(
self.base_path, "url_cache_thumbnails",
"url_cache_thumbnails",
media_id[0:2],
),
]

View file

@ -60,10 +60,12 @@ class MediaRepository(object):
self.max_upload_size = hs.config.max_upload_size
self.max_image_pixels = hs.config.max_image_pixels
self.filepaths = MediaFilePaths(hs.config.media_store_path)
self.backup_filepaths = None
self.primary_base_path = hs.config.media_store_path
self.filepaths = MediaFilePaths(self.primary_base_path)
self.backup_base_path = None
if hs.config.backup_media_store_path:
self.backup_filepaths = MediaFilePaths(hs.config.backup_media_store_path)
self.backup_base_path = hs.config.backup_media_store_path
self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store
@ -94,42 +96,63 @@ class MediaRepository(object):
if not os.path.exists(dirname):
os.makedirs(dirname)
@defer.inlineCallbacks
def _write_to_file(self, source, file_name_func):
def write_file_thread(file_name):
@staticmethod
def write_file_synchronously(source, fname):
source.seek(0) # Ensure we read from the start of the file
with open(file_name, "wb") as f:
with open(fname, "wb") as f:
shutil.copyfileobj(source, f)
fname = file_name_func(self.filepaths)
@defer.inlineCallbacks
def write_to_file(self, source, path):
"""Write `source` to the on disk media store, and also the backup store
if configured.
Args:
source: A file like object that should be written
path: Relative path to write file to
Returns:
string: the file path written to in the primary media store
"""
fname = os.path.join(self.primary_base_path, path)
self._makedirs(fname)
# Write to the main repository
yield preserve_context_over_fn(threads.deferToThread, write_file_thread, fname)
yield preserve_context_over_fn(
threads.deferToThread,
self.write_file_synchronously, source, fname,
)
# Write to backup repository
if self.backup_filepaths:
backup_fname = file_name_func(self.backup_filepaths)
yield self.copy_to_backup(source, path)
defer.returnValue(fname)
@defer.inlineCallbacks
def copy_to_backup(self, source, path):
if self.backup_base_path:
backup_fname = os.path.join(self.backup_base_path, path)
self._makedirs(backup_fname)
# We can either wait for successful writing to the backup repository
# or write in the background and immediately return
if self.synchronous_backup_media_store:
yield preserve_context_over_fn(
threads.deferToThread, write_file_thread, backup_fname,
threads.deferToThread,
self.write_file_synchronously, source, backup_fname,
)
else:
preserve_fn(threads.deferToThread)(write_file_thread, backup_fname)
defer.returnValue(fname)
preserve_fn(threads.deferToThread)(
self.write_file_synchronously, source, backup_fname,
)
@defer.inlineCallbacks
def create_content(self, media_type, upload_name, content, content_length,
auth_user):
media_id = random_string(24)
fname = yield self._write_to_file(
content, lambda f: f.local_media_filepath(media_id)
fname = yield self.write_to_file(
content, self.filepaths.local_media_filepath_rel(media_id)
)
logger.info("Stored local media in file %r", fname)
@ -180,9 +203,10 @@ class MediaRepository(object):
def _download_remote_file(self, server_name, media_id):
file_id = random_string(24)
fname = self.filepaths.remote_media_filepath(
fpath = self.filepaths.remote_media_filepath_rel(
server_name, file_id
)
fname = os.path.join(self.primary_base_path, fpath)
self._makedirs(fname)
try:
@ -224,6 +248,9 @@ class MediaRepository(object):
server_name, media_id)
raise SynapseError(502, "Failed to fetch remote media")
with open(fname) as f:
yield self.copy_to_backup(f, fpath)
media_type = headers["Content-Type"][0]
time_now_ms = self.clock.time_msec()
@ -322,15 +349,15 @@ class MediaRepository(object):
)
if t_byte_source:
output_path = yield self._write_to_file(
output_path = yield self.write_to_file(
t_byte_source,
lambda f: f.local_media_thumbnail(
self.filepaths.local_media_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method
)
)
logger.info("Stored thumbnail in file %r", output_path)
yield self.store.store_local_thumbnail(
yield self.store.store_local_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method,
len(t_byte_source.getvalue())
)
@ -350,15 +377,15 @@ class MediaRepository(object):
)
if t_byte_source:
output_path = yield self._write_to_file(
output_path = yield self.write_to_file(
t_byte_source,
lambda f: f.remote_media_thumbnail(
self.filepaths.remote_media_thumbnail_rel(
server_name, file_id, t_width, t_height, t_type, t_method
)
)
logger.info("Stored thumbnail in file %r", output_path)
yield self.store.store_remote_media_thumbnail(
yield self.store.store_remote_media_thumbnail_rel(
server_name, media_id, file_id,
t_width, t_height, t_type, t_method, len(t_byte_source.getvalue())
)
@ -403,17 +430,16 @@ class MediaRepository(object):
yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails)
for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails:
def path_name_func(f):
if url_cache:
return f.url_cache_thumbnail(
file_path = self.filepaths.url_cache_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method
)
else:
return f.local_media_thumbnail(
file_path = self.filepaths.local_media_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method
)
yield self._write_to_file(t_byte_source, path_name_func)
yield self.write_to_file(t_byte_source, file_path)
yield self.store.store_local_thumbnail(
media_id, t_width, t_height, t_type, t_method,
@ -460,12 +486,11 @@ class MediaRepository(object):
yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails)
for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails:
def path_name_func(f):
return f.remote_media_thumbnail(
file_path = self.filepaths.remote_media_thumbnail_rel(
server_name, file_id, t_width, t_height, t_type, t_method
)
yield self._write_to_file(t_byte_source, path_name_func)
yield self.write_to_file(t_byte_source, file_path)
yield self.store.store_remote_media_thumbnail(
server_name, media_id, file_id,
@ -491,6 +516,8 @@ class MediaRepository(object):
logger.info("Deleting: %r", key)
# TODO: Should we also delete from the backup store?
with (yield self.remote_media_linearizer.queue(key)):
full_path = self.filepaths.remote_media_filepath(origin, file_id)
try:

View file

@ -59,6 +59,7 @@ class PreviewUrlResource(Resource):
self.store = hs.get_datastore()
self.client = SpiderHttpClient(hs)
self.media_repo = media_repo
self.primary_base_path = media_repo.primary_base_path
self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
@ -262,7 +263,8 @@ class PreviewUrlResource(Resource):
file_id = datetime.date.today().isoformat() + '_' + random_string(16)
fname = self.filepaths.url_cache_filepath(file_id)
fpath = self.filepaths.url_cache_filepath_rel(file_id)
fname = os.path.join(self.primary_base_path, fpath)
self.media_repo._makedirs(fname)
try:
@ -273,6 +275,9 @@ class PreviewUrlResource(Resource):
)
# FIXME: pass through 404s and other error messages nicely
with open(fname) as f:
yield self.media_repo.copy_to_backup(f, fpath)
media_type = headers["Content-Type"][0]
time_now_ms = self.clock.time_msec()

View file

@ -51,7 +51,11 @@ class Thumbnailer(object):
return ((max_height * self.width) // self.height, max_height)
def scale(self, width, height, output_type):
"""Rescales the image to the given dimensions"""
"""Rescales the image to the given dimensions.
Returns:
BytesIO: the bytes of the encoded image ready to be written to disk
"""
scaled = self.image.resize((width, height), Image.ANTIALIAS)
return self._encode_image(scaled, output_type)
@ -65,6 +69,9 @@ class Thumbnailer(object):
Args:
max_width: The largest possible width.
max_height: The largest possible height.
Returns:
BytesIO: the bytes of the encoded image ready to be written to disk
"""
if width * self.height > height * self.width:
scaled_height = (width * self.height) // self.width