minio/object-common-multipart.go
Harshavardhana 4bc923e63b XL/fs: Optimize calling isBucketExist() (#1656)
* posix: Avoid using getAllVolumeInfo() in getVolumeDir()

This is necessary compromise to avoid significant slowness this
causes under load. The compromise is also substantial in a way
so that to avoid penalizing common cases v/s special cases.

For buckets with Caps on Unixes, we filter buckets based on the
latest anyways, so this is completely acceptable.

* XL/fs: Change the usage of verification of existance of buckets.

Optimize calling isBucketExists, it is not needed for all call
paths. isBucketExist should be called only for calls which use
temporary volume location for operations, for the rest rely on
the errors returned on their original call path.

Remove usage of filtering as well across all volume names.
2016-05-17 21:22:27 -07:00

547 lines
18 KiB
Go

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"path"
"sort"
"strconv"
"strings"
"github.com/skyrings/skyring-common/tools/uuid"
)
const (
incompleteFile = "00000.incomplete"
uploadsJSONFile = "uploads.json"
)
// createUploadsJSON - create uploads.json placeholder file.
func createUploadsJSON(storage StorageAPI, bucket, object, uploadID string) error {
// Place holder uploads.json
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
uploadsJSONSuffix := fmt.Sprintf("%s.%s", uploadID, uploadsJSONFile)
tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONSuffix)
w, err := storage.CreateFile(minioMetaBucket, uploadsPath)
if err != nil {
return err
}
if err = w.Close(); err != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
return clErr
}
return err
}
_, err = storage.StatFile(minioMetaBucket, uploadsPath)
if err != nil {
if err == errFileNotFound {
err = storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)
if err == nil {
return nil
}
}
if derr := storage.DeleteFile(minioMetaBucket, tmpUploadsPath); derr != nil {
return derr
}
return err
}
return nil
}
/// Common multipart object layer functions.
// newMultipartUploadCommon - initialize a new multipart, is a common
// function for both object layers.
func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) (uploadID string, err error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if !isBucketExist(storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Loops through until successfully generates a new unique upload id.
for {
uuid, err := uuid.New()
if err != nil {
return "", err
}
uploadID := uuid.String()
// Create placeholder file 'uploads.json'
err = createUploadsJSON(storage, bucket, object, uploadID)
if err != nil {
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
incompleteSuffix := fmt.Sprintf("%s.%s", uploadID, incompleteFile)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, incompleteSuffix)
if _, err = storage.StatFile(minioMetaBucket, uploadIDPath); err != nil {
if err != errFileNotFound {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// uploadIDPath doesn't exist, so create empty file to reserve the name
var w io.WriteCloser
if w, err = storage.CreateFile(minioMetaBucket, tempUploadIDPath); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
// Close the writer.
if err = w.Close(); err != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
return "", toObjectErr(clErr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
err = storage.RenameFile(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
if err != nil {
if derr := storage.DeleteFile(minioMetaBucket, tempUploadIDPath); derr != nil {
return "", toObjectErr(derr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
return uploadID, nil
}
// uploadIDPath already exists.
// loop again to try with different uuid generated.
}
}
// putObjectPartCommon - put object part.
func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if !isBucketExist(storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
}
if !isUploadIDExists(storage, bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
partSuffix := fmt.Sprintf("%s.%.5d", uploadID, partID)
partSuffixPath := path.Join(tmpMetaPrefix, bucket, object, partSuffix)
fileWriter, err := storage.CreateFile(minioMetaBucket, partSuffixPath)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
// Reader shouldn't have more data what mentioned in size argument.
// reading one more byte from the reader to validate it.
// expected to fail, success validates existence of more data in the reader.
if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", UnExpectedDataSize{Size: int(size)}
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", err
}
partSuffixMD5 := fmt.Sprintf("%.5d.%s", partID, newMD5Hex)
partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffixMD5)
err = storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path)
if err != nil {
if derr := storage.DeleteFile(minioMetaBucket, partSuffixPath); derr != nil {
return "", toObjectErr(derr, minioMetaBucket, partSuffixPath)
}
return "", toObjectErr(err, minioMetaBucket, partSuffixMD5Path)
}
return newMD5Hex, nil
}
// Wrapper to which removes all the uploaded parts after a successful
// complete multipart upload.
func cleanupUploadedParts(storage StorageAPI, bucket, object, uploadID string) error {
return cleanupDir(storage, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
}
// abortMultipartUploadCommon - aborts a multipart upload, common
// function used by both object layers.
func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(storage, bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
if !isUploadIDExists(storage, bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID}
}
return cleanupUploadedParts(storage, bucket, object, uploadID)
}
// isIncompleteMultipart - is object incomplete multipart.
func isIncompleteMultipart(storage StorageAPI, objectPath string) (bool, error) {
_, err := storage.StatFile(minioMetaBucket, path.Join(objectPath, uploadsJSONFile))
if err != nil {
if err == errFileNotFound {
return false, nil
}
return false, err
}
return true, nil
}
// listLeafEntries - lists all entries if a given prefixPath is a leaf
// directory, returns error if any - returns empty list if prefixPath
// is not a leaf directory.
func listLeafEntries(storage StorageAPI, prefixPath string) (entries []string, err error) {
var ok bool
if ok, err = isIncompleteMultipart(storage, prefixPath); err != nil {
return nil, err
} else if !ok {
return nil, nil
}
entries, err = storage.ListDir(minioMetaBucket, prefixPath)
if err != nil {
return nil, err
}
var newEntries []string
for _, entry := range entries {
if strings.HasSuffix(entry, slashSeparator) {
newEntries = append(newEntries, entry)
}
}
return newEntries, nil
}
// listMetaBucketMultipartFiles - list all files at a given prefix inside minioMetaBucket.
func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPath string, recursive bool, maxKeys int) (fileInfos []FileInfo, eof bool, err error) {
var storage StorageAPI
switch l := layer.(type) {
case fsObjects:
storage = l.storage
case xlObjects:
storage = l.storage
}
walker := lookupTreeWalk(layer, listParams{minioMetaBucket, recursive, markerPath, prefixPath})
if walker == nil {
walker = startTreeWalk(layer, minioMetaBucket, prefixPath, markerPath, recursive)
}
// newMaxKeys tracks the size of entries which are going to be
// returned back.
var newMaxKeys int
// Following loop gathers and filters out special files inside
// minio meta volume.
for {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
return nil, true, nil
}
return nil, false, toObjectErr(walkResult.err, minioMetaBucket, prefixPath)
}
fi := walkResult.fileInfo
var entries []string
if fi.Mode.IsDir() {
// List all the entries if fi.Name is a leaf directory, if
// fi.Name is not a leaf directory then the resulting
// entries are empty.
entries, err = listLeafEntries(storage, fi.Name)
if err != nil {
return nil, false, err
}
}
if len(entries) > 0 {
// We reach here for non-recursive case and a leaf entry.
sort.Strings(entries)
for _, entry := range entries {
var fileInfo FileInfo
incompleteUploadFile := path.Join(fi.Name, entry, incompleteFile)
fileInfo, err = storage.StatFile(minioMetaBucket, incompleteUploadFile)
if err != nil {
return nil, false, err
}
fileInfo.Name = path.Join(fi.Name, entry)
fileInfos = append(fileInfos, fileInfo)
}
} else {
// We reach here for a non-recursive case non-leaf entry
// OR recursive case with fi.Name.
if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries.
// Validate if 'fi.Name' is incomplete multipart.
if !strings.HasSuffix(fi.Name, incompleteFile) {
continue
}
fi.Name = path.Dir(fi.Name)
}
fileInfos = append(fileInfos, fi)
}
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
return
}
}
// Return entries here.
return fileInfos, eof, nil
}
// listMultipartUploadsCommon - lists all multipart uploads, common
// function for both object layers.
func listMultipartUploadsCommon(layer ObjectLayer, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
var storage StorageAPI
switch l := layer.(type) {
case xlObjects:
storage = l.storage
case fsObjects:
storage = l.storage
}
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(storage, bucket) {
return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, UnsupportedDelimiter{
Delimiter: delimiter,
}
}
// Verify if marker has prefix.
if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) {
return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{
Marker: keyMarker,
Prefix: prefix,
}
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
}
}
id, err := uuid.Parse(uploadIDMarker)
if err != nil {
return result, err
}
if id.IsZero() {
return result, MalformedUploadID{
UploadID: uploadIDMarker,
}
}
}
recursive := true
if delimiter == slashSeparator {
recursive = false
}
result.IsTruncated = true
result.MaxUploads = maxUploads
// Not using path.Join() as it strips off the trailing '/'.
multipartPrefixPath := pathJoin(mpartMetaPrefix, pathJoin(bucket, prefix))
if prefix == "" {
// Should have a trailing "/" if prefix is ""
// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
multipartPrefixPath += slashSeparator
}
multipartMarkerPath := ""
if keyMarker != "" {
keyMarkerPath := pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker)
multipartMarkerPath = pathJoin(mpartMetaPrefix, keyMarkerPath)
}
// List all the multipart files at prefixPath, starting with marker keyMarkerPath.
fileInfos, eof, err := listMetaBucketMultipartFiles(layer, multipartPrefixPath, multipartMarkerPath, recursive, maxUploads)
if err != nil {
return ListMultipartsInfo{}, err
}
// Loop through all the received files fill in the multiparts result.
for _, fi := range fileInfos {
var objectName string
var uploadID string
if fi.Mode.IsDir() {
// All directory entries are common prefixes.
uploadID = "" // Upload ids are empty for CommonPrefixes.
objectName = strings.TrimPrefix(fi.Name, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
result.CommonPrefixes = append(result.CommonPrefixes, objectName)
} else {
uploadID = path.Base(fi.Name)
objectName = strings.TrimPrefix(path.Dir(fi.Name), retainSlash(pathJoin(mpartMetaPrefix, bucket)))
result.Uploads = append(result.Uploads, uploadMetadata{
Object: objectName,
UploadID: uploadID,
Initiated: fi.ModTime,
})
}
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
}
result.IsTruncated = !eof
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}
// ListObjectParts - list object parts, common function across both object layers.
func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if !isBucketExist(storage, bucket) {
return ListPartsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
if !isUploadIDExists(storage, bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
}
result := ListPartsInfo{}
entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
if err != nil {
return result, err
}
sort.Strings(entries)
var newEntries []string
for _, entry := range entries {
newEntries = append(newEntries, path.Base(entry))
}
idx := sort.SearchStrings(newEntries, fmt.Sprintf("%.5d.", partNumberMarker+1))
newEntries = newEntries[idx:]
count := maxParts
for _, entry := range newEntries {
fi, err := storage.StatFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID, entry))
splitEntry := strings.SplitN(entry, ".", 2)
partStr := splitEntry[0]
etagStr := splitEntry[1]
partNum, err := strconv.Atoi(partStr)
if err != nil {
return ListPartsInfo{}, err
}
result.Parts = append(result.Parts, partInfo{
PartNumber: partNum,
LastModified: fi.ModTime,
ETag: etagStr,
Size: fi.Size,
})
count--
if count == 0 {
break
}
}
// If listed entries are more than maxParts, we set IsTruncated as true.
if len(newEntries) > len(result.Parts) {
result.IsTruncated = true
// Make sure to fill next part number marker if IsTruncated is
// true for subsequent listing.
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
result.NextPartNumberMarker = nextPartNumberMarker
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
result.MaxParts = maxParts
return result, nil
}
// isUploadIDExists - verify if a given uploadID exists and is valid.
func isUploadIDExists(storage StorageAPI, bucket, object, uploadID string) bool {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, incompleteFile)
st, err := storage.StatFile(minioMetaBucket, uploadIDPath)
if err != nil {
if err == errFileNotFound {
return false
}
errorIf(err, "Stat failed on "+minioMetaBucket+"/"+uploadIDPath+".")
return false
}
return st.Mode.IsRegular()
}