Compare commits
7 Commits
master
...
RELEASE.20
Author | SHA1 | Date |
---|---|---|
Anis Elleuch | d4ae7d5e62 | |
Harshavardhana | a3c36595a0 | |
Klaus Post | 87e4be5a58 | |
Klaus Post | 6539d782cd | |
Klaus Post | b622a8ec0c | |
Klaus Post | 824ebc0c86 | |
Anis Elleuch | 76496d4644 |
|
@ -57,6 +57,10 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
|
|||
b.closeWithErr(err)
|
||||
return n, err
|
||||
}
|
||||
if n != len(p) {
|
||||
err = io.ErrShortWrite
|
||||
b.closeWithErr(err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
|
|
|
@ -705,7 +705,7 @@ func saveUnformattedFormat(ctx context.Context, storageDisks []StorageAPI, forma
|
|||
if format == nil {
|
||||
continue
|
||||
}
|
||||
if storageDisks[index] != nil && storageDisks[index].IsOnline() {
|
||||
if storageDisks[index] != nil {
|
||||
if err := saveFormatErasure(storageDisks[index], format, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -18,25 +18,17 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/klauspost/compress/s2"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/console"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
||||
//go:generate msgp -file $GOFILE -unexported
|
||||
|
||||
// a bucketMetacache keeps track of all caches generated
|
||||
// for a bucket.
|
||||
type bucketMetacache struct {
|
||||
|
@ -78,108 +70,6 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// loadBucketMetaCache will load the cache from the object layer.
|
||||
// If the cache cannot be found a new one is created.
|
||||
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
|
||||
objAPI := newObjectLayerFn()
|
||||
for objAPI == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
objAPI = newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
|
||||
}
|
||||
}
|
||||
|
||||
var meta bucketMetacache
|
||||
var decErr error
|
||||
// Use global context for this.
|
||||
r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
|
||||
if err == nil {
|
||||
dec := s2DecPool.Get().(*s2.Reader)
|
||||
dec.Reset(r)
|
||||
decErr = meta.DecodeMsg(msgp.NewReader(dec))
|
||||
dec.Reset(nil)
|
||||
r.Close()
|
||||
s2DecPool.Put(dec)
|
||||
}
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
err = nil
|
||||
case InsufficientReadQuorum:
|
||||
// Cache is likely lost. Clean up and return new.
|
||||
return newBucketMetacache(bucket, true), nil
|
||||
default:
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return newBucketMetacache(bucket, false), err
|
||||
}
|
||||
if decErr != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return newBucketMetacache(bucket, false), err
|
||||
}
|
||||
// Log the error, but assume the data is lost and return a fresh bucket.
|
||||
// Otherwise a broken cache will never recover.
|
||||
logger.LogIf(ctx, decErr)
|
||||
return newBucketMetacache(bucket, true), nil
|
||||
}
|
||||
// Sanity check...
|
||||
if meta.bucket != bucket {
|
||||
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
|
||||
return newBucketMetacache(bucket, true), nil
|
||||
}
|
||||
meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
|
||||
// Index roots
|
||||
for id, cache := range meta.caches {
|
||||
meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
|
||||
}
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
// save the bucket cache to the object storage.
|
||||
func (b *bucketMetacache) save(ctx context.Context) error {
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
// Keep lock while we marshal.
|
||||
// We need a write lock since we update 'updated'
|
||||
b.mu.Lock()
|
||||
if !b.updated {
|
||||
b.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
// Save as s2 compressed msgpack
|
||||
tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
|
||||
enc := s2.NewWriter(tmp)
|
||||
err := msgp.Encode(enc, b)
|
||||
if err != nil {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
err = enc.Close()
|
||||
if err != nil {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
b.updated = false
|
||||
b.mu.Unlock()
|
||||
|
||||
hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{})
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// findCache will attempt to find a matching cache for the provided options.
|
||||
// If a cache with the same ID exists already it will be returned.
|
||||
// If none can be found a new is created with the provided ID.
|
||||
|
@ -267,7 +157,7 @@ func (b *bucketMetacache) cleanup() {
|
|||
})
|
||||
// Keep first metacacheMaxEntries...
|
||||
for _, cache := range remainCaches[metacacheMaxEntries:] {
|
||||
if time.Since(cache.lastHandout) > 30*time.Minute {
|
||||
if time.Since(cache.lastHandout) > metacacheMaxClientWait {
|
||||
remove[cache.id] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,209 +0,0 @@
|
|||
package cmd
|
||||
|
||||
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
|
||||
|
||||
import (
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
||||
// DecodeMsg implements msgp.Decodable
|
||||
func (z *bucketMetacache) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
var field []byte
|
||||
_ = field
|
||||
var zb0001 uint32
|
||||
zb0001, err = dc.ReadMapHeader()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
for zb0001 > 0 {
|
||||
zb0001--
|
||||
field, err = dc.ReadMapKeyPtr()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
switch msgp.UnsafeString(field) {
|
||||
case "bucket":
|
||||
z.bucket, err = dc.ReadString()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "bucket")
|
||||
return
|
||||
}
|
||||
case "caches":
|
||||
var zb0002 uint32
|
||||
zb0002, err = dc.ReadMapHeader()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches")
|
||||
return
|
||||
}
|
||||
if z.caches == nil {
|
||||
z.caches = make(map[string]metacache, zb0002)
|
||||
} else if len(z.caches) > 0 {
|
||||
for key := range z.caches {
|
||||
delete(z.caches, key)
|
||||
}
|
||||
}
|
||||
for zb0002 > 0 {
|
||||
zb0002--
|
||||
var za0001 string
|
||||
var za0002 metacache
|
||||
za0001, err = dc.ReadString()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches")
|
||||
return
|
||||
}
|
||||
err = za0002.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches", za0001)
|
||||
return
|
||||
}
|
||||
z.caches[za0001] = za0002
|
||||
}
|
||||
default:
|
||||
err = dc.Skip()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *bucketMetacache) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// map header, size 2
|
||||
// write "bucket"
|
||||
err = en.Append(0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteString(z.bucket)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "bucket")
|
||||
return
|
||||
}
|
||||
// write "caches"
|
||||
err = en.Append(0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteMapHeader(uint32(len(z.caches)))
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches")
|
||||
return
|
||||
}
|
||||
for za0001, za0002 := range z.caches {
|
||||
err = en.WriteString(za0001)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches")
|
||||
return
|
||||
}
|
||||
err = za0002.EncodeMsg(en)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches", za0001)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *bucketMetacache) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 2
|
||||
// string "bucket"
|
||||
o = append(o, 0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
|
||||
o = msgp.AppendString(o, z.bucket)
|
||||
// string "caches"
|
||||
o = append(o, 0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
|
||||
o = msgp.AppendMapHeader(o, uint32(len(z.caches)))
|
||||
for za0001, za0002 := range z.caches {
|
||||
o = msgp.AppendString(o, za0001)
|
||||
o, err = za0002.MarshalMsg(o)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches", za0001)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// UnmarshalMsg implements msgp.Unmarshaler
|
||||
func (z *bucketMetacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
var field []byte
|
||||
_ = field
|
||||
var zb0001 uint32
|
||||
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
for zb0001 > 0 {
|
||||
zb0001--
|
||||
field, bts, err = msgp.ReadMapKeyZC(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
switch msgp.UnsafeString(field) {
|
||||
case "bucket":
|
||||
z.bucket, bts, err = msgp.ReadStringBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "bucket")
|
||||
return
|
||||
}
|
||||
case "caches":
|
||||
var zb0002 uint32
|
||||
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches")
|
||||
return
|
||||
}
|
||||
if z.caches == nil {
|
||||
z.caches = make(map[string]metacache, zb0002)
|
||||
} else if len(z.caches) > 0 {
|
||||
for key := range z.caches {
|
||||
delete(z.caches, key)
|
||||
}
|
||||
}
|
||||
for zb0002 > 0 {
|
||||
var za0001 string
|
||||
var za0002 metacache
|
||||
zb0002--
|
||||
za0001, bts, err = msgp.ReadStringBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches")
|
||||
return
|
||||
}
|
||||
bts, err = za0002.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "caches", za0001)
|
||||
return
|
||||
}
|
||||
z.caches[za0001] = za0002
|
||||
}
|
||||
default:
|
||||
bts, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
o = bts
|
||||
return
|
||||
}
|
||||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *bucketMetacache) Msgsize() (s int) {
|
||||
s = 1 + 7 + msgp.StringPrefixSize + len(z.bucket) + 7 + msgp.MapHeaderSize
|
||||
if z.caches != nil {
|
||||
for za0001, za0002 := range z.caches {
|
||||
_ = za0002
|
||||
s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
|
@ -1,123 +0,0 @@
|
|||
package cmd
|
||||
|
||||
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
||||
func TestMarshalUnmarshalbucketMetacache(t *testing.T) {
|
||||
v := bucketMetacache{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgbucketMetacache(b *testing.B) {
|
||||
v := bucketMetacache{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgbucketMetacache(b *testing.B) {
|
||||
v := bucketMetacache{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalbucketMetacache(b *testing.B) {
|
||||
v := bucketMetacache{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecodebucketMetacache(t *testing.T) {
|
||||
v := bucketMetacache{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecodebucketMetacache Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := bucketMetacache{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncodebucketMetacache(b *testing.B) {
|
||||
v := bucketMetacache{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecodebucketMetacache(b *testing.B) {
|
||||
v := bucketMetacache{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -64,7 +64,6 @@ func (m *metacacheManager) initManager() {
|
|||
defer t.Stop()
|
||||
|
||||
var exit bool
|
||||
bg := context.Background()
|
||||
for !exit {
|
||||
select {
|
||||
case <-t.C:
|
||||
|
@ -76,7 +75,6 @@ func (m *metacacheManager) initManager() {
|
|||
if !exit {
|
||||
v.cleanup()
|
||||
}
|
||||
logger.LogIf(bg, v.save(bg))
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
m.mu.Lock()
|
||||
|
@ -116,8 +114,8 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
|
|||
// Return a transient bucket for invalid or system buckets.
|
||||
m.mu.RLock()
|
||||
b, ok := m.buckets[bucket]
|
||||
m.mu.RUnlock()
|
||||
if ok {
|
||||
m.mu.RUnlock()
|
||||
if b.bucket != bucket {
|
||||
logger.Info("getBucket: cached bucket %s does not match this bucket %s", b.bucket, bucket)
|
||||
debug.PrintStack()
|
||||
|
@ -125,11 +123,12 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
|
|||
return b
|
||||
}
|
||||
|
||||
m.mu.RUnlock()
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
// See if someone else fetched it while we waited for the lock.
|
||||
b, ok = m.buckets[bucket]
|
||||
if ok {
|
||||
m.mu.Unlock()
|
||||
if b.bucket != bucket {
|
||||
logger.Info("getBucket: newly cached bucket %s does not match this bucket %s", b.bucket, bucket)
|
||||
debug.PrintStack()
|
||||
|
@ -137,16 +136,9 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
|
|||
return b
|
||||
}
|
||||
|
||||
// Load bucket. If we fail return the transient bucket.
|
||||
b, err := loadBucketMetaCache(ctx, bucket)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
if b.bucket != bucket {
|
||||
logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
|
||||
}
|
||||
// New bucket. If we fail return the transient bucket.
|
||||
b = newBucketMetacache(bucket, true)
|
||||
m.buckets[bucket] = b
|
||||
m.mu.Unlock()
|
||||
return b
|
||||
}
|
||||
|
||||
|
|
|
@ -127,9 +127,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
|
|||
return entries, io.EOF
|
||||
}
|
||||
if !errors.Is(err, context.DeadlineExceeded) {
|
||||
// TODO: Remove, not really informational.
|
||||
logger.LogIf(ctx, err)
|
||||
o.debugln("listPath: deadline exceeded")
|
||||
o.debugln("listPath: got error", err)
|
||||
}
|
||||
o.Transient = true
|
||||
o.Create = false
|
||||
|
@ -146,6 +144,22 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
|
|||
} else {
|
||||
// Continue listing
|
||||
o.ID = c.id
|
||||
go func(meta metacache) {
|
||||
// Continuously update while we wait.
|
||||
t := time.NewTicker(metacacheMaxClientWait / 10)
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Request is done, stop updating.
|
||||
return
|
||||
case <-t.C:
|
||||
meta.lastHandout = time.Now()
|
||||
if rpc == nil {
|
||||
meta, _ = localMetacacheMgr.updateCacheEntry(meta)
|
||||
}
|
||||
meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
|
||||
}
|
||||
}(*c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -599,7 +599,7 @@ type metaCacheRPC struct {
|
|||
|
||||
func (m *metaCacheRPC) setErr(err string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
meta := *m.meta
|
||||
if meta.status != scanStateError {
|
||||
meta.error = err
|
||||
|
@ -643,11 +643,20 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
|
|||
metaMu.Lock()
|
||||
meta := *mc.meta
|
||||
meta, err = o.updateMetacacheListing(meta, rpc)
|
||||
*mc.meta = meta
|
||||
if meta.status == scanStateError {
|
||||
logger.LogIf(ctx, err)
|
||||
if err == nil && time.Since(meta.lastHandout) > metacacheMaxClientWait {
|
||||
cancel()
|
||||
exit = true
|
||||
meta.status = scanStateError
|
||||
meta.error = fmt.Sprintf("listing canceled since time since last handout was %v ago", time.Since(meta.lastHandout).Round(time.Second))
|
||||
o.debugln(color.Green("saveMetaCacheStream: ") + meta.error)
|
||||
meta, err = o.updateMetacacheListing(meta, rpc)
|
||||
}
|
||||
if err == nil {
|
||||
*mc.meta = meta
|
||||
if meta.status == scanStateError {
|
||||
cancel()
|
||||
exit = true
|
||||
}
|
||||
}
|
||||
metaMu.Unlock()
|
||||
}
|
||||
|
@ -664,7 +673,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
|
|||
if len(b.data) == 0 && b.n == 0 && o.Transient {
|
||||
return nil
|
||||
}
|
||||
o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
|
||||
o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
|
||||
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
|
||||
logger.LogIf(ctx, err)
|
||||
custom := b.headerKV()
|
||||
|
@ -696,6 +705,8 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
|
|||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
return err
|
||||
case StorageErr:
|
||||
return err
|
||||
case InsufficientReadQuorum:
|
||||
default:
|
||||
logger.LogIf(ctx, err)
|
||||
|
|
|
@ -110,11 +110,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
var scanDir func(path string) error
|
||||
|
||||
scanDir = func(current string) error {
|
||||
// always skip the directory that doesn't match the prefix
|
||||
if len(current) > 0 && !strings.HasPrefix(current, prefix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip forward, if requested...
|
||||
forward := ""
|
||||
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
|
||||
|
@ -143,7 +138,16 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
}
|
||||
dirObjects := make(map[string]struct{})
|
||||
for i, entry := range entries {
|
||||
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
|
||||
// Do do not retain the file, since it doesn't
|
||||
// match the prefix.
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
if len(forward) > 0 && entry < forward {
|
||||
// Do do not retain the file, since its
|
||||
// lexially smaller than 'forward'
|
||||
entries[i] = ""
|
||||
continue
|
||||
}
|
||||
if strings.HasSuffix(entry, slashSeparator) {
|
||||
|
@ -199,6 +203,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
// Process in sort order.
|
||||
sort.Strings(entries)
|
||||
dirStack := make([]string, 0, 5)
|
||||
prefix = "" // Remove prefix after first level as we have already filtered the list.
|
||||
if len(forward) > 0 {
|
||||
idx := sort.SearchStrings(entries, forward)
|
||||
if idx > 0 {
|
||||
|
|
|
@ -39,6 +39,9 @@ const (
|
|||
// Time in which the initiator of a scan must have reported back.
|
||||
metacacheMaxRunningAge = time.Minute
|
||||
|
||||
// Max time between client calls before dropping an async cache listing.
|
||||
metacacheMaxClientWait = 3 * time.Minute
|
||||
|
||||
// metacacheBlockSize is the number of file/directory entries to have in each block.
|
||||
metacacheBlockSize = 5000
|
||||
|
||||
|
@ -82,8 +85,9 @@ func (m *metacache) worthKeeping() bool {
|
|||
case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge:
|
||||
// Not finished and update for metacacheMaxRunningAge, discard it.
|
||||
return false
|
||||
case cache.finished() && time.Since(cache.lastHandout) > 30*time.Minute:
|
||||
// Keep only for 30 minutes.
|
||||
case cache.finished() && time.Since(cache.lastHandout) > 5*metacacheMaxClientWait:
|
||||
// Keep for 15 minutes after we last saw the client.
|
||||
// Since the cache is finished keeping it a bit longer doesn't hurt us.
|
||||
return false
|
||||
case cache.status == scanStateError || cache.status == scanStateNone:
|
||||
// Remove failed listings after 5 minutes.
|
||||
|
@ -113,6 +117,9 @@ func baseDirFromPrefix(prefix string) string {
|
|||
func (m *metacache) update(update metacache) {
|
||||
m.lastUpdate = UTCNow()
|
||||
|
||||
if m.lastHandout.After(m.lastHandout) {
|
||||
m.lastHandout = UTCNow()
|
||||
}
|
||||
if m.status == scanStateStarted && update.status == scanStateSuccess {
|
||||
m.ended = UTCNow()
|
||||
}
|
||||
|
@ -121,7 +128,8 @@ func (m *metacache) update(update metacache) {
|
|||
m.status = update.status
|
||||
}
|
||||
|
||||
if m.status == scanStateStarted && time.Since(m.lastHandout) > 15*time.Minute {
|
||||
if m.status == scanStateStarted && time.Since(m.lastHandout) > metacacheMaxClientWait {
|
||||
// Drop if client hasn't been seen for 3 minutes.
|
||||
m.status = scanStateError
|
||||
m.error = "client not seen"
|
||||
}
|
||||
|
|
|
@ -226,7 +226,7 @@ func Test_metacache_finished(t *testing.T) {
|
|||
|
||||
func Test_metacache_worthKeeping(t *testing.T) {
|
||||
// TODO: Update...
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: false}
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: true, 7: false, 8: false}
|
||||
|
||||
for i, tt := range metaCacheTestset {
|
||||
t.Run(tt.id, func(t *testing.T) {
|
||||
|
|
|
@ -84,6 +84,10 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
|
|||
{testBuckets[4], "file1/guidSplunk-aaaa/file", "content", nil},
|
||||
{testBuckets[5], "dir/day_id=2017-10-10/issue", "content", nil},
|
||||
{testBuckets[5], "dir/day_id=2017-10-11/issue", "content", nil},
|
||||
{testBuckets[5], "foo/201910/1122", "content", nil},
|
||||
{testBuckets[5], "foo/201910/1112", "content", nil},
|
||||
{testBuckets[5], "foo/201910/2112", "content", nil},
|
||||
{testBuckets[5], "foo/201910_txt", "content", nil},
|
||||
}
|
||||
for _, object := range testObjects {
|
||||
md5Bytes := md5.Sum([]byte(object.content))
|
||||
|
@ -477,6 +481,24 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
|
|||
IsTruncated: true,
|
||||
Prefixes: []string{"dir/day_id=2017-10-10/"},
|
||||
},
|
||||
// ListObjectsResult-37 list with prefix match 2 levels deep
|
||||
{
|
||||
IsTruncated: false,
|
||||
Objects: []ObjectInfo{
|
||||
{Name: "foo/201910/1112"},
|
||||
{Name: "foo/201910/1122"},
|
||||
},
|
||||
},
|
||||
// ListObjectsResult-38 list with prefix match 1 level deep
|
||||
{
|
||||
IsTruncated: false,
|
||||
Objects: []ObjectInfo{
|
||||
{Name: "foo/201910/1112"},
|
||||
{Name: "foo/201910/1122"},
|
||||
{Name: "foo/201910/2112"},
|
||||
{Name: "foo/201910_txt"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
|
@ -602,6 +624,9 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
|
|||
{testBuckets[4], "file1/", "", "guidSplunk", 1000, resultCases[35], nil, true},
|
||||
// Test listing at prefix with expected prefix markers
|
||||
{testBuckets[5], "dir/", "", SlashSeparator, 1, resultCases[36], nil, true},
|
||||
// Test listing with prefix match
|
||||
{testBuckets[5], "foo/201910/11", "", "", 1000, resultCases[37], nil, true},
|
||||
{testBuckets[5], "foo/201910", "", "", 1000, resultCases[38], nil, true},
|
||||
}
|
||||
|
||||
for i, testCase := range testCases {
|
||||
|
|
|
@ -375,6 +375,10 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
|
|||
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
||||
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = waitForHTTPResponse(respBody)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -326,8 +326,8 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
|
|||
return
|
||||
}
|
||||
|
||||
done := keepHTTPResponseAlive(w)
|
||||
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
|
||||
done, body := keepHTTPReqResponseAlive(w, r)
|
||||
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
|
||||
}
|
||||
|
||||
// DeleteVersion delete updated metadata.
|
||||
|
@ -719,8 +719,100 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
|
|||
}
|
||||
}
|
||||
|
||||
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
|
||||
// the Close() function is called.
|
||||
type closeNotifier struct {
|
||||
rc io.ReadCloser
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (c *closeNotifier) Read(p []byte) (n int, err error) {
|
||||
n, err = c.rc.Read(p)
|
||||
if err != nil {
|
||||
if c.done != nil {
|
||||
close(c.done)
|
||||
c.done = nil
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *closeNotifier) Close() error {
|
||||
if c.done != nil {
|
||||
close(c.done)
|
||||
c.done = nil
|
||||
}
|
||||
return c.rc.Close()
|
||||
}
|
||||
|
||||
// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
|
||||
// operations, such as bitrot verification or data usage scanning.
|
||||
// Every 10 seconds a space character is sent.
|
||||
// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
|
||||
// The returned function should always be called to release resources.
|
||||
// An optional error can be sent which will be picked as text only error,
|
||||
// without its original type by the receiver.
|
||||
// waitForHTTPResponse should be used to the receiving side.
|
||||
func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
|
||||
bodyDoneCh := make(chan struct{})
|
||||
doneCh := make(chan error)
|
||||
ctx := r.Context()
|
||||
go func() {
|
||||
// Wait for body to be read.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-bodyDoneCh:
|
||||
case err := <-doneCh:
|
||||
if err != nil {
|
||||
w.Write([]byte{1})
|
||||
w.Write([]byte(err.Error()))
|
||||
} else {
|
||||
w.Write([]byte{0})
|
||||
}
|
||||
close(doneCh)
|
||||
return
|
||||
}
|
||||
defer close(doneCh)
|
||||
// Initiate ticker after body has been read.
|
||||
ticker := time.NewTicker(time.Second * 10)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// Response not ready, write a filler byte.
|
||||
w.Write([]byte{32})
|
||||
w.(http.Flusher).Flush()
|
||||
case err := <-doneCh:
|
||||
if err != nil {
|
||||
w.Write([]byte{1})
|
||||
w.Write([]byte(err.Error()))
|
||||
} else {
|
||||
w.Write([]byte{0})
|
||||
}
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return func(err error) {
|
||||
if doneCh == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Indicate we are ready to write.
|
||||
doneCh <- err
|
||||
|
||||
// Wait for channel to be closed so we don't race on writes.
|
||||
<-doneCh
|
||||
|
||||
// Clear so we can be called multiple times without crashing.
|
||||
doneCh = nil
|
||||
}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
|
||||
}
|
||||
|
||||
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
|
||||
// operations, such as bitrot verification or data usage scanning.
|
||||
// keepHTTPResponseAlive may NOT be used until the request body has been read,
|
||||
// use keepHTTPReqResponseAlive instead.
|
||||
// Every 10 seconds a space character is sent.
|
||||
// The returned function should always be called to release resources.
|
||||
// An optional error can be sent which will be picked as text only error,
|
||||
|
|
Loading…
Reference in New Issue