minio/cmd/namespace-lock_test.go
Harshavardhana 4daa0d2cee lock: Moving locking to handler layer. (#3381)
This is implemented so that issues like the one in the
following flow don't affect the behavior of the operation.

```
GetObjectInfo()
.... --> Time window for mutation (no lock held)
.... --> Time window for mutation (no lock held)
GetObject()
```

This happens when two simultaneous uploads are made
to the same object: wrong object info can be
returned to the client.

Another classic example is "CopyObject" API itself
which reads from a source object and copies to
destination object.

Fixes #3370
Fixes #2912
2016-12-10 16:15:12 -08:00

417 lines
13 KiB
Go

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"strconv"
"sync"
"testing"
"time"
)
// Tests functionality provided by namespace lock.
//
// Three scenarios are run against the package-level globalNSMutex:
//  1. one write lock followed by one unlock on <"a","b">,
//  2. four read locks followed by two read unlocks on <"a","b">,
//  3. one read lock followed by one read unlock on <"a","c">.
//
// After every lock/unlock step the reference count stored in the lock map
// entry is compared with the expected value, and after the last reference is
// released the entry is expected to be gone from the map.
func TestNamespaceLockTest(t *testing.T) {
	// List of test cases.
	testCases := []struct {
		lk               func(s1, s2, s3 string)
		unlk             func(s1, s2, s3 string)
		rlk              func(s1, s2, s3 string)
		runlk            func(s1, s2, s3 string)
		lockedRefCount   uint
		unlockedRefCount uint
		shouldPass       bool
	}{
		{
			lk:               globalNSMutex.Lock,
			unlk:             globalNSMutex.Unlock,
			lockedRefCount:   1,
			unlockedRefCount: 0,
			shouldPass:       true,
		},
		{
			rlk:              globalNSMutex.RLock,
			runlk:            globalNSMutex.RUnlock,
			lockedRefCount:   4,
			unlockedRefCount: 2,
			shouldPass:       true,
		},
		{
			rlk:              globalNSMutex.RLock,
			runlk:            globalNSMutex.RUnlock,
			lockedRefCount:   1,
			unlockedRefCount: 0,
			shouldPass:       true,
		},
	}

	// Run all test cases.

	// Write lock tests.
	testCase := testCases[0]
	testCase.lk("a", "b", "c") // lock once.
	nsLk, ok := globalNSMutex.lockMap[nsParam{"a", "b"}]
	if !ok {
		// nsLk is dereferenced below, so there is no point in continuing.
		t.Fatalf("Lock in map missing.")
	}
	// Validate locked ref count.
	if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
		t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.lockedRefCount, nsLk.ref)
	}
	testCase.unlk("a", "b", "c") // unlock once.
	if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
		t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 1, testCase.unlockedRefCount, nsLk.ref)
	}
	// Once the last reference is released the entry must no longer be in the map.
	_, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
	if ok && testCase.shouldPass {
		t.Errorf("Lock map found after unlock.")
	}

	// Read lock tests.
	testCase = testCases[1]
	testCase.rlk("a", "b", "c") // lock once.
	testCase.rlk("a", "b", "c") // lock second time.
	testCase.rlk("a", "b", "c") // lock third time.
	testCase.rlk("a", "b", "c") // lock fourth time.
	nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
	if !ok {
		// nsLk is dereferenced below, so there is no point in continuing.
		t.Fatalf("Lock in map missing.")
	}
	// Validate locked ref count.
	if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
		t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.lockedRefCount, nsLk.ref)
	}
	testCase.runlk("a", "b", "c") // unlock once.
	testCase.runlk("a", "b", "c") // unlock second time.
	if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
		t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 2, testCase.unlockedRefCount, nsLk.ref)
	}
	// Two read locks are still held, so the entry must still be present.
	_, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
	if !ok && testCase.shouldPass {
		t.Errorf("Lock map not found.")
	}
	// Release the two remaining read locks so that no lock on <"a","b">
	// outlives this test in the shared globalNSMutex.
	testCase.runlk("a", "b", "c")
	testCase.runlk("a", "b", "c")

	// Read lock 0 ref count.
	testCase = testCases[2]
	testCase.rlk("a", "c", "d") // lock once.
	nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
	if !ok {
		// nsLk is dereferenced below, so there is no point in continuing.
		t.Fatalf("Lock in map missing.")
	}
	// Validate locked ref count.
	if testCase.lockedRefCount != nsLk.ref && testCase.shouldPass {
		t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.lockedRefCount, nsLk.ref)
	}
	testCase.runlk("a", "c", "d") // unlock once.
	if testCase.unlockedRefCount != nsLk.ref && testCase.shouldPass {
		t.Errorf("Test %d fails, expected to pass. Wanted ref count is %d, got %d", 3, testCase.unlockedRefCount, nsLk.ref)
	}
	// Once the last reference is released the entry must no longer be in the map.
	_, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
	if ok && testCase.shouldPass {
		t.Errorf("Lock map found after unlock.")
	}
}
// TestLockStats drives one <bucket, object> pair through a fixed sequence of
// read-lock and write-lock transitions on globalNSMutex and, after each
// transition, hands the matching expectation to verifyLockState (or
// verifyGlobalLockStats for the final step).
//
// Sequence: 10 read locks taken -> 5 released -> two writers started in
// goroutines while 5 readers remain -> 4 more readers released -> last reader
// released -> writers run one after the other -> everything released.
func TestLockStats(t *testing.T) {
	const (
		bucket = "my-bucket"
		object = "my-object"

		// Lock-origin fixture strings carried in the expectations.
		// NOTE(review): whether verifyLockState compares lockSource is not
		// visible from this file; the path and line numbers are recorded
		// values, not computed ones.
		originTest   = "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]"
		originWriter = "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]"
	)

	// want[i] is the expectation for step i+1.
	want := []lockStateCase{
		// Step 1: all 10 read locks are held; look up the entry of reader "0".
		{
			volume:     bucket,
			path:       object,
			opsID:      "0",
			readLock:   true,
			lockSource: originTest,

			expectedErr:                 nil,
			expectedLockStatus:          "Running",
			expectedGlobalLockCount:     10,
			expectedRunningLockCount:    10,
			expectedBlockedLockCount:    0,
			expectedVolPathLockCount:    10,
			expectedVolPathRunningCount: 10,
			expectedVolPathBlockCount:   0,
		},
		// Step 2: readers "0".."4" are released; look up the entry of
		// reader "6", one of the five still running.
		{
			volume:     bucket,
			path:       object,
			opsID:      "6",
			readLock:   true,
			lockSource: originTest,

			expectedErr:                 nil,
			expectedLockStatus:          "Running",
			expectedGlobalLockCount:     5,
			expectedRunningLockCount:    5,
			expectedBlockedLockCount:    0,
			expectedVolPathLockCount:    5,
			expectedVolPathRunningCount: 5,
			expectedVolPathBlockCount:   0,
		},
		// Step 3: writer "10" has acquired the lock, writer "11" is still waiting.
		{
			volume:     bucket,
			path:       object,
			opsID:      "10",
			readLock:   false,
			lockSource: originTest,

			expectedErr:                 nil,
			expectedLockStatus:          "Running",
			expectedGlobalLockCount:     2,
			expectedRunningLockCount:    1,
			expectedBlockedLockCount:    1,
			expectedVolPathLockCount:    2,
			expectedVolPathRunningCount: 1,
			expectedVolPathBlockCount:   1,
		},
		// Step 4: kept in the table but not verified by the code below.
		{
			volume: bucket,
			path:   object,

			expectedErr:                 nil,
			expectedLockStatus:          "Blocked",
			expectedGlobalLockCount:     1,
			expectedRunningLockCount:    0,
			expectedBlockedLockCount:    1,
			expectedVolPathLockCount:    1,
			expectedVolPathRunningCount: 0,
			expectedVolPathBlockCount:   1,
		},
		// Step 5: writer "10" is done, writer "11" holds the lock.
		{
			volume:     bucket,
			path:       object,
			opsID:      "11",
			readLock:   false,
			lockSource: originTest,

			expectedErr:                 nil,
			expectedLockStatus:          "Running",
			expectedGlobalLockCount:     1,
			expectedRunningLockCount:    1,
			expectedBlockedLockCount:    0,
			expectedVolPathLockCount:    1,
			expectedVolPathRunningCount: 1,
			expectedVolPathBlockCount:   0,
		},
		// Step 6: five readers still hold the lock and both writers are
		// waiting behind them; look up the entry of the first writer ("10").
		{
			volume:     bucket,
			path:       object,
			opsID:      "10",
			readLock:   false,
			lockSource: originWriter,

			expectedErr:        nil,
			expectedLockStatus: "Blocked",
			// 5 running readers + 2 blocked writers.
			expectedGlobalLockCount:     7,
			expectedRunningLockCount:    5,
			expectedBlockedLockCount:    2,
			expectedVolPathLockCount:    7,
			expectedVolPathRunningCount: 5,
			expectedVolPathBlockCount:   2,
		},
		// Step 7: nine of the ten readers are released, so the two writers
		// are still blocked; look up the entry of the last reader ("9").
		{
			volume:     bucket,
			path:       object,
			opsID:      "9",
			readLock:   true,
			lockSource: originWriter,

			expectedErr:        nil,
			expectedLockStatus: "Running",
			// 1 running reader + 2 blocked writers.
			expectedGlobalLockCount:     3,
			expectedRunningLockCount:    1,
			expectedBlockedLockCount:    2,
			expectedVolPathLockCount:    3,
			expectedVolPathRunningCount: 1,
			expectedVolPathBlockCount:   2,
		},
		// Step 8: everything has been released; only global counters are set.
		{
			volume: bucket,
			path:   object,

			expectedErr:              nil,
			expectedLockStatus:       "Blocked",
			expectedGlobalLockCount:  0,
			expectedRunningLockCount: 0,
			expectedBlockedLockCount: 0,
		},
	}

	var writers sync.WaitGroup

	// Start from a freshly initialized namespace lock.
	initNSLock(false)

	// Step 1: take 10 read locks with opsIDs "0".."9".
	for id := 0; id < 10; id++ {
		globalNSMutex.RLock(bucket, object, strconv.Itoa(id))
	}
	verifyLockState(want[0], t, 1)

	// Step 2: drop the first five readers.
	for id := 0; id < 5; id++ {
		globalNSMutex.RUnlock(bucket, object, strconv.Itoa(id))
	}
	verifyLockState(want[1], t, 2)

	// Signals that the first writer has released its lock.
	firstWriterDone := make(chan struct{}, 1)

	// First writer: cannot proceed while read locks are outstanding, so it
	// runs in its own goroutine.
	writers.Add(1)
	go func() {
		defer writers.Done()
		globalNSMutex.Lock(bucket, object, strconv.Itoa(10))
		// Step 3: the lock is now held by this writer.
		verifyLockState(want[2], t, 3)
		globalNSMutex.Unlock(bucket, object, strconv.Itoa(10))
		// Let the second writer proceed only after this unlock has happened.
		firstWriterDone <- struct{}{}
	}()

	// Give the first writer time to queue up so the writer order is predictable.
	time.Sleep(100 * time.Millisecond)

	// Second writer: queues behind the remaining readers and the first writer.
	writers.Add(1)
	go func() {
		defer writers.Done()
		globalNSMutex.Lock(bucket, object, strconv.Itoa(11))
		// Hold the lock until the first writer has reported its unlock.
		<-firstWriterDone
		// Step 5: only this writer is left.
		verifyLockState(want[4], t, 5)
		globalNSMutex.Unlock(bucket, object, strconv.Itoa(11))
	}()

	// Give both writers time to reach their Lock calls.
	time.Sleep(1 * time.Second)
	// Step 6: five readers running, two writers waiting.
	verifyLockState(want[5], t, 6)

	// Step 7: release readers "5".."8", leaving only reader "9".
	for id := 5; id < 9; id++ {
		globalNSMutex.RUnlock(bucket, object, strconv.Itoa(id))
	}
	verifyLockState(want[6], t, 7)

	// Release the last reader and wait for both writers to finish.
	globalNSMutex.RUnlock(bucket, object, strconv.Itoa(9))
	writers.Wait()

	// Step 8: nothing is held any more.
	verifyGlobalLockStats(want[7], t, 8)
}
// Tests functionality to forcefully unlock locks.
//
// A lock on <"bucket","object"> is taken and then dropped via ForceUnlock
// instead of Unlock. A second goroutine must then be able to take a new lock
// on the same pair within the timeout.
func TestNamespaceForceUnlockTest(t *testing.T) {
	const relockTimeout = 100 * time.Millisecond

	// Take the lock; it is never released through Unlock.
	held := globalNSMutex.NewNSLock("bucket", "object")
	held.Lock()

	// Drop it forcefully.
	globalNSMutex.ForceUnlock("bucket", "object")

	// Buffered so the goroutine can finish even if nobody is receiving any more.
	acquired := make(chan struct{}, 1)
	go func() {
		// Try to claim the same lock again and report success.
		relock := globalNSMutex.NewNSLock("bucket", "object")
		relock.Lock()
		acquired <- struct{}{}
	}()

	select {
	case <-acquired:
		// Re-locking succeeded, so the forced unlock worked.
	case <-time.After(relockTimeout):
		// Timing out means the lock has not been cleared.
		t.Errorf("Lock not cleared.")
	}

	// Clean up the lock taken by the goroutine.
	globalNSMutex.ForceUnlock("bucket", "object")
}