heal: Fix healing delete markers (#9989)

Anis Elleuch 2020-07-08 04:54:09 +01:00 committed by GitHub
parent 72e0745e2f
commit fa211f6a10
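
Healing previously purged a stale or unreadable object as a whole with deleteObject(), which is wrong on versioned buckets: a stale version or a delete marker must be removed individually, leaving the other versions intact. This change:

- makes healObject() and HealObject() call er.deleteObjectVersion() whenever a version ID is known, falling back to er.deleteObject() only for unversioned objects;
- teaches isObjectDangling() to never flag a delete marker as dangling (a delete marker carries no data parts to lose) and to count errFileVersionNotFound alongside errFileNotFound;
- skips the erasure part-healing loop entirely for delete markers (if !latestMeta.Deleted), and defaults the heal result's erasure parameters to len(storageDisks)/2, taking per-disk values only when they are non-zero.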


@@ -242,8 +242,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
         Bucket:       bucket,
         Object:       object,
         DiskCount:    len(storageDisks),
-        ParityBlocks: latestFileInfo.Erasure.ParityBlocks,
-        DataBlocks:   latestFileInfo.Erasure.DataBlocks,
+        ParityBlocks: len(storageDisks) / 2,
+        DataBlocks:   len(storageDisks) / 2,

         // Initialize object size to -1, so we can detect if we are
         // unable to reliably find the object size.
@@ -265,8 +265,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
             // If data is sane on any one disk, we can
             // extract the correct object size.
             result.ObjectSize = partsMetadata[i].Size
-            result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks
-            result.DataBlocks = partsMetadata[i].Erasure.DataBlocks
+            if partsMetadata[i].Erasure.ParityBlocks > 0 && partsMetadata[i].Erasure.DataBlocks > 0 {
+                result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks
+                result.DataBlocks = partsMetadata[i].Erasure.DataBlocks
+            }
         case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound:
             driveState = madmin.DriveStateOffline
         case errs[i] == errFileNotFound, errs[i] == errVolumeNotFound:
@@ -315,7 +317,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
             writeQuorum = getWriteQuorum(len(storageDisks))
         }
         if !dryRun && remove {
-            err = er.deleteObject(ctx, bucket, object, writeQuorum)
+            if latestFileInfo.VersionID == "" {
+                err = er.deleteObject(ctx, bucket, object, writeQuorum)
+            } else {
+                err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: latestFileInfo.VersionID})
+            }
         }
         return defaultHealResult(latestFileInfo, storageDisks, storageEndpoints, errs, bucket, object), err
     }
@@ -348,10 +354,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
         return nfi
     }

-    // Reorder so that we have data disks first and parity disks next.
-    latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
-    outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
-    partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
+    // We write at temporary location and then rename to final location.
+    tmpID := mustGetUUID()
+
     for i := range outDatedDisks {
         if outDatedDisks[i] == nil {
             continue
@@ -359,80 +364,85 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
         partsMetadata[i] = cleanFileInfo(latestMeta)
     }

-    // We write at temporary location and then rename to final location.
-    tmpID := mustGetUUID()
-
-    // Heal each part. erasureHealFile() will write the healed
-    // part to .minio/tmp/uuid/ which needs to be renamed later to
-    // the final location.
-    erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks,
-        latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
-    if err != nil {
-        return result, toObjectErr(err, bucket, object)
-    }
-
-    erasureInfo := latestMeta.Erasure
-    for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
-        partSize := latestMeta.Parts[partIndex].Size
-        partActualSize := latestMeta.Parts[partIndex].ActualSize
-        partNumber := latestMeta.Parts[partIndex].Number
-        tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
-        readers := make([]io.ReaderAt, len(latestDisks))
-        checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
-        for i, disk := range latestDisks {
-            if disk == OfflineDisk {
-                continue
-            }
-            checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber)
-            partPath := pathJoin(object, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber))
-            readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
-        }
-        writers := make([]io.Writer, len(outDatedDisks))
-        for i, disk := range outDatedDisks {
-            if disk == OfflineDisk {
-                continue
-            }
-            partPath := pathJoin(tmpID, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber))
-            writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
-        }
-        err = erasure.Heal(ctx, readers, writers, partSize)
-        closeBitrotReaders(readers)
-        closeBitrotWriters(writers)
-        if err != nil {
-            return result, toObjectErr(err, bucket, object)
-        }
-        // outDatedDisks that had write errors should not be
-        // written to for remaining parts, so we nil it out.
-        for i, disk := range outDatedDisks {
-            if disk == OfflineDisk {
-                continue
-            }
-            // A non-nil stale disk which did not receive
-            // a healed part checksum had a write error.
-            if writers[i] == nil {
-                outDatedDisks[i] = nil
-                disksToHealCount--
-                continue
-            }
-            partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize)
-            partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
-                PartNumber: partNumber,
-                Algorithm:  checksumAlgo,
-                Hash:       bitrotWriterSum(writers[i]),
-            })
-        }
-
-        // If all disks are having errors, we give up.
-        if disksToHealCount == 0 {
-            return result, fmt.Errorf("all disks had write errors, unable to heal")
-        }
-    }
-
-    // Cleanup in case of er.meta writing failure
-    writeQuorum := latestMeta.Erasure.DataBlocks + 1
-    defer er.deleteObject(ctx, minioMetaTmpBucket, tmpID, writeQuorum)
+    if !latestMeta.Deleted {
+        result.DataBlocks = latestMeta.Erasure.DataBlocks
+        result.ParityBlocks = latestMeta.Erasure.ParityBlocks
+
+        // Reorder so that we have data disks first and parity disks next.
+        latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
+        outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
+        partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
+
+        // Heal each part. erasureHealFile() will write the healed
+        // part to .minio/tmp/uuid/ which needs to be renamed later to
+        // the final location.
+        erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks,
+            latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
+        if err != nil {
+            return result, toObjectErr(err, bucket, object)
+        }
+
+        erasureInfo := latestMeta.Erasure
+        for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
+            partSize := latestMeta.Parts[partIndex].Size
+            partActualSize := latestMeta.Parts[partIndex].ActualSize
+            partNumber := latestMeta.Parts[partIndex].Number
+            tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
+            readers := make([]io.ReaderAt, len(latestDisks))
+            checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
+            for i, disk := range latestDisks {
+                if disk == OfflineDisk {
+                    continue
+                }
+                checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber)
+                partPath := pathJoin(object, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber))
+                readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
+            }
+            writers := make([]io.Writer, len(outDatedDisks))
+            for i, disk := range outDatedDisks {
+                if disk == OfflineDisk {
+                    continue
+                }
+                partPath := pathJoin(tmpID, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber))
+                writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
+            }
+            err = erasure.Heal(ctx, readers, writers, partSize)
+            closeBitrotReaders(readers)
+            closeBitrotWriters(writers)
+            if err != nil {
+                return result, toObjectErr(err, bucket, object)
+            }
+
+            // outDatedDisks that had write errors should not be
+            // written to for remaining parts, so we nil it out.
+            for i, disk := range outDatedDisks {
+                if disk == OfflineDisk {
+                    continue
+                }
+                // A non-nil stale disk which did not receive
+                // a healed part checksum had a write error.
+                if writers[i] == nil {
+                    outDatedDisks[i] = nil
+                    disksToHealCount--
+                    continue
+                }
+                partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize)
+                partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
+                    PartNumber: partNumber,
+                    Algorithm:  checksumAlgo,
+                    Hash:       bitrotWriterSum(writers[i]),
+                })
+            }
+
+            // If all disks are having errors, we give up.
+            if disksToHealCount == 0 {
+                return result, fmt.Errorf("all disks had write errors, unable to heal")
+            }
+        }
+    }
+
+    defer er.deleteObject(ctx, minioMetaTmpBucket, tmpID, len(storageDisks)/2+1)

     // Generate and write `xl.meta` generated from other disks.
     outDatedDisks, err = writeUniqueFileInfo(ctx, outDatedDisks, minioMetaTmpBucket, tmpID,
@@ -659,12 +669,12 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
     // We can consider an object data not reliable
     // when er.meta is not found in read quorum disks.
     // or when er.meta is not readable in read quorum disks.
-    var notFoundErasureJSON, corruptedErasureJSON int
+    var notFoundErasureMeta, corruptedErasureMeta int
     for _, readErr := range errs {
-        if readErr == errFileNotFound {
-            notFoundErasureJSON++
+        if readErr == errFileNotFound || readErr == errFileVersionNotFound {
+            notFoundErasureMeta++
         } else if readErr == errCorruptedFormat {
-            corruptedErasureJSON++
+            corruptedErasureMeta++
         }
     }
     var notFoundParts int
@@ -674,7 +684,7 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
         // double counting when both parts and er.meta
         // are not available.
         if errs[i] != dataErrs[i] {
-            if dataErrs[i] == errFileNotFound {
+            if dataErrs[i] == errFileNotFound || dataErrs[i] == errFileVersionNotFound {
                 notFoundParts++
             }
         }
@@ -688,13 +698,17 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
         break
     }

+    if validMeta.Deleted {
+        return validMeta, false
+    }
+
     // We couldn't find any valid meta we are indeed corrupted, return true right away.
     if validMeta.Erasure.DataBlocks == 0 {
         return validMeta, true
     }

     // We have valid meta, now verify if we have enough files with parity blocks.
-    return validMeta, corruptedErasureJSON+notFoundErasureJSON+notFoundParts > validMeta.Erasure.ParityBlocks
+    return validMeta, corruptedErasureMeta+notFoundErasureMeta+notFoundParts > validMeta.Erasure.ParityBlocks
 }

 // HealObject - heal the given object, automatically deletes the object if stale/corrupted if `remove` is true.
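
To make the new dangling rule concrete, here is a minimal, self-contained Go sketch of the decision the hunk above implements. fileInfo, isDangling and the sentinel error variables are simplified stand-ins invented for this example (the real logic lives in isObjectDangling and additionally counts parts missing on disk via dataErrs, which this sketch omits):

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real sentinel errors in cmd/.
var (
	errFileNotFound        = errors.New("file not found")
	errFileVersionNotFound = errors.New("file version not found")
	errCorruptedFormat     = errors.New("corrupted format")
)

// fileInfo is a cut-down stand-in for FileInfo.
type fileInfo struct {
	Deleted      bool // true when this version is a delete marker
	ParityBlocks int
}

// isDangling reports whether an object version should be treated as
// dangling, given one metadata read error per disk.
func isDangling(validMeta fileInfo, errs []error) bool {
	// Delete markers carry no erasure-coded data parts, so they are
	// never considered dangling -- the core fix in this commit.
	if validMeta.Deleted {
		return false
	}
	var notFoundMeta, corruptedMeta int
	for _, err := range errs {
		switch {
		case errors.Is(err, errFileNotFound), errors.Is(err, errFileVersionNotFound):
			notFoundMeta++
		case errors.Is(err, errCorruptedFormat):
			corruptedMeta++
		}
	}
	// Dangling when more metadata copies are missing or corrupted
	// than the parity count can tolerate.
	return notFoundMeta+corruptedMeta > validMeta.ParityBlocks
}

func main() {
	errs := []error{nil, errFileVersionNotFound, errFileNotFound, errCorruptedFormat}
	fmt.Println(isDangling(fileInfo{ParityBlocks: 2}, errs))                // true: 3 bad disks > 2 parity
	fmt.Println(isDangling(fileInfo{Deleted: true, ParityBlocks: 2}, errs)) // false: delete marker
}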
@@ -729,7 +743,11 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
             writeQuorum = getWriteQuorum(len(storageDisks))
         }
         if !opts.DryRun && opts.Remove {
-            er.deleteObject(healCtx, bucket, object, writeQuorum)
+            if versionID == "" {
+                er.deleteObject(healCtx, bucket, object, writeQuorum)
+            } else {
+                er.deleteObjectVersion(healCtx, bucket, object, writeQuorum, FileInfo{VersionID: versionID})
+            }
         }
         err = reduceReadQuorumErrs(ctx, errs, nil, writeQuorum-1)
         return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object)
@@ -758,7 +776,11 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
             writeQuorum = getWriteQuorum(len(storageDisks))
         }
         if !opts.DryRun && opts.Remove {
-            er.deleteObject(ctx, bucket, object, writeQuorum)
+            if versionID == "" {
+                er.deleteObject(ctx, bucket, object, writeQuorum)
+            } else {
+                er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{VersionID: versionID})
+            }
         }
     }
     return defaultHealResult(latestFileInfo, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object)
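
The delete dispatch repeated in the three hunks above can be summarized with this small sketch. removeStale, deleteWholeObject and deleteOneVersion are hypothetical names standing in for the heal paths and for er.deleteObject / er.deleteObjectVersion:

package main

import "fmt"

// deleteWholeObject stands in for er.deleteObject: it purges every
// version of the object, which is only safe on unversioned buckets.
func deleteWholeObject(bucket, object string) error {
	fmt.Printf("purging %s/%s entirely (unversioned)\n", bucket, object)
	return nil
}

// deleteOneVersion stands in for er.deleteObjectVersion: it removes a
// single version (possibly a delete marker) and nothing else.
func deleteOneVersion(bucket, object, versionID string) error {
	fmt.Printf("purging %s/%s version %s only\n", bucket, object, versionID)
	return nil
}

// removeStale mirrors the new branch: with no version ID the legacy
// whole-object delete is kept; with a version ID only that version is
// removed, leaving the object's other versions intact.
func removeStale(bucket, object, versionID string) error {
	if versionID == "" {
		return deleteWholeObject(bucket, object)
	}
	return deleteOneVersion(bucket, object, versionID)
}

func main() {
	removeStale("photos", "cat.png", "")
	removeStale("photos", "cat.png", "0579af02-6792-4cbc-b092-58ad8b0bc42d")
}

Keying the branch on an empty version ID keeps the pre-existing unversioned behaviour intact, while on versioned buckets healing only ever drops the exact version or delete marker it inspected.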