From 33cd910d3a26bad251a73d48dffb5ed2f462106c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 8 Apr 2016 17:13:16 -0700 Subject: [PATCH] backend/fs: More cleanup and start using checkBuckeArg. (#1306) backend/fs: More cleanup and start using checkBucketArg. --- fs-bucket-listobjects.go | 22 +++------- fs-bucket.go | 37 +++++----------- fs-dir-common.go | 3 +- fs-multipart-dir.go | 50 +++++++++++---------- fs-multipart.go | 88 ++++++++++++------------------------- fs-object.go | 95 ++++++++++++++-------------------------- fs-object_test.go | 4 +- fs.go | 44 ++++++++++++++++--- fs_api_suite_test.go | 12 ++--- server_fs_test.go | 2 - storage-local.go | 18 -------- 11 files changed, 152 insertions(+), 223 deletions(-) diff --git a/fs-bucket-listobjects.go b/fs-bucket-listobjects.go index 6e55f3b8b..c5b234b20 100644 --- a/fs-bucket-listobjects.go +++ b/fs-bucket-listobjects.go @@ -80,24 +80,12 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe result := ListObjectsInfo{} // Input validation. - if !IsValidBucketName(bucket) { - return result, probe.NewError(BucketNameInvalid{Bucket: bucket}) + bucket, e := fs.checkBucketArg(bucket) + if e != nil { + return result, probe.NewError(e) } + bucketDir := filepath.Join(fs.diskPath, bucket) - bucket = getActualBucketname(fs.path, bucket) // Get the right bucket name. - bucketDir := filepath.Join(fs.path, bucket) - // Verify if bucket exists. - if status, e := isDirExist(bucketDir); !status { - if e == nil { - // File exists, but its not a directory. - return result, probe.NewError(BucketNotFound{Bucket: bucket}) - } else if os.IsNotExist(e) { - // File does not exist. - return result, probe.NewError(BucketNotFound{Bucket: bucket}) - } else { - return result, probe.NewError(e) - } - } if !IsValidObjectPrefix(prefix) { return result, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } @@ -149,7 +137,7 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe // popTreeWalker returns the channel from which rest of the objects can be retrieved. walker := fs.lookupTreeWalk(listObjectParams{bucket, delimiter, marker, prefix}) if walker == nil { - walker = startTreeWalk(fs.path, bucket, filepath.FromSlash(prefix), filepath.FromSlash(marker), recursive) + walker = startTreeWalk(fs.diskPath, bucket, filepath.FromSlash(prefix), filepath.FromSlash(marker), recursive) } nextMarker := "" diff --git a/fs-bucket.go b/fs-bucket.go index e3131ce15..bebc36e9e 100644 --- a/fs-bucket.go +++ b/fs-bucket.go @@ -22,7 +22,6 @@ import ( "path/filepath" "strings" - "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/probe" ) @@ -34,8 +33,8 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } - bucket = getActualBucketname(fs.path, bucket) - bucketDir := filepath.Join(fs.path, bucket) + bucket = getActualBucketname(fs.diskPath, bucket) + bucketDir := filepath.Join(fs.diskPath, bucket) if e := os.Remove(bucketDir); e != nil { // Error if there was no bucket in the first place. if os.IsNotExist(e) { @@ -57,7 +56,7 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { // ListBuckets - Get service. func (fs Filesystem) ListBuckets() ([]BucketInfo, *probe.Error) { - files, e := ioutil.ReadDir(fs.path) + files, e := ioutil.ReadDir(fs.diskPath) if e != nil { return []BucketInfo{}, probe.NewError(e) } @@ -105,32 +104,16 @@ func removeDuplicateBuckets(buckets []BucketInfo) []BucketInfo { // MakeBucket - PUT Bucket func (fs Filesystem) MakeBucket(bucket string) *probe.Error { - di, err := disk.GetInfo(fs.path) - if err != nil { - return probe.NewError(err) - } - - // Remove 5% from total space for cumulative disk space used for - // journalling, inodes etc. - availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 - if int64(availableDiskSpace) <= fs.minFreeDisk { - return probe.NewError(RootPathFull{Path: fs.path}) - } - - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { + if _, e := fs.checkBucketArg(bucket); e == nil { + return probe.NewError(BucketExists{Bucket: bucket}) + } else if _, ok := e.(BucketNameInvalid); ok { return probe.NewError(BucketNameInvalid{Bucket: bucket}) } - - bucket = getActualBucketname(fs.path, bucket) - bucketDir := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketDir); e == nil { - return probe.NewError(BucketExists{Bucket: bucket}) - } + bucketDir := filepath.Join(fs.diskPath, bucket) // Make bucket. if e := os.Mkdir(bucketDir, 0700); e != nil { - return probe.NewError(err) + return probe.NewError(e) } return nil } @@ -162,9 +145,9 @@ func (fs Filesystem) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) { if !IsValidBucketName(bucket) { return BucketInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } - bucket = getActualBucketname(fs.path, bucket) + bucket = getActualBucketname(fs.diskPath, bucket) // Get bucket path. - bucketDir := filepath.Join(fs.path, bucket) + bucketDir := filepath.Join(fs.diskPath, bucket) fi, e := os.Stat(bucketDir) if e != nil { // Check if bucket exists. diff --git a/fs-dir-common.go b/fs-dir-common.go index 0716bf146..96d247a45 100644 --- a/fs-dir-common.go +++ b/fs-dir-common.go @@ -210,7 +210,8 @@ func startTreeWalk(fsPath, bucket, prefix, marker string, recursive bool) *treeW return false } } - treeWalk(filepath.Join(fsPath, bucket), prefixDir, entryPrefixMatch, marker, recursive, send, &count) + bucketDir := filepath.Join(fsPath, bucket) + treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker, recursive, send, &count) }() return &walkNotify } diff --git a/fs-multipart-dir.go b/fs-multipart-dir.go index 1266b2335..b023cbb6d 100644 --- a/fs-multipart-dir.go +++ b/fs-multipart-dir.go @@ -65,7 +65,7 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, // TODO: check markerPath must be a file if uploadIDMarker != "" { - markerPath = filepath.Join(markerPath, uploadIDMarker+uploadIDSuffix) + markerPath = filepath.Join(markerPath, uploadIDMarker+multipartUploadIDSuffix) } // TODO: check if markerPath starts with bucketDir @@ -112,15 +112,19 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, } for { - dirents, err := scandir(scanDir, - func(dirent fsDirent) bool { - if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, uploadIDSuffix)) { - return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath - } - - return false - }, - false) + // Filters scandir entries. This filter function is + // specific for multipart listing. + multipartFilterFn := func(dirent fsDirent) bool { + // Verify if dirent is a directory a regular file + // with match uploadID suffix. + if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) { + // Return if dirent matches prefix and + // lexically higher than marker. + return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath + } + return false + } + dirents, err := scandir(scanDir, multipartFilterFn, false) if err != nil { send(multipartObjectInfo{Err: err}) return @@ -129,19 +133,19 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, var dirent fsDirent for len(dirents) > 0 { dirent, dirents = dirents[0], dirents[1:] - if dirent.IsRegular() { // Handle uploadid file name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1) if name == "" { // This should not happen ie uploadid file should not be in bucket directory - send(multipartObjectInfo{Err: errors.New("corrupted meta data")}) + send(multipartObjectInfo{Err: errors.New("Corrupted metadata")}) return } - uploadID := strings.Split(filepath.Base(dirent.name), uploadIDSuffix)[0] + uploadID := strings.Split(filepath.Base(dirent.name), multipartUploadIDSuffix)[0] - // In some OS modTime is empty and use os.Stat() to fill missing values + // Solaris and older unixes have modTime to be + // empty, fall back to os.Stat() to fill missing values. if dirent.modTime.IsZero() { if fi, e := os.Stat(dirent.name); e == nil { dirent.modTime = fi.ModTime() @@ -164,11 +168,11 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, continue } - subDirents, err := scandir(dirent.name, - func(dirent fsDirent) bool { - return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, uploadIDSuffix)) - }, - false) + multipartSubDirentFilterFn := func(dirent fsDirent) bool { + return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) + } + // Fetch sub dirents. + subDirents, err := scandir(dirent.name, multipartSubDirentFilterFn, false) if err != nil { send(multipartObjectInfo{Err: err}) return @@ -191,9 +195,10 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, } } - // send directory only for non-recursive listing + // Send directory only for non-recursive listing if !recursive && (subDirFound || len(subDirents) == 0) { - // In some OS modTime is empty and use os.Stat() to fill missing values + // Solaris and older unixes have modTime to be + // empty, fall back to os.Stat() to fill missing values. if dirent.modTime.IsZero() { if fi, e := os.Stat(dirent.name); e == nil { dirent.modTime = fi.ModTime() @@ -226,13 +231,13 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, } markerPath = scanDir + string(os.PathSeparator) - if scanDir = filepath.Dir(scanDir); scanDir < dirDepth { break } } }() + // Return multipart info. return multipartObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh} } @@ -258,7 +263,6 @@ func (oic *multipartObjectInfoChannel) Read() (multipartObjectInfo, bool) { if oic.closed { return multipartObjectInfo{}, false } - if oic.objInfo == nil { // First read. if oi, ok := <-oic.ch; ok { diff --git a/fs-multipart.go b/fs-multipart.go index 5fd1a343f..c6c8ea726 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -28,16 +28,18 @@ import ( "strconv" "strings" - "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/safe" "github.com/skyrings/skyring-common/tools/uuid" ) -const configDir = ".minio" -const uploadIDSuffix = ".uploadid" +const ( + minioMetaDir = ".minio" + multipartUploadIDSuffix = ".uploadid" +) +// Removes files and its parent directories up to a given level. func removeFileTree(fileName string, level string) error { if e := os.Remove(fileName); e != nil { return e @@ -49,7 +51,6 @@ func removeFileTree(fileName string, level string) error { } else if !status { break } - if e := os.Remove(fileDir); e != nil { return e } @@ -126,7 +127,7 @@ func makeS3MD5(md5Strs ...string) (string, *probe.Error) { } func (fs Filesystem) newUploadID(bucket, object string) (string, error) { - metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object) // create metaObjectDir if not exist if status, e := isDirExist(metaObjectDir); e != nil { @@ -144,7 +145,7 @@ func (fs Filesystem) newUploadID(bucket, object string) (string, error) { } uploadID := uuid.String() - uploadIDFile := filepath.Join(metaObjectDir, uploadID+uploadIDSuffix) + uploadIDFile := filepath.Join(metaObjectDir, uploadID+multipartUploadIDSuffix) if _, e := os.Lstat(uploadIDFile); e != nil { if !os.IsNotExist(e) { return "", e @@ -163,11 +164,11 @@ func (fs Filesystem) newUploadID(bucket, object string) (string, error) { } func (fs Filesystem) isUploadIDExist(bucket, object, uploadID string) (bool, error) { - return isFileExist(filepath.Join(fs.path, configDir, bucket, object, uploadID+uploadIDSuffix)) + return isFileExist(filepath.Join(fs.diskPath, minioMetaDir, bucket, object, uploadID+multipartUploadIDSuffix)) } func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { - metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object) uploadIDPrefix := uploadID + "." dirents, e := scandir(metaObjectDir, @@ -182,19 +183,14 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { for _, dirent := range dirents { if e := os.Remove(filepath.Join(metaObjectDir, dirent.name)); e != nil { - //return InternalError{Err: err} return e } } if status, e := isDirEmpty(metaObjectDir); e != nil { - // TODO: add log than returning error - //return InternalError{Err: err} return e } else if status { - if e := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); e != nil { - // TODO: add log than returning error - //return InternalError{Err: err} + if e := removeFileTree(metaObjectDir, filepath.Join(fs.diskPath, minioMetaDir, bucket)); e != nil { return e } } @@ -202,37 +198,6 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { return nil } -func (fs Filesystem) checkBucketArg(bucket string) (string, error) { - if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} - } - - bucket = getActualBucketname(fs.path, bucket) - if status, e := isDirExist(filepath.Join(fs.path, bucket)); e != nil { - //return "", InternalError{Err: err} - return "", e - } else if !status { - return "", BucketNotFound{Bucket: bucket} - } - - return bucket, nil -} - -func (fs Filesystem) checkDiskFree() error { - di, e := disk.GetInfo(fs.path) - if e != nil { - return e - } - - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. - availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 - if int64(availableDiskSpace) <= fs.minFreeDisk { - return RootPathFull{Path: fs.path} - } - - return nil -} - func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) { bucket, e := fs.checkBucketArg(bucket) if e != nil { @@ -254,7 +219,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E return "", probe.NewError(e) } - if e := fs.checkDiskFree(); e != nil { + if e := checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil { return "", probe.NewError(e) } @@ -290,12 +255,12 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i return "", probe.NewError(errors.New("invalid part id, should be not more than 10000")) } - if e := fs.checkDiskFree(); e != nil { + if e := checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil { return "", probe.NewError(e) } partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex) - partFilePath := filepath.Join(fs.path, configDir, bucket, object, partSuffix) + partFilePath := filepath.Join(fs.diskPath, minioMetaDir, bucket, object, partSuffix) if e := safeWriteFile(partFilePath, data, size, md5Hex); e != nil { return "", probe.NewError(e) } @@ -339,11 +304,11 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } - if e := fs.checkDiskFree(); e != nil { + if e := checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil { return ObjectInfo{}, probe.NewError(e) } - metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object) var md5Sums []string for _, part := range parts { @@ -397,7 +362,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa return ObjectInfo{}, probe.NewError(e) } - bucketPath := filepath.Join(fs.path, bucket) + bucketPath := filepath.Join(fs.diskPath, bucket) objectPath := filepath.Join(bucketPath, object) if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil { os.Remove(completeObjectFile) @@ -469,10 +434,9 @@ func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectPara func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) { result := ListMultipartsInfo{} - if bucketDirName, err := fs.checkBucketArg(bucket); err == nil { - bucket = bucketDirName - } else { - return result, probe.NewError(err) + bucket, e := fs.checkBucketArg(bucket) + if e != nil { + return result, probe.NewError(e) } if !IsValidObjectPrefix(objectPrefix) { @@ -519,7 +483,7 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa recursive = false } - bucketDir := filepath.Join(fs.path, configDir, bucket) + metaBucketDir := filepath.Join(fs.diskPath, minioMetaDir, bucket) // Lookup of if listMultipartObjectChannel is available for given // parameters, else create a new one. multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{ @@ -530,7 +494,7 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa uploadIDMarker: uploadIDMarker, }) if multipartObjectInfoCh == nil { - ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) + ch := scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) multipartObjectInfoCh = &ch } @@ -574,7 +538,13 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa result.IsTruncated = true result.NextKeyMarker = nextKeyMarker result.NextUploadIDMarker = nextUploadIDMarker - fs.saveListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *multipartObjectInfoCh) + fs.saveListMultipartObjectCh(listMultipartObjectParams{ + bucket: bucket, + delimiter: delimiter, + keyMarker: nextKeyMarker, + prefix: objectPrefix, + uploadIDMarker: nextUploadIDMarker, + }, *multipartObjectInfoCh) } return result, nil @@ -604,7 +574,7 @@ func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumber maxParts = 1000 } - metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object) uploadIDPrefix := uploadID + "." dirents, e := scandir(metaObjectDir, diff --git a/fs-object.go b/fs-object.go index e4afbd46a..c10f91832 100644 --- a/fs-object.go +++ b/fs-object.go @@ -27,7 +27,6 @@ import ( "encoding/hex" "runtime" - "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/safe" @@ -38,23 +37,21 @@ import ( // GetObject - GET object func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) { // Input validation. - if !IsValidBucketName(bucket) { - return nil, probe.NewError(BucketNameInvalid{Bucket: bucket}) + bucket, e := fs.checkBucketArg(bucket) + if e != nil { + return nil, probe.NewError(e) } if !IsValidObjectName(object) { return nil, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // normalize buckets. - bucket = getActualBucketname(fs.path, bucket) - objectPath := filepath.Join(fs.path, bucket, object) - + objectPath := filepath.Join(fs.diskPath, bucket, object) file, e := os.Open(objectPath) if e != nil { // If the object doesn't exist, the bucket might not exist either. Stat for // the bucket and give a better error message if that is true. if os.IsNotExist(e) { - _, e = os.Stat(filepath.Join(fs.path, bucket)) + _, e = os.Stat(filepath.Join(fs.diskPath, bucket)) if os.IsNotExist(e) { return nil, probe.NewError(BucketNotFound{Bucket: bucket}) } @@ -69,7 +66,10 @@ func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.Rea } // Object path is a directory prefix, return object not found error. if st.IsDir() { - return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) + return nil, probe.NewError(ObjectExistsAsPrefix{ + Bucket: bucket, + Prefix: object, + }) } // Seek to a starting offset. @@ -87,25 +87,16 @@ func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.Rea // GetObjectInfo - get object info. func (fs Filesystem) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) { // Input validation. - if !IsValidBucketName(bucket) { - return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) + bucket, e := fs.checkBucketArg(bucket) + if e != nil { + return ObjectInfo{}, probe.NewError(e) } if !IsValidObjectName(object) { return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - // Normalize buckets. - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketPath); e != nil { - if os.IsNotExist(e) { - return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return ObjectInfo{}, probe.NewError(e) - } - - info, err := getObjectInfo(fs.path, bucket, object) + info, err := getObjectInfo(fs.diskPath, bucket, object) if err != nil { if os.IsNotExist(err.ToGoError()) { return ObjectInfo{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) @@ -145,7 +136,7 @@ func getObjectInfo(rootPath, bucket, object string) (ObjectInfo, *probe.Error) { contentType = content.ContentType } } - metadata := ObjectInfo{ + objInfo := ObjectInfo{ Bucket: bucket, Name: object, ModifiedTime: stat.ModTime(), @@ -153,7 +144,7 @@ func getObjectInfo(rootPath, bucket, object string) (ObjectInfo, *probe.Error) { ContentType: contentType, IsDir: stat.Mode().IsDir(), } - return metadata, nil + return objInfo, nil } // isMD5SumEqual - returns error if md5sum mismatches, success its `nil` @@ -181,39 +172,25 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool { // PutObject - create an object. func (fs Filesystem) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) { - di, e := disk.GetInfo(fs.path) + // Check bucket name valid. + bucket, e := fs.checkBucketArg(bucket) if e != nil { return ObjectInfo{}, probe.NewError(e) } - // Remove 5% from total space for cumulative disk space used for - // journalling, inodes etc. - availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 - if int64(availableDiskSpace) <= fs.minFreeDisk { - return ObjectInfo{}, probe.NewError(RootPathFull{Path: fs.path}) - } - - // Check bucket name valid. - if !IsValidBucketName(bucket) { - return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e = os.Stat(bucketPath); e != nil { - if os.IsNotExist(e) { - return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return ObjectInfo{}, probe.NewError(e) - } + bucketDir := filepath.Join(fs.diskPath, bucket) // Verify object path legal. if !IsValidObjectName(object) { return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } + if e = checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil { + return ObjectInfo{}, probe.NewError(e) + } + // Get object path. - objectPath := filepath.Join(bucketPath, object) + objectPath := filepath.Join(bucketDir, object) // md5Hex representation. var md5Hex string @@ -328,20 +305,13 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error // DeleteObject - delete object. func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { // Check bucket name valid - if !IsValidBucketName(bucket) { - return probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - // Check bucket exists - if _, e := os.Stat(bucketPath); e != nil { - if os.IsNotExist(e) { - return probe.NewError(BucketNotFound{Bucket: bucket}) - } + bucket, e := fs.checkBucketArg(bucket) + if e != nil { return probe.NewError(e) } + bucketDir := filepath.Join(fs.diskPath, bucket) + // Verify object path legal if !IsValidObjectName(object) { return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) @@ -349,21 +319,20 @@ func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { // Do not use filepath.Join() since filepath.Join strips off any // object names with '/', use them as is in a static manner so - // that we can send a proper 'ObjectNotFound' reply back upon - // os.Stat(). + // that we can send a proper 'ObjectNotFound' reply back upon os.Stat(). var objectPath string if runtime.GOOS == "windows" { - objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object + objectPath = fs.diskPath + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object } else { - objectPath = fs.path + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object + objectPath = fs.diskPath + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object } // Delete object path if its empty. - err := deleteObjectPath(bucketPath, objectPath, bucket, object) + err := deleteObjectPath(bucketDir, objectPath, bucket, object) if err != nil { if os.IsNotExist(err.ToGoError()) { return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) } - return err.Trace(bucketPath, objectPath, bucket, object) + return err.Trace(bucketDir, objectPath, bucket, object) } return nil } diff --git a/fs-object_test.go b/fs-object_test.go index cf0647f61..d9958c294 100644 --- a/fs-object_test.go +++ b/fs-object_test.go @@ -76,8 +76,8 @@ func TestGetObjectInfo(t *testing.T) { {"abcdefgh", "abc", ObjectInfo{}, BucketNotFound{Bucket: "abcdefgh"}, false}, {"ijklmnop", "efg", ObjectInfo{}, BucketNotFound{Bucket: "ijklmnop"}, false}, // Test cases with valid but non-existing bucket names and invalid object name (Test number 8-9). - {"abcdefgh", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "abcdefgh", Object: ""}, false}, - {"ijklmnop", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "ijklmnop", Object: ""}, false}, + {"test-getobjectinfo", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "test-getobjectinfo", Object: ""}, false}, + {"test-getobjectinfo", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "test-getobjectinfo", Object: ""}, false}, // Test cases with non-existing object name with existing bucket (Test number 10-12). {"test-getobjectinfo", "Africa", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Africa"}, false}, {"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false}, diff --git a/fs.go b/fs.go index 2f639eb94..dbf927f65 100644 --- a/fs.go +++ b/fs.go @@ -17,8 +17,11 @@ package main import ( + "os" + "path/filepath" "sync" + "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/probe" ) @@ -41,7 +44,7 @@ type listMultipartObjectParams struct { // Filesystem - local variables type Filesystem struct { - path string + diskPath string minFreeDisk int64 rwLock *sync.RWMutex listObjectMap map[listObjectParams][]*treeWalker @@ -51,20 +54,21 @@ type Filesystem struct { } // newFS instantiate a new filesystem. -func newFS(rootPath string) (ObjectAPI, *probe.Error) { +func newFS(diskPath string) (ObjectAPI, *probe.Error) { fs := &Filesystem{ rwLock: &sync.RWMutex{}, } - fs.path = rootPath + fs.diskPath = diskPath /// Defaults - // Minium free disk required for i/o operations to succeed. fs.minFreeDisk = 5 + // Initialize list object map. fs.listObjectMap = make(map[listObjectParams][]*treeWalker) fs.listObjectMapMutex = &sync.Mutex{} + // Initialize list multipart map. fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]multipartObjectInfoChannel) fs.listMultipartObjectMapMutex = &sync.Mutex{} @@ -72,7 +76,37 @@ func newFS(rootPath string) (ObjectAPI, *probe.Error) { return fs, nil } +func (fs Filesystem) checkBucketArg(bucket string) (string, error) { + if !IsValidBucketName(bucket) { + return "", BucketNameInvalid{Bucket: bucket} + } + bucket = getActualBucketname(fs.diskPath, bucket) + if status, e := isDirExist(filepath.Join(fs.diskPath, bucket)); !status { + if e == nil { + return "", BucketNotFound{Bucket: bucket} + } else if os.IsNotExist(e) { + return "", BucketNotFound{Bucket: bucket} + } else { + return "", e + } + } + return bucket, nil +} + +func checkDiskFree(diskPath string, minFreeDisk int64) error { + di, e := disk.GetInfo(diskPath) + if e != nil { + return e + } + // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. + availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 + if int64(availableDiskSpace) <= minFreeDisk { + return RootPathFull{Path: diskPath} + } + return nil +} + // GetRootPath - get root path. func (fs Filesystem) GetRootPath() string { - return fs.path + return fs.diskPath } diff --git a/fs_api_suite_test.go b/fs_api_suite_test.go index 804a64db7..757d43846 100644 --- a/fs_api_suite_test.go +++ b/fs_api_suite_test.go @@ -390,22 +390,22 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectAPI) _, err = fs.GetObject("bucket", "dir1", 0) switch err := err.ToGoError().(type) { - case ObjectNotFound: + case ObjectExistsAsPrefix: c.Assert(err.Bucket, check.Equals, "bucket") - c.Assert(err.Object, check.Equals, "dir1") + c.Assert(err.Prefix, check.Equals, "dir1") default: // force a failure with a line number - c.Assert(err, check.Equals, "ObjectNotFound") + c.Assert(err.Error(), check.Equals, "Object exists on : bucket as prefix dir1") } _, err = fs.GetObject("bucket", "dir1/", 0) switch err := err.ToGoError().(type) { - case ObjectNotFound: + case ObjectExistsAsPrefix: c.Assert(err.Bucket, check.Equals, "bucket") - c.Assert(err.Object, check.Equals, "dir1/") + c.Assert(err.Prefix, check.Equals, "dir1/") default: // force a failure with a line number - c.Assert(err, check.Equals, "ObjectNotFound") + c.Assert(err.Error(), check.Equals, "Object exists on : bucket as prefix dir1") } } diff --git a/server_fs_test.go b/server_fs_test.go index fc8e3001a..f94c03fea 100644 --- a/server_fs_test.go +++ b/server_fs_test.go @@ -1070,7 +1070,6 @@ func (s *MyAPISuite) TestObjectMultipartAbort(c *C) { c.Assert(response3.StatusCode, Equals, http.StatusNoContent) } -/* func (s *MyAPISuite) TestBucketMultipartList(c *C) { request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/bucketmultipartlist", 0, nil) c.Assert(err, IsNil) @@ -1159,7 +1158,6 @@ func (s *MyAPISuite) TestBucketMultipartList(c *C) { c.Assert(err, IsNil) c.Assert(newResponse3.Bucket, Equals, "bucketmultipartlist") } -*/ func (s *MyAPISuite) TestValidateObjectMultipartUploadID(c *C) { request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultipartlist-uploadid", 0, nil) diff --git a/storage-local.go b/storage-local.go index 7ad7e264a..a92fed617 100644 --- a/storage-local.go +++ b/storage-local.go @@ -70,24 +70,6 @@ func newLocalStorage(diskPath string) (StorageAPI, error) { return disk, nil } -// checkDiskFree verifies if disk path has sufficient minium free disk -// space. -func checkDiskFree(diskPath string, minFreeDisk int64) error { - di, e := disk.GetInfo(diskPath) - if e != nil { - return e - } - - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. - availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 - if int64(availableDiskSpace) <= minFreeDisk { - return ErrDiskPathFull - } - - // Success. - return nil -} - // Make a volume entry. func (s localStorage) MakeVol(volume string) error { if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil {