honor maxWait heal config when maxIO hits (#11338)

This commit is contained in:
Harshavardhana 2021-01-25 07:53:12 -08:00 committed by GitHub
parent 0bf2d84f96
commit 82f0471d1b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 19 deletions

View file

@ -750,16 +750,19 @@ func (h *healSequence) healItemsFromSourceCh() error {
if !ok {
return nil
}
var itemType madmin.HealItemType
switch {
case source.bucket == nopHeal:
switch source.bucket {
case nopHeal:
continue
case source.bucket == SlashSeparator:
case SlashSeparator:
itemType = madmin.HealItemMetadata
case source.bucket != "" && source.object == "":
itemType = madmin.HealItemBucket
default:
itemType = madmin.HealItemObject
if source.object == "" {
itemType = madmin.HealItemBucket
} else {
itemType = madmin.HealItemObject
}
}
if err := h.queueHealTask(source, itemType); err != nil {

View file

@ -61,7 +61,6 @@ func waitForLowHTTPReq(maxIO int, maxWait time.Duration) {
}
// At max 10 attempts to wait with 100 millisecond interval before proceeding
waitCount := 10
waitTick := 100 * time.Millisecond
// Bucket notification and http trace are not costly, it is okay to ignore them
@ -70,14 +69,21 @@ func waitForLowHTTPReq(maxIO int, maxWait time.Duration) {
return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers())
}
tmpMaxWait := maxWait
if httpServer := newHTTPServerFn(); httpServer != nil {
// Any requests in progress, delay the heal.
for httpServer.GetRequestCount() >= maxIOFn() {
time.Sleep(waitTick)
waitCount--
if waitCount == 0 {
if tmpMaxWait > 0 {
if tmpMaxWait < waitTick {
time.Sleep(tmpMaxWait)
} else {
time.Sleep(waitTick)
}
tmpMaxWait = tmpMaxWait - waitTick
}
if tmpMaxWait <= 0 {
if intDataUpdateTracker.debug {
logger.Info("waitForLowHTTPReq: waited %d times, resuming", waitCount)
logger.Info("waitForLowHTTPReq: waited max %s, resuming", maxWait)
}
break
}
@ -91,20 +97,22 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
select {
case task, ok := <-h.tasks:
if !ok {
break
return
}
var res madmin.HealResultItem
var err error
switch {
case task.bucket == nopHeal:
switch task.bucket {
case nopHeal:
continue
case task.bucket == SlashSeparator:
case SlashSeparator:
res, err = healDiskFormat(ctx, objAPI, task.opts)
case task.bucket != "" && task.object == "":
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts)
case task.bucket != "" && task.object != "":
res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts)
default:
if task.object == "" {
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts)
} else {
res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts)
}
}
task.responseCh <- healResult{result: res, err: err}