Fix wrong reporting of total disks after restart (#13326)

After a cluster restart, a failed disk causes the total number of
disks to be reported incorrectly.
This commit is contained in:
Anis Elleuch 2021-09-29 19:36:19 +01:00 committed by GitHub
parent 7f6ed35347
commit 1d9e91e00f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 44 deletions

View File

@ -22,16 +22,6 @@ import (
"sync"
)
// getLocalDisks returns the subset of this erasure set's disks that are
// non-nil and report themselves as local via IsLocal().
func (er erasureObjects) getLocalDisks() (localDisks []StorageAPI) {
	for _, d := range er.getDisks() {
		// Skip offline (nil) and remote disks.
		if d == nil || !d.IsLocal() {
			continue
		}
		localDisks = append(localDisks, d)
	}
	return localDisks
}
func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
disks := er.getDisks()
// Based on the random shuffling return back randomized disks.

View File

@ -53,7 +53,7 @@ func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts mad
}
// Heal bucket - create buckets on disks where it does not exist.
func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []string, bucket string, writeQuorum int,
func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, writeQuorum int,
opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
// Initialize sync waitgroup.
@ -114,7 +114,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
for i := range beforeState {
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: beforeState[i],
})
}
@ -124,7 +124,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
for i := range beforeState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: madmin.DriveStateOk,
})
}
@ -159,7 +159,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
for i := range afterState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: afterState[i],
})
}
@ -345,24 +345,24 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
disksToHealCount++
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: driveState,
})
continue
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
Endpoint: storageEndpoints[i].String(),
State: driveState,
})
}
@ -609,7 +609,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
// Prepare object creation in all disks
for i, err := range errs {
drive := storageEndpoints[i]
drive := storageEndpoints[i].String()
switch err {
case nil:
hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
@ -650,7 +650,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
// Populates default heal result item entries with possible values when we are returning prematurely.
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object, versionID string) madmin.HealResultItem {
func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []Endpoint, errs []error, bucket, object, versionID string) madmin.HealResultItem {
// Initialize heal result object
result := madmin.HealResultItem{
Type: madmin.HealItemObject,
@ -673,12 +673,12 @@ func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageA
if disk == nil {
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[index],
Endpoint: storageEndpoints[index].String(),
State: madmin.DriveStateOffline,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[index],
Endpoint: storageEndpoints[index].String(),
State: madmin.DriveStateOffline,
})
continue
@ -692,12 +692,12 @@ func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageA
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[index],
Endpoint: storageEndpoints[index].String(),
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[index],
Endpoint: storageEndpoints[index].String(),
State: driveState,
})
}

View File

@ -257,7 +257,6 @@ func (s *erasureSets) connectDisks() {
s.erasureDisks[setIndex][diskIndex] = disk
}
disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
setsJustConnected[setIndex] = true
s.erasureDisksMu.Unlock()
}(endpoint)
@ -314,14 +313,14 @@ func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string
}
}
func (s *erasureSets) GetEndpoints(setIndex int) func() []string {
return func() []string {
func (s *erasureSets) GetEndpoints(setIndex int) func() []Endpoint {
return func() []Endpoint {
s.erasureDisksMu.RLock()
defer s.erasureDisksMu.RUnlock()
eps := make([]string, s.setDriveCount)
eps := make([]Endpoint, s.setDriveCount)
for i := 0; i < s.setDriveCount; i++ {
eps[i] = s.endpointStrings[setIndex*s.setDriveCount+i]
eps[i] = s.endpoints[setIndex*s.setDriveCount+i]
}
return eps
}
@ -348,9 +347,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
setDriveCount := len(format.Erasure.Sets[0])
endpointStrings := make([]string, len(endpoints))
// Fill endpointString with the same order of endpoints passed as
// arguments but it will be reordered later according to disks order
for i, endpoint := range endpoints {
endpointStrings[i] = endpoint.String()
}
@ -422,6 +418,10 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
if err != nil {
continue
}
if m != i || n != j {
return nil, fmt.Errorf("found a disk in an unexpected location, pool: %d, found (set=%d, disk=%d) expected (set=%d, disk=%d)",
poolIdx, m, n, i, j)
}
disk.SetDiskLoc(s.poolIndex, m, n)
s.endpointStrings[m*setDriveCount+n] = disk.String()
s.erasureDisks[m][n] = disk
@ -531,10 +531,15 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec
object = decodeDirObject(object)
var disksEndpoints []string
for _, endpoint := range set.getEndpoints() {
disksEndpoints = append(disksEndpoints, endpoint.String())
}
op := auditObjectOp{
Pool: set.poolIndex + 1,
Set: set.setIndex + 1,
Disks: set.getEndpoints(),
Disks: disksEndpoints,
}
var objectErasureSetTag *auditObjectErasureMap
@ -1306,7 +1311,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
if storageDisks[index] != nil {
storageDisks[index].SetDiskLoc(s.poolIndex, m, n)
s.erasureDisks[m][n] = storageDisks[index]
s.endpointStrings[m*s.setDriveCount+n] = storageDisks[index].String()
}
}

View File

@ -57,7 +57,7 @@ type erasureObjects struct {
// getEndpoints returns list of endpoint strings belonging this set.
// some may be local and some remote.
getEndpoints func() []string
getEndpoints func() []Endpoint
// Locker mutex map.
nsMutex *nsLockMap
@ -164,7 +164,7 @@ func getOnlineOfflineDisksStats(disksInfo []madmin.Disk) (onlineDisks, offlineDi
}
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Disk, errs []error) {
func getDisksInfo(disks []StorageAPI, endpoints []Endpoint) (disksInfo []madmin.Disk, errs []error) {
disksInfo = make([]madmin.Disk, len(disks))
g := errgroup.WithNErrs(len(disks))
@ -175,7 +175,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
logger.LogIf(GlobalContext, fmt.Errorf("%s: %s", errDiskNotFound, endpoints[index]))
disksInfo[index] = madmin.Disk{
State: diskErrToDriveState(errDiskNotFound),
Endpoint: endpoints[index],
Endpoint: endpoints[index].String(),
}
// Storage disk is empty, perhaps ignored disk or not available.
return errDiskNotFound
@ -222,7 +222,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
}
// Get an aggregated storage info across all disks.
func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []error) {
func getStorageInfo(disks []StorageAPI, endpoints []Endpoint) (StorageInfo, []error) {
disksInfo, errs := getDisksInfo(disks, endpoints)
// Sort so that the first element is the smallest.
@ -245,14 +245,20 @@ func (er erasureObjects) StorageInfo(ctx context.Context) (StorageInfo, []error)
// LocalStorageInfo - returns underlying local storage statistics.
func (er erasureObjects) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
disks := er.getLocalDisks()
endpoints := make([]string, len(disks))
for i, disk := range disks {
if disk != nil {
endpoints[i] = disk.String()
disks := er.getDisks()
endpoints := er.getEndpoints()
var localDisks []StorageAPI
var localEndpoints []Endpoint
for i, endpoint := range endpoints {
if endpoint.IsLocal {
localDisks = append(localDisks, disks[i])
localEndpoints = append(localEndpoints, endpoint)
}
}
return getStorageInfo(disks, endpoints)
return getStorageInfo(localDisks, localEndpoints)
}
func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {