From 82fd90793320d748c85a3cfe4ed52616fe0ab6d4 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 2 Jun 2016 22:49:27 -0700 Subject: [PATCH] XL/PutObject: Handle all pending cases of DiskNotFound. --- posix.go | 35 +++++++++++++++++++++++++++++++++++ tree-walk-xl.go | 5 +++++ xl-v1-bucket.go | 10 +++++----- xl-v1-common.go | 3 +++ xl-v1-metadata.go | 14 ++++++++------ xl-v1-multipart-common.go | 9 ++++++++- xl-v1-multipart.go | 20 +++++++++++++++++--- xl-v1-object.go | 2 -- 8 files changed, 81 insertions(+), 17 deletions(-) diff --git a/posix.go b/posix.go index c543be094..db97681fd 100644 --- a/posix.go +++ b/posix.go @@ -249,6 +249,11 @@ func (s posix) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - delete a volume. func (s posix) DeleteVol(volume string) error { + // Validate if disk is free. + if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return err + } + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -274,6 +279,11 @@ func (s posix) DeleteVol(volume string) error { // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing "/". func (s posix) ListDir(volume, dirPath string) ([]string, error) { + // Validate if disk is free. + if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return nil, err + } + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -296,6 +306,11 @@ func (s posix) ListDir(volume, dirPath string) ([]string, error) { // for io.EOF. Additionally ReadFile also starts reading from an // offset. func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) { + // Validate if disk is free. + if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return 0, err + } + volumeDir, err := s.getVolDir(volume) if err != nil { return 0, err @@ -356,6 +371,11 @@ func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n // AppendFile - append a byte array at path, if file doesn't exist at // path this call explicitly creates it. func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) { + // Validate if disk is free. + if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return 0, err + } + volumeDir, err := s.getVolDir(volume) if err != nil { return 0, err @@ -403,6 +423,11 @@ func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) // StatFile - get file info. func (s posix) StatFile(volume, path string) (file FileInfo, err error) { + // Validate if disk is free. + if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return FileInfo{}, err + } + volumeDir, err := s.getVolDir(volume) if err != nil { return FileInfo{}, err @@ -480,6 +505,11 @@ func deleteFile(basePath, deletePath string) error { // DeleteFile - delete a file at path. func (s posix) DeleteFile(volume, path string) error { + // Validate if disk is free. + if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return err + } + volumeDir, err := s.getVolDir(volume) if err != nil { return err @@ -506,6 +536,11 @@ func (s posix) DeleteFile(volume, path string) error { // RenameFile - rename source path to destination path atomically. func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { + // Validate if disk is free. + if err := checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return err + } + srcVolumeDir, err := s.getVolDir(srcVolume) if err != nil { return err diff --git a/tree-walk-xl.go b/tree-walk-xl.go index f49a6c931..3a0ddf3c8 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -53,6 +53,11 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) } entries, err = disk.ListDir(bucket, prefixDir) if err != nil { + // For any reason disk was deleted or goes offline, continue + // and list form other disks if possible. + if err == errDiskNotFound { + continue + } break } // Skip the entries which do not match the filter. diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index 26e8aa3ff..66f3dc014 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -56,9 +56,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { err := disk.MakeVol(bucket) if err != nil { dErrs[index] = err - return } - dErrs[index] = nil }(index, disk) } @@ -99,7 +97,7 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err var volInfo VolInfo volInfo, err = disk.StatVol(bucketName) if err != nil { - // For some reason disk went offline pick the next one. + // For any reason disk went offline continue and pick the next one. if err == errDiskNotFound { continue } @@ -154,6 +152,10 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) { } var volsInfo []VolInfo volsInfo, err = disk.ListVols() + // Ignore any disks not found. + if err == errDiskNotFound { + continue + } if err == nil { // NOTE: The assumption here is that volumes across all disks in // readQuorum have consistent view i.e they all have same number @@ -218,9 +220,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error { err := disk.DeleteVol(bucket) if err != nil { dErrs[index] = err - return } - dErrs[index] = nil }(index, disk) } diff --git a/xl-v1-common.go b/xl-v1-common.go index 446e8ffa2..92314d4c9 100644 --- a/xl-v1-common.go +++ b/xl-v1-common.go @@ -63,6 +63,9 @@ func (xl xlObjects) isObject(bucket, prefix string) bool { } _, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) if err != nil { + if err == errDiskNotFound { + continue + } return false } break diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 066e4ef0e..1e29abf0c 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -209,6 +209,10 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err var buf []byte buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile)) if err != nil { + // For any reason disk is not available continue and read from other disks. + if err == errDiskNotFound { + continue + } return xlMetaV1{}, err } err = json.Unmarshal(buf, &xlMeta) @@ -338,11 +342,10 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet xlMetas[index].Erasure.Index = index + 1 // Write unique `xl.json` for a disk at index. - if err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]); err != nil { + err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]) + if err != nil { mErrs[index] = err - return } - mErrs[index] = nil }(index, disk) } @@ -404,11 +407,10 @@ func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1) metadata.Erasure.Index = index + 1 // Write xl metadata. - if err := writeXLMetadata(disk, bucket, prefix, metadata); err != nil { + err := writeXLMetadata(disk, bucket, prefix, metadata) + if err != nil { mErrs[index] = err - return } - mErrs[index] = nil }(index, disk, xlMeta) } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index a2a0214df..079eebdee 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -363,6 +363,9 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { } _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) if err != nil { + if err == errDiskNotFound { + continue + } return false } break @@ -377,8 +380,12 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo continue } splitPrefixes := strings.SplitN(prefixPath, "/", 3) - uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk) + var uploadsJSON uploadsV1 + uploadsJSON, err = readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk) if err != nil { + if err == errDiskNotFound { + continue + } if err == errFileNotFound { return []uploadInfo{}, nil } diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 7becbe7ad..844e3f66c 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -68,6 +68,9 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark continue } uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) + if err == errDiskNotFound { + continue + } break } nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) @@ -124,9 +127,12 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark if disk == nil { continue } + newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) + if err == errDiskNotFound { + continue + } break } - newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) if err != nil { if err == errFileNotFound || walkResult.err == errDiskNotFound { @@ -661,13 +667,17 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. var disk StorageAPI + var uploadsJSON uploadsV1 for _, disk = range xl.getLoadBalancedQuorumDisks() { if disk == nil { continue } + uploadsJSON, err = readUploadsJSON(bucket, object, disk) + if err == errDiskNotFound { + continue + } break } - uploadsJSON, err := readUploadsJSON(bucket, object, disk) if err != nil { return "", toObjectErr(err, minioMetaBucket, object) } @@ -709,13 +719,17 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. var disk StorageAPI + var uploadsJSON uploadsV1 for _, disk = range xl.getLoadBalancedQuorumDisks() { if disk == nil { continue } + uploadsJSON, err = readUploadsJSON(bucket, object, disk) + if err == errDiskNotFound { + continue + } break } - uploadsJSON, err := readUploadsJSON(bucket, object, disk) if err != nil { return toObjectErr(err, bucket, object) } diff --git a/xl-v1-object.go b/xl-v1-object.go index da40ea52d..3f17c4789 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -401,9 +401,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error { err := cleanupDir(disk, bucket, object) if err != nil { dErrs[index] = err - return } - dErrs[index] = nil }(index, disk) }