fix: improper ticker usage in goroutines (#11468)

- lock maintenance loop was incorrectly sleeping
  as well as misusing the ticker, leading to
  extra expiration routines getting triggered
  that could flood the network.

- multipart upload cleanup should be based on a
  timer instead of a ticker, to ensure that
  long-running jobs don't get triggered twice.

- make sure to get the right lockers for the object name
Harshavardhana, 2021-02-05 19:23:48 -08:00 (committed by GitHub)
parent 1fdafaf72f
commit 88c1bb0720
6 changed files with 43 additions and 36 deletions
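The ticker-versus-timer reasoning in the commit message is easiest to see side by side. Below is a minimal, hypothetical sketch (the tickerLoop/timerLoop/doWork names are illustrative, not code from this repository): a time.Ticker keeps firing on the wall clock no matter what the loop is doing, so a tick is often already buffered when a slow iteration finishes and the next run starts back-to-back; a time.Timer fires only when the loop re-arms it, so the loop controls its own cadence.

package loops

import (
    "context"
    "time"
)

// tickerLoop is roughly the problematic shape: the ticker keeps firing on
// the wall clock regardless of what the loop is doing, and with the extra
// sleep here a tick is typically already buffered in ticker.C when the
// work starts, so the next iteration begins immediately afterwards.
func tickerLoop(ctx context.Context, interval time.Duration, doWork func()) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            time.Sleep(interval / 2) // extra sleep on top of the tick
            doWork()                 // another tick may already be pending
        }
    }
}

// timerLoop is the shape this commit moves to: the timer fires only when
// the loop re-arms it, so the schedule is driven by the loop itself (and
// the reset interval can be varied, e.g. with jitter) rather than by a
// free-running ticker.
func timerLoop(ctx context.Context, interval time.Duration, doWork func()) {
    timer := time.NewTimer(interval)
    defer timer.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-timer.C:
            timer.Reset(interval) // re-arm for the next cycle
            doWork()
        }
    }
}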


@@ -423,14 +423,17 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 }
 
 func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
-    ticker := time.NewTicker(cleanupInterval)
-    defer ticker.Stop()
+    timer := time.NewTimer(cleanupInterval)
+    defer timer.Stop()
 
     for {
         select {
         case <-ctx.Done():
             return
-        case <-ticker.C:
+        case <-timer.C:
+            // Reset for the next interval
+            timer.Reset(cleanupInterval)
+
             for _, set := range s.sets {
                 set.cleanupStaleUploads(ctx, expiry)
             }


@@ -849,14 +849,17 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 // on all buckets for every `cleanupInterval`, this function is
 // blocking and should be run in a go-routine.
 func (fs *FSObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
-    ticker := time.NewTicker(cleanupInterval)
-    defer ticker.Stop()
+    timer := time.NewTimer(cleanupInterval)
+    defer timer.Stop()
 
     for {
         select {
         case <-ctx.Done():
             return
-        case <-ticker.C:
+        case <-timer.C:
+            // Reset for the next interval
+            timer.Reset(cleanupInterval)
+
             now := time.Now()
             entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
             if err != nil {


@@ -249,7 +249,10 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
     // Lock found, proceed to verify if belongs to given uid.
     lri, ok := l.lockMap[resource]
     if !ok {
-        return true, nil
+        // lock doesn't exist yet, no reason to
+        // expire a lock that doesn't exist yet - it may be
+        // racing with other active lock requests.
+        return false, nil
     }
 
     // Check whether uid is still active
@@ -258,7 +261,7 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
             ep := globalRemoteEndpoints[args.Owner]
             if !ep.IsLocal {
                 // check if the owner is online
-                return isServerResolvable(ep, 1*time.Second) != nil, nil
+                return isServerResolvable(ep, 3*time.Second) != nil, nil
             }
             return false, nil
         }
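The first change above encodes the rule that an entry missing from the local lock map must not be reported as expired, since the expiry check may be racing with an in-flight lock request. A stripped-down sketch of that decision (the entry type and expired helper are hypothetical, not the repository's actual localLocker):

package lockcheck

// entry is a hypothetical stand-in for a lock requester record.
type entry struct {
    UID string
}

// expired mirrors the decision made in the hunk above: an unknown
// resource is treated as "not expired" so that a racing lock request
// is not torn down, a known resource with a matching UID is still
// active, and only a known resource whose UID is gone is a candidate
// for expiry.
func expired(lockMap map[string][]entry, resource, uid string) bool {
    lris, ok := lockMap[resource]
    if !ok {
        // Lock doesn't exist yet; it may be racing with an
        // active lock request, so do not expire it.
        return false
    }
    for _, lri := range lris {
        if lri.UID == uid {
            // UID is still active, not expired.
            return false
        }
    }
    // Resource is known but the UID is gone: consider it expired.
    return true
}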


@@ -33,7 +33,7 @@ import (
 const (
     // Lock maintenance interval.
-    lockMaintenanceInterval = 10 * time.Second
+    lockMaintenanceInterval = 15 * time.Second
 
     // Lock validity check interval.
     lockValidityCheckInterval = 30 * time.Second
@@ -254,15 +254,15 @@ func getLongLivedLocks(interval time.Duration) []lockRequesterInfo {
 // - some network error (and server is up normally)
 //
 // We will ignore the error, and we will retry later to get a resolve on this lock
-func lockMaintenance(ctx context.Context, interval time.Duration) error {
+func lockMaintenance(ctx context.Context, interval time.Duration) {
     objAPI := newObjectLayerFn()
     if objAPI == nil {
-        return nil
+        return
     }
 
     z, ok := objAPI.(*erasureServerPools)
     if !ok {
-        return nil
+        return
     }
 
     type nlock struct {
@@ -295,14 +295,15 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
         if lrip.Group {
             lockers, _ = z.serverPools[0].getHashedSet("").getLockers()
         } else {
-            lockers, _ = z.serverPools[0].getHashedSet(lrip.Name).getLockers()
+            _, objName := path2BucketObject(lrip.Name)
+            lockers, _ = z.serverPools[0].getHashedSet(objName).getLockers()
         }
         var wg sync.WaitGroup
         wg.Add(len(lockers))
         for _, c := range lockers {
             go func(lrip lockRequesterInfo, c dsync.NetLocker) {
                 defer wg.Done()
-                ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
+                ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 
                 // Call back to all participating servers, verify
                 // if each of those servers think lock is still
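The path2BucketObject change above strips the bucket prefix from lrip.Name so the maintenance loop hashes the same key (the object name) that was used to pick the lockers when the lock was taken, per the commit message's last bullet. A simplified sketch of that split (an approximation for illustration, not the repository's actual helper):

package pathsplit

import "strings"

// splitBucketObject is a simplified version of the path2BucketObject
// helper used in the hunk above: it separates "bucket/object" so that
// the locker set can be chosen by the object name alone.
func splitBucketObject(path string) (bucket, object string) {
    path = strings.TrimPrefix(path, "/")
    if i := strings.Index(path, "/"); i >= 0 {
        return path[:i], path[i+1:]
    }
    return path, ""
}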
@@ -336,8 +337,6 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
             globalLockServer.removeEntryIfExists(lrip)
         }
     }
-
-    return nil
 }
 
 // Start lock maintenance from all lock servers.
@@ -354,27 +353,28 @@ func startLockMaintenance(ctx context.Context) {
         break
     }
 
-    // Initialize a new ticker with a minute between each ticks.
-    ticker := time.NewTicker(lockMaintenanceInterval)
-    // Stop the timer upon service closure and cleanup the go-routine.
-    defer ticker.Stop()
+    // Initialize a new ticker with 15secs between each ticks.
+    lkTimer := time.NewTimer(lockMaintenanceInterval)
+    // Stop the timer upon returning.
+    defer lkTimer.Stop()
 
     r := rand.New(rand.NewSource(UTCNow().UnixNano()))
+
+    // Start with random sleep time, so as to avoid
+    // "synchronous checks" between servers
+    duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
+    time.Sleep(duration)
+
     for {
         // Verifies every minute for locks held more than 2 minutes.
         select {
         case <-ctx.Done():
             return
-        case <-ticker.C:
-            // Start with random sleep time, so as to avoid
-            // "synchronous checks" between servers
-            duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
-            time.Sleep(duration)
-            if err := lockMaintenance(ctx, lockValidityCheckInterval); err != nil {
-                // Sleep right after an error.
-                duration := time.Duration(r.Float64() * float64(lockMaintenanceInterval))
-                time.Sleep(duration)
-            }
+        case <-lkTimer.C:
+            // Reset the timer for next cycle.
+            lkTimer.Reset(time.Duration(r.Float64() * float64(lockMaintenanceInterval)))
+
+            lockMaintenance(ctx, lockValidityCheckInterval)
         }
     }
 }
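The randomized Reset in the hunk above replaces the old in-loop sleeps: each server re-arms its timer with a different jitter, so maintenance checks drift apart across servers instead of running in lockstep, and the loop itself never blocks. A minimal sketch of that jittered rescheduling (hypothetical names, same idea as the diff):

package jitter

import (
    "context"
    "math/rand"
    "time"
)

// jitteredLoop re-arms its timer with a random fraction of the base
// interval on every fire, so multiple servers running the same loop
// stagger their checks rather than performing them synchronously.
func jitteredLoop(ctx context.Context, base time.Duration, doWork func()) {
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    t := time.NewTimer(base)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            // Re-arm with jitter before doing the work.
            t.Reset(time.Duration(r.Float64() * float64(base)))
            doWork()
        }
    }
}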


@@ -64,6 +64,8 @@ func (m *metacacheManager) initManager() {
         }
 
         t := time.NewTicker(time.Minute)
+        defer t.Stop()
+
         var exit bool
         bg := context.Background()
         for !exit {


@@ -92,8 +92,7 @@ func (dm *DRWMutex) Lock(id, source string) {
 // Options lock options.
 type Options struct {
     Timeout time.Duration
-    Tolerance int
 }
 
 // GetLock tries to get a write lock on dm before the timeout elapses.
@@ -155,10 +154,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadL
     defer cancel()
 
     // Tolerance is not set, defaults to half of the locker clients.
-    tolerance := opts.Tolerance
-    if tolerance == 0 {
-        tolerance = len(restClnts) / 2
-    }
+    tolerance := len(restClnts) / 2
 
     // Quorum is effectively = total clients subtracted with tolerance limit
     quorum := len(restClnts) - tolerance
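With the Tolerance option removed, the quorum arithmetic above is fixed: tolerance is always half of the locker clients and quorum is whatever remains. A tiny illustrative sketch of that arithmetic (the numbers are examples, not taken from the repository):

package quorum

// quorumFor shows the arithmetic the hunk above settles on: tolerance is
// half of the locker clients, quorum is the remainder. For 8 lockers this
// yields tolerance 4 and quorum 4; for 5 lockers, tolerance 2 and quorum 3.
func quorumFor(numLockers int) (tolerance, quorum int) {
    tolerance = numLockers / 2
    quorum = numLockers - tolerance
    return tolerance, quorum
}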