diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index de5a1c297..74ec2e216 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -25,6 +25,7 @@ import ( "time" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/sync/errgroup" ) @@ -226,8 +227,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s dryRun := opts.DryRun scanMode := opts.ScanMode - dataBlocks := lfi.Erasure.DataBlocks - storageDisks := er.getDisks() storageEndpoints := er.getEndpoints() @@ -244,8 +243,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s Bucket: bucket, Object: object, DiskCount: len(storageDisks), - ParityBlocks: len(storageDisks) / 2, - DataBlocks: len(storageDisks) / 2, + ParityBlocks: er.defaultParityCount, + DataBlocks: len(storageDisks) - er.defaultParityCount, } // Loop to find number of disks with valid data, per-drive @@ -316,7 +315,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // If less than read quorum number of disks have all the parts // of the data, we can't reconstruct the erasure-coded data. - if numAvailableDisks < dataBlocks { + if numAvailableDisks < result.DataBlocks { return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, dataErrs, opts) } @@ -333,7 +332,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // Latest FileInfo for reference. If a valid metadata is not // present, it is as good as object not found. - latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataBlocks) + latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks) if err != nil { return result, toObjectErr(err, bucket, object, versionID) } @@ -363,7 +362,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s dataDir = migrateDataDir } - if !latestMeta.Deleted { + if !latestMeta.Deleted || latestMeta.TransitionStatus != lifecycle.TransitionComplete { result.DataBlocks = latestMeta.Erasure.DataBlocks result.ParityBlocks = latestMeta.Erasure.ParityBlocks @@ -775,8 +774,10 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid break } - if validMeta.Deleted { - // notFoundParts is ignored since a delete marker does not have any parts + if validMeta.Deleted || validMeta.TransitionStatus == lifecycle.TransitionComplete { + // notFoundParts is ignored since a + // - delete marker does not have any parts + // - transition status of complete has no parts return validMeta, corruptedErasureMeta+notFoundErasureMeta > len(errs)/2 } diff --git a/cmd/xl-storage-format-v1.go b/cmd/xl-storage-format-v1.go index c0128c3c5..cf374b799 100644 --- a/cmd/xl-storage-format-v1.go +++ b/cmd/xl-storage-format-v1.go @@ -182,10 +182,6 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) { if !m.valid() { return FileInfo{}, errFileCorrupt } - var transitionStatus string - if st, ok := m.Meta[ReservedMetadataPrefixLower+"transition-status"]; ok { - transitionStatus = st - } fi := FileInfo{ Volume: volume, Name: path, @@ -197,8 +193,8 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) { VersionID: m.VersionID, DataDir: m.DataDir, } - if transitionStatus != "" { - fi.TransitionStatus = transitionStatus + if st, ok := m.Meta[ReservedMetadataPrefixLower+"transition-status"]; ok { + fi.TransitionStatus = st } return fi, nil } diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index abcffadec..dc95a2feb 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -260,6 +260,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error { ventry.DeleteMarker = &xlMetaV2DeleteMarker{ VersionID: uv, ModTime: fi.ModTime.UnixNano(), + MetaSys: make(map[string][]byte), } } else { ventry.Type = ObjectType @@ -355,13 +356,19 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) versionID = uuid.UUID(j.VersionID).String() } fi := FileInfo{ - Volume: volume, - Name: path, - ModTime: time.Unix(0, j.ModTime).UTC(), - VersionID: versionID, - Deleted: true, - DeleteMarkerReplicationStatus: string(j.MetaSys[xhttp.AmzBucketReplicationStatus]), - VersionPurgeStatus: VersionPurgeStatusType(string(j.MetaSys[VersionPurgeStatusKey])), + Volume: volume, + Name: path, + ModTime: time.Unix(0, j.ModTime).UTC(), + VersionID: versionID, + Deleted: true, + } + for k, v := range j.MetaSys { + if strings.EqualFold(k, xhttp.AmzBucketReplicationStatus) { + fi.DeleteMarkerReplicationStatus = string(v) + } + if strings.EqualFold(k, VersionPurgeStatusKey) { + fi.VersionPurgeStatus = VersionPurgeStatusType(string(v)) + } } return fi, nil } @@ -408,12 +415,15 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) { fi.Metadata[k] = v } for k, v := range j.MetaSys { - if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { - fi.Metadata[k] = string(v) + if strings.EqualFold(strings.ToLower(k), ReservedMetadataPrefixLower+"transition-status") { + fi.TransitionStatus = string(v) } if strings.EqualFold(k, VersionPurgeStatusKey) { fi.VersionPurgeStatus = VersionPurgeStatusType(string(v)) } + if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + fi.Metadata[k] = string(v) + } } fi.Erasure.Algorithm = j.ErasureAlgorithm.String() fi.Erasure.Index = j.ErasureIndex @@ -426,9 +436,6 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) { } fi.DataDir = uuid.UUID(j.DataDir).String() - if st, ok := j.MetaSys[ReservedMetadataPrefixLower+"transition-status"]; ok { - fi.TransitionStatus = string(st) - } return fi, nil } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 0e55207eb..8ad2cd7a2 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -962,7 +962,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F } // when data-dir is specified. Transition leverages existing DeleteObject - // api call to mark object as deleted.When object is pending transition, + // api call to mark object as deleted. When object is pending transition, // just update the metadata and avoid deleting data dir. if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending { filePath := pathJoin(volumeDir, path, dataDir) @@ -974,6 +974,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return err } } + // transitioned objects maintains metadata on the source cluster. When transition // status is set, update the metadata to disk. if !lastVersion || fi.TransitionStatus != "" {