diff --git a/cmd/gateway-gcs.go b/cmd/gateway-gcs.go index a99e13832..673647045 100644 --- a/cmd/gateway-gcs.go +++ b/cmd/gateway-gcs.go @@ -25,6 +25,7 @@ import ( "fmt" "hash" "io" + "math" "regexp" "strings" "time" @@ -56,6 +57,13 @@ const ( // token prefixed with GCS returned marker to differentiate // from user supplied marker. gcsTokenPrefix = "##minio" + + // maxComponents - maximum component object count to create a composite object. + // Refer https://cloud.google.com/storage/docs/composite-objects + maxComponents = 32 + + // maxPartCount - maximum multipart parts GCS supports which is 32 x 32 = 1024. + maxPartCount = 1024 ) // Stored in gcs.json - Contents of this file is not used anywhere. It can be @@ -899,14 +907,39 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID parts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag)) } - if len(parts) > 32 { - // we need to split up the compose of more than 32 parts - // into subcomposes. This means that the first 32 parts will - // compose to a composed-object-0, next parts to composed-object-1, - // the final compose will compose composed-object* to 1. + if len(parts) > maxPartCount { return ObjectInfo{}, traceError(NotSupported{}) } + composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents))) + if composeCount > 1 { + // Create composes of every 32 parts. + composeParts := make([]*storage.ObjectHandle, composeCount) + for i := 0; i < composeCount; i++ { + // Create 'composed-object-N' using next 32 parts. + composeName := fmt.Sprintf("composed-object-%d", i) + composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, composeName)) + + start := i * maxComponents + end := start + maxComponents + if end > len(parts) { + end = len(parts) + } + + composer := composeParts[i].ComposerFrom(parts[start:end]...) + composer.ContentType = partZeroAttrs.ContentType + composer.Metadata = partZeroAttrs.Metadata + + _, err = composer.Run(l.ctx) + if err != nil { + return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key) + } + } + + // As composes are successfully created, final object needs to be created using composes. + parts = composeParts + } + dst := l.client.Bucket(bucket).Object(key) composer := dst.ComposerFrom(parts...)