api: Fix verification of checkLeafDirectory. (#1347)

This fixes a problem where leaf directory has more than 1000
entries, also resulting in listing issues, leading to an infinite
loop.

Fixes #1334
This commit is contained in:
Harshavardhana 2016-04-21 18:05:26 -07:00
parent cb1116725b
commit 1284ecc6f2
3 changed files with 34 additions and 24 deletions

View file

@ -30,6 +30,7 @@ import (
)
const (
// Minio meta volume.
minioMetaVolume = ".minio"
)
@ -37,8 +38,9 @@ const (
// yes returns all the files inside it.
func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []FileInfo) {
var allFileInfos []FileInfo
var markerPath string
for {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, "", false, 1000)
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, false, 1000)
if e != nil {
break
}
@ -46,6 +48,7 @@ func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []Fil
if eof {
break
}
markerPath = allFileInfos[len(allFileInfos)-1].Name
}
for _, fileInfo := range allFileInfos {
if fileInfo.Mode.IsDir() {
@ -80,7 +83,7 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
}
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashPathSeparator {
if delimiter != "" && delimiter != slashSeparator {
return ListMultipartsInfo{}, probe.NewError(UnsupportedDelimiter{
Delimiter: delimiter,
})
@ -93,7 +96,7 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
})
}
if uploadIDMarker != "" {
if strings.HasSuffix(keyMarker, slashPathSeparator) {
if strings.HasSuffix(keyMarker, slashSeparator) {
return result, probe.NewError(InvalidUploadIDKeyCombination{
UploadIDMarker: uploadIDMarker,
KeyMarker: keyMarker,
@ -111,13 +114,13 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
}
recursive := true
if delimiter == slashPathSeparator {
if delimiter == slashSeparator {
recursive = false
}
result.IsTruncated = true
result.MaxUploads = maxUploads
newMaxUploads := 0
// not using path.Join() as it strips off the trailing '/'.
// Not using path.Join() as it strips off the trailing '/'.
// Also bucket should always be followed by '/' even if prefix is empty.
prefixPath := pathJoin(bucket, prefix)
if recursive {
@ -135,9 +138,9 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
keyMarkerPath = fi.Name
// fi.Name will look like bucket/object/uploadID, extract object and uploadID.
uploadID := path.Base(fi.Name)
objectName := strings.TrimPrefix(path.Dir(fi.Name), bucket+slashPathSeparator)
objectName := strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket))
if strings.Contains(uploadID, ".") {
// contains partnumber and md5sum info, skip this.
// Contains partnumber and md5sum info, skip this.
continue
}
result.Uploads = append(result.Uploads, uploadMetadata{
@ -189,14 +192,14 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
var uploads []uploadMetadata
for _, fi := range fileInfos {
leaf, entries := o.checkLeafDirectory(fi.Name)
objectName := strings.TrimPrefix(fi.Name, bucket+slashPathSeparator)
objectName := strings.TrimPrefix(fi.Name, retainSlash(bucket))
if leaf {
for _, entry := range entries {
if strings.Contains(entry.Name, ".") {
if strings.Contains(path.Base(entry.Name), ".") {
continue
}
uploads = append(uploads, uploadMetadata{
Object: strings.TrimSuffix(objectName, slashPathSeparator),
Object: path.Dir(objectName),
UploadID: path.Base(entry.Name),
Initiated: entry.ModTime,
})
@ -222,7 +225,7 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
break
}
result.NextKeyMarker = uploads[index].Object
if strings.HasSuffix(uploads[index].Object, slashPathSeparator) {
if strings.HasSuffix(uploads[index].Object, slashSeparator) {
// for a directory entry
result.CommonPrefixes = append(result.CommonPrefixes, uploads[index].Object)
continue
@ -393,22 +396,24 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
result := ListPartsInfo{}
marker := ""
var markerPath string
nextPartNumberMarker := 0
uploadIDPath := path.Join(bucket, object, uploadID)
// Figure out the marker for the next subsequent calls, if the
// partNumberMarker is already set.
if partNumberMarker > 0 {
fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath+"."+strconv.Itoa(partNumberMarker)+".", "", false, 1)
uploadIDPartPrefix := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "."
fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, uploadIDPartPrefix, "", false, 1)
if e != nil {
return result, probe.NewError(e)
}
if len(fileInfos) == 0 {
return result, probe.NewError(InvalidPart{})
}
marker = fileInfos[0].Name
markerPath = fileInfos[0].Name
}
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath+".", marker, false, maxParts)
uploadIDPrefix := uploadIDPath + "."
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPrefix, markerPath, false, maxParts)
if e != nil {
return result, probe.NewError(InvalidPart{})
}

View file

@ -294,7 +294,7 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys
return ListObjectsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != slashPathSeparator {
if delimiter != "" && delimiter != slashSeparator {
return ListObjectsInfo{}, probe.NewError(UnsupportedDelimiter{
Delimiter: delimiter,
})
@ -309,7 +309,7 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys
}
}
recursive := true
if delimiter == slashPathSeparator {
if delimiter == slashSeparator {
recursive = false
}
fileInfos, eof, e := o.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys)
@ -325,7 +325,7 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys
result := ListObjectsInfo{IsTruncated: !eof}
for _, fileInfo := range fileInfos {
// With delimiter set we fill in NextMarker and Prefixes.
if delimiter == slashPathSeparator {
if delimiter == slashSeparator {
result.NextMarker = fileInfo.Name
if fileInfo.Mode.IsDir() {
result.Prefixes = append(result.Prefixes, fileInfo.Name)

View file

@ -23,10 +23,6 @@ import (
"unicode/utf8"
)
const (
slashPathSeparator = "/"
)
// validBucket regexp.
var validBucket = regexp.MustCompile(`^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$`)
@ -91,8 +87,17 @@ func IsValidObjectPrefix(object string) bool {
}
func pathJoin(path1 string, path2 string) string {
return strings.TrimSuffix(path1, slashPathSeparator) + slashPathSeparator + path2
// Slash separator.
const slashSeparator = "/"
// retainSlash - retains slash from a path.
func retainSlash(s string) string {
return strings.TrimSuffix(s, slashSeparator) + slashSeparator
}
// pathJoin - path join.
func pathJoin(s1 string, s2 string) string {
return retainSlash(s1) + s2
}
// validates location constraint from the request body.