diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index c8af4aa97..61abb6438 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -443,7 +443,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
 	}
 
 	objInfo = fi.ToObjectInfo(bucket, object)
-	if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
+	if opts.VersionID != "" && !fi.VersionPurgeStatus().Empty() {
 		// Make sure to return object info to provide extra information.
 		return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
 	}
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index d69f591a8..e5f9d8c71 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -339,6 +339,22 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
 	return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{})
 }
 
+func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
+	idx, err = z.getPoolIdxExistingNoLock(ctx, bucket, object)
+	if err != nil && !isErrObjectNotFound(err) {
+		return idx, err
+	}
+
+	if isErrObjectNotFound(err) {
+		idx = z.getAvailablePoolIdx(ctx, bucket, object, size)
+		if idx < 0 {
+			return -1, toObjectErr(errDiskFull)
+		}
+	}
+
+	return idx, nil
+}
+
 // getPoolIdx returns the found previous object and its corresponding pool idx,
 // if none are found falls back to most available space pool.
 func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@@ -791,7 +807,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
 		return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
 	}
 	if !opts.NoLock {
-		ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
+		ns := z.NewNSLock(bucket, object)
 		lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
@@ -801,7 +817,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
 		opts.NoLock = true
 	}
 
-	idx, err := z.getPoolIdx(ctx, bucket, object, data.Size())
+	idx, err := z.getPoolIdxNoLock(ctx, bucket, object, data.Size())
 	if err != nil {
 		return ObjectInfo{}, err
 	}
@@ -939,7 +955,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
 
 	if !dstOpts.NoLock {
-		ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(dstBucket, dstObject, "newMultipartObject.lck"))
+		ns := z.NewNSLock(dstBucket, dstObject)
 		lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
@@ -949,7 +965,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 		dstOpts.NoLock = true
 	}
 
-	poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, srcInfo.Size)
+	poolIdx, err := z.getPoolIdxNoLock(ctx, dstBucket, dstObject, srcInfo.Size)
 	if err != nil {
 		return objInfo, err
 	}
@@ -982,6 +998,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 		Versioned: dstOpts.Versioned,
 		VersionID: dstOpts.VersionID,
 		MTime:     dstOpts.MTime,
+		NoLock:    true,
 	}
 
 	return z.serverPools[poolIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
@@ -1162,14 +1179,6 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
 		return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
 	}
 
-	ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
-	lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
-	if err != nil {
-		return "", err
-	}
-	ctx = lkctx.Context()
-	defer ns.Unlock(lkctx.Cancel)
-
 	for idx, pool := range z.serverPools {
 		result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
 		if err != nil {
@@ -1183,6 +1192,8 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
 		}
 	}
 
+	// any parallel writes on the object will block for this poolIdx
+	// to return since this holds a read lock on the namespace.
 	idx, err := z.getPoolIdx(ctx, bucket, object, -1)
 	if err != nil {
 		return "", err
diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go
index 4defc9bcb..78ab0187d 100644
--- a/cmd/xl-storage-format-v2.go
+++ b/cmd/xl-storage-format-v2.go
@@ -1492,9 +1492,9 @@ func readXLMetaNoData(r io.Reader, size int64) ([]byte, error) {
 		buf = append(buf, make([]byte, extra)...)
 		_, err := io.ReadFull(r, buf[has:])
 		if err != nil {
-			if err == io.EOF {
+			if errors.Is(err, io.EOF) {
 				// Returned if we read nothing.
-				return io.ErrUnexpectedEOF
+				return fmt.Errorf("readXLMetaNoData.readMore: %w", io.ErrUnexpectedEOF)
 			}
 			return fmt.Errorf("readXLMetaNoData.readMore: %w", err)
 		}
diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go
index fb3ff5764..1ab507846 100644
--- a/internal/dsync/drwmutex.go
+++ b/internal/dsync/drwmutex.go
@@ -162,8 +162,6 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 	// Create lock array to capture the successful lockers
 	locks := make([]string, len(restClnts))
 
-	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
-
 	// Add total timeout
 	ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
 	defer cancel()
@@ -183,6 +181,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 		}
 	}
 
+	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v, quorum: %d, tolerance: %d, lockClients: %d\n", id, source, dm.Names, isReadLock, opts, quorum, tolerance, len(restClnts))
+
 	tolerance = len(restClnts) - quorum
 
 	for {
@@ -259,15 +259,16 @@ func forceUnlock(ctx context.Context, ds *Dsync, id string) {
 
 	restClnts, _ := ds.GetLockers()
 
+	args := LockArgs{
+		UID: id,
+	}
+
 	var wg sync.WaitGroup
 	for index, c := range restClnts {
 		wg.Add(1)
 		// Send refresh request to all nodes
 		go func(index int, c NetLocker) {
 			defer wg.Done()
-			args := LockArgs{
-				UID: id,
-			}
 			c.ForceUnlock(ctx, args)
 		}(index, c)
 	}
@@ -286,6 +287,10 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 	ch := make(chan refreshResult, len(restClnts))
 	var wg sync.WaitGroup
 
+	args := LockArgs{
+		UID: id,
+	}
+
 	for index, c := range restClnts {
 		wg.Add(1)
 		// Send refresh request to all nodes
@@ -297,10 +302,6 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 				return
 			}
 
-			args := LockArgs{
-				UID: id,
-			}
-
 			ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
 			defer cancel()
 
@@ -382,6 +383,14 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	ch := make(chan Granted, len(restClnts))
 	var wg sync.WaitGroup
 
+	args := LockArgs{
+		Owner:     owner,
+		UID:       id,
+		Resources: lockNames,
+		Source:    source,
+		Quorum:    quorum,
+	}
+
 	// Combined timeout for the lock attempt.
 	ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout)
 	defer cancel()
@@ -398,14 +407,6 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 				return
 			}
 
-			args := LockArgs{
-				Owner:     owner,
-				UID:       id,
-				Resources: lockNames,
-				Source:    source,
-				Quorum:    quorum,
-			}
-
 			var locked bool
 			var err error
 			if isReadLock {
@@ -465,9 +466,10 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 
 	quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
 	if !quorumLocked {
-		log("Releasing all acquired locks now abandoned after quorum was not met\n")
+		log("dsync: Unable to acquire lock in quorum %#v\n", args)
+		// Release all acquired locks without quorum.
 		if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) {
-			log("Unable to release acquired locks, stale locks might be present\n")
+			log("Unable to release acquired locks, these locks will expire automatically %#v\n", args)
 		}
 	}