diff --git a/pkg/fs/api_suite_nix_test.go b/pkg/fs/api_suite_nix_test.go index 40f60f7d5..0c6378734 100644 --- a/pkg/fs/api_suite_nix_test.go +++ b/pkg/fs/api_suite_nix_test.go @@ -174,22 +174,20 @@ func testPaging(c *check.C, create func() Filesystem) { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - // TODO - //result, err = fs.ListObjects("bucket", "", "", "", 5) - //c.Assert(err, check.IsNil) - //c.Assert(len(result.Objects), check.Equals, i+1) - //c.Assert(result.IsTruncated, check.Equals, false) + result, err = fs.ListObjects("bucket", "", "", "", 5) + c.Assert(err, check.IsNil) + c.Assert(len(result.Objects), check.Equals, i+1) + c.Assert(result.IsTruncated, check.Equals, false) } // check after paging occurs pages work for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - // TODO - //result, err = fs.ListObjects("bucket", "obj", "", "", 5) - //c.Assert(err, check.IsNil) - //c.Assert(len(result.Objects), check.Equals, 5) - //c.Assert(result.IsTruncated, check.Equals, true) + result, err = fs.ListObjects("bucket", "obj", "", "", 5) + c.Assert(err, check.IsNil) + c.Assert(len(result.Objects), check.Equals, 5) + c.Assert(result.IsTruncated, check.Equals, true) } // check paging with prefix at end returns less objects { diff --git a/pkg/fs/api_suite_windows_test.go b/pkg/fs/api_suite_windows_test.go index 574ce69dc..ecc179374 100644 --- a/pkg/fs/api_suite_windows_test.go +++ b/pkg/fs/api_suite_windows_test.go @@ -173,22 +173,20 @@ func testPaging(c *check.C, create func() Filesystem) { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - // TODO - //result, err = fs.ListObjects("bucket", "", "", "", 5) - //c.Assert(err, check.IsNil) - //c.Assert(len(result.Objects), check.Equals, i+1) - //c.Assert(result.IsTruncated, check.Equals, false) + result, err = fs.ListObjects("bucket", "", "", "", 5) + c.Assert(err, check.IsNil) + c.Assert(len(result.Objects), check.Equals, i+1) + c.Assert(result.IsTruncated, check.Equals, false) } // check after paging occurs pages work for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) c.Assert(err, check.IsNil) - // TODO - //result, err = fs.ListObjects("bucket", "", "", "", 5) - //c.Assert(err, check.IsNil) - //c.Assert(len(result.Objects), check.Equals, 5) - //c.Assert(result.IsTruncated, check.Equals, true) + result, err = fs.ListObjects("bucket", "", "", "", 5) + c.Assert(err, check.IsNil) + c.Assert(len(result.Objects), check.Equals, 5) + c.Assert(result.IsTruncated, check.Equals, true) } // check paging with prefix at end returns less objects { diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 7cb3ce0ca..96ad95179 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -60,7 +60,7 @@ type listWorkerReq struct { func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) { quitWalker := make(chan bool) reqCh := make(chan listWorkerReq) - walkerCh := make(chan ObjectMetadata, 1000) + walkerCh := make(chan ObjectMetadata, 2000) go func() { defer close(walkerCh) var walkPath string @@ -81,6 +81,7 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe } } ioutils.FTW(walkPath, func(path string, info os.FileInfo, e error) error { + // For any error return right here. if e != nil { return e } @@ -88,10 +89,11 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") { return nil } - // We don't need to list the walk path. - if path == walkPath { + // We don't need to list the walk path if its a directory. + if path == walkPath && info.IsDir() { return nil } + // Skip all directories if there is no delimiter. if info.IsDir() && delimiter == "" { return nil } @@ -115,8 +117,7 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe // Returning error ends the file tree Walk(). return errors.New("Quit list worker.") } - // If delimiter is set, we stop if current path is a - // directory. + // If delimiter is set, we stop if current path is a directory. if delimiter != "" && info.IsDir() { return ioutils.ErrSkipDir } @@ -128,9 +129,9 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe go func() { for { select { - // Timeout after 1 seconds if request did not arrive for + // Timeout after 30 seconds if request did not arrive for // the given list parameters. - case <-time.After(1 * time.Second): + case <-time.After(30 * time.Second): quitWalker <- true // Quit file path walk if running. // Send back the hash for this request. fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter) @@ -143,7 +144,32 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe } resp := ListObjectsResult{} var count int - for object := range walkerCh { + for { + // We have read all the keys necessary by now. We + // cleanly break out. + if count == maxKeys { + if delimiter != "" { + // Set the next marker for the next request. + // This element is set only if you have delimiter set. + // If response does not include the NextMaker and it is + // truncated, you can use the value of the last Key in the + // response as the marker in the subsequent request to get the + // next set of object keys. + if len(resp.Objects) > 0 { + // NextMarker is only set when there + // are more than maxKeys worth of + // objects for a given prefix path. + resp.NextMarker = resp.Objects[len(resp.Objects)-1:][0].Object + } + } + resp.IsTruncated = len(walkerCh) > 0 + break + } + object, walkerOK := <-walkerCh + // If the channel is closed return right here. + if !walkerOK { + break + } // Verify if the object is lexically smaller than // the marker, we will skip those objects. if marker != "" { @@ -162,36 +188,15 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe if object.Mode.IsDir() { resp.Prefixes = append(resp.Prefixes, object.Object) } else { - // Rest of them are treated as files. + // Rest of them are treated as objects. resp.Objects = append(resp.Objects, object) } } else { // In-case of no delimiters, there are no - // prefixes all are considered to be objects. + // prefixes - all are considered to be objects. resp.Objects = append(resp.Objects, object) } - count++ // Bump the counter - // Verify if we have reached the maxKeys requested. - if count == maxKeys { - if delimiter != "" { - // Set the next marker for the next request. - // This element is set only if you have delimiter set. - // If response does not include the NextMaker and it is - // truncated, you can use the value of the last Key in the - // response as the marker in the subsequent request to get the - // next set of object keys. - if len(resp.Objects) > 0 { - // NextMarker is only set when there - // are more than maxKeys worth of - // objects for a given prefix path. - resp.NextMarker = resp.Objects[len(resp.Objects)-1:][0].Object - } - } - // Set truncated boolean to indicate the - // client to send the next batch of requests. - resp.IsTruncated = true - break - } + count++ // Bump the number. } // Set the marker right here for the new set of the // values coming in the from the client. @@ -271,10 +276,6 @@ func (fs *Filesystem) listObjectsService() *probe.Error { delete(reqToListWorkerReqCh, reqHash) if !resp.IsTruncated { close(listWorkerReqCh) - } else { - nextMarker := resp.NextMarker - reqHash = fnvSum(bucket, prefix, nextMarker, delimiter) - reqToListWorkerReqCh[reqHash] = listWorkerReqCh } srvReq.respCh <- resp }