heal: Fix healing empty directories (#7154)

This commit fixes the computation of Before/After healing state
for empty directories.

Issues before the commit:
- Before state doesn't reflect the real status (no StatVol() called)
- For any MakeVol() error, healObjectDir is exited directly, which is
  wrong.
This commit is contained in:
Anis Elleuch 2019-01-30 19:51:56 +01:00 committed by kannappanr
parent d3553f8dfc
commit 2d9860e875

View file

@ -515,44 +515,55 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
ObjectSize: 0,
}
hr.Before.Drives = make([]madmin.HealDriveInfo, len(storageDisks))
hr.After.Drives = make([]madmin.HealDriveInfo, len(storageDisks))
var wg sync.WaitGroup
// Prepare object creation in all disks
for _, disk := range storageDisks {
if disk == nil {
hr.Before.Drives = append(hr.Before.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateOffline,
})
hr.After.Drives = append(hr.After.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateMissing,
})
continue
}
drive := disk.String()
hr.Before.Drives = append(hr.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: madmin.DriveStateMissing,
})
hr.After.Drives = append(hr.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: madmin.DriveStateMissing,
})
if !dryRun {
if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists {
return hr, toObjectErr(err, bucket, object)
for i, d := range storageDisks {
wg.Add(1)
go func(idx int, disk StorageAPI) {
defer wg.Done()
if disk == nil {
hr.Before.Drives[idx] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
hr.After.Drives[idx] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
return
}
for i, v := range hr.Before.Drives {
if v.Endpoint == drive {
hr.After.Drives[i].State = madmin.DriveStateOk
}
drive := disk.String()
hr.Before.Drives[idx] = madmin.HealDriveInfo{UUID: "", Endpoint: drive, State: madmin.DriveStateOffline}
hr.After.Drives[idx] = madmin.HealDriveInfo{UUID: "", Endpoint: drive, State: madmin.DriveStateOffline}
_, statErr := disk.StatVol(pathJoin(bucket, object))
switch statErr {
case nil:
hr.Before.Drives[idx].State = madmin.DriveStateOk
hr.After.Drives[idx].State = madmin.DriveStateOk
// Object is fine in this disk, nothing to be done anymore, exiting
return
case errVolumeNotFound:
hr.Before.Drives[idx].State = madmin.DriveStateMissing
hr.After.Drives[idx].State = madmin.DriveStateMissing
default:
logger.LogIf(ctx, err)
return
}
}
if dryRun {
return
}
if err := disk.MakeVol(pathJoin(bucket, object)); err == nil || err == errVolumeExists {
hr.After.Drives[idx].State = madmin.DriveStateOk
} else {
logger.LogIf(ctx, err)
hr.After.Drives[idx].State = madmin.DriveStateOffline
}
}(i, d)
}
wg.Wait()
return hr, nil
}