From 3928c1e14c16f12c107b8da23e1e2f6025a72219 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sat, 17 Jun 2017 16:00:41 -0700 Subject: [PATCH] gateway/gcs: Change in multipart backend format (#4455) --- cmd/gateway-azure.go | 1 - cmd/gateway-gcs-anonymous.go | 2 +- cmd/gateway-gcs.go | 437 +++++++++++++++-------------------- cmd/gateway-gcs_test.go | 61 ++--- cmd/storage-errors.go | 4 + 5 files changed, 218 insertions(+), 287 deletions(-) diff --git a/cmd/gateway-azure.go b/cmd/gateway-azure.go index 2fa5664c5..c012c3f5a 100644 --- a/cmd/gateway-azure.go +++ b/cmd/gateway-azure.go @@ -156,7 +156,6 @@ func azureToObjectError(err error, params ...string) error { // Inits azure blob storage client and returns AzureObjects. func newAzureLayer(host string) (GatewayLayer, error) { - var err error var endpoint = storage.DefaultBaseURL var secure = true diff --git a/cmd/gateway-gcs-anonymous.go b/cmd/gateway-gcs-anonymous.go index 9350be20e..f91cfca5e 100644 --- a/cmd/gateway-gcs-anonymous.go +++ b/cmd/gateway-gcs-anonymous.go @@ -29,7 +29,7 @@ func toGCSPublicURL(bucket, object string) string { } // AnonPutObject creates a new object anonymously with the incoming data, -func (l *gcsGateway) AnonPutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) { +func (l *gcsGateway) AnonPutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) { return ObjectInfo{}, NotImplemented{} } diff --git a/cmd/gateway-gcs.go b/cmd/gateway-gcs.go index f60a9bd9c..64877d4d5 100644 --- a/cmd/gateway-gcs.go +++ b/cmd/gateway-gcs.go @@ -21,13 +21,13 @@ import ( "crypto/sha256" "encoding/base64" "encoding/hex" + "encoding/json" "fmt" "hash" "io" - "path" "regexp" - "strconv" "strings" + "time" "cloud.google.com/go/storage" "google.golang.org/api/googleapi" @@ -38,17 +38,42 @@ import ( ) const ( - // ZZZZMinioPrefix is used for metadata and multiparts. The prefix is being filtered out, - // hence the naming of ZZZZ (last prefix) - ZZZZMinioPrefix = "ZZZZ-Minio" + // gcsMinioMeta is used for multiparts. We have "minio.sys.temp" prefix so that + // listing on the GCS lists this entry in the end. Also in the gateway + // ListObjects we filter out this entry. + gcsMinioPath = "minio.sys.temp" + // Path where multipart objects are saved. + gcsMinioMultipartPath = gcsMinioPath + "/multipart" + // Multipart meta file. + gcsMinioMultipartMeta = "gcs.json" + // gcs.json version number + gcsMultipartMetaCurrentVersion = "1" // token prefixed with GCS returned marker to differentiate // from user supplied marker. gcsTokenPrefix = "##minio" ) +// Stored in gcs.json - Contents of this file is not used anywhere. It can be +// used for debugging purposes. +type gcsMultipartMetaV1 struct { + Version string `json:"version"` // Version number + Bucket string `json:"bucket"` // Bucket name + Object string `json:"object"` // Object name +} + // Check if object prefix is "ZZZZ_Minio". func isGCSPrefix(prefix string) bool { - return strings.TrimSuffix(prefix, slashSeparator) == ZZZZMinioPrefix + return strings.TrimSuffix(prefix, slashSeparator) == gcsMinioPath +} + +// Returns name of the multipart meta object. +func gcsMultipartMetaName(uploadID string) string { + return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPath, uploadID, gcsMinioMultipartMeta) +} + +// Returns name of the part object. +func gcsMultipartDataName(uploadID, etag string) string { + return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPath, uploadID, etag) } // Convert Minio errors to minio object layer errors. @@ -97,6 +122,12 @@ func gcsToObjectError(err error, params ...string) error { if !ok { // We don't interpret non Minio errors. As minio errors will // have StatusCode to help to convert to object errors. + e.e = err + return e + } + + if len(googleAPIErr.Errors) == 0 { + e.e = err return e } @@ -148,9 +179,6 @@ func gcsToObjectError(err error, params ...string) error { } - _ = bucket - _ = object - e.e = err return e } @@ -211,23 +239,20 @@ func (l *gcsGateway) StorageInfo() StorageInfo { return StorageInfo{} } -// MakeBucket - Create a new container on GCS backend. -func (l *gcsGateway) MakeBucket(bucket string) error { - // will never be called, only satisfy ObjectLayer interface - return traceError(NotImplemented{}) -} - -// MakeBucket - Create a new container on GCS backend. +// MakeBucketWithLocation - Create a new container on GCS backend. func (l *gcsGateway) MakeBucketWithLocation(bucket, location string) error { bkt := l.client.Bucket(bucket) - if err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{ - Location: location, - }); err != nil { - return gcsToObjectError(traceError(err), bucket) + // we'll default to the us multi-region in case of us-east-1 + if location == "us-east-1" { + location = "us" } - return nil + err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{ + Location: location, + }) + + return gcsToObjectError(traceError(err), bucket) } // GetBucketInfo - Get bucket metadata.. @@ -270,11 +295,7 @@ func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) { // DeleteBucket delete a bucket on GCS func (l *gcsGateway) DeleteBucket(bucket string) error { err := l.client.Bucket(bucket).Delete(l.ctx) - if err != nil { - return gcsToObjectError(traceError(err), bucket) - } - - return nil + return gcsToObjectError(traceError(err), bucket) } func toGCSPageToken(name string) string { @@ -474,7 +495,6 @@ func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo { ContentType: attrs.ContentType, ContentEncoding: attrs.ContentEncoding, } - } // GetObjectInfo - reads object info and replies back ObjectInfo @@ -504,15 +524,12 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re return ObjectInfo{}, gcsToObjectError(traceError(err), bucket) } - teeReader := data + reader := data var sha256Writer hash.Hash - if sha256sum == "" { - } else if _, err := hex.DecodeString(sha256sum); err != nil { - return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) - } else { + if sha256sum != "" { sha256Writer = sha256.New() - teeReader = io.TeeReader(teeReader, sha256Writer) + reader = io.TeeReader(data, sha256Writer) } md5sum := metadata["etag"] @@ -524,16 +541,17 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re w.ContentType = metadata["content-type"] w.ContentEncoding = metadata["content-encoding"] - if md5sum == "" { - } else if md5, err := hex.DecodeString(md5sum); err != nil { - return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) - } else { - w.MD5 = md5 + if md5sum != "" { + var err error + w.MD5, err = hex.DecodeString(md5sum) + if err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) + } } w.Metadata = metadata - _, err := io.Copy(w, teeReader) + _, err := io.Copy(w, reader) if err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } @@ -548,10 +566,11 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } - if sha256sum == "" { - } else if newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)); newSHA256sum != sha256sum { - object.Delete(l.ctx) - return ObjectInfo{}, traceError(SHA256Mismatch{}) + if sha256sum != "" { + if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum { + object.Delete(l.ctx) + return ObjectInfo{}, traceError(SHA256Mismatch{}) + } } return fromGCSAttrsToObjectInfo(attrs), nil @@ -580,127 +599,26 @@ func (l *gcsGateway) DeleteObject(bucket string, object string) error { return nil } -// ListMultipartUploads - lists all multipart uploads. -func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - prefix = pathJoin(ZZZZMinioPrefix, "multipart-") - - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) - - nextMarker := "" - isTruncated := false - - it.PageInfo().Token = uploadIDMarker - - uploads := []uploadMetadata{} - for { - if len(uploads) >= maxUploads { - isTruncated = true - nextMarker = it.PageInfo().Token - break - } - - attrs, err := it.Next() - if err == iterator.Done { - break - } else if err != nil { - return ListMultipartsInfo{}, gcsToObjectError(traceError(err), bucket) - } - - if attrs.Prefix != "" { - continue - } - - objectKey, uploadID, partID, err := fromGCSMultipartKey(attrs.Name) - if err != nil { - continue - } else if partID != 0 { - continue - } - - nextMarker = toGCSPageToken(attrs.Name) - - // we count only partID == 0 - uploads = append(uploads, uploadMetadata{ - Object: objectKey, - UploadID: uploadID, - Initiated: attrs.Created, - }) - - } - - return ListMultipartsInfo{ - Uploads: uploads, - IsTruncated: isTruncated, - - KeyMarker: nextMarker, - UploadIDMarker: nextMarker, - NextKeyMarker: nextMarker, - NextUploadIDMarker: nextMarker, - MaxUploads: maxUploads, - }, nil - -} - -func fromGCSMultipartKey(s string) (key, uploadID string, partID int, err error) { - // remove prefixes - s = path.Base(s) - - parts := strings.Split(s, "-") - if parts[0] != "multipart" { - return "", "", 0, errGCSNotValidMultipartIdentifier - } - - if len(parts) != 4 { - return "", "", 0, errGCSNotValidMultipartIdentifier - } - - key = unescape(parts[1]) - - uploadID = parts[2] - - partID, err = strconv.Atoi(parts[3]) - if err != nil { - return "", "", 0, err - } - - return -} - -func unescape(s string) string { - s = strings.Replace(s, "%2D", "-", -1) - s = strings.Replace(s, "%2F", "/", -1) - s = strings.Replace(s, "%25", "%", -1) - return s -} - -func escape(s string) string { - s = strings.Replace(s, "%", "%25", -1) - s = strings.Replace(s, "/", "%2F", -1) - s = strings.Replace(s, "-", "%2D", -1) - return s -} - -func toGCSMultipartKey(key string, uploadID string, partID int) string { - // parts are allowed to be numbered from 1 to 10,000 (inclusive) - - // we need to encode the key because of possible slashes - return pathJoin(ZZZZMinioPrefix, fmt.Sprintf("multipart-%s-%s-%05d", escape(key), uploadID, partID)) -} - // NewMultipartUpload - upload object in multiple parts func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[string]string) (uploadID string, err error) { // generate new uploadid uploadID = mustGetUUID() - uploadID = strings.Replace(uploadID, "-", "", -1) // generate name for part zero - partZeroKey := toGCSMultipartKey(key, uploadID, 0) + meta := gcsMultipartMetaName(uploadID) - // we are writing a 0 sized object to hold the metadata - w := l.client.Bucket(bucket).Object(partZeroKey).NewWriter(l.ctx) + w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx) w.ContentType = metadata["content-type"] w.ContentEncoding = metadata["content-encoding"] w.Metadata = metadata + + content, err := json.Marshal(gcsMultipartMetaV1{gcsMultipartMetaCurrentVersion, bucket, key}) + if err != nil { + return "", gcsToObjectError(traceError(err), bucket, key) + } + if _, err = w.Write(content); err != nil { + return "", gcsToObjectError(traceError(err), bucket, key) + } if err = w.Close(); err != nil { return "", gcsToObjectError(traceError(err), bucket, key) } @@ -708,6 +626,17 @@ func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[ return uploadID, nil } +// ListMultipartUploads - lists all multipart uploads. +func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) { + return ListMultipartsInfo{ + KeyMarker: keyMarker, + UploadIDMarker: uploadIDMarker, + MaxUploads: maxUploads, + Prefix: prefix, + Delimiter: delimiter, + }, nil +} + // CopyObjectPart - copy part of object to other bucket and object func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) { return PartInfo{}, traceError(NotSupported{}) @@ -715,131 +644,146 @@ func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBuck // PutObjectPart puts a part of object in bucket func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) { - multipartKey := toGCSMultipartKey(key, uploadID, partID) + meta := gcsMultipartMetaName(uploadID) + object := l.client.Bucket(bucket).Object(meta) + + _, err := object.Attrs(l.ctx) + if err != nil { + return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + + var sha256Writer hash.Hash + + // Generate random ETag. + etag := getMD5Hash([]byte(mustGetUUID())) + + reader := data + + if sha256sum != "" { + sha256Writer = sha256.New() + reader = io.TeeReader(data, sha256Writer) + } + + dataName := gcsMultipartDataName(uploadID, etag) + + object = l.client.Bucket(bucket).Object(dataName) + + w := object.NewWriter(l.ctx) + // Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case + // where it tries to upload 0 bytes in the last chunk and get error from server. + w.ChunkSize = 0 + if md5Hex != "" { + w.MD5, err = hex.DecodeString(md5Hex) + if err != nil { + return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + } + _, err = io.Copy(w, reader) + if err != nil { + return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + + err = w.Close() + if err != nil { + return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + + if sha256sum != "" { + if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum { + object.Delete(l.ctx) + return PartInfo{}, traceError(SHA256Mismatch{}) + } + } - info, err := l.PutObject(bucket, multipartKey, size, data, map[string]string{}, sha256sum) return PartInfo{ PartNumber: partID, - LastModified: info.ModTime, - ETag: info.ETag, - Size: info.Size, - }, err + ETag: etag, + LastModified: time.Now().UTC(), + Size: size, + }, nil } // ListObjectParts returns all object parts for specified object in specified bucket func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) { - delimiter := slashSeparator - prefix := pathJoin(ZZZZMinioPrefix, fmt.Sprintf("multipart-%s-%s", escape(key), uploadID)) + meta := gcsMultipartMetaName(uploadID) + object := l.client.Bucket(bucket).Object(meta) - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) - - isTruncated := false - - it.PageInfo().Token = toGCSPageToken(toGCSMultipartKey(key, uploadID, partNumberMarker)) - it.PageInfo().MaxSize = maxParts - - nextPartnumberMarker := 0 - - parts := []PartInfo{} - for { - if len(parts) >= maxParts { - isTruncated = true - break - } - - attrs, err := it.Next() - if err == iterator.Done { - break - } else if err != nil { - return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, prefix) - } - - if attrs.Prefix != "" { - continue - } - - _, _, partID, err := fromGCSMultipartKey(attrs.Name) - if err != nil { - continue - } else if partID == 0 { - // we'll ignore partID 0, it is our zero object, containing - // metadata - continue - } - - nextPartnumberMarker = partID - - parts = append(parts, PartInfo{ - PartNumber: partID, - LastModified: attrs.Updated, - ETag: fmt.Sprintf("%d", attrs.CRC32C), - Size: attrs.Size, - }) + _, err := object.Attrs(l.ctx) + if err != nil { + return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, key) } - return ListPartsInfo{ - IsTruncated: isTruncated, - NextPartNumberMarker: nextPartnumberMarker, - Parts: parts, - }, nil + return ListPartsInfo{}, nil } -// AbortMultipartUpload aborts a ongoing multipart upload -func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error { - delimiter := slashSeparator - prefix := pathJoin(ZZZZMinioPrefix, fmt.Sprintf("multipart-%s-%s", escape(key), uploadID)) +// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up. +func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error { + meta := gcsMultipartMetaName(uploadID) + object := l.client.Bucket(bucket).Object(meta) + + _, err := object.Attrs(l.ctx) + if err != nil { + return gcsToObjectError(traceError(err), bucket, key) + } + + prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPath, uploadID) // iterate through all parts and delete them - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) - - it.PageInfo().Token = toGCSPageToken(toGCSMultipartKey(key, uploadID, 0)) + it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: prefix, Versions: false}) for { attrs, err := it.Next() if err == iterator.Done { break - } else if err != nil { + } + if err != nil { return gcsToObjectError(traceError(err), bucket, key) } - // on error continue deleting other parts - l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx) + object := l.client.Bucket(bucket).Object(attrs.Name) + // Ignore the error as parallel AbortMultipartUpload might have deleted it. + object.Delete(l.ctx) } - // delete part zero, ignoring errors here, we want to clean up all remains - _ = l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, 0)).Delete(l.ctx) - return nil } +// AbortMultipartUpload aborts a ongoing multipart upload +func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error { + return l.cleanupMultipartUpload(bucket, key, uploadID) +} + // CompleteMultipartUpload completes ongoing multipart upload and finalizes object - // Note that there is a limit (currently 32) to the number of components that can be composed in a single operation. - // There is a limit (currently 1024) to the total number of components for a given composite object. This means you can append to each object at most 1023 times. - // There is a per-project rate limit (currently 200) to the number of components you can compose per second. This rate counts both the components being appended to a composite object as well as the components being copied when the composite object of which they are a part is copied. func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) { - partZero := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, 0)) + meta := gcsMultipartMetaName(uploadID) + object := l.client.Bucket(bucket).Object(meta) - partZeroAttrs, err := partZero.Attrs(l.ctx) + partZeroAttrs, err := object.Attrs(l.ctx) + if err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + r, err := object.NewReader(l.ctx) if err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } + // Check version compatibility of the meta file before compose() + multipartMeta := gcsMultipartMetaV1{} + decoder := json.NewDecoder(r) + err = decoder.Decode(&multipartMeta) + if err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + if multipartMeta.Version != gcsMultipartMetaCurrentVersion { + return ObjectInfo{}, gcsToObjectError(traceError(errFormatNotSupported), bucket, key) + } + parts := make([]*storage.ObjectHandle, len(uploadedParts)) for i, uploadedPart := range uploadedParts { - object := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)) - attrs, partErr := object.Attrs(l.ctx) - if partErr != nil { - return ObjectInfo{}, gcsToObjectError(traceError(partErr), bucket, key) - } - crc32cStr := fmt.Sprintf("%d", attrs.CRC32C) - if crc32cStr != uploadedPart.ETag { - return ObjectInfo{}, gcsToObjectError(traceError(InvalidPart{}), bucket, key) - } - - parts[i] = object + parts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag)) } if len(parts) > 32 { @@ -858,14 +802,13 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID composer.Metadata = partZeroAttrs.Metadata attrs, err := composer.Run(l.ctx) - // cleanup, delete all parts - for _, uploadedPart := range uploadedParts { - l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)).Delete(l.ctx) + if err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } - - partZero.Delete(l.ctx) - - return fromGCSAttrsToObjectInfo(attrs), gcsToObjectError(traceError(err), bucket, key) + if err = l.cleanupMultipartUpload(bucket, key, uploadID); err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + return fromGCSAttrsToObjectInfo(attrs), nil } // SetBucketPolicies - Set policy on bucket diff --git a/cmd/gateway-gcs_test.go b/cmd/gateway-gcs_test.go index 739690414..65c385eb9 100644 --- a/cmd/gateway-gcs_test.go +++ b/cmd/gateway-gcs_test.go @@ -18,41 +18,6 @@ package cmd import "testing" -func TestEscape(t *testing.T) { - testCases := []struct { - Value string - EscapedValue string - }{ - { - Value: "test-test", - EscapedValue: "test%2Dtest", - }, - { - Value: "test/test", - EscapedValue: "test%2Ftest", - }, - { - Value: "test%test", - EscapedValue: "test%25test", - }, - { - Value: "%%%////+++", - EscapedValue: "%25%25%25%2F%2F%2F%2F+++", - }, - } - - for i, testCase := range testCases { - if escape(testCase.Value) != testCase.EscapedValue { - t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.EscapedValue, escape(testCase.Value)) - } - - if unescape(testCase.EscapedValue) != testCase.Value { - t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.Value, unescape(testCase.EscapedValue)) - } - } - -} - func TestToGCSPageToken(t *testing.T) { testCases := []struct { Name string @@ -122,7 +87,6 @@ func TestValidGCSProjectID(t *testing.T) { {"projectid1414", true}, } - // for i, testCase := range testCases { if isValidGCSProjectID(testCase.ProjectID) != testCase.Valid { t.Errorf("Test %d: Expected %v, got %v", i+1, isValidGCSProjectID(testCase.ProjectID), testCase.Valid) @@ -148,12 +112,12 @@ func TestIsGCSPrefix(t *testing.T) { }, // GCS prefix without a trailing slash { - prefix: ZZZZMinioPrefix, + prefix: gcsMinioPath, expectedRes: true, }, // GCS prefix with a trailing slash { - prefix: ZZZZMinioPrefix + "/", + prefix: gcsMinioPath + "/", expectedRes: true, }, } @@ -196,3 +160,24 @@ func TestIsGCSMarker(t *testing.T) { } } } + +// Test for gcsMultipartMetaName. +func TestGCSMultipartMetaName(t *testing.T) { + uploadID := "a" + expected := pathJoin(gcsMinioMultipartPath, uploadID, gcsMinioMultipartMeta) + got := gcsMultipartMetaName(uploadID) + if expected != got { + t.Errorf("expected: %s, got: %s", expected, got) + } +} + +// Test for gcsMultipartDataName. +func TestGCSMultipartDataName(t *testing.T) { + uploadID := "a" + etag := "b" + expected := pathJoin(gcsMinioMultipartPath, uploadID, etag) + got := gcsMultipartDataName(uploadID, etag) + if expected != got { + t.Errorf("expected: %s, got: %s", expected, got) + } +} diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index 2df4b43eb..f142701c0 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -27,6 +27,10 @@ var errUnexpected = errors.New("Unexpected error, please report this issue at ht // errCorruptedFormat - corrupted backend format. var errCorruptedFormat = errors.New("corrupted backend format, please join https://slack.minio.io for assistance") +// errFormatNotSupported - returned when older minio tries to parse metadata +// created by newer minio. +var errFormatNotSupported = errors.New("format not supported") + // errUnformattedDisk - unformatted disk found. var errUnformattedDisk = errors.New("unformatted disk found")