Lock while creating buckets (#12999)

Ensure that one call will succeed and others will serialize

Example failure without code in place:
```
    bucket-policy-handlers_test.go:120: unexpected error: cmd.InsufficientWriteQuorum: Storage resources are insufficient for the write operation doz2wjqaovp5kvlrv11fyacowgcvoziszmkmzzz9nk9au946qwhci4zkane5-1/
    bucket-policy-handlers_test.go:120: unexpected error: cmd.InsufficientWriteQuorum: Storage resources are insufficient for the write operation doz2wjqaovp5kvlrv11fyacowgcvoziszmkmzzz9nk9au946qwhci4zkane5-1/
    bucket-policy-handlers_test.go:135: want 1 ok, got 0
```
This commit is contained in:
Klaus Post 2021-08-19 22:21:02 +02:00 committed by GitHub
parent e9d970154d
commit 47b577fcc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 0 deletions

View File

@ -26,6 +26,7 @@ import (
"net/http/httptest"
"reflect"
"strings"
"sync"
"testing"
"github.com/minio/minio/internal/auth"
@ -93,6 +94,52 @@ func getAnonWriteOnlyObjectPolicy(bucketName, prefix string) *policy.Policy {
}
}
// TestCreateBucket verifies that concurrent bucket creation yields exactly
// one success, running testCreateBucket against every object-layer backend
// provided by the test harness.
func TestCreateBucket(t *testing.T) {
	handlers := []string{"MakeBucketWithLocation"}
	ExecObjectLayerAPITest(t, testCreateBucket, handlers)
}
// testCreateBucket races many concurrent MakeBucketWithLocation calls for
// the same bucket name and asserts that exactly one call succeeds while
// every other call fails with BucketExists.
func testCreateBucket(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
	credentials auth.Credentials, t *testing.T) {
	bucket := bucketName + "-1"

	const workers = 100
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex    // guards created and conflicts
		created   int           // number of successful creations
		conflicts int           // number of BucketExists errors
		release   = make(chan struct{}) // closed to start all workers at once
	)

	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// Block on the common starting gun so the calls overlap.
			<-release
			err := obj.MakeBucketWithLocation(GlobalContext, bucket, BucketOptions{})
			switch err.(type) {
			case nil:
				mu.Lock()
				created++
				mu.Unlock()
			case BucketExists:
				mu.Lock()
				conflicts++
				mu.Unlock()
			default:
				// Any other error is unexpected; log it here and let
				// the count checks below fail the test.
				t.Logf("unexpected error: %T: %v", err, err)
			}
		}()
	}
	close(release)
	wg.Wait()

	if created != 1 {
		t.Fatalf("want 1 ok, got %d", created)
	}
	if conflicts != workers-1 {
		t.Fatalf("want %d errors, got %d", workers-1, conflicts)
	}
}
// Wrapper for calling Put Bucket Policy HTTP handler tests for both Erasure multiple disks and single node setup.
func TestPutBucketPolicyHandler(t *testing.T) {
ExecObjectLayerAPITest(t, testPutBucketPolicyHandler, []string{"PutBucketPolicy"})

View File

@ -582,6 +582,15 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
g := errgroup.WithNErrs(len(z.serverPools))
// Lock the bucket name before creating.
lk := z.NewNSLock(minioMetaTmpBucket, bucket+".lck")
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
// Create buckets in parallel across all sets.
for index := range z.serverPools {
index := index