fix a crash from unstable sort for > 2 pools (#12501)

Fix in https://github.com/minio/minio/pull/12487 assumes that slices with 
tiebreaks are sorted equally. That is only the case for "stable"  sort versions.
This commit is contained in:
Klaus Post 2021-06-14 20:00:13 +02:00 committed by GitHub
parent 31971906ff
commit b89c0beea4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -617,19 +617,22 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
unlockOnDefer = true
}
errs := make([]error, len(z.serverPools))
grs := make([]*GetObjectReader, len(z.serverPools))
lockType = noLock // do not take locks at lower levels
checkPrecondFn := opts.CheckPrecondFn
opts.CheckPrecondFn = nil
results := make([]struct {
zIdx int
gr *GetObjectReader
err error
}, len(z.serverPools))
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
grs[i], errs[i] = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
results[i].zIdx = i
results[i].gr, results[i].err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}(i, pool)
}
wg.Wait()
@ -638,59 +641,62 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
less := func(i, j int) bool {
var (
mtime1 time.Time
mtime2 time.Time
)
if grs[i] != nil {
mtime1 = grs[i].ObjInfo.ModTime
sort.Slice(results, func(i, j int) bool {
var mtimeI, mtimeJ time.Time
if results[i].gr != nil {
mtimeI = results[i].gr.ObjInfo.ModTime
}
if grs[j] != nil {
mtime2 = grs[j].ObjInfo.ModTime
if results[j].gr != nil {
mtimeJ = results[j].gr.ObjInfo.ModTime
}
return mtime1.After(mtime2)
}
sort.Slice(errs, less)
sort.Slice(grs, less)
// On tiebreaks, choose the earliest.
if mtimeI.Equal(mtimeJ) {
return results[i].zIdx < results[j].zIdx
}
return mtimeI.After(mtimeJ)
})
var found = -1
for i, err := range errs {
if err == nil {
for i, res := range results {
if res.err == nil {
// Select earliest result
found = i
break
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
for _, grr := range grs {
if grr != nil {
grr.Close()
}
if !isErrObjectNotFound(res.err) && !isErrVersionNotFound(res.err) {
for _, res := range results {
res.gr.Close()
}
return gr, err
return nil, res.err
}
}
if found >= 0 {
if checkPrecondFn != nil && checkPrecondFn(grs[found].ObjInfo) {
for _, grr := range grs {
grr.Close()
}
return nil, PreConditionFailed{}
if found < 0 {
object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
for i, grr := range grs {
if i == found {
continue
}
grr.Close()
}
return grs[found], nil
return gr, ObjectNotFound{Bucket: bucket, Object: object}
}
object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
// Check preconditions.
if checkPrecondFn != nil && checkPrecondFn(results[found].gr.ObjInfo) {
for _, res := range results {
res.gr.Close()
}
return nil, PreConditionFailed{}
}
return gr, ObjectNotFound{Bucket: bucket, Object: object}
// Close all others
for i, res := range results {
if i == found {
continue
}
res.gr.Close()
}
return results[found].gr, nil
}
func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
@ -713,15 +719,19 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
errs := make([]error, len(z.serverPools))
objInfos := make([]ObjectInfo, len(z.serverPools))
results := make([]struct {
zIdx int
oi ObjectInfo
err error
}, len(z.serverPools))
opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, opts)
results[i].zIdx = i
results[i].oi, results[i].err = pool.GetObjectInfo(ctx, bucket, object, opts)
}(i, pool)
}
wg.Wait()
@ -730,31 +740,27 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
less := func(i, j int) bool {
mtime1 := objInfos[i].ModTime
mtime2 := objInfos[j].ModTime
return mtime1.After(mtime2)
}
sort.Slice(errs, less)
sort.Slice(objInfos, less)
var found = -1
for i, err := range errs {
sort.Slice(results, func(i, j int) bool {
a, b := results[i], results[j]
if a.oi.ModTime.Equal(b.oi.ModTime) {
// On tiebreak, select the lowest zone index.
return a.zIdx < b.zIdx
}
return a.oi.ModTime.After(b.oi.ModTime)
})
for _, res := range results {
// Return first found.
err := res.err
if err == nil {
found = i
break
return res.oi, nil
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// some errors such as MethodNotAllowed for delete marker
// should be returned upwards.
return objInfos[i], err
return res.oi, err
}
}
if found >= 0 {
return objInfos[found], nil
}
object = decodeDirObject(object)
if opts.VersionID != "" {
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}