Locker: Improve Refresh speed (#13430)

Refresh was doing a linear scan of all locked resources. This was adding 
up to significant delays in locking on high load systems with long 
running requests.

Add a secondary index for O(log(n)) UID -> resource lookups. 
Multiple resources are stored in consecutive strings.

Bonus fixes:

 * On multiple Unlock entries unlock the write locks we can.
 * Fix `expireOldLocks` skipping checks on entry after expiring one.
 * Return fast on canTakeUnlock/canTakeLock.
 * Prealloc some places.
This commit is contained in:
Klaus Post 2021-10-15 03:12:13 -07:00 committed by GitHub
parent 76239fa1ae
commit 779060bc16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 50 deletions

View File

@ -20,6 +20,8 @@ package cmd
import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"
@ -40,6 +42,7 @@ type lockRequesterInfo struct {
Owner string
// Quorum represents the quorum required for this lock to be active.
Quorum int
idx int
}
// isWriteLock returns whether the lock is a write or read lock.
@ -51,6 +54,7 @@ func isWriteLock(lri []lockRequesterInfo) bool {
type localLocker struct {
mutex sync.Mutex
lockMap map[string][]lockRequesterInfo
lockUID map[string]string // UUID -> resource map.
}
func (l *localLocker) String() string {
@ -58,25 +62,22 @@ func (l *localLocker) String() string {
}
func (l *localLocker) canTakeUnlock(resources ...string) bool {
var lkCnt int
for _, resource := range resources {
isWriteLockTaken := isWriteLock(l.lockMap[resource])
if isWriteLockTaken {
lkCnt++
if !isWriteLock(l.lockMap[resource]) {
return false
}
}
return lkCnt == len(resources)
return true
}
func (l *localLocker) canTakeLock(resources ...string) bool {
var noLkCnt int
for _, resource := range resources {
_, lockTaken := l.lockMap[resource]
if !lockTaken {
noLkCnt++
if lockTaken {
return false
}
}
return noLkCnt == len(resources)
return true
}
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
@ -91,7 +92,7 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
// No locks held on the all resources, so claim write
// lock on all resources at once.
for _, resource := range args.Resources {
for i, resource := range args.Resources {
l.lockMap[resource] = []lockRequesterInfo{
{
Name: resource,
@ -103,37 +104,46 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
TimeLastRefresh: UTCNow(),
Group: len(args.Resources) > 1,
Quorum: args.Quorum,
idx: i,
},
}
l.lockUID[formatUUID(args.UID, i)] = resource
}
return true, nil
}
func formatUUID(s string, idx int) string {
return s + strconv.Itoa(idx)
}
func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
l.mutex.Lock()
defer l.mutex.Unlock()
err = nil
if !l.canTakeUnlock(args.Resources...) {
// Unless it is a write lock reject it.
return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources)
}
for _, resource := range args.Resources {
if !l.canTakeUnlock(resource) {
// Unless it is a write lock reject it.
err = fmt.Errorf("unlock attempted on a read locked entity: %s", resource)
continue
}
lri, ok := l.lockMap[resource]
if ok {
l.removeEntry(resource, args, &lri)
reply = l.removeEntry(resource, args, &lri) || reply
}
}
return true, nil
return
}
// removeEntry based on the uid of the lock message, removes a single entry from the
// lockRequesterInfo array or the whole array from the map (in case of a write lock
// or last read lock)
// UID and optionally owner must match for entries to be deleted.
func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool {
// Find correct entry to remove based on uid.
for index, entry := range *lri {
if entry.UID == args.UID && entry.Owner == args.Owner {
if entry.UID == args.UID && (args.Owner == "" || entry.Owner == args.Owner) {
if len(*lri) == 1 {
// Remove the write lock.
delete(l.lockMap, name)
@ -142,6 +152,7 @@ func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockR
*lri = append((*lri)[:index], (*lri)[index+1:]...)
l.lockMap[name] = *lri
}
delete(l.lockUID, formatUUID(args.UID, entry.idx))
return true
}
}
@ -151,6 +162,10 @@ func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockR
}
func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
if len(args.Resources) > 1 {
return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource")
}
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0]
@ -168,16 +183,22 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
if reply = !isWriteLock(lri); reply {
// Unless there is a write lock
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
l.lockUID[args.UID] = formatUUID(resource, 0)
}
} else {
// No locks held on the given name, so claim (first) read lock
l.lockMap[resource] = []lockRequesterInfo{lrInfo}
l.lockUID[args.UID] = formatUUID(resource, 0)
reply = true
}
return reply, nil
}
func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
if len(args.Resources) > 1 {
return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource")
}
l.mutex.Lock()
defer l.mutex.Unlock()
var lri []lockRequesterInfo
@ -187,9 +208,9 @@ func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply boo
// No lock is held on the given name
return true, nil
}
if reply = !isWriteLock(lri); !reply {
if isWriteLock(lri) {
// A write-lock is held, cannot release a read lock
return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
return false, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
}
l.removeEntry(resource, args, &lri)
return reply, nil
@ -199,9 +220,9 @@ func (l *localLocker) DupLockMap() map[string][]lockRequesterInfo {
l.mutex.Lock()
defer l.mutex.Unlock()
lockCopy := map[string][]lockRequesterInfo{}
lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap))
for k, v := range l.lockMap {
lockCopy[k] = append(lockCopy[k], v...)
lockCopy[k] = append(make([]lockRequesterInfo, 0, len(v)), v...)
}
return lockCopy
}
@ -229,22 +250,53 @@ func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (rep
defer l.mutex.Unlock()
if len(args.UID) == 0 {
for _, resource := range args.Resources {
delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock)
lris, ok := l.lockMap[resource]
if !ok {
continue
}
// Collect uids, so we don't mutate while we delete
uids := make([]string, 0, len(lris))
for _, lri := range lris {
uids = append(uids, lri.UID)
}
// Delete collected uids:
for _, uid := range uids {
lris, ok := l.lockMap[resource]
if !ok {
// Just to be safe, delete uuids.
for idx := 0; idx < math.MaxInt32; idx++ {
mapID := formatUUID(uid, idx)
if _, ok := l.lockUID[mapID]; !ok {
break
}
delete(l.lockUID, mapID)
}
continue
}
l.removeEntry(resource, dsync.LockArgs{UID: uid}, &lris)
}
}
return true, nil
}
lockFound := false
for _, lris := range l.lockMap {
for _, lri := range lris {
if lri.UID == args.UID {
l.removeEntry(lri.Name, dsync.LockArgs{UID: lri.UID}, &lris)
lockFound = true
}
idx := 0
for {
resource, ok := l.lockUID[formatUUID(args.UID, idx)]
if !ok {
return idx > 0, nil
}
lris, ok := l.lockMap[resource]
if !ok {
// Unexpected inconsistency, delete.
delete(l.lockUID, formatUUID(args.UID, idx))
idx++
continue
}
reply = true
l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &lris)
idx++
}
return lockFound, nil
}
}
@ -256,18 +308,31 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh
l.mutex.Lock()
defer l.mutex.Unlock()
lockFound := false
for _, lri := range l.lockMap {
// Check whether uid is still active
for i := range lri {
if lri[i].UID == args.UID {
lri[i].TimeLastRefresh = UTCNow()
lockFound = true
// Check whether uid is still active.
resource, ok := l.lockUID[formatUUID(args.UID, 0)]
if !ok {
return false, nil
}
idx := 0
for {
lris, ok := l.lockMap[resource]
if !ok {
// Inconsistent. Delete UID.
delete(l.lockUID, formatUUID(args.UID, idx))
return idx > 0, nil
}
for i := range lris {
if lris[i].UID == args.UID {
lris[i].TimeLastRefresh = UTCNow()
}
}
idx++
resource, ok = l.lockUID[formatUUID(args.UID, idx)]
if !ok {
// No more resources for UID, but we did update at least one.
return true, nil
}
}
return lockFound, nil
}
}
@ -277,10 +342,24 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, lris := range l.lockMap {
for _, lri := range lris {
if time.Since(lri.TimeLastRefresh) > interval {
l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris)
for k := range l.lockMap {
found := false
// Since we mutate the value, remove one per loop.
for {
lris, ok := l.lockMap[k]
if !ok {
break
}
for _, lri := range lris {
if time.Since(lri.TimeLastRefresh) > interval {
l.removeEntry(lri.Name, dsync.LockArgs{Owner: lri.Owner, UID: lri.UID}, &lris)
found = true
break
}
}
// We did not find any more to expire.
if !found {
break
}
}
}
@ -288,6 +367,7 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
func newLocker() *localLocker {
return &localLocker{
lockMap: make(map[string][]lockRequesterInfo),
lockMap: make(map[string][]lockRequesterInfo, 1000),
lockUID: make(map[string]string, 1000),
}
}

View File

@ -34,7 +34,7 @@ const (
lockMaintenanceInterval = 1 * time.Minute
// Lock validity duration
lockValidityDuration = 20 * time.Second
lockValidityDuration = 1 * time.Minute
)
// To abstract a node over network.

View File

@ -44,13 +44,12 @@ func BenchmarkLockArgs(b *testing.B) {
b.Fatal(err)
}
req := &http.Request{
Body: ioutil.NopCloser(bytes.NewReader(argBytes)),
}
req := &http.Request{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
req.Body = ioutil.NopCloser(bytes.NewReader(argBytes))
getLockArgs(req)
}
}
@ -64,12 +63,12 @@ func BenchmarkLockArgsOld(b *testing.B) {
req := &http.Request{
Form: values,
Body: ioutil.NopCloser(bytes.NewReader([]byte(`obj.txt`))),
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
req.Body = ioutil.NopCloser(bytes.NewReader([]byte(`obj.txt`)))
getLockArgsOld(req)
}
}