minio/cmd/fs-v1-multipart.go
Harshavardhana 0b546ddfd4 Return errors in PutObject()/PutObjectPart() if input size is -1. (#5015)
Amazon S3 API expects all incoming stream has a content-length
set it was superflous for us to support object layer which supports
unknown sized stream as well, this PR removes such requirements
and explicitly error out if input stream is less than zero.
2017-10-06 09:38:01 -07:00

1072 lines
34 KiB
Go

/*
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/hex"
"fmt"
"io"
"os"
pathutil "path"
"strings"
"time"
"github.com/minio/minio/pkg/lock"
)
const (
// Expiry duration after which the multipart uploads are deemed stale.
fsMultipartExpiry = time.Hour * 24 * 14 // 2 weeks.
// Cleanup interval when the stale multipart cleanup is initiated.
fsMultipartCleanupInterval = time.Hour * 24 // 24 hrs.
)
// Returns if the prefix is a multipart upload.
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile)
_, err := fsStatFile(uploadsIDPath)
if err != nil {
if errorCause(err) == errFileNotFound {
return false
}
errorIf(err, "Unable to access uploads.json "+uploadsIDPath)
return false
}
return true
}
// Delete uploads.json file wrapper
func (fs fsObjects) deleteUploadsJSON(bucket, object, uploadID string) error {
multipartBucketPath := pathJoin(fs.fsPath, minioMetaMultipartBucket)
uploadPath := pathJoin(multipartBucketPath, bucket, object)
uploadsMetaPath := pathJoin(uploadPath, uploadsJSONFile)
tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
return fsRemoveMeta(multipartBucketPath, uploadsMetaPath, tmpDir)
}
// Removes the uploadID, called either by CompleteMultipart of AbortMultipart. If the resuling uploads
// slice is empty then we remove/purge the file.
func (fs fsObjects) removeUploadID(bucket, object, uploadID string, rwlk *lock.LockedFile) (bool, error) {
uploadIDs := uploadsV1{}
_, err := uploadIDs.ReadFrom(rwlk)
if err != nil {
return false, err
}
// Removes upload id from the uploads list.
uploadIDs.RemoveUploadID(uploadID)
// Check this is the last entry.
if uploadIDs.IsEmpty() {
// No more uploads left, so we delete `uploads.json` file.
return true, fs.deleteUploadsJSON(bucket, object, uploadID)
} // else not empty
// Write update `uploads.json`.
_, err = uploadIDs.WriteTo(rwlk)
return false, err
}
// Adds a new uploadID if no previous `uploads.json` is
// found we initialize a new one.
func (fs fsObjects) addUploadID(bucket, object, uploadID string, initiated time.Time, rwlk *lock.LockedFile) error {
uploadIDs := uploadsV1{}
_, err := uploadIDs.ReadFrom(rwlk)
// For all unexpected errors, we return.
if err != nil && errorCause(err) != io.EOF {
return err
}
// If we couldn't read anything, we assume a default
// (empty) upload info.
if errorCause(err) == io.EOF {
uploadIDs = newUploadsV1("fs")
}
// Adds new upload id to the list.
uploadIDs.AddUploadID(uploadID, initiated)
// Write update `uploads.json`.
_, err = uploadIDs.WriteTo(rwlk)
return err
}
// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'.
func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]uploadMetadata, bool, error) {
var uploads []uploadMetadata
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucketName, objectName))
if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil {
return nil, false, traceError(err)
}
defer objectMPartPathLock.RUnlock()
uploadsPath := pathJoin(bucketName, objectName, uploadsJSONFile)
rlk, err := fs.rwPool.Open(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return nil, true, nil
}
return nil, false, traceError(err)
}
defer fs.rwPool.Close(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
// Read `uploads.json`.
uploadIDs := uploadsV1{}
if _, err = uploadIDs.ReadFrom(rlk.LockedFile); err != nil {
return nil, false, err
}
index := 0
if uploadIDMarker != "" {
for ; index < len(uploadIDs.Uploads); index++ {
if uploadIDs.Uploads[index].UploadID == uploadIDMarker {
// Skip the uploadID as it would already be listed in previous listing.
index++
break
}
}
}
for index < len(uploadIDs.Uploads) {
uploads = append(uploads, uploadMetadata{
Object: objectName,
UploadID: uploadIDs.Uploads[index].UploadID,
Initiated: uploadIDs.Uploads[index].Initiated,
})
count--
index++
if count == 0 {
break
}
}
end := (index == len(uploadIDs.Uploads))
return uploads, end, nil
}
// listMultipartUploadsCleanup - lists all multipart uploads. Called by fs.cleanupStaleMultipartUpload()
func (fs fsObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
result := ListMultipartsInfo{}
recursive := true
if delimiter == slashSeparator {
recursive = false
}
result.IsTruncated = true
result.MaxUploads = maxUploads
result.KeyMarker = keyMarker
result.Prefix = prefix
result.Delimiter = delimiter
// Not using path.Join() as it strips off the trailing '/'.
multipartPrefixPath := pathJoin(bucket, prefix)
if prefix == "" {
// Should have a trailing "/" if prefix is ""
// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
multipartPrefixPath += slashSeparator
}
multipartMarkerPath := ""
if keyMarker != "" {
multipartMarkerPath = pathJoin(bucket, keyMarker)
}
var uploads []uploadMetadata
var err error
var eof bool
if uploadIDMarker != "" {
uploads, _, err = fs.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads)
if err != nil {
return lmi, err
}
maxUploads = maxUploads - len(uploads)
}
var walkResultCh chan treeWalkResult
var endWalkCh chan struct{}
// true only for xl.ListObjectsHeal(), set to false.
heal := false
// Proceed to list only if we have more uploads to be listed.
if maxUploads > 0 {
listPrms := listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal}
// Pop out any previously waiting marker.
walkResultCh, endWalkCh = fs.listPool.Release(listPrms)
if walkResultCh == nil {
endWalkCh = make(chan struct{})
isLeaf := fs.isMultipartUpload
listDir := fs.listDirFactory(isLeaf)
walkResultCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath,
multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh)
}
// List until maxUploads requested.
for maxUploads > 0 {
walkResult, ok := <-walkResultCh
if !ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if isErrIgnored(walkResult.err, fsTreeWalkIgnoredErrs...) {
eof = true
break
}
return lmi, walkResult.err
}
entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket))
if hasSuffix(walkResult.entry, slashSeparator) {
uploads = append(uploads, uploadMetadata{
Object: entry,
})
maxUploads--
if maxUploads == 0 {
if walkResult.end {
eof = true
break
}
}
continue
}
var tmpUploads []uploadMetadata
var end bool
uploadIDMarker = ""
tmpUploads, end, err = fs.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads)
if err != nil {
return lmi, err
}
uploads = append(uploads, tmpUploads...)
maxUploads -= len(tmpUploads)
if walkResult.end && end {
eof = true
break
}
}
}
// Loop through all the received uploads fill in the multiparts result.
for _, upload := range uploads {
var objectName string
var uploadID string
if hasSuffix(upload.Object, slashSeparator) {
// All directory entries are common prefixes.
uploadID = "" // Upload ids are empty for CommonPrefixes.
objectName = upload.Object
result.CommonPrefixes = append(result.CommonPrefixes, objectName)
} else {
uploadID = upload.UploadID
objectName = upload.Object
result.Uploads = append(result.Uploads, upload)
}
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
}
if !eof {
// Save the go-routine state in the pool so that it can continue from where it left off on
// the next request.
fs.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkResultCh, endWalkCh)
}
result.IsTruncated = !eof
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
// Success.
return result, nil
}
// ListMultipartUploads - lists all the uploadIDs for the specified object.
// We do not support prefix based listing.
func (fs fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
return lmi, err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return lmi, toObjectErr(err, bucket)
}
result := ListMultipartsInfo{}
result.IsTruncated = true
result.MaxUploads = maxUploads
result.KeyMarker = keyMarker
result.Prefix = object
result.Delimiter = delimiter
uploads, _, err := fs.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads)
if err != nil {
return lmi, err
}
result.NextKeyMarker = object
// Loop through all the received uploads fill in the multiparts result.
for _, upload := range uploads {
uploadID := upload.UploadID
result.Uploads = append(result.Uploads, upload)
result.NextUploadIDMarker = uploadID
}
result.IsTruncated = len(uploads) == maxUploads
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}
// newMultipartUpload - wrapper for initializing a new multipart
// request, returns back a unique upload id.
//
// Internally this function creates 'uploads.json' associated for the
// incoming object at '.minio.sys/multipart/bucket/object/uploads.json' on
// all the disks. `uploads.json` carries metadata regarding on going
// multipart operation on the object.
func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// Initialize `fs.json` values.
fsMeta := newFSMetaV1()
// Save additional metadata.
fsMeta.Meta = meta
uploadID = mustGetUUID()
initiated := UTCNow()
// Add upload ID to uploads.json
uploadsPath := pathJoin(bucket, object, uploadsJSONFile)
rwlk, err := fs.rwPool.Create(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
if err != nil {
return "", toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
uploadIDPath := pathJoin(bucket, object, uploadID)
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
metaFile, err := fs.rwPool.Create(fsMetaPath)
if err != nil {
return "", toObjectErr(traceError(err), bucket, object)
}
defer metaFile.Close()
// Add a new upload id.
if err = fs.addUploadID(bucket, object, uploadID, initiated, rwlk); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Write all the set metadata.
if _, err = fsMeta.WriteTo(metaFile); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Return success.
return uploadID, nil
}
// NewMultipartUpload - initialize a new multipart upload, returns a
// unique id. The unique id returned here is of UUID form, for each
// subsequent request each UUID is unique.
//
// Implements S3 compatible initiate multipart API.
func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
if err := checkNewMultipartArgs(bucket, object, fs); err != nil {
return "", err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return "", toObjectErr(err, bucket)
}
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return "", err
}
defer objectMPartPathLock.Unlock()
return fs.newMultipartUpload(bucket, object, meta)
}
// Returns if a new part can be appended to fsAppendDataFile.
func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, appendNeeded bool) {
if len(fsMeta.Parts) == 0 {
return
}
// As fsAppendMeta.Parts will be sorted len(fsAppendMeta.Parts) will naturally be the next part number
nextPartNum := len(fsAppendMeta.Parts) + 1
nextPartIndex := fsMeta.ObjectPartIndex(nextPartNum)
if nextPartIndex == -1 {
return
}
return fsMeta.Parts[nextPartIndex], true
}
// CopyObjectPart - similar to PutObjectPart but reads data from an existing
// object. Internally incoming data is written to '.minio.sys/tmp' location
// and safely renamed to '.minio.sys/multipart' for reach parts.
func (fs fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64) (pi PartInfo, e error) {
if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil {
return pi, err
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil {
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
pipeWriter.CloseWithError(gerr)
return
}
pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
}()
partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, NewHashReader(pipeReader, length, "", ""))
if err != nil {
return pi, toObjectErr(err, dstBucket, dstObject)
}
// Explicitly close the reader.
pipeReader.Close()
return partInfo, nil
}
// PutObjectPart - reads incoming data until EOF for the part file on
// an ongoing multipart transaction. Internally incoming data is
// written to '.minio.sys/tmp' location and safely renamed to
// '.minio.sys/multipart' for reach parts.
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *HashReader) (pi PartInfo, e error) {
if err := checkPutObjectPartArgs(bucket, object, fs); err != nil {
return pi, err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return pi, toObjectErr(err, bucket)
}
// Validate input data size and it can never be less than zero.
if data.Size() < 0 {
return pi, toObjectErr(traceError(errInvalidArgument))
}
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return pi, err
}
defer objectMPartPathLock.Unlock()
// Disallow any parallel abort or complete multipart operations.
uploadsPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile)
if _, err := fs.rwPool.Open(uploadsPath); err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return pi, traceError(InvalidUploadID{UploadID: uploadID})
}
return pi, toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(uploadsPath)
uploadIDPath := pathJoin(bucket, object, uploadID)
// Just check if the uploadID exists to avoid copy if it doesn't.
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
rwlk, err := fs.rwPool.Write(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return pi, traceError(InvalidUploadID{UploadID: uploadID})
}
return pi, toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom(rwlk)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
}
partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := uploadID + "." + mustGetUUID() + "." + partSuffix
bufSize := int64(readSizeV1)
if size := data.Size(); size > 0 && bufSize > size {
bufSize = size
}
buf := make([]byte, bufSize)
fsPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tmpPartPath)
bytesWritten, cErr := fsCreateFile(fsPartPath, data, buf, data.Size())
if cErr != nil {
fsRemoveFile(fsPartPath)
return pi, toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath)
}
// Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header.
if bytesWritten < data.Size() {
fsRemoveFile(fsPartPath)
return pi, traceError(IncompleteBody{})
}
// Delete temporary part in case of failure. If
// PutObjectPart succeeds then there would be nothing to
// delete.
defer fsRemoveFile(fsPartPath)
if err = data.Verify(); err != nil {
return pi, toObjectErr(err, minioMetaTmpBucket, tmpPartPath)
}
partPath := pathJoin(bucket, object, uploadID, partSuffix)
// Lock the part so that another part upload with same part-number gets blocked
// while the part is getting appended in the background.
partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath)
if err = partLock.GetLock(globalOperationTimeout); err != nil {
return pi, err
}
fsNSPartPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, partPath)
if err = fsRenameFile(fsPartPath, fsNSPartPath); err != nil {
partLock.Unlock()
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Save the object part info in `fs.json`.
md5Hex := hex.EncodeToString(data.MD5())
fsMeta.AddObjectPart(partID, partSuffix, md5Hex, data.Size())
if _, err = fsMeta.WriteTo(rwlk); err != nil {
partLock.Unlock()
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}
partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix)
fi, err := fsStatFile(partNamePath)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partSuffix)
}
// Append the part in background.
errCh := fs.append(bucket, object, uploadID, fsMeta)
go func() {
// Also receive the error so that the appendParts go-routine
// does not block on send. But the error received is ignored
// as fs.PutObjectPart() would have already returned success
// to the client.
<-errCh
partLock.Unlock()
}()
return PartInfo{
PartNumber: partID,
LastModified: fi.ModTime(),
ETag: md5Hex,
Size: fi.Size(),
}, nil
}
// listObjectParts - wrapper scanning through
// '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts
// saved inside '.minio.sys/multipart/bucket/object/UPLOADID'.
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
result := ListPartsInfo{}
uploadIDPath := pathJoin(bucket, object, uploadID)
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
metaFile, err := fs.rwPool.Open(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
// On windows oddly this is returned.
return lpi, traceError(InvalidUploadID{UploadID: uploadID})
}
return lpi, toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(fsMetaPath)
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom(metaFile.LockedFile)
if err != nil {
return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath)
}
// Only parts with higher part numbers will be listed.
partIdx := fsMeta.ObjectPartIndex(partNumberMarker)
parts := fsMeta.Parts
if partIdx != -1 {
parts = fsMeta.Parts[partIdx+1:]
}
count := maxParts
for _, part := range parts {
var fi os.FileInfo
partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name)
fi, err = fsStatFile(partNamePath)
if err != nil {
return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath)
}
result.Parts = append(result.Parts, PartInfo{
PartNumber: part.Number,
ETag: part.ETag,
LastModified: fi.ModTime(),
Size: fi.Size(),
})
count--
if count == 0 {
break
}
}
// If listed entries are more than maxParts, we set IsTruncated as true.
if len(parts) > len(result.Parts) {
result.IsTruncated = true
// Make sure to fill next part number marker if IsTruncated is
// true for subsequent listing.
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
result.NextPartNumberMarker = nextPartNumberMarker
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
result.MaxParts = maxParts
// Success.
return result, nil
}
// ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID. Takes additional input of part-number-marker
// to indicate where the listing should begin from.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
if err := checkListPartsArgs(bucket, object, fs); err != nil {
return lpi, err
}
// Check if bucket exists
if _, err := fs.statBucketDir(bucket); err != nil {
return lpi, toObjectErr(err, bucket)
}
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil {
return lpi, traceError(err)
}
defer objectMPartPathLock.RUnlock()
listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return lpi, toObjectErr(err, bucket, object)
}
return listPartsInfo, nil
}
// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
// md5sums of all the parts.
//
// Implements S3 compatible Complete multipart API.
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (oi ObjectInfo, e error) {
if err := checkCompleteMultipartArgs(bucket, object, fs); err != nil {
return oi, err
}
// Check if an object is present as one of the parent dir.
if fs.parentDirIsObject(bucket, pathutil.Dir(object)) {
return oi, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
if _, err := fs.statBucketDir(bucket); err != nil {
return oi, toObjectErr(err, bucket)
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := getCompleteMultipartMD5(parts)
if err != nil {
return oi, err
}
uploadIDPath := pathJoin(bucket, object, uploadID)
var removeObjectDir bool
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return oi, err
}
defer func() {
if removeObjectDir {
basePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket)
derr := fsDeleteFile(basePath, pathJoin(basePath, object))
errorIf(derr, "unable to remove %s in %s", pathJoin(basePath, object), basePath)
}
objectMPartPathLock.Unlock()
}()
fsMetaPathMultipart := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
rlk, err := fs.rwPool.Open(fsMetaPathMultipart)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return oi, traceError(InvalidUploadID{UploadID: uploadID})
}
return oi, toObjectErr(traceError(err), bucket, object)
}
// Disallow any parallel abort or complete multipart operations.
rwlk, err := fs.rwPool.Write(pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile))
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
if err == errFileNotFound || err == errFileAccessDenied {
return oi, traceError(InvalidUploadID{UploadID: uploadID})
}
return oi, toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
fsMeta := fsMetaV1{}
// Read saved fs metadata for ongoing multipart.
_, err = fsMeta.ReadFrom(rlk.LockedFile)
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(err, minioMetaMultipartBucket, fsMetaPathMultipart)
}
// Validate all parts and then commit to disk.
for i, part := range parts {
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, traceError(InvalidPart{})
}
if fsMeta.Parts[partIdx].ETag != part.ETag {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, traceError(InvalidPart{})
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, traceError(PartTooSmall{
PartNumber: part.PartNumber,
PartSize: fsMeta.Parts[partIdx].Size,
PartETag: part.ETag,
})
}
}
// Wait for any competing PutObject() operation on bucket/object, since same namespace
// would be acquired for `fs.json`.
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
metaFile, err := fs.rwPool.Create(fsMetaPath)
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(traceError(err), bucket, object)
}
defer metaFile.Close()
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
// This lock is held during rename of the appended tmp file to the actual
// location so that any competing GetObject/PutObject/DeleteObject do not race.
appendFallback := true // In case background-append did not append the required parts.
if isPartsSame(fsMeta.Parts, parts) {
err = fs.complete(bucket, object, uploadID, fsMeta)
if err == nil {
appendFallback = false
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID)
if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(err, minioMetaTmpBucket, uploadID)
}
}
}
if appendFallback {
// background append could not do append all the required parts, hence we do it here.
tempObj := uploadID + "-" + "part.1"
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
// Delete the temporary object in the case of a
// failure. If PutObject succeeds, then there would be
// nothing to delete.
defer fsRemoveFile(fsTmpObjPath)
// Allocate staging buffer.
var buf = make([]byte, readSizeV1)
for _, part := range parts {
// Construct part suffix.
partSuffix := fmt.Sprintf("object%d", part.PartNumber)
multipartPartFile := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix)
var reader io.ReadCloser
var offset int64
reader, _, err = fsOpenFile(multipartPartFile, offset)
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
if err == errFileNotFound {
return oi, traceError(InvalidPart{})
}
return oi, toObjectErr(traceError(err), minioMetaMultipartBucket, partSuffix)
}
// No need to hold a lock, this is a unique file and will be only written
// to one one process per uploadID per minio process.
var wfile *os.File
wfile, err = os.OpenFile((fsTmpObjPath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
reader.Close()
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(traceError(err), bucket, object)
}
_, err = io.CopyBuffer(wfile, reader, buf)
if err != nil {
wfile.Close()
reader.Close()
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(traceError(err), bucket, object)
}
wfile.Close()
reader.Close()
}
if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(err, minioMetaTmpBucket, uploadID)
}
}
// No need to save part info, since we have concatenated all parts.
fsMeta.Parts = nil
// Save additional metadata.
if len(fsMeta.Meta) == 0 {
fsMeta.Meta = make(map[string]string)
}
fsMeta.Meta["etag"] = s3MD5
// Write all the set metadata.
if _, err = fsMeta.WriteTo(metaFile); err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return oi, toObjectErr(err, bucket, object)
}
// Close lock held on bucket/object/uploadid/fs.json,
// this needs to be done for windows so that we can happily
// delete the bucket/object/uploadid
fs.rwPool.Close(fsMetaPathMultipart)
// Cleanup all the parts if everything else has been safely committed.
multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
return oi, toObjectErr(err, bucket, object)
}
// Remove entry from `uploads.json`.
removeObjectDir, err = fs.removeUploadID(bucket, object, uploadID, rwlk)
if err != nil {
return oi, toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object))
}
fi, err := fsStatFile(fsNSObjPath)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
// Return object info.
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}
// AbortMultipartUpload - aborts an ongoing multipart operation
// signified by the input uploadID. This is an atomic operation
// doesn't require clients to initiate multiple such requests.
//
// All parts are purged from all disks and reference to the uploadID
// would be removed from the system, rollback is not possible on this
// operation.
//
// Implements S3 compatible Abort multipart API, slight difference is
// that this is an atomic idempotent operation. Subsequent calls have
// no affect and further requests to the same uploadID would not be
// honored.
func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
if err := checkAbortMultipartArgs(bucket, object, fs); err != nil {
return err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return toObjectErr(err, bucket)
}
uploadIDPath := pathJoin(bucket, object, uploadID)
var removeObjectDir bool
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer func() {
if removeObjectDir {
basePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket)
derr := fsDeleteFile(basePath, pathJoin(basePath, object))
errorIf(derr, "unable to remove %s in %s", pathJoin(basePath, object), basePath)
}
objectMPartPathLock.Unlock()
}()
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
if _, err := fs.rwPool.Open(fsMetaPath); err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return traceError(InvalidUploadID{UploadID: uploadID})
}
return toObjectErr(traceError(err), bucket, object)
}
uploadsPath := pathJoin(bucket, object, uploadsJSONFile)
rwlk, err := fs.rwPool.Write(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
if err != nil {
fs.rwPool.Close(fsMetaPath)
if err == errFileNotFound || err == errFileAccessDenied {
return traceError(InvalidUploadID{UploadID: uploadID})
}
return toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
// Signal appendParts routine to stop waiting for new parts to arrive.
fs.abort(uploadID)
// Close lock held on bucket/object/uploadid/fs.json,
// this needs to be done for windows so that we can happily
// delete the bucket/object/uploadid
fs.rwPool.Close(fsMetaPath)
// Cleanup all uploaded parts and abort the upload.
multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
return toObjectErr(err, bucket, object)
}
// Remove entry from `uploads.json`.
removeObjectDir, err = fs.removeUploadID(bucket, object, uploadID, rwlk)
if err != nil {
return toObjectErr(err, bucket, object)
}
return nil
}
// Removes multipart uploads if any older than `expiry` duration in a given bucket.
func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Duration) (err error) {
var lmi ListMultipartsInfo
var st os.FileInfo
for {
// List multipart uploads in a bucket 1000 at a time
prefix := ""
lmi, err = fs.listMultipartUploadsCleanup(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000)
if err != nil {
errorIf(err, "Unable to list uploads")
return err
}
// Remove uploads (and its parts) older than expiry duration.
for _, upload := range lmi.Uploads {
uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, upload.Object, upload.UploadID)
if st, err = fsStatDir(uploadIDPath); err != nil {
errorIf(err, "Failed to lookup uploads directory path %s", uploadIDPath)
continue
}
if time.Since(st.ModTime()) > expiry {
fs.AbortMultipartUpload(bucket, upload.Object, upload.UploadID)
}
}
// No more incomplete uploads remain, break and return.
if !lmi.IsTruncated {
break
}
}
return nil
}
// Removes multipart uploads if any older than `expiry` duration
// on all buckets for every `cleanupInterval`, this function is
// blocking and should be run in a go-routine.
func (fs fsObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
ticker := time.NewTicker(cleanupInterval)
for {
select {
case <-doneCh:
// Stop the timer.
ticker.Stop()
return
case <-ticker.C:
bucketInfos, err := fs.ListBuckets()
if err != nil {
errorIf(err, "Unable to list buckets")
continue
}
for _, bucketInfo := range bucketInfos {
fs.cleanupStaleMultipartUpload(bucketInfo.Name, expiry)
}
}
}
}