Compare commits

...

7 Commits

Author SHA1 Message Date
Anis Elleuch d4ae7d5e62
fix: New disks healing should pick unformatted disks as well (#13054) (#13620)
A recent regression caused new disks to no longer be re-formatted. In the old
code, a disk needed to be 'online' to be chosen for formatting, but a disk
has to already be formatted for the XL storage IsOnline() function to
return true.

Checking whether the XL storage is nil is enough when we want to avoid
formatting root disks (a toy sketch of this chicken-and-egg follows below).

Co-authored-by: Anis Elleuch <anis@min.io>
2021-11-09 10:26:07 -08:00
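The chicken-and-egg described above is easy to demonstrate in isolation. Below is a minimal, hedged sketch with toy types (not MinIO's actual disk or storage structs): when formatting is gated on an IsOnline()-style check that returns true only for already-formatted disks, a fresh disk can never qualify, while the nil-handle check in the saveUnformattedFormat hunk further down does not have that problem.

package main

import "fmt"

type disk struct {
	formatted bool // XL storage initializes only on a formatted disk
	root      bool // root disks must never be formatted
}

// IsOnline reports true only once the disk is formatted, mirroring the
// behavior described in the commit message above.
func (d *disk) IsOnline() bool { return d.formatted }

// shouldFormat is the selection predicate. The old (buggy) form,
// d != nil && d.IsOnline(), is always false for fresh disks, so they
// were never re-formatted. A nil check is sufficient, assuming root
// disks are filtered out before a handle is ever created.
func shouldFormat(d *disk) bool {
	return d != nil && !d.root
}

func main() {
	fresh := &disk{}
	fmt.Println(fresh.IsOnline())    // false: an unformatted disk looks offline
	fmt.Println(shouldFormat(fresh)) // true: still eligible for formatting
}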
Harshavardhana a3c36595a0 fix: all levels deep flat key match (#12996)
This addresses a regression from #12984,
which only handled flat keys a single
level deep at the bucket level.

Added extra tests as well to cover all
these scenarios; a small illustration follows below.
2021-09-22 10:36:20 -07:00
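The semantics the fix restores can be stated compactly: without a delimiter, a listing prefix must match flat keys at any depth, not only one level below the bucket. The stand-alone filter below is a hypothetical illustration (listWithPrefix is not a MinIO function) that reproduces the expectations encoded by the new test cases near the end of this compare.

package main

import (
	"fmt"
	"strings"
)

// listWithPrefix mimics delimiter-less ListObjects filtering: a key
// matches if the prefix is a literal prefix of the flat key, no matter
// how many '/' separators follow it.
func listWithPrefix(keys []string, prefix string) (out []string) {
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{"foo/201910/1112", "foo/201910/1122", "foo/201910/2112", "foo/201910_txt"}
	// Two levels deep: only the keys under foo/201910/ starting with "11".
	fmt.Println(listWithPrefix(keys, "foo/201910/11"))
	// One level deep: all four keys, including the non-directory sibling.
	fmt.Println(listWithPrefix(keys, "foo/201910"))
}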
Klaus Post 87e4be5a58 fix: missing close on error for keepAlive connections (#13109)
Add missing close when error is reported
before body is done.
2021-09-15 10:25:14 -07:00
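The discipline behind this fix generalizes beyond MinIO's storage REST client (the patched call site is in the storageRESTClient.CreateFile hunk below): on every early error return, the response body still has to be drained and closed, or the keep-alive connection beneath it cannot be reused. A stdlib-only sketch with a hypothetical fetch helper, not the patched code itself:

package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetch shows close-on-error: the deferred drain+close runs on every
// return path, including the early error returns.
func fetch(url string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer func() {
		// An unread body pins the underlying keep-alive connection;
		// draining it lets the transport reuse the connection.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status) // body still drained
	}
	_, err = io.Copy(io.Discard, resp.Body)
	return err
}

func main() {
	fmt.Println(fetch("http://example.com"))
}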
Klaus Post 6539d782cd listing: Don't log errFileNotFound and friends (#13119) 2021-09-15 10:25:13 -07:00
Klaus Post b622a8ec0c fix: hanging operations on PUT with slow IO (#13087)
#11878 added "keepHTTPResponseAlive" to CreateFile requests.
The problem is that after 10 seconds it will begin writing to the
response before the body has been read. This aborts the writes on the
client side, since the client assumes the server has received what it wants.

The proposed solution here is to monitor the completion of the body 
before beginning to send keepalive pings.

Fixes observed high number of goroutines stuck in `io.Copy` in 
`github.com/minio/minio/cmd.(*xlStorage).CreateFile` and 
`(*storageRESTClient).CreateFile` stuck in `http.DrainBody`.
2021-09-15 10:24:46 -07:00
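The wire protocol implied by this commit (and by the keepHTTPReqResponseAlive hunk at the end of this compare) is: filler space bytes while the operation runs, then one status byte, 0 for success with the payload following, or 1 with the error text following. The receiving side is not part of this diff, so the decoder below is an assumption-flagged sketch of what a waitForHTTPResponse-style reader has to do, not a copy of it.

package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// decodeKeepAlive skips filler bytes (ASCII space, sent every 10s while
// the server works), then reads one status byte: 0 means success and the
// payload follows; 1 means failure and the error text follows.
func decodeKeepAlive(resp io.Reader) (io.Reader, error) {
	br := bufio.NewReader(resp)
	for {
		b, err := br.ReadByte()
		if err != nil {
			return nil, err
		}
		switch b {
		case 32: // filler: response not ready yet
			continue
		case 0: // success marker
			return br, nil
		case 1: // error marker
			msg, _ := io.ReadAll(br)
			return nil, errors.New(string(msg))
		default:
			return nil, fmt.Errorf("unexpected byte %d in keepalive stream", b)
		}
	}
}

func main() {
	// Three keepalive fillers, then an error marker with its text.
	_, err := decodeKeepAlive(strings.NewReader("   \x01disk not found"))
	fmt.Println(err) // disk not found
}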
Klaus Post 824ebc0c86 Stop async listing earlier (#13160)
Stop async listing if we have not heard back from the client for 3 minutes.

This will stop spending resources on async listings when they are unlikely to be used.
If the client returns, a new listing will be started on the second request.

Stop saving cache metadata to disk. It is cleared on restarts anyway. Removes all
load/save functionality. (A condensed sketch of the new heartbeat follows below.)
2021-09-15 10:24:14 -07:00
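Condensed, the mechanism in the hunks below has two halves: the waiting client bumps a lastHandout timestamp on a ticker firing at a tenth of metacacheMaxClientWait, and the async lister periodically checks that timestamp, cancelling itself once the client has been silent for the full window. A simplified sketch with toy types (not the actual metacache structs):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const metacacheMaxClientWait = 3 * time.Minute

type listing struct {
	mu          sync.Mutex
	lastHandout time.Time
}

// keepAlive runs on the client side while it waits for results,
// signalling that the listing is still wanted.
func (l *listing) keepAlive(ctx context.Context) {
	t := time.NewTicker(metacacheMaxClientWait / 10)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			l.mu.Lock()
			l.lastHandout = time.Now()
			l.mu.Unlock()
		}
	}
}

// expired is polled by the async lister; when true, the listing is
// cancelled instead of running to completion unused.
func (l *listing) expired() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return time.Since(l.lastHandout) > metacacheMaxClientWait
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	l := &listing{lastHandout: time.Now()}
	go l.keepAlive(ctx)
	fmt.Println("expired:", l.expired()) // false while the client pings
}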
Anis Elleuch 76496d4644 Fix deadlock when error during metacache generation (#13201)
Due to a typo, a lock was not released after being acquired; a toy reproduction follows below.
2021-09-13 09:59:35 -07:00
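The typo, defer m.mu.Lock() where defer m.mu.Unlock() was intended, is visible in the metaCacheRPC.setErr hunk below. A toy reproduction of its effect: the deferred Lock blocks forever while still holding the mutex, so every later caller deadlocks.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex

	setErr := func() {
		mu.Lock()
		defer mu.Lock() // typo: should be defer mu.Unlock()
		// ... update shared state ...
	} // the deferred Lock blocks here forever, still holding mu

	go setErr()
	time.Sleep(100 * time.Millisecond)

	done := make(chan struct{})
	go func() { mu.Lock(); close(done) }()
	select {
	case <-done:
		fmt.Println("acquired (not expected with the typo)")
	case <-time.After(time.Second):
		fmt.Println("second caller deadlocked, as the typo causes")
	}
}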
14 changed files with 189 additions and 476 deletions

View File

@@ -57,6 +57,10 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
b.closeWithErr(err)
return n, err
}
if n != len(p) {
err = io.ErrShortWrite
b.closeWithErr(err)
}
return n, err
}
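The n != len(p) check added above enforces the io.Writer contract: a Write that consumes fewer bytes than it was given must return a non-nil error, otherwise callers such as io.Copy silently lose data. A toy writer demonstrates the failure mode the check guards against:

package main

import (
	"fmt"
	"io"
)

// truncatingWriter accepts at most 4 bytes per call and, incorrectly,
// reports no error for the bytes it dropped.
type truncatingWriter struct{}

func (truncatingWriter) Write(p []byte) (int, error) {
	if len(p) > 4 {
		return 4, nil // short write, but err == nil
	}
	return len(p), nil
}

// checkedWrite applies the same guard the bitrot writer gained above.
func checkedWrite(w io.Writer, p []byte) (int, error) {
	n, err := w.Write(p)
	if err == nil && n != len(p) {
		err = io.ErrShortWrite // surface the partial write
	}
	return n, err
}

func main() {
	n, err := checkedWrite(truncatingWriter{}, []byte("hello world"))
	fmt.Println(n, err) // 4 short write
}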

View File

@@ -705,7 +705,7 @@ func saveUnformattedFormat(ctx context.Context, storageDisks []StorageAPI, forma
if format == nil {
continue
}
if storageDisks[index] != nil && storageDisks[index].IsOnline() {
if storageDisks[index] != nil {
if err := saveFormatErasure(storageDisks[index], format, true); err != nil {
return err
}

View File

@@ -18,25 +18,17 @@
package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"runtime/debug"
"sort"
"sync"
"time"
"github.com/klauspost/compress/s2"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
"github.com/tinylib/msgp/msgp"
)
//go:generate msgp -file $GOFILE -unexported
// a bucketMetacache keeps track of all caches generated
// for a bucket.
type bucketMetacache struct {
@@ -78,108 +70,6 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) {
}
}
// loadBucketMetaCache will load the cache from the object layer.
// If the cache cannot be found a new one is created.
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
objAPI := newObjectLayerFn()
for objAPI == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
time.Sleep(250 * time.Millisecond)
}
objAPI = newObjectLayerFn()
if objAPI == nil {
logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
}
}
var meta bucketMetacache
var decErr error
// Use global context for this.
r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
if err == nil {
dec := s2DecPool.Get().(*s2.Reader)
dec.Reset(r)
decErr = meta.DecodeMsg(msgp.NewReader(dec))
dec.Reset(nil)
r.Close()
s2DecPool.Put(dec)
}
if err != nil {
switch err.(type) {
case ObjectNotFound:
err = nil
case InsufficientReadQuorum:
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
default:
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket, false), err
}
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket, true), nil
}
meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
// Index roots
for id, cache := range meta.caches {
meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
}
return &meta, nil
}
// save the bucket cache to the object storage.
func (b *bucketMetacache) save(ctx context.Context) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
// Keep lock while we marshal.
// We need a write lock since we update 'updated'
b.mu.Lock()
if !b.updated {
b.mu.Unlock()
return nil
}
// Save as s2 compressed msgpack
tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
enc := s2.NewWriter(tmp)
err := msgp.Encode(enc, b)
if err != nil {
b.mu.Unlock()
return err
}
err = enc.Close()
if err != nil {
b.mu.Unlock()
return err
}
b.updated = false
b.mu.Unlock()
hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()))
if err != nil {
return err
}
_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{})
logger.LogIf(ctx, err)
return err
}
// findCache will attempt to find a matching cache for the provided options.
// If a cache with the same ID exists already it will be returned.
// If none can be found a new is created with the provided ID.
@@ -267,7 +157,7 @@ func (b *bucketMetacache) cleanup() {
})
// Keep first metacacheMaxEntries...
for _, cache := range remainCaches[metacacheMaxEntries:] {
if time.Since(cache.lastHandout) > 30*time.Minute {
if time.Since(cache.lastHandout) > metacacheMaxClientWait {
remove[cache.id] = struct{}{}
}
}

View File

@@ -1,209 +0,0 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *bucketMetacache) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "bucket":
z.bucket, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "bucket")
return
}
case "caches":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
if z.caches == nil {
z.caches = make(map[string]metacache, zb0002)
} else if len(z.caches) > 0 {
for key := range z.caches {
delete(z.caches, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 metacache
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
err = za0002.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
z.caches[za0001] = za0002
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *bucketMetacache) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "bucket"
err = en.Append(0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
if err != nil {
return
}
err = en.WriteString(z.bucket)
if err != nil {
err = msgp.WrapError(err, "bucket")
return
}
// write "caches"
err = en.Append(0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.caches)))
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
for za0001, za0002 := range z.caches {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
err = za0002.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *bucketMetacache) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "bucket"
o = append(o, 0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
o = msgp.AppendString(o, z.bucket)
// string "caches"
o = append(o, 0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
o = msgp.AppendMapHeader(o, uint32(len(z.caches)))
for za0001, za0002 := range z.caches {
o = msgp.AppendString(o, za0001)
o, err = za0002.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *bucketMetacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "bucket":
z.bucket, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "bucket")
return
}
case "caches":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
if z.caches == nil {
z.caches = make(map[string]metacache, zb0002)
} else if len(z.caches) > 0 {
for key := range z.caches {
delete(z.caches, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 metacache
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "caches")
return
}
bts, err = za0002.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "caches", za0001)
return
}
z.caches[za0001] = za0002
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *bucketMetacache) Msgsize() (s int) {
s = 1 + 7 + msgp.StringPrefixSize + len(z.bucket) + 7 + msgp.MapHeaderSize
if z.caches != nil {
for za0001, za0002 := range z.caches {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize()
}
}
return
}

View File

@@ -1,123 +0,0 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalbucketMetacache(t *testing.T) {
v := bucketMetacache{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgbucketMetacache(b *testing.B) {
v := bucketMetacache{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgbucketMetacache(b *testing.B) {
v := bucketMetacache{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalbucketMetacache(b *testing.B) {
v := bucketMetacache{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodebucketMetacache(t *testing.T) {
v := bucketMetacache{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodebucketMetacache Msgsize() is inaccurate")
}
vn := bucketMetacache{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodebucketMetacache(b *testing.B) {
v := bucketMetacache{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodebucketMetacache(b *testing.B) {
v := bucketMetacache{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@@ -64,7 +64,6 @@ func (m *metacacheManager) initManager() {
defer t.Stop()
var exit bool
bg := context.Background()
for !exit {
select {
case <-t.C:
@@ -76,7 +75,6 @@ func (m *metacacheManager) initManager() {
if !exit {
v.cleanup()
}
logger.LogIf(bg, v.save(bg))
}
m.mu.RUnlock()
m.mu.Lock()
@@ -116,8 +114,8 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
// Return a transient bucket for invalid or system buckets.
m.mu.RLock()
b, ok := m.buckets[bucket]
m.mu.RUnlock()
if ok {
m.mu.RUnlock()
if b.bucket != bucket {
logger.Info("getBucket: cached bucket %s does not match this bucket %s", b.bucket, bucket)
debug.PrintStack()
@@ -125,11 +123,12 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
return b
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
// See if someone else fetched it while we waited for the lock.
b, ok = m.buckets[bucket]
if ok {
m.mu.Unlock()
if b.bucket != bucket {
logger.Info("getBucket: newly cached bucket %s does not match this bucket %s", b.bucket, bucket)
debug.PrintStack()
@@ -137,16 +136,9 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
return b
}
// Load bucket. If we fail return the transient bucket.
b, err := loadBucketMetaCache(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
}
if b.bucket != bucket {
logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
}
// New bucket. If we fail return the transient bucket.
b = newBucketMetacache(bucket, true)
m.buckets[bucket] = b
m.mu.Unlock()
return b
}

View File

@@ -127,9 +127,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
return entries, io.EOF
}
if !errors.Is(err, context.DeadlineExceeded) {
// TODO: Remove, not really informational.
logger.LogIf(ctx, err)
o.debugln("listPath: deadline exceeded")
o.debugln("listPath: got error", err)
}
o.Transient = true
o.Create = false
@@ -146,6 +144,22 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
} else {
// Continue listing
o.ID = c.id
go func(meta metacache) {
// Continuously update while we wait.
t := time.NewTicker(metacacheMaxClientWait / 10)
defer t.Stop()
select {
case <-ctx.Done():
// Request is done, stop updating.
return
case <-t.C:
meta.lastHandout = time.Now()
if rpc == nil {
meta, _ = localMetacacheMgr.updateCacheEntry(meta)
}
meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
}
}(*c)
}
}
}

View File

@@ -599,7 +599,7 @@ type metaCacheRPC struct {
func (m *metaCacheRPC) setErr(err string) {
m.mu.Lock()
defer m.mu.Lock()
defer m.mu.Unlock()
meta := *m.meta
if meta.status != scanStateError {
meta.error = err
@@ -643,11 +643,20 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
metaMu.Lock()
meta := *mc.meta
meta, err = o.updateMetacacheListing(meta, rpc)
*mc.meta = meta
if meta.status == scanStateError {
logger.LogIf(ctx, err)
if err == nil && time.Since(meta.lastHandout) > metacacheMaxClientWait {
cancel()
exit = true
meta.status = scanStateError
meta.error = fmt.Sprintf("listing canceled since time since last handout was %v ago", time.Since(meta.lastHandout).Round(time.Second))
o.debugln(color.Green("saveMetaCacheStream: ") + meta.error)
meta, err = o.updateMetacacheListing(meta, rpc)
}
if err == nil {
*mc.meta = meta
if meta.status == scanStateError {
cancel()
exit = true
}
}
metaMu.Unlock()
}
@@ -664,7 +673,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
if len(b.data) == 0 && b.n == 0 && o.Transient {
return nil
}
o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
logger.LogIf(ctx, err)
custom := b.headerKV()
@@ -696,6 +705,8 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
switch err.(type) {
case ObjectNotFound:
return err
case StorageErr:
return err
case InsufficientReadQuorum:
default:
logger.LogIf(ctx, err)

View File

@@ -110,11 +110,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
var scanDir func(path string) error
scanDir = func(current string) error {
// always skip the directory that doesn't match the prefix
if len(current) > 0 && !strings.HasPrefix(current, prefix) {
return nil
}
// Skip forward, if requested...
forward := ""
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
@@ -143,7 +138,16 @@
}
dirObjects := make(map[string]struct{})
for i, entry := range entries {
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
// Do not retain the file, since it doesn't
// match the prefix.
entries[i] = ""
continue
}
if len(forward) > 0 && entry < forward {
// Do not retain the file, since it is
// lexically smaller than 'forward'.
entries[i] = ""
continue
}
if strings.HasSuffix(entry, slashSeparator) {
@@ -199,6 +203,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Process in sort order.
sort.Strings(entries)
dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level as we have already filtered the list.
if len(forward) > 0 {
idx := sort.SearchStrings(entries, forward)
if idx > 0 {

View File

@@ -39,6 +39,9 @@ const (
// Time in which the initiator of a scan must have reported back.
metacacheMaxRunningAge = time.Minute
// Max time between client calls before dropping an async cache listing.
metacacheMaxClientWait = 3 * time.Minute
// metacacheBlockSize is the number of file/directory entries to have in each block.
metacacheBlockSize = 5000
@@ -82,8 +85,9 @@ func (m *metacache) worthKeeping() bool {
case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge:
// Not finished and no update for metacacheMaxRunningAge; discard it.
return false
case cache.finished() && time.Since(cache.lastHandout) > 30*time.Minute:
// Keep only for 30 minutes.
case cache.finished() && time.Since(cache.lastHandout) > 5*metacacheMaxClientWait:
// Keep for 15 minutes after we last saw the client.
// Since the cache is finished keeping it a bit longer doesn't hurt us.
return false
case cache.status == scanStateError || cache.status == scanStateNone:
// Remove failed listings after 5 minutes.
@@ -113,6 +117,9 @@ func baseDirFromPrefix(prefix string) string {
func (m *metacache) update(update metacache) {
m.lastUpdate = UTCNow()
if update.lastHandout.After(m.lastHandout) {
m.lastHandout = UTCNow()
}
if m.status == scanStateStarted && update.status == scanStateSuccess {
m.ended = UTCNow()
}
@@ -121,7 +128,8 @@ func (m *metacache) update(update metacache) {
m.status = update.status
}
if m.status == scanStateStarted && time.Since(m.lastHandout) > 15*time.Minute {
if m.status == scanStateStarted && time.Since(m.lastHandout) > metacacheMaxClientWait {
// Drop if client hasn't been seen for 3 minutes.
m.status = scanStateError
m.error = "client not seen"
}

View File

@@ -226,7 +226,7 @@ func Test_metacache_finished(t *testing.T) {
func Test_metacache_worthKeeping(t *testing.T) {
// TODO: Update...
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: false}
wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: true, 7: false, 8: false}
for i, tt := range metaCacheTestset {
t.Run(tt.id, func(t *testing.T) {

View File

@@ -84,6 +84,10 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[4], "file1/guidSplunk-aaaa/file", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-10/issue", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-11/issue", "content", nil},
{testBuckets[5], "foo/201910/1122", "content", nil},
{testBuckets[5], "foo/201910/1112", "content", nil},
{testBuckets[5], "foo/201910/2112", "content", nil},
{testBuckets[5], "foo/201910_txt", "content", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
@@ -477,6 +481,24 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
IsTruncated: true,
Prefixes: []string{"dir/day_id=2017-10-10/"},
},
// ListObjectsResult-37 list with prefix match 2 levels deep
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "foo/201910/1112"},
{Name: "foo/201910/1122"},
},
},
// ListObjectsResult-38 list with prefix match 1 level deep
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "foo/201910/1112"},
{Name: "foo/201910/1122"},
{Name: "foo/201910/2112"},
{Name: "foo/201910_txt"},
},
},
}
testCases := []struct {
@@ -602,6 +624,9 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[4], "file1/", "", "guidSplunk", 1000, resultCases[35], nil, true},
// Test listing at prefix with expected prefix markers
{testBuckets[5], "dir/", "", SlashSeparator, 1, resultCases[36], nil, true},
// Test listing with prefix match
{testBuckets[5], "foo/201910/11", "", "", 1000, resultCases[37], nil, true},
{testBuckets[5], "foo/201910", "", "", 1000, resultCases[38], nil, true},
}
for i, testCase := range testCases {

View File

@@ -375,6 +375,10 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
values.Set(storageRESTLength, strconv.Itoa(int(size)))
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
defer xhttp.DrainBody(respBody)
if err != nil {
return err
}
_, err = waitForHTTPResponse(respBody)
return err
}

View File

@@ -326,8 +326,8 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
return
}
done := keepHTTPResponseAlive(w)
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body))
done, body := keepHTTPReqResponseAlive(w, r)
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
}
// DeleteVersion delete updated metadata.
@@ -719,8 +719,100 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
}
}
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
// the Close() function is called.
type closeNotifier struct {
rc io.ReadCloser
done chan struct{}
}
func (c *closeNotifier) Read(p []byte) (n int, err error) {
n, err = c.rc.Read(p)
if err != nil {
if c.done != nil {
close(c.done)
c.done = nil
}
}
return n, err
}
func (c *closeNotifier) Close() error {
if c.done != nil {
close(c.done)
c.done = nil
}
return c.rc.Close()
}
// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
// operations, such as bitrot verification or data usage scanning.
// Every 10 seconds a space character is sent.
// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
// The returned function should always be called to release resources.
// An optional error can be sent, which will be received as a text-only error,
// without its original type, by the receiver.
// waitForHTTPResponse should be used on the receiving side.
func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
bodyDoneCh := make(chan struct{})
doneCh := make(chan error)
ctx := r.Context()
go func() {
// Wait for body to be read.
select {
case <-ctx.Done():
case <-bodyDoneCh:
case err := <-doneCh:
if err != nil {
w.Write([]byte{1})
w.Write([]byte(err.Error()))
} else {
w.Write([]byte{0})
}
close(doneCh)
return
}
defer close(doneCh)
// Initiate ticker after body has been read.
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
// Response not ready, write a filler byte.
w.Write([]byte{32})
w.(http.Flusher).Flush()
case err := <-doneCh:
if err != nil {
w.Write([]byte{1})
w.Write([]byte(err.Error()))
} else {
w.Write([]byte{0})
}
ticker.Stop()
return
}
}
}()
return func(err error) {
if doneCh == nil {
return
}
// Indicate we are ready to write.
doneCh <- err
// Wait for channel to be closed so we don't race on writes.
<-doneCh
// Clear so we can be called multiple times without crashing.
doneCh = nil
}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
}
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
// operations, such as bitrot verification or data usage scanning.
// keepHTTPResponseAlive may NOT be used until the request body has been read;
// use keepHTTPReqResponseAlive instead.
// Every 10 seconds a space character is sent.
// The returned function should always be called to release resources.
// An optional error can be sent, which will be received as a text-only error,