Heal recursively all entries in config/ prefix (#6545)

This to ensure that we heal all entries in config/
prefix, we will have IAM and STS related files which
are being introduced in #6168 PR

This is a change to ensure that we heal all of them
properly, not just `config.json`
This commit is contained in:
Harshavardhana 2018-10-01 09:54:26 -07:00 committed by Nitish Tiwari
parent 839a758a36
commit b4772849f9
2 changed files with 60 additions and 23 deletions

View file

@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"fmt"
"path"
"runtime"
"strings"
"sync"
@ -386,7 +385,6 @@ func (h *healSequence) stop() {
// sequence automatically resumes. The return value indicates if the
// operation succeeded.
func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
// start a timer to keep an upper time limit to find an empty
// slot to add the given heal result - if no slot is found it
// means that the server is holding the maximum amount of
@ -543,24 +541,64 @@ func (h *healSequence) healConfig() error {
return errServerNotInitialized
}
configFile := path.Join(minioConfigPrefix, minioConfigFile)
configBackupFile := path.Join(minioConfigPrefix, minioConfigBackupFile)
for _, cfg := range []string{configFile, configBackupFile} {
res, err := objectAPI.HealObject(h.ctx, minioMetaBucket, cfg, h.settings.DryRun)
// NOTE: Healing on configs is run regardless
// of any bucket being selected, this is to ensure that
// configs are always uptodate and correct.
marker := ""
isTruncated := true
for isTruncated {
if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
// Lists all objects under `config` prefix.
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, minioMetaBucket, minioConfigPrefix,
marker, "", 1000)
if err != nil {
return err
return errFnHealFromAPIErr(err)
}
res.Type = madmin.HealItemBucketMetadata
if err = h.pushHealResultItem(res); err != nil {
return err
for index := range objectInfos.Objects {
if h.isQuitting() {
return errHealStopSignalled
}
o := objectInfos.Objects[index]
res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun)
// Object might have been deleted, by the time heal
// was attempted we ignore this file an move on.
if isErrObjectNotFound(herr) {
continue
}
if herr != nil {
return herr
}
res.Type = madmin.HealItemBucketMetadata
if err = h.pushHealResultItem(res); err != nil {
return err
}
}
isTruncated = objectInfos.IsTruncated
marker = objectInfos.NextMarker
}
return nil
}
// healDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func (h *healSequence) healDiskFormat() error {
if h.isQuitting() {
return errHealStopSignalled
}
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
@ -586,6 +624,10 @@ func (h *healSequence) healDiskFormat() error {
// healBuckets - check for all buckets heal or just particular bucket.
func (h *healSequence) healBuckets() error {
if h.isQuitting() {
return errHealStopSignalled
}
// 1. If a bucket was specified, heal only the bucket.
if h.bucket != "" {
return h.healBucket(h.bucket)
@ -613,10 +655,6 @@ func (h *healSequence) healBuckets() error {
// healBucket - traverses and heals given bucket
func (h *healSequence) healBucket(bucket string) error {
if h.isQuitting() {
return errHealStopSignalled
}
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
@ -641,8 +679,7 @@ func (h *healSequence) healBucket(bucket string) error {
// and if so heal it.
_, err = objectAPI.GetObjectInfo(h.ctx, bucket, h.objPrefix, ObjectOptions{})
if err == nil {
err = h.healObject(bucket, h.objPrefix)
if err != nil {
if err = h.healObject(bucket, h.objPrefix); err != nil {
return err
}
}
@ -708,6 +745,9 @@ func (h *healSequence) healObject(bucket, object string) error {
}
hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun)
if isErrObjectNotFound(err) {
return nil
}
if err != nil {
hri.Detail = err.Error()
}

View file

@ -205,6 +205,10 @@ func healBucketMetadata(xl xlObjects, bucket string, dryRun bool) (
reqInfo := &logger.ReqInfo{BucketName: bucket}
ctx := logger.SetReqInfo(context.Background(), reqInfo)
result, healErr := xl.HealObject(ctx, minioMetaBucket, metaPath, dryRun)
// If object is not found, skip the file.
if isErrObjectNotFound(healErr) {
return nil
}
if healErr != nil {
return healErr
}
@ -604,13 +608,6 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
defer func() {
// If object is not found, ignore the error.
if isErrObjectNotFound(err) {
err = nil
}
}()
// Create context that also contains information about the object and bucket.
// The top level handler might not have this information.
reqInfo := logger.GetReqInfo(ctx)