add healing fixes for delete-marker (#12788)

- delete-markers are incorrectly reported
  as corrupt with wrong data sent to client
  'mc admin heal -r' on objects with delete
  marker will report as 'grey' incorrectly.

- do not heal delete-markers during HeadObject()
  this can lead to inconsistent order of heals
  on the object, although this is not an issue
  in terms of order of versions it is rather
  simpler to keep the same order on all drives.

- defaultHealResult() should handle 'err == nil'
  case such that valid cases should be handled
  as 'drive' status OK.
This commit is contained in:
Harshavardhana 2021-07-26 08:01:41 -07:00 committed by GitHub
parent 39874b77ed
commit f175ff8f66
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 32 deletions

View file

@ -357,7 +357,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
}
// File is fully gone, fileInfo is empty.
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs,
bucket, object, versionID), err
}
// If less than read quorum number of disks have all the parts
@ -642,30 +643,25 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
// Populates default heal result item entries with possible values when we are returning prematurely.
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object, versionID string, defaultParityCount int) madmin.HealResultItem {
func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object, versionID string) madmin.HealResultItem {
// Initialize heal result object
result := madmin.HealResultItem{
Type: madmin.HealItemObject,
Bucket: bucket,
Object: object,
VersionID: versionID,
DiskCount: len(storageDisks),
Type: madmin.HealItemObject,
Bucket: bucket,
Object: object,
ObjectSize: lfi.Size,
VersionID: versionID,
DiskCount: len(storageDisks),
}
if lfi.IsValid() {
result.ObjectSize = lfi.Size
result.ParityBlocks = lfi.Erasure.ParityBlocks
} else {
// Default to most common configuration for erasure blocks.
result.ParityBlocks = defaultParityCount
result.ParityBlocks = er.defaultParityCount
}
result.DataBlocks = len(storageDisks) - result.ParityBlocks
if errs == nil {
// No disks related errors are provided, quit
return result
}
for index, disk := range storageDisks {
if disk == nil {
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
@ -684,6 +680,8 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints
switch errs[index] {
case errFileNotFound, errVolumeNotFound:
driveState = madmin.DriveStateMissing
case nil:
driveState = madmin.DriveStateOk
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
@ -697,15 +695,6 @@ func defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints
})
}
if !lfi.IsValid() {
// Default to most common configuration for erasure blocks.
result.ParityBlocks = defaultParityCount
result.DataBlocks = len(storageDisks) - defaultParityCount
} else {
result.ParityBlocks = lfi.Erasure.ParityBlocks
result.DataBlocks = lfi.Erasure.DataBlocks
}
return result
}
@ -820,15 +809,18 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
if versionID != "" {
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
}
return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
}
return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), toObjectErr(err, bucket, object, versionID)
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID)
}
readQuorum := len(storageDisks) - er.defaultParityCount
err := toObjectErr(reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum), bucket, object, versionID)
return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
err := toObjectErr(reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum),
bucket, object, versionID)
return er.defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), err
}
// Object is considered dangling/corrupted if any only
@ -921,14 +913,16 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
}
// Nothing to do, file is already gone.
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
}
// Return early if all ok and not deep scanning.
if opts.ScanMode == madmin.HealNormalScan && fileInfoConsistent(ctx, partsMetadata, errs) {
fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
if err == nil && fi.VersionID == versionID {
return defaultHealResult(fi, storageDisks, storageEndpoints, errs, bucket, object, versionID, fi.Erasure.ParityBlocks), nil
return er.defaultHealResult(fi, storageDisks, storageEndpoints,
errs, bucket, object, versionID), nil
}
}

View file

@ -442,7 +442,9 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
// if missing metadata can be reconstructed, attempt to reconstruct.
if missingBlocks > 0 && missingBlocks < readQuorum {
// additionally do not heal delete markers inline, let them be
// healed upon regular heal process.
if !fi.Deleted && missingBlocks > 0 && missingBlocks < readQuorum {
if _, healing := er.getOnlineDisksWithHealing(); !healing {
go healObject(bucket, object, fi.VersionID, madmin.HealNormalScan)
}

View file

@ -906,8 +906,12 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
var combinedErr []string
for i, err := range errs {
if err != nil {
combinedErr = append(combinedErr,
fmt.Sprintf("disk %s returned: %s", disks[i], err))
if disks[i] != nil {
combinedErr = append(combinedErr,
fmt.Sprintf("disk %s returned: %s", disks[i], err))
} else {
combinedErr = append(combinedErr, err.Error())
}
}
}
return errors.New(strings.Join(combinedErr, ", "))