gcs: Fix writer/reader go-routine leaks and improve code re-use (#4651)

This PR fixes the following things in the GCS gateway.

- fixes leaks where the object reader and writer were not getting closed
  under certain situations, which led to go-routine leaks (see the first
  sketch below).

- fixes a confusing issue in CompleteMultipartUpload, where it was
  possible to concatenate the parts of one object under an entirely
  different object name, if you happened to know the upload-id and the
  parts of the object (see the second sketch below). This is a very
  rare scenario, but it is possible.

- makes certain parts of the code base more succinct and re-usable.
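
A minimal sketch of the writer fix, assuming illustrative names
(putSketch is not the gateway's actual function): the GCS storage.Writer
must be closed on every path, including error paths, because an unclosed
writer leaves the client's background upload go-routine blocked forever.

    import (
        "context"
        "io"

        "cloud.google.com/go/storage"
    )

    // putSketch illustrates the close-on-error pattern applied to the
    // object writer in PutObject and PutObjectPart (simplified).
    func putSketch(ctx context.Context, object *storage.ObjectHandle,
        reader io.Reader, size int64) error {
        w := object.NewWriter(ctx)
        if _, err := io.CopyN(w, reader, size); err != nil {
            // Close the writer on the error path too; otherwise the
            // client's upload go-routine is never released and leaks.
            w.Close()
            return err
        }
        // Close on success flushes and finalizes the object.
        return w.Close()
    }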
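
A sketch of the multipart fix, with the same caveat that the helper
names are illustrative: gcs.json records the bucket and object an
upload-id was issued for, and CompleteMultipartUpload now refuses to
compose parts under any other bucket/object pair (the gateway returns
InvalidUploadID; a plain error stands in for it here).

    import "fmt"

    // gcsMultipartMetaV1 mirrors the fields the gateway persists in
    // gcs.json (JSON tags omitted).
    type gcsMultipartMetaV1 struct {
        Version string
        Bucket  string
        Object  string
    }

    // validateUploadID refuses completion when the upload-id was issued
    // for a different bucket/object than the one being completed.
    func validateUploadID(meta gcsMultipartMetaV1, bucket, key string) error {
        if meta.Bucket != bucket || meta.Object != key {
            return fmt.Errorf("invalid upload id for %s/%s", bucket, key)
        }
        return nil
    }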
Harshavardhana 2017-07-11 09:25:19 -07:00 committed by Dee Koder
parent 1b92c5136b
commit ce7af3aae1


@@ -28,7 +28,6 @@ import (
"math"
"regexp"
"strings"
"time"
"golang.org/x/oauth2/google"
@@ -53,7 +52,8 @@ const (
// Multipart meta file.
gcsMinioMultipartMeta = "gcs.json"
// gcs.json version number
gcsMultipartMetaCurrentVersion = "1"
gcsMinioMultipartMetaCurrentVersion = "1"
// token prefixed with GCS returned marker to differentiate
// from user supplied marker.
gcsTokenPrefix = "##minio"
@@ -316,28 +316,28 @@ func (l *gcsGateway) GetBucketInfo(bucket string) (BucketInfo, error) {
}, nil
}
// ListBuckets lists all GCS buckets
func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) {
// ListBuckets lists all buckets under your project-id on GCS.
func (l *gcsGateway) ListBuckets() (buckets []BucketInfo, err error) {
it := l.client.Buckets(l.ctx, l.projectID)
b := []BucketInfo{}
// Iterate and capture all the buckets.
for {
attrs, err := it.Next()
if err == iterator.Done {
attrs, ierr := it.Next()
if ierr == iterator.Done {
break
}
if err != nil {
return []BucketInfo{}, gcsToObjectError(traceError(err))
if ierr != nil {
return buckets, gcsToObjectError(traceError(ierr))
}
b = append(b, BucketInfo{
buckets = append(buckets, BucketInfo{
Name: attrs.Name,
Created: attrs.Created,
})
}
return b, nil
return buckets, nil
}
// DeleteBucket deletes a bucket on GCS.
@@ -412,7 +412,11 @@ func isGCSMarker(marker string) bool {
// ListObjects - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
Delimiter: delimiter,
Prefix: prefix,
Versions: false,
})
isTruncated := false
nextMarker := ""
@@ -510,8 +514,14 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
}
// ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool,
delimiter string, maxKeys int) (ListObjectsV2Info, error) {
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
Delimiter: delimiter,
Prefix: prefix,
Versions: false,
})
isTruncated := false
it.PageInfo().MaxSize = maxKeys
@@ -591,7 +601,6 @@ func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, len
if err != nil {
return gcsToObjectError(traceError(err), bucket, key)
}
defer r.Close()
if _, err := io.Copy(writer, r); err != nil {
@@ -615,10 +624,9 @@ func fromMinioClientListBucketResultToV2Info(bucket string, result minio.ListBuc
}
return ListObjectsV2Info{
IsTruncated: result.IsTruncated,
Prefixes: prefixes,
Objects: objects,
IsTruncated: result.IsTruncated,
Prefixes: prefixes,
Objects: objects,
ContinuationToken: result.Marker,
NextContinuationToken: result.NextMarker,
}
@@ -649,18 +657,17 @@ func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, er
}
attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object)
}
objInfo := fromGCSAttrsToObjectInfo(attrs)
objInfo.ETag = fmt.Sprintf("%d", attrs.CRC32C)
return objInfo, nil
return fromGCSAttrsToObjectInfo(attrs), nil
}
// PutObject - Creates a new object with the incoming data.
func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Reader,
metadata map[string]string, sha256sum string) (ObjectInfo, error) {
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
// otherwise gcs will just return object not exist in case of non-existing bucket
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
@@ -684,31 +691,26 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
w.ContentType = metadata["content-type"]
w.ContentEncoding = metadata["content-encoding"]
w.Metadata = metadata
if md5sum != "" {
var err error
w.MD5, err = hex.DecodeString(md5sum)
if err != nil {
// Close the object writer upon error.
w.Close()
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
}
w.Metadata = metadata
_, err := io.Copy(w, reader)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
err = w.Close()
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
attrs, err := object.Attrs(l.ctx)
if err != nil {
if _, err := io.CopyN(w, reader, size); err != nil {
// Close the object writer upon error.
w.Close()
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
// Close the object writer upon success.
w.Close()
// Verify sha256sum after close.
if sha256sum != "" {
if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum {
object.Delete(l.ctx)
@@ -716,11 +718,18 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
}
}
attrs, err := object.Attrs(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
return fromGCSAttrsToObjectInfo(attrs), nil
}
// CopyObject - Copies a blob from source container to destination container.
func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, metadata map[string]string) (ObjectInfo, error) {
func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string,
metadata map[string]string) (ObjectInfo, error) {
src := l.client.Bucket(srcBucket).Object(srcObject)
dst := l.client.Bucket(destBucket).Object(destObject)
@@ -751,21 +760,19 @@ func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[
meta := gcsMultipartMetaName(uploadID)
w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx)
defer w.Close()
w.ContentType = metadata["content-type"]
w.ContentEncoding = metadata["content-encoding"]
w.Metadata = metadata
content, err := json.Marshal(gcsMultipartMetaV1{gcsMultipartMetaCurrentVersion, bucket, key})
if err != nil {
if err = json.NewEncoder(w).Encode(gcsMultipartMetaV1{
gcsMinioMultipartMetaCurrentVersion,
bucket,
key,
}); err != nil {
return "", gcsToObjectError(traceError(err), bucket, key)
}
if _, err = w.Write(content); err != nil {
return "", gcsToObjectError(traceError(err), bucket, key)
}
if err = w.Close(); err != nil {
return "", gcsToObjectError(traceError(err), bucket, key)
}
return uploadID, nil
}
@@ -785,52 +792,61 @@ func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBuck
return PartInfo{}, traceError(NotSupported{})
}
// Checks if minio.sys.temp/multipart/v1/<upload-id>/gcs.json exists, returns
// an object layer compatible error upon any error.
func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID string) error {
_, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx)
return gcsToObjectError(traceError(err), bucket, key)
}
// PutObjectPart puts a part of object in bucket
func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
_, err := object.Attrs(l.ctx)
if err != nil {
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil {
return PartInfo{}, err
}
var sha256Writer hash.Hash
// Generate random ETag.
etag := getMD5Hash([]byte(mustGetUUID()))
var etag string
// Honor etag if client did send md5Hex.
if md5Hex != "" {
etag = md5Hex
} else {
// Generate random ETag.
etag = getMD5Hash([]byte(mustGetUUID()))
}
reader := data
if sha256sum != "" {
sha256Writer = sha256.New()
reader = io.TeeReader(data, sha256Writer)
}
dataName := gcsMultipartDataName(uploadID, etag)
object = l.client.Bucket(bucket).Object(dataName)
object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, etag))
w := object.NewWriter(l.ctx)
// Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case
// where it tries to upload 0 bytes in the last chunk and gets an error from the server.
w.ChunkSize = 0
if md5Hex != "" {
var err error
w.MD5, err = hex.DecodeString(md5Hex)
if err != nil {
// Make sure to close object writer upon error.
w.Close()
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
}
_, err = io.Copy(w, reader)
if err != nil {
if _, err := io.CopyN(w, reader, size); err != nil {
// Make sure to close object writer upon error.
w.Close()
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
err = w.Close()
if err != nil {
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
// Make sure to close the object writer upon success.
w.Close()
// Verify sha256sum after Close().
if sha256sum != "" {
if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum {
object.Delete(l.ctx)
@@ -841,34 +857,19 @@ func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, p
return PartInfo{
PartNumber: partID,
ETag: etag,
LastModified: time.Now().UTC(),
LastModified: UTCNow(),
Size: size,
}, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
_, err := object.Attrs(l.ctx)
if err != nil {
return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
return ListPartsInfo{}, nil
return ListPartsInfo{}, l.checkUploadIDExists(bucket, key, uploadID)
}
// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.
func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
_, err := object.Attrs(l.ctx)
if err != nil {
return gcsToObjectError(traceError(err), bucket, key)
}
prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPathV1, uploadID)
// iterate through all parts and delete them
@@ -893,13 +894,20 @@ func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error
// AbortMultipartUpload aborts an ongoing multipart upload
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil {
return err
}
return l.cleanupMultipartUpload(bucket, key, uploadID)
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
// Note that there is a limit (currently 32) to the number of components that can be composed in a single operation.
// There is a limit (currently 1024) to the total number of components for a given composite object. This means you can append to each object at most 1023 times.
// There is a per-project rate limit (currently 200) to the number of components you can compose per second. This rate counts both the components being appended to a composite object as well as the components being copied when the composite object of which they are a part is copied.
// Note that there is a limit (currently 32) to the number of components that can
// be composed in a single operation. There is a limit (currently 1024) to the total
// number of components for a given composite object. This means you can append to
// each object at most 1023 times. There is a per-project rate limit (currently 200)
// to the number of components you can compose per second. This rate counts both the
// components being appended to a composite object as well as the components being
// copied when the composite object of which they are a part is copied.
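// For example, completing an upload with 100 parts first composes
// ceil(100/32) = 4 intermediate objects, then composes those 4 into
// the final object.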
func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
@@ -908,29 +916,33 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
r, err := object.NewReader(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
defer r.Close()
// Check version compatibility of the meta file before compose()
multipartMeta := gcsMultipartMetaV1{}
decoder := json.NewDecoder(r)
err = decoder.Decode(&multipartMeta)
if err != nil {
if err = json.NewDecoder(r).Decode(&multipartMeta); err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
if multipartMeta.Version != gcsMultipartMetaCurrentVersion {
if multipartMeta.Version != gcsMinioMultipartMetaCurrentVersion {
return ObjectInfo{}, gcsToObjectError(traceError(errFormatNotSupported), bucket, key)
}
parts := make([]*storage.ObjectHandle, len(uploadedParts))
for i, uploadedPart := range uploadedParts {
parts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag))
// Validate if the gcs.json stores valid entries for the bucket and key.
if multipartMeta.Bucket != bucket || multipartMeta.Object != key {
return ObjectInfo{}, gcsToObjectError(InvalidUploadID{
UploadID: uploadID,
}, bucket, key)
}
if len(parts) > maxPartCount {
return ObjectInfo{}, traceError(NotSupported{})
var parts []*storage.ObjectHandle
for _, uploadedPart := range uploadedParts {
parts = append(parts, l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag)))
}
composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents)))
@@ -952,8 +964,7 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
composer.ContentType = partZeroAttrs.ContentType
composer.Metadata = partZeroAttrs.Metadata
_, err = composer.Run(l.ctx)
if err != nil {
if _, err = composer.Run(l.ctx); err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
}
@@ -962,13 +973,9 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
parts = composeParts
}
dst := l.client.Bucket(bucket).Object(key)
composer := dst.ComposerFrom(parts...)
composer := l.client.Bucket(bucket).Object(key).ComposerFrom(parts...)
composer.ContentType = partZeroAttrs.ContentType
composer.Metadata = partZeroAttrs.Metadata
attrs, err := composer.Run(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
@@ -1026,15 +1033,12 @@ func (l *gcsGateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAc
// GetBucketPolicies - Get policy on bucket
func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
acl := l.client.Bucket(bucket).ACL()
rules, err := acl.List(l.ctx)
rules, err := l.client.Bucket(bucket).ACL().List(l.ctx)
if err != nil {
return policy.BucketAccessPolicy{}, gcsToObjectError(traceError(err), bucket)
}
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
for _, r := range rules {
if r.Entity != storage.AllUsers || r.Role == storage.RoleOwner {
continue
@@ -1052,10 +1056,8 @@ func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy
// DeleteBucketPolicies - Delete all policies on bucket
func (l *gcsGateway) DeleteBucketPolicies(bucket string) error {
acl := l.client.Bucket(bucket).ACL()
// This only removes the storage.AllUsers policies
if err := acl.Delete(l.ctx, storage.AllUsers); err != nil {
if err := l.client.Bucket(bucket).ACL().Delete(l.ctx, storage.AllUsers); err != nil {
return gcsToObjectError(traceError(err), bucket)
}