From ce7af3aae13687186dada8082c751f33f199fba8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 11 Jul 2017 09:25:19 -0700 Subject: [PATCH] gcs: Fix writer/reader go-routine leaks and code re-use (#4651) This PR serves to fix following things in GCS gateway. - fixes leaks in object reader and writer, not getting closed under certain situations. This led to go-routine leaks. - apparent confusing issue in case of complete multipart upload, where it is currently possible for an entirely different object name to concatenate parts of a different object name if you happen to know the upload-id and parts of the object. This is a very rare scenario but it is possible. - succint usage of certain parts of code base and re-use. --- cmd/gateway-gcs.go | 224 +++++++++++++++++++++++---------------------- 1 file changed, 113 insertions(+), 111 deletions(-) diff --git a/cmd/gateway-gcs.go b/cmd/gateway-gcs.go index d884e0345..ef5a7f9a1 100644 --- a/cmd/gateway-gcs.go +++ b/cmd/gateway-gcs.go @@ -28,7 +28,6 @@ import ( "math" "regexp" "strings" - "time" "golang.org/x/oauth2/google" @@ -53,7 +52,8 @@ const ( // Multipart meta file. gcsMinioMultipartMeta = "gcs.json" // gcs.json version number - gcsMultipartMetaCurrentVersion = "1" + gcsMinioMultipartMetaCurrentVersion = "1" + // token prefixed with GCS returned marker to differentiate // from user supplied marker. gcsTokenPrefix = "##minio" @@ -316,28 +316,28 @@ func (l *gcsGateway) GetBucketInfo(bucket string) (BucketInfo, error) { }, nil } -// ListBuckets lists all GCS buckets -func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) { +// ListBuckets lists all buckets under your project-id on GCS. +func (l *gcsGateway) ListBuckets() (buckets []BucketInfo, err error) { it := l.client.Buckets(l.ctx, l.projectID) - b := []BucketInfo{} + // Iterate and capture all the buckets. for { - attrs, err := it.Next() - if err == iterator.Done { + attrs, ierr := it.Next() + if ierr == iterator.Done { break } - if err != nil { - return []BucketInfo{}, gcsToObjectError(traceError(err)) + if ierr != nil { + return buckets, gcsToObjectError(traceError(ierr)) } - b = append(b, BucketInfo{ + buckets = append(buckets, BucketInfo{ Name: attrs.Name, Created: attrs.Created, }) } - return b, nil + return buckets, nil } // DeleteBucket delete a bucket on GCS. @@ -412,7 +412,11 @@ func isGCSMarker(marker string) bool { // ListObjects - lists all blobs in GCS bucket filtered by prefix func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) { - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) + it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{ + Delimiter: delimiter, + Prefix: prefix, + Versions: false, + }) isTruncated := false nextMarker := "" @@ -510,8 +514,14 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de } // ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix -func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) { - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) +func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, + delimiter string, maxKeys int) (ListObjectsV2Info, error) { + + it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{ + Delimiter: delimiter, + Prefix: prefix, + Versions: false, + }) isTruncated := false it.PageInfo().MaxSize = maxKeys @@ -591,7 +601,6 @@ func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, len if err != nil { return gcsToObjectError(traceError(err), bucket, key) } - defer r.Close() if _, err := io.Copy(writer, r); err != nil { @@ -615,10 +624,9 @@ func fromMinioClientListBucketResultToV2Info(bucket string, result minio.ListBuc } return ListObjectsV2Info{ - IsTruncated: result.IsTruncated, - Prefixes: prefixes, - Objects: objects, - + IsTruncated: result.IsTruncated, + Prefixes: prefixes, + Objects: objects, ContinuationToken: result.Marker, NextContinuationToken: result.NextMarker, } @@ -649,18 +657,17 @@ func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, er } attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx) - if err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object) } - objInfo := fromGCSAttrsToObjectInfo(attrs) - objInfo.ETag = fmt.Sprintf("%d", attrs.CRC32C) - return objInfo, nil + return fromGCSAttrsToObjectInfo(attrs), nil } // PutObject - Create a new object with the incoming data, -func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) { +func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Reader, + metadata map[string]string, sha256sum string) (ObjectInfo, error) { + // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil { @@ -684,31 +691,26 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re w.ContentType = metadata["content-type"] w.ContentEncoding = metadata["content-encoding"] + w.Metadata = metadata if md5sum != "" { var err error w.MD5, err = hex.DecodeString(md5sum) if err != nil { + // Close the object writer upon error. + w.Close() return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } } - w.Metadata = metadata - - _, err := io.Copy(w, reader) - if err != nil { - return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) - } - - err = w.Close() - if err != nil { - return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) - } - - attrs, err := object.Attrs(l.ctx) - if err != nil { + if _, err := io.CopyN(w, reader, size); err != nil { + // Close the object writer upon error. + w.Close() return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } + // Close the object writer upon success. + w.Close() + // Verify sha256sum after close. if sha256sum != "" { if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum { object.Delete(l.ctx) @@ -716,11 +718,18 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re } } + attrs, err := object.Attrs(l.ctx) + if err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + return fromGCSAttrsToObjectInfo(attrs), nil } // CopyObject - Copies a blob from source container to destination container. -func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, metadata map[string]string) (ObjectInfo, error) { +func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, + metadata map[string]string) (ObjectInfo, error) { + src := l.client.Bucket(srcBucket).Object(srcObject) dst := l.client.Bucket(destBucket).Object(destObject) @@ -751,21 +760,19 @@ func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[ meta := gcsMultipartMetaName(uploadID) w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx) + defer w.Close() + w.ContentType = metadata["content-type"] w.ContentEncoding = metadata["content-encoding"] w.Metadata = metadata - content, err := json.Marshal(gcsMultipartMetaV1{gcsMultipartMetaCurrentVersion, bucket, key}) - if err != nil { + if err = json.NewEncoder(w).Encode(gcsMultipartMetaV1{ + gcsMinioMultipartMetaCurrentVersion, + bucket, + key, + }); err != nil { return "", gcsToObjectError(traceError(err), bucket, key) } - if _, err = w.Write(content); err != nil { - return "", gcsToObjectError(traceError(err), bucket, key) - } - if err = w.Close(); err != nil { - return "", gcsToObjectError(traceError(err), bucket, key) - } - return uploadID, nil } @@ -785,52 +792,61 @@ func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBuck return PartInfo{}, traceError(NotSupported{}) } +// Checks if minio.sys.temp/multipart/v1//gcs.json exists, returns +// an object layer compatible error upon any error. +func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID string) error { + _, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx) + return gcsToObjectError(traceError(err), bucket, key) +} + // PutObjectPart puts a part of object in bucket func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) { - meta := gcsMultipartMetaName(uploadID) - object := l.client.Bucket(bucket).Object(meta) - - _, err := object.Attrs(l.ctx) - if err != nil { - return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) + if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil { + return PartInfo{}, err } var sha256Writer hash.Hash - // Generate random ETag. - etag := getMD5Hash([]byte(mustGetUUID())) + var etag string + // Honor etag if client did send md5Hex. + if md5Hex != "" { + etag = md5Hex + } else { + // Generate random ETag. + etag = getMD5Hash([]byte(mustGetUUID())) + } reader := data - if sha256sum != "" { sha256Writer = sha256.New() reader = io.TeeReader(data, sha256Writer) } - dataName := gcsMultipartDataName(uploadID, etag) - - object = l.client.Bucket(bucket).Object(dataName) - + object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, etag)) w := object.NewWriter(l.ctx) // Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case // where it tries to upload 0 bytes in the last chunk and get error from server. w.ChunkSize = 0 if md5Hex != "" { + var err error w.MD5, err = hex.DecodeString(md5Hex) if err != nil { + // Make sure to close object writer upon error. + w.Close() return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) } } - _, err = io.Copy(w, reader) - if err != nil { + + if _, err := io.CopyN(w, reader, size); err != nil { + // Make sure to close object writer upon error. + w.Close() return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) } - err = w.Close() - if err != nil { - return PartInfo{}, gcsToObjectError(traceError(err), bucket, key) - } + // Make sure to close the object writer upon success. + w.Close() + // Verify sha256sum after Close(). if sha256sum != "" { if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum { object.Delete(l.ctx) @@ -841,34 +857,19 @@ func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, p return PartInfo{ PartNumber: partID, ETag: etag, - LastModified: time.Now().UTC(), + LastModified: UTCNow(), Size: size, }, nil + } // ListObjectParts returns all object parts for specified object in specified bucket func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) { - meta := gcsMultipartMetaName(uploadID) - object := l.client.Bucket(bucket).Object(meta) - - _, err := object.Attrs(l.ctx) - if err != nil { - return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, key) - } - - return ListPartsInfo{}, nil + return ListPartsInfo{}, l.checkUploadIDExists(bucket, key, uploadID) } // Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up. func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error { - meta := gcsMultipartMetaName(uploadID) - object := l.client.Bucket(bucket).Object(meta) - - _, err := object.Attrs(l.ctx) - if err != nil { - return gcsToObjectError(traceError(err), bucket, key) - } - prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPathV1, uploadID) // iterate through all parts and delete them @@ -893,13 +894,20 @@ func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error // AbortMultipartUpload aborts a ongoing multipart upload func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error { + if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil { + return err + } return l.cleanupMultipartUpload(bucket, key, uploadID) } // CompleteMultipartUpload completes ongoing multipart upload and finalizes object -// Note that there is a limit (currently 32) to the number of components that can be composed in a single operation. -// There is a limit (currently 1024) to the total number of components for a given composite object. This means you can append to each object at most 1023 times. -// There is a per-project rate limit (currently 200) to the number of components you can compose per second. This rate counts both the components being appended to a composite object as well as the components being copied when the composite object of which they are a part is copied. +// Note that there is a limit (currently 32) to the number of components that can +// be composed in a single operation. There is a limit (currently 1024) to the total +// number of components for a given composite object. This means you can append to +// each object at most 1023 times. There is a per-project rate limit (currently 200) +// to the number of components you can compose per second. This rate counts both the +// components being appended to a composite object as well as the components being +// copied when the composite object of which they are a part is copied. func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) { meta := gcsMultipartMetaName(uploadID) object := l.client.Bucket(bucket).Object(meta) @@ -908,29 +916,33 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID if err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } + r, err := object.NewReader(l.ctx) if err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } + defer r.Close() // Check version compatibility of the meta file before compose() multipartMeta := gcsMultipartMetaV1{} - decoder := json.NewDecoder(r) - err = decoder.Decode(&multipartMeta) - if err != nil { + if err = json.NewDecoder(r).Decode(&multipartMeta); err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } - if multipartMeta.Version != gcsMultipartMetaCurrentVersion { + + if multipartMeta.Version != gcsMinioMultipartMetaCurrentVersion { return ObjectInfo{}, gcsToObjectError(traceError(errFormatNotSupported), bucket, key) } - parts := make([]*storage.ObjectHandle, len(uploadedParts)) - for i, uploadedPart := range uploadedParts { - parts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag)) + // Validate if the gcs.json stores valid entries for the bucket and key. + if multipartMeta.Bucket != bucket || multipartMeta.Object != key { + return ObjectInfo{}, gcsToObjectError(InvalidUploadID{ + UploadID: uploadID, + }, bucket, key) } - if len(parts) > maxPartCount { - return ObjectInfo{}, traceError(NotSupported{}) + var parts []*storage.ObjectHandle + for _, uploadedPart := range uploadedParts { + parts = append(parts, l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag))) } composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents))) @@ -952,8 +964,7 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID composer.ContentType = partZeroAttrs.ContentType composer.Metadata = partZeroAttrs.Metadata - _, err = composer.Run(l.ctx) - if err != nil { + if _, err = composer.Run(l.ctx); err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) } } @@ -962,13 +973,9 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID parts = composeParts } - dst := l.client.Bucket(bucket).Object(key) - - composer := dst.ComposerFrom(parts...) - + composer := l.client.Bucket(bucket).Object(key).ComposerFrom(parts...) composer.ContentType = partZeroAttrs.ContentType composer.Metadata = partZeroAttrs.Metadata - attrs, err := composer.Run(l.ctx) if err != nil { return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) @@ -1026,15 +1033,12 @@ func (l *gcsGateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAc // GetBucketPolicies - Get policy on bucket func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) { - acl := l.client.Bucket(bucket).ACL() - - rules, err := acl.List(l.ctx) + rules, err := l.client.Bucket(bucket).ACL().List(l.ctx) if err != nil { return policy.BucketAccessPolicy{}, gcsToObjectError(traceError(err), bucket) } policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"} - for _, r := range rules { if r.Entity != storage.AllUsers || r.Role == storage.RoleOwner { continue @@ -1052,10 +1056,8 @@ func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy // DeleteBucketPolicies - Delete all policies on bucket func (l *gcsGateway) DeleteBucketPolicies(bucket string) error { - acl := l.client.Bucket(bucket).ACL() - // This only removes the storage.AllUsers policies - if err := acl.Delete(l.ctx, storage.AllUsers); err != nil { + if err := l.client.Bucket(bucket).ACL().Delete(l.ctx, storage.AllUsers); err != nil { return gcsToObjectError(traceError(err), bucket) }