enhance multipart functions to use fsDirent (#1304)

* backend/fs: scanMultipartDir returns directories only for non-recursive listing

* backend/fs: enhance multipart functions to use fsDirent
Authored by Bala FA on 2016-04-09 00:16:03 +05:30; committed by Harshavardhana
parent bedd867c0b
commit 6af761c86c
5 changed files with 249 additions and 203 deletions

View file

@ -26,31 +26,34 @@ import (
// fsDirent carries directory entries.
type fsDirent struct {
name string
modifiedTime time.Time // On Solaris and older unix distros this is empty.
size int64 // On Solaris and older unix distros this is empty.
isDir bool
name string
modTime time.Time // On Solaris and older unix distros this is empty.
size int64 // On Solaris and older unix distros this is empty.
mode os.FileMode
}
// byDirentNames is a collection satisfying sort.Interface.
type byDirentNames []fsDirent
func (d byDirentNames) Len() int { return len(d) }
func (d byDirentNames) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDirentNames) Less(i, j int) bool {
n1 := d[i].name
if d[i].isDir {
n1 = n1 + string(os.PathSeparator)
}
n2 := d[j].name
if d[j].isDir {
n2 = n2 + string(os.PathSeparator)
}
return n1 < n2
// IsDir - returns true if fsDirent is a directory
func (ent fsDirent) IsDir() bool {
return ent.mode.IsDir()
}
// IsSymlink - returns true if fsDirent is a symbolic link
func (ent fsDirent) IsSymlink() bool {
return ent.mode&os.ModeSymlink == os.ModeSymlink
}
// IsRegular - returns true if fsDirent is a regular file
func (ent fsDirent) IsRegular() bool {
return ent.mode.IsRegular()
}
// byDirentName is a collection satisfying sort.Interface.
type byDirentName []fsDirent
func (d byDirentName) Len() int { return len(d) }
func (d byDirentName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDirentName) Less(i, j int) bool { return d[i].name < d[j].name }
// Using sort.Search() internally to jump to the file entry containing the prefix.
func searchDirents(dirents []fsDirent, x string) int {
processFunc := func(i int) bool {
@ -85,7 +88,7 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b
objectInfo := ObjectInfo{}
// Convert to full object name.
objectInfo.Name = filepath.Join(prefixDir, dirent.name)
if dirent.modifiedTime.IsZero() && dirent.size == 0 {
if dirent.modTime.IsZero() && dirent.size == 0 {
// ModifiedTime and Size are zero, Stat() and figure out
// the actual values that need to be set.
fi, err := os.Stat(filepath.Join(bucketDir, prefixDir, dirent.name))
@ -99,9 +102,9 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b
} else {
// If ModifiedTime or Size are set then use them
// without attempting another Stat operation.
objectInfo.ModifiedTime = dirent.modifiedTime
objectInfo.ModifiedTime = dirent.modTime
objectInfo.Size = dirent.size
objectInfo.IsDir = dirent.isDir
objectInfo.IsDir = dirent.IsDir()
}
if objectInfo.IsDir {
// Add os.PathSeparator suffix again for directories as
@ -135,13 +138,13 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b
dirents = dirents[searchDirents(dirents, markerDir):]
*count += len(dirents)
for i, dirent := range dirents {
if i == 0 && markerDir == dirent.name && !dirent.isDir {
if i == 0 && markerDir == dirent.name && !dirent.IsDir() {
// If the first entry is not a directory
// we need to skip this entry.
*count--
continue
}
if dirent.isDir && recursive {
if dirent.IsDir() && recursive {
// If the entry is a directory, we will need recurse into it.
markerArg := ""
if dirent.name == markerDir {

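For readers skimming the diff, here is a minimal standalone sketch (not part of the commit) that mirrors the fsDirent shape, the mode-based helpers, and the byDirentName ordering introduced above, plus a sort.Search lookup in the spirit of searchDirents. All names and values below are copied or invented purely for illustration.

package main

import (
	"fmt"
	"os"
	"sort"
	"time"
)

// Illustrative copy of the fsDirent shape from the diff above.
type fsDirent struct {
	name    string
	modTime time.Time
	size    int64
	mode    os.FileMode
}

func (ent fsDirent) IsDir() bool     { return ent.mode.IsDir() }
func (ent fsDirent) IsRegular() bool { return ent.mode.IsRegular() }

// byDirentName sorts entries lexically by name, as in the commit.
type byDirentName []fsDirent

func (d byDirentName) Len() int           { return len(d) }
func (d byDirentName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byDirentName) Less(i, j int) bool { return d[i].name < d[j].name }

func main() {
	dirents := []fsDirent{
		{name: "zebra.txt"},
		{name: "docs/", mode: os.ModeDir},
		{name: "alpha.txt"},
	}
	sort.Sort(byDirentName(dirents))

	// sort.Search jumps to the first entry >= the marker, the same idea
	// searchDirents uses to skip ahead to a prefix.
	i := sort.Search(len(dirents), func(i int) bool { return dirents[i].name >= "docs/" })
	for _, ent := range dirents[i:] {
		fmt.Printf("%s isDir=%v isRegular=%v\n", ent.name, ent.IsDir(), ent.IsRegular())
	}
}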
View file

@ -20,6 +20,7 @@ package main
import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
@ -28,7 +29,10 @@ import (
)
const (
// Large enough buffer size for ReadDirent() syscall
// readDirentBufSize is the buffer size for syscall.ReadDirent() to hold
// multiple directory entries in one buffer. The Go standard library uses
// 4096 bytes; we use a buffer 25 times larger so that many more entries
// fit and fewer syscall.ReadDirent() calls are needed.
readDirentBufSize = 4096 * 25
)
@ -65,9 +69,30 @@ func parseDirents(buf []byte) []fsDirent {
if name == "." || name == ".." {
continue
}
var mode os.FileMode
switch dirent.Type {
case syscall.DT_BLK, syscall.DT_WHT:
mode = os.ModeDevice
case syscall.DT_CHR:
mode = os.ModeDevice | os.ModeCharDevice
case syscall.DT_DIR:
mode = os.ModeDir
case syscall.DT_FIFO:
mode = os.ModeNamedPipe
case syscall.DT_LNK:
mode = os.ModeSymlink
case syscall.DT_REG:
mode = 0
case syscall.DT_SOCK:
mode = os.ModeSocket
case syscall.DT_UNKNOWN:
mode = 0xffffffff
}
dirents = append(dirents, fsDirent{
name: name,
isDir: (dirent.Type == syscall.DT_DIR),
name: name,
mode: mode,
})
}
return dirents
@ -91,7 +116,7 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
break
}
for _, dirent := range parseDirents(buf[:nbuf]) {
if dirent.isDir {
if dirent.IsDir() {
dirent.name += string(os.PathSeparator)
dirent.size = 0
}
@ -100,6 +125,46 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
}
}
}
sort.Sort(byDirentNames(dirents))
sort.Sort(byDirentName(dirents))
return dirents, nil
}
// scandir scans the directory dirPath, calling filter() on each directory
// entry. Entries for which filter() returns true are collected and lexically
// sorted using sort.Sort(). If filter is nil, all entries are selected.
// If namesOnly is true, dirPath is not prepended to the entry name.
func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) {
buf := make([]byte, readDirentBufSize)
d, err := os.Open(dirPath)
if err != nil {
return nil, err
}
defer d.Close()
fd := int(d.Fd())
dirents := []fsDirent{}
for {
nbuf, err := syscall.ReadDirent(fd, buf)
if err != nil {
return nil, err
}
if nbuf <= 0 {
break
}
for _, dirent := range parseDirents(buf[:nbuf]) {
if !namesOnly {
dirent.name = filepath.Join(dirPath, dirent.name)
}
if dirent.IsDir() {
dirent.name += string(os.PathSeparator)
}
if filter == nil || filter(dirent) {
dirents = append(dirents, dirent)
}
}
}
sort.Sort(byDirentName(dirents))
return dirents, nil
}
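A hedged usage sketch for the scandir helper defined above, assuming it lives in the same package (which also imports "strings"); the directory path and ".uploadid" suffix below are placeholders, not values from the commit.

// listUploadIDFiles is an illustrative caller of scandir (not in the commit).
// It keeps only regular files whose names end with the given suffix,
// returning bare entry names because namesOnly is true.
func listUploadIDFiles(dirPath, suffix string) ([]fsDirent, error) {
	return scandir(dirPath,
		func(dirent fsDirent) bool {
			return dirent.IsRegular() && strings.HasSuffix(dirent.name, suffix)
		},
		true)
}

// Example call (path and suffix are made up):
//   dirents, err := listUploadIDFiles("/tmp/minio-meta/bucket/object", ".uploadid")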

View file

@ -21,6 +21,7 @@ package main
import (
"io"
"os"
"path/filepath"
"sort"
"strings"
)
@ -43,12 +44,12 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
}
for _, fi := range fis {
dirent := fsDirent{
name: fi.Name(),
size: fi.Size(),
modifiedTime: fi.ModTime(),
isDir: fi.IsDir(),
name: fi.Name(),
modTime: fi.ModTime(),
size: fi.Size(),
mode: fi.Mode(),
}
if dirent.isDir {
if dirent.IsDir() {
dirent.name += string(os.PathSeparator)
dirent.size = 0
}
@ -58,6 +59,50 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
}
}
// Sort dirents.
sort.Sort(byDirentNames(dirents))
sort.Sort(byDirentName(dirents))
return dirents, nil
}
// scandir scans the directory dirPath, calling filter() on each directory
// entry. Entries for which filter() returns true are collected and lexically
// sorted using sort.Sort(). If filter is nil, all entries are selected.
// If namesOnly is true, dirPath is not prepended to the entry name.
func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) {
d, err := os.Open(dirPath)
if err != nil {
return nil, err
}
defer d.Close()
var dirents []fsDirent
for {
fis, err := d.Readdir(1000)
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
for _, fi := range fis {
dirent := fsDirent{
name: fi.Name(),
modTime: fi.ModTime(),
size: fi.Size(),
mode: fi.Mode(),
}
if !namesOnly {
dirent.name = filepath.Join(dirPath, dirent.name)
}
if dirent.IsDir() {
dirent.name += string(os.PathSeparator)
}
if filter == nil || filter(dirent) {
dirents = append(dirents, dirent)
}
}
}
sort.Sort(byDirentName(dirents))
return dirents, nil
}
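And a complementary sketch for the namesOnly=false mode of the same helper (again assuming the enclosing package, with "fmt" imported): a nil filter selects every entry, names come back joined with dirPath, and directories carry a trailing os.PathSeparator, exactly as the loop above implements.

// listAllEntries is an illustrative caller of scandir (not in the commit).
func listAllEntries(dirPath string) error {
	dirents, err := scandir(dirPath, nil, false) // nil filter: select everything
	if err != nil {
		return err
	}
	for _, dirent := range dirents {
		// Directory names end with os.PathSeparator; see the IsDir branch above.
		fmt.Println(dirent.name, dirent.mode, dirent.modTime, dirent.size)
	}
	return nil
}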

View file

@ -18,118 +18,12 @@ package main
import (
"errors"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// DirEntry - directory entry
type DirEntry struct {
Name string
Size int64
Mode os.FileMode
ModTime time.Time
}
// IsDir - returns true if DirEntry is a directory
func (entry DirEntry) IsDir() bool {
return entry.Mode.IsDir()
}
// IsSymlink - returns true if DirEntry is a symbolic link
func (entry DirEntry) IsSymlink() bool {
return entry.Mode&os.ModeSymlink == os.ModeSymlink
}
// IsRegular - returns true if DirEntry is a regular file
func (entry DirEntry) IsRegular() bool {
return entry.Mode.IsRegular()
}
// sort interface for DirEntry slice
type byEntryName []DirEntry
func (f byEntryName) Len() int { return len(f) }
func (f byEntryName) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
func (f byEntryName) Less(i, j int) bool { return f[i].Name < f[j].Name }
func filteredReaddir(dirname string, filter func(DirEntry) bool, appendPath bool) ([]DirEntry, error) {
result := []DirEntry{}
d, err := os.Open(dirname)
if err != nil {
return result, err
}
defer d.Close()
for {
fis, err := d.Readdir(1000)
if err != nil {
if err == io.EOF {
break
}
return result, err
}
for _, fi := range fis {
name := fi.Name()
if appendPath {
name = filepath.Join(dirname, name)
}
if fi.IsDir() {
name += string(os.PathSeparator)
}
entry := DirEntry{Name: name, Size: fi.Size(), Mode: fi.Mode(), ModTime: fi.ModTime()}
if filter == nil || filter(entry) {
result = append(result, entry)
}
}
}
sort.Sort(byEntryName(result))
return result, nil
}
func filteredReaddirnames(dirname string, filter func(string) bool) ([]string, error) {
result := []string{}
d, err := os.Open(dirname)
if err != nil {
return result, err
}
defer d.Close()
for {
names, err := d.Readdirnames(1000)
if err != nil {
if err == io.EOF {
break
}
return result, err
}
for _, name := range names {
if filter == nil || filter(name) {
result = append(result, name)
}
}
}
sort.Strings(result)
return result, nil
}
func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) multipartObjectInfoChannel {
objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit)
timeoutCh := make(chan struct{}, 1)
@ -218,39 +112,49 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
}
for {
entries, err := filteredReaddir(scanDir,
func(entry DirEntry) bool {
if entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix)) {
return strings.HasPrefix(entry.Name, prefixPath) && entry.Name > markerPath
dirents, err := scandir(scanDir,
func(dirent fsDirent) bool {
if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, uploadIDSuffix)) {
return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath
}
return false
},
true)
false)
if err != nil {
send(multipartObjectInfo{Err: err})
return
}
var entry DirEntry
for len(entries) > 0 {
entry, entries = entries[0], entries[1:]
var dirent fsDirent
for len(dirents) > 0 {
dirent, dirents = dirents[0], dirents[1:]
if entry.IsRegular() {
if dirent.IsRegular() {
// Handle uploadid file
name := strings.Replace(filepath.Dir(entry.Name), bucketDir, "", 1)
name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1)
if name == "" {
// This should not happen, i.e. an upload ID file must not live directly in the bucket directory
send(multipartObjectInfo{Err: errors.New("corrupted meta data")})
return
}
uploadID := strings.Split(filepath.Base(entry.Name), uploadIDSuffix)[0]
uploadID := strings.Split(filepath.Base(dirent.name), uploadIDSuffix)[0]
// On some operating systems modTime is empty; use os.Stat() to fill in the missing value
if dirent.modTime.IsZero() {
if fi, e := os.Stat(dirent.name); e == nil {
dirent.modTime = fi.ModTime()
} else {
send(multipartObjectInfo{Err: e})
return
}
}
objInfo := multipartObjectInfo{
Name: name,
UploadID: uploadID,
ModifiedTime: entry.ModTime,
ModifiedTime: dirent.modTime,
}
if !send(objInfo) {
@ -260,21 +164,21 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
continue
}
subentries, err := filteredReaddir(entry.Name,
func(entry DirEntry) bool {
return entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix))
subDirents, err := scandir(dirent.name,
func(dirent fsDirent) bool {
return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, uploadIDSuffix))
},
true)
false)
if err != nil {
send(multipartObjectInfo{Err: err})
return
}
subDirFound := false
uploadIDEntries := []DirEntry{}
// If subentries has a directory, then current entry needs to be sent
for _, subentry := range subentries {
if subentry.IsDir() {
uploadIDDirents := []fsDirent{}
// If subDirents contains a directory, then the current dirent needs to be sent
for _, subdirent := range subDirents {
if subdirent.IsDir() {
subDirFound = true
if recursive {
@ -282,15 +186,26 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
}
}
if !recursive && subentry.IsRegular() {
uploadIDEntries = append(uploadIDEntries, subentry)
if !recursive && subdirent.IsRegular() {
uploadIDDirents = append(uploadIDDirents, subdirent)
}
}
if subDirFound || len(subentries) == 0 {
// send directory only for non-recursive listing
if !recursive && (subDirFound || len(subDirents) == 0) {
// On some operating systems modTime is empty; use os.Stat() to fill in the missing value
if dirent.modTime.IsZero() {
if fi, e := os.Stat(dirent.name); e == nil {
dirent.modTime = fi.ModTime()
} else {
send(multipartObjectInfo{Err: e})
return
}
}
objInfo := multipartObjectInfo{
Name: strings.Replace(entry.Name, bucketDir, "", 1),
ModifiedTime: entry.ModTime,
Name: strings.Replace(dirent.name, bucketDir, "", 1),
ModifiedTime: dirent.modTime,
IsDir: true,
}
@ -300,9 +215,9 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
}
if recursive {
entries = append(subentries, entries...)
dirents = append(subDirents, dirents...)
} else {
entries = append(uploadIDEntries, entries...)
dirents = append(uploadIDDirents, dirents...)
}
}
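The stat fallback above (when dirent.modTime is zero because the platform's directory read could not supply it) is repeated in a few places in this diff; a hypothetical helper along these lines could express it once. This is only a sketch: fillDirentStat is an invented name, and it assumes dirent.name holds a full path (i.e. scandir was called with namesOnly=false).

// fillDirentStat is hypothetical (not in the commit). It fills modTime and
// size from os.Stat() when the readdir implementation left them empty.
func fillDirentStat(dirent *fsDirent) error {
	if !dirent.modTime.IsZero() {
		return nil // already populated, nothing to do
	}
	fi, err := os.Stat(dirent.name)
	if err != nil {
		return err
	}
	dirent.modTime = fi.ModTime()
	dirent.size = fi.Size()
	return nil
}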

View file

@ -170,18 +170,18 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error {
metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
uploadIDPrefix := uploadID + "."
names, e := filteredReaddirnames(metaObjectDir,
func(name string) bool {
return strings.HasPrefix(name, uploadIDPrefix)
dirents, e := scandir(metaObjectDir,
func(dirent fsDirent) bool {
return dirent.IsRegular() && strings.HasPrefix(dirent.name, uploadIDPrefix)
},
)
true)
if e != nil {
return e
}
for _, name := range names {
if e := os.Remove(filepath.Join(metaObjectDir, name)); e != nil {
for _, dirent := range dirents {
if e := os.Remove(filepath.Join(metaObjectDir, dirent.name)); e != nil {
//return InternalError{Err: err}
return e
}
@ -515,9 +515,7 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
}
recursive := true
skipDir := true
if delimiter == "/" {
skipDir = false
recursive = false
}
@ -557,10 +555,7 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
continue
}
if multipartObjInfo.IsDir && skipDir {
continue
}
// Directories are listed only if recursive is false
if multipartObjInfo.IsDir {
result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name)
} else {
@ -600,49 +595,72 @@ func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumber
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
// return empty ListPartsInfo
if maxParts == 0 {
return ListPartsInfo{}, nil
}
if maxParts < 0 || maxParts > 1000 {
maxParts = 1000
}
metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
entries, err := filteredReaddir(metaObjectDir,
func(entry DirEntry) bool {
if tokens := strings.Split(entry.Name, "."); len(tokens) == 3 {
if tokens[0] == uploadID {
if partNumber, err := strconv.Atoi(tokens[1]); err == nil {
if partNumber >= 1 && partNumber <= 10000 && partNumber > partNumberMarker {
return true
}
}
uploadIDPrefix := uploadID + "."
dirents, e := scandir(metaObjectDir,
func(dirent fsDirent) bool {
// A part file is a regular file whose name starts with 'UPLOADID.'
if !(dirent.IsRegular() && strings.HasPrefix(dirent.name, uploadIDPrefix)) {
return false
}
// A valid part file name has the form 'UPLOADID.PARTNUMBER.MD5SUM'
tokens := strings.Split(dirent.name, ".")
if len(tokens) != 3 {
return false
}
if partNumber, err := strconv.Atoi(tokens[1]); err == nil {
if partNumber >= 1 && partNumber <= 10000 && partNumber > partNumberMarker {
return true
}
}
return false
},
false,
)
if err != nil {
return ListPartsInfo{}, probe.NewError(err)
true)
if e != nil {
return ListPartsInfo{}, probe.NewError(e)
}
isTruncated := false
if maxParts <= 0 || maxParts > 1000 {
maxParts = 1000
}
nextPartNumberMarker := 0
parts := []partInfo{}
for i := range entries {
for i := range dirents {
if i == maxParts {
isTruncated = true
break
}
tokens := strings.Split(entries[i].Name, ".")
// On some operating systems modTime is empty; use os.Stat() to fill in the missing values
if dirents[i].modTime.IsZero() {
if fi, e := os.Stat(filepath.Join(metaObjectDir, dirents[i].name)); e == nil {
dirents[i].modTime = fi.ModTime()
dirents[i].size = fi.Size()
} else {
return ListPartsInfo{}, probe.NewError(e)
}
}
tokens := strings.Split(dirents[i].name, ".")
partNumber, _ := strconv.Atoi(tokens[1])
md5sum := tokens[2]
parts = append(parts, partInfo{
PartNumber: partNumber,
LastModified: entries[i].ModTime,
LastModified: dirents[i].modTime,
ETag: md5sum,
Size: entries[i].Size,
Size: dirents[i].size,
})
}
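To make the part-file filter above concrete, here is a standalone sketch of the 'UPLOADID.PARTNUMBER.MD5SUM' naming convention it validates; the sample file name is made up, and the partNumberMarker check is omitted for brevity.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Made-up part file name: uploadID "abc123", part 7, md5sum of an empty body.
	name := "abc123.7.d41d8cd98f00b204e9800998ecf8427e"

	tokens := strings.Split(name, ".")
	if len(tokens) != 3 {
		fmt.Println("not a part file")
		return
	}
	uploadID, md5sum := tokens[0], tokens[2]
	partNumber, err := strconv.Atoi(tokens[1])
	if err != nil || partNumber < 1 || partNumber > 10000 {
		fmt.Println("invalid part number")
		return
	}
	fmt.Printf("uploadID=%s partNumber=%d md5sum=%s\n", uploadID, partNumber, md5sum)
}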