From c8050bc0792da5db472af23e56c8fbb822f23576 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 27 Apr 2021 08:24:44 -0700 Subject: [PATCH] fix: sleeper behavior in data scanner (#12164) do not apply healReplication() for ILM expired, transitioned objects --- cmd/data-scanner.go | 39 +++++++++++++++++++++++++-------------- cmd/fs-v1.go | 2 +- cmd/xl-storage.go | 15 ++++++++------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 39372092a..b80330010 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -45,8 +45,9 @@ import ( ) const ( - dataScannerSleepPerFolder = time.Millisecond // Time to wait between folders. - dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles. + dataScannerSleepPerFolder = 20 * time.Millisecond // Time to wait between folders. + dataScannerStartDelay = 1 * time.Minute // Time to wait on startup and between cycles. + dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles. healDeleteDangling = true healFolderIncludeProb = 32 // Include a clean folder one in n cycles. @@ -60,7 +61,9 @@ var ( dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second) // Sleeper values are updated when config is loaded. scannerSleeper = newDynamicSleeper(10, 10*time.Second) - scannerCycle = &safeDuration{} + scannerCycle = &safeDuration{ + t: dataScannerStartDelay, + } ) // initDataScanner will start the scanner in the background. @@ -450,6 +453,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } if typ&os.ModeDir != 0 { + scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder) + h := hashPath(entName) _, exists := f.oldCache.Cache[h.Key()] cache.addChildString(entName) @@ -562,8 +567,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix) } - // Dynamic time delay. - wait := scannerSleeper.Timer(ctx) resolver.bucket = bucket foundObjs := false @@ -592,9 +595,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // agreed value less than expected quorum dangling = nAgreed < resolver.objQuorum || nAgreed < resolver.dirQuorum - // Sleep and reset. - wait() - wait = scannerSleeper.Timer(ctx) entry, ok := entries.resolve(&resolver) if !ok { for _, err := range errs { @@ -614,9 +614,14 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo if entry.isDir() { return } + + // wait on timer per object. + wait := scannerSleeper.Timer(ctx) + // We got an entry which we should be able to heal. fiv, err := entry.fileInfoVersions(bucket) if err != nil { + wait() err := bgSeq.queueHealTask(healSource{ bucket: bucket, object: entry.name, @@ -628,10 +633,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo foundObjs = foundObjs || err == nil return } + for _, ver := range fiv.Versions { // Sleep and reset. wait() wait = scannerSleeper.Timer(ctx) + err := bgSeq.queueHealTask(healSource{ bucket: bucket, object: fiv.Name, @@ -662,6 +669,9 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo console.Debugf(healObjectsPrefix+" deleting dangling directory %s\n", prefix) } + // wait on timer per object. + wait := scannerSleeper.Timer(ctx) + objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{ Recursive: true, Remove: healDeleteDangling, @@ -669,7 +679,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo func(bucket, object, versionID string) error { // Wait for each heal as per scanner frequency. wait() - wait = scannerSleeper.Timer(ctx) return bgSeq.queueHealTask(healSource{ bucket: bucket, object: object, @@ -678,8 +687,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo }) } - wait() - // Add unless healing returned an error. if foundObjs { this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv} @@ -936,13 +943,17 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac // The resulting size on disk will always be returned. // The metadata will be compared to consensus on the object layer before any changes are applied. // If no metadata is supplied, -1 is returned if no action is taken. -func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) int64 { +func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) int64 { applied, size := i.applyLifecycle(ctx, o, meta) // For instance, an applied lifecycle means we remove/transitioned an object // from the current deployment, which means we don't have to call healing // routine even if we are asked to do via heal flag. - if !applied && i.heal { - size = i.applyHealing(ctx, o, meta) + if !applied { + if i.heal { + size = i.applyHealing(ctx, o, meta) + } + // replicate only if lifecycle rules are not applied. + i.healReplication(ctx, o, meta.oi.Clone(), sizeS) } return size } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index ee6f5d45a..c3a98ed5e 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -360,7 +360,7 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs } oi := fsMeta.ToObjectInfo(bucket, object, fi) - sz := item.applyActions(ctx, fs, actionMeta{oi: oi}) + sz := item.applyActions(ctx, fs, actionMeta{oi: oi}, &sizeSummary{}) if sz >= 0 { return sizeSummary{totalSize: sz}, nil } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 5e4a2dc12..cd9160cfc 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -394,6 +394,10 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUs // return initialized object layer objAPI := newObjectLayerFn() + // object layer not initialized, return. + if objAPI == nil { + return cache, errServerNotInitialized + } globalHealConfigMu.Lock() healOpts := globalHealConfig @@ -431,13 +435,10 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUs sizeS := sizeSummary{} for _, version := range fivs.Versions { oi := version.ToObjectInfo(item.bucket, item.objectPath()) - if objAPI != nil { - totalSize += item.applyActions(ctx, objAPI, actionMeta{ - oi: oi, - bitRotScan: healOpts.Bitrot, - }) - item.healReplication(ctx, objAPI, oi.Clone(), &sizeS) - } + totalSize += item.applyActions(ctx, objAPI, actionMeta{ + oi: oi, + bitRotScan: healOpts.Bitrot, + }, &sizeS) } sizeS.totalSize = totalSize return sizeS, nil