diff --git a/cmd/gateway-gcs.go b/cmd/gateway-gcs.go index ea76ae01a..67177864d 100644 --- a/cmd/gateway-gcs.go +++ b/cmd/gateway-gcs.go @@ -41,11 +41,11 @@ const ( // gcsMinioMeta is used for multiparts. We have "minio.sys.temp" prefix so that // listing on the GCS lists this entry in the end. Also in the gateway // ListObjects we filter out this entry. - gcsMinioPath = "minio.sys.temp" + gcsMinioPath = "minio.sys.temp/" // Path where multipart objects are saved. // If we change the backend format we will use a different url path like /multipart/v2 // but we will not migrate old data. - gcsMinioMultipartPathV1 = gcsMinioPath + "/multipart/v1" + gcsMinioMultipartPathV1 = gcsMinioPath + "multipart/v1" // Multipart meta file. gcsMinioMultipartMeta = "gcs.json" // gcs.json version number @@ -63,11 +63,6 @@ type gcsMultipartMetaV1 struct { Object string `json:"object"` // Object name } -// Check if object prefix is "ZZZZ_Minio". -func isGCSPrefix(prefix string) bool { - return strings.TrimSuffix(prefix, slashSeparator) == gcsMinioPath -} - // Returns name of the multipart meta object. func gcsMultipartMetaName(uploadID string) string { return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPathV1, uploadID, gcsMinioMultipartMeta) @@ -157,28 +152,21 @@ func gcsToObjectError(err error, params ...string) error { Bucket: bucket, Object: object, } - } else { - err = BucketNotFound{ - Bucket: bucket, - } + break } + err = BucketNotFound{Bucket: bucket} case "conflict": if message == "You already own this bucket. Please select another name." { - err = BucketAlreadyOwnedByYou{ - Bucket: bucket, - } - } else if message == "Sorry, that name is not available. Please try a different one." { - err = BucketAlreadyExists{ - Bucket: bucket, - } - } else { - err = BucketNotEmpty{ - Bucket: bucket, - } + err = BucketAlreadyOwnedByYou{Bucket: bucket} + break } + if message == "Sorry, that name is not available. Please try a different one." { + err = BucketAlreadyExists{Bucket: bucket} + break + } + err = BucketNotEmpty{Bucket: bucket} default: err = fmt.Errorf("Unsupported error reason: %s", reason) - } e.e = err @@ -294,8 +282,48 @@ func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) { return b, nil } -// DeleteBucket delete a bucket on GCS +// DeleteBucket delete a bucket on GCS. func (l *gcsGateway) DeleteBucket(bucket string) error { + itObject := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: slashSeparator, Versions: false}) + // We list the bucket and if we find any objects we return BucketNotEmpty error. If we + // find only "minio.sys.temp/" then we remove it before deleting the bucket. + gcsMinioPathFound := false + nonGCSMinioPathFound := false + for { + objAttrs, err := itObject.Next() + if err == iterator.Done { + break + } + if err != nil { + return gcsToObjectError(traceError(err)) + } + if objAttrs.Prefix == gcsMinioPath { + gcsMinioPathFound = true + continue + } + nonGCSMinioPathFound = true + break + } + if nonGCSMinioPathFound { + return gcsToObjectError(traceError(BucketNotEmpty{})) + } + if gcsMinioPathFound { + // Remove minio.sys.temp before deleting the bucket. + itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioPath}) + for { + objAttrs, err := itObject.Next() + if err == iterator.Done { + break + } + if err != nil { + return gcsToObjectError(traceError(err)) + } + err = l.client.Bucket(bucket).Object(objAttrs.Name).Delete(l.ctx) + if err != nil { + return gcsToObjectError(traceError(err)) + } + } + } err := l.client.Bucket(bucket).Delete(l.ctx) return gcsToObjectError(traceError(err), bucket) } @@ -361,10 +389,8 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de // if that one next object is our hidden // metadata folder, then just break // otherwise we've truncated the output - attrs, _ := it.Next() - if attrs == nil { - } else if isGCSPrefix(attrs.Prefix) { + if attrs != nil && attrs.Prefix == gcsMinioPath { break } @@ -375,19 +401,31 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de attrs, err := it.Next() if err == iterator.Done { break - } else if err != nil { + } + if err != nil { return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix) } nextMarker = toGCSPageToken(attrs.Name) - if isGCSPrefix(attrs.Prefix) { - // we don't return our metadata prefix + if attrs.Prefix == gcsMinioPath { + // We don't return our metadata prefix. continue - } else if attrs.Prefix != "" { + } + if !strings.HasPrefix(prefix, gcsMinioPath) { + // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. + // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ + // which will be helpful to observe the "directory structure" for debugging purposes. + if strings.HasPrefix(attrs.Prefix, gcsMinioPath) || + strings.HasPrefix(attrs.Name, gcsMinioPath) { + continue + } + } + if attrs.Prefix != "" { prefixes = append(prefixes, attrs.Prefix) continue - } else if !gcsMarker && attrs.Name <= marker { + } + if !gcsMarker && attrs.Name <= marker { // if user supplied a marker don't append // objects until we reach marker (and skip it). continue @@ -851,7 +889,8 @@ func (l *gcsGateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAc if len(policies) != 1 { return traceError(NotImplemented{}) - } else if policies[0].Prefix != prefix { + } + if policies[0].Prefix != prefix { return traceError(NotImplemented{}) } diff --git a/cmd/gateway-gcs_test.go b/cmd/gateway-gcs_test.go index 8ee2013b2..df7ca246b 100644 --- a/cmd/gateway-gcs_test.go +++ b/cmd/gateway-gcs_test.go @@ -99,41 +99,6 @@ func TestValidGCSProjectID(t *testing.T) { } } -// Test for isGCSPrefix -func TestIsGCSPrefix(t *testing.T) { - testCases := []struct { - prefix string - expectedRes bool - }{ - // Regular prefix without a trailing slash - { - prefix: "hello", - expectedRes: false, - }, - // Regular prefix with a trailing slash - { - prefix: "hello/", - expectedRes: false, - }, - // GCS prefix without a trailing slash - { - prefix: gcsMinioPath, - expectedRes: true, - }, - // GCS prefix with a trailing slash - { - prefix: gcsMinioPath + "/", - expectedRes: true, - }, - } - - for i, tc := range testCases { - if actualRes := isGCSPrefix(tc.prefix); actualRes != tc.expectedRes { - t.Errorf("%d: Expected isGCSPrefix to return %v but got %v", i, tc.expectedRes, actualRes) - } - } -} - // Test for isGCSMarker. func TestIsGCSMarker(t *testing.T) { testCases := []struct {