From a359e36e356796196fc306ef107360938c5520da Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 26 Aug 2020 19:29:35 -0700 Subject: [PATCH] tolerate listing with only readQuorum disks (#10357) We can reduce this further in the future, but this is a good value to keep around. With the advent of continuous healing, we can be assured that namespace will eventually be consistent so we are okay to avoid the necessity to a list across all drives on all sets. Bonus Pop()'s in parallel seem to have the potential to wait too on large drive setups and cause more slowness instead of gaining any performance remove it for now. Also, implement load balanced reply for local disks, ensuring that local disks have an affinity for - cleanupStaleMultipartUploads() --- cmd/background-newdisks-heal-ops.go | 2 +- cmd/config/storageclass/storage-class.go | 18 +-- cmd/config/storageclass/storage-class_test.go | 10 +- cmd/endpoint.go | 2 +- cmd/erasure-bucket.go | 3 +- cmd/erasure-common.go | 11 ++ cmd/erasure-multipart.go | 3 +- cmd/erasure-sets.go | 128 ++++++++---------- cmd/erasure-zones.go | 97 +++++-------- cmd/format-erasure.go | 42 +++--- cmd/format-erasure_test.go | 26 ++-- cmd/global-heal.go | 4 +- cmd/prepare-storage.go | 14 +- cmd/xl-storage.go | 6 +- 14 files changed, 162 insertions(+), 204 deletions(-) diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index e9b341725..ec4d9a1b6 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -183,7 +183,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, drivesToHeal // Heal all erasure sets that need for i, erasureSetToHeal := range erasureSetInZoneToHeal { for _, setIndex := range erasureSetToHeal { - err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].drivesPerSet) + err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount) if err != nil { logger.LogIf(ctx, err) } diff --git a/cmd/config/storageclass/storage-class.go b/cmd/config/storageclass/storage-class.go index 1e3fd5949..e1465f946 100644 --- a/cmd/config/storageclass/storage-class.go +++ b/cmd/config/storageclass/storage-class.go @@ -156,7 +156,7 @@ func parseStorageClass(storageClassEnv string) (sc StorageClass, err error) { } // Validates the parity disks. -func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) { +func validateParity(ssParity, rrsParity, setDriveCount int) (err error) { if ssParity == 0 && rrsParity == 0 { return nil } @@ -174,12 +174,12 @@ func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) { return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minParityDisks) } - if ssParity > drivesPerSet/2 { - return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, drivesPerSet/2) + if ssParity > setDriveCount/2 { + return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, setDriveCount/2) } - if rrsParity > drivesPerSet/2 { - return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, drivesPerSet/2) + if rrsParity > setDriveCount/2 { + return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, setDriveCount/2) } if ssParity > 0 && rrsParity > 0 { @@ -220,9 +220,9 @@ func Enabled(kvs config.KVS) bool { } // LookupConfig - lookup storage class config and override with valid environment settings if any. -func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) { +func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) { cfg = Config{} - cfg.Standard.Parity = drivesPerSet / 2 + cfg.Standard.Parity = setDriveCount / 2 cfg.RRS.Parity = defaultRRSParity if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil { @@ -239,7 +239,7 @@ func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) { } } if cfg.Standard.Parity == 0 { - cfg.Standard.Parity = drivesPerSet / 2 + cfg.Standard.Parity = setDriveCount / 2 } if rrsc != "" { @@ -254,7 +254,7 @@ func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) { // Validation is done after parsing both the storage classes. This is needed because we need one // storage class value to deduce the correct value of the other storage class. - if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, drivesPerSet); err != nil { + if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil { return Config{}, err } diff --git a/cmd/config/storageclass/storage-class_test.go b/cmd/config/storageclass/storage-class_test.go index da47c33ec..f67c561ee 100644 --- a/cmd/config/storageclass/storage-class_test.go +++ b/cmd/config/storageclass/storage-class_test.go @@ -69,10 +69,10 @@ func TestParseStorageClass(t *testing.T) { func TestValidateParity(t *testing.T) { tests := []struct { - rrsParity int - ssParity int - success bool - drivesPerSet int + rrsParity int + ssParity int + success bool + setDriveCount int }{ {2, 4, true, 16}, {3, 3, true, 16}, @@ -85,7 +85,7 @@ func TestValidateParity(t *testing.T) { {9, 2, false, 16}, } for i, tt := range tests { - err := validateParity(tt.ssParity, tt.rrsParity, tt.drivesPerSet) + err := validateParity(tt.ssParity, tt.rrsParity, tt.setDriveCount) if err != nil && tt.success { t.Errorf("Test %d, Expected success, got %s", i+1, err) } diff --git a/cmd/endpoint.go b/cmd/endpoint.go index c174ce8ee..b9ecd1aa1 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -193,7 +193,7 @@ func NewEndpoint(arg string) (ep Endpoint, e error) { } // ZoneEndpoints represent endpoints in a given zone -// along with its setCount and drivesPerSet. +// along with its setCount and setDriveCount. type ZoneEndpoints struct { SetCount int DrivesPerSet int diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index 3c7f93515..696c38dac 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -133,8 +133,7 @@ func (er erasureObjects) listBuckets(ctx context.Context) (bucketsInfo []BucketI if err == nil { // NOTE: The assumption here is that volumes across all disks in // readQuorum have consistent view i.e they all have same number - // of buckets. This is essentially not verified since healing - // should take care of this. + // of buckets. var bucketsInfo []BucketInfo for _, volInfo := range volsInfo { if isReservedOrInvalidBucket(volInfo.Name, true) { diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 48218f1d4..639770e48 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -23,6 +23,17 @@ import ( "github.com/minio/minio/pkg/sync/errgroup" ) +func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) { + disks := er.getDisks() + // Based on the random shuffling return back randomized disks. + for _, i := range hashOrder(UTCNow().String(), len(disks)) { + if disks[i-1] != nil && disks[i-1].IsLocal() { + newDisks = append(newDisks, disks[i-1]) + } + } + return newDisks +} + // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) { disks := er.getDisks() diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 6bdede785..09bb6f3c5 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -81,7 +81,8 @@ func (er erasureObjects) cleanupStaleMultipartUploads(ctx context.Context, clean return case <-ticker.C: var disk StorageAPI - for _, d := range er.getLoadBalancedDisks() { + // run multiple cleanup's local to this server. + for _, d := range er.getLoadBalancedLocalDisks() { if d != nil { disk = d break diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 45f726217..1be10ace5 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -75,7 +75,8 @@ type erasureSets struct { endpointStrings []string // Total number of sets and the number of disks per set. - setCount, drivesPerSet int + setCount, setDriveCount int + listTolerancePerSet int disksConnectEvent chan diskConnectInfo @@ -112,7 +113,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI { defer s.erasureDisksMu.RUnlock() for i := 0; i < s.setCount; i++ { - for j := 0; j < s.drivesPerSet; j++ { + for j := 0; j < s.setDriveCount; j++ { disk := s.erasureDisks[i][j] if disk == nil { continue @@ -228,7 +229,7 @@ func (s *erasureSets) connectDisks() { s.erasureDisks[setIndex][diskIndex].Close() } s.erasureDisks[setIndex][diskIndex] = disk - s.endpointStrings[setIndex*s.drivesPerSet+diskIndex] = disk.String() + s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String() s.erasureDisksMu.Unlock() go func(setIndex int) { // Send a new disk connect event with a timeout @@ -260,7 +261,7 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt func (s *erasureSets) GetLockers(setIndex int) func() []dsync.NetLocker { return func() []dsync.NetLocker { - lockers := make([]dsync.NetLocker, s.drivesPerSet) + lockers := make([]dsync.NetLocker, s.setDriveCount) copy(lockers, s.erasureLockers[setIndex]) return lockers } @@ -271,9 +272,9 @@ func (s *erasureSets) GetEndpoints(setIndex int) func() []string { s.erasureDisksMu.RLock() defer s.erasureDisksMu.RUnlock() - eps := make([]string, s.drivesPerSet) - for i := 0; i < s.drivesPerSet; i++ { - eps[i] = s.endpointStrings[setIndex*s.drivesPerSet+i] + eps := make([]string, s.setDriveCount) + for i := 0; i < s.setDriveCount; i++ { + eps[i] = s.endpointStrings[setIndex*s.setDriveCount+i] } return eps } @@ -284,7 +285,7 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI { return func() []StorageAPI { s.erasureDisksMu.RLock() defer s.erasureDisksMu.RUnlock() - disks := make([]StorageAPI, s.drivesPerSet) + disks := make([]StorageAPI, s.setDriveCount) copy(disks, s.erasureDisks[setIndex]) return disks } @@ -295,46 +296,47 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs // Initialize new set of erasure coded sets. func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3) (*erasureSets, error) { setCount := len(format.Erasure.Sets) - drivesPerSet := len(format.Erasure.Sets[0]) + setDriveCount := len(format.Erasure.Sets[0]) endpointStrings := make([]string, len(endpoints)) // Initialize the erasure sets instance. s := &erasureSets{ - sets: make([]*erasureObjects, setCount), - erasureDisks: make([][]StorageAPI, setCount), - erasureLockers: make([][]dsync.NetLocker, setCount), - endpoints: endpoints, - endpointStrings: endpointStrings, - setCount: setCount, - drivesPerSet: drivesPerSet, - format: format, - disksConnectEvent: make(chan diskConnectInfo), - disksConnectDoneCh: make(chan struct{}), - distributionAlgo: format.Erasure.DistributionAlgo, - deploymentID: uuid.MustParse(format.ID), - pool: NewMergeWalkPool(globalMergeLookupTimeout), - poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout), - poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout), - mrfOperations: make(map[healSource]int), + sets: make([]*erasureObjects, setCount), + erasureDisks: make([][]StorageAPI, setCount), + erasureLockers: make([][]dsync.NetLocker, setCount), + endpoints: endpoints, + endpointStrings: endpointStrings, + setCount: setCount, + setDriveCount: setDriveCount, + listTolerancePerSet: setDriveCount / 2, + format: format, + disksConnectEvent: make(chan diskConnectInfo), + disksConnectDoneCh: make(chan struct{}), + distributionAlgo: format.Erasure.DistributionAlgo, + deploymentID: uuid.MustParse(format.ID), + pool: NewMergeWalkPool(globalMergeLookupTimeout), + poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout), + poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout), + mrfOperations: make(map[healSource]int), } mutex := newNSLock(globalIsDistErasure) // Initialize byte pool once for all sets, bpool size is set to - // setCount * drivesPerSet with each memory upto blockSizeV1. - bp := bpool.NewBytePoolCap(setCount*drivesPerSet, blockSizeV1, blockSizeV1*2) + // setCount * setDriveCount with each memory upto blockSizeV1. + bp := bpool.NewBytePoolCap(setCount*setDriveCount, blockSizeV1, blockSizeV1*2) for i := 0; i < setCount; i++ { - s.erasureDisks[i] = make([]StorageAPI, drivesPerSet) - s.erasureLockers[i] = make([]dsync.NetLocker, drivesPerSet) + s.erasureDisks[i] = make([]StorageAPI, setDriveCount) + s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount) } for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { - endpoint := endpoints[i*drivesPerSet+j] + for j := 0; j < setDriveCount; j++ { + endpoint := endpoints[i*setDriveCount+j] // Rely on endpoints list to initialize, init lockers and available disks. s.erasureLockers[i][j] = newLockAPI(endpoint) - disk := storageDisks[i*drivesPerSet+j] + disk := storageDisks[i*setDriveCount+j] if disk == nil { continue } @@ -348,7 +350,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto disk.Close() continue } - s.endpointStrings[m*drivesPerSet+n] = disk.String() + s.endpointStrings[m*setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } @@ -384,7 +386,7 @@ func (s *erasureSets) NewNSLock(ctx context.Context, bucket string, objects ...s // SetDriveCount returns the current drives per set. func (s *erasureSets) SetDriveCount() int { - return s.drivesPerSet + return s.setDriveCount } // StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets. @@ -458,13 +460,13 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) if scParity == 0 { - scParity = s.drivesPerSet / 2 + scParity = s.setDriveCount / 2 } - storageInfo.Backend.StandardSCData = s.drivesPerSet - scParity + storageInfo.Backend.StandardSCData = s.setDriveCount - scParity storageInfo.Backend.StandardSCParity = scParity rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS) - storageInfo.Backend.RRSCData = s.drivesPerSet - rrSCParity + storageInfo.Backend.RRSCData = s.setDriveCount - rrSCParity storageInfo.Backend.RRSCParity = rrSCParity if local { @@ -838,17 +840,9 @@ func (f *FileInfoCh) Push(fi FileInfo) { // if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - entries[j], entriesValid[j] = entryChs[j].Pop() - }() + entries[j], entriesValid[j] = entryChs[j].Pop() } - wg.Wait() var isTruncated = false for _, valid := range entriesValid { @@ -910,17 +904,9 @@ func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesVali // if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) { - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - entries[j], entriesValid[j] = entryChs[j].Pop() - }() + entries[j], entriesValid[j] = entryChs[j].Pop() } - wg.Wait() var isTruncated = false for _, valid := range entriesValid { @@ -1232,7 +1218,7 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) }(storageDisks) formats, _ := loadFormatErasureAll(storageDisks, false) - if err = checkFormatErasureValues(formats, s.drivesPerSet); err != nil { + if err = checkFormatErasureValues(formats, s.setDriveCount); err != nil { return err } @@ -1272,7 +1258,7 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) s.erasureDisks[m][n].Close() } - s.endpointStrings[m*s.drivesPerSet+n] = disk.String() + s.endpointStrings[m*s.setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } s.erasureDisksMu.Unlock() @@ -1354,7 +1340,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H }(storageDisks) formats, sErrs := loadFormatErasureAll(storageDisks, true) - if err = checkFormatErasureValues(formats, s.drivesPerSet); err != nil { + if err = checkFormatErasureValues(formats, s.setDriveCount); err != nil { return madmin.HealResultItem{}, err } @@ -1365,7 +1351,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H res = madmin.HealResultItem{ Type: madmin.HealItemMetadata, Detail: "disk-format", - DiskCount: s.setCount * s.drivesPerSet, + DiskCount: s.setCount * s.setDriveCount, SetCount: s.setCount, } @@ -1396,7 +1382,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H markUUIDsOffline(refFormat, formats) // Initialize a new set of set formats which will be written to disk. - newFormatSets := newHealFormatSets(refFormat, s.setCount, s.drivesPerSet, formats, sErrs) + newFormatSets := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs) // Look for all offline/unformatted disks in our reference format, // such that we can fill them up with new UUIDs, this looping also @@ -1413,7 +1399,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H newFormatSets[i][l].Erasure.This = mustGetUUID() refFormat.Erasure.Sets[i][j] = newFormatSets[i][l].Erasure.This for m, v := range res.After.Drives { - if v.Endpoint == s.endpoints.GetString(i*s.drivesPerSet+l) { + if v.Endpoint == s.endpoints.GetString(i*s.setDriveCount+l) { res.After.Drives[m].UUID = newFormatSets[i][l].Erasure.This res.After.Drives[m].State = madmin.DriveStateOk } @@ -1426,14 +1412,14 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H } if !dryRun { - var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.drivesPerSet) + var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.setDriveCount) for i := range newFormatSets { for j := range newFormatSets[i] { if newFormatSets[i][j] == nil { continue } - tmpNewFormats[i*s.drivesPerSet+j] = newFormatSets[i][j] - tmpNewFormats[i*s.drivesPerSet+j].Erasure.Sets = refFormat.Erasure.Sets + tmpNewFormats[i*s.setDriveCount+j] = newFormatSets[i][j] + tmpNewFormats[i*s.setDriveCount+j].Erasure.Sets = refFormat.Erasure.Sets } } @@ -1478,7 +1464,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H s.erasureDisks[m][n].Close() } - s.endpointStrings[m*s.drivesPerSet+n] = disk.String() + s.endpointStrings[m*s.setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } s.erasureDisksMu.Unlock() @@ -1496,7 +1482,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem result = madmin.HealResultItem{ Type: madmin.HealItemBucket, Bucket: bucket, - DiskCount: s.setCount * s.drivesPerSet, + DiskCount: s.setCount * s.setDriveCount, SetCount: s.setCount, } @@ -1512,7 +1498,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem // Check if we had quorum to write, if not return an appropriate error. _, afterDriveOnline := result.GetOnlineCounts() - if afterDriveOnline < ((s.setCount*s.drivesPerSet)/2)+1 { + if afterDriveOnline < ((s.setCount*s.setDriveCount)/2)+1 { return result, toObjectErr(errErasureWriteQuorum, bucket) } @@ -1568,7 +1554,7 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c return } - if quorumCount >= s.drivesPerSet/2 { + if quorumCount >= s.setDriveCount/2 { // Read quorum exists proceed for _, version := range entry.Versions { results <- version.ToObjectInfo(bucket, version.Name) @@ -1595,7 +1581,7 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c return } - if quorumCount >= s.drivesPerSet/2 { + if quorumCount >= s.setDriveCount/2 { // Read quorum exists proceed results <- entry.ToObjectInfo(bucket, entry.Name) } @@ -1622,14 +1608,14 @@ func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, op break } - if quorumCount == s.drivesPerSet && opts.ScanMode == madmin.HealNormalScan { + if quorumCount == s.setDriveCount && opts.ScanMode == madmin.HealNormalScan { // Skip good entries. continue } for _, version := range entry.Versions { // Wait and proceed if there are active requests - waitForLowHTTPReq(int32(s.drivesPerSet), time.Second) + waitForLowHTTPReq(int32(s.setDriveCount), time.Second) if err := healObject(bucket, version.Name, version.VersionID); err != nil { return toObjectErr(err, bucket, version.Name) diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 9ffe3fea1..b4adadfdf 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -690,15 +690,15 @@ func (z *erasureZones) ListObjectsV2(ctx context.Context, bucket, prefix, contin func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { var zonesEntryChs [][]FileInfoCh - var zonesDrivesPerSet []int + var zonesListTolerancePerSet []int endWalkCh := make(chan struct{}) defer close(endWalkCh) for _, zone := range z.zones { zonesEntryChs = append(zonesEntryChs, - zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.drivesPerSet)) - zonesDrivesPerSet = append(zonesDrivesPerSet, zone.drivesPerSet) + zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet)) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } var objInfos []ObjectInfo @@ -723,7 +723,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, break } - if quorumCount < zonesDrivesPerSet[zoneIndex]/2 { + if quorumCount < zonesListTolerancePerSet[zoneIndex] { // Skip entries which are not found on upto ndisks/2. continue } @@ -810,20 +810,20 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma var zonesEntryChs [][]FileInfoCh var zonesEndWalkCh []chan struct{} - var drivesPerSets []int + var zonesListTolerancePerSet []int for _, zone := range z.zones { entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.drivesPerSet) + entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.listTolerancePerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - drivesPerSets = append(drivesPerSets, zone.drivesPerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } - entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets) + entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) if len(entries.Files) == 0 { return loi, nil } @@ -902,20 +902,20 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, var zonesEntryChs [][]FileInfoCh var zonesEndWalkCh []chan struct{} - var drivesPerSets []int + var zonesListTolerancePerSet []int for _, zone := range z.zones { entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet) + entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - drivesPerSets = append(drivesPerSets, zone.drivesPerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } - entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets) + entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) if len(entries.Files) == 0 { return loi, nil } @@ -951,18 +951,9 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, // N times until this boolean is 'false'. func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1040,18 +1031,9 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI // N times until this boolean is 'false'. func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1119,7 +1101,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE } // mergeZonesEntriesVersionsCh - merges FileInfoVersions channel to entries upto maxKeys. -func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, drivesPerSets []int) (entries FilesInfoVersions) { +func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, zonesListTolerancePerSet []int) (entries FilesInfoVersions) { var i = 0 var zonesEntriesInfos [][]FileInfoVersions var zonesEntriesValid [][]bool @@ -1134,8 +1116,8 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i break } - if quorumCount < drivesPerSets[zoneIndex]/2 { - // Skip entries which are not found on upto ndisks/2. + if quorumCount < zonesListTolerancePerSet[zoneIndex] { + // Skip entries which are not found upto the expected tolerance continue } @@ -1150,7 +1132,7 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i } // mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys. -func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSets []int) (entries FilesInfo) { +func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zonesListTolerancePerSet []int) (entries FilesInfo) { var i = 0 var zonesEntriesInfos [][]FileInfo var zonesEntriesValid [][]bool @@ -1165,8 +1147,8 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSet break } - if quorumCount < drivesPerSets[zoneIndex]/2 { - // Skip entries which are not found on upto ndisks/2. + if quorumCount < zonesListTolerancePerSet[zoneIndex] { + // Skip entries which are not found upto configured tolerance. continue } @@ -1182,18 +1164,9 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSet func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1214,24 +1187,16 @@ func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zon zoneEntryChs[i][j].Push(zoneEntries[i][j]) } } + } return isTruncated } func isTruncatedZonesVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) bool { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1307,19 +1272,19 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m var zonesEntryChs [][]FileInfoVersionsCh var zonesEndWalkCh []chan struct{} - var drivesPerSets []int + var zonesListTolerancePerSet []int for _, zone := range z.zones { entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet) + entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - drivesPerSets = append(drivesPerSets, zone.drivesPerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } - entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, drivesPerSets) + entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) if len(entries.FilesVersions) == 0 { return loi, nil } @@ -1830,7 +1795,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results var zoneDrivesPerSet []int for _, zone := range z.zones { - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } var zonesEntriesInfos [][]FileInfoVersions @@ -1871,7 +1836,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results var zoneDrivesPerSet []int for _, zone := range z.zones { - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } var zonesEntriesInfos [][]FileInfo @@ -1918,7 +1883,7 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o var zoneDrivesPerSet []int for _, zone := range z.zones { - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } var zonesEntriesInfos [][]FileInfoVersions @@ -2082,7 +2047,7 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes for zoneIdx := range erasureSetUpCount { parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD) - diskCount := z.zones[zoneIdx].drivesPerSet + diskCount := z.zones[zoneIdx].setDriveCount if parityDrives == 0 { parityDrives = getDefaultParityBlocks(diskCount) } diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index 8b57aacd7..bccebb4ef 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -439,7 +439,7 @@ func checkFormatErasureValue(formatErasure *formatErasureV3) error { } // Check all format values. -func checkFormatErasureValues(formats []*formatErasureV3, drivesPerSet int) error { +func checkFormatErasureValues(formats []*formatErasureV3, setDriveCount int) error { for i, formatErasure := range formats { if formatErasure == nil { continue @@ -454,8 +454,8 @@ func checkFormatErasureValues(formats []*formatErasureV3, drivesPerSet int) erro // Only if custom erasure drive count is set, // we should fail here other proceed to honor what // is present on the disk. - if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != drivesPerSet { - return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), drivesPerSet) + if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != setDriveCount { + return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), setDriveCount) } } return nil @@ -788,22 +788,22 @@ func fixFormatErasureV3(storageDisks []StorageAPI, endpoints Endpoints, formats } // initFormatErasure - save Erasure format configuration on all disks. -func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, drivesPerSet int, deploymentID string, sErrs []error) (*formatErasureV3, error) { - format := newFormatErasureV3(setCount, drivesPerSet) +func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, setDriveCount int, deploymentID string, sErrs []error) (*formatErasureV3, error) { + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, len(storageDisks)) - wantAtMost := ecDrivesNoConfig(drivesPerSet) + wantAtMost := ecDrivesNoConfig(setDriveCount) for i := 0; i < setCount; i++ { - hostCount := make(map[string]int, drivesPerSet) - for j := 0; j < drivesPerSet; j++ { - disk := storageDisks[i*drivesPerSet+j] + hostCount := make(map[string]int, setDriveCount) + for j := 0; j < setDriveCount; j++ { + disk := storageDisks[i*setDriveCount+j] newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] if deploymentID != "" { newFormat.ID = deploymentID } hostCount[disk.Hostname()]++ - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } if len(hostCount) > 0 { var once sync.Once @@ -817,8 +817,8 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, return } logger.Info(" * Set %v:", i+1) - for j := 0; j < drivesPerSet; j++ { - disk := storageDisks[i*drivesPerSet+j] + for j := 0; j < setDriveCount; j++ { + disk := storageDisks[i*setDriveCount+j] logger.Info(" - Drive: %s", disk.String()) } }) @@ -842,15 +842,15 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, // ecDrivesNoConfig returns the erasure coded drives in a set if no config has been set. // It will attempt to read it from env variable and fall back to drives/2. -func ecDrivesNoConfig(drivesPerSet int) int { +func ecDrivesNoConfig(setDriveCount int) int { ecDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD) if ecDrives == 0 { - cfg, err := storageclass.LookupConfig(nil, drivesPerSet) + cfg, err := storageclass.LookupConfig(nil, setDriveCount) if err == nil { ecDrives = cfg.Standard.Parity } if ecDrives == 0 { - ecDrives = drivesPerSet / 2 + ecDrives = setDriveCount / 2 } } return ecDrives @@ -920,14 +920,14 @@ func markUUIDsOffline(refFormat *formatErasureV3, formats []*formatErasureV3) { } // Initialize a new set of set formats which will be written to all disks. -func newHealFormatSets(refFormat *formatErasureV3, setCount, drivesPerSet int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 { +func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 { newFormats := make([][]*formatErasureV3, setCount) for i := range refFormat.Erasure.Sets { - newFormats[i] = make([]*formatErasureV3, drivesPerSet) + newFormats[i] = make([]*formatErasureV3, setDriveCount) } for i := range refFormat.Erasure.Sets { for j := range refFormat.Erasure.Sets[i] { - if errs[i*drivesPerSet+j] == errUnformattedDisk || errs[i*drivesPerSet+j] == nil { + if errs[i*setDriveCount+j] == errUnformattedDisk || errs[i*setDriveCount+j] == nil { newFormats[i][j] = &formatErasureV3{} newFormats[i][j].Version = refFormat.Version newFormats[i][j].ID = refFormat.ID @@ -935,13 +935,13 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, drivesPerSet int, f newFormats[i][j].Erasure.Version = refFormat.Erasure.Version newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo } - if errs[i*drivesPerSet+j] == errUnformattedDisk { + if errs[i*setDriveCount+j] == errUnformattedDisk { newFormats[i][j].Erasure.This = "" newFormats[i][j].Erasure.Sets = nil continue } - if errs[i*drivesPerSet+j] == nil { - newFormats[i][j].Erasure.This = formats[i*drivesPerSet+j].Erasure.This + if errs[i*setDriveCount+j] == nil { + newFormats[i][j].Erasure.This = formats[i*setDriveCount+j].Erasure.This newFormats[i][j].Erasure.Sets = nil } } diff --git a/cmd/format-erasure_test.go b/cmd/format-erasure_test.go index e64863aef..2d723b64b 100644 --- a/cmd/format-erasure_test.go +++ b/cmd/format-erasure_test.go @@ -324,16 +324,16 @@ func TestCheckFormatErasureValue(t *testing.T) { // Tests getFormatErasureInQuorum() func TestGetFormatErasureInQuorumCheck(t *testing.T) { setCount := 2 - drivesPerSet := 16 + setDriveCount := 16 - format := newFormatErasureV3(setCount, drivesPerSet) + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, 32) for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { + for j := 0; j < setDriveCount; j++ { newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } } @@ -390,16 +390,16 @@ func TestGetFormatErasureInQuorumCheck(t *testing.T) { // Tests formatErasureGetDeploymentID() func TestGetErasureID(t *testing.T) { setCount := 2 - drivesPerSet := 8 + setDriveCount := 8 - format := newFormatErasureV3(setCount, drivesPerSet) + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, 16) for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { + for j := 0; j < setDriveCount; j++ { newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } } @@ -445,17 +445,17 @@ func TestGetErasureID(t *testing.T) { // Initialize new format sets. func TestNewFormatSets(t *testing.T) { setCount := 2 - drivesPerSet := 16 + setDriveCount := 16 - format := newFormatErasureV3(setCount, drivesPerSet) + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, 32) errs := make([]error, 32) for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { + for j := 0; j < setDriveCount; j++ { newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } } @@ -467,7 +467,7 @@ func TestNewFormatSets(t *testing.T) { // 16th disk is unformatted. errs[15] = errUnformattedDisk - newFormats := newHealFormatSets(quorumFormat, setCount, drivesPerSet, formats, errs) + newFormats := newHealFormatSets(quorumFormat, setCount, setDriveCount, formats, errs) if newFormats == nil { t.Fatal("Unexpected failure") } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 8144e5fa3..7909c2e7a 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -89,7 +89,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, drivesPerSet int) error { +func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, setDriveCount int) error { buckets, err := xlObj.ListBuckets(ctx) if err != nil { return err @@ -151,7 +151,7 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, dr break } - if quorumCount == drivesPerSet { + if quorumCount == setDriveCount { // Skip good entries. continue } diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 80e6c5e0d..13f5ba8be 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -230,7 +230,7 @@ func IsServerResolvable(endpoint Endpoint) error { // connect to list of endpoints and load all Erasure disk formats, validate the formats are correct // and are in quorum, if no formats are found attempt to initialize all of them for the first // time. additionally make sure to close all the disks used in this attempt. -func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, zoneCount, setCount, drivesPerSet int, deploymentID string) (storageDisks []StorageAPI, format *formatErasureV3, err error) { +func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, zoneCount, setCount, setDriveCount int, deploymentID string) (storageDisks []StorageAPI, format *formatErasureV3, err error) { // Initialize all storage disks storageDisks, errs := initStorageDisksWithErrors(endpoints) @@ -268,17 +268,17 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, // most part unless one of the formats is not consistent // with expected Erasure format. For example if a user is // trying to pool FS backend into an Erasure set. - if err = checkFormatErasureValues(formatConfigs, drivesPerSet); err != nil { + if err = checkFormatErasureValues(formatConfigs, setDriveCount); err != nil { return nil, nil, err } // All disks report unformatted we should initialized everyone. if shouldInitErasureDisks(sErrs) && firstDisk { logger.Info("Formatting %s zone, %v set(s), %v drives per set.", - humanize.Ordinal(zoneCount), setCount, drivesPerSet) + humanize.Ordinal(zoneCount), setCount, setDriveCount) // Initialize erasure code format on disks - format, err = initFormatErasure(GlobalContext, storageDisks, setCount, drivesPerSet, deploymentID, sErrs) + format, err = initFormatErasure(GlobalContext, storageDisks, setCount, setDriveCount, deploymentID, sErrs) if err != nil { return nil, nil, err } @@ -347,8 +347,8 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, } // Format disks before initialization of object layer. -func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCount, drivesPerSet int, deploymentID string) ([]StorageAPI, *formatErasureV3, error) { - if len(endpoints) == 0 || setCount == 0 || drivesPerSet == 0 { +func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCount, setDriveCount int, deploymentID string) ([]StorageAPI, *formatErasureV3, error) { + if len(endpoints) == 0 || setCount == 0 || setDriveCount == 0 { return nil, nil, errInvalidArgument } @@ -374,7 +374,7 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCou for { select { case <-ticker.C: - storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, zoneCount, setCount, drivesPerSet, deploymentID) + storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, zoneCount, setCount, setDriveCount, deploymentID) if err != nil { tries++ switch err { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 1d40346a8..9565be82a 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -842,11 +842,7 @@ func (s *xlStorage) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan } walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, s.isLeafSplunk, s.isLeafDir, endWalkCh) - for { - walkResult, ok := <-walkResultCh - if !ok { - return - } + for walkResult := range walkResultCh { var fi FileInfo if HasSuffix(walkResult.entry, SlashSeparator) { fi = FileInfo{