Stop async listing earlier (#13160)

Stop async listing if we have not heard back from the client for 3 minutes.

This will stop spending resources on async listings when they are unlikely to get used. 
If the client returns a new listing will be started on the second request.

Stop saving cache metadata to disk. It is cleared on restarts anyway. Removes all 
load/save functionality
This commit is contained in:
Klaus Post 2021-09-08 11:06:45 -07:00 committed by GitHub
parent 951b1e6a7a
commit 3c2efd9cf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 48 additions and 467 deletions

View File

@ -18,25 +18,17 @@
package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"runtime/debug"
"sort"
"sync"
"time"
"github.com/klauspost/compress/s2"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
"github.com/tinylib/msgp/msgp"
)
//go:generate msgp -file $GOFILE -unexported
// a bucketMetacache keeps track of all caches generated
// for a bucket.
type bucketMetacache struct {
@ -78,108 +70,6 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) {
}
}
// loadBucketMetaCache will load the cache from the object layer.
// If the cache cannot be found a new one is created.
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
objAPI := newObjectLayerFn()
for objAPI == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
time.Sleep(250 * time.Millisecond)
}
objAPI = newObjectLayerFn()
if objAPI == nil {
logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
}
}
var meta bucketMetacache
var decErr error
// Use global context for this.
r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
if err == nil {
dec := s2DecPool.Get().(*s2.Reader)
dec.Reset(r)
decErr = meta.DecodeMsg(msgp.NewReader(dec))
dec.Reset(nil)
r.Close()
s2DecPool.Put(dec)
}
if err != nil {
switch err.(type) {
case ObjectNotFound:
err = nil
case InsufficientReadQuorum:
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
default:
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket, false), err
}
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket, true), nil
}
meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
// Index roots
for id, cache := range meta.caches {
meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
}
return &meta, nil
}
// save the bucket cache to the object storage.
func (b *bucketMetacache) save(ctx context.Context) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
// Keep lock while we marshal.
// We need a write lock since we update 'updated'
b.mu.Lock()
if !b.updated {
b.mu.Unlock()
return nil
}
// Save as s2 compressed msgpack
tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
enc := s2.NewWriter(tmp)
err := msgp.Encode(enc, b)
if err != nil {
b.mu.Unlock()
return err
}
err = enc.Close()
if err != nil {
b.mu.Unlock()
return err
}
b.updated = false
b.mu.Unlock()
hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()))
if err != nil {
return err
}
_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{})
logger.LogIf(ctx, err)
return err
}
// findCache will attempt to find a matching cache for the provided options.
// If a cache with the same ID exists already it will be returned.
// If none can be found a new is created with the provided ID.
@ -267,7 +157,7 @@ func (b *bucketMetacache) cleanup() {
})
// Keep first metacacheMaxEntries...
for _, cache := range remainCaches[metacacheMaxEntries:] {
if time.Since(cache.lastHandout) > 30*time.Minute {
if time.Since(cache.lastHandout) > metacacheMaxClientWait {
remove[cache.id] = struct{}{}
}
}

View File

@ -1,209 +0,0 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *bucketMetacache) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "bucket":
z.bucket, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "bucket")
return
}
case "caches":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
if z.caches == nil {
z.caches = make(map[string]metacache, zb0002)
} else if len(z.caches) > 0 {
for key := range z.caches {
delete(z.caches, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 metacache
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
err = za0002.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
z.caches[za0001] = za0002
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *bucketMetacache) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "bucket"
err = en.Append(0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
if err != nil {
return
}
err = en.WriteString(z.bucket)
if err != nil {
err = msgp.WrapError(err, "bucket")
return
}
// write "caches"
err = en.Append(0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.caches)))
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
for za0001, za0002 := range z.caches {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
err = za0002.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *bucketMetacache) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "bucket"
o = append(o, 0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
o = msgp.AppendString(o, z.bucket)
// string "caches"
o = append(o, 0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
o = msgp.AppendMapHeader(o, uint32(len(z.caches)))
for za0001, za0002 := range z.caches {
o = msgp.AppendString(o, za0001)
o, err = za0002.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *bucketMetacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "bucket":
z.bucket, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "bucket")
return
}
case "caches":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
if z.caches == nil {
z.caches = make(map[string]metacache, zb0002)
} else if len(z.caches) > 0 {
for key := range z.caches {
delete(z.caches, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 metacache
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
bts, err = za0002.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
z.caches[za0001] = za0002
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *bucketMetacache) Msgsize() (s int) {
s = 1 + 7 + msgp.StringPrefixSize + len(z.bucket) + 7 + msgp.MapHeaderSize
if z.caches != nil {
for za0001, za0002 := range z.caches {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize()
}
}
return
}

View File

@ -1,123 +0,0 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalbucketMetacache(t *testing.T) {
v := bucketMetacache{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgbucketMetacache(b *testing.B) {
v := bucketMetacache{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgbucketMetacache(b *testing.B) {
v := bucketMetacache{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalbucketMetacache(b *testing.B) {
v := bucketMetacache{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodebucketMetacache(t *testing.T) {
v := bucketMetacache{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodebucketMetacache Msgsize() is inaccurate")
}
vn := bucketMetacache{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodebucketMetacache(b *testing.B) {
v := bucketMetacache{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodebucketMetacache(b *testing.B) {
v := bucketMetacache{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -64,7 +64,6 @@ func (m *metacacheManager) initManager() {
defer t.Stop()
var exit bool
bg := context.Background()
for !exit {
select {
case <-t.C:
@ -76,7 +75,6 @@ func (m *metacacheManager) initManager() {
if !exit {
v.cleanup()
}
logger.LogIf(bg, v.save(bg))
}
m.mu.RUnlock()
m.mu.Lock()
@ -116,8 +114,8 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
// Return a transient bucket for invalid or system buckets.
m.mu.RLock()
b, ok := m.buckets[bucket]
m.mu.RUnlock()
if ok {
m.mu.RUnlock()
if b.bucket != bucket {
logger.Info("getBucket: cached bucket %s does not match this bucket %s", b.bucket, bucket)
debug.PrintStack()
@ -125,11 +123,12 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
return b
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
// See if someone else fetched it while we waited for the lock.
b, ok = m.buckets[bucket]
if ok {
m.mu.Unlock()
if b.bucket != bucket {
logger.Info("getBucket: newly cached bucket %s does not match this bucket %s", b.bucket, bucket)
debug.PrintStack()
@ -137,16 +136,9 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
return b
}
// Load bucket. If we fail return the transient bucket.
b, err := loadBucketMetaCache(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
}
if b.bucket != bucket {
logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
}
// New bucket. If we fail return the transient bucket.
b = newBucketMetacache(bucket, true)
m.buckets[bucket] = b
m.mu.Unlock()
return b
}

View File

@ -127,9 +127,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
return entries, io.EOF
}
if !errors.Is(err, context.DeadlineExceeded) {
// TODO: Remove, not really informational.
logger.LogIf(ctx, err)
o.debugln("listPath: deadline exceeded")
o.debugln("listPath: got error", err)
}
o.Transient = true
o.Create = false
@ -146,6 +144,22 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
} else {
// Continue listing
o.ID = c.id
go func(meta metacache) {
// Continuously update while we wait.
t := time.NewTicker(metacacheMaxClientWait / 10)
defer t.Stop()
select {
case <-ctx.Done():
// Request is done, stop updating.
return
case <-t.C:
meta.lastHandout = time.Now()
if rpc == nil {
meta, _ = localMetacacheMgr.updateCacheEntry(meta)
}
meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
}
}(*c)
}
}
}

View File

@ -643,11 +643,20 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
metaMu.Lock()
meta := *mc.meta
meta, err = o.updateMetacacheListing(meta, rpc)
*mc.meta = meta
if meta.status == scanStateError {
logger.LogIf(ctx, err)
if err == nil && time.Since(meta.lastHandout) > metacacheMaxClientWait {
cancel()
exit = true
meta.status = scanStateError
meta.error = fmt.Sprintf("listing canceled since time since last handout was %v ago", time.Since(meta.lastHandout).Round(time.Second))
o.debugln(color.Green("saveMetaCacheStream: ") + meta.error)
meta, err = o.updateMetacacheListing(meta, rpc)
}
if err == nil {
*mc.meta = meta
if meta.status == scanStateError {
cancel()
exit = true
}
}
metaMu.Unlock()
}
@ -664,7 +673,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
if len(b.data) == 0 && b.n == 0 && o.Transient {
return nil
}
o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
logger.LogIf(ctx, err)
custom := b.headerKV()

View File

@ -39,6 +39,9 @@ const (
// Time in which the initiator of a scan must have reported back.
metacacheMaxRunningAge = time.Minute
// Max time between client calls before dropping an async cache listing.
metacacheMaxClientWait = 3 * time.Minute
// metacacheBlockSize is the number of file/directory entries to have in each block.
metacacheBlockSize = 5000
@ -82,8 +85,9 @@ func (m *metacache) worthKeeping() bool {
case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge:
// Not finished and update for metacacheMaxRunningAge, discard it.
return false
case cache.finished() && time.Since(cache.lastHandout) > 30*time.Minute:
// Keep only for 30 minutes.
case cache.finished() && time.Since(cache.lastHandout) > 5*metacacheMaxClientWait:
// Keep for 15 minutes after we last saw the client.
// Since the cache is finished keeping it a bit longer doesn't hurt us.
return false
case cache.status == scanStateError || cache.status == scanStateNone:
// Remove failed listings after 5 minutes.
@ -113,6 +117,9 @@ func baseDirFromPrefix(prefix string) string {
func (m *metacache) update(update metacache) {
m.lastUpdate = UTCNow()
if m.lastHandout.After(m.lastHandout) {
m.lastHandout = UTCNow()
}
if m.status == scanStateStarted && update.status == scanStateSuccess {
m.ended = UTCNow()
}
@ -121,7 +128,8 @@ func (m *metacache) update(update metacache) {
m.status = update.status
}
if m.status == scanStateStarted && time.Since(m.lastHandout) > 15*time.Minute {
if m.status == scanStateStarted && time.Since(m.lastHandout) > metacacheMaxClientWait {
// Drop if client hasn't been seen for 3 minutes.
m.status = scanStateError
m.error = "client not seen"
}

View File

@ -226,7 +226,7 @@ func Test_metacache_finished(t *testing.T) {
func Test_metacache_worthKeeping(t *testing.T) {
// TODO: Update...
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: false}
wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: true, 7: false, 8: false}
for i, tt := range metaCacheTestset {
t.Run(tt.id, func(t *testing.T) {