From fa211f6a1044fbee8b2c8f7a95fe3d60e8fc8f73 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 8 Jul 2020 04:54:09 +0100 Subject: [PATCH] heal: Fix healing delete markers (#9989) --- cmd/erasure-healing.go | 182 +++++++++++++++++++++++------------------ 1 file changed, 102 insertions(+), 80 deletions(-) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 72aad7736..e6cd871b6 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -242,8 +242,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s Bucket: bucket, Object: object, DiskCount: len(storageDisks), - ParityBlocks: latestFileInfo.Erasure.ParityBlocks, - DataBlocks: latestFileInfo.Erasure.DataBlocks, + ParityBlocks: len(storageDisks) / 2, + DataBlocks: len(storageDisks) / 2, // Initialize object size to -1, so we can detect if we are // unable to reliably find the object size. @@ -265,8 +265,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // If data is sane on any one disk, we can // extract the correct object size. result.ObjectSize = partsMetadata[i].Size - result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks - result.DataBlocks = partsMetadata[i].Erasure.DataBlocks + if partsMetadata[i].Erasure.ParityBlocks > 0 && partsMetadata[i].Erasure.DataBlocks > 0 { + result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks + result.DataBlocks = partsMetadata[i].Erasure.DataBlocks + } case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound: driveState = madmin.DriveStateOffline case errs[i] == errFileNotFound, errs[i] == errVolumeNotFound: @@ -315,7 +317,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s writeQuorum = getWriteQuorum(len(storageDisks)) } if !dryRun && remove { - err = er.deleteObject(ctx, bucket, object, writeQuorum) + if latestFileInfo.VersionID == "" { + err = er.deleteObject(ctx, bucket, object, writeQuorum) + } else { + err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: latestFileInfo.VersionID}) + } } return defaultHealResult(latestFileInfo, storageDisks, storageEndpoints, errs, bucket, object), err } @@ -348,10 +354,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s return nfi } - // Reorder so that we have data disks first and parity disks next. - latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution) - outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution) - partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution) + // We write at temporary location and then rename to final location. + tmpID := mustGetUUID() + for i := range outDatedDisks { if outDatedDisks[i] == nil { continue @@ -359,80 +364,85 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s partsMetadata[i] = cleanFileInfo(latestMeta) } - // We write at temporary location and then rename to final location. - tmpID := mustGetUUID() + if !latestMeta.Deleted { + result.DataBlocks = latestMeta.Erasure.DataBlocks + result.ParityBlocks = latestMeta.Erasure.ParityBlocks - // Heal each part. erasureHealFile() will write the healed - // part to .minio/tmp/uuid/ which needs to be renamed later to - // the final location. - erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks, - latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize) - if err != nil { - return result, toObjectErr(err, bucket, object) - } + // Reorder so that we have data disks first and parity disks next. + latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution) + outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution) + partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution) - erasureInfo := latestMeta.Erasure - for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { - partSize := latestMeta.Parts[partIndex].Size - partActualSize := latestMeta.Parts[partIndex].ActualSize - partNumber := latestMeta.Parts[partIndex].Number - tillOffset := erasure.ShardFileOffset(0, partSize, partSize) - readers := make([]io.ReaderAt, len(latestDisks)) - checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm - for i, disk := range latestDisks { - if disk == OfflineDisk { - continue - } - checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber) - partPath := pathJoin(object, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) - readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) - } - writers := make([]io.Writer, len(outDatedDisks)) - for i, disk := range outDatedDisks { - if disk == OfflineDisk { - continue - } - partPath := pathJoin(tmpID, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) - } - err = erasure.Heal(ctx, readers, writers, partSize) - closeBitrotReaders(readers) - closeBitrotWriters(writers) + // Heal each part. erasureHealFile() will write the healed + // part to .minio/tmp/uuid/ which needs to be renamed later to + // the final location. + erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks, + latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize) if err != nil { return result, toObjectErr(err, bucket, object) } - // outDatedDisks that had write errors should not be - // written to for remaining parts, so we nil it out. - for i, disk := range outDatedDisks { - if disk == OfflineDisk { - continue + + erasureInfo := latestMeta.Erasure + for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { + partSize := latestMeta.Parts[partIndex].Size + partActualSize := latestMeta.Parts[partIndex].ActualSize + partNumber := latestMeta.Parts[partIndex].Number + tillOffset := erasure.ShardFileOffset(0, partSize, partSize) + readers := make([]io.ReaderAt, len(latestDisks)) + checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm + for i, disk := range latestDisks { + if disk == OfflineDisk { + continue + } + checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber) + partPath := pathJoin(object, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) + readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) + } + writers := make([]io.Writer, len(outDatedDisks)) + for i, disk := range outDatedDisks { + if disk == OfflineDisk { + continue + } + partPath := pathJoin(tmpID, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + } + err = erasure.Heal(ctx, readers, writers, partSize) + closeBitrotReaders(readers) + closeBitrotWriters(writers) + if err != nil { + return result, toObjectErr(err, bucket, object) + } + // outDatedDisks that had write errors should not be + // written to for remaining parts, so we nil it out. + for i, disk := range outDatedDisks { + if disk == OfflineDisk { + continue + } + + // A non-nil stale disk which did not receive + // a healed part checksum had a write error. + if writers[i] == nil { + outDatedDisks[i] = nil + disksToHealCount-- + continue + } + + partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ + PartNumber: partNumber, + Algorithm: checksumAlgo, + Hash: bitrotWriterSum(writers[i]), + }) } - // A non-nil stale disk which did not receive - // a healed part checksum had a write error. - if writers[i] == nil { - outDatedDisks[i] = nil - disksToHealCount-- - continue + // If all disks are having errors, we give up. + if disksToHealCount == 0 { + return result, fmt.Errorf("all disks had write errors, unable to heal") } - - partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize) - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ - PartNumber: partNumber, - Algorithm: checksumAlgo, - Hash: bitrotWriterSum(writers[i]), - }) - } - - // If all disks are having errors, we give up. - if disksToHealCount == 0 { - return result, fmt.Errorf("all disks had write errors, unable to heal") } } - // Cleanup in case of er.meta writing failure - writeQuorum := latestMeta.Erasure.DataBlocks + 1 - defer er.deleteObject(ctx, minioMetaTmpBucket, tmpID, writeQuorum) + defer er.deleteObject(ctx, minioMetaTmpBucket, tmpID, len(storageDisks)/2+1) // Generate and write `xl.meta` generated from other disks. outDatedDisks, err = writeUniqueFileInfo(ctx, outDatedDisks, minioMetaTmpBucket, tmpID, @@ -659,12 +669,12 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid // We can consider an object data not reliable // when er.meta is not found in read quorum disks. // or when er.meta is not readable in read quorum disks. - var notFoundErasureJSON, corruptedErasureJSON int + var notFoundErasureMeta, corruptedErasureMeta int for _, readErr := range errs { - if readErr == errFileNotFound { - notFoundErasureJSON++ + if readErr == errFileNotFound || readErr == errFileVersionNotFound { + notFoundErasureMeta++ } else if readErr == errCorruptedFormat { - corruptedErasureJSON++ + corruptedErasureMeta++ } } var notFoundParts int @@ -674,7 +684,7 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid // double counting when both parts and er.meta // are not available. if errs[i] != dataErrs[i] { - if dataErrs[i] == errFileNotFound { + if dataErrs[i] == errFileNotFound || dataErrs[i] == errFileVersionNotFound { notFoundParts++ } } @@ -688,13 +698,17 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid break } + if validMeta.Deleted { + return validMeta, false + } + // We couldn't find any valid meta we are indeed corrupted, return true right away. if validMeta.Erasure.DataBlocks == 0 { return validMeta, true } // We have valid meta, now verify if we have enough files with parity blocks. - return validMeta, corruptedErasureJSON+notFoundErasureJSON+notFoundParts > validMeta.Erasure.ParityBlocks + return validMeta, corruptedErasureMeta+notFoundErasureMeta+notFoundParts > validMeta.Erasure.ParityBlocks } // HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true. @@ -729,7 +743,11 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version writeQuorum = getWriteQuorum(len(storageDisks)) } if !opts.DryRun && opts.Remove { - er.deleteObject(healCtx, bucket, object, writeQuorum) + if versionID == "" { + er.deleteObject(healCtx, bucket, object, writeQuorum) + } else { + er.deleteObjectVersion(healCtx, bucket, object, writeQuorum, FileInfo{VersionID: versionID}) + } } err = reduceReadQuorumErrs(ctx, errs, nil, writeQuorum-1) return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object) @@ -758,7 +776,11 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version writeQuorum = getWriteQuorum(len(storageDisks)) } if !opts.DryRun && opts.Remove { - er.deleteObject(ctx, bucket, object, writeQuorum) + if versionID == "" { + er.deleteObject(ctx, bucket, object, writeQuorum) + } else { + er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID}) + } } } return defaultHealResult(latestFileInfo, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object)