listObjects: Channel based changes.

Supports:
 - prefixes
 - marker
This commit is contained in:
Harshavardhana 2016-01-19 17:49:48 -08:00
parent 9e18bfa60e
commit 682020ef2f
12 changed files with 202 additions and 314 deletions

View file

@ -53,7 +53,6 @@ const (
InvalidBucketName InvalidBucketName
InvalidDigest InvalidDigest
InvalidRange InvalidRange
InvalidRequest
InvalidMaxKeys InvalidMaxKeys
InvalidMaxUploads InvalidMaxUploads
InvalidMaxParts InvalidMaxParts

View file

@ -24,12 +24,12 @@ import (
) )
// parse bucket url queries // parse bucket url queries
func getBucketResources(values url.Values) (v fs.BucketResourcesMetadata) { func getBucketResources(values url.Values) (prefix, marker, delimiter string, maxkeys int, encodingType string) {
v.Prefix = values.Get("prefix") prefix = values.Get("prefix")
v.Marker = values.Get("marker") marker = values.Get("marker")
v.Maxkeys, _ = strconv.Atoi(values.Get("max-keys")) delimiter = values.Get("delimiter")
v.Delimiter = values.Get("delimiter") maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
v.EncodingType = values.Get("encoding-type") encodingType = values.Get("encoding-type")
return return
} }

View file

@ -92,8 +92,7 @@ func generateAccessControlPolicyResponse(acl fs.BucketACL) AccessControlPolicyRe
} }
// generates an ListObjects response for the said bucket with other enumerated options. // generates an ListObjects response for the said bucket with other enumerated options.
// func generateListObjectsResponse(bucket string, objects []fs.ObjectMetadata, bucketResources fs.BucketResourcesMetadata) ListObjectsResponse { func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKeys int, resp fs.ListObjectsResult) ListObjectsResponse {
func generateListObjectsResponse(bucket string, req fs.ListObjectsReq, resp fs.ListObjectsResp) ListObjectsResponse {
var contents []*Object var contents []*Object
var prefixes []*CommonPrefix var prefixes []*CommonPrefix
var owner = Owner{} var owner = Owner{}
@ -119,10 +118,10 @@ func generateListObjectsResponse(bucket string, req fs.ListObjectsReq, resp fs.L
data.Name = bucket data.Name = bucket
data.Contents = contents data.Contents = contents
data.MaxKeys = req.MaxKeys data.Prefix = prefix
data.Prefix = req.Prefix data.Marker = marker
data.Delimiter = req.Delimiter data.Delimiter = delimiter
data.Marker = req.Marker data.MaxKeys = maxKeys
data.NextMarker = resp.NextMarker data.NextMarker = resp.NextMarker
data.IsTruncated = resp.IsTruncated data.IsTruncated = resp.IsTruncated

View file

@ -52,6 +52,7 @@ func (api CloudStorageAPI) GetBucketLocationHandler(w http.ResponseWriter, req *
default: default:
writeErrorResponse(w, req, InternalError, req.URL.Path) writeErrorResponse(w, req, InternalError, req.URL.Path)
} }
return
} }
// TODO: Location value for LocationResponse is deliberately not used, until // TODO: Location value for LocationResponse is deliberately not used, until
@ -128,25 +129,21 @@ func (api CloudStorageAPI) ListObjectsHandler(w http.ResponseWriter, req *http.R
} }
} }
} }
resources := getBucketResources(req.URL.Query())
if resources.Maxkeys < 0 { // TODO handle encoding type.
prefix, marker, delimiter, maxkeys, _ := getBucketResources(req.URL.Query())
if maxkeys < 0 {
writeErrorResponse(w, req, InvalidMaxKeys, req.URL.Path) writeErrorResponse(w, req, InvalidMaxKeys, req.URL.Path)
return return
} }
if resources.Maxkeys == 0 { if maxkeys == 0 {
resources.Maxkeys = maxObjectList maxkeys = maxObjectList
} }
listReq := fs.ListObjectsReq{ listResp, err := api.Filesystem.ListObjects(bucket, prefix, marker, delimiter, maxkeys)
Prefix: resources.Prefix,
Marker: resources.Marker,
Delimiter: resources.Delimiter,
MaxKeys: resources.Maxkeys,
}
listResp, err := api.Filesystem.ListObjects(bucket, listReq)
if err == nil { if err == nil {
// generate response // generate response
response := generateListObjectsResponse(bucket, listReq, listResp) response := generateListObjectsResponse(bucket, prefix, marker, delimiter, maxkeys, listResp)
encodedSuccessResponse := encodeSuccessResponse(response) encodedSuccessResponse := encodeSuccessResponse(response)
// Write headers // Write headers
setCommonHeaders(w) setCommonHeaders(w)

View file

@ -165,59 +165,50 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) {
func testPaging(c *check.C, create func() Filesystem) { func testPaging(c *check.C, create func() Filesystem) {
fs := create() fs := create()
fs.MakeBucket("bucket", "") fs.MakeBucket("bucket", "")
resources := BucketResourcesMetadata{} result, err := fs.ListObjects("bucket", "", "", "", 0)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, i+1) c.Assert(len(result.Objects), check.Equals, i+1)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
} }
// check after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 5) c.Assert(len(result.Objects), check.Equals, 5)
c.Assert(resources.IsTruncated, check.Equals, true) c.Assert(result.IsTruncated, check.Equals, true)
} }
// check paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
_, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) _, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) _, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 2) c.Assert(len(result.Objects), check.Equals, 2)
} }
// check ordering of pages // check ordering of pages
{ {
resources.Prefix = "" result, err = fs.ListObjects("bucket", "", "", "", 1000)
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
} }
// check delimited results with delimiter and prefix // check delimited results with delimiter and prefix
@ -226,72 +217,49 @@ func testPaging(c *check.C, create func() Filesystem) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) _, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var prefixes []string result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = "this/is/"
resources.Maxkeys = 10
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1) c.Assert(len(result.Objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/") c.Assert(result.Prefixes[0], check.Equals, "this/is/also/")
} }
time.Sleep(time.Second) time.Sleep(time.Second)
// check delimited results with delimiter without prefix // check delimited results with delimiter without prefix
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "", "/", 1000)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/") c.Assert(result.Prefixes[0], check.Equals, "this/")
} }
// check results with Marker // check results with Marker
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Prefix = ""
resources.Marker = "newPrefix"
resources.Delimiter = ""
resources.Maxkeys = 3
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix2")
c.Assert(objects[1].Object, check.Equals, "obj0") c.Assert(result.Objects[1].Object, check.Equals, "obj0")
c.Assert(objects[2].Object, check.Equals, "obj1") c.Assert(result.Objects[2].Object, check.Equals, "obj1")
} }
// check ordering of results with prefix // check ordering of results with prefix
{ {
resources.Prefix = "obj" result, err = fs.ListObjects("bucket", "obj", "", "", 1000)
resources.Delimiter = ""
resources.Marker = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "obj0") c.Assert(result.Objects[0].Object, check.Equals, "obj0")
c.Assert(objects[1].Object, check.Equals, "obj1") c.Assert(result.Objects[1].Object, check.Equals, "obj1")
c.Assert(objects[2].Object, check.Equals, "obj10") c.Assert(result.Objects[2].Object, check.Equals, "obj10")
c.Assert(objects[3].Object, check.Equals, "obj2") c.Assert(result.Objects[3].Object, check.Equals, "obj2")
c.Assert(objects[4].Object, check.Equals, "obj3") c.Assert(result.Objects[4].Object, check.Equals, "obj3")
} }
// check ordering of results with prefix and no paging // check ordering of results with prefix and no paging
{ {
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Marker = ""
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
} }
} }
@ -417,11 +385,10 @@ func testListBucketsOrder(c *check.C, create func() Filesystem) {
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) {
fs := create() fs := create()
resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} result, err := fs.ListObjects("bucket", "", "", "", 1000)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.Not(check.IsNil)) c.Assert(err, check.Not(check.IsNil))
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
} }
func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) {

View file

@ -26,7 +26,6 @@ import (
"encoding/xml" "encoding/xml"
"math/rand" "math/rand"
"strconv" "strconv"
"time"
"gopkg.in/check.v1" "gopkg.in/check.v1"
) )
@ -165,59 +164,50 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) {
func testPaging(c *check.C, create func() Filesystem) { func testPaging(c *check.C, create func() Filesystem) {
fs := create() fs := create()
fs.MakeBucket("bucket", "") fs.MakeBucket("bucket", "")
resources := BucketResourcesMetadata{} result, err := fs.ListObjects("bucket", "", "", "", 0)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
// check before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, i+1) c.Assert(len(result.Objects), check.Equals, i+1)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
} }
// check after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil) _, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Maxkeys = 5 result, err = fs.ListObjects("bucket", "", "", "", 5)
resources.Prefix = ""
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 5) c.Assert(len(result.Objects), check.Equals, 5)
c.Assert(resources.IsTruncated, check.Equals, true) c.Assert(result.IsTruncated, check.Equals, true)
} }
// check paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
_, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil) _, err = fs.CreateObject("bucket", "newPrefix", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) fs.CreateObject("bucket", "newPrefix2", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 2) c.Assert(len(result.Objects), check.Equals, 2)
} }
// check ordering of pages // check ordering of pages
{ {
resources.Prefix = "" result, err = fs.ListObjects("bucket", "", "", "", 1000)
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
} }
// check delimited results with delimiter and prefix // check delimited results with delimiter and prefix
@ -226,72 +216,48 @@ func testPaging(c *check.C, create func() Filesystem) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
_, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil) _, err = fs.CreateObject("bucket", "this/is/also/a/delimited/file", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"), nil)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
var prefixes []string result, err = fs.ListObjects("bucket", "this/is/", "", "/", 10)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = "this/is/"
resources.Maxkeys = 10
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(objects), check.Equals, 1) c.Assert(len(result.Objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/is/also/") c.Assert(result.Prefixes[0], check.Equals, "this/is/also/")
} }
time.Sleep(time.Second)
// check delimited results with delimiter without prefix // check delimited results with delimiter without prefix
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "", "/", 1000)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
c.Assert(objects[2].Object, check.Equals, "obj0") c.Assert(result.Objects[2].Object, check.Equals, "obj0")
c.Assert(objects[3].Object, check.Equals, "obj1") c.Assert(result.Objects[3].Object, check.Equals, "obj1")
c.Assert(objects[4].Object, check.Equals, "obj10") c.Assert(result.Objects[4].Object, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/") c.Assert(result.Prefixes[0], check.Equals, "this/")
} }
// check results with Marker // check results with Marker
{ {
var prefixes []string result, err = fs.ListObjects("bucket", "", "newPrefix", "", 3)
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Prefix = ""
resources.Marker = "newPrefix"
resources.Delimiter = ""
resources.Maxkeys = 3
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix2")
c.Assert(objects[1].Object, check.Equals, "obj0") c.Assert(result.Objects[1].Object, check.Equals, "obj0")
c.Assert(objects[2].Object, check.Equals, "obj1") c.Assert(result.Objects[2].Object, check.Equals, "obj1")
} }
// check ordering of results with prefix // check ordering of results with prefix
{ {
resources.Prefix = "obj" result, err = fs.ListObjects("bucket", "obj", "", "", 1000)
resources.Delimiter = ""
resources.Marker = ""
resources.Maxkeys = 1000
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "obj0") c.Assert(result.Objects[0].Object, check.Equals, "obj0")
c.Assert(objects[1].Object, check.Equals, "obj1") c.Assert(result.Objects[1].Object, check.Equals, "obj1")
c.Assert(objects[2].Object, check.Equals, "obj10") c.Assert(result.Objects[2].Object, check.Equals, "obj10")
c.Assert(objects[3].Object, check.Equals, "obj2") c.Assert(result.Objects[3].Object, check.Equals, "obj2")
c.Assert(objects[4].Object, check.Equals, "obj3") c.Assert(result.Objects[4].Object, check.Equals, "obj3")
} }
// check ordering of results with prefix and no paging // check ordering of results with prefix and no paging
{ {
resources.Prefix = "new" result, err = fs.ListObjects("bucket", "new", "", "", 5)
resources.Marker = ""
resources.Maxkeys = 5
objects, resources, err = fs.ListObjects("bucket", resources)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(objects[0].Object, check.Equals, "newPrefix") c.Assert(result.Objects[0].Object, check.Equals, "newPrefix")
c.Assert(objects[1].Object, check.Equals, "newPrefix2") c.Assert(result.Objects[1].Object, check.Equals, "newPrefix2")
} }
} }
@ -416,11 +382,10 @@ func testListBucketsOrder(c *check.C, create func() Filesystem) {
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) { func testListObjectsTestsForNonExistantBucket(c *check.C, create func() Filesystem) {
fs := create() fs := create()
resources := BucketResourcesMetadata{Prefix: "", Maxkeys: 1000} result, err := fs.ListObjects("bucket", "", "", "", 1000)
objects, resources, err := fs.ListObjects("bucket", resources)
c.Assert(err, check.Not(check.IsNil)) c.Assert(err, check.Not(check.IsNil))
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(result.IsTruncated, check.Equals, false)
c.Assert(len(objects), check.Equals, 0) c.Assert(len(result.Objects), check.Equals, 0)
} }
func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) { func testNonExistantObjectInBucket(c *check.C, create func() Filesystem) {

View file

@ -118,19 +118,15 @@ type BucketMultipartResourcesMetadata struct {
CommonPrefixes []string CommonPrefixes []string
} }
// BucketResourcesMetadata - various types of bucket resources // ListObjectsResult - container for list object request results.
type BucketResourcesMetadata struct { type ListObjectsResult struct {
Prefix string IsTruncated bool
Marker string NextMarker string
NextMarker string Objects []ObjectMetadata
Maxkeys int Prefixes []string
EncodingType string
Delimiter string
IsTruncated bool
CommonPrefixes []string
} }
type ListObjectsReq struct { type listObjectsReq struct {
Bucket string Bucket string
Prefix string Prefix string
Marker string Marker string
@ -138,21 +134,14 @@ type ListObjectsReq struct {
MaxKeys int MaxKeys int
} }
type ListObjectsResp struct {
IsTruncated bool
NextMarker string
Objects []ObjectMetadata
Prefixes []string
}
type listServiceReq struct { type listServiceReq struct {
req ListObjectsReq req listObjectsReq
respCh chan ListObjectsResp respCh chan ListObjectsResult
} }
type listWorkerReq struct { type listWorkerReq struct {
req ListObjectsReq req listObjectsReq
respCh chan ListObjectsResp respCh chan ListObjectsResult
} }
// CompletePart - completed part container // CompletePart - completed part container

View file

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2015 Minio, Inc. * Minio Cloud Storage, (C) 2015-2016 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package fs
import ( import (
"errors" "errors"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -27,74 +26,88 @@ import (
"github.com/minio/minio-xl/pkg/probe" "github.com/minio/minio-xl/pkg/probe"
) )
func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq, *probe.Error) { func (fs Filesystem) listWorker(startReq listObjectsReq) (chan<- listWorkerReq, *probe.Error) {
Separator := string(os.PathSeparator)
bucket := startReq.Bucket bucket := startReq.Bucket
prefix := startReq.Prefix prefix := startReq.Prefix
marker := startReq.Marker marker := startReq.Marker
delimiter := startReq.Delimiter delimiter := startReq.Delimiter
quit := make(chan bool) quitWalker := make(chan bool)
if marker != "" {
return nil, probe.NewError(errors.New("Not supported"))
}
if delimiter != "" && delimiter != Separator {
return nil, probe.NewError(errors.New("Not supported"))
}
reqCh := make(chan listWorkerReq) reqCh := make(chan listWorkerReq)
walkerCh := make(chan ObjectMetadata) walkerCh := make(chan ObjectMetadata)
go func() { go func() {
rootPath := filepath.Join(fs.path, bucket, prefix) var rootPath string
stripPath := filepath.Join(fs.path, bucket) + Separator bucketPath := filepath.Join(fs.path, bucket)
trimBucketPathPrefix := bucketPath + string(os.PathSeparator)
prefixPath := trimBucketPathPrefix + prefix
st, err := os.Stat(prefixPath)
if err != nil && os.IsNotExist(err) {
rootPath = bucketPath
} else {
if st.IsDir() && !strings.HasSuffix(prefix, delimiter) {
rootPath = bucketPath
} else {
rootPath = prefixPath
}
}
filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
if path == rootPath { if path == rootPath {
return nil return nil
} }
if info.IsDir() { if info.IsDir() {
path = path + Separator path = path + string(os.PathSeparator)
} }
objectName := strings.TrimPrefix(path, stripPath) objectName := strings.TrimPrefix(path, trimBucketPathPrefix)
object := ObjectMetadata{ if strings.HasPrefix(objectName, prefix) {
Object: objectName, if marker >= objectName {
Created: info.ModTime(), return nil
Mode: info.Mode(), }
Size: info.Size(), object := ObjectMetadata{
} Object: objectName,
select { Created: info.ModTime(),
case walkerCh <- object: Mode: info.Mode(),
// do nothings Size: info.Size(),
case <-quit: }
fmt.Println("walker got quit") select {
// returning error ends the Walk() case walkerCh <- object:
return errors.New("Ending") // Do nothing
} case <-quitWalker:
if delimiter == Separator && info.IsDir() { // Returning error ends the Walk()
return filepath.SkipDir return errors.New("Ending")
}
if delimiter != "" && info.IsDir() {
return filepath.SkipDir
}
} }
return nil return nil
}) })
close(walkerCh) close(walkerCh)
}() }()
go func() { go func() {
resp := ListObjectsResp{} resp := ListObjectsResult{}
for { for {
select { select {
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
fmt.Println("worker got timeout") quitWalker <- true
quit <- true timeoutReq := listObjectsReq{bucket, prefix, marker, delimiter, 0}
timeoutReq := ListObjectsReq{bucket, prefix, marker, delimiter, 0}
fmt.Println("after timeout", fs)
fs.timeoutReqCh <- timeoutReq fs.timeoutReqCh <- timeoutReq
// FIXME: can there be a race such that sender on reqCh panics? // FIXME: can there be a race such that sender on reqCh panics?
return return
case req := <-reqCh: case req, ok := <-reqCh:
resp = ListObjectsResp{} if !ok {
return
}
resp = ListObjectsResult{}
resp.Objects = make([]ObjectMetadata, 0) resp.Objects = make([]ObjectMetadata, 0)
resp.Prefixes = make([]string, 0) resp.Prefixes = make([]string, 0)
count := 0 count := 0
for object := range walkerCh { for object := range walkerCh {
if count == req.req.MaxKeys {
resp.IsTruncated = true
break
}
if object.Mode.IsDir() { if object.Mode.IsDir() {
if delimiter == "" { if delimiter == "" {
// skip directories for recursive list // Skip directories for recursive list
continue continue
} }
resp.Prefixes = append(resp.Prefixes, object.Object) resp.Prefixes = append(resp.Prefixes, object.Object)
@ -103,13 +116,7 @@ func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq,
} }
resp.NextMarker = object.Object resp.NextMarker = object.Object
count++ count++
if count == req.req.MaxKeys {
resp.IsTruncated = true
break
}
} }
fmt.Println("response objects: ", len(resp.Objects))
marker = resp.NextMarker
req.respCh <- resp req.respCh <- resp
} }
} }
@ -118,9 +125,8 @@ func (fs Filesystem) listWorker(startReq ListObjectsReq) (chan<- listWorkerReq,
} }
func (fs *Filesystem) startListService() *probe.Error { func (fs *Filesystem) startListService() *probe.Error {
fmt.Println("startListService starting")
listServiceReqCh := make(chan listServiceReq) listServiceReqCh := make(chan listServiceReq)
timeoutReqCh := make(chan ListObjectsReq) timeoutReqCh := make(chan listObjectsReq)
reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq)) reqToListWorkerReqCh := make(map[string](chan<- listWorkerReq))
reqToStr := func(bucket string, prefix string, marker string, delimiter string) string { reqToStr := func(bucket string, prefix string, marker string, delimiter string) string {
return strings.Join([]string{bucket, prefix, marker, delimiter}, ":") return strings.Join([]string{bucket, prefix, marker, delimiter}, ":")
@ -129,7 +135,6 @@ func (fs *Filesystem) startListService() *probe.Error {
for { for {
select { select {
case timeoutReq := <-timeoutReqCh: case timeoutReq := <-timeoutReqCh:
fmt.Println("listservice got timeout on ", timeoutReq)
reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter) reqStr := reqToStr(timeoutReq.Bucket, timeoutReq.Prefix, timeoutReq.Marker, timeoutReq.Delimiter)
listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr]
if ok { if ok {
@ -137,27 +142,22 @@ func (fs *Filesystem) startListService() *probe.Error {
} }
delete(reqToListWorkerReqCh, reqStr) delete(reqToListWorkerReqCh, reqStr)
case serviceReq := <-listServiceReqCh: case serviceReq := <-listServiceReqCh:
fmt.Println("serviceReq received", serviceReq)
fmt.Println("sending to listservicereqch", fs)
reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter) reqStr := reqToStr(serviceReq.req.Bucket, serviceReq.req.Prefix, serviceReq.req.Marker, serviceReq.req.Delimiter)
listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr] listWorkerReqCh, ok := reqToListWorkerReqCh[reqStr]
if !ok { if !ok {
var err *probe.Error var err *probe.Error
listWorkerReqCh, err = fs.listWorker(serviceReq.req) listWorkerReqCh, err = fs.listWorker(serviceReq.req)
if err != nil { if err != nil {
fmt.Println("listWorker returned error", err) serviceReq.respCh <- ListObjectsResult{}
serviceReq.respCh <- ListObjectsResp{}
return return
} }
reqToListWorkerReqCh[reqStr] = listWorkerReqCh reqToListWorkerReqCh[reqStr] = listWorkerReqCh
} }
respCh := make(chan ListObjectsResp) respCh := make(chan ListObjectsResult)
listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh} listWorkerReqCh <- listWorkerReq{serviceReq.req, respCh}
resp, ok := <-respCh resp, ok := <-respCh
if !ok { if !ok {
serviceReq.respCh <- ListObjectsResp{} serviceReq.respCh <- ListObjectsResult{}
fmt.Println("listWorker resp was not ok")
return return
} }
delete(reqToListWorkerReqCh, reqStr) delete(reqToListWorkerReqCh, reqStr)
@ -177,13 +177,12 @@ func (fs *Filesystem) startListService() *probe.Error {
} }
// ListObjects - // ListObjects -
func (fs Filesystem) ListObjects(bucket string, req ListObjectsReq) (ListObjectsResp, *probe.Error) { func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) {
fs.lock.Lock() fs.lock.Lock()
defer fs.lock.Unlock() defer fs.lock.Unlock()
Separator := string(os.PathSeparator)
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return ListObjectsResp{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
} }
bucket = fs.denormalizeBucket(bucket) bucket = fs.denormalizeBucket(bucket)
@ -191,39 +190,34 @@ func (fs Filesystem) ListObjects(bucket string, req ListObjectsReq) (ListObjects
// check bucket exists // check bucket exists
if _, e := os.Stat(rootPrefix); e != nil { if _, e := os.Stat(rootPrefix); e != nil {
if os.IsNotExist(e) { if os.IsNotExist(e) {
return ListObjectsResp{}, probe.NewError(BucketNotFound{Bucket: bucket}) return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket})
} }
return ListObjectsResp{}, probe.NewError(e) return ListObjectsResult{}, probe.NewError(e)
}
canonicalize := func(str string) string {
return strings.Replace(str, "/", string(os.PathSeparator), -1)
}
decanonicalize := func(str string) string {
return strings.Replace(str, string(os.PathSeparator), "/", -1)
} }
req := listObjectsReq{}
req.Bucket = bucket req.Bucket = bucket
req.Prefix = canonicalize(req.Prefix) req.Prefix = filepath.FromSlash(prefix)
req.Marker = canonicalize(req.Marker) req.Marker = filepath.FromSlash(marker)
req.Delimiter = canonicalize(req.Delimiter) req.Delimiter = filepath.FromSlash(delimiter)
req.MaxKeys = maxKeys
if req.Delimiter != "" && req.Delimiter != Separator { respCh := make(chan ListObjectsResult)
return ListObjectsResp{}, probe.NewError(errors.New("not supported"))
}
respCh := make(chan ListObjectsResp)
fs.listServiceReqCh <- listServiceReq{req, respCh} fs.listServiceReqCh <- listServiceReq{req, respCh}
resp := <-respCh resp := <-respCh
for i := 0; i < len(resp.Prefixes); i++ { for i := 0; i < len(resp.Prefixes); i++ {
resp.Prefixes[i] = decanonicalize(resp.Prefixes[i]) resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i])
} }
for i := 0; i < len(resp.Objects); i++ { for i := 0; i < len(resp.Objects); i++ {
resp.Objects[i].Object = decanonicalize(resp.Objects[i].Object) resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object)
} }
if req.Delimiter == "" { if req.Delimiter == "" {
// unset NextMaker for recursive list // This element is set only if you have delimiter set.
// If response does not include the NextMaker and it is
// truncated, you can use the value of the last Key in the
// response as the marker in the subsequent request to get the
// next set of object keys.
resp.NextMarker = "" resp.NextMarker = ""
} }
return resp, nil return resp, nil

View file

@ -34,7 +34,7 @@ type Filesystem struct {
multiparts *Multiparts multiparts *Multiparts
buckets *Buckets buckets *Buckets
listServiceReqCh chan<- listServiceReq listServiceReqCh chan<- listServiceReq
timeoutReqCh chan<- ListObjectsReq timeoutReqCh chan<- listObjectsReq
} }
// Buckets holds acl information // Buckets holds acl information
@ -94,10 +94,10 @@ func New(rootPath string) (Filesystem, *probe.Error) {
return Filesystem{}, err.Trace() return Filesystem{}, err.Trace()
} }
} }
a := Filesystem{lock: new(sync.Mutex)} fs := Filesystem{lock: new(sync.Mutex)}
a.path = rootPath fs.path = rootPath
a.multiparts = multiparts fs.multiparts = multiparts
a.buckets = buckets fs.buckets = buckets
/// Defaults /// Defaults
// maximum buckets to be listed from list buckets. // maximum buckets to be listed from list buckets.
@ -105,6 +105,7 @@ func New(rootPath string) (Filesystem, *probe.Error) {
// minium free disk required for i/o operations to succeed. // minium free disk required for i/o operations to succeed.
fs.minFreeDisk = 10 fs.minFreeDisk = 10
// Start list goroutine.
err = fs.startListService() err = fs.startListService()
if err != nil { if err != nil {
return Filesystem{}, err.Trace(rootPath) return Filesystem{}, err.Trace(rootPath)

View file

@ -159,24 +159,6 @@ func createConfigPath() *probe.Error {
return nil return nil
} }
// isAuthConfigFileExists is auth config file exists?
func isConfigFileExists() bool {
if _, err := os.Stat(mustGetConfigFile()); err != nil {
if os.IsNotExist(err) {
return false
}
panic(err)
}
return true
}
// mustGetConfigFile always get users config file, if not panic
func mustGetConfigFile() string {
configFile, err := getConfigFile()
fatalIf(err.Trace(), "Unable to get config file.", nil)
return configFile
}
// getConfigFile get users config file // getConfigFile get users config file
func getConfigFile() (string, *probe.Error) { func getConfigFile() (string, *probe.Error) {
configPath, err := getConfigPath() configPath, err := getConfigPath()

View file

@ -126,16 +126,17 @@ var ignoredHeaders = map[string]bool{
} }
func (s *MyAPIFSCacheSuite) newRequest(method, urlStr string, contentLength int64, body io.ReadSeeker) (*http.Request, error) { func (s *MyAPIFSCacheSuite) newRequest(method, urlStr string, contentLength int64, body io.ReadSeeker) (*http.Request, error) {
if method == "" {
method = "POST"
}
t := time.Now().UTC() t := time.Now().UTC()
req, err := http.NewRequest(method, urlStr, nil) req, err := http.NewRequest(method, urlStr, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Set("x-amz-date", t.Format(iso8601Format)) req.Header.Set("x-amz-date", t.Format(iso8601Format))
if method == "" {
method = "POST"
}
// add Content-Length // add Content-Length
req.ContentLength = contentLength req.ContentLength = contentLength

View file

@ -74,12 +74,6 @@ const (
minioUpdateExperimentalURL = "https://dl.minio.io/server/minio/experimental/" minioUpdateExperimentalURL = "https://dl.minio.io/server/minio/experimental/"
) )
// minioUpdates container to hold updates json.
type minioUpdates struct {
BuildDate string
Platforms map[string]string
}
// updateMessage container to hold update messages. // updateMessage container to hold update messages.
type updateMessage struct { type updateMessage struct {
Status string `json:"status"` Status string `json:"status"`