Avoid synchronizing usage writes (#11560)

If the periodic `case <-t.C:` save gets held up for a long time it will end up
synchronize all disk writes for saving the caches.

We add jitter to per set writes so they don't sync up and don't hold a
lock for the write, since it isn't needed anyway.

If an outage prevents writes for a long while we also add individual
waits for each disk in case there was a queue.

Furthermore limit the number of buffers kept to 2GiB, since this could get
huge in large clusters. This will not act as a hard limit but should be enough
for normal operation.
This commit is contained in:
Klaus Post 2021-02-18 00:38:37 -08:00 committed by Harshavardhana
parent 21718705b8
commit 5f41f6043d
2 changed files with 13 additions and 2 deletions

View file

@ -375,9 +375,14 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
mutex := newNSLock(globalIsDistErasure)
// Number of buffers, max 2GB.
n := setCount * setDriveCount
if n > 100 {
n = 100
}
// Initialize byte pool once for all sets, bpool size is set to
// setCount * setDriveCount with each memory upto blockSizeV1.
bp := bpool.NewBytePoolCap(setCount*setDriveCount, blockSizeV1, blockSizeV1*2)
bp := bpool.NewBytePoolCap(n, blockSizeV1, blockSizeV1*2)
for i := 0; i < setCount; i++ {
s.erasureDisks[i] = make([]StorageAPI, setDriveCount)

View file

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"time"
@ -294,7 +295,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
var saverWg sync.WaitGroup
saverWg.Add(1)
go func() {
const updateTime = 30 * time.Second
// Add jitter to the update time so multiple sets don't sync up.
var updateTime = 30*time.Second + time.Duration(float64(10*time.Second)*rand.Float64())
t := time.NewTicker(updateTime)
defer t.Stop()
defer saverWg.Done()
@ -377,11 +379,15 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
if r := cache.root(); r != nil {
root = cache.flatten(*r)
}
t := time.Now()
bucketResults <- dataUsageEntryInfo{
Name: cache.Info.Name,
Parent: dataUsageRoot,
Entry: root,
}
// We want to avoid synchronizing up all writes in case
// the results are piled up.
time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64()))
// Save cache
logger.LogIf(ctx, cache.save(ctx, er, cacheName))
}