minio/cmd/metacache-manager.go
Harshavardhana b3c56b53fb
fix: metacache should only rename entries during cleanup (#11503)
To avoid large delays in metacache cleanup, use rename
instead of recursive delete calls; renames are cheaper.
Move the content to minioMetaTmpBucket and clean up
that folder once every 24h instead.

If a new cache can replace an existing one, let it do so,
since that entry is currently being saved anyway. This
avoids a pile-up of thousands of metacache entries for the
same listing calls that do not need to be stored on disk.
2021-02-11 10:22:03 -08:00
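The rename-then-sweep approach described above can be pictured with the short sketch below. It is a minimal illustration, not MinIO code: the helper names (moveToTrash, sweepTrash, trashDir) are made up for this example, while the actual change renames cache data into minioMetaTmpBucket and lets that bucket's periodic cleanup remove it.

package example // illustrative sketch, not part of package cmd

import (
	"os"
	"path/filepath"
	"time"
)

// moveToTrash renames dir into trashDir. A rename is a single metadata
// operation, so it returns quickly even for large directory trees.
func moveToTrash(dir, trashDir string) error {
	dst := filepath.Join(trashDir, filepath.Base(dir)+"-"+time.Now().Format("20060102T150405"))
	return os.Rename(dir, dst)
}

// sweepTrash removes trashed entries older than maxAge (e.g. 24h), paying the
// cost of the recursive delete in a background pass instead of on the hot path.
func sweepTrash(trashDir string, maxAge time.Duration) error {
	entries, err := os.ReadDir(trashDir)
	if err != nil {
		return err
	}
	for _, e := range entries {
		fi, err := e.Info()
		if err != nil {
			continue
		}
		if time.Since(fi.ModTime()) > maxAge {
			_ = os.RemoveAll(filepath.Join(trashDir, e.Name()))
		}
	}
	return nil
}

The point of the pattern is that the caller only pays for a metadata rename; the expensive recursive delete happens later in a background sweep.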


/*
 * MinIO Cloud Storage, (C) 2020 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"fmt"
	"runtime/debug"
	"sync"
	"time"

	"github.com/minio/minio/cmd/logger"
)

// localMetacacheMgr is the *local* manager for this peer.
// It should never be used directly since buckets are
// distributed deterministically.
// Therefore no cluster locks are required.
var localMetacacheMgr = &metacacheManager{
	buckets: make(map[string]*bucketMetacache),
	trash:   make(map[string]metacache),
}

type metacacheManager struct {
	mu      sync.RWMutex
	init    sync.Once
	buckets map[string]*bucketMetacache
	trash   map[string]metacache // Recently deleted lists.
}

const metacacheManagerTransientBucket = "**transient**"
const metacacheMaxEntries = 5000

// initManager will start async saving the cache.
func (m *metacacheManager) initManager() {
	// Add a transient bucket.
	tb := newBucketMetacache(metacacheManagerTransientBucket, false)
	tb.transient = true
	m.buckets[metacacheManagerTransientBucket] = tb

	// Start saver when object layer is ready.
	go func() {
		objAPI := newObjectLayerFn()
		for objAPI == nil {
			time.Sleep(time.Second)
			objAPI = newObjectLayerFn()
		}

		if !globalIsErasure {
			logger.Info("metacacheManager was initialized in non-erasure mode, skipping save")
			return
		}

		t := time.NewTicker(time.Minute)
		defer t.Stop()

		var exit bool
		bg := context.Background()
		for !exit {
			select {
			case <-t.C:
			case <-GlobalContext.Done():
				exit = true
			}
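			// On every pass, persist all known bucket metacaches; cleanup is
			// skipped on the final (shutdown) pass.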
			m.mu.RLock()
			for _, v := range m.buckets {
				if !exit {
					v.cleanup()
				}
				logger.LogIf(bg, v.save(bg))
			}
			m.mu.RUnlock()
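			// Drop trashed caches whose last update is older than
			// metacacheMaxRunningAge, deleting their backing data as well.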
			m.mu.Lock()
			for k, v := range m.trash {
				if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
					v.delete(context.Background())
					delete(m.trash, k)
				}
			}
			m.mu.Unlock()
		}
	}()
}

// findCache will get a metacache.
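// The lookup checks the per-bucket cache and recently trashed entries before
// falling back to loading the bucket metacache via getBucket.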
func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache {
	if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) {
		return m.getTransient().findCache(o)
	}
	m.mu.RLock()
	b, ok := m.buckets[o.Bucket]
	if ok {
		m.mu.RUnlock()
		return b.findCache(o)
	}
	if meta, ok := m.trash[o.ID]; ok {
		m.mu.RUnlock()
		return meta
	}
	m.mu.RUnlock()
	return m.getBucket(ctx, o.Bucket).findCache(o)
}

// updateCacheEntry will update non-transient state.
func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) {
	m.mu.RLock()
	if meta, ok := m.trash[update.id]; ok {
		m.mu.RUnlock()
		return meta, nil
	}

	b, ok := m.buckets[update.bucket]
	m.mu.RUnlock()
	if ok {
		return b.updateCacheEntry(update)
	}

	// At this point we expect to have found either a trashed entry or a bucket cache.
	return metacache{}, errVolumeNotFound
}

// getBucket will get a bucket metacache or load it from disk if needed.
func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache {
	m.init.Do(m.initManager)

	// Return a transient bucket for invalid or system buckets.
	if isReservedOrInvalidBucket(bucket, false) {
		return m.getTransient()
	}
	m.mu.RLock()
	b, ok := m.buckets[bucket]
	m.mu.RUnlock()
	if ok {
		if b.bucket != bucket {
			logger.Info("getBucket: cached bucket %s does not match this bucket %s", b.bucket, bucket)
			debug.PrintStack()
		}
		return b
	}

	m.mu.Lock()
	// See if someone else fetched it while we waited for the lock.
	b, ok = m.buckets[bucket]
	if ok {
		m.mu.Unlock()
		if b.bucket != bucket {
			logger.Info("getBucket: newly cached bucket %s does not match this bucket %s", b.bucket, bucket)
			debug.PrintStack()
		}
		return b
	}

	// Load bucket. If we fail return the transient bucket.
	b, err := loadBucketMetaCache(ctx, bucket)
	if err != nil {
		m.mu.Unlock()
		logger.LogIf(ctx, err)
		return m.getTransient()
	}
	if b.bucket != bucket {
		logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
	}
	m.buckets[bucket] = b
	m.mu.Unlock()
	return b
}

// deleteBucketCache will delete the bucket cache if it exists.
func (m *metacacheManager) deleteBucketCache(bucket string) {
	m.init.Do(m.initManager)
	m.mu.Lock()
	b, ok := m.buckets[bucket]
	if !ok {
		m.mu.Unlock()
		return
	}
	delete(m.buckets, bucket)
	m.mu.Unlock()

	// Since deletes may take some time we try to do it without
	// holding lock to m all the time.
	b.mu.Lock()
	defer b.mu.Unlock()
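	// Aged-out caches are deleted right away; the rest are marked as errored and
	// parked in trash so the background saver removes them later.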
	for k, v := range b.caches {
		if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
			v.delete(context.Background())
			continue
		}
		v.error = "Bucket deleted"
		v.status = scanStateError
		m.mu.Lock()
		m.trash[k] = v
		m.mu.Unlock()
	}
}

// deleteAll will delete all caches.
func (m *metacacheManager) deleteAll() {
	m.init.Do(m.initManager)
	m.mu.Lock()
	defer m.mu.Unlock()
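	// The transient bucket stays registered; only its cached listings are removed.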
	for bucket, b := range m.buckets {
		b.deleteAll()
		if !b.transient {
			delete(m.buckets, bucket)
		}
	}
}

// getTransient will return a transient bucket.
func (m *metacacheManager) getTransient() *bucketMetacache {
	m.init.Do(m.initManager)
	m.mu.RLock()
	bmc := m.buckets[metacacheManagerTransientBucket]
	m.mu.RUnlock()
	return bmc
}

// checkMetacacheState should be used if data is not updating.
// Should only be called if a failure occurred.
func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error {
	// We operate on a copy...
	o.Create = false

	var cache metacache
	if rpc == nil || o.Transient {
		cache = localMetacacheMgr.findCache(ctx, o)
	} else {
		c, err := rpc.GetMetacacheListing(ctx, o)
		if err != nil {
			return err
		}
		cache = *c
	}
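	// Interpret the cache state: a scan that never started, or one that reported
	// file-not-found, means there is nothing to wait for.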
	if cache.status == scanStateNone || cache.fileNotFound {
		return errFileNotFound
	}
	if cache.status == scanStateSuccess || cache.status == scanStateStarted {
		if time.Since(cache.lastUpdate) > metacacheMaxRunningAge {
			// We got a stale entry, mark error on handling server.
			err := fmt.Errorf("timeout: list %s not updated", cache.id)
			cache.error = err.Error()
			cache.status = scanStateError
			if rpc == nil || o.Transient {
				localMetacacheMgr.updateCacheEntry(cache)
			} else {
				rpc.UpdateMetacacheListing(ctx, cache)
			}
			return err
		}
		return nil
	}
	if cache.error != "" {
		return fmt.Errorf("async cache listing failed with: %s", cache.error)
	}
	return nil
}