diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go index 7ea9b6e67..26ff7b3b6 100644 --- a/cmd/metacache-bucket.go +++ b/cmd/metacache-bucket.go @@ -204,6 +204,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { // Check if exists already. if c, ok := b.caches[o.ID]; ok { + debugPrint("returning existing %v", o.ID) return c } @@ -279,6 +280,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { best = o.newMetacache() b.caches[o.ID] = best b.updated = true + debugPrint("returning new cache %s, bucket: %v", best.id, best.bucket) return best } @@ -442,17 +444,6 @@ func (b *bucketMetacache) deleteCache(id string) { } b.mu.Unlock() if ok { - ctx := context.Background() - objAPI := newObjectLayerFn() - if objAPI == nil { - logger.LogIf(ctx, errors.New("bucketMetacache: no object layer")) - return - } - ez, ok := objAPI.(*erasureServerSets) - if !ok { - logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureServerSets")) - return - } - ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(c.bucket, c.id)) + c.delete(context.Background()) } } diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go index cffc56cb5..a3061f128 100644 --- a/cmd/metacache-manager.go +++ b/cmd/metacache-manager.go @@ -32,12 +32,14 @@ import ( // Therefore no cluster locks are required. var localMetacacheMgr = &metacacheManager{ buckets: make(map[string]*bucketMetacache), + trash: make(map[string]metacache), } type metacacheManager struct { mu sync.RWMutex init sync.Once buckets map[string]*bucketMetacache + trash map[string]metacache // Recently deleted lists. } const metacacheManagerTransientBucket = "**transient**" @@ -78,11 +80,56 @@ func (m *metacacheManager) initManager() { logger.LogIf(bg, v.save(bg)) } m.mu.RUnlock() + m.mu.Lock() + for k, v := range m.trash { + if time.Since(v.lastUpdate) > metacacheMaxRunningAge { + v.delete(context.Background()) + delete(m.trash, k) + } + } + m.mu.Unlock() } m.getTransient().deleteAll() }() } +// findCache will get a metacache. +func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache { + if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) { + return m.getTransient().findCache(o) + } + m.mu.RLock() + b, ok := m.buckets[o.Bucket] + if ok { + m.mu.RUnlock() + return b.findCache(o) + } + if meta, ok := m.trash[o.ID]; ok { + m.mu.RUnlock() + return meta + } + m.mu.RUnlock() + return m.getBucket(ctx, o.Bucket).findCache(o) +} + +// updateCacheEntry will update non-transient state. +func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) { + m.mu.RLock() + if meta, ok := m.trash[update.id]; ok { + m.mu.RUnlock() + return meta, nil + } + + b, ok := m.buckets[update.bucket] + if ok { + m.mu.RUnlock() + return b.updateCacheEntry(update) + } + m.mu.RUnlock() + // We should have either a trashed bucket or this + return metacache{}, errVolumeNotFound +} + // getBucket will get a bucket metacache or load it from disk if needed. func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache { m.init.Do(m.initManager) @@ -118,6 +165,7 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket b, err := loadBucketMetaCache(ctx, bucket) if err != nil { m.mu.Unlock() + logger.LogIf(ctx, err) return m.getTransient() } if b.bucket != bucket { @@ -130,22 +178,43 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket // deleteBucketCache will delete the bucket cache if it exists. func (m *metacacheManager) deleteBucketCache(bucket string) { + m.init.Do(m.initManager) m.mu.Lock() - defer m.mu.Unlock() b, ok := m.buckets[bucket] if !ok { + m.mu.Unlock() return } - b.deleteAll() delete(m.buckets, bucket) + m.mu.Unlock() + + // Since deletes may take some time we try to do it without + // holding lock to m all the time. + b.mu.Lock() + defer b.mu.Unlock() + for k, v := range b.caches { + if time.Since(v.lastUpdate) > metacacheMaxRunningAge { + v.delete(context.Background()) + continue + } + v.error = "Bucket deleted" + v.status = scanStateError + m.mu.Lock() + m.trash[k] = v + m.mu.Unlock() + } } // deleteAll will delete all caches. func (m *metacacheManager) deleteAll() { + m.init.Do(m.initManager) m.mu.Lock() defer m.mu.Unlock() - for _, b := range m.buckets { + for bucket, b := range m.buckets { b.deleteAll() + if !b.transient { + delete(m.buckets, bucket) + } } } @@ -165,9 +234,9 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTC o.Create = false var cache metacache if !o.Transient { - if rpc == nil { + if rpc == nil || o.Transient { // Local - cache = localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(o) + cache = localMetacacheMgr.findCache(ctx, o) } else { c, err := rpc.GetMetacacheListing(ctx, o) if err != nil { @@ -175,8 +244,6 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTC } cache = *c } - } else { - cache = localMetacacheMgr.getTransient().findCache(o) } if cache.status == scanStateNone || cache.fileNotFound { diff --git a/cmd/metacache-server-sets.go b/cmd/metacache-server-sets.go index 194040e8d..468ba49b0 100644 --- a/cmd/metacache-server-sets.go +++ b/cmd/metacache-server-sets.go @@ -91,9 +91,13 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir)) var cache metacache rpc := globalNotificationSys.restClientFromHash(o.Bucket) - if rpc == nil { + if isReservedOrInvalidBucket(o.Bucket, false) { + rpc = nil + o.Transient = true + } + if rpc == nil || o.Transient { // Local - cache = localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(o) + cache = localMetacacheMgr.findCache(ctx, o) } else { c, err := rpc.GetMetacacheListing(ctx, o) if err != nil { @@ -103,8 +107,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en return entries, io.EOF } logger.LogIf(ctx, err) - cache = localMetacacheMgr.getTransient().findCache(o) o.Transient = true + cache = localMetacacheMgr.findCache(ctx, o) } else { cache = *c } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index d0142366b..20fa6b090 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -255,7 +255,7 @@ func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClien return localMetacacheMgr.getTransient().updateCacheEntry(m) } if rpc == nil { - return localMetacacheMgr.getBucket(GlobalContext, o.Bucket).updateCacheEntry(m) + return localMetacacheMgr.updateCacheEntry(m) } return rpc.UpdateMetacacheListing(context.Background(), m) } diff --git a/cmd/metacache.go b/cmd/metacache.go index 204e13e0b..d63d85012 100644 --- a/cmd/metacache.go +++ b/cmd/metacache.go @@ -17,9 +17,14 @@ package cmd import ( + "context" + "errors" + "fmt" "path" "strings" "time" + + "github.com/minio/minio/cmd/logger" ) type scanStatus uint8 @@ -132,3 +137,21 @@ func baseDirFromPrefix(prefix string) string { } return b } + +// delete all cache data on disks. +func (m *metacache) delete(ctx context.Context) { + if m.bucket == "" || m.id == "" { + logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id)) + } + objAPI := newObjectLayerFn() + if objAPI == nil { + logger.LogIf(ctx, errors.New("metacache.delete: no object layer")) + return + } + ez, ok := objAPI.(*erasureServerSets) + if !ok { + logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerSets")) + return + } + ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id)) +} diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 2fa872beb..5f756ef34 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -634,13 +634,7 @@ func (s *peerRESTServer) UpdateMetacacheListingHandler(w http.ResponseWriter, r s.writeErrorResponse(w, err) return } - b := localMetacacheMgr.getBucket(ctx, req.bucket) - if b == nil { - s.writeErrorResponse(w, errServerNotInitialized) - return - } - - cache, err := b.updateCacheEntry(req) + cache, err := localMetacacheMgr.updateCacheEntry(req) if err != nil { s.writeErrorResponse(w, err) return