heal: Optimize heal listing by avoiding batches (#8901)

Also limit the heal per object if there is incoming
requests by suspending heal for longer periods of time.
This commit is contained in:
Harshavardhana 2020-01-29 12:05:44 +05:30 committed by GitHub
parent 5bd0e95eef
commit f98616dce7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 78 additions and 56 deletions

View file

@ -775,15 +775,15 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error {
// healObject - heal the given object and record result
func (h *healSequence) healObject(bucket, object string) error {
if h.isQuitting() {
return errHealStopSignalled
}
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return errServerNotInitialized
}
if h.isQuitting() {
return errHealStopSignalled
}
return h.queueHealTask(pathJoin(bucket, object), madmin.HealItemObject)
}

View file

@ -1254,7 +1254,7 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo
}
// HealObjects - no-op for fs. Valid only for XL.
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) {
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
logger.LogIf(ctx, NotImplemented{})
return NotImplemented{}
}
@ -1265,12 +1265,6 @@ func (fs *FSObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error)
return []BucketInfo{}, NotImplemented{}
}
// ListObjectsHeal - list all objects to be healed. Valid only for XL
func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
logger.LogIf(ctx, NotImplemented{})
return ListObjectsInfo{}, NotImplemented{}
}
// GetMetrics - no op
func (fs *FSObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
logger.LogIf(ctx, NotImplemented{})

View file

@ -148,11 +148,6 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck
return nil, NotImplemented{}
}
// ListObjectsHeal - Not implemented stub
func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
return ListObjectsInfo{}, NotImplemented{}
}
// HealObject - Not implemented stub
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (h madmin.HealResultItem, e error) {
return h, NotImplemented{}
@ -164,7 +159,7 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c
}
// HealObjects - Not implemented stub
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) {
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
return NotImplemented{}
}

View file

@ -99,10 +99,9 @@ type ObjectLayer interface {
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error
HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
// Policy operations
SetBucketPolicy(context.Context, string, *policy.Policy) error

View file

@ -1626,46 +1626,45 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
// HealObjects - Heal all objects recursively at a specified prefix, any
// dangling objects deleted as well automatically.
func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error {
func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error {
endWalkCh := make(chan struct{})
defer close(endWalkCh)
marker := ""
recursive := true
entryChs := s.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh)
entriesValid := make([]bool, len(entryChs))
entries := make([]FileInfo, len(entryChs))
for {
entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
if !ok {
break
}
if quorumCount == s.drivesPerSet {
// Skip good entries.
continue
}
if httpServer := newHTTPServerFn(); httpServer != nil {
// Wait at max 10 minute for an inprogress request before proceeding to heal
waitCount := 600
// Any requests in progress, delay the heal.
for (httpServer.GetRequestCount() >= int32(s.setCount*s.drivesPerSet)) &&
for (httpServer.GetRequestCount() >= int32(s.drivesPerSet)) &&
waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
res, err := s.ListObjectsHeal(ctx, bucket, prefix, marker, "", maxObjectList)
if err != nil {
continue
if err := healObject(bucket, entry.Name); err != nil {
return toObjectErr(err, bucket, entry.Name)
}
for _, obj := range res.Objects {
if err = healObjectFn(bucket, obj.Name); err != nil {
return toObjectErr(err, bucket, obj.Name)
}
}
if !res.IsTruncated {
break
}
marker = res.NextMarker
}
return nil
}
func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, true)
}
// PutObjectTag - replace or add tags to an existing object
func (s *xlSets) PutObjectTag(ctx context.Context, bucket, object string, tags string) error {
return s.getHashedSet(object).PutObjectTag(ctx, bucket, object, tags)

View file

@ -23,12 +23,7 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
return nil, nil
}
// This is not implemented, look for xl-sets.ListObjectsHeal()
func (xl xlObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
return ListObjectsInfo{}, nil
}
// This is not implemented/needed anymore, look for xl-sets.HealObjects()
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, healFn func(string, string) error) (e error) {
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error {
return nil
}

View file

@ -24,6 +24,7 @@ import (
"net/http"
"strings"
"sync"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
@ -1309,19 +1310,58 @@ func (z *xlZones) HealBucket(ctx context.Context, bucket string, dryRun, remove
return r, nil
}
func (z *xlZones) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
if z.SingleZone() {
return z.zones[0].ListObjectsHeal(ctx, bucket, prefix, marker, delimiter, maxKeys)
}
return z.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, true)
}
type healObjectFn func(string, string) error
func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error {
func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error {
var zonesEntryChs [][]FileInfoCh
recursive := true
for _, zone := range z.zones {
if err := zone.HealObjects(ctx, bucket, prefix, healObjectFn); err != nil {
return err
endWalkCh := make(chan struct{})
defer close(endWalkCh)
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh))
}
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
}
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
}
for {
entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
break
}
if quorumCount == zoneDrivesPerSet[zoneIndex] {
// Skip good entries.
continue
}
if httpServer := newHTTPServerFn(); httpServer != nil {
// Wait at max 10 minute for an inprogress request before proceeding to heal
waitCount := 600
// Any requests in progress, delay the heal.
for (httpServer.GetRequestCount() >= int32(zoneDrivesPerSet[zoneIndex])) &&
waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
if err := healObject(bucket, entry.Name); err != nil {
return toObjectErr(err, bucket, entry.Name)
}
}
return nil
}