From ed16ce9b7333fb0cba0c9a2b8aefadd852a30301 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 26 Aug 2021 20:32:58 -0700 Subject: [PATCH] add healing workers support to parallelize healing (#13081) Faster healing, as well as making healing more responsive for faster scanner times. Also fixes a bug introduced in #13079 where newly replaced disks were not healing automatically. --- cmd/admin-heal-ops.go | 20 ++++++++++----- cmd/background-heal-ops.go | 38 +++++++++++++++++------------ cmd/background-newdisks-heal-ops.go | 8 ------ cmd/data-scanner.go | 9 ++----- cmd/global-heal.go | 26 ++++++++++++++------ 5 files changed, 58 insertions(+), 43 deletions(-) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 2fc939034..098dd0b49 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -20,6 +20,7 @@ package cmd import ( "context" "encoding/json" + "errors" "fmt" "net/http" "sort" @@ -691,11 +692,11 @@ func (h *healSequence) logHeal(healType madmin.HealItemType) { func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { // Send heal request task := healTask{ - bucket: source.bucket, - object: source.object, - versionID: source.versionID, - opts: h.settings, - responseCh: h.respCh, + bucket: source.bucket, + object: source.object, + versionID: source.versionID, + opts: h.settings, + respCh: h.respCh, } if source.opts != nil { task.opts = *source.opts @@ -707,11 +708,18 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem h.lastHealActivity = UTCNow() h.mutex.Unlock() - globalBackgroundHealRoutine.queueHealTask(task) + select { + case globalBackgroundHealRoutine.tasks <- task: + case <-h.ctx.Done(): + return nil + } select { case res := <-h.respCh: if !h.reportProgress { + if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal + return nil + } // Object might have been deleted, by the time heal // was attempted, we should ignore this object and // return the
error and not calculate this object diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 559c55c81..fe2b3bfa6 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "runtime" "github.com/minio/madmin-go" ) @@ -33,7 +34,7 @@ type healTask struct { versionID string opts madmin.HealOpts // Healing response will be sent here - responseCh chan healResult + respCh chan healResult } // healResult represents a healing result with a possible error @@ -44,13 +45,8 @@ type healResult struct { // healRoutine receives heal tasks, to heal buckets, objects and format.json type healRoutine struct { - tasks chan healTask - doneCh chan struct{} -} - -// Add a new task in the tasks queue -func (h *healRoutine) queueHealTask(task healTask) { - h.tasks <- task + tasks chan healTask + workers int } func systemIO() int { @@ -68,8 +64,18 @@ func waitForLowHTTPReq() { globalHealConfig.Wait(currentIO, systemIO) } +func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { + // Run the background healer + globalBackgroundHealRoutine = newHealRoutine() + for i := 0; i < globalBackgroundHealRoutine.workers; i++ { + go globalBackgroundHealRoutine.AddWorker(ctx, objAPI) + } + + globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence(), objAPI) +} + // Wait for heal requests and process them -func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { +func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) { for { select { case task, ok := <-h.tasks: @@ -81,6 +87,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { var err error switch task.bucket { case nopHeal: + task.respCh <- healResult{err: errSkipFile} continue case SlashSeparator: res, err = healDiskFormat(ctx, objAPI, task.opts) @@ -92,10 +99,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { } } - task.responseCh <- healResult{result: res, err: err} - - case 
<-h.doneCh: - return + task.respCh <- healResult{result: res, err: err} case <-ctx.Done(): return } @@ -103,9 +107,13 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { } func newHealRoutine() *healRoutine { + workers := runtime.GOMAXPROCS(0) / 2 + if workers == 0 { + workers = 4 + } return &healRoutine{ - tasks: make(chan healTask), - doneCh: make(chan struct{}), + tasks: make(chan healTask), + workers: workers, } } diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 2044df5df..c45d77b5f 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -308,14 +308,6 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) { } -func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { - // Run the background healer - globalBackgroundHealRoutine = newHealRoutine() - go globalBackgroundHealRoutine.run(ctx, objAPI) - - globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence(), objAPI) -} - // monitorLocalDisksAndHeal - ensures that detected new disks are healed // 1. Only the concerned erasure set will be listed and healed // 2. Only the node hosting the disk is responsible to perform the heal diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 406ff709e..d9b140251 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -674,13 +674,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int entry, ok := entries.resolve(&resolver) if !ok { - for _, err := range errs { - if err != nil { - return - } - } - - // If no errors, queue it for healing. + // check if we can get at least one entry, + // and proceed to heal nonetheless.
entry, _ = entries.firstFound() } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 5fda5dfd6..8877a3ae9 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -234,17 +234,26 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn return } } + fivs, err := entry.fileInfoVersions(bucket.Name) if err != nil { - logger.LogIf(ctx, err) + err := bgSeq.queueHealTask(healSource{ + bucket: bucket.Name, + object: entry.name, + versionID: "", + }, madmin.HealItemObject) + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } return } for _, version := range fivs.Versions { - if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ - ScanMode: scanMode, - Remove: healDeleteDangling, - }); err != nil { + if _, err := er.HealObject(ctx, bucket.Name, version.Name, + version.VersionID, madmin.HealOpts{ + ScanMode: scanMode, + Remove: healDeleteDangling, + }); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed. tracker.ItemsFailed++ @@ -283,9 +292,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn agreed: healEntry, partial: func(entries metaCacheEntries, nAgreed int, errs []error) { entry, ok := entries.resolve(&resolver) - if ok { - healEntry(*entry) + if !ok { + // check if we can get at least one entry, + // and proceed to heal nonetheless. + entry, _ = entries.firstFound() } + healEntry(*entry) }, finished: nil, })