diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 65c9752b0..ef04c0b28 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -20,12 +20,14 @@ import ( "context" "encoding/json" "fmt" + "runtime" "strings" "sync" "time" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/sync/errgroup" ) // healStatusSummary - overall short summary of a healing sequence @@ -622,17 +624,40 @@ func (h *healSequence) healBucket(bucket string) error { return nil } + entries := runtime.NumCPU() * globalEndpoints.Nodes() + marker := "" isTruncated := true for isTruncated { + if globalHTTPServer != nil { + // Wait at max 1 minute for an inprogress request + // before proceeding to heal + waitCount := 60 + // Any requests in progress, delay the heal. + for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 { + waitCount-- + time.Sleep(1 * time.Second) + } + } + + // Heal numCPU * nodes objects at a time. objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, bucket, - h.objPrefix, marker, "", 1000) + h.objPrefix, marker, "", entries) if err != nil { return errFnHealFromAPIErr(err) } - for _, o := range objectInfos.Objects { - if err := h.healObject(o.Bucket, o.Name); err != nil { + g := errgroup.WithNErrs(len(objectInfos.Objects)) + for index := range objectInfos.Objects { + index := index + g.Go(func() error { + o := objectInfos.Objects[index] + return h.healObject(o.Bucket, o.Name) + }, index) + } + + for _, err := range g.Wait() { + if err != nil { return err } } diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 0e14d612e..17f75a45d 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -168,6 +168,18 @@ func NewEndpoint(arg string) (ep Endpoint, e error) { // EndpointList - list of same type of endpoint. type EndpointList []Endpoint +// Nodes - returns number of unique servers. +func (endpoints EndpointList) Nodes() int { + uniqueNodes := set.NewStringSet() + for _, endpoint := range endpoints { + if uniqueNodes.Contains(endpoint.Host) { + continue + } + uniqueNodes.Add(endpoint.Host) + } + return len(uniqueNodes) +} + // IsHTTPS - returns true if secure for URLEndpointType. func (endpoints EndpointList) IsHTTPS() bool { return endpoints[0].IsHTTPS()