feat: Implement listing version 3.0 (#12605)

Co-authored-by: Harshavardhana <harsha@minio.io>
Klaus Post, 2021-07-05 15:34:41 -07:00, committed by GitHub
parent bb92989359
commit 05aebc52c2
19 changed files with 636 additions and 990 deletions

View file

@@ -340,7 +340,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq
 		}
 	}
-	if serverDebugLog {
+	if serverDebugLog && len(healDisks) > 0 {
 		console.Debugf(color.Green("healDisk:")+" disk check timer fired, attempting to heal %d drives\n", len(healDisks))
 	}

View file

@@ -176,9 +176,6 @@ func (d *dataUpdateTracker) latestWithDir(dir string) uint64 {
 		return d.current()
 	}
 	if isReservedOrInvalidBucket(bucket, false) {
-		if d.debug {
-			console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, dir)
-		}
 		return d.current()
 	}
@@ -486,9 +483,6 @@ func (d *dataUpdateTracker) startCollector(ctx context.Context) {
 		}
 		if isReservedOrInvalidBucket(bucket, false) {
-			if d.debug {
-				console.Debugf(color.Green("dataUpdateTracker:")+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, in)
-			}
 			continue
 		}
 		split := splitPathDeterministic(in)
@@ -512,7 +506,6 @@ func (d *dataUpdateTracker) markDirty(bucket, prefix string) {
 	}
 	if isReservedOrInvalidBucket(bucket, false) && d.debug {
-		console.Debugf(dateUpdateTrackerLogPrefix+" isReservedOrInvalidBucket: %v, entry: %v\n", bucket, prefix)
 		return
 	}
 	split := splitPathDeterministic(pathJoin(bucket, prefix))

View file

@@ -26,7 +26,6 @@ import (
 	"net/http"
 	"sort"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
@@ -948,24 +947,16 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
 		AskDisks:    globalAPIConfig.getListQuorum(),
 	}
-	// Shortcut for APN/1.0 Veeam/1.0 Backup/10.0
-	// It requests unique blocks with a specific prefix.
-	// We skip scanning the parent directory for
-	// more objects matching the prefix.
-	ri := logger.GetReqInfo(ctx)
-	if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") {
-		opts.discardResult = true
-		opts.Transient = true
-	}
-	merged, err := z.listPath(ctx, opts)
+	merged, err := z.listPath(ctx, &opts)
 	if err != nil && err != io.EOF {
 		return loi, err
 	}
 	if versionMarker == "" {
+		o := listPathOptions{Marker: marker}
 		// If we are not looking for a specific version skip it.
-		marker, _ = parseMarker(marker)
-		merged.forwardPast(marker)
+		o.parseMarker()
+		merged.forwardPast(o.Marker)
 	}
 	objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker)
 	loi.IsTruncated = err == nil && len(objects) > 0
@@ -982,7 +973,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
 	}
 	if loi.IsTruncated {
 		last := objects[len(objects)-1]
-		loi.NextMarker = encodeMarker(last.Name, merged.listID)
+		loi.NextMarker = opts.encodeMarker(last.Name)
 		loi.NextVersionIDMarker = last.VersionID
 	}
 	return loi, nil
@@ -1000,8 +991,7 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
 func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
 	var loi ListObjectsInfo
-	merged, err := z.listPath(ctx, listPathOptions{
+	opts := listPathOptions{
 		Bucket:    bucket,
 		Prefix:    prefix,
 		Separator: delimiter,
@@ -1009,13 +999,14 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
 		Marker:      marker,
 		InclDeleted: false,
 		AskDisks:    globalAPIConfig.getListQuorum(),
-	})
+	}
+	merged, err := z.listPath(ctx, &opts)
 	if err != nil && err != io.EOF {
 		logger.LogIf(ctx, err)
 		return loi, err
 	}
-	marker, _ = parseMarker(marker)
-	merged.forwardPast(marker)
+	merged.forwardPast(opts.Marker)
 	// Default is recursive, if delimiter is set then list non recursive.
 	objects := merged.fileInfos(bucket, prefix, delimiter)
@@ -1033,7 +1024,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
 	}
 	if loi.IsTruncated {
 		last := objects[len(objects)-1]
-		loi.NextMarker = encodeMarker(last.Name, merged.listID)
+		loi.NextMarker = opts.encodeMarker(last.Name)
 	}
 	return loi, nil
 }

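The S3-level pagination contract is unchanged by the rewrite: a truncated page returns NextMarker and the client echoes it back on the next request. A minimal paging sketch against this API follows; the lister interface and result types are trimmed stand-ins for the internal ones, not the real signatures.

package example

import (
	"context"
	"fmt"
)

// Trimmed stand-ins (assumptions) for the internal listing types.
type ObjectInfo struct{ Name string }

type ListObjectsInfo struct {
	Objects     []ObjectInfo
	IsTruncated bool
	NextMarker  string
}

type lister interface {
	ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
}

// listAll pages through a listing. NextMarker is opaque to the caller, but
// after this change it encodes the listing id, pool and set, so resumed
// pages can stream from the persisted metacache instead of re-listing.
func listAll(ctx context.Context, objAPI lister, bucket, prefix string) error {
	marker := ""
	for {
		loi, err := objAPI.ListObjects(ctx, bucket, prefix, marker, "/", 1000)
		if err != nil {
			return err
		}
		for _, obj := range loi.Objects {
			fmt.Println(obj.Name)
		}
		if !loi.IsTruncated {
			return nil
		}
		marker = loi.NextMarker
	}
}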
View file

@@ -34,7 +34,6 @@ type apiConfig struct {
 	requestsPool     chan struct{}
 	clusterDeadline  time.Duration
 	listQuorum       int
-	extendListLife   time.Duration
 	corsAllowOrigins []string
 	// total drives per erasure set across pools.
 	totalDriveCount int
@@ -81,7 +80,6 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 	}
 	t.requestsDeadline = cfg.RequestsDeadline
 	t.listQuorum = cfg.GetListQuorum()
-	t.extendListLife = cfg.ExtendListLife
 	if globalReplicationPool != nil &&
 		cfg.ReplicationWorkers != t.replicationWorkers {
 		globalReplicationPool.ResizeFailedWorkers(cfg.ReplicationFailedWorkers)
@@ -98,13 +96,6 @@ func (t *apiConfig) getListQuorum() int {
 	return t.listQuorum
 }
-
-func (t *apiConfig) getExtendListLife() time.Duration {
-	t.mu.RLock()
-	defer t.mu.RUnlock()
-
-	return t.extendListLife
-}
 
 func (t *apiConfig) getCorsAllowOrigins() []string {
 	t.mu.RLock()
 	defer t.mu.RUnlock()

View file

@@ -23,10 +23,8 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
-	"path"
 	"runtime/debug"
 	"sort"
-	"strings"
 	"sync"
 	"time"
@@ -51,9 +49,8 @@ type bucketMetacache struct {
 	cachesRoot map[string][]string `msg:"-"`
 
 	// Internal state
 	mu        sync.RWMutex `msg:"-"`
 	updated   bool         `msg:"-"`
-	transient bool         `msg:"-"` // bucket used for non-persisted caches.
 }
 
 // newBucketMetacache creates a new bucketMetacache.
@@ -146,9 +143,6 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
 // save the bucket cache to the object storage.
 func (b *bucketMetacache) save(ctx context.Context) error {
-	if b.transient {
-		return nil
-	}
 	objAPI := newObjectLayerFn()
 	if objAPI == nil {
 		return errServerNotInitialized
@@ -195,76 +189,24 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
 		return metacache{}
 	}
-	if o.Bucket != b.bucket && !b.transient {
+	if o.Bucket != b.bucket {
 		logger.Info("bucketMetacache.findCache: bucket %s does not match this bucket %s", o.Bucket, b.bucket)
 		debug.PrintStack()
 		return metacache{}
 	}
-	extend := globalAPIConfig.getExtendListLife()
 	// Grab a write lock, since we create one if we cannot find one.
-	if o.Create {
-		b.mu.Lock()
-		defer b.mu.Unlock()
-	} else {
-		b.mu.RLock()
-		defer b.mu.RUnlock()
-	}
+	b.mu.Lock()
+	defer b.mu.Unlock()
 
 	// Check if exists already.
 	if c, ok := b.caches[o.ID]; ok {
+		c.lastHandout = time.Now()
+		b.caches[o.ID] = c
 		b.debugf("returning existing %v", o.ID)
 		return c
 	}
-
-	// No need to do expensive checks on transients.
-	if b.transient {
-		if !o.Create {
-			return metacache{
-				id:     o.ID,
-				bucket: o.Bucket,
-				status: scanStateNone,
-			}
-		}
-		// Create new
-		best := o.newMetacache()
-		b.caches[o.ID] = best
-		b.updated = true
-		b.debugf("returning new cache %s, bucket: %v", best.id, best.bucket)
-		return best
-	}
-
-	var best metacache
-	rootSplit := strings.Split(o.BaseDir, slashSeparator)
-	for i := range rootSplit {
-		interesting := b.cachesRoot[path.Join(rootSplit[:i+1]...)]
-		for _, id := range interesting {
-			cached, ok := b.caches[id]
-			if !ok {
-				continue
-			}
-			if !cached.matches(&o, extend) {
-				continue
-			}
-			if cached.started.Before(best.started) {
-				b.debugf("cache %s disregarded - we have a better", cached.id)
-				// If we already have a newer, keep that.
-				continue
-			}
-			best = cached
-		}
-	}
-	if !best.started.IsZero() {
-		if o.Create {
-			best.lastHandout = UTCNow()
-			b.caches[best.id] = best
-			b.updated = true
-		}
-		b.debugf("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended)
-		return best
-	}
 	if !o.Create {
 		return metacache{
 			id:     o.ID,
@@ -274,7 +216,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
 	}
 
 	// Create new and add.
-	best = o.newMetacache()
+	best := o.newMetacache()
 	b.caches[o.ID] = best
 	b.cachesRoot[best.root] = append(b.cachesRoot[best.root], best.id)
 	b.updated = true
@@ -286,19 +228,13 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
 func (b *bucketMetacache) cleanup() {
 	// Entries to remove.
 	remove := make(map[string]struct{})
-	currentCycle := intDataUpdateTracker.current()
 
 	// Test on a copy
 	// cleanup is the only one deleting caches.
-	caches, rootIdx := b.cloneCaches()
+	caches, _ := b.cloneCaches()
 
 	for id, cache := range caches {
-		if b.transient && time.Since(cache.lastUpdate) > 10*time.Minute && time.Since(cache.lastHandout) > 10*time.Minute {
-			// Keep transient caches only for 15 minutes.
-			remove[id] = struct{}{}
-			continue
-		}
-		if !cache.worthKeeping(currentCycle) {
+		if !cache.worthKeeping() {
 			b.debugf("cache %s not worth keeping", id)
 			remove[id] = struct{}{}
 			continue
@@ -308,44 +244,13 @@ func (b *bucketMetacache) cleanup() {
 			remove[id] = struct{}{}
 			continue
 		}
-		if cache.bucket != b.bucket && !b.transient {
+		if cache.bucket != b.bucket {
 			logger.Info("cache bucket mismatch %s != %s", b.bucket, cache.bucket)
 			remove[id] = struct{}{}
 			continue
 		}
 	}
 
-	// Check all non-deleted against eachother.
-	// O(n*n), but should still be rather quick.
-	for id, cache := range caches {
-		if b.transient {
-			break
-		}
-		if _, ok := remove[id]; ok {
-			continue
-		}
-		interesting := interestingCaches(cache.root, rootIdx)
-		for _, id2 := range interesting {
-			if _, ok := remove[id2]; ok || id2 == id {
-				// Don't check against one we are already removing
-				continue
-			}
-			cache2, ok := caches[id2]
-			if !ok {
-				continue
-			}
-			if cache.canBeReplacedBy(&cache2) {
-				b.debugf("cache %s can be replaced by %s", id, cache2.id)
-				remove[id] = struct{}{}
-				break
-			} else {
-				b.debugf("cache %s can be NOT replaced by %s", id, cache2.id)
-			}
-		}
-	}
-
 	// If above limit, remove the caches with the oldest handout time.
 	if len(caches)-len(remove) > metacacheMaxEntries {
 		remainCaches := make([]metacache, 0, len(caches)-len(remove))
@@ -374,18 +279,6 @@ func (b *bucketMetacache) cleanup() {
 	}
 }
 
-// Potentially interesting caches.
-// Will only add root if request is for root.
-func interestingCaches(root string, cachesRoot map[string][]string) []string {
-	var interesting []string
-	rootSplit := strings.Split(root, slashSeparator)
-	for i := range rootSplit {
-		want := path.Join(rootSplit[:i+1]...)
-		interesting = append(interesting, cachesRoot[want]...)
-	}
-	return interesting
-}
-
 // updateCacheEntry will update a cache.
 // Returns the updated status.
 func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error) {
@@ -434,25 +327,10 @@ func (b *bucketMetacache) deleteAll() {
 	defer b.mu.Unlock()
 
 	b.updated = true
-	if !b.transient {
-		// Delete all.
-		ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
-		b.caches = make(map[string]metacache, 10)
-		b.cachesRoot = make(map[string][]string, 10)
-		return
-	}
-
-	// Transient are in different buckets.
-	var wg sync.WaitGroup
-	for id := range b.caches {
-		wg.Add(1)
-		go func(cache metacache) {
-			defer wg.Done()
-			ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
-		}(b.caches[id])
-	}
-	wg.Wait()
+	// Delete all.
+	ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
 	b.caches = make(map[string]metacache, 10)
+	b.cachesRoot = make(map[string][]string, 10)
 }
 
 // deleteCache will delete a specific cache and all files related to it across the cluster.

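The net effect on findCache: since the marker now carries the listing ID, lookup is a plain map access keyed by ID, with no prefix walking, bloom-cycle matching, or transient special case left. A sketch of the resulting contract, using simplified stand-in types rather than the real structs:

package example

import (
	"sync"
	"time"
)

// cacheBucket and cacheEntry are simplified stand-ins (assumptions) for
// bucketMetacache and metacache, trimmed to the lookup behavior.
type cacheEntry struct {
	id          string
	lastHandout time.Time
}

type cacheBucket struct {
	mu     sync.Mutex
	caches map[string]cacheEntry
}

// find returns the cache for id, refreshing its handout time, and creates
// one only when create is set.
func (b *cacheBucket) find(id string, create bool) (cacheEntry, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if c, ok := b.caches[id]; ok {
		c.lastHandout = time.Now()
		b.caches[id] = c
		return c, true
	}
	if !create {
		return cacheEntry{}, false
	}
	c := cacheEntry{id: id, lastHandout: time.Now()}
	b.caches[id] = c
	return c, true
}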
View file

@@ -46,8 +46,6 @@ func Benchmark_bucketMetacache_findCache(b *testing.B) {
 			Recursive:    false,
 			Separator:    slashSeparator,
 			Create:       true,
-			CurrentCycle: uint64(i),
-			OldestCycle:  uint64(i - 1),
 		})
 	}
 	b.ReportAllocs()
@@ -65,8 +63,6 @@ func Benchmark_bucketMetacache_findCache(b *testing.B) {
 			Recursive:    false,
 			Separator:    slashSeparator,
 			Create:       true,
-			CurrentCycle: uint64(i % elements),
-			OldestCycle:  uint64(0),
 		})
 	}
 }

View file

@@ -19,9 +19,12 @@ package cmd
 
 import (
 	"bytes"
+	"context"
 	"os"
 	"sort"
 	"strings"
+
+	"github.com/minio/pkg/console"
 )
 
 // metaCacheEntry is an object or a directory within an unknown bucket.
@@ -38,7 +41,7 @@ type metaCacheEntry struct {
 
 // isDir returns if the entry is representing a prefix directory.
 func (e metaCacheEntry) isDir() bool {
-	return len(e.metadata) == 0
+	return len(e.metadata) == 0 && strings.HasSuffix(e.name, slashSeparator)
 }
 
 // isObject returns if the entry is representing an object.
@@ -51,15 +54,6 @@ func (e metaCacheEntry) hasPrefix(s string) bool {
 	return strings.HasPrefix(e.name, s)
 }
 
-// likelyMatches returns if the entries match by comparing name and metadata length.
-func (e *metaCacheEntry) likelyMatches(other *metaCacheEntry) bool {
-	// This should reject 99%
-	if len(e.metadata) != len(other.metadata) || e.name != other.name {
-		return false
-	}
-	return true
-}
-
 // matches returns if the entries match by comparing their latest version fileinfo.
 func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool {
 	if e == nil && other == nil {
@@ -510,6 +504,111 @@ func (m *metaCacheEntriesSorted) forwardPast(s string) {
 	m.o = m.o[idx:]
 }
 
+// mergeEntryChannels will merge entries from in and return them sorted on out.
+// To signify no more results are on an input channel, close it.
+// The output channel will be closed when all inputs are emptied.
+// If file names are equal, compareMeta is called to select which one to choose.
+// The entry not chosen will be discarded.
+// If the context is canceled the function will return the error,
+// otherwise the function will return nil.
+func mergeEntryChannels(ctx context.Context, in []chan metaCacheEntry, out chan<- metaCacheEntry, compareMeta func(existing, other *metaCacheEntry) (replace bool)) error {
+	defer close(out)
+	top := make([]*metaCacheEntry, len(in))
+	nDone := 0
+	ctxDone := ctx.Done()
+
+	// Use simpler forwarder.
+	if len(in) == 1 {
+		for {
+			select {
+			case <-ctxDone:
+				return ctx.Err()
+			case v, ok := <-in[0]:
+				if !ok {
+					return nil
+				}
+				select {
+				case <-ctxDone:
+					return ctx.Err()
+				case out <- v:
+				}
+			}
+		}
+	}
+
+	selectFrom := func(idx int) error {
+		select {
+		case <-ctxDone:
+			return ctx.Err()
+		case entry, ok := <-in[idx]:
+			if !ok {
+				top[idx] = nil
+				nDone++
+			} else {
+				top[idx] = &entry
+			}
+		}
+		return nil
+	}
+	// Populate all...
+	for i := range in {
+		if err := selectFrom(i); err != nil {
+			return err
+		}
+	}
+
+	last := ""
+	// Choose the best to return.
+	for {
+		if nDone == len(in) {
+			return nil
+		}
+		best := top[0]
+		bestIdx := 0
+		for i, other := range top[1:] {
+			otherIdx := i + 1
+			if other == nil {
+				continue
+			}
+			if best == nil {
+				best = other
+				bestIdx = otherIdx
+				continue
+			}
+			if best.name == other.name {
+				if compareMeta(best, other) {
+					// Replace "best"
+					if err := selectFrom(bestIdx); err != nil {
+						return err
+					}
+					best = other
+					bestIdx = otherIdx
+				} else {
+					// Keep best, replace "other"
+					if err := selectFrom(otherIdx); err != nil {
+						return err
+					}
+				}
+				continue
+			}
+			if best.name > other.name {
+				best = other
+				bestIdx = otherIdx
+			}
+		}
+		if best.name > last {
+			out <- *best
+			last = best.name
+		} else {
+			console.Debugln("mergeEntryChannels: discarding duplicate", best.name, "<=", last)
+		}
+		// Replace entry we just sent.
+		if err := selectFrom(bestIdx); err != nil {
+			return err
+		}
+	}
+}
 
 // merge will merge other into m.
 // If the same entries exists in both and metadata matches only one is added,
 // otherwise the entry from m will be placed first.
@@ -633,44 +732,3 @@ func (m *metaCacheEntriesSorted) entries() metaCacheEntries {
 	}
 	return m.o
 }
-
-// deduplicate entries in the list.
-// If compareMeta is set it will be used to resolve conflicts.
-// The function should return whether the existing entry should be replaced with other.
-// If no compareMeta is provided duplicates may be left.
-// This is indicated by the returned boolean.
-func (m *metaCacheEntriesSorted) deduplicate(compareMeta func(existing, other *metaCacheEntry) (replace bool)) (dupesLeft bool) {
-	dst := m.o[:0]
-	for j := range m.o {
-		found := false
-		obj := &m.o[j]
-		for i := len(dst) - 1; i >= 0; i++ {
-			existing := &dst[i]
-			if existing.name != obj.name {
-				break
-			}
-
-			// Use given resolution function first if any.
-			if compareMeta != nil {
-				if compareMeta(existing, obj) {
-					dst[i] = *obj
-				}
-				found = true
-				break
-			}
-			if obj.likelyMatches(existing) {
-				found = true
-				break
-			}
-
-			// Matches, move on.
-			dupesLeft = true
-			continue
-		}
-		if !found {
-			dst = append(dst, *obj)
-		}
-	}
-	m.o = dst
-	return dupesLeft
-}

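mergeEntryChannels replaces the old merge-then-deduplicate pass: each erasure set already streams its entries in sorted order, so a single k-way merge with a resolver callback yields the final order in one pass. A toy analog over string channels (the real function also calls compareMeta to decide which duplicate entry survives):

package example

import "context"

// mergeSorted k-way merges already-sorted string channels into out,
// dropping duplicates, mirroring the shape of mergeEntryChannels.
// Assumes non-empty names, as object keys are here.
func mergeSorted(ctx context.Context, in []chan string, out chan<- string) error {
	defer close(out)
	top := make([]*string, len(in))
	nDone := 0
	pull := func(i int) {
		if v, ok := <-in[i]; ok {
			top[i] = &v
		} else {
			top[i] = nil
			nDone++
		}
	}
	// Prime one entry per input.
	for i := range in {
		pull(i)
	}
	last := ""
	for nDone < len(in) {
		// Pick the smallest head.
		best := -1
		for i, v := range top {
			if v != nil && (best < 0 || *v < *top[best]) {
				best = i
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if v := *top[best]; v > last { // inputs are sorted, so equal means duplicate
			out <- v
			last = v
		}
		pull(best)
	}
	return nil
}

Closing all inputs ends the merge and closes the output for the reader, which is how the real listMerged signals completion downstream.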
View file

@@ -18,7 +18,6 @@
 package cmd
 
 import (
-	"bytes"
 	"reflect"
 	"sort"
 	"testing"
@@ -96,51 +95,6 @@ func Test_metaCacheEntries_merge(t *testing.T) {
 	}
 }
 
-func Test_metaCacheEntries_dedupe(t *testing.T) {
-	org := loadMetacacheSampleEntries(t)
-	a, b := org.shallowClone(), org.shallowClone()
-	// Merge b into a
-	a.merge(b, -1)
-	if a.deduplicate(nil) {
-		t.Fatal("deduplicate returned duplicate entries left")
-	}
-	want := loadMetacacheSampleNames
-	got := a.entries().names()
-	if !reflect.DeepEqual(want, got) {
-		t.Errorf("got unexpected result: %#v", got)
-	}
-}
-
-func Test_metaCacheEntries_dedupe2(t *testing.T) {
-	org := loadMetacacheSampleEntries(t)
-	a, b := org.shallowClone(), org.shallowClone()
-
-	// Replace metadata in b
-	testMarker := []byte("sampleset")
-	for i := range b.o {
-		b.o[i].metadata = testMarker
-	}
-
-	// Merge b into a
-	a.merge(b, -1)
-	if a.deduplicate(func(existing, other *metaCacheEntry) (replace bool) {
-		a := bytes.Equal(existing.metadata, testMarker)
-		b := bytes.Equal(other.metadata, testMarker)
-		if a == b {
-			t.Fatal("got same number of testmarkers, only one should be given", a, b)
-		}
-		return b
-	}) {
-		t.Fatal("deduplicate returned duplicate entries left, we should always resolve")
-	}
-
-	want := loadMetacacheSampleNames
-	got := a.entries().names()
-	if !reflect.DeepEqual(want, got) {
-		t.Errorf("got unexpected result: %#v", got)
-	}
-}
-
 func Test_metaCacheEntries_filterObjects(t *testing.T) {
 	data := loadMetacacheSampleEntries(t)
 	data.filterObjectsOnly()

View file

@@ -43,16 +43,11 @@ type metacacheManager struct {
 	trash map[string]metacache // Recently deleted lists.
 }
 
-const metacacheManagerTransientBucket = "**transient**"
 const metacacheMaxEntries = 5000
 
 // initManager will start async saving the cache.
 func (m *metacacheManager) initManager() {
-	// Add a transient bucket.
-	tb := newBucketMetacache(metacacheManagerTransientBucket, false)
-	tb.transient = true
-	m.buckets[metacacheManagerTransientBucket] = tb
-
 	// Start saver when object layer is ready.
 	go func() {
 		objAPI := newObjectLayerFn()
@@ -96,25 +91,6 @@ func (m *metacacheManager) initManager() {
 	}()
 }
 
-// findCache will get a metacache.
-func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache {
-	if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) {
-		return m.getTransient().findCache(o)
-	}
-	m.mu.RLock()
-	b, ok := m.buckets[o.Bucket]
-	if ok {
-		m.mu.RUnlock()
-		return b.findCache(o)
-	}
-	if meta, ok := m.trash[o.ID]; ok {
-		m.mu.RUnlock()
-		return meta
-	}
-	m.mu.RUnlock()
-	return m.getBucket(ctx, o.Bucket).findCache(o)
-}
-
 // updateCacheEntry will update non-transient state.
 func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) {
 	m.mu.RLock()
@@ -138,9 +114,6 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
 	m.init.Do(m.initManager)
 
 	// Return a transient bucket for invalid or system buckets.
-	if isReservedOrInvalidBucket(bucket, false) {
-		return m.getTransient()
-	}
 	m.mu.RLock()
 	b, ok := m.buckets[bucket]
 	m.mu.RUnlock()
@@ -167,9 +140,7 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
 	// Load bucket. If we fail return the transient bucket.
 	b, err := loadBucketMetaCache(ctx, bucket)
 	if err != nil {
-		m.mu.Unlock()
 		logger.LogIf(ctx, err)
-		return m.getTransient()
 	}
 	if b.bucket != bucket {
 		logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
@@ -215,36 +186,20 @@ func (m *metacacheManager) deleteAll() {
 	defer m.mu.Unlock()
 	for bucket, b := range m.buckets {
 		b.deleteAll()
-		if !b.transient {
-			delete(m.buckets, bucket)
-		}
+		delete(m.buckets, bucket)
 	}
 }
 
-// getTransient will return a transient bucket.
-func (m *metacacheManager) getTransient() *bucketMetacache {
-	m.init.Do(m.initManager)
-	m.mu.RLock()
-	bmc := m.buckets[metacacheManagerTransientBucket]
-	m.mu.RUnlock()
-	return bmc
-}
-
 // checkMetacacheState should be used if data is not updating.
 // Should only be called if a failure occurred.
 func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error {
 	// We operate on a copy...
 	o.Create = false
-	var cache metacache
-	if rpc == nil || o.Transient {
-		cache = localMetacacheMgr.findCache(ctx, o)
-	} else {
-		c, err := rpc.GetMetacacheListing(ctx, o)
-		if err != nil {
-			return err
-		}
-		cache = *c
+	c, err := rpc.GetMetacacheListing(ctx, o)
+	if err != nil {
+		return err
 	}
+	cache := *c
 
 	if cache.status == scanStateNone || cache.fileNotFound {
 		return errFileNotFound
@@ -255,11 +210,7 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTC
 		err := fmt.Errorf("timeout: list %s not updated", cache.id)
 		cache.error = err.Error()
 		cache.status = scanStateError
-		if rpc == nil || o.Transient {
-			localMetacacheMgr.updateCacheEntry(cache)
-		} else {
-			rpc.UpdateMetacacheListing(ctx, cache)
-		}
+		rpc.UpdateMetacacheListing(ctx, cache)
 		return err
 	}
 	return nil

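With the transient branch gone, a stalled listing is always diagnosed through the peer that owns it. A condensed sketch of the resulting checkMetacacheState logic, using stand-in types (the real code also pushes the error state back via UpdateMetacacheListing):

package example

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-ins (assumptions) for the internal scan states and cache record.
type scanState int

const (
	scanStateNone scanState = iota
	scanStateStarted
	scanStateError
)

type metacache struct {
	id           string
	status       scanState
	fileNotFound bool
	lastUpdate   time.Time
}

type peerClient interface {
	GetMetacacheListing(ctx context.Context) (*metacache, error)
}

var errFileNotFound = errors.New("file not found")

// checkState mirrors the simplified flow above: always ask the peer that
// owns the listing; there is no local/transient fallback left.
func checkState(ctx context.Context, rpc peerClient, maxRunningAge time.Duration) error {
	c, err := rpc.GetMetacacheListing(ctx)
	if err != nil {
		return err
	}
	if c.status == scanStateNone || c.fileNotFound {
		return errFileNotFound
	}
	if c.status == scanStateStarted && time.Since(c.lastUpdate) > maxRunningAge {
		return fmt.Errorf("timeout: list %s not updated", c.id)
	}
	return nil
}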
View file

@@ -20,6 +20,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"strings"
 
 	"github.com/minio/minio/internal/logger"
@@ -27,15 +28,16 @@ import (
 
 // markerTagVersion is the marker version.
 // Should not need to be updated unless a fundamental change is made to the marker format.
-const markerTagVersion = "v1"
+const markerTagVersion = "v2"
 
 // parseMarker will parse a marker possibly encoded with encodeMarker
-func parseMarker(s string) (marker, uuid string) {
+func (o *listPathOptions) parseMarker() {
+	s := o.Marker
 	if !strings.Contains(s, "[minio_cache:"+markerTagVersion) {
-		return s, ""
+		return
 	}
 	start := strings.LastIndex(s, "[")
-	marker = s[:start]
+	o.Marker = s[:start]
 	end := strings.LastIndex(s, "]")
 	tag := strings.Trim(s[start:end], "[]")
 	tags := strings.Split(tag, ",")
@@ -50,22 +52,41 @@ func parseMarker(s string) (marker, uuid string) {
 				break
 			}
 		case "id":
-			uuid = kv[1]
+			o.ID = kv[1]
+		case "return":
+			o.ID = mustGetUUID()
+			o.Create = true
+		case "p": // pool
+			v, err := strconv.ParseInt(kv[1], 10, 64)
+			if err != nil {
+				o.ID = mustGetUUID()
+				o.Create = true
+				continue
+			}
+			o.pool = int(v)
+		case "s": // set
+			v, err := strconv.ParseInt(kv[1], 10, 64)
+			if err != nil {
+				o.ID = mustGetUUID()
+				o.Create = true
+				continue
+			}
+			o.set = int(v)
 		default:
 			// Ignore unknown
 		}
 	}
-	return
 }
 
 // encodeMarker will encode a uuid and return it as a marker.
 // uuid cannot contain '[', ':' or ','.
-func encodeMarker(marker, uuid string) string {
-	if uuid == "" {
-		return marker
+func (o listPathOptions) encodeMarker(marker string) string {
+	if o.ID == "" {
+		// Mark as returning listing...
+		return fmt.Sprintf("%s[minio_cache:%s,return:]", marker, markerTagVersion)
 	}
-	if strings.ContainsAny(uuid, "[:,") {
-		logger.LogIf(context.Background(), fmt.Errorf("encodeMarker: uuid %s contained invalid characters", uuid))
+	if strings.ContainsAny(o.ID, "[:,") {
+		logger.LogIf(context.Background(), fmt.Errorf("encodeMarker: uuid %s contained invalid characters", o.ID))
 	}
-	return fmt.Sprintf("%s[minio_cache:%s,id:%s]", marker, markerTagVersion, uuid)
+	return fmt.Sprintf("%s[minio_cache:%s,id:%s,p:%d,s:%d]", marker, markerTagVersion, o.ID, o.pool, o.set)
 }

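Concretely, a v2 continuation marker appended to the last object name looks like photos/cat.jpg[minio_cache:v2,id:<uuid>,p:0,s:3] (values illustrative), while a marker emitted before a listing ID exists carries the return: tag, telling parseMarker to mint a fresh ID with Create set. A compact round-trip sketch with a stand-in options struct (an assumption, not the internal type):

package example

import (
	"fmt"
	"strconv"
	"strings"
)

type listOpts struct {
	Marker    string
	ID        string
	pool, set int
}

// encodeMarker appends the cache tag to the last returned name.
func (o listOpts) encodeMarker(name string) string {
	return fmt.Sprintf("%s[minio_cache:v2,id:%s,p:%d,s:%d]", name, o.ID, o.pool, o.set)
}

// parseMarker strips the tag and recovers id, pool and set.
func (o *listOpts) parseMarker() {
	s := o.Marker
	start := strings.LastIndex(s, "[")
	if start < 0 || !strings.Contains(s, "[minio_cache:v2") {
		return
	}
	o.Marker = s[:start]
	for _, kv := range strings.Split(strings.Trim(s[start:], "[]"), ",") {
		p := strings.SplitN(kv, ":", 2)
		if len(p) != 2 {
			continue
		}
		switch p[0] {
		case "id":
			o.ID = p[1]
		case "p":
			o.pool, _ = strconv.Atoi(p[1])
		case "s":
			o.set, _ = strconv.Atoi(p[1])
		}
	}
}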
View file

@@ -23,7 +23,6 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"path"
 	pathutil "path"
 	"strings"
 	"sync"
@@ -57,7 +56,7 @@ func renameAllBucketMetacache(epPath string) error {
 // Required important fields are Bucket, Prefix, Separator.
 // Other important fields are Limit, Marker.
 // List ID always derived from the Marker.
-func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
+func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
 	if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil {
 		return entries, err
 	}
@@ -95,140 +94,217 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
 	}
 
 	// Decode and get the optional list id from the marker.
-	o.Marker, o.ID = parseMarker(o.Marker)
-	o.Create = o.ID == ""
-	if o.ID == "" {
-		o.ID = mustGetUUID()
-	}
+	o.parseMarker()
 	o.BaseDir = baseDirFromPrefix(o.Prefix)
-	if o.discardResult {
-		// Override for single object.
-		o.BaseDir = o.Prefix
-	}
+	o.Transient = o.Transient || isReservedOrInvalidBucket(o.Bucket, false)
+	if o.Transient {
+		o.Create = false
+	}
 
-	// For very small recursive listings, don't same cache.
-	// Attempts to avoid expensive listings to run for a long
-	// while when clients aren't interested in results.
-	// If the client DOES resume the listing a full cache
-	// will be generated due to the marker without ID and this check failing.
-	if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
-		o.discardResult = true
-		o.Transient = true
-	}
-
-	var cache metacache
+	// We have 2 cases:
+	// 1) Cold listing, just list.
+	// 2) Returning, but with no id. Start async listing.
+	// 3) Returning, with ID, stream from list.
+	//
 	// If we don't have a list id we must ask the server if it has a cache or create a new.
-	if o.Create {
-		o.CurrentCycle = intDataUpdateTracker.current()
-		o.OldestCycle = globalNotificationSys.findEarliestCleanBloomFilter(ctx, path.Join(o.Bucket, o.BaseDir))
-		var cache metacache
+	if o.ID != "" && !o.Transient {
+		// Create or ping with handout...
 		rpc := globalNotificationSys.restClientFromHash(o.Bucket)
-		if isReservedOrInvalidBucket(o.Bucket, false) {
-			rpc = nil
-			o.Transient = true
-		}
-		// Apply prefix filter if enabled.
-		o.SetFilter()
-		if rpc == nil || o.Transient {
-			// Local
-			cache = localMetacacheMgr.findCache(ctx, o)
+		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+		defer cancel()
+		var c *metacache
+		if rpc == nil {
+			resp := localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(*o)
+			c = &resp
 		} else {
-			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
-			defer cancel()
-			c, err := rpc.GetMetacacheListing(ctx, o)
-			if err != nil {
-				if errors.Is(err, context.Canceled) {
-					// Context is canceled, return at once.
-					// request canceled, no entries to return
-					return entries, io.EOF
-				}
-				if !errors.Is(err, context.DeadlineExceeded) {
-					logger.LogIf(ctx, err)
-				}
-				o.Transient = true
-				cache = localMetacacheMgr.findCache(ctx, o)
-			} else {
-				cache = *c
+			c, err = rpc.GetMetacacheListing(ctx, *o)
+		}
+		if err != nil {
+			if errors.Is(err, context.Canceled) {
+				// Context is canceled, return at once.
+				// request canceled, no entries to return
+				return entries, io.EOF
+			}
+			if !errors.Is(err, context.DeadlineExceeded) {
+				// TODO: Remove, not really informational.
+				logger.LogIf(ctx, err)
+				o.debugln("listPath: deadline exceeded")
+			}
+			o.Transient = true
+			o.Create = false
+			o.ID = mustGetUUID()
+		} else {
+			if c.fileNotFound {
+				// No cache found, no entries found.
+				return entries, io.EOF
+			}
+			if c.status == scanStateError || c.status == scanStateNone {
+				o.ID = ""
+				o.Create = false
+				o.debugln("scan status", c.status, " - waiting a roundtrip to create")
+			} else {
+				// Continue listing
+				o.ID = c.id
 			}
 		}
-		if cache.fileNotFound {
-			// No cache found, no entries found.
-			return entries, io.EOF
-		}
-		// Only create if we created a new.
-		o.Create = o.ID == cache.id
-		o.ID = cache.id
 	}
+
+	if o.ID != "" && !o.Transient {
+		// We have an existing list ID, continue streaming.
+		if o.Create {
+			o.debugln("Creating", o)
+			entries, err = z.listAndSave(ctx, o)
+			if err == nil || err == io.EOF {
+				return entries, err
+			}
+			entries.truncate(0)
+		} else {
+			if o.pool < len(z.serverPools) && o.set < len(z.serverPools[o.pool].sets) {
+				o.debugln("Resuming", o)
+				entries, err = z.serverPools[o.pool].sets[o.set].streamMetadataParts(ctx, *o)
+				if err == nil {
+					return entries, nil
+				}
+			} else {
+				err = fmt.Errorf("invalid pool/set")
+				o.pool, o.set = 0, 0
+			}
+		}
+		if IsErr(err, []error{
+			nil,
+			context.Canceled,
+			context.DeadlineExceeded,
+			// io.EOF is expected and should be returned but no need to log it.
+			io.EOF,
+		}...) {
+			// Expected good errors we don't need to return error.
+			return entries, err
+		}
+		entries.truncate(0)
+		go func() {
+			rpc := globalNotificationSys.restClientFromHash(o.Bucket)
+			ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
+			defer cancel()
+			if c, err := rpc.GetMetacacheListing(ctx, *o); err == nil {
+				c.error = "no longer used"
+				c.status = scanStateError
+			}
+		}()
+		o.ID = ""
+	}
+
+	// Do listing in-place.
+	// Create output for our results.
+	// Create filter for results.
+	o.debugln("Raw List", o)
+	filterCh := make(chan metaCacheEntry, o.Limit)
+	filteredResults := o.gatherResults(filterCh)
+	listCtx, cancelList := context.WithCancel(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	var listErr error
+
+	go func(o listPathOptions) {
+		defer wg.Done()
+		o.Limit = 0
+		listErr = z.listMerged(listCtx, o, filterCh)
+		o.debugln("listMerged returned with", listErr)
+	}(*o)
+
+	entries, err = filteredResults()
+	cancelList()
+	wg.Wait()
+	if listErr != nil && !errors.Is(listErr, context.Canceled) {
+		return entries, listErr
+	}
+	truncated := entries.len() > o.Limit || err == nil
+	entries.truncate(o.Limit)
+	if !o.Transient && truncated {
+		if o.ID == "" {
+			entries.listID = mustGetUUID()
+		} else {
+			entries.listID = o.ID
+		}
+	}
+	if !truncated {
+		return entries, io.EOF
+	}
+	return entries, nil
+}
+
+// listMerged will list across all sets and return a merged results stream.
+// The result channel is closed when no more results are expected.
+func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) error {
 	var mu sync.Mutex
 	var wg sync.WaitGroup
 	var errs []error
 	allAtEOF := true
+	var inputs []chan metaCacheEntry
 	mu.Lock()
 	// Ask all sets and merge entries.
+	listCtx, cancelList := context.WithCancel(ctx)
+	defer cancelList()
 	for _, pool := range z.serverPools {
 		for _, set := range pool.sets {
 			wg.Add(1)
+			results := make(chan metaCacheEntry, 100)
+			inputs = append(inputs, results)
 			go func(i int, set *erasureObjects) {
 				defer wg.Done()
-				e, err := set.listPath(ctx, o)
+				err := set.listPath(listCtx, o, results)
 				mu.Lock()
 				defer mu.Unlock()
 				if err == nil {
 					allAtEOF = false
 				}
 				errs[i] = err
-				entries.merge(e, -1)
-
-				// Resolve non-trivial conflicts
-				entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) {
-					// Pick object over directory
-					if existing.isDir() && !other.isDir() {
-						return true
-					}
-					if !existing.isDir() && other.isDir() {
-						return false
-					}
-					eFIV, err := existing.fileInfo(o.Bucket)
-					if err != nil {
-						return true
-					}
-					oFIV, err := other.fileInfo(o.Bucket)
-					if err != nil {
-						return false
-					}
-					// Replace if modtime is newer
-					if !oFIV.ModTime.Equal(eFIV.ModTime) {
-						return oFIV.ModTime.After(eFIV.ModTime)
-					}
-					// Use NumVersions as a final tiebreaker.
-					return oFIV.NumVersions > eFIV.NumVersions
-				})
-				if entries.len() > o.Limit {
-					allAtEOF = false
-					entries.truncate(o.Limit)
-				}
 			}(len(errs), set)
 			errs = append(errs, nil)
 		}
 	}
 	mu.Unlock()
+
+	// Gather results to a single channel.
+	err := mergeEntryChannels(ctx, inputs, results, func(existing, other *metaCacheEntry) (replace bool) {
+		// Pick object over directory
+		if existing.isDir() && !other.isDir() {
+			return true
+		}
+		if !existing.isDir() && other.isDir() {
+			return false
+		}
+		eFIV, err := existing.fileInfo(o.Bucket)
+		if err != nil {
+			return true
+		}
+		oFIV, err := other.fileInfo(o.Bucket)
+		if err != nil {
+			return false
+		}
+		// Replace if modtime is newer
+		if !oFIV.ModTime.Equal(eFIV.ModTime) {
+			return oFIV.ModTime.After(eFIV.ModTime)
+		}
+		// Use NumVersions as a final tiebreaker.
+		return oFIV.NumVersions > eFIV.NumVersions
+	})
+	cancelList()
 	wg.Wait()
+	if err != nil {
+		return err
+	}
+	if contextCanceled(ctx) {
+		return ctx.Err()
+	}
 	if isAllNotFound(errs) {
-		// All sets returned not found.
-		go func() {
-			// Update master cache with that information.
-			cache.status = scanStateSuccess
-			cache.fileNotFound = true
-			o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
-		}()
-		// cache returned not found, entries truncated.
-		return entries, io.EOF
+		return nil
 	}
 	for _, err := range errs {
-		if err == nil {
+		if err == nil || contextCanceled(ctx) {
 			allAtEOF = false
 			continue
 		}
@@ -236,15 +312,66 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
 			continue
 		}
 		logger.LogIf(ctx, err)
-		return entries, err
+		return err
 	}
-	truncated := entries.len() > o.Limit || !allAtEOF
-	entries.truncate(o.Limit)
-	if !o.discardResult {
-		entries.listID = o.ID
+	if allAtEOF {
+		// TODO" Maybe, maybe not
+		return io.EOF
 	}
-	if !truncated {
-		return entries, io.EOF
-	}
-	return entries, nil
+	return nil
+}
+
+func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
+	// Use ID as the object name...
+	o.pool = z.getAvailablePoolIdx(ctx, minioMetaBucket, o.ID, 10<<20)
+	if o.pool < 0 {
+		// No space or similar, don't persist the listing.
+		o.pool = 0
+		o.Create = false
+		o.ID = ""
+		o.Transient = true
+		return entries, errDiskFull
+	}
+	o.set = z.serverPools[o.pool].getHashedSetIndex(o.ID)
+	saver := z.serverPools[o.pool].sets[o.set]
+
+	// Disconnect from call above, but cancel on exit.
+	listCtx, cancel := context.WithCancel(GlobalContext)
+	saveCh := make(chan metaCacheEntry, metacacheBlockSize)
+	inCh := make(chan metaCacheEntry, metacacheBlockSize)
+	outCh := make(chan metaCacheEntry, o.Limit)
+
+	filteredResults := o.gatherResults(outCh)
+
+	mc := o.newMetacache()
+	meta := metaCacheRPC{meta: &mc, cancel: cancel, rpc: globalNotificationSys.restClientFromHash(o.Bucket), o: *o}
+
+	// Save listing...
+	go func() {
+		if err := saver.saveMetaCacheStream(listCtx, &meta, saveCh); err != nil {
+			meta.setErr(err.Error())
+		}
+		cancel()
+	}()
+
+	// Do listing...
+	go func(o listPathOptions) {
+		err := z.listMerged(listCtx, o, inCh)
+		if err != nil {
+			meta.setErr(err.Error())
+		}
+		o.debugln("listAndSave: listing", o.ID, "finished with ", err)
+	}(*o)
+
+	// Write listing to results and saver.
+	go func() {
+		for entry := range inCh {
+			outCh <- entry
+			saveCh <- entry
+		}
+		close(outCh)
+		close(saveCh)
+	}()
+
+	return filteredResults()
}

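The heart of listAndSave is a tee: every merged entry goes both to the caller's filtered results and to the stream being persisted. A toy version over strings; note the design implication that a slow save path back-pressures result delivery, keeping the persisted cache consistent with what was served:

package example

// tee forwards each entry from in to both outputs, closing them when in is
// drained, the same fan-out pattern listAndSave uses to serve the first
// page while persisting the complete listing.
func tee(in <-chan string, results, save chan<- string) {
	for v := range in {
		results <- v
		save <- v
	}
	close(results)
	close(save)
}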
View file

@@ -25,6 +25,7 @@ package cmd
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"strconv"
 	"strings"
 	"sync"
@@ -82,13 +83,6 @@ type listPathOptions struct {
 	// Create indicates that the lister should not attempt to load an existing cache.
 	Create bool
 
-	// CurrentCycle indicates the current bloom cycle.
-	// Will be used if a new scan is started.
-	CurrentCycle uint64
-
-	// OldestCycle indicates the oldest cycle acceptable.
-	OldestCycle uint64
-
 	// Include pure directories.
 	IncludeDirectories bool
@@ -97,9 +91,8 @@ type listPathOptions struct {
 	// A transient result will never be returned from the cache so knowing the list id is required.
 	Transient bool
 
-	// discardResult will not persist the cache to storage.
-	// When the initial results are returned listing will be canceled.
-	discardResult bool
+	// pool and set of where the cache is located.
+	pool, set int
 }
 
 func init() {
@@ -109,20 +102,18 @@ func init() {
 // newMetacache constructs a new metacache from the options.
 func (o listPathOptions) newMetacache() metacache {
 	return metacache{
-		id:           o.ID,
-		bucket:       o.Bucket,
-		root:         o.BaseDir,
-		recursive:    o.Recursive,
-		status:       scanStateStarted,
-		error:        "",
-		started:      UTCNow(),
-		lastHandout:  UTCNow(),
-		lastUpdate:   UTCNow(),
-		ended:        time.Time{},
-		startedCycle: o.CurrentCycle,
-		endedCycle:   0,
-		dataVersion:  metacacheStreamVersion,
-		filter:       o.FilterPrefix,
+		id:          o.ID,
+		bucket:      o.Bucket,
+		root:        o.BaseDir,
+		recursive:   o.Recursive,
+		status:      scanStateStarted,
+		error:       "",
+		started:     UTCNow(),
+		lastHandout: UTCNow(),
+		lastUpdate:  UTCNow(),
+		ended:       time.Time{},
+		dataVersion: metacacheStreamVersion,
+		filter:      o.FilterPrefix,
 	}
 }
@@ -240,9 +231,6 @@ func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
 // updateMetacacheListing will update the metacache listing.
 func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
-	if o.Transient {
-		return localMetacacheMgr.getTransient().updateCacheEntry(m)
-	}
 	if rpc == nil {
 		return localMetacacheMgr.updateCacheEntry(m)
 	}
@@ -274,9 +262,6 @@ func (o *listPathOptions) SetFilter() {
 	switch {
 	case metacacheSharePrefix:
 		return
-	case o.CurrentCycle != o.OldestCycle:
-		// We have a clean bloom filter
-		return
 	case o.Prefix == o.BaseDir:
 		// No additional prefix
 		return
@@ -521,251 +506,207 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 }
 
 // Will return io.EOF if continuing would not yield more results.
-func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
+func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) {
+	defer close(results)
 	o.debugf(color.Green("listPath:")+" with options: %#v", o)
 
-	// See if we have the listing stored.
-	if !o.Create && !o.discardResult {
-		entries, err := er.streamMetadataParts(ctx, o)
-		if IsErr(err, []error{
-			nil,
-			context.Canceled,
-			context.DeadlineExceeded,
-		}...) {
-			// Expected good errors we don't need to return error.
-			return entries, nil
-		}
-		if !errors.Is(err, io.EOF) { // io.EOF is expected and should be returned but no need to log it.
-			// Log an return errors on unexpected errors.
-			logger.LogIf(ctx, err)
-		}
-		return entries, err
-	}
-
-	meta := o.newMetacache()
-	rpc := globalNotificationSys.restClientFromHash(o.Bucket)
-	var metaMu sync.Mutex
-
-	o.debugln(color.Green("listPath:")+" scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker)
-
-	// Disconnect from call above, but cancel on exit.
-	ctx, cancel := context.WithCancel(GlobalContext)
-	// We need to ask disks.
+	askDisks := o.AskDisks
+	listingQuorum := o.AskDisks - 1
 	disks := er.getOnlineDisks()
-
-	defer func() {
-		o.debugln(color.Green("listPath:")+" returning:", entries.len(), "err:", err)
-		if err != nil && !errors.Is(err, io.EOF) {
-			go func(err string) {
-				metaMu.Lock()
-				if meta.status != scanStateError {
-					meta.error = err
-					meta.status = scanStateError
-				}
-				meta, _ = o.updateMetacacheListing(meta, rpc)
-				metaMu.Unlock()
-			}(err.Error())
-			cancel()
-		}
-	}()
-
-	askDisks := o.AskDisks
-	if askDisks == 0 {
-		askDisks = globalAPIConfig.getListQuorum()
-	}
-	// make sure atleast default '3' lists object is present.
-	listingQuorum := askDisks
+	// Special case: ask all disks if the drive count is 4
 	if askDisks == -1 || er.setDriveCount == 4 {
 		askDisks = len(disks) // with 'strict' quorum list on all online disks.
 		listingQuorum = getReadQuorum(er.setDriveCount)
 	}
-
-	if len(disks) < askDisks {
-		err = InsufficientReadQuorum{}
-		logger.LogIf(ctx, fmt.Errorf("listPath: Insufficient disks, %d of %d needed are available", len(disks), askDisks))
-		cancel()
-		return
+	if askDisks == 0 {
+		askDisks = globalAPIConfig.getListQuorum()
+		listingQuorum = askDisks
 	}
-
-	// Select askDisks random disks.
-	if len(disks) > askDisks {
+	if askDisks > 0 && len(disks) > askDisks {
+		rand.Shuffle(len(disks), func(i, j int) {
+			disks[i], disks[j] = disks[j], disks[i]
+		})
 		disks = disks[:askDisks]
 	}
 
-	// Create output for our results.
-	var cacheCh chan metaCacheEntry
-	if !o.discardResult {
-		cacheCh = make(chan metaCacheEntry, metacacheBlockSize)
+	// How to resolve results.
+	resolver := metadataResolutionParams{
+		dirQuorum: listingQuorum,
+		objQuorum: listingQuorum,
+		bucket:    o.Bucket,
 	}
 
-	// Create filter for results.
-	filterCh := make(chan metaCacheEntry, 100)
-	filteredResults := o.gatherResults(filterCh)
-	closeChannels := func() {
-		if !o.discardResult {
-			close(cacheCh)
-		}
-		close(filterCh)
-	}
-
-	// Cancel listing on return if non-saved list.
-	if o.discardResult {
-		defer cancel()
-	}
-
-	go func() {
-		defer cancel()
-		// Save continuous updates
-		go func() {
-			var err error
-			ticker := time.NewTicker(10 * time.Second)
-			defer ticker.Stop()
-			var exit bool
-			for !exit {
-				select {
-				case <-ticker.C:
-				case <-ctx.Done():
-					exit = true
-				}
-				metaMu.Lock()
-				meta.endedCycle = intDataUpdateTracker.current()
-				meta, err = o.updateMetacacheListing(meta, rpc)
-				if meta.status == scanStateError {
-					logger.LogIf(ctx, err)
-					cancel()
-					exit = true
-				}
-				metaMu.Unlock()
-			}
-		}()
-
-		const retryDelay = 200 * time.Millisecond
-		const maxTries = 5
-
-		var bw *metacacheBlockWriter
-		// Don't save single object listings.
-		if !o.discardResult {
-			// Write results to disk.
-			bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
-				// if the block is 0 bytes and its a first block skip it.
-				// skip only this for Transient caches.
-				if len(b.data) == 0 && b.n == 0 && o.Transient {
-					return nil
-				}
-				o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
-				r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
-				logger.LogIf(ctx, err)
-				custom := b.headerKV()
-				_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
-					UserDefined:    custom,
-					NoLock:         true, // No need to hold namespace lock, each prefix caches uniquely.
-					ParentIsObject: nil,
-				})
-				if err != nil {
-					metaMu.Lock()
-					if meta.error != "" {
-						meta.status = scanStateError
-						meta.error = err.Error()
-					}
-					metaMu.Unlock()
-					cancel()
-					return err
-				}
-				if b.n == 0 {
-					return nil
-				}
-				// Update block 0 metadata.
-				var retries int
-				for {
-					meta := b.headerKV()
-					fi := FileInfo{
-						Metadata: make(map[string]string, len(meta)),
-					}
-					for k, v := range meta {
-						fi.Metadata[k] = v
-					}
-					err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi)
-					if err == nil {
-						break
-					}
-					switch err.(type) {
-					case ObjectNotFound:
-						return err
-					case InsufficientReadQuorum:
-					default:
-						logger.LogIf(ctx, err)
-					}
-					if retries >= maxTries {
-						return err
-					}
-					retries++
-					time.Sleep(retryDelay)
-				}
-				return nil
-			})
-		}
-
-		// How to resolve results.
-		resolver := metadataResolutionParams{
-			dirQuorum: listingQuorum,
-			objQuorum: listingQuorum,
-			bucket:    o.Bucket,
-		}
-
-		err := listPathRaw(ctx, listPathRawOptions{
-			disks:        disks,
-			bucket:       o.Bucket,
-			path:         o.BaseDir,
-			recursive:    o.Recursive,
-			filterPrefix: o.FilterPrefix,
-			minDisks:     listingQuorum,
-			agreed: func(entry metaCacheEntry) {
-				if !o.discardResult {
-					cacheCh <- entry
-				}
-				filterCh <- entry
-			},
-			partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
-				// Results Disagree :-(
-				entry, ok := entries.resolve(&resolver)
-				if ok {
-					if !o.discardResult {
-						cacheCh <- *entry
-					}
-					filterCh <- *entry
-				}
-			},
-		})
-
-		metaMu.Lock()
-		if err != nil {
-			meta.status = scanStateError
-			meta.error = err.Error()
-		}
-		// Save success
-		if meta.error == "" {
-			meta.status = scanStateSuccess
-			meta.endedCycle = intDataUpdateTracker.current()
-		}
-		meta, _ = o.updateMetacacheListing(meta, rpc)
-		metaMu.Unlock()
-
-		closeChannels()
-		if !o.discardResult {
-			if err := bw.Close(); err != nil {
-				metaMu.Lock()
-				meta.error = err.Error()
-				meta.status = scanStateError
-				meta, _ = o.updateMetacacheListing(meta, rpc)
-				metaMu.Unlock()
-			}
-		}
-	}()
-
-	return filteredResults()
+	ctxDone := ctx.Done()
+	return listPathRaw(ctx, listPathRawOptions{
+		disks:        disks,
+		bucket:       o.Bucket,
+		path:         o.BaseDir,
+		recursive:    o.Recursive,
+		filterPrefix: o.FilterPrefix,
+		minDisks:     listingQuorum,
+		forwardTo:    o.Marker,
+		agreed: func(entry metaCacheEntry) {
+			select {
+			case <-ctxDone:
+			case results <- entry:
+			}
+		},
+		partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
+			// Results Disagree :-(
+			entry, ok := entries.resolve(&resolver)
+			if ok {
+				select {
+				case <-ctxDone:
+				case results <- *entry:
+				}
+			}
+		},
+	})
+}
+
+type metaCacheRPC struct {
+	o      listPathOptions
+	mu     sync.Mutex
+	meta   *metacache
+	rpc    *peerRESTClient
+	cancel context.CancelFunc
+}
+
+func (m *metaCacheRPC) setErr(err string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	meta := *m.meta
+	if meta.status != scanStateError {
+		meta.error = err
+		meta.status = scanStateError
+	} else {
+		// An error is already set.
+		return
+	}
+	meta, _ = m.o.updateMetacacheListing(meta, m.rpc)
+	*m.meta = meta
+}
+
+func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCacheRPC, entries <-chan metaCacheEntry) (err error) {
+	o := mc.o
+	o.debugf(color.Green("saveMetaCacheStream:")+" with options: %#v", o)
+
+	metaMu := &mc.mu
+	rpc := mc.rpc
+	cancel := mc.cancel
+	defer func() {
+		o.debugln(color.Green("saveMetaCacheStream:")+"err:", err)
+		if err != nil && !errors.Is(err, io.EOF) {
+			go mc.setErr(err.Error())
+			cancel()
+		}
+	}()
+
+	defer cancel()
+	// Save continuous updates
+	go func() {
+		var err error
+		ticker := time.NewTicker(10 * time.Second)
+		defer ticker.Stop()
+		var exit bool
+		for !exit {
+			select {
+			case <-ticker.C:
+			case <-ctx.Done():
+				exit = true
+			}
+			metaMu.Lock()
+			meta := *mc.meta
+			meta, err = o.updateMetacacheListing(meta, rpc)
+			*mc.meta = meta
+			if meta.status == scanStateError {
+				logger.LogIf(ctx, err)
+				cancel()
+				exit = true
+			}
+			metaMu.Unlock()
+		}
+	}()
+
+	const retryDelay = 200 * time.Millisecond
+	const maxTries = 5
+
+	// Keep destination...
+	// Write results to disk.
+	bw := newMetacacheBlockWriter(entries, func(b *metacacheBlock) error {
+		// if the block is 0 bytes and its a first block skip it.
+		// skip only this for Transient caches.
+		if len(b.data) == 0 && b.n == 0 && o.Transient {
+			return nil
+		}
+		o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
+		r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
+		logger.LogIf(ctx, err)
+		custom := b.headerKV()
+		_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
+			UserDefined:    custom,
+			NoLock:         true, // No need to hold namespace lock, each prefix caches uniquely.
+			ParentIsObject: nil,
+		})
+		if err != nil {
+			mc.setErr(err.Error())
+			cancel()
+			return err
+		}
+		if b.n == 0 {
+			return nil
+		}
+		// Update block 0 metadata.
+		var retries int
+		for {
+			meta := b.headerKV()
+			fi := FileInfo{
+				Metadata: make(map[string]string, len(meta)),
+			}
+			for k, v := range meta {
+				fi.Metadata[k] = v
+			}
+			err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), fi)
+			if err == nil {
+				break
+			}
+			switch err.(type) {
+			case ObjectNotFound:
+				return err
+			case InsufficientReadQuorum:
+			default:
+				logger.LogIf(ctx, err)
+			}
+			if retries >= maxTries {
+				return err
+			}
+			retries++
+			time.Sleep(retryDelay)
+		}
+		return nil
+	})
+
+	metaMu.Lock()
+	if err != nil {
+		mc.setErr(err.Error())
+		return
+	}
+	// Save success
+	if mc.meta.error == "" {
+		mc.meta.status = scanStateSuccess
+	}
+	meta := *mc.meta
+	meta, _ = o.updateMetacacheListing(meta, rpc)
+	*mc.meta = meta
+	metaMu.Unlock()
+
+	if err := bw.Close(); err != nil {
+		mc.setErr(err.Error())
+	}
+	return
 }
 
 type listPathRawOptions struct {

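The per-set lister is now a thin wrapper around listPathRaw with two callbacks: agreed fires when every queried disk returned an identical entry, partial when disks disagree and the entry must first be resolved against the listing quorum. The shape of that contract, with trimmed stand-in types (assumptions):

package example

import "context"

// Stand-ins (assumptions) for the internal entry and option types.
type entry struct{ name string }

type rawOptions struct {
	agreed  func(e entry)
	partial func(candidates []entry, errs []error)
}

// callbacks builds the two handlers the way er.listPath wires them above:
// both paths funnel into one results channel, and a canceled context simply
// drops the entry instead of blocking.
func callbacks(ctx context.Context, results chan<- entry, resolve func([]entry) (entry, bool)) rawOptions {
	done := ctx.Done()
	send := func(e entry) {
		select {
		case <-done:
		case results <- e:
		}
	}
	return rawOptions{
		agreed: send,
		partial: func(candidates []entry, errs []error) {
			if e, ok := resolve(candidates); ok {
				send(e)
			}
		},
	}
}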
View file

@@ -52,7 +52,7 @@ import (
 // Streams can be assumed to be sorted in ascending order.
 // If the stream ends before a false boolean it can be assumed it was truncated.
-const metacacheStreamVersion = 1
+const metacacheStreamVersion = 2
 
 // metacacheWriter provides a serializer of metacache objects.
 type metacacheWriter struct {
@@ -262,7 +262,7 @@ func newMetacacheReader(r io.Reader) *metacacheReader {
 		return err
 	}
 	switch v {
-	case metacacheStreamVersion:
+	case 1, 2:
 	default:
 		return fmt.Errorf("metacacheReader: Unknown version: %d", v)
 	}

View file

@@ -107,9 +107,18 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	}
 
 	prefix := opts.FilterPrefix
-	forward := opts.ForwardTo
 	var scanDir func(path string) error
 
 	scanDir = func(current string) error {
+		// Skip forward, if requested...
+		forward := ""
+		if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
+			forward = strings.TrimPrefix(opts.ForwardTo, current)
+			if idx := strings.IndexByte(forward, '/'); idx > 0 {
+				forward = forward[:idx]
+			}
+		}
 		if contextCanceled(ctx) {
 			return ctx.Err()
 		}
@@ -187,6 +196,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 		sort.Strings(entries)
 		dirStack := make([]string, 0, 5)
 		prefix = "" // Remove prefix after first level.
+		if len(forward) > 0 {
+			idx := sort.SearchStrings(entries, forward)
+			if idx > 0 {
+				entries = entries[idx:]
+			}
+		}
 
 		for _, entry := range entries {
 			if entry == "" {
@@ -256,10 +271,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 					out <- metaCacheEntry{name: pop}
 					if opts.Recursive {
 						// Scan folder we found. Should be in correct sort order where we are.
-						forward = ""
-						if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pop) {
-							forward = strings.TrimPrefix(opts.ForwardTo, pop)
-						}
 						logger.LogIf(ctx, scanDir(pop))
 					}
 					dirStack = dirStack[:len(dirStack)-1]

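The ForwardTo handling now lives inside scanDir, so the skip prefix is recomputed at every directory level and applied with a binary search over the sorted entries instead of a linear scan. A self-contained sketch of that step:

package example

import (
	"sort"
	"strings"
)

// skipForward mirrors the WalkDir change above: given the sorted entries of
// directory current and an absolute resume point forwardTo, drop everything
// that sorts strictly before the resume component for this level.
// Assumes current ends with a slash, as directory keys do here.
func skipForward(current, forwardTo string, entries []string) []string {
	if forwardTo == "" || !strings.HasPrefix(forwardTo, current) {
		return entries
	}
	forward := strings.TrimPrefix(forwardTo, current)
	if idx := strings.IndexByte(forward, '/'); idx > 0 {
		forward = forward[:idx] // only the path component for this level
	}
	if idx := sort.SearchStrings(entries, forward); idx > 0 {
		entries = entries[idx:]
	}
	return entries
}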
View file

@@ -65,8 +65,6 @@ type metacache struct {
 	ended        time.Time `msg:"end"`
 	lastUpdate   time.Time `msg:"u"`
 	lastHandout  time.Time `msg:"lh"`
-	startedCycle uint64    `msg:"stc"`
-	endedCycle   uint64    `msg:"endc"`
 	dataVersion  uint8     `msg:"v"`
 }
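Listing v3 decouples metacache lifetime from the data scanner's bloom-filter cycles, so the two cycle counters leave the serialized struct here, and every place that encoded them is cleaned up below. A trimmed sketch of the shape that remains; only fields visible in this diff are shown, all other metacache fields are elided:

```go
package sketch

import "time"

// metacache (trimmed): the msg tags are msgp struct tags that become the
// MessagePack map keys seen in the generated code further down.
type metacache struct {
	ended       time.Time `msg:"end"`
	lastUpdate  time.Time `msg:"u"`
	lastHandout time.Time `msg:"lh"`
	dataVersion uint8     `msg:"v"`
}
```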
@@ -74,66 +72,8 @@ func (m *metacache) finished() bool {
 	return !m.ended.IsZero()
 }

-// matches returns whether the metacache matches the options given.
-func (m *metacache) matches(o *listPathOptions, extend time.Duration) bool {
-	if o == nil {
-		return false
-	}
-	// Never return transient caches if there is no id.
-	if m.status == scanStateError || m.status == scanStateNone || m.dataVersion != metacacheStreamVersion {
-		o.debugf("cache %s state or stream version mismatch", m.id)
-		return false
-	}
-	if m.startedCycle < o.OldestCycle {
-		o.debugf("cache %s cycle too old", m.id)
-		return false
-	}
-	// Root of what we are looking for must at least have the same
-	if !strings.HasPrefix(o.BaseDir, m.root) {
-		o.debugf("cache %s prefix mismatch, cached:%v, want:%v", m.id, m.root, o.BaseDir)
-		return false
-	}
-	if m.filter != "" && strings.HasPrefix(m.filter, o.FilterPrefix) {
-		o.debugf("cache %s cannot be used because of filter %s", m.id, m.filter)
-		return false
-	}
-	if o.Recursive && !m.recursive {
-		o.debugf("cache %s not recursive", m.id)
-		// If this is recursive the cached listing must be as well.
-		return false
-	}
-	if o.Separator != slashSeparator && !m.recursive {
-		o.debugf("cache %s not slashsep and not recursive", m.id)
-		// Non slash separator requires recursive.
-		return false
-	}
-	if !m.finished() && time.Since(m.lastUpdate) > metacacheMaxRunningAge {
-		o.debugf("cache %s not running, time: %v", m.id, time.Since(m.lastUpdate))
-		// Abandoned
-		return false
-	}
-	if m.finished() && m.endedCycle <= o.OldestCycle {
-		if extend <= 0 {
-			// If scan has ended the oldest requested must be less.
-			o.debugf("cache %s ended and cycle (%v) <= oldest allowed (%v)", m.id, m.endedCycle, o.OldestCycle)
-			return false
-		}
-		if time.Since(m.lastUpdate) > metacacheMaxRunningAge+extend {
-			// Cache ended within bloom cycle, but we can extend the life.
-			o.debugf("cache %s ended (%v) and beyond extended life (%v)", m.id, m.lastUpdate, metacacheMaxRunningAge+extend)
-			return false
-		}
-	}
-	return true
-}
-
 // worthKeeping indicates if the cache by itself is worth keeping.
-func (m *metacache) worthKeeping(currentCycle uint64) bool {
+func (m *metacache) worthKeeping() bool {
 	if m == nil {
 		return false
 	}

@@ -142,59 +82,16 @@ func (m *metacache) worthKeeping(currentCycle uint64) bool {
 	case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge:
 		// Not finished and update for metacacheMaxRunningAge, discard it.
 		return false
-	case cache.finished() && cache.startedCycle > currentCycle:
-		// Cycle is somehow bigger.
-		return false
-	case cache.finished() && time.Since(cache.lastHandout) > 48*time.Hour:
-		// Keep only for 2 days. Fallback if scanner is clogged.
-		return false
-	case cache.finished() && currentCycle >= dataUsageUpdateDirCycles && cache.startedCycle < currentCycle-dataUsageUpdateDirCycles:
-		// Cycle is too old to be valuable.
-		return false
+	case cache.finished() && time.Since(cache.lastHandout) > 30*time.Minute:
+		// Keep only for 30 minutes.
+		return false
 	case cache.status == scanStateError || cache.status == scanStateNone:
 		// Remove failed listings after 5 minutes.
-		return time.Since(cache.lastUpdate) < 5*time.Minute
+		return time.Since(cache.lastUpdate) > 5*time.Minute
 	}
 	return true
 }

-// canBeReplacedBy.
-// Both must pass the worthKeeping check.
-func (m *metacache) canBeReplacedBy(other *metacache) bool {
-	// If the other is older it can never replace.
-	if other.started.Before(m.started) || m.id == other.id {
-		return false
-	}
-	if other.status == scanStateNone || other.status == scanStateError {
-		return false
-	}
-	if m.status == scanStateStarted && time.Since(m.lastUpdate) < metacacheMaxRunningAge {
-		return false
-	}
-	// Keep it around a bit longer.
-	if time.Since(m.lastHandout) < 30*time.Minute || time.Since(m.lastUpdate) < metacacheMaxRunningAge {
-		return false
-	}
-	// Go through recursive combinations.
-	switch {
-	case !m.recursive && !other.recursive:
-		// If both not recursive root must match.
-		return m.root == other.root && strings.HasPrefix(m.filter, other.filter)
-	case m.recursive && !other.recursive:
-		// A recursive can never be replaced by a non-recursive
-		return false
-	case !m.recursive && other.recursive:
-		// If other is recursive it must contain this root
-		return strings.HasPrefix(m.root, other.root) && other.filter == ""
-	case m.recursive && other.recursive:
-		// Similar if both are recursive
-		return strings.HasPrefix(m.root, other.root) && other.filter == ""
-	}
-	panic("should be unreachable")
-}
-
 // baseDirFromPrefix will return the base directory given an object path.
 // For example an object with name prefix/folder/object.ext will return `prefix/folder/`.
 func baseDirFromPrefix(prefix string) string {
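With cycle tracking gone, cache matching and cache replacement disappear entirely: `matches` and `canBeReplacedBy` are deleted, and retention in `worthKeeping` becomes purely time based. A condensed sketch of the new rule, with thresholds and the return polarity of the failed-listing branch copied verbatim from the hunk above; field names are stand-ins for the metacache fields:

```go
package sketch

import "time"

// worthKeepingSketch: no scanner-cycle bookkeeping, just ages relative to
// the last client handout and the last update.
func worthKeepingSketch(finished, failed bool, lastHandout, lastUpdate time.Time, maxRunningAge time.Duration) bool {
	switch {
	case !finished && time.Since(lastUpdate) > maxRunningAge:
		return false // abandoned while still running
	case finished && time.Since(lastHandout) > 30*time.Minute:
		return false // nobody has collected results for half an hour
	case failed:
		return time.Since(lastUpdate) > 5*time.Minute // polarity as in the hunk above
	}
	return true
}
```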
@@ -218,13 +115,17 @@ func (m *metacache) update(update metacache) {
 	if m.status == scanStateStarted && update.status == scanStateSuccess {
 		m.ended = UTCNow()
-		m.endedCycle = update.endedCycle
 	}

 	if m.status == scanStateStarted && update.status != scanStateStarted {
 		m.status = update.status
 	}

+	if m.status == scanStateStarted && time.Since(m.lastHandout) > 15*time.Minute {
+		m.status = scanStateError
+		m.error = "client not seen"
+	}
+
 	if m.error == "" && update.error != "" {
 		m.error = update.error
 		m.status = scanStateError
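The update path now detects abandoned listings directly: a listing still marked as started whose client has not picked up results for 15 minutes is failed with "client not seen", rather than waiting for a scanner cycle to expire it. A minimal rendering of that rule, with state values modeled locally rather than taken from MinIO:

```go
package sketch

import "time"

type scanState uint8

const (
	scanStateStarted scanState = iota
	scanStateError
)

// abandonIfUnread: while a listing is still running, a stale lastHandout
// means no client is collecting results, so mark it failed.
func abandonIfUnread(status scanState, lastHandout time.Time) (scanState, string) {
	if status == scanStateStarted && time.Since(lastHandout) > 15*time.Minute {
		return scanStateError, "client not seen"
	}
	return status, ""
}
```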
@@ -100,18 +100,6 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "lastHandout")
 				return
 			}
-		case "stc":
-			z.startedCycle, err = dc.ReadUint64()
-			if err != nil {
-				err = msgp.WrapError(err, "startedCycle")
-				return
-			}
-		case "endc":
-			z.endedCycle, err = dc.ReadUint64()
-			if err != nil {
-				err = msgp.WrapError(err, "endedCycle")
-				return
-			}
 		case "v":
 			z.dataVersion, err = dc.ReadUint8()
 			if err != nil {

@@ -131,9 +119,9 @@ func (z *metacache) DecodeMsg(dc *msgp.Reader) (err error) {
 // EncodeMsg implements msgp.Encodable
 func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 15
+	// map header, size 13
 	// write "id"
-	err = en.Append(0x8f, 0xa2, 0x69, 0x64)
+	err = en.Append(0x8d, 0xa2, 0x69, 0x64)
 	if err != nil {
 		return
 	}

@@ -252,26 +240,6 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "lastHandout")
 		return
 	}
-	// write "stc"
-	err = en.Append(0xa3, 0x73, 0x74, 0x63)
-	if err != nil {
-		return
-	}
-	err = en.WriteUint64(z.startedCycle)
-	if err != nil {
-		err = msgp.WrapError(err, "startedCycle")
-		return
-	}
-	// write "endc"
-	err = en.Append(0xa4, 0x65, 0x6e, 0x64, 0x63)
-	if err != nil {
-		return
-	}
-	err = en.WriteUint64(z.endedCycle)
-	if err != nil {
-		err = msgp.WrapError(err, "endedCycle")
-		return
-	}
 	// write "v"
 	err = en.Append(0xa1, 0x76)
 	if err != nil {

@@ -288,9 +256,9 @@ func (z *metacache) EncodeMsg(en *msgp.Writer) (err error) {
 // MarshalMsg implements msgp.Marshaler
 func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 15
+	// map header, size 13
 	// string "id"
-	o = append(o, 0x8f, 0xa2, 0x69, 0x64)
+	o = append(o, 0x8d, 0xa2, 0x69, 0x64)
 	o = msgp.AppendString(o, z.id)
 	// string "b"
 	o = append(o, 0xa1, 0x62)

@@ -325,12 +293,6 @@ func (z *metacache) MarshalMsg(b []byte) (o []byte, err error) {
 	// string "lh"
 	o = append(o, 0xa2, 0x6c, 0x68)
 	o = msgp.AppendTime(o, z.lastHandout)
-	// string "stc"
-	o = append(o, 0xa3, 0x73, 0x74, 0x63)
-	o = msgp.AppendUint64(o, z.startedCycle)
-	// string "endc"
-	o = append(o, 0xa4, 0x65, 0x6e, 0x64, 0x63)
-	o = msgp.AppendUint64(o, z.endedCycle)
 	// string "v"
 	o = append(o, 0xa1, 0x76)
 	o = msgp.AppendUint8(o, z.dataVersion)

@@ -431,18 +393,6 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				err = msgp.WrapError(err, "lastHandout")
 				return
 			}
-		case "stc":
-			z.startedCycle, bts, err = msgp.ReadUint64Bytes(bts)
-			if err != nil {
-				err = msgp.WrapError(err, "startedCycle")
-				return
-			}
-		case "endc":
-			z.endedCycle, bts, err = msgp.ReadUint64Bytes(bts)
-			if err != nil {
-				err = msgp.WrapError(err, "endedCycle")
-				return
-			}
 		case "v":
 			z.dataVersion, bts, err = msgp.ReadUint8Bytes(bts)
 			if err != nil {

@@ -463,7 +413,7 @@ func (z *metacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *metacache) Msgsize() (s int) {
-	s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.filter) + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 4 + msgp.Uint64Size + 5 + msgp.Uint64Size + 2 + msgp.Uint8Size
+	s = 1 + 3 + msgp.StringPrefixSize + len(z.id) + 2 + msgp.StringPrefixSize + len(z.bucket) + 5 + msgp.StringPrefixSize + len(z.root) + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.filter) + 5 + msgp.Uint8Size + 4 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.error) + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 2 + msgp.TimeSize + 3 + msgp.TimeSize + 2 + msgp.Uint8Size
 	return
 }
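The opaque byte edits above are mechanical consequences of removing two fields from msgp-generated code: `0x8f` and `0x8d` are MessagePack fixmap headers (`0x80 | N` for an N-entry map, so 15 fields become 13), and `0xa3 0x73 0x74 0x63` / `0xa4 0x65 0x6e 0x64 0x63` were the fixstr-encoded keys "stc" and "endc". A small sketch of those two encoding rules from the MessagePack spec, not the msgp library API:

```go
package main

import "fmt"

// fixmapHeader encodes a MessagePack map header for small maps: 0x80 | N
// covers 0-15 entries, which is why 15 fields -> 0x8f and 13 -> 0x8d.
func fixmapHeader(n int) (byte, error) {
	if n < 0 || n > 15 {
		return 0, fmt.Errorf("fixmap holds 0-15 entries, got %d", n)
	}
	return 0x80 | byte(n), nil
}

// fixstr encodes a short string as 0xa0|len followed by its bytes, which
// is why the key "stc" serialized as 0xa3 0x73 0x74 0x63.
func fixstr(s string) ([]byte, error) {
	if len(s) > 31 {
		return nil, fmt.Errorf("fixstr holds up to 31 bytes, got %d", len(s))
	}
	return append([]byte{0xa0 | byte(len(s))}, s...), nil
}

func main() {
	h, _ := fixmapHeader(13)
	fmt.Printf("%#x\n", h) // 0x8d
	b, _ := fixstr("stc")
	fmt.Printf("% x\n", b) // a3 73 74 63
}
```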
@@ -37,8 +37,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp.Add(time.Minute),
 		lastUpdate:   metaCacheTestsetTimestamp.Add(time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 	1: {

@@ -53,8 +51,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp.Add(time.Minute),
 		lastUpdate:   metaCacheTestsetTimestamp.Add(time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 	2: {

@@ -69,8 +65,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp,
 		lastUpdate:   metaCacheTestsetTimestamp,
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 	3: {

@@ -85,8 +79,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp.Add(-20 * time.Minute),
 		lastUpdate:   metaCacheTestsetTimestamp.Add(-20 * time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp.Add(-20 * time.Minute),
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 	4: {

@@ -101,8 +93,6 @@ var metaCacheTestset = []metacache{
 		ended:        time.Time{},
 		lastUpdate:   metaCacheTestsetTimestamp.Add(-time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 	5: {

@@ -117,8 +107,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp.Add(time.Minute),
 		lastUpdate:   metaCacheTestsetTimestamp.Add(time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 	6: {

@@ -133,8 +121,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp.Add(-8 * time.Minute),
 		lastUpdate:   metaCacheTestsetTimestamp.Add(-8 * time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 6,
-		endedCycle:   8,
 		dataVersion:  metacacheStreamVersion,
 	},
 	7: {

@@ -149,8 +135,6 @@ var metaCacheTestset = []metacache{
 		ended:        time.Time{},
 		lastUpdate:   metaCacheTestsetTimestamp.Add(-1 * time.Minute),
 		lastHandout:  metaCacheTestsetTimestamp,
-		startedCycle: 10,
-		endedCycle:   0,
 		dataVersion:  metacacheStreamVersion,
 	},
 	8: {

@@ -165,8 +149,6 @@ var metaCacheTestset = []metacache{
 		ended:        metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
 		lastUpdate:   metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
 		lastHandout:  metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
-		startedCycle: 10,
-		endedCycle:   10,
 		dataVersion:  metacacheStreamVersion,
 	},
 }

@@ -222,45 +204,6 @@ func Test_baseDirFromPrefix(t *testing.T) {
 	}
 }

-func Test_metacache_canBeReplacedBy(t *testing.T) {
-	testAgainst := metacache{
-		id:           "case-1-modified",
-		bucket:       "bucket",
-		root:         "folder/prefix",
-		recursive:    true,
-		status:       scanStateSuccess,
-		fileNotFound: false,
-		error:        "",
-		started:      metaCacheTestsetTimestamp.Add(time.Minute),
-		ended:        metaCacheTestsetTimestamp.Add(2 * time.Minute),
-		lastUpdate:   metaCacheTestsetTimestamp.Add(2 * time.Minute),
-		lastHandout:  metaCacheTestsetTimestamp.Add(time.Minute),
-		startedCycle: 10,
-		endedCycle:   10,
-		dataVersion:  metacacheStreamVersion,
-	}
-	wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: true, 5: false, 6: true, 7: false, 8: false}
-	for i, tt := range metaCacheTestset {
-		t.Run(tt.id, func(t *testing.T) {
-			var want bool
-			if i >= len(wantResults) {
-				t.Logf("no expected result for test #%d", i)
-			} else {
-				want = wantResults[i]
-			}
-			// Add an hour, otherwise it will never be replaced.
-			// We operated on a copy.
-			tt.lastHandout = tt.lastHandout.Add(-2 * time.Hour)
-			tt.lastUpdate = tt.lastHandout.Add(-2 * time.Hour)
-			got := tt.canBeReplacedBy(&testAgainst)
-			if got != want {
-				t.Errorf("#%d: want %v, got %v", i, want, got)
-			}
-		})
-	}
-}
-
 func Test_metacache_finished(t *testing.T) {
 	wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: true}

@@ -282,7 +225,8 @@ func Test_metacache_finished(t *testing.T) {
 }

 func Test_metacache_worthKeeping(t *testing.T) {
-	wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: false, 7: false, 8: false}
+	// TODO: Update...
+	wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: false}

 	for i, tt := range metaCacheTestset {
 		t.Run(tt.id, func(t *testing.T) {

@@ -293,7 +237,7 @@ func Test_metacache_worthKeeping(t *testing.T) {
 				want = wantResults[i]
 			}
-			got := tt.worthKeeping(7 + dataUsageUpdateDirCycles)
-			if got != want {
+			got := tt.worthKeeping()
 			if got != want {
 				t.Errorf("#%d: want %v, got %v", i, want, got)
 			}
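The `wantResults` tables pair each fixture in `metaCacheTestset` with an expected verdict using Go's indexed composite-literal syntax, which keeps every expectation visibly keyed to its fixture index even when values repeat. A standalone illustration of that idiom:

```go
package main

import "fmt"

func main() {
	// Indexed elements may appear in any order, gaps default to the zero
	// value, and the length is one past the highest index.
	want := []bool{0: true, 2: true, 5: true}
	fmt.Println(len(want), want) // 6 [true false true false false true]
}
```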
@@ -498,55 +498,6 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6
 	return bf, nil
 }

-// findEarliestCleanBloomFilter will find the earliest bloom filter across the cluster
-// where the directory is clean.
-// Due to how objects are stored this can include object names.
-func (sys *NotificationSys) findEarliestCleanBloomFilter(ctx context.Context, dir string) uint64 {
-	// Load initial state from local...
-	current := intDataUpdateTracker.current()
-	best := intDataUpdateTracker.latestWithDir(dir)
-	if best == current {
-		// If the current is dirty no need to check others.
-		return current
-	}
-
-	var req = bloomFilterRequest{
-		Current:     0,
-		Oldest:      best,
-		OldestClean: dir,
-	}
-
-	var mu sync.Mutex
-	g := errgroup.WithNErrs(len(sys.peerClients))
-	for idx, client := range sys.peerClients {
-		if client == nil {
-			continue
-		}
-		client := client
-		g.Go(func() error {
-			serverBF, err := client.cycleServerBloomFilter(ctx, req)
-			// Keep lock while checking result.
-			mu.Lock()
-			defer mu.Unlock()
-			if err != nil {
-				// Error, don't assume clean.
-				best = current
-				logger.LogIf(ctx, err)
-				return nil
-			}
-			if serverBF.OldestIdx > best {
-				best = serverBF.OldestIdx
-			}
-			return nil
-		}, idx)
-	}
-	g.Wait()
-	return best
-}
-
 var errPeerNotReachable = errors.New("peer is not reachable")

 // GetLocks - makes GetLocks RPC call on all peers.
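With per-cycle cache validity gone, the cluster-wide "earliest clean bloom filter" query has no callers left, so the whole peer fan-out is deleted. For reference, the concurrency shape it used (query every peer in parallel and fold answers into one value under a mutex) is a common pattern; a generic sketch using golang.org/x/sync/errgroup as a stand-in for MinIO's internal errgroup:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// bestAcrossPeers mirrors the removed helper's shape: fan out one call per
// peer, fold each answer into a shared "best" value under a mutex.
func bestAcrossPeers(ctx context.Context, peers []func(context.Context) (uint64, error), best uint64) uint64 {
	var mu sync.Mutex
	g, ctx := errgroup.WithContext(ctx)
	for _, peer := range peers {
		peer := peer // capture loop variable (pre-Go 1.22 idiom)
		g.Go(func() error {
			v, err := peer(ctx)
			mu.Lock()
			defer mu.Unlock()
			if err == nil && v > best {
				best = v
			}
			return nil // errors are folded into the result, never propagated
		})
	}
	g.Wait() // every goroutine returns nil, so the error is ignored
	return best
}

func main() {
	peers := []func(context.Context) (uint64, error){
		func(context.Context) (uint64, error) { return 3, nil },
		func(context.Context) (uint64, error) { return 7, nil },
	}
	fmt.Println(bestAcrossPeers(context.Background(), peers, 1)) // 7
}
```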
@@ -36,7 +36,6 @@ const (
 	apiCorsAllowOrigin          = "cors_allow_origin"
 	apiRemoteTransportDeadline  = "remote_transport_deadline"
 	apiListQuorum               = "list_quorum"
-	apiExtendListCacheLife      = "extend_list_cache_life"
 	apiReplicationWorkers       = "replication_workers"
 	apiReplicationFailedWorkers = "replication_failed_workers"

@@ -46,7 +45,6 @@ const (
 	EnvAPICorsAllowOrigin          = "MINIO_API_CORS_ALLOW_ORIGIN"
 	EnvAPIRemoteTransportDeadline  = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
 	EnvAPIListQuorum               = "MINIO_API_LIST_QUORUM"
-	EnvAPIExtendListCacheLife      = "MINIO_API_EXTEND_LIST_CACHE_LIFE"
 	EnvAPISecureCiphers            = "MINIO_API_SECURE_CIPHERS"
 	EnvAPIReplicationWorkers       = "MINIO_API_REPLICATION_WORKERS"
 	EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS"

@@ -85,10 +83,6 @@ var (
 			Key:   apiListQuorum,
 			Value: "optimal",
 		},
-		config.KV{
-			Key:   apiExtendListCacheLife,
-			Value: "0s",
-		},
 		config.KV{
 			Key:   apiReplicationWorkers,
 			Value: "250",

@@ -108,7 +102,6 @@ type Config struct {
 	CorsAllowOrigin          []string      `json:"cors_allow_origin"`
 	RemoteTransportDeadline  time.Duration `json:"remote_transport_deadline"`
 	ListQuorum               string        `json:"list_quorum"`
-	ExtendListLife           time.Duration `json:"extend_list_cache_life"`
 	ReplicationWorkers       int           `json:"replication_workers"`
 	ReplicationFailedWorkers int           `json:"replication_failed_workers"`
 }

@@ -144,6 +137,7 @@ func (sCfg Config) GetListQuorum() int {
 func LookupConfig(kvs config.KVS) (cfg Config, err error) {
 	// remove this since we have removed this already.
 	kvs.Delete(apiReadyDeadline)
+	kvs.Delete("extend_list_cache_life")

 	if err = config.CheckValidKeys(config.APISubSys, kvs, DefaultKVS); err != nil {
 		return cfg, err

@@ -183,11 +177,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
 		return cfg, errors.New("invalid value for list strict quorum")
 	}

-	listLife, err := time.ParseDuration(env.Get(EnvAPIExtendListCacheLife, kvs.Get(apiExtendListCacheLife)))
-	if err != nil {
-		return cfg, err
-	}
-
 	replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.Get(apiReplicationWorkers)))
 	if err != nil {
 		return cfg, err

@@ -213,7 +202,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
 		CorsAllowOrigin:          corsAllowOrigin,
 		RemoteTransportDeadline:  remoteTransportDeadline,
 		ListQuorum:               listQuorum,
-		ExtendListLife:           listLife,
 		ReplicationWorkers:       replicationWorkers,
 		ReplicationFailedWorkers: replicationFailedWorkers,
 	}, nil
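Deleting the retired `extend_list_cache_life` key from the stored KV set before `CheckValidKeys` runs keeps existing deployments from failing validation on a setting that no longer exists. A generic sketch of this migrate-on-load idiom; the map-based types here are illustrative, not MinIO's `config.KVS`:

```go
package main

import "fmt"

var retiredKeys = []string{"extend_list_cache_life"}

// lookupConfig drops retired keys first, then rejects anything still unknown.
func lookupConfig(kvs map[string]string, valid map[string]bool) error {
	for _, k := range retiredKeys {
		delete(kvs, k) // silently migrate old persisted configs
	}
	for k := range kvs {
		if !valid[k] {
			return fmt.Errorf("unknown config key %q", k)
		}
	}
	return nil
}

func main() {
	kvs := map[string]string{"list_quorum": "optimal", "extend_list_cache_life": "0s"}
	valid := map[string]bool{"list_quorum": true}
	fmt.Println(lookupConfig(kvs, valid)) // <nil>
}
```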