diff --git a/cmd/gateway-gcs.go b/cmd/gateway-gcs.go index 2b60f0725..7ebae3a9d 100644 --- a/cmd/gateway-gcs.go +++ b/cmd/gateway-gcs.go @@ -28,6 +28,7 @@ import ( "math" "regexp" "strings" + "time" "golang.org/x/oauth2/google" @@ -41,29 +42,38 @@ import ( ) const ( - // gcsMinioMeta is used for multiparts. We have "minio.sys.temp" prefix so that + // gcsMinioSysTmp is used for multiparts. We have "minio.sys.tmp" prefix so that // listing on the GCS lists this entry in the end. Also in the gateway // ListObjects we filter out this entry. - gcsMinioPath = "minio.sys.temp/" + gcsMinioSysTmp = "minio.sys.tmp/" + // Path where multipart objects are saved. // If we change the backend format we will use a different url path like /multipart/v2 // but we will not migrate old data. - gcsMinioMultipartPathV1 = gcsMinioPath + "multipart/v1" + gcsMinioMultipartPathV1 = gcsMinioSysTmp + "multipart/v1" + // Multipart meta file. gcsMinioMultipartMeta = "gcs.json" + // gcs.json version number gcsMinioMultipartMetaCurrentVersion = "1" // token prefixed with GCS returned marker to differentiate // from user supplied marker. - gcsTokenPrefix = "##minio" + gcsTokenPrefix = "{minio}" - // maxComponents - maximum component object count to create a composite object. + // Maximum component object count to create a composite object. // Refer https://cloud.google.com/storage/docs/composite-objects - maxComponents = 32 + gcsMaxComponents = 32 - // maxPartCount - maximum multipart parts GCS supports which is 32 x 32 = 1024. - maxPartCount = 1024 + // gcsMaxPartCount - maximum multipart parts GCS supports which is 32 x 32 = 1024. + gcsMaxPartCount = 1024 + + // Every 24 hours we scan minio.sys.tmp to delete expired multiparts in minio.sys.tmp + gcsCleanupInterval = time.Hour * 24 + + // The cleanup routine deletes files older than 2 weeks in minio.sys.tmp + gcsMultipartExpiry = time.Hour * 24 * 14 ) // Stored in gcs.json - Contents of this file is not used anywhere. It can be @@ -268,12 +278,56 @@ func newGCSGateway(projectID string) (GatewayLayer, error) { return nil, err } - return &gcsGateway{ + gateway := &gcsGateway{ client: client, projectID: projectID, ctx: ctx, anonClient: anonClient, - }, nil + } + // Start background process to cleanup old files in minio.sys.tmp + go gateway.CleanupGCSMinioSysTmp() + return gateway, nil +} + +// Cleanup old files in minio.sys.tmp of the given bucket. +func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) { + it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: gcsMinioSysTmp, Versions: false}) + for { + attrs, err := it.Next() + if err != nil { + if err != iterator.Done { + errorIf(err, "Object listing error on bucket %s during purging of old files in minio.sys.tmp", bucket) + } + return + } + if time.Since(attrs.Updated) > gcsMultipartExpiry { + // Delete files older than 2 weeks. + err := l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx) + if err != nil { + errorIf(err, "Unable to delete %s/%s during purging of old files in minio.sys.tmp", bucket, attrs.Name) + return + } + } + } +} + +// Cleanup old files in minio.sys.tmp of all buckets. +func (l *gcsGateway) CleanupGCSMinioSysTmp() { + for { + it := l.client.Buckets(l.ctx, l.projectID) + for { + attrs, err := it.Next() + if err != nil { + if err != iterator.Done { + errorIf(err, "Bucket listing error during purging of old files in minio.sys.tmp") + } + break + } + l.CleanupGCSMinioSysTmpBucket(attrs.Name) + } + // Run the cleanup loop every 1 day. + time.Sleep(gcsCleanupInterval) + } } // Shutdown - save any gateway metadata to disk @@ -344,7 +398,7 @@ func (l *gcsGateway) ListBuckets() (buckets []BucketInfo, err error) { func (l *gcsGateway) DeleteBucket(bucket string) error { itObject := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: slashSeparator, Versions: false}) // We list the bucket and if we find any objects we return BucketNotEmpty error. If we - // find only "minio.sys.temp/" then we remove it before deleting the bucket. + // find only "minio.sys.tmp/" then we remove it before deleting the bucket. gcsMinioPathFound := false nonGCSMinioPathFound := false for { @@ -355,7 +409,7 @@ func (l *gcsGateway) DeleteBucket(bucket string) error { if err != nil { return gcsToObjectError(traceError(err)) } - if objAttrs.Prefix == gcsMinioPath { + if objAttrs.Prefix == gcsMinioSysTmp { gcsMinioPathFound = true continue } @@ -366,8 +420,8 @@ func (l *gcsGateway) DeleteBucket(bucket string) error { return gcsToObjectError(traceError(BucketNotEmpty{})) } if gcsMinioPathFound { - // Remove minio.sys.temp before deleting the bucket. - itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioPath}) + // Remove minio.sys.tmp before deleting the bucket. + itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioSysTmp}) for { objAttrs, err := itObject.Next() if err == iterator.Done { @@ -452,7 +506,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de // metadata folder, then just break // otherwise we've truncated the output attrs, _ := it.Next() - if attrs != nil && attrs.Prefix == gcsMinioPath { + if attrs != nil && attrs.Prefix == gcsMinioSysTmp { break } @@ -470,16 +524,16 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de nextMarker = toGCSPageToken(attrs.Name) - if attrs.Prefix == gcsMinioPath { + if attrs.Prefix == gcsMinioSysTmp { // We don't return our metadata prefix. continue } - if !strings.HasPrefix(prefix, gcsMinioPath) { + if !strings.HasPrefix(prefix, gcsMinioSysTmp) { // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // which will be helpful to observe the "directory structure" for debugging purposes. - if strings.HasPrefix(attrs.Prefix, gcsMinioPath) || - strings.HasPrefix(attrs.Name, gcsMinioPath) { + if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) || + strings.HasPrefix(attrs.Name, gcsMinioSysTmp) { continue } } @@ -552,16 +606,16 @@ func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fet return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix) } - if attrs.Prefix == gcsMinioPath { + if attrs.Prefix == gcsMinioSysTmp { // We don't return our metadata prefix. continue } - if !strings.HasPrefix(prefix, gcsMinioPath) { + if !strings.HasPrefix(prefix, gcsMinioSysTmp) { // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // which will be helpful to observe the "directory structure" for debugging purposes. - if strings.HasPrefix(attrs.Prefix, gcsMinioPath) || - strings.HasPrefix(attrs.Name, gcsMinioPath) { + if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) || + strings.HasPrefix(attrs.Name, gcsMinioSysTmp) { continue } } @@ -792,7 +846,7 @@ func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBuck return PartInfo{}, traceError(NotSupported{}) } -// Checks if minio.sys.temp/multipart/v1//gcs.json exists, returns +// Checks if minio.sys.tmp/multipart/v1//gcs.json exists, returns // an object layer compatible error upon any error. func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID string) error { _, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx) @@ -948,18 +1002,18 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID // Returns name of the composed object. gcsMultipartComposeName := func(uploadID string, composeNumber int) string { - return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioPath, uploadID, composeNumber) + return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioSysTmp, uploadID, composeNumber) } - composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents))) + composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents))) if composeCount > 1 { // Create composes of every 32 parts. composeParts := make([]*storage.ObjectHandle, composeCount) for i := 0; i < composeCount; i++ { // Create 'composed-object-N' using next 32 parts. composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartComposeName(uploadID, i)) - start := i * maxComponents - end := start + maxComponents + start := i * gcsMaxComponents + end := start + gcsMaxComponents if end > len(parts) { end = len(parts) } diff --git a/cmd/gateway-gcs_test.go b/cmd/gateway-gcs_test.go index b54798faf..9a865973e 100644 --- a/cmd/gateway-gcs_test.go +++ b/cmd/gateway-gcs_test.go @@ -108,15 +108,15 @@ func TestIsGCSMarker(t *testing.T) { expected bool }{ { - marker: "##miniogcs123", + marker: "{minio}gcs123", expected: true, }, { - marker: "##mini_notgcs123", + marker: "{mini_no}tgcs123", expected: false, }, { - marker: "#minioagainnotgcs123", + marker: "{minioagainnotgcs123", expected: false, }, {