Compare commits

...

3 Commits

Author SHA1 Message Date
Harshavardhana e301b98669 commit the binary to hotfix branch 2021-08-17 20:07:12 -07:00
Klaus Post 49e62ce384 Fix listPathRaw/WalkDir cancelation (#11905)
In #11888 we observe a lot of running, WalkDir calls.

There doesn't appear to be any listerners for these calls, so they should be aborted.

Ensure that WalkDir aborts when upstream cancels the request.

Fixes #11888
2021-08-17 20:06:15 -07:00
Harshavardhana 8d6650214c improve performance by skipping prefixes properly 2021-08-17 17:11:38 -07:00
4 changed files with 30 additions and 9 deletions

View File

@ -429,3 +429,13 @@ func getTLSConfig() (x509Certs []*x509.Certificate, manager *certs.Manager, secu
secureConn = true
return x509Certs, manager, secureConn, nil
}
// contextCanceled returns whether a context is canceled.
func contextCanceled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

View File

@ -811,9 +811,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
if len(disks) == 0 {
return fmt.Errorf("listPathRaw: 0 drives provided")
}
// Disconnect from call above, but cancel on exit.
ctx, cancel := context.WithCancel(GlobalContext)
// Cancel upstream if we finish before we expect.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
askDisks := len(disks)
@ -825,6 +824,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
if err != nil {
return err
}
// Make sure we close the pipe so blocked writes doesn't stay around.
defer r.CloseWithError(context.Canceled)
// Send request to each disk.
go func() {
werr := d.WalkDir(ctx, WalkDirOptions{
@ -836,7 +837,10 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
ForwardTo: opts.forwardTo,
}, w)
w.CloseWithError(werr)
if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
if werr != io.EOF && werr != nil &&
werr.Error() != errFileNotFound.Error() &&
werr.Error() != errVolumeNotFound.Error() &&
!errors.Is(werr, context.Canceled) {
logger.LogIf(ctx, werr)
}
}()

View File

@ -113,6 +113,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
forward := opts.ForwardTo
var scanDir func(path string) error
scanDir = func(current string) error {
if len(current) > 0 && !strings.HasPrefix(current, prefix) {
return nil
}
if contextCanceled(ctx) {
return ctx.Err()
}
entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
if err != nil {
// Folder could have gone away in-between
@ -127,9 +133,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
}
dirObjects := make(map[string]struct{})
for i, entry := range entries {
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
continue
}
if len(forward) > 0 && entry < forward {
continue
}
@ -148,6 +151,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Do do not retain the file.
entries[i] = ""
if contextCanceled(ctx) {
return ctx.Err()
}
// If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry
@ -183,12 +189,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Process in sort order.
sort.Strings(entries)
dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level.
for _, entry := range entries {
if entry == "" {
continue
}
if contextCanceled(ctx) {
return ctx.Err()
}
meta := metaCacheEntry{name: PathJoin(current, entry)}
// If directory entry on stack before this, pop it now.

BIN
minio.gz Executable file

Binary file not shown.