fix: missing entries on first list resume (#13627)

On first list resume or when specifying a custom markers entries could be missed in rare cases.

Do conservative truncation of entries when forwarding.

Replaces #13619
This commit is contained in:
Klaus Post 2021-11-10 19:41:21 +01:00 committed by GitHub
parent d008e90d50
commit c897b6a82d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 154 additions and 4 deletions

View File

@ -116,6 +116,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
forward := ""
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
forward = strings.TrimPrefix(opts.ForwardTo, current)
// Trim further directories and trailing slash.
if idx := strings.IndexByte(forward, '/'); idx > 0 {
forward = forward[:idx]
}
@ -163,7 +164,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
entries[i] = entry
continue
}
// Trim slash, maybe compiler is clever?
// Trim slash, since we don't know if this is folder or object.
entries[i] = entries[i][:len(entry)-1]
continue
}
@ -214,9 +215,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level as we have already filtered the list.
if len(forward) > 0 {
idx := sort.SearchStrings(entries, forward)
if idx > 0 {
entries = entries[idx:]
// Conservative forwarding. Entries may be either objects or prefixes.
for i, entry := range entries {
if entry >= forward || strings.HasPrefix(forward, entry) {
entries = entries[i:]
break
}
}
}

View File

@ -1378,6 +1378,152 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
}
}
// Wrapper for calling ListObjects continuation tests for both Erasure multiple disks and single node setup.
func TestListObjectsContinuation(t *testing.T) {
ExecObjectLayerTest(t, testListObjectsContinuation)
}
// Unit test for ListObjects in general.
func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
t, _ := t1.(*testing.T)
testBuckets := []string{
// This bucket is used for testing ListObject operations.
"test-bucket-list-object-continuation-1",
"test-bucket-list-object-continuation-2",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
}
}
var err error
testObjects := []struct {
parentBucket string
name string
content string
meta map[string]string
}{
{testBuckets[0], "a/1.txt", "contentstring", nil},
{testBuckets[0], "a-1.txt", "contentstring", nil},
{testBuckets[0], "a.txt", "contentstring", nil},
{testBuckets[0], "apache2-doc/1.txt", "contentstring", nil},
{testBuckets[0], "apache2/1.txt", "contentstring", nil},
{testBuckets[0], "apache2/-sub/2.txt", "contentstring", nil},
{testBuckets[1], "azerty/1.txt", "contentstring", nil},
{testBuckets[1], "apache2-doc/1.txt", "contentstring", nil},
{testBuckets[1], "apache2/1.txt", "contentstring", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
_, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
}
}
// Formualting the result data set to be expected from ListObjects call inside the tests,
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
resultCases := []ListObjectsInfo{
{
Objects: []ObjectInfo{
{Name: "a-1.txt"},
{Name: "a.txt"},
{Name: "a/1.txt"},
{Name: "apache2-doc/1.txt"},
{Name: "apache2/-sub/2.txt"},
{Name: "apache2/1.txt"},
},
},
{
Objects: []ObjectInfo{
{Name: "apache2-doc/1.txt"},
{Name: "apache2/1.txt"},
},
},
{
Prefixes: []string{"apache2-doc/", "apache2/", "azerty/"},
},
}
testCases := []struct {
// Inputs to ListObjects.
bucketName string
prefix string
delimiter string
page int
// Expected output of ListObjects.
result ListObjectsInfo
}{
{testBuckets[0], "", "", 1, resultCases[0]},
{testBuckets[0], "a", "", 1, resultCases[0]},
{testBuckets[1], "apache", "", 1, resultCases[1]},
{testBuckets[1], "", "/", 1, resultCases[2]},
}
for i, testCase := range testCases {
testCase := testCase
t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
var foundObjects []ObjectInfo
var foundPrefixes []string
var marker = ""
for {
result, err := obj.ListObjects(context.Background(), testCase.bucketName,
testCase.prefix, marker, testCase.delimiter, testCase.page)
if err != nil {
t.Fatalf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i+1, instanceType, err.Error())
}
foundObjects = append(foundObjects, result.Objects...)
foundPrefixes = append(foundPrefixes, result.Prefixes...)
if !result.IsTruncated {
break
}
marker = result.NextMarker
if len(result.Objects) > 0 {
// Discard marker, so it cannot resume listing.
marker = result.Objects[len(result.Objects)-1].Name
}
}
if len(testCase.result.Objects) != len(foundObjects) {
t.Logf("want: %v", objInfoNames(testCase.result.Objects))
t.Logf("got: %v", objInfoNames(foundObjects))
t.Errorf("Test %d: %s: Expected number of objects in the result to be '%d', but found '%d' objects instead",
i+1, instanceType, len(testCase.result.Objects), len(foundObjects))
}
for j := 0; j < len(testCase.result.Objects); j++ {
if j >= len(foundObjects) {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.result.Objects[j].Name)
continue
}
if testCase.result.Objects[j].Name != foundObjects[j].Name {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, foundObjects[j].Name)
}
}
if len(testCase.result.Prefixes) != len(foundPrefixes) {
t.Logf("want: %v", testCase.result.Prefixes)
t.Logf("got: %v", foundPrefixes)
t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead",
i+1, instanceType, len(testCase.result.Prefixes), len(foundPrefixes))
}
for j := 0; j < len(testCase.result.Prefixes); j++ {
if j >= len(foundPrefixes) {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.result.Prefixes[j])
continue
}
if testCase.result.Prefixes[j] != foundPrefixes[j] {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], foundPrefixes[j])
}
}
})
}
}
// Initialize FS backend for the benchmark.
func initFSObjectsB(disk string, t *testing.B) (obj ObjectLayer) {
var err error