forked from MirrorHub/synapse
Use MediaStorage for remote media
This commit is contained in:
parent
dd3092c3a3
commit
9e20840e02
2 changed files with 154 additions and 134 deletions
synapse/rest/media/v1
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import synapse.http.servlet
|
import synapse.http.servlet
|
||||||
|
|
||||||
from ._base import parse_media_id, respond_with_file, respond_404
|
from ._base import parse_media_id, respond_404
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
from synapse.http.server import request_handler, set_cors_headers
|
from synapse.http.server import request_handler, set_cors_headers
|
||||||
|
|
||||||
|
@ -59,35 +59,14 @@ class DownloadResource(Resource):
|
||||||
if server_name == self.server_name:
|
if server_name == self.server_name:
|
||||||
yield self.media_repo.get_local_media(request, media_id, name)
|
yield self.media_repo.get_local_media(request, media_id, name)
|
||||||
else:
|
else:
|
||||||
yield self._respond_remote_file(
|
allow_remote = synapse.http.servlet.parse_boolean(
|
||||||
request, server_name, media_id, name
|
request, "allow_remote", default=True)
|
||||||
)
|
if not allow_remote:
|
||||||
|
logger.info(
|
||||||
|
"Rejecting request for remote media %s/%s due to allow_remote",
|
||||||
|
server_name, media_id,
|
||||||
|
)
|
||||||
|
respond_404(request)
|
||||||
|
return
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
yield self.media_repo.get_remote_media(request, server_name, media_id, name)
|
||||||
def _respond_remote_file(self, request, server_name, media_id, name):
|
|
||||||
# don't forward requests for remote media if allow_remote is false
|
|
||||||
allow_remote = synapse.http.servlet.parse_boolean(
|
|
||||||
request, "allow_remote", default=True)
|
|
||||||
if not allow_remote:
|
|
||||||
logger.info(
|
|
||||||
"Rejecting request for remote media %s/%s due to allow_remote",
|
|
||||||
server_name, media_id,
|
|
||||||
)
|
|
||||||
respond_404(request)
|
|
||||||
return
|
|
||||||
|
|
||||||
media_info = yield self.media_repo.get_remote_media(server_name, media_id)
|
|
||||||
|
|
||||||
media_type = media_info["media_type"]
|
|
||||||
media_length = media_info["media_length"]
|
|
||||||
filesystem_id = media_info["filesystem_id"]
|
|
||||||
upload_name = name if name else media_info["upload_name"]
|
|
||||||
|
|
||||||
file_path = self.filepaths.remote_media_filepath(
|
|
||||||
server_name, filesystem_id
|
|
||||||
)
|
|
||||||
|
|
||||||
yield respond_with_file(
|
|
||||||
request, media_type, file_path, media_length,
|
|
||||||
upload_name=upload_name,
|
|
||||||
)
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ import twisted.internet.error
|
||||||
import twisted.web.http
|
import twisted.web.http
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
from ._base import respond_404, RequestWriter, FileInfo, respond_with_responder
|
from ._base import respond_404, FileInfo, respond_with_responder
|
||||||
from .upload_resource import UploadResource
|
from .upload_resource import UploadResource
|
||||||
from .download_resource import DownloadResource
|
from .download_resource import DownloadResource
|
||||||
from .thumbnail_resource import ThumbnailResource
|
from .thumbnail_resource import ThumbnailResource
|
||||||
|
@ -161,124 +161,165 @@ class MediaRepository(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_remote_media(self, server_name, media_id):
|
def get_remote_media(self, request, server_name, media_id, name):
|
||||||
|
"""Respond to requests for remote media.
|
||||||
|
"""
|
||||||
|
self.recently_accessed_remotes.add((server_name, media_id))
|
||||||
|
|
||||||
|
# We linearize here to ensure that we don't try and download remote
|
||||||
|
# media mutliple times concurrently
|
||||||
key = (server_name, media_id)
|
key = (server_name, media_id)
|
||||||
with (yield self.remote_media_linearizer.queue(key)):
|
with (yield self.remote_media_linearizer.queue(key)):
|
||||||
media_info = yield self._get_remote_media_impl(server_name, media_id)
|
responder, media_info = yield self._get_remote_media_impl(
|
||||||
defer.returnValue(media_info)
|
server_name, media_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
# We purposefully stream the file outside the lock
|
||||||
|
if responder:
|
||||||
|
media_type = media_info["media_type"]
|
||||||
|
media_length = media_info["media_length"]
|
||||||
|
upload_name = name if name else media_info["upload_name"]
|
||||||
|
yield respond_with_responder(
|
||||||
|
request, responder, media_type, media_length, upload_name,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
respond_404(request)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_remote_media_impl(self, server_name, media_id):
|
def _get_remote_media_impl(self, server_name, media_id):
|
||||||
|
"""Looks for media in local cache, if not there then attempt to
|
||||||
|
download from remote server.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred((Respodner, media_info))
|
||||||
|
"""
|
||||||
media_info = yield self.store.get_cached_remote_media(
|
media_info = yield self.store.get_cached_remote_media(
|
||||||
server_name, media_id
|
server_name, media_id
|
||||||
)
|
)
|
||||||
if not media_info:
|
|
||||||
media_info = yield self._download_remote_file(
|
# file_id is the ID we use to track the file locally. If we've already
|
||||||
server_name, media_id
|
# seen the file then reuse the existing ID, otherwise genereate a new
|
||||||
)
|
# one.
|
||||||
elif media_info["quarantined_by"]:
|
if media_info:
|
||||||
raise NotFoundError()
|
file_id = media_info["filesystem_id"]
|
||||||
else:
|
else:
|
||||||
self.recently_accessed_remotes.add((server_name, media_id))
|
file_id = random_string(24)
|
||||||
yield self.store.update_cached_last_access_time(
|
|
||||||
[(server_name, media_id)], self.clock.time_msec()
|
file_info = FileInfo(server_name, file_id)
|
||||||
)
|
|
||||||
defer.returnValue(media_info)
|
# If we have an entry in the DB, try and look for it
|
||||||
|
if media_info:
|
||||||
|
if media_info["quarantined_by"]:
|
||||||
|
raise NotFoundError()
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
if responder:
|
||||||
|
defer.returnValue((responder, media_info))
|
||||||
|
|
||||||
|
# Failed to find the file anywhere, lets download it.
|
||||||
|
|
||||||
|
media_info = yield self._download_remote_file(
|
||||||
|
server_name, media_id, file_id
|
||||||
|
)
|
||||||
|
|
||||||
|
responder = yield self.media_storage.fetch_media(file_info)
|
||||||
|
if responder:
|
||||||
|
defer.returnValue((responder, media_info))
|
||||||
|
|
||||||
|
defer.returnValue((None, media_info))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _download_remote_file(self, server_name, media_id):
|
def _download_remote_file(self, server_name, media_id, file_id):
|
||||||
file_id = random_string(24)
|
"""Attempt to download the remote file from the given server name,
|
||||||
|
using the given file_id as the local id.
|
||||||
|
"""
|
||||||
|
|
||||||
fpath = self.filepaths.remote_media_filepath_rel(
|
file_info = FileInfo(
|
||||||
server_name, file_id
|
server_name=server_name,
|
||||||
|
file_id=file_id,
|
||||||
)
|
)
|
||||||
fname = os.path.join(self.primary_base_path, fpath)
|
|
||||||
self._makedirs(fname)
|
|
||||||
|
|
||||||
try:
|
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
|
||||||
with open(fname, "wb") as f:
|
request_path = "/".join((
|
||||||
request_path = "/".join((
|
"/_matrix/media/v1/download", server_name, media_id,
|
||||||
"/_matrix/media/v1/download", server_name, media_id,
|
))
|
||||||
))
|
try:
|
||||||
|
length, headers = yield self.client.get_file(
|
||||||
|
server_name, request_path, output_stream=f,
|
||||||
|
max_size=self.max_upload_size, args={
|
||||||
|
# tell the remote server to 404 if it doesn't
|
||||||
|
# recognise the server_name, to make sure we don't
|
||||||
|
# end up with a routing loop.
|
||||||
|
"allow_remote": "false",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except twisted.internet.error.DNSLookupError as e:
|
||||||
|
logger.warn("HTTP error fetching remote media %s/%s: %r",
|
||||||
|
server_name, media_id, e)
|
||||||
|
raise NotFoundError()
|
||||||
|
|
||||||
|
except HttpResponseException as e:
|
||||||
|
logger.warn("HTTP error fetching remote media %s/%s: %s",
|
||||||
|
server_name, media_id, e.response)
|
||||||
|
if e.code == twisted.web.http.NOT_FOUND:
|
||||||
|
raise SynapseError.from_http_response_exception(e)
|
||||||
|
raise SynapseError(502, "Failed to fetch remote media")
|
||||||
|
|
||||||
|
except SynapseError:
|
||||||
|
logger.exception("Failed to fetch remote media %s/%s",
|
||||||
|
server_name, media_id)
|
||||||
|
raise
|
||||||
|
except NotRetryingDestination:
|
||||||
|
logger.warn("Not retrying destination %r", server_name)
|
||||||
|
raise SynapseError(502, "Failed to fetch remote media")
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to fetch remote media %s/%s",
|
||||||
|
server_name, media_id)
|
||||||
|
raise SynapseError(502, "Failed to fetch remote media")
|
||||||
|
|
||||||
|
yield finish()
|
||||||
|
|
||||||
|
media_type = headers["Content-Type"][0]
|
||||||
|
|
||||||
|
time_now_ms = self.clock.time_msec()
|
||||||
|
|
||||||
|
content_disposition = headers.get("Content-Disposition", None)
|
||||||
|
if content_disposition:
|
||||||
|
_, params = cgi.parse_header(content_disposition[0],)
|
||||||
|
upload_name = None
|
||||||
|
|
||||||
|
# First check if there is a valid UTF-8 filename
|
||||||
|
upload_name_utf8 = params.get("filename*", None)
|
||||||
|
if upload_name_utf8:
|
||||||
|
if upload_name_utf8.lower().startswith("utf-8''"):
|
||||||
|
upload_name = upload_name_utf8[7:]
|
||||||
|
|
||||||
|
# If there isn't check for an ascii name.
|
||||||
|
if not upload_name:
|
||||||
|
upload_name_ascii = params.get("filename", None)
|
||||||
|
if upload_name_ascii and is_ascii(upload_name_ascii):
|
||||||
|
upload_name = upload_name_ascii
|
||||||
|
|
||||||
|
if upload_name:
|
||||||
|
upload_name = urlparse.unquote(upload_name)
|
||||||
try:
|
try:
|
||||||
length, headers = yield self.client.get_file(
|
upload_name = upload_name.decode("utf-8")
|
||||||
server_name, request_path, output_stream=f,
|
except UnicodeDecodeError:
|
||||||
max_size=self.max_upload_size, args={
|
upload_name = None
|
||||||
# tell the remote server to 404 if it doesn't
|
else:
|
||||||
# recognise the server_name, to make sure we don't
|
upload_name = None
|
||||||
# end up with a routing loop.
|
|
||||||
"allow_remote": "false",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
except twisted.internet.error.DNSLookupError as e:
|
|
||||||
logger.warn("HTTP error fetching remote media %s/%s: %r",
|
|
||||||
server_name, media_id, e)
|
|
||||||
raise NotFoundError()
|
|
||||||
|
|
||||||
except HttpResponseException as e:
|
logger.info("Stored remote media in file %r", fname)
|
||||||
logger.warn("HTTP error fetching remote media %s/%s: %s",
|
|
||||||
server_name, media_id, e.response)
|
|
||||||
if e.code == twisted.web.http.NOT_FOUND:
|
|
||||||
raise SynapseError.from_http_response_exception(e)
|
|
||||||
raise SynapseError(502, "Failed to fetch remote media")
|
|
||||||
|
|
||||||
except SynapseError:
|
yield self.store.store_cached_remote_media(
|
||||||
logger.exception("Failed to fetch remote media %s/%s",
|
origin=server_name,
|
||||||
server_name, media_id)
|
media_id=media_id,
|
||||||
raise
|
media_type=media_type,
|
||||||
except NotRetryingDestination:
|
time_now_ms=self.clock.time_msec(),
|
||||||
logger.warn("Not retrying destination %r", server_name)
|
upload_name=upload_name,
|
||||||
raise SynapseError(502, "Failed to fetch remote media")
|
media_length=length,
|
||||||
except Exception:
|
filesystem_id=file_id,
|
||||||
logger.exception("Failed to fetch remote media %s/%s",
|
)
|
||||||
server_name, media_id)
|
|
||||||
raise SynapseError(502, "Failed to fetch remote media")
|
|
||||||
|
|
||||||
yield self.copy_to_backup(fpath)
|
|
||||||
|
|
||||||
media_type = headers["Content-Type"][0]
|
|
||||||
time_now_ms = self.clock.time_msec()
|
|
||||||
|
|
||||||
content_disposition = headers.get("Content-Disposition", None)
|
|
||||||
if content_disposition:
|
|
||||||
_, params = cgi.parse_header(content_disposition[0],)
|
|
||||||
upload_name = None
|
|
||||||
|
|
||||||
# First check if there is a valid UTF-8 filename
|
|
||||||
upload_name_utf8 = params.get("filename*", None)
|
|
||||||
if upload_name_utf8:
|
|
||||||
if upload_name_utf8.lower().startswith("utf-8''"):
|
|
||||||
upload_name = upload_name_utf8[7:]
|
|
||||||
|
|
||||||
# If there isn't check for an ascii name.
|
|
||||||
if not upload_name:
|
|
||||||
upload_name_ascii = params.get("filename", None)
|
|
||||||
if upload_name_ascii and is_ascii(upload_name_ascii):
|
|
||||||
upload_name = upload_name_ascii
|
|
||||||
|
|
||||||
if upload_name:
|
|
||||||
upload_name = urlparse.unquote(upload_name)
|
|
||||||
try:
|
|
||||||
upload_name = upload_name.decode("utf-8")
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
upload_name = None
|
|
||||||
else:
|
|
||||||
upload_name = None
|
|
||||||
|
|
||||||
logger.info("Stored remote media in file %r", fname)
|
|
||||||
|
|
||||||
yield self.store.store_cached_remote_media(
|
|
||||||
origin=server_name,
|
|
||||||
media_id=media_id,
|
|
||||||
media_type=media_type,
|
|
||||||
time_now_ms=self.clock.time_msec(),
|
|
||||||
upload_name=upload_name,
|
|
||||||
media_length=length,
|
|
||||||
filesystem_id=file_id,
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
os.remove(fname)
|
|
||||||
raise
|
|
||||||
|
|
||||||
media_info = {
|
media_info = {
|
||||||
"media_type": media_type,
|
"media_type": media_type,
|
||||||
|
|
Loading…
Add table
Reference in a new issue