list: Fix continuation among similar prefixe names

WalkDir was removing trailing slashes but then do sorting which can go
wrong in one case:

There are two prefixes at the same level, apache2-doc/ and apache2/.
apache2/ should always come next to apache2-doc/ but after removing
the trailing slash, sorting makes apache2 comes first.

However the issue happens only during continuation listing, because of
some code that is ignoring object names that are supposed to be past the
marker but wrong sorting as explained above will make some prefixes to
be lost during listing.
This commit is contained in:
Anis Elleuch 2021-11-09 09:46:47 -05:00
parent edf1f4233b
commit 364cc9ae15
2 changed files with 133 additions and 5 deletions

View file

@ -117,7 +117,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
forward = strings.TrimPrefix(opts.ForwardTo, current)
if idx := strings.IndexByte(forward, '/'); idx > 0 {
forward = forward[:idx]
forward = forward[:idx+1]
}
}
if contextCanceled(ctx) {
@ -163,8 +163,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
entries[i] = entry
continue
}
// Trim slash, maybe compiler is clever?
entries[i] = entries[i][:len(entry)-1]
continue
}
// Do do not retain the file.
@ -259,12 +257,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// It was an object
if isDirObj {
meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator
} else {
meta.name = strings.TrimSuffix(meta.name, slashSeparator)
}
out <- meta
case osIsNotExist(err), isSysErrIsDir(err):
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
if err == nil {
// It was an object
meta.name = strings.TrimSuffix(meta.name, slashSeparator)
out <- meta
continue
}
@ -272,8 +273,8 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// NOT an object, append to stack (with slash)
// If dirObject, but no metadata (which is unexpected) we skip it.
if !isDirObj {
if !isDirEmpty(pathJoin(volumeDir, meta.name+slashSeparator)) {
dirStack = append(dirStack, meta.name+slashSeparator)
if !isDirEmpty(pathJoin(volumeDir, meta.name)) {
dirStack = append(dirStack, meta.name)
}
}
case isSysErrNotDir(err):

View file

@ -1378,6 +1378,133 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
}
}
// Wrapper for calling ListObjects continuation tests for both Erasure multiple disks and single node setup.
func TestListObjectsContinuation(t *testing.T) {
ExecObjectLayerTest(t, testListObjectsContinuation)
}
// Unit test for ListObjects in general.
func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
t, _ := t1.(*testing.T)
testBuckets := []string{
// This bucket is used for testing ListObject operations.
"test-bucket-list-object-continuation-1",
"test-bucket-list-object-continuation-2",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
}
}
var err error
testObjects := []struct {
parentBucket string
name string
content string
meta map[string]string
}{
{testBuckets[0], "apache2-doc/1.txt", "contentstring", nil},
{testBuckets[0], "apache2/1.txt", "contentstring", nil},
{testBuckets[1], "azerty/1.txt", "contentstring", nil},
{testBuckets[1], "apache2-doc/1.txt", "contentstring", nil},
{testBuckets[1], "apache2/1.txt", "contentstring", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
_, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta})
if err != nil {
t.Fatalf("%s : %s", instanceType, err.Error())
}
}
// Formualting the result data set to be expected from ListObjects call inside the tests,
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
resultCases := []ListObjectsInfo{
{
Objects: []ObjectInfo{
{Name: "apache2-doc/1.txt"},
{Name: "apache2/1.txt"},
},
},
{
Prefixes: []string{"apache2-doc/", "apache2/", "azerty/"},
},
}
testCases := []struct {
// Inputs to ListObjects.
bucketName string
prefix string
delimiter string
page int
// Expected output of ListObjects.
result ListObjectsInfo
}{
{testBuckets[0], "", "", 1, resultCases[0]},
{testBuckets[1], "apache2", "", 1, resultCases[0]},
{testBuckets[1], "", "/", 1, resultCases[1]},
}
for i, testCase := range testCases {
testCase := testCase
t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
var foundObjects []ObjectInfo
var foundPrefixes []string
var marker = ""
for {
result, err := obj.ListObjects(context.Background(), testCase.bucketName,
testCase.prefix, marker, testCase.delimiter, testCase.page)
if err != nil {
t.Fatalf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i+1, instanceType, err.Error())
}
foundObjects = append(foundObjects, result.Objects...)
foundPrefixes = append(foundPrefixes, result.Prefixes...)
if !result.IsTruncated {
break
}
marker = result.NextMarker
}
if len(testCase.result.Objects) != len(foundObjects) {
t.Logf("want: %v", objInfoNames(testCase.result.Objects))
t.Logf("got: %v", objInfoNames(foundObjects))
t.Errorf("Test %d: %s: Expected number of objects in the result to be '%d', but found '%d' objects instead",
i+1, instanceType, len(testCase.result.Objects), len(foundObjects))
}
for j := 0; j < len(testCase.result.Objects); j++ {
if j >= len(foundObjects) {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but not nothing instead", i+1, instanceType, testCase.result.Objects[j].Name)
continue
}
if testCase.result.Objects[j].Name != foundObjects[j].Name {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, foundObjects[j].Name)
}
}
if len(testCase.result.Prefixes) != len(foundPrefixes) {
t.Logf("want: %v", testCase.result.Prefixes)
t.Logf("got: %v", foundPrefixes)
t.Errorf("Test %d: %s: Expected number of prefixes in the result to be '%d', but found '%d' prefixes instead",
i+1, instanceType, len(testCase.result.Prefixes), len(foundPrefixes))
}
for j := 0; j < len(testCase.result.Prefixes); j++ {
if j >= len(foundPrefixes) {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found no result", i+1, instanceType, testCase.result.Prefixes[j])
continue
}
if testCase.result.Prefixes[j] != foundPrefixes[j] {
t.Errorf("Test %d: %s: Expected prefix name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Prefixes[j], foundPrefixes[j])
}
}
})
}
}
// Initialize FS backend for the benchmark.
func initFSObjectsB(disk string, t *testing.B) (obj ObjectLayer) {
var err error