fix offline disk calculation (#9801)

Current code was relying on globalEndpoints as
the source of secondary truth to obtain
the missing endpoints list when the disk
is offline, this is problematic

- there is no way to know if the getDisks()
  returned endpoints total is same as the
  ones list of globalEndpoints and it
  belongs to a particular set.
- there is no order guarantee as getDisks()
  is ordered as per format.json, globalEndpoints
  may not be, so potentially end up including
  incorrect endpoints.

To fix this bring getEndpoints() just like getDisks()
to ensure that consistently ordered endpoints are
always available for us to ensure that returned values
are consistent with what each erasure set would observe.
This commit is contained in:
Harshavardhana 2020-06-10 17:10:31 -07:00 committed by GitHub
parent d26b24f670
commit 62b1da3e2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 128 deletions

View file

@ -189,8 +189,8 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
func (s *xlSets) connectDisks() {
var wg sync.WaitGroup
diskMap := s.getDiskMap()
for i, endpoint := range s.endpoints {
if isEndpointConnected(diskMap, s.endpointStrings[i]) {
for _, endpoint := range s.endpoints {
if isEndpointConnected(diskMap, endpoint.String()) {
continue
}
wg.Add(1)
@ -214,6 +214,7 @@ func (s *xlSets) connectDisks() {
s.xlDisks[setIndex][diskIndex].Close()
}
s.xlDisks[setIndex][diskIndex] = disk
s.endpointStrings[setIndex*s.drivesPerSet+diskIndex] = disk.String()
s.xlDisksMu.Unlock()
go func(setIndex int) {
// Send a new disk connect event with a timeout
@ -251,6 +252,19 @@ func (s *xlSets) GetLockers(setIndex int) func() []dsync.NetLocker {
}
}
func (s *xlSets) GetEndpoints(setIndex int) func() []string {
return func() []string {
s.xlDisksMu.RLock()
defer s.xlDisksMu.RUnlock()
eps := make([]string, s.drivesPerSet)
for i := 0; i < s.drivesPerSet; i++ {
eps[i] = s.endpointStrings[setIndex*s.drivesPerSet+i]
}
return eps
}
}
// GetDisks returns a closure for a given set, which provides list of disks per set.
func (s *xlSets) GetDisks(setIndex int) func() []StorageAPI {
return func() []StorageAPI {
@ -266,28 +280,20 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs
// Initialize new set of erasure coded sets.
func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatXLV3) (*xlSets, error) {
endpointStrings := make([]string, len(endpoints))
for i, endpoint := range endpoints {
if endpoint.IsLocal {
endpointStrings[i] = endpoint.Path
} else {
endpointStrings[i] = endpoint.String()
}
}
setCount := len(format.XL.Sets)
drivesPerSet := len(format.XL.Sets[0])
endpointStrings := make([]string, len(endpoints))
// Initialize the XL sets instance.
s := &xlSets{
sets: make([]*xlObjects, setCount),
xlDisks: make([][]StorageAPI, setCount),
xlLockers: make([][]dsync.NetLocker, setCount),
endpoints: endpoints,
endpointStrings: endpointStrings,
setCount: setCount,
drivesPerSet: drivesPerSet,
format: format,
endpoints: endpoints,
endpointStrings: endpointStrings,
disksConnectEvent: make(chan diskConnectInfo),
disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.XL.DistributionAlgo,
@ -309,8 +315,9 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA
for i := 0; i < setCount; i++ {
for j := 0; j < drivesPerSet; j++ {
endpoint := endpoints[i*drivesPerSet+j]
// Rely on endpoints list to initialize, init lockers and available disks.
s.xlLockers[i][j] = newLockAPI(s.endpoints[i*drivesPerSet+j])
s.xlLockers[i][j] = newLockAPI(endpoint)
disk := storageDisks[i*drivesPerSet+j]
if disk == nil {
@ -326,16 +333,18 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA
disk.Close()
continue
}
s.endpointStrings[m*drivesPerSet+n] = disk.String()
s.xlDisks[m][n] = disk
}
// Initialize xl objects for a given set.
s.sets[i] = &xlObjects{
getDisks: s.GetDisks(i),
getLockers: s.GetLockers(i),
nsMutex: mutex,
bp: bp,
mrfUploadCh: make(chan partialUpload, 10000),
getDisks: s.GetDisks(i),
getLockers: s.GetLockers(i),
getEndpoints: s.GetEndpoints(i),
nsMutex: mutex,
bp: bp,
mrfUploadCh: make(chan partialUpload, 10000),
}
go s.sets[i].cleanupStaleMultipartUploads(ctx,
@ -457,11 +466,12 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, []er
for i, set := range s.sets {
storageDisks := set.getDisks()
endpointStrings := set.getEndpoints()
for j, storageErr := range storageInfoErrs[i] {
if storageDisks[j] == OfflineDisk {
storageInfo.Backend.Sets[i][j] = madmin.DriveInfo{
State: madmin.DriveStateOffline,
Endpoint: s.endpointStrings[i*s.drivesPerSet+j],
Endpoint: endpointStrings[j],
}
continue
}
@ -1366,6 +1376,7 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
s.xlDisks[m][n].Close()
}
s.endpointStrings[m*s.drivesPerSet+n] = disk.String()
s.xlDisks[m][n] = disk
}
s.xlDisksMu.Unlock()
@ -1578,6 +1589,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
s.xlDisks[m][n].Close()
}
s.endpointStrings[m*s.drivesPerSet+n] = disk.String()
s.xlDisks[m][n] = disk
}
s.xlDisksMu.Unlock()
@ -1609,35 +1621,6 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b
result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
}
for i := range s.endpoints {
var foundBefore bool
for _, v := range result.Before.Drives {
if s.endpointStrings[i] == v.Endpoint {
foundBefore = true
}
}
if !foundBefore {
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: s.endpointStrings[i],
State: madmin.DriveStateOffline,
})
}
var foundAfter bool
for _, v := range result.After.Drives {
if s.endpointStrings[i] == v.Endpoint {
foundAfter = true
}
}
if !foundAfter {
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: s.endpointStrings[i],
State: madmin.DriveStateOffline,
})
}
}
// Check if we had quorum to write, if not return an appropriate error.
_, afterDriveOnline := result.GetOnlineCounts()
if afterDriveOnline < ((s.setCount*s.drivesPerSet)/2)+1 {

View file

@ -47,16 +47,17 @@ func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remov
}
storageDisks := xl.getDisks()
storageEndpoints := xl.getEndpoints()
// get write quorum for an object
writeQuorum := getWriteQuorum(len(storageDisks))
// Heal bucket.
return healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun)
return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, dryRun)
}
// Heal bucket - create buckets on disks where it does not exist.
func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, writeQuorum int,
func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []string, bucket string, writeQuorum int,
dryRun bool) (res madmin.HealResultItem, err error) {
// Initialize sync waitgroup.
@ -118,14 +119,11 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
}
for i := range beforeState {
if storageDisks[i] != nil {
drive := storageDisks[i].String()
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: beforeState[i],
})
}
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
State: beforeState[i],
})
}
// Initialize sync waitgroup.
@ -154,14 +152,11 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
}
for i := range afterState {
if storageDisks[i] != nil {
drive := storageDisks[i].String()
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: afterState[i],
})
}
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i],
State: afterState[i],
})
}
return res, nil
@ -231,6 +226,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
dataBlocks := latestXLMeta.Erasure.DataBlocks
storageDisks := xl.getDisks()
storageEndpoints := xl.getEndpoints()
// List of disks having latest version of the object xl.json
// (by modtime).
@ -281,33 +277,29 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
driveState = madmin.DriveStateCorrupt
}
var drive string
if storageDisks[i] != nil {
drive = storageDisks[i].String()
}
if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], modTime) {
outDatedDisks[i] = storageDisks[i]
disksToHealCount++
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
Endpoint: storageEndpoints[i],
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
Endpoint: storageEndpoints[i],
State: driveState,
})
continue
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
Endpoint: storageEndpoints[i],
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
Endpoint: storageEndpoints[i],
State: driveState,
})
}
@ -324,7 +316,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
if !dryRun && remove {
err = xl.deleteObject(ctx, bucket, object, writeQuorum, false)
}
return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), err
return defaultHealResult(latestXLMeta, storageDisks, storageEndpoints, errs, bucket, object), err
}
return result, toObjectErr(errXLReadQuorum, bucket, object)
}
@ -490,6 +482,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
// is needed since we do not have a special backend format for directories.
func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
storageDisks := xl.getDisks()
storageEndpoints := xl.getEndpoints()
// Initialize heal result object
hr = madmin.HealResultItem{
@ -515,10 +508,7 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
// Prepare object creation in all disks
for i, err := range errs {
var drive string
if storageDisks[i] != nil {
drive = storageDisks[i].String()
}
drive := storageEndpoints[i]
switch err {
case nil:
hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
@ -558,7 +548,7 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
// Populates default heal result item entries with possible values when we are returning prematurely.
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []error, bucket, object string) madmin.HealResultItem {
func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, storageEndpoints []string, errs []error, bucket, object string) madmin.HealResultItem {
// Initialize heal result object
result := madmin.HealResultItem{
Type: madmin.HealItemObject,
@ -577,16 +567,17 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []
for index, disk := range storageDisks {
if disk == nil {
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateOffline,
UUID: "",
Endpoint: storageEndpoints[index],
State: madmin.DriveStateOffline,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateOffline,
UUID: "",
Endpoint: storageEndpoints[index],
State: madmin.DriveStateOffline,
})
continue
}
drive := disk.String()
driveState := madmin.DriveStateCorrupt
switch errs[index] {
case errFileNotFound, errVolumeNotFound:
@ -594,12 +585,12 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
Endpoint: storageEndpoints[index],
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
Endpoint: storageEndpoints[index],
State: driveState,
})
}
@ -725,6 +716,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts
}
storageDisks := xl.getDisks()
storageEndpoints := xl.getEndpoints()
// Read metadata files from all the disks
partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object)
@ -740,12 +732,12 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts
xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
}
err = reduceReadQuorumErrs(ctx, errs, nil, writeQuorum-1)
return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
return defaultHealResult(xlMetaV1{}, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object)
}
latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs)
if err != nil {
return defaultHealResult(xlMetaV1{}, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
return defaultHealResult(xlMetaV1{}, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object)
}
errCount := 0
@ -769,7 +761,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts
xl.deleteObject(ctx, bucket, object, writeQuorum, false)
}
}
return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
return defaultHealResult(latestXLMeta, storageDisks, storageEndpoints, errs, bucket, object), toObjectErr(err, bucket, object)
}
}

View file

@ -59,6 +59,10 @@ type xlObjects struct {
// getLockers returns list of remote and local lockers.
getLockers func() []dsync.NetLocker
// getEndpoints returns list of endpoint strings belonging this set.
// some may be local and some remote.
getEndpoints func() []string
// Locker mutex map.
nsMutex *nsLockMap
@ -90,21 +94,17 @@ func (d byDiskTotal) Less(i, j int) bool {
}
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
disksInfo = make([]DiskInfo, len(disks))
onlineDisks = make(madmin.BackendDisks)
offlineDisks = make(madmin.BackendDisks)
for _, disk := range disks {
if disk == OfflineDisk {
continue
for _, ep := range endpoints {
if _, ok := offlineDisks[ep]; !ok {
offlineDisks[ep] = 0
}
peerAddr := disk.Hostname()
if _, ok := offlineDisks[peerAddr]; !ok {
offlineDisks[peerAddr] = 0
}
if _, ok := onlineDisks[peerAddr]; !ok {
onlineDisks[peerAddr] = 0
if _, ok := onlineDisks[ep]; !ok {
onlineDisks[ep] = 0
}
}
@ -136,32 +136,12 @@ func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []
if disks[i] == OfflineDisk {
continue
}
ep := endpoints[i]
if diskInfoErr != nil {
offlineDisks[disks[i].Hostname()]++
offlineDisks[ep]++
continue
}
onlineDisks[disks[i].Hostname()]++
}
// Iterate over the passed endpoints arguments and check
// if there are still disks missing from the offline/online lists
// and update them accordingly.
missingOfflineDisks := make(map[string]int)
for _, zone := range globalEndpoints {
for _, endpoint := range zone.Endpoints {
// if local is set and endpoint is not local
// we are not interested in remote disks.
if local && !endpoint.IsLocal {
continue
}
if _, ok := offlineDisks[endpoint.Host]; !ok {
missingOfflineDisks[endpoint.Host]++
}
}
}
for missingDisk, n := range missingOfflineDisks {
onlineDisks[missingDisk] = 0
offlineDisks[missingDisk] = n
onlineDisks[ep]++
}
// Success.
@ -169,8 +149,8 @@ func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []
}
// Get an aggregated storage info across all disks.
func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) {
disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, local)
func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []error) {
disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, endpoints)
// Sort so that the first element is the smallest.
sort.Sort(byDiskTotal(disksInfo))
@ -206,19 +186,23 @@ func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) {
func (xl xlObjects) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
disks := xl.getDisks()
endpoints := xl.getEndpoints()
if local {
var localDisks []StorageAPI
for _, disk := range disks {
var localEndpoints []string
for i, disk := range disks {
if disk != nil {
if disk.IsLocal() {
// Append this local disk since local flag is true
localDisks = append(localDisks, disk)
localEndpoints = append(localEndpoints, endpoints[i])
}
}
}
disks = localDisks
endpoints = localEndpoints
}
return getStorageInfo(disks, local)
return getStorageInfo(disks, endpoints)
}
// GetMetrics - is not implemented and shouldn't be called.