xl/multipart: Make sure to delete temp renamed object. (#3785)

Existing objects before overwrites are renamed to
temp location in completeMultipart. We make sure
that we delete it even if subsequenty calls fail.

Additionally move verifying of parent dir is a
file earlier to fail the entire operation.

Ref #3784
This commit is contained in:
Harshavardhana 2017-02-21 19:43:44 -08:00 committed by GitHub
parent fe86319c56
commit cc28765025
3 changed files with 39 additions and 26 deletions

View file

@ -483,7 +483,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
objInfo, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata, sha256sum)
}
if err != nil {
errorIf(err, "Unable to create an object.")
errorIf(err, "Unable to create an object. %s", r.URL.Path)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}

View file

@ -499,15 +499,15 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
// success.
defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath)
// Attempt to rename temp upload object to actual upload path
// object
if rErr := renameObject(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum); rErr != nil {
// Attempt to rename temp upload object to actual upload path object
rErr := renameObject(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
}
initiated := time.Now().UTC()
// Create or update 'uploads.json'
if err := xl.addUploadID(bucket, object, uploadID, initiated); err != nil {
if err = xl.addUploadID(bucket, object, uploadID, initiated); err != nil {
return "", err
}
// Return success.
@ -885,6 +885,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if !xl.isUploadIDExists(bucket, object, uploadID) {
return ObjectInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
}
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := getCompleteMultipartMD5(parts)
if err != nil {
@ -964,11 +971,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
}
// Check if an object is present as one of the parent dir.
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// Save the final object size and modtime.
xlMeta.Stat.Size = objectSize
xlMeta.Stat.ModTime = time.Now().UTC()
@ -990,6 +992,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
}
rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
if rErr != nil {
return ObjectInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
@ -1008,13 +1011,17 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
}()
// Rename if an object already exists to temporary location.
uniqueID := mustGetUUID()
if xl.isObject(bucket, object) {
// Rename if an object already exists to temporary location.
newUniqueID := mustGetUUID()
// Delete success renamed object.
defer xl.deleteObject(minioMetaTmpBucket, newUniqueID)
// NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors.
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, uniqueID, xl.writeQuorum)
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -1038,9 +1045,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Delete the previously successfully renamed object.
xl.deleteObject(minioMetaTmpBucket, uniqueID)
// Hold the lock so that two parallel
// complete-multipart-uploads do not leave a stale
// uploads.json behind.

View file

@ -439,11 +439,25 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// a slash separator, we treat it like a valid operation and
// return success.
if isObjectDir(object, size) {
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
return dirObjectInfo(bucket, object, size, metadata), nil
}
// Validate put object input args.
if err = checkPutObjectArgs(bucket, object, xl); err != nil {
return ObjectInfo{}, err
}
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// No metadata is set, allocate a new one.
if metadata == nil {
metadata = make(map[string]string)
@ -526,7 +540,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Compute part name
partName := "part." + strconv.Itoa(partIdx)
// Compute the path of current part
tempErasureObj := path.Join(uniqueID, partName)
tempErasureObj := pathJoin(uniqueID, partName)
// Calculate the size of the current part, if size is unknown, curPartSize wil be unknown too.
// allowEmptyPart will always be true if this is the first part and false otherwise.
@ -645,16 +659,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
}
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
}
// Rename if an object already exists to temporary location.
newUniqueID := mustGetUUID()
if xl.isObject(bucket, object) {
// Delete the temporary copy of the object that existed before this PutObject request.
// Rename if an object already exists to temporary location.
newUniqueID := mustGetUUID()
// Delete successfully renamed object.
defer xl.deleteObject(minioMetaTmpBucket, newUniqueID)
// NOTE: Do not use online disks slice here.