From 5df7bbf9f9b03e82754e9a187b99279da12608b9 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 4 Aug 2021 22:14:16 +0100 Subject: [PATCH] [10-28.sets branch] Use refresh locking mechanism in locks (#12862) * locking: Add Refresh for better locking cleanup * locking: Add timeout in unlock calls --- cmd/admin-handlers.go | 5 +- cmd/data-crawler.go | 4 +- cmd/disk-cache-backend.go | 30 ++-- cmd/erasure-multipart.go | 102 +++++++------ cmd/erasure-object.go | 46 +++--- cmd/erasure-server-sets.go | 24 ++-- cmd/fs-v1-multipart.go | 6 +- cmd/fs-v1.go | 55 +++++--- cmd/iam.go | 11 +- cmd/local-locker.go | 82 ++++++----- cmd/lock-rest-client.go | 20 +-- cmd/lock-rest-server-common.go | 4 +- cmd/lock-rest-server-common_test.go | 20 +-- cmd/lock-rest-server.go | 212 +++++++--------------------- cmd/namespace-lock.go | 100 +++++++++---- cmd/server-main.go | 7 +- cmd/xl-storage-disk-id-check.go | 144 +++++++++++++++++++ pkg/dsync/drwmutex.go | 189 ++++++++++++++++++++++--- pkg/dsync/drwmutex_test.go | 21 +-- pkg/dsync/dsync-server_test.go | 16 +++ pkg/dsync/dsync_test.go | 55 +++++++- pkg/dsync/rpc-client-impl_test.go | 6 +- pkg/dsync/rpc-client-interface.go | 8 +- 23 files changed, 769 insertions(+), 398 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 17bb48cca..7e2bdae7e 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1495,11 +1495,12 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request) defer cancel() nsLock := objectAPI.NewNSLock(minioMetaBucket, "obd-in-progress") - if err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock + lkctx, err := nsLock.GetLock(ctx, newDynamicTimeout(deadline, deadline)) + if err != nil { // returns a locked lock errResp(err) return } - defer nsLock.Unlock() + defer nsLock.Unlock(lkctx.Cancel) go func() { defer close(obdInfoCh) diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index f4548dc6f..3a6b9b493 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -197,11 +197,13 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) { locker := objAPI.NewNSLock(minioMetaBucket, "runDataCrawler.lock") r := rand.New(rand.NewSource(time.Now().UnixNano())) for { - err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout) + lkctx, err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout) if err != nil { time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay))) continue } + ctx = lkctx.Context() + defer lkctx.Cancel() break // No unlock for "leader" lock. 
} diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 10c750e07..6bb2e892a 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -420,11 +420,12 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) { cLock := c.NewNSLockFn(cacheObjPath) - if err = cLock.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return } - - defer cLock.RUnlock() + ctx = lkctx.Context() + defer cLock.RUnlock(lkctx.Cancel) return c.statCache(ctx, cacheObjPath) } @@ -502,10 +503,12 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error { cachedPath := getCacheSHADir(c.dir, bucket, object) cLock := c.NewNSLockFn(cachedPath) - if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := cLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return err } - defer cLock.Unlock() + ctx = lkctx.Context() + defer cLock.Unlock(lkctx.Cancel) return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly) } @@ -667,10 +670,12 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read } cachePath := getCacheSHADir(c.dir, bucket, object) cLock := c.NewNSLockFn(cachePath) - if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := cLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return err } - defer cLock.Unlock() + ctx = lkctx.Context() + defer cLock.Unlock(lkctx.Cancel) meta, _, numHits, err := c.statCache(ctx, cachePath) // Case where object not yet cached @@ -867,11 +872,13 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) { cacheObjPath := getCacheSHADir(c.dir, bucket, object) cLock := c.NewNSLockFn(cacheObjPath) - if err := cLock.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return nil, numHits, err } + ctx = lkctx.Context() + defer cLock.RUnlock(lkctx.Cancel) - defer cLock.RUnlock() var objInfo ObjectInfo var rngInfo RangeInfo if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil { @@ -931,10 +938,11 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang // Deletes the cached object func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) { cLock := c.NewNSLockFn(cacheObjPath) - if err := cLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := cLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return err } - defer cLock.Unlock() + defer cLock.Unlock(lkctx.Cancel) return removeAll(cacheObjPath) } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index c0b949b45..b4b13ad91 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -359,18 +359,22 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec // Implements S3 compatible Upload Part API. 
func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) - if err = partIDLock.GetLock(ctx, globalOperationTimeout); err != nil { + plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return PartInfo{}, err } - defer partIDLock.Unlock() + pctx := plkctx.Context() + defer partIDLock.Unlock(plkctx.Cancel) uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - if err = uploadIDRLock.GetRLock(ctx, globalOperationTimeout); err != nil { + rlkctx, err := uploadIDRLock.GetRLock(pctx, globalOperationTimeout) + if err != nil { return PartInfo{}, err } + rctx := rlkctx.Context() defer func() { if uploadIDRLock != nil { - uploadIDRLock.RUnlock() + uploadIDRLock.RUnlock(rlkctx.Cancel) } }() @@ -386,25 +390,25 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Validates if upload ID exists. - if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } // Read metadata associated with the object from all disks. - partsMetadata, errs = readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, + partsMetadata, errs = readAllFileInfo(rctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "") // Unlock upload id locks before, so others can get it. - uploadIDRLock.RUnlock() + uploadIDRLock.RUnlock(rlkctx.Cancel) uploadIDRLock = nil // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) + _, writeQuorum, err := objectQuorumFromMeta(pctx, er, partsMetadata, errs) if err != nil { return pi, toObjectErr(err, bucket, object) } - reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } @@ -413,7 +417,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo onlineDisks, modTime := listOnlineDisks(er.getDisks(), partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) + fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum) if err != nil { return pi, err } @@ -430,7 +434,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete. 
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpPart, writeQuorum) - erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) + erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -459,7 +463,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } - n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum) + n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum) closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) @@ -479,26 +483,28 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Acquire write lock to update metadata. uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - if err = uploadIDWLock.GetLock(ctx, globalOperationTimeout); err != nil { + wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout) + if err != nil { return PartInfo{}, err } - defer uploadIDWLock.Unlock() + wctx := wlkctx.Context() + defer uploadIDWLock.Unlock(wlkctx.Cancel) // Validates if upload ID exists. - if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } // Rename temporary part file to its final location. partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix) - onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil) + onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Read metadata again because it might be updated with parallel upload of another part. - partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "") - reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "") + reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } @@ -507,7 +513,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) + fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) if err != nil { return pi, err } @@ -535,7 +541,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } // Writes update `xl.meta` format for each disk. 
- if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { + if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } @@ -561,10 +567,12 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u } uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return MultipartInfo{}, err } - defer uploadIDLock.RUnlock() + ctx = lkctx.Context() + defer uploadIDLock.RUnlock(lkctx.Cancel) if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return result, toObjectErr(err, bucket, object, uploadID) @@ -607,12 +615,14 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u // Implements S3 compatible ListObjectParts API. The resulting // ListPartsInfo structure is marshaled directly into XML and // replied back to the client. -func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, e error) { +func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - if err := uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return ListPartsInfo{}, err } - defer uploadIDLock.RUnlock() + ctx = lkctx.Context() + defer uploadIDLock.RUnlock(lkctx.Cancel) if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return result, toObjectErr(err, bucket, object, uploadID) @@ -702,18 +712,20 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // Hold read-locks to verify uploaded parts, also disallows // parallel part uploads as well. uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) - if err = uploadIDLock.GetRLock(ctx, globalOperationTimeout); err != nil { + rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return oi, err } - defer uploadIDLock.RUnlock() + rctx := rlkctx.Context() + defer uploadIDLock.RUnlock(rlkctx.Cancel) - if err = er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { return oi, toObjectErr(err, bucket, object, uploadID) } // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). - if opts.ParentIsObject != nil && opts.ParentIsObject(ctx, bucket, path.Dir(object)) { + if opts.ParentIsObject != nil && opts.ParentIsObject(rctx, bucket, path.Dir(object)) { return oi, toObjectErr(errFileParentIsFile, bucket, object) } @@ -727,15 +739,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str storageDisks := er.getDisks() // Read metadata associated with the object from all disks. 
- partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) + _, writeQuorum, err := objectQuorumFromMeta(rctx, er, partsMetadata, errs) if err != nil { return oi, toObjectErr(err, bucket, object) } - reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return oi, toObjectErr(reducedErr, bucket, object) } @@ -749,7 +761,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str var objectActualSize int64 // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) + fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum) if err != nil { return oi, err } @@ -835,6 +847,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str partsMetadata[index].Parts = fi.Parts } + // Hold namespace to complete the transaction + lk := er.NewNSLock(bucket, object) + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { + return oi, err + } + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + // Write final `xl.meta` at uploadID location if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) @@ -853,13 +874,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } } - // Hold namespace to complete the transaction - lk := er.NewNSLock(bucket, object) - if err = lk.GetLock(ctx, globalOperationTimeout); err != nil { - return oi, err - } - defer lk.Unlock() - // Rename the multipart object to final location. if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, fi.DataDir, bucket, object, writeQuorum, nil); err != nil { @@ -895,12 +909,14 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // All parts are purged from all disks and reference to the uploadID // would be removed from the system, rollback is not possible on this // operation. -func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { +func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) { lk := er.NewNSLock(bucket, pathJoin(object, uploadID)) - if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { return err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) // Validates if upload ID exists. if err := er.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index e3f8fad22..b54fc73c4 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -41,7 +41,7 @@ var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform // CopyObject - copy object source object to destination object. // if source object and destination object are same we only // update metadata. 
-func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) { +func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) { // This call shouldn't be used for anything other than metadata updates or adding self referential versions. if !srcInfo.metadataOnly { return oi, NotImplemented{} @@ -49,10 +49,12 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d defer ObjectPathUpdated(path.Join(dstBucket, dstObject)) lk := er.NewNSLock(dstBucket, dstObject) - if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { return oi, err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) // Read metadata associated with the object from all disks. storageDisks := er.getDisks() @@ -138,15 +140,19 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri lock := er.NewNSLock(bucket, object) switch lockType { case writeLock: - if err = lock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lock.GetLock(ctx, globalOperationTimeout) + if err != nil { return nil, err } - nsUnlocker = lock.Unlock + ctx = lkctx.Context() + nsUnlocker = func() { lock.Unlock(lkctx.Cancel) } case readLock: - if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return nil, err } - nsUnlocker = lock.RUnlock + ctx = lkctx.Context() + nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) } } unlockOnDefer = true } @@ -194,13 +200,15 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. -func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { +func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) { // Lock the object before reading. lk := er.NewNSLock(bucket, object) - if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { return err } - defer lk.RUnlock() + ctx = lkctx.Context() + defer lk.RUnlock(lkctx.Cancel) // Start offset cannot be negative. if startOffset < 0 { @@ -345,10 +353,12 @@ func (er erasureObjects) getObject(ctx context.Context, bucket, object string, s func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { // Lock the object before reading. 
lk := er.NewNSLock(bucket, object) - if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { return ObjectInfo{}, err } - defer lk.RUnlock() + ctx = lkctx.Context() + defer lk.RUnlock(lkctx.Cancel) return er.getObjectInfo(ctx, bucket, object, opts) } @@ -651,10 +661,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } lk := er.NewNSLock(bucket, object) - if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { return ObjectInfo{}, err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) for i, w := range writers { if w == nil { @@ -921,10 +933,12 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string // Acquire a write lock before deleting the object. lk := er.NewNSLock(bucket, object) - if err = lk.GetLock(ctx, globalDeleteOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout) + if err != nil { return ObjectInfo{}, err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) storageDisks := er.getDisks() writeQuorum := len(storageDisks)/2 + 1 diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index 1d9ea7f24..feafae2f8 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -571,13 +571,15 @@ func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, ob // Acquire a bulk write lock across 'objects' multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...) - if err := multiDeleteLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout) + if err != nil { for i := range derrs { derrs[i] = err } return nil, derrs } - defer multiDeleteLock.Unlock() + ctx = lkctx.Context() + defer multiDeleteLock.Unlock(lkctx.Cancel) for _, zone := range z.serverSets { deletedObjects, errs := zone.DeleteObjects(ctx, bucket, objects, opts) @@ -1687,10 +1689,12 @@ func (z *erasureServerSets) ListBuckets(ctx context.Context) (buckets []BucketIn func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { // Acquire lock on format.json formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile) - if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := formatLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return madmin.HealResultItem{}, err } - defer formatLock.Unlock() + ctx = lkctx.Context() + defer formatLock.Unlock(lkctx.Cancel) var r = madmin.HealResultItem{ Type: madmin.HealItemMetadata, @@ -1882,17 +1886,21 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers lk := z.NewNSLock(bucket, object) if bucket == minioMetaBucket { // For .minio.sys bucket heals we should hold write locks. - if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { return madmin.HealResultItem{}, err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) } else { // Lock the object before healing. Use read lock since healing // will only regenerate parts & xl.meta of outdated disks. 
- if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { return madmin.HealResultItem{}, err } - defer lk.RUnlock() + ctx = lkctx.Context() + defer lk.RUnlock(lkctx.Cancel) } for _, zone := range z.serverSets { diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index e1061c961..306c27735 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -715,10 +715,12 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Hold write lock on the object. destLock := fs.NewNSLock(bucket, object) - if err = destLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := destLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return oi, err } - defer destLock.Unlock() + ctx = lkctx.Context() + defer destLock.Unlock(lkctx.Cancel) bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix) fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile) diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index fbfbfbda6..33ff2182c 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -588,7 +588,7 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelet // CopyObject - copy object source object to destination object. // if source object and destination object are same we only // update metadata. -func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) { +func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) { if srcOpts.VersionID != "" && srcOpts.VersionID != nullVersionID { return oi, VersionNotFound{ Bucket: srcBucket, @@ -602,10 +602,12 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu if !cpSrcDstSame { objectDWLock := fs.NewNSLock(dstBucket, dstObject) - if err := objectDWLock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := objectDWLock.GetLock(ctx, globalOperationTimeout) + if err != nil { return oi, err } - defer objectDWLock.Unlock() + ctx = lkctx.Context() + defer objectDWLock.Unlock(lkctx.Cancel) } atomic.AddInt64(&fs.activeIOCount, 1) @@ -695,15 +697,19 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, lock := fs.NewNSLock(bucket, object) switch lockType { case writeLock: - if err = lock.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lock.GetLock(ctx, globalOperationTimeout) + if err != nil { return nil, err } - nsUnlocker = lock.Unlock + ctx = lkctx.Context() + nsUnlocker = func() { lock.Unlock(lkctx.Cancel) } case readLock: - if err = lock.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lock.GetRLock(ctx, globalOperationTimeout) + if err != nil { return nil, err } - nsUnlocker = lock.RUnlock + ctx = lkctx.Context() + nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) } } } @@ -786,11 +792,13 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse // Lock the object before reading. 
lk := fs.NewNSLock(bucket, object) - if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { logger.LogIf(ctx, err) return err } - defer lk.RUnlock() + ctx = lkctx.Context() + defer lk.RUnlock(lkctx.Cancel) atomic.AddInt64(&fs.activeIOCount, 1) defer func() { @@ -1011,13 +1019,15 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) ( } // getObjectInfoWithLock - reads object metadata and replies back ObjectInfo. -func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) { +func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) { // Lock the object before reading. lk := fs.NewNSLock(bucket, object) - if err := lk.GetRLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { return oi, err } - defer lk.RUnlock() + ctx = lkctx.Context() + defer lk.RUnlock(lkctx.Cancel) if err := checkGetObjArgs(ctx, bucket, object); err != nil { return oi, err @@ -1052,13 +1062,15 @@ func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, o oi, err := fs.getObjectInfoWithLock(ctx, bucket, object) if err == errCorruptedFormat || err == io.EOF { lk := fs.NewNSLock(bucket, object) - if err = lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { return oi, toObjectErr(err, bucket, object) } + ctx = lkctx.Context() fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile) err = fs.createFsJSON(object, fsMetaPath) - lk.Unlock() + lk.Unlock(lkctx.Cancel) if err != nil { return oi, toObjectErr(err, bucket, object) } @@ -1092,7 +1104,7 @@ func (fs *FSObjects) parentDirIsObject(ctx context.Context, bucket, parent strin // until EOF, writes data directly to configured filesystem path. // Additionally writes `fs.json` which carries the necessary metadata // for future object operations. -func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) { +func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { if opts.Versioned { return objInfo, NotImplemented{} } @@ -1103,11 +1115,14 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string // Lock the object. lk := fs.NewNSLock(bucket, object) - if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { logger.LogIf(ctx, err) return objInfo, err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) + defer ObjectPathUpdated(path.Join(bucket, object)) atomic.AddInt64(&fs.activeIOCount, 1) @@ -1285,10 +1300,12 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string, op // Acquire a write lock before deleting the object. 
lk := fs.NewNSLock(bucket, object) - if err = lk.GetLock(ctx, globalOperationTimeout); err != nil { + lkctx, err := lk.GetLock(ctx, globalOperationTimeout) + if err != nil { return objInfo, err } - defer lk.Unlock() + ctx = lkctx.Context() + defer lk.Unlock(lkctx.Cancel) if err = checkDelObjArgs(ctx, bucket, object); err != nil { return objInfo, err diff --git a/cmd/iam.go b/cmd/iam.go index 59101f7bb..a4dfcd8bc 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -462,7 +462,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err := txnLk.GetLock(retryCtx, iamLockTimeout); err != nil { + lkctx, err := txnLk.GetLock(retryCtx, iamLockTimeout) + if err != nil { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock") continue } @@ -471,8 +472,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { // **** WARNING **** // Migrating to encrypted backend on etcd should happen before initialization of // IAM sub-system, make sure that we do not move the above codeblock elsewhere. - if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil { - txnLk.Unlock() + if err := migrateIAMConfigsEtcdToEncrypted(lkctx.Context(), globalEtcdClient); err != nil { + txnLk.Unlock(lkctx.Cancel) logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err)) logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available")) return @@ -486,7 +487,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { // Migrate IAM configuration, if necessary. if err := sys.doIAMConfigMigration(ctx); err != nil { - txnLk.Unlock() + txnLk.Unlock(lkctx.Cancel) if errors.Is(err, errDiskNotFound) || errors.Is(err, errConfigNotFound) || errors.Is(err, context.Canceled) || @@ -503,7 +504,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { } // Successfully migrated, proceed to load the users. - txnLk.Unlock() + txnLk.Unlock(lkctx.Cancel) break } diff --git a/cmd/local-locker.go b/cmd/local-locker.go index 323db0dc6..8082a8e5d 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -27,11 +27,11 @@ import ( // lockRequesterInfo stores various info from the client for each lock that is requested. type lockRequesterInfo struct { - Writer bool // Bool whether write or read lock. - UID string // UID to uniquely identify request of client. - Timestamp time.Time // Timestamp set at the time of initialization. - TimeLastCheck time.Time // Timestamp for last check of validity of lock. - Source string // Contains line, function and filename reqesting the lock. + Writer bool // Bool whether write or read lock. + UID string // UID to uniquely identify request of client. + Timestamp time.Time // Timestamp set at the time of initialization. + TimeLastRefresh time.Time // Timestamp for last lock refresh. + Source string // Contains line, function and filename reqesting the lock. // Owner represents the UUID of the owner who originally requested the lock // useful in expiry. 
Owner string @@ -92,20 +92,20 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool for _, resource := range args.Resources { l.lockMap[resource] = []lockRequesterInfo{ { - Writer: true, - Source: args.Source, - Owner: args.Owner, - UID: args.UID, - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), - Quorum: args.Quorum, + Writer: true, + Source: args.Source, + Owner: args.Owner, + UID: args.UID, + Timestamp: UTCNow(), + TimeLastRefresh: UTCNow(), + Quorum: args.Quorum, }, } } return true, nil } -func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) { +func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() @@ -150,13 +150,13 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo l.mutex.Lock() defer l.mutex.Unlock() lrInfo := lockRequesterInfo{ - Writer: false, - Source: args.Source, - Owner: args.Owner, - UID: args.UID, - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), - Quorum: args.Quorum, + Writer: false, + Source: args.Source, + Owner: args.Owner, + UID: args.UID, + Timestamp: UTCNow(), + TimeLastRefresh: UTCNow(), + Quorum: args.Quorum, } resource := args.Resources[0] if lri, ok := l.lockMap[resource]; ok { @@ -172,7 +172,7 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo return reply, nil } -func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) { +func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() var lri []lockRequesterInfo @@ -232,7 +232,7 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep } } -func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { +func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) { select { case <-ctx.Done(): return false, ctx.Err() @@ -240,33 +240,39 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired l.mutex.Lock() defer l.mutex.Unlock() + resource := args.Resources[0] // refresh check is always per resource. + // Lock found, proceed to verify if belongs to given uid. - for _, resource := range args.Resources { - if lri, ok := l.lockMap[resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID && entry.Owner == args.Owner { - return false, nil - } - } + lri, ok := l.lockMap[resource] + if !ok { + // lock doesn't exist yet, return false + return false, nil + } + + // Check whether uid is still active + for i := range lri { + if lri[i].UID == args.UID && lri[i].Owner == args.Owner { + lri[i].TimeLastRefresh = UTCNow() + return true, nil } } - return true, nil + + return false, nil } } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. // Caller must hold 'l.mutex' lock. -func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) { +func (l *localLocker) expireOldLocks(interval time.Duration) { l.mutex.Lock() defer l.mutex.Unlock() - // Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry) - if lri, ok := l.lockMap[nlrip.name]; ok { - // Even if the entry exists, it may not be the same entry which was - // considered as expired, so we simply an attempt to remove it if its - // not possible there is nothing we need to do. 
- l.removeEntry(nlrip.name, dsync.LockArgs{Owner: nlrip.lri.Owner, UID: nlrip.lri.UID}, &lri) + for resource, lris := range l.lockMap { + for _, lri := range lris { + if time.Since(lri.TimeLastRefresh) > interval { + l.removeEntry(resource, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris) + } + } } } diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 2ad9afddd..ce0e85491 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -45,8 +45,8 @@ func toLockError(err error) error { switch err.Error() { case errLockConflict.Error(): return errLockConflict - case errLockNotExpired.Error(): - return errLockNotExpired + case errLockNotFound.Error(): + return errLockNotFound } return err } @@ -105,7 +105,7 @@ func (client *lockRESTClient) restCall(ctx context.Context, call string, args ds switch err { case nil: return true, nil - case errLockConflict, errLockNotExpired: + case errLockConflict, errLockNotFound: return false, nil default: return false, err @@ -123,18 +123,18 @@ func (client *lockRESTClient) Lock(ctx context.Context, args dsync.LockArgs) (re } // RUnlock calls read unlock REST API. -func (client *lockRESTClient) RUnlock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(context.Background(), lockRESTMethodRUnlock, args) +func (client *lockRESTClient) RUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodRUnlock, args) } // Unlock calls write unlock RPC. -func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(context.Background(), lockRESTMethodUnlock, args) +func (client *lockRESTClient) Unlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodUnlock, args) } -// Expired calls expired handler to check if lock args have expired. -func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { - return client.restCall(ctx, lockRESTMethodExpired, args) +// RUnlock calls read unlock REST API. +func (client *lockRESTClient) Refresh(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodRefresh, args) } // ForceUnlock calls force unlock handler to forcibly unlock an active lock. 
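
The client side above completes the reversal of responsibility: instead of the lock servers calling Expired across the cluster to decide whether a held lock is still active, the lock owner now keeps its locks alive by calling Refresh periodically, and each server simply drops entries whose TimeLastRefresh falls behind (see expireOldLocks and the 20-second lockValidityDuration further down). The sketch below shows what such an owner-side refresh loop can look like; the refresher interface, lockArgs struct, fakeLocker and the 50ms interval are illustrative stand-ins, not the actual pkg/dsync/drwmutex.go code added by this patch, and a real interval only needs to stay well below the server's expiry window.

package main

import (
	"context"
	"fmt"
	"time"
)

// lockArgs mirrors the dsync.LockArgs fields used by the REST handlers above.
type lockArgs struct {
	UID       string
	Owner     string
	Resources []string
}

// refresher is the only part of the NetLocker behaviour this sketch needs:
// Refresh reports whether the remote lock server still knows the lock.
type refresher interface {
	Refresh(ctx context.Context, args lockArgs) (bool, error)
}

// keepLockAlive refreshes a held lock on every tick and cancels the
// lock-backed context as soon as the lock can no longer be refreshed,
// so whatever operation holds the lock stops relying on it.
func keepLockAlive(ctx context.Context, cancel context.CancelFunc, c refresher, args lockArgs, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return // unlocked normally, or the operation finished
		case <-t.C:
			ok, err := c.Refresh(ctx, args)
			if err != nil || !ok {
				cancel() // lock lost (e.g. errLockNotFound): abort the holder
				return
			}
		}
	}
}

// fakeLocker refreshes successfully twice and then reports the lock gone.
type fakeLocker struct{ calls int }

func (f *fakeLocker) Refresh(ctx context.Context, args lockArgs) (bool, error) {
	f.calls++
	return f.calls <= 2, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go keepLockAlive(ctx, cancel, &fakeLocker{},
		lockArgs{UID: "0123-4567", Owner: "node-1", Resources: []string{"bucket/object"}},
		50*time.Millisecond)
	<-ctx.Done()
	fmt.Println("lock context canceled:", ctx.Err())
}
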
diff --git a/cmd/lock-rest-server-common.go b/cmd/lock-rest-server-common.go index e6d62930e..05bfa3ff5 100644 --- a/cmd/lock-rest-server-common.go +++ b/cmd/lock-rest-server-common.go @@ -28,11 +28,11 @@ const ( const ( lockRESTMethodHealth = "/health" + lockRESTMethodRefresh = "/refresh" lockRESTMethodLock = "/lock" lockRESTMethodRLock = "/rlock" lockRESTMethodUnlock = "/unlock" lockRESTMethodRUnlock = "/runlock" - lockRESTMethodExpired = "/expired" lockRESTMethodForceUnlock = "/force-unlock" // lockRESTOwner represents owner UUID @@ -52,6 +52,6 @@ const ( var ( errLockConflict = errors.New("lock conflict") - errLockNotExpired = errors.New("lock not expired") errLockNotInitialized = errors.New("lock not initialized") + errLockNotFound = errors.New("lock not found") ) diff --git a/cmd/lock-rest-server-common_test.go b/cmd/lock-rest-server-common_test.go index 57724a9c0..33d457a85 100644 --- a/cmd/lock-rest-server-common_test.go +++ b/cmd/lock-rest-server-common_test.go @@ -55,18 +55,18 @@ func TestLockRpcServerRemoveEntry(t *testing.T) { defer os.RemoveAll(testPath) lockRequesterInfo1 := lockRequesterInfo{ - Owner: "owner", - Writer: true, - UID: "0123-4567", - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), + Owner: "owner", + Writer: true, + UID: "0123-4567", + Timestamp: UTCNow(), + TimeLastRefresh: UTCNow(), } lockRequesterInfo2 := lockRequesterInfo{ - Owner: "owner", - Writer: true, - UID: "89ab-cdef", - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), + Owner: "owner", + Writer: true, + UID: "89ab-cdef", + Timestamp: UTCNow(), + TimeLastRefresh: UTCNow(), } locker.ll.lockMap["name"] = []lockRequesterInfo{ diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 27663a348..4c6df138f 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -20,7 +20,6 @@ import ( "bufio" "context" "errors" - "math/rand" "net/http" "path" "sort" @@ -35,8 +34,8 @@ const ( // Lock maintenance interval. lockMaintenanceInterval = 30 * time.Second - // Lock validity check interval. - lockValidityCheckInterval = 5 * time.Second + // Lock validity duration + lockValidityDuration = 20 * time.Second ) // To abstract a node over network. @@ -96,6 +95,31 @@ func (l *lockRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) { l.IsValid(w, r) } +// RefreshHandler - refresh the current lock +func (l *lockRESTServer) RefreshHandler(w http.ResponseWriter, r *http.Request) { + if !l.IsValid(w, r) { + l.writeErrorResponse(w, errors.New("invalid request")) + return + } + + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } + + refreshed, err := l.ll.Refresh(r.Context(), args) + if err != nil { + l.writeErrorResponse(w, err) + return + } + + if !refreshed { + l.writeErrorResponse(w, errLockNotFound) + return + } +} + // LockHandler - Acquires a lock. func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { @@ -132,7 +156,7 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) { return } - _, err = l.ll.Unlock(args) + _, err = l.ll.Unlock(context.Background(), args) // Ignore the Unlock() "reply" return value because if err == nil, "reply" is always true // Consequently, if err != nil, reply is always false if err != nil { @@ -179,36 +203,12 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request) // Ignore the RUnlock() "reply" return value because if err == nil, "reply" is always true. 
// Consequently, if err != nil, reply is always false - if _, err = l.ll.RUnlock(args); err != nil { + if _, err = l.ll.RUnlock(context.Background(), args); err != nil { l.writeErrorResponse(w, err) return } } -// ExpiredHandler - query expired lock status. -func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) { - if !l.IsValid(w, r) { - l.writeErrorResponse(w, errors.New("Invalid request")) - return - } - - args, err := getLockArgs(r) - if err != nil { - l.writeErrorResponse(w, err) - return - } - - expired, err := l.ll.Expired(r.Context(), args) - if err != nil { - l.writeErrorResponse(w, err) - return - } - if !expired { - l.writeErrorResponse(w, errLockNotExpired) - return - } -} - // ForceUnlockHandler - query expired lock status. func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { @@ -228,125 +228,17 @@ func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Reque } } -// nameLockRequesterInfoPair is a helper type for lock maintenance -type nameLockRequesterInfoPair struct { - name string - lri lockRequesterInfo -} - -// getLongLivedLocks returns locks that are older than a certain time and -// have not been 'checked' for validity too soon enough -func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterInfoPair { - nlripMap := make(map[Endpoint][]nameLockRequesterInfoPair) - for endpoint, locker := range globalLockServers { - rslt := []nameLockRequesterInfoPair{} - locker.mutex.Lock() - for name, lriArray := range locker.lockMap { - for idx := range lriArray { - // Check whether enough time has gone by since last check - if time.Since(lriArray[idx].TimeLastCheck) >= interval { - rslt = append(rslt, nameLockRequesterInfoPair{ - name: name, - lri: lriArray[idx], - }) - lriArray[idx].TimeLastCheck = UTCNow() - } - } - } - nlripMap[endpoint] = rslt - locker.mutex.Unlock() - } - return nlripMap -} - -// lockMaintenance loops over locks that have been active for some time and checks back -// with the original server whether it is still alive or not -// -// Following logic inside ignores the errors generated for Dsync.Active operation. -// - server at client down -// - some network error (and server is up normally) -// -// We will ignore the error, and we will retry later to get a resolve on this lock -func lockMaintenance(ctx context.Context, interval time.Duration) error { - objAPI := newObjectLayerFn() - if objAPI == nil { - return nil - } - - z, ok := objAPI.(*erasureServerSets) - if !ok { - return nil - } - - type nlock struct { - locks int - writer bool - } - - updateNlocks := func(nlripsMap map[string]nlock, name string, writer bool) { - nlk, ok := nlripsMap[name] - if !ok { - nlripsMap[name] = nlock{ - locks: 1, - writer: writer, - } - } else { - nlk.locks++ - nlripsMap[name] = nlk - } - } - - allLockersFn := z.GetAllLockers - - // Validate if long lived locks are indeed clean. - // Get list of long lived locks to check for staleness. 
- for lendpoint, nlrips := range getLongLivedLocks(interval) { - nlripsMap := make(map[string]nlock, len(nlrips)) - for _, nlrip := range nlrips { - for _, c := range allLockersFn() { - if !c.IsOnline() || c == nil { - continue - } - - ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) - - // Call back to original server verify whether the lock is - // still active (based on name & uid) - expired, err := c.Expired(ctx, dsync.LockArgs{ - Owner: nlrip.lri.Owner, - UID: nlrip.lri.UID, - Resources: []string{nlrip.name}, - }) - cancel() - if err != nil { - updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) - continue - } - - if !expired { - updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) - } - } - - // less than the quorum, we have locks expired. - if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum { - // Purge the stale entry if it exists. - globalLockServers[lendpoint].removeEntryIfExists(nlrip) - } - - } - } - - return nil -} - -// Start lock maintenance from all lock servers. -func startLockMaintenance(ctx context.Context) { +// lockMaintenance loops over all locks and discards locks +// that have not been refreshed for some time. +func lockMaintenance(ctx context.Context) { // Wait until the object API is ready // no need to start the lock maintenance // if ObjectAPI is not initialized. + + var objAPI ObjectLayer + for { - objAPI := newObjectLayerFn() + objAPI = newObjectLayerFn() if objAPI == nil { time.Sleep(time.Second) continue @@ -354,26 +246,26 @@ func startLockMaintenance(ctx context.Context) { break } - // Initialize a new ticker with a minute between each ticks. - ticker := time.NewTicker(lockMaintenanceInterval) - // Stop the timer upon service closure and cleanup the go-routine. - defer ticker.Stop() + if _, ok := objAPI.(*erasureServerSets); !ok { + return + } + + // Initialize a new ticker with 1 minute between each ticks. + lkTimer := time.NewTimer(lockMaintenanceInterval) + // Stop the timer upon returning. + defer lkTimer.Stop() - r := rand.New(rand.NewSource(UTCNow().UnixNano())) for { // Verifies every minute for locks held more than 2 minutes. select { case <-ctx.Done(): return - case <-ticker.C: - // Start with random sleep time, so as to avoid - // "synchronous checks" between servers - duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval)) - time.Sleep(duration) - if err := lockMaintenance(ctx, lockValidityCheckInterval); err != nil { - // Sleep right after an error. - duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval)) - time.Sleep(duration) + case <-lkTimer.C: + // Reset the timer for next cycle. 
+ lkTimer.Reset(lockMaintenanceInterval) + + for _, lockServer := range globalLockServers { + lockServer.expireOldLocks(lockValidityDuration) } } } @@ -393,16 +285,16 @@ func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointSer subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter() subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRefresh).HandlerFunc(httpTraceHdrs(lockServer.RefreshHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(lockServer.ForceUnlockHandler)) - subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)) globalLockServers[endpoint] = lockServer.ll } } - go startLockMaintenance(GlobalContext) + go lockMaintenance(GlobalContext) } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index ede34793f..b3925745c 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2016, 2017, 2018, 2019 MinIO, Inc. + * MinIO Cloud Storage, (C) 2018-2021 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,10 +38,28 @@ var globalLockServers = make(map[Endpoint]*localLocker) // RWLocker - locker interface to introduce GetRLock, RUnlock. type RWLocker interface { - GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) - Unlock() - GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) - RUnlock() + GetLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error) + Unlock(cancel context.CancelFunc) + GetRLock(ctx context.Context, timeout *dynamicTimeout) (lkCtx LockContext, timedOutErr error) + RUnlock(cancel context.CancelFunc) +} + +// LockContext lock context holds the lock backed context and canceler for the context. +type LockContext struct { + ctx context.Context + cancel context.CancelFunc +} + +// Context returns lock context +func (l LockContext) Context() context.Context { + return l.ctx +} + +// Cancel function calls cancel() function +func (l LockContext) Cancel() { + if l.cancel != nil { + l.cancel() + } } // newNSLock - return a new name space lock map. @@ -142,42 +160,52 @@ type distLockInstance struct { } // Lock - block until write lock is taken or timeout has occurred. 
-func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { +func (di *distLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (LockContext, error) { lockSource := getSource(2) start := UTCNow() - if !di.rwMutex.GetLock(ctx, di.opsID, lockSource, dsync.Options{ + newCtx, cancel := context.WithCancel(ctx) + if !di.rwMutex.GetLock(newCtx, cancel, di.opsID, lockSource, dsync.Options{ Timeout: timeout.Timeout(), }) { timeout.LogFailure() - return OperationTimedOut{} + cancel() + return LockContext{ctx: ctx, cancel: func() {}}, OperationTimedOut{} } timeout.LogSuccess(UTCNow().Sub(start)) - return nil + return LockContext{ctx: newCtx, cancel: cancel}, nil } // Unlock - block until write lock is released. -func (di *distLockInstance) Unlock() { +func (di *distLockInstance) Unlock(cancel context.CancelFunc) { + if cancel != nil { + cancel() + } di.rwMutex.Unlock() } // RLock - block until read lock is taken or timeout has occurred. -func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { +func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (LockContext, error) { lockSource := getSource(2) start := UTCNow() - if !di.rwMutex.GetRLock(ctx, di.opsID, lockSource, dsync.Options{ + newCtx, cancel := context.WithCancel(ctx) + if !di.rwMutex.GetRLock(ctx, cancel, di.opsID, lockSource, dsync.Options{ Timeout: timeout.Timeout(), }) { timeout.LogFailure() - return OperationTimedOut{} + cancel() + return LockContext{ctx: ctx, cancel: func() {}}, OperationTimedOut{} } timeout.LogSuccess(UTCNow().Sub(start)) - return nil + return LockContext{ctx: newCtx, cancel: cancel}, nil } // RUnlock - block until read lock is released. -func (di *distLockInstance) RUnlock() { +func (di *distLockInstance) RUnlock(cancel context.CancelFunc) { + if cancel != nil { + cancel() + } di.rwMutex.RUnlock() } @@ -205,27 +233,32 @@ func (n *nsLockMap) NewNSLock(lockers func() ([]dsync.NetLocker, string), volume } // Lock - block until write lock is taken or timeout has occurred. -func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { +func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeout) (_ LockContext, timedOutErr error) { lockSource := getSource(2) start := UTCNow() const readLock = false - var success []int + success := make([]int, len(li.paths)) for i, path := range li.paths { if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { timeout.LogFailure() - for _, sint := range success { - li.ns.unlock(li.volume, li.paths[sint], readLock) + for si, sint := range success { + if sint == 1 { + li.ns.unlock(li.volume, li.paths[si], readLock) + } } - return OperationTimedOut{} + return LockContext{}, OperationTimedOut{} } - success = append(success, i) + success[i] = 1 } timeout.LogSuccess(UTCNow().Sub(start)) - return + return LockContext{ctx: ctx, cancel: func() {}}, nil } // Unlock - block until write lock is released. -func (li *localLockInstance) Unlock() { +func (li *localLockInstance) Unlock(cancel context.CancelFunc) { + if cancel != nil { + cancel() + } const readLock = false for _, path := range li.paths { li.ns.unlock(li.volume, path, readLock) @@ -233,27 +266,32 @@ func (li *localLockInstance) Unlock() { } // RLock - block until read lock is taken or timeout has occurred. 
-func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (timedOutErr error) { +func (li *localLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeout) (_ LockContext, timedOutErr error) { lockSource := getSource(2) start := UTCNow() const readLock = true - var success []int + success := make([]int, len(li.paths)) for i, path := range li.paths { if !li.ns.lock(ctx, li.volume, path, lockSource, li.opsID, readLock, timeout.Timeout()) { timeout.LogFailure() - for _, sint := range success { - li.ns.unlock(li.volume, li.paths[sint], readLock) + for si, sint := range success { + if sint == 1 { + li.ns.unlock(li.volume, li.paths[si], readLock) + } } - return OperationTimedOut{} + return LockContext{}, OperationTimedOut{} } - success = append(success, i) + success[i] = 1 } timeout.LogSuccess(UTCNow().Sub(start)) - return + return LockContext{ctx: ctx, cancel: func() {}}, nil } // RUnlock - block until read lock is released. -func (li *localLockInstance) RUnlock() { +func (li *localLockInstance) RUnlock(cancel context.CancelFunc) { + if cancel != nil { + cancel() + } const readLock = true for _, path := range li.paths { li.ns.unlock(li.volume, path, readLock) diff --git a/cmd/server-main.go b/cmd/server-main.go index 67576c17d..eae81e7ee 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -232,7 +232,8 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { for range retry.NewTimerWithJitter(retryCtx, 500*time.Millisecond, time.Second, retry.MaxJitter) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err = txnLk.GetLock(retryCtx, configLockTimeout); err != nil { + lkctx, err := txnLk.GetLock(retryCtx, configLockTimeout) + if err != nil { logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock") continue } @@ -249,7 +250,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { // Upon success migrating the config, initialize all sub-systems // if all sub-systems initialized successfully return right away if err = initAllSubsystems(ctx, newObject); err == nil { - txnLk.Unlock() + txnLk.Unlock(lkctx.Cancel) // All successful return. if globalIsDistErasure { // These messages only meant primarily for distributed setup, so only log during distributed setup. @@ -259,7 +260,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { } } - txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible. + txnLk.Unlock(lkctx.Cancel) // Unlock the transaction lock and allow other nodes to acquire the lock if possible. // One of these retriable errors shall be retried. 
if errors.Is(err, errDiskNotFound) || diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 32eb0a89a..0d06a2a28 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -56,6 +56,12 @@ func (p *xlStorageDiskIDCheck) Healing() bool { } func (p *xlStorageDiskIDCheck) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { + select { + case <-ctx.Done(): + return dataUsageCache{}, ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return dataUsageCache{}, err } @@ -93,6 +99,12 @@ func (p *xlStorageDiskIDCheck) checkDiskStale() error { } func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err error) { + select { + case <-ctx.Done(): + return DiskInfo{}, ctx.Err() + default: + } + info, err = p.storage.DiskInfo(ctx) if err != nil { return info, err @@ -108,6 +120,12 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err } func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...string) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -115,6 +133,12 @@ func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...strin } func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -122,6 +146,12 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err } func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return nil, err } @@ -129,6 +159,12 @@ func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error) } func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) { + select { + case <-ctx.Done(): + return VolInfo{}, ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return vol, err } @@ -136,6 +172,12 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol } func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -143,6 +185,12 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for } func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return nil, err } @@ -166,6 +214,12 @@ func (p *xlStorageDiskIDCheck) WalkSplunk(ctx context.Context, volume, dirPath, } func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return nil, err } @@ -174,6 +228,12 @@ func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath stri } func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) 
(n int64, err error) { + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return 0, err } @@ -182,6 +242,12 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path } func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -190,6 +256,12 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa } func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return err } @@ -198,6 +270,12 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri } func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return nil, err } @@ -206,6 +284,12 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path } func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return err } @@ -214,6 +298,12 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat } func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return err } @@ -222,6 +312,12 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat } func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -230,6 +326,12 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa } func (p *xlStorageDiskIDCheck) CheckFile(ctx context.Context, volume string, path string) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -238,6 +340,12 @@ func (p *xlStorageDiskIDCheck) CheckFile(ctx context.Context, volume string, pat } func (p *xlStorageDiskIDCheck) DeleteFile(ctx context.Context, volume string, path string) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -257,6 +365,12 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string } func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := p.checkDiskStale(); err != nil { return err } @@ -265,6 +379,12 @@ func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path stri } func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + 
if err = p.checkDiskStale(); err != nil { return err } @@ -273,6 +393,12 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path } func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -281,6 +407,12 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s } func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return err } @@ -289,6 +421,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s } func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) { + select { + case <-ctx.Done(): + return fi, ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return fi, err } @@ -297,6 +435,12 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve } func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if err = p.checkDiskStale(); err != nil { return nil, err } diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index e27542a98..d1083d63a 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -41,17 +41,28 @@ func log(format string, data ...interface{}) { } } -// DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before. -const DRWMutexAcquireTimeout = 1 * time.Second // 1 second. +// dRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before. +const drwMutexAcquireTimeout = 1 * time.Second // 1 second. + +// dRWMutexUnlockTimeout - timeout for the unlock call +const drwMutexUnlockCallTimeout = 30 * time.Second + +// dRWMutexRefreshTimeout - timeout for the refresh call +const drwMutexRefreshTimeout = 5 * time.Second + +// dRWMutexRefreshInterval - the interval between two refresh calls +const drwMutexRefreshInterval = 10 * time.Second + const drwMutexInfinite = 1<<63 - 1 // A DRWMutex is a distributed mutual exclusion lock. type DRWMutex struct { - Names []string - writeLocks []string // Array of nodes that granted a write lock - readersLocks [][]string // Array of array of nodes that granted reader locks - m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node - clnt *Dsync + Names []string + writeLocks []string // Array of nodes that granted a write lock + readersLocks [][]string // Array of array of nodes that granted reader locks + m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node + clnt *Dsync + cancelRefresh context.CancelFunc } // Granted - represents a structure of a granted lock. 
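Every method changed in cmd/xl-storage-disk-id-check.go above opens with the same non-blocking guard before checkDiskStale. The patch inlines the select each time; the helper below is a hypothetical equivalent (contextCanceled is not a name from the patch), shown only to make the repeated idiom explicit:

    package cmd

    import "context"

    // contextCanceled reports, without blocking, whether ctx has already
    // been canceled, mirroring the select/default guard added at the top
    // of each xlStorageDiskIDCheck method: a dead request context returns
    // ctx.Err() immediately instead of issuing another disk call.
    func contextCanceled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }

The empty default branch is what keeps the check non-blocking, so a live context adds effectively no cost on the hot path, while a canceled one (for example after a lost lock cancels the lock context) short-circuits the storage layer.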
@@ -85,7 +96,7 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex { func (dm *DRWMutex) Lock(id, source string) { isReadLock := false - dm.lockBlocking(context.Background(), id, source, isReadLock, Options{ + dm.lockBlocking(context.Background(), nil, id, source, isReadLock, Options{ Timeout: drwMutexInfinite, }) } @@ -101,10 +112,10 @@ type Options struct { // If the lock is already in use, the calling go routine // blocks until either the mutex becomes available and return success or // more time has passed than the timeout value and return false. -func (dm *DRWMutex) GetLock(ctx context.Context, id, source string, opts Options) (locked bool) { +func (dm *DRWMutex) GetLock(ctx context.Context, cancel context.CancelFunc, id, source string, opts Options) (locked bool) { isReadLock := false - return dm.lockBlocking(ctx, id, source, isReadLock, opts) + return dm.lockBlocking(ctx, cancel, id, source, isReadLock, opts) } // RLock holds a read lock on dm. @@ -114,7 +125,7 @@ func (dm *DRWMutex) GetLock(ctx context.Context, id, source string, opts Options func (dm *DRWMutex) RLock(id, source string) { isReadLock := true - dm.lockBlocking(context.Background(), id, source, isReadLock, Options{ + dm.lockBlocking(context.Background(), nil, id, source, isReadLock, Options{ Timeout: drwMutexInfinite, }) } @@ -125,10 +136,10 @@ func (dm *DRWMutex) RLock(id, source string) { // Otherwise the calling go routine blocks until either the mutex becomes // available and return success or more time has passed than the timeout // value and return false. -func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Options) (locked bool) { +func (dm *DRWMutex) GetRLock(ctx context.Context, cancel context.CancelFunc, id, source string, opts Options) (locked bool) { isReadLock := true - return dm.lockBlocking(ctx, id, source, isReadLock, opts) + return dm.lockBlocking(ctx, cancel, id, source, isReadLock, opts) } const ( @@ -140,7 +151,7 @@ const ( // The function will loop using a built-in timing randomized back-off // algorithm until either the lock is acquired successfully or more // time has elapsed than the timeout value. -func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) { +func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), id, source string, isReadLock bool, opts Options) (locked bool) { restClnts, _ := dm.clnt.GetLockers() r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -195,6 +206,10 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL dm.m.Unlock() log("lockBlocking %s/%s for %#v: granted\n", id, source, dm.Names) + + // Refresh lock continuously and cancel if there is no quorum in the lock anymore + dm.startContinousLockRefresh(lockLossCallback, id, source, quorum) + return locked } @@ -203,6 +218,137 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL } } +func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, source string, quorum int) { + ctx, cancel := context.WithCancel(context.Background()) + + dm.m.Lock() + dm.cancelRefresh = cancel + dm.m.Unlock() + + go func() { + defer cancel() + + refreshTimer := time.NewTimer(drwMutexRefreshInterval) + defer refreshTimer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-refreshTimer.C: + refreshTimer.Reset(drwMutexRefreshInterval) + refreshed, err := refresh(ctx, dm.clnt, id, source, quorum, dm.Names...) 
+ if err == nil && !refreshed { + if lockLossCallback != nil { + lockLossCallback() + } + return + } + } + } + }() +} + +type refreshResult struct { + offline bool + succeeded bool +} + +func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int, lockNames ...string) (bool, error) { + restClnts, owner := ds.GetLockers() + + // Create buffered channel of size equal to total number of nodes. + ch := make(chan refreshResult, len(restClnts)) + var wg sync.WaitGroup + + for index, c := range restClnts { + wg.Add(1) + // Send refresh request to all nodes + go func(index int, c NetLocker) { + defer wg.Done() + + if c == nil { + ch <- refreshResult{offline: true} + return + } + + args := LockArgs{ + Owner: owner, + UID: id, + Resources: lockNames, + Source: source, + Quorum: quorum, + } + + ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshTimeout) + defer cancel() + + refreshed, err := c.Refresh(ctx, args) + if refreshed && err == nil { + ch <- refreshResult{succeeded: true} + } else { + if err != nil { + ch <- refreshResult{offline: true} + log("dsync: Unable to call Refresh failed with %s for %#v at %s\n", err, args, c) + } else { + ch <- refreshResult{succeeded: false} + log("dsync: Refresh returned false for %#v at %s\n", args, c) + } + } + + }(index, c) + } + + // Wait until we have either + // + // a) received all refresh responses + // b) received too many refreshed for quorum to be still possible + // c) timed out + // + i, refreshFailed, refreshSucceeded := 0, 0, 0 + done := false + + for ; i < len(restClnts); i++ { + select { + case refresh := <-ch: + if refresh.offline { + continue + } + if refresh.succeeded { + refreshSucceeded++ + } else { + refreshFailed++ + } + if refreshFailed > quorum { + // We know that we are not going to succeed with refresh + done = true + } + case <-ctx.Done(): + // Refreshing is canceled + return false, ctx.Err() + } + + if done { + break + } + } + + refreshQuorum := refreshSucceeded >= quorum + if !refreshQuorum { + refreshQuorum = refreshFailed < quorum + } + + // We may have some unused results in ch, release them async. + go func() { + wg.Wait() + close(ch) + for range ch { + } + }() + + return refreshQuorum, nil +} + // lock tries to acquire the distributed lock, returning true or false. func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, tolerance, quorum int, lockNames ...string) bool { for i := range *locks { @@ -216,7 +362,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is var wg sync.WaitGroup // Combined timeout for the lock attempt. - ctx, cancel := context.WithTimeout(ctx, DRWMutexAcquireTimeout) + ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout) defer cancel() for index, c := range restClnts { wg.Add(1) @@ -386,6 +532,9 @@ func releaseAll(ds *Dsync, tolerance int, owner string, locks *[]string, isReadL // // It is a run-time error if dm is not locked on entry to Unlock. func (dm *DRWMutex) Unlock() { + dm.m.Lock() + dm.cancelRefresh() + dm.m.Unlock() restClnts, owner := dm.clnt.GetLockers() // create temp array on stack @@ -425,6 +574,9 @@ func (dm *DRWMutex) Unlock() { // // It is a run-time error if dm is not locked on entry to RUnlock. 
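The decision at the end of refresh() above treats offline lockers as neutral: the lock is considered still held when at least a quorum of lockers confirmed it, or, failing that, when the explicit "not found" answers have not themselves reached quorum. A self-contained sketch of that rule with a few worked cases (the figures are illustrative, not taken from the patch):

    package main

    import "fmt"

    // refreshDecision mirrors the quorum rule computed at the end of the
    // refresh() hunk above: succeeded counts lockers that confirmed the
    // lock, failed counts lockers that explicitly reported it missing,
    // and offline lockers count for neither side.
    func refreshDecision(succeeded, failed, quorum int) bool {
        if succeeded >= quorum {
            return true
        }
        return failed < quorum
    }

    func main() {
        // Five lockers with a quorum of three (illustrative values).
        fmt.Println(refreshDecision(3, 0, 3)) // true: a quorum confirmed the lock
        fmt.Println(refreshDecision(2, 1, 3)) // true: two lockers offline, only one explicit miss
        fmt.Println(refreshDecision(1, 3, 3)) // false: misses reached quorum, the lock is treated as lost
    }

When a refresh round completes and this rule comes back false, the refresh goroutine invokes the lock-loss callback, which in cmd/namespace-lock.go above is the cancel function of the lock context, so work guarded by the lock observes a canceled context rather than silently running unlocked.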
func (dm *DRWMutex) RUnlock() { + dm.m.Lock() + dm.cancelRefresh() + dm.m.Unlock() // create temp array on stack restClnts, owner := dm.clnt.GetLockers() @@ -465,13 +617,16 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo Resources: names, } + ctx, cancel := context.WithTimeout(context.Background(), drwMutexUnlockCallTimeout) + defer cancel() + if isReadLock { - if _, err := c.RUnlock(args); err != nil { + if _, err := c.RUnlock(ctx, args); err != nil { log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c) return false } } else { - if _, err := c.Unlock(args); err != nil { + if _, err := c.Unlock(ctx, args); err != nil { log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c) return false } diff --git a/pkg/dsync/drwmutex_test.go b/pkg/dsync/drwmutex_test.go index 7a20bd451..f831f9689 100644 --- a/pkg/dsync/drwmutex_test.go +++ b/pkg/dsync/drwmutex_test.go @@ -36,12 +36,14 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) { drwm := NewDRWMutex(ds, "simplelock") - if !drwm.GetRLock(context.Background(), id, source, Options{Timeout: time.Second}) { + ctx1, cancel1 := context.WithCancel(context.Background()) + if !drwm.GetRLock(ctx1, cancel1, id, source, Options{Timeout: time.Second}) { panic("Failed to acquire read lock") } // fmt.Println("1st read lock acquired, waiting...") - if !drwm.GetRLock(context.Background(), id, source, Options{Timeout: time.Second}) { + ctx2, cancel2 := context.WithCancel(context.Background()) + if !drwm.GetRLock(ctx2, cancel2, id, source, Options{Timeout: time.Second}) { panic("Failed to acquire read lock") } // fmt.Println("2nd read lock acquired, waiting...") @@ -59,7 +61,8 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) { }() // fmt.Println("Trying to acquire write lock, waiting...") - locked = drwm.GetLock(context.Background(), id, source, Options{Timeout: duration}) + ctx3, cancel3 := context.WithCancel(context.Background()) + locked = drwm.GetLock(ctx3, cancel3, id, source, Options{Timeout: duration}) if locked { // fmt.Println("Write lock acquired, waiting...") time.Sleep(time.Second) @@ -93,7 +96,8 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) { drwm := NewDRWMutex(ds, "duallock") // fmt.Println("Getting initial write lock") - if !drwm.GetLock(context.Background(), id, source, Options{Timeout: time.Second}) { + ctx1, cancel1 := context.WithCancel(context.Background()) + if !drwm.GetLock(ctx1, cancel1, id, source, Options{Timeout: time.Second}) { panic("Failed to acquire initial write lock") } @@ -104,7 +108,8 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) { }() // fmt.Println("Trying to acquire 2nd write lock, waiting...") - locked = drwm.GetLock(context.Background(), id, source, Options{Timeout: duration}) + ctx2, cancel2 := context.WithCancel(context.Background()) + locked = drwm.GetLock(ctx2, cancel2, id, source, Options{Timeout: duration}) if locked { // fmt.Println("2nd write lock acquired, waiting...") time.Sleep(time.Second) @@ -139,7 +144,7 @@ func TestDualWriteLockTimedOut(t *testing.T) { // Borrowed from rwmutex_test.go func parallelReader(ctx context.Context, m *DRWMutex, clocked, cunlock, cdone chan bool) { - if m.GetRLock(ctx, id, source, Options{Timeout: time.Second}) { + if m.GetRLock(ctx, nil, id, source, Options{Timeout: time.Second}) { clocked <- true <-cunlock m.RUnlock() @@ -182,7 +187,7 @@ func TestParallelReaders(t 
*testing.T) { // Borrowed from rwmutex_test.go func reader(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) { for i := 0; i < numIterations; i++ { - if rwm.GetRLock(context.Background(), id, source, Options{Timeout: time.Second}) { + if rwm.GetRLock(context.Background(), nil, id, source, Options{Timeout: time.Second}) { n := atomic.AddInt32(activity, 1) if n < 1 || n >= 10000 { panic(fmt.Sprintf("wlock(%d)\n", n)) @@ -199,7 +204,7 @@ func reader(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) // Borrowed from rwmutex_test.go func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) { for i := 0; i < numIterations; i++ { - if rwm.GetLock(context.Background(), id, source, Options{Timeout: time.Second}) { + if rwm.GetLock(context.Background(), nil, id, source, Options{Timeout: time.Second}) { n := atomic.AddInt32(activity, 10000) if n != 10000 { panic(fmt.Sprintf("wlock(%d)\n", n)) diff --git a/pkg/dsync/dsync-server_test.go b/pkg/dsync/dsync-server_test.go index fad0fb7be..ea75ef2d3 100644 --- a/pkg/dsync/dsync-server_test.go +++ b/pkg/dsync/dsync-server_test.go @@ -30,6 +30,15 @@ type lockServer struct { // Map of locks, with negative value indicating (exclusive) write lock // and positive values indicating number of read locks lockMap map[string]int64 + + // Refresh returns lock not found if set to true + lockNotFound bool +} + +func (l *lockServer) setRefreshReply(refreshed bool) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.lockNotFound = !refreshed } func (l *lockServer) Lock(args *LockArgs, reply *bool) error { @@ -91,6 +100,13 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { return nil } +func (l *lockServer) Refresh(args *LockArgs, reply *bool) error { + l.mutex.Lock() + defer l.mutex.Unlock() + *reply = !l.lockNotFound + return nil +} + func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() diff --git a/pkg/dsync/dsync_test.go b/pkg/dsync/dsync_test.go index 87450ce35..e0094865d 100644 --- a/pkg/dsync/dsync_test.go +++ b/pkg/dsync/dsync_test.go @@ -19,6 +19,7 @@ package dsync_test import ( + "context" "fmt" "log" "math/rand" @@ -32,19 +33,26 @@ import ( "time" "github.com/google/uuid" + "github.com/minio/minio/pkg/dsync" . "github.com/minio/minio/pkg/dsync" ) +const numberOfNodes = 5 + var ds *Dsync var rpcPaths []string // list of rpc paths where lock server is serving. -func startRPCServers(nodes []string) { +var nodes = make([]string, numberOfNodes) // list of node IP addrs or hostname with ports. +var lockServers []*lockServer + +func startRPCServers() { for i := range nodes { server := rpc.NewServer() - server.RegisterName("Dsync", &lockServer{ + ls := &lockServer{ mutex: sync.Mutex{}, lockMap: make(map[string]int64), - }) + } + server.RegisterName("Dsync", ls) // For some reason the registration paths need to be different (even for different server objs) server.HandleHTTP(rpcPaths[i], fmt.Sprintf("%s-debug", rpcPaths[i])) l, e := net.Listen("tcp", ":"+strconv.Itoa(i+12345)) @@ -52,6 +60,8 @@ func startRPCServers(nodes []string) { log.Fatal("listen error:", e) } go http.Serve(l, nil) + + lockServers = append(lockServers, ls) } // Let servers start @@ -64,7 +74,6 @@ func TestMain(m *testing.M) { rand.Seed(time.Now().UTC().UnixNano()) - nodes := make([]string, 5) // list of node IP addrs or hostname with ports. 
for i := range nodes { nodes[i] = fmt.Sprintf("127.0.0.1:%d", i+12345) } @@ -82,7 +91,7 @@ func TestMain(m *testing.M) { GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() }, } - startRPCServers(nodes) + startRPCServers() os.Exit(m.Run()) } @@ -231,6 +240,42 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) { time.Sleep(10 * time.Millisecond) } +// Test refreshing lock +func TestFailedRefreshLock(t *testing.T) { + // Simulate Refresh RPC response to return no locking found + for i := range lockServers { + lockServers[i].setRefreshReply(false) + } + + dm := NewDRWMutex(ds, "aap") + wg := sync.WaitGroup{} + wg.Add(1) + + ctx, cl := context.WithCancel(context.Background()) + cancel := func() { + cl() + wg.Done() + } + + if !dm.GetLock(ctx, cancel, id, source, dsync.Options{Timeout: 5 * time.Minute}) { + t.Fatal("GetLock() should be successful") + } + + // Wait until context is canceled + wg.Wait() + if ctx.Err() == nil { + t.Fatal("Unexpected error", ctx.Err()) + } + + // Should be safe operation in all cases + dm.Unlock() + + // Revert Refresh RPC response to locking found + for i := range lockServers { + lockServers[i].setRefreshReply(false) + } +} + // Borrowed from mutex_test.go func HammerMutex(m *DRWMutex, loops int, cdone chan bool) { for i := 0; i < loops; i++ { diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go index 2172a0cd5..b1878eb6c 100644 --- a/pkg/dsync/rpc-client-impl_test.go +++ b/pkg/dsync/rpc-client-impl_test.go @@ -114,9 +114,9 @@ func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err err return status, err } -func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs) (expired bool, err error) { - err = rpcClient.Call("Dsync.Expired", &args, &expired) - return expired, err +func (rpcClient *ReconnectRPCClient) Refresh(ctx context.Context, args LockArgs) (refreshed bool, err error) { + err = rpcClient.Call("Dsync.Refresh", &args, &refreshed) + return refreshed, err } func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (status bool, err error) { diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go index f200e345b..b3d6e6a89 100644 --- a/pkg/dsync/rpc-client-interface.go +++ b/pkg/dsync/rpc-client-interface.go @@ -53,18 +53,18 @@ type NetLocker interface { // Do read unlock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation // * an error on failure of unlock request operation. - RUnlock(args LockArgs) (bool, error) + RUnlock(ctx context.Context, args LockArgs) (bool, error) // Do write unlock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation // * an error on failure of unlock request operation. - Unlock(args LockArgs) (bool, error) + Unlock(ctx context.Context, args LockArgs) (bool, error) // Force unlock a resource ForceUnlock(ctx context.Context, args LockArgs) (bool, error) - // Expired returns if current lock args has expired. - Expired(ctx context.Context, args LockArgs) (bool, error) + // Refresh the given lock to prevent it from becoming stale + Refresh(ctx context.Context, args LockArgs) (bool, error) // Returns underlying endpoint of this lock client instance. String() string
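Taken together, the pieces above are meant to be wired as follows: the cancel function handed to GetLock doubles as the lock-loss callback, so once a quorum of Refresh calls reports the lock missing, everything running under the lock context is canceled. A sketch of that wiring against the new dsync API (withRefreshableLock, doWork and the one-minute timeout are illustrative names and values, not part of this patch):

    package example

    import (
        "context"
        "errors"
        "time"

        "github.com/minio/minio/pkg/dsync"
    )

    // withRefreshableLock acquires dm, runs doWork under a context that is
    // canceled if the lock can no longer be refreshed, and always releases
    // the lock (Unlock also stops the background refresh goroutine via
    // cancelRefresh).
    func withRefreshableLock(dm *dsync.DRWMutex, id, source string, doWork func(ctx context.Context) error) error {
        lkCtx, cancel := context.WithCancel(context.Background())
        defer cancel()

        if !dm.GetLock(lkCtx, cancel, id, source, dsync.Options{Timeout: time.Minute}) {
            return errors.New("could not acquire lock within the timeout")
        }
        defer dm.Unlock()

        // doWork must return promptly once lkCtx is canceled, i.e. once a
        // quorum of lockers stopped acknowledging the refresh.
        return doWork(lkCtx)
    }

This is the same shape TestFailedRefreshLock exercises above: all lock servers are told to answer Refresh with "not found", GetLock still succeeds, and the test then waits for the cancel callback to fire before calling Unlock.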