xl walk: Limit walk concurrent IO (#12885)

We are observing heavy system loads, potentially
locking the system up for periods when concurrent
listing operations are performed.

We place a per-disk lock on walk IO operations.
This will minimize the impact of concurrent listing
operations on the entire system and de-prioritize
them compared to other operations.

Single list operations should remain largely unaffected.
This commit is contained in:
Klaus Post 2021-08-19 03:10:36 +02:00 committed by Harshavardhana
parent f4c4e5fc33
commit ba04da8bf2
2 changed files with 13 additions and 0 deletions

View File

@ -111,7 +111,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if contextCanceled(ctx) {
return ctx.Err()
}
s.walkMu.Lock()
entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
s.walkMu.Unlock()
if err != nil {
// Folder could have gone away in-between
if err != errVolumeNotFound && err != errFileNotFound {
@ -152,7 +154,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
s.walkMu.Unlock()
if err != nil {
logger.LogIf(ctx, err)
continue
@ -167,7 +171,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Check legacy.
if HasSuffix(entry, xlStorageFormatFileV1) {
var meta metaCacheEntry
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
s.walkMu.Unlock()
if err != nil {
logger.LogIf(ctx, err)
continue
@ -217,7 +223,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash
}
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFile))
s.walkMu.Unlock()
switch {
case err == nil:
// It was an object
@ -226,7 +234,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
}
out <- meta
case osIsNotExist(err):
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
s.walkMu.Unlock()
if err == nil {
// Maybe rename? Would make it inconsistent across disks though.
// os.Rename(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1), pathJoin(volumeDir, meta.name, xlStorageFormatFile))

View File

@ -118,6 +118,9 @@ type xlStorage struct {
ctx context.Context
sync.RWMutex
// mutex to prevent concurrent read operations overloading walks.
walkMu sync.Mutex
}
// checkPathLength - returns error if given path name length more than 255