diff --git a/fs-multipart-dir.go b/fs-multipart-dir.go index b023cbb6d..84268a46a 100644 --- a/fs-multipart-dir.go +++ b/fs-multipart-dir.go @@ -24,9 +24,8 @@ import ( "time" ) -func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) multipartObjectInfoChannel { +func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) <-chan multipartObjectInfo { objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit) - timeoutCh := make(chan struct{}, 1) // TODO: check if bucketDir is absolute path scanDir := bucketDir @@ -96,7 +95,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, // goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel. go func() { defer close(objectInfoCh) - defer close(timeoutCh) // send function - returns true if ObjectInfo is sent // within (time.Second * 15) else false on timeout. @@ -106,27 +104,45 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, case objectInfoCh <- oi: return true case <-timer: - timeoutCh <- struct{}{} return false } } - for { - // Filters scandir entries. This filter function is - // specific for multipart listing. - multipartFilterFn := func(dirent fsDirent) bool { - // Verify if dirent is a directory a regular file - // with match uploadID suffix. - if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) { - // Return if dirent matches prefix and - // lexically higher than marker. - return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath - } - return false + // filter function - filters directory entries matching multipart uploadids, prefix and marker + direntFilterFn := func(dirent fsDirent) bool { + // check if dirent is a directory (or) dirent is a regular file and it's name ends with Upload ID suffix string + if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) { + // return if dirent's name starts with prefixPath and lexically higher than markerPath + return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath } - dirents, err := scandir(scanDir, multipartFilterFn, false) + return false + } + + // filter function - filters directory entries matching multipart uploadids + subDirentFilterFn := func(dirent fsDirent) bool { + // check if dirent is a directory (or) dirent is a regular file and it's name ends with Upload ID suffix string + return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) + } + + // lastObjInfo is used to save last object info which is sent at last with End=true + var lastObjInfo *multipartObjectInfo + + sendError := func(err error) { + if lastObjInfo != nil { + if !send(*lastObjInfo) { + // as we got error sending lastObjInfo, we can't send the error + return + } + } + + send(multipartObjectInfo{Err: err, End: true}) + return + } + + for { + dirents, err := scandir(scanDir, direntFilterFn, false) if err != nil { - send(multipartObjectInfo{Err: err}) + sendError(err) return } @@ -138,19 +154,19 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1) if name == "" { // This should not happen ie uploadid file should not be in bucket directory - send(multipartObjectInfo{Err: errors.New("Corrupted metadata")}) + sendError(errors.New("Corrupted metadata")) return } uploadID := strings.Split(filepath.Base(dirent.name), multipartUploadIDSuffix)[0] // Solaris and older unixes have modTime to be - // empty, fall back to os.Stat() to fill missing values. + // empty, fallback to os.Stat() to fill missing values. if dirent.modTime.IsZero() { if fi, e := os.Stat(dirent.name); e == nil { dirent.modTime = fi.ModTime() } else { - send(multipartObjectInfo{Err: e}) + sendError(e) return } } @@ -161,20 +177,21 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, ModifiedTime: dirent.modTime, } - if !send(objInfo) { - return + // as we got new object info, send last object info and keep new object info as last object info + if lastObjInfo != nil { + if !send(*lastObjInfo) { + return + } } + lastObjInfo = &objInfo continue } - multipartSubDirentFilterFn := func(dirent fsDirent) bool { - return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) - } // Fetch sub dirents. - subDirents, err := scandir(dirent.name, multipartSubDirentFilterFn, false) + subDirents, err := scandir(dirent.name, subDirentFilterFn, false) if err != nil { - send(multipartObjectInfo{Err: err}) + sendError(err) return } @@ -198,12 +215,12 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, // Send directory only for non-recursive listing if !recursive && (subDirFound || len(subDirents) == 0) { // Solaris and older unixes have modTime to be - // empty, fall back to os.Stat() to fill missing values. + // empty, fallback to os.Stat() to fill missing values. if dirent.modTime.IsZero() { if fi, e := os.Stat(dirent.name); e == nil { dirent.modTime = fi.ModTime() } else { - send(multipartObjectInfo{Err: e}) + sendError(e) return } } @@ -214,9 +231,13 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, IsDir: true, } - if !send(objInfo) { - return + // as we got new object info, send last object info and keep new object info as last object info + if lastObjInfo != nil { + if !send(*lastObjInfo) { + return + } } + lastObjInfo = &objInfo } if recursive { @@ -235,10 +256,17 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, break } } + + if lastObjInfo != nil { + // we got last object + lastObjInfo.End = true + if !send(*lastObjInfo) { + return + } + } }() - // Return multipart info. - return multipartObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh} + return objectInfoCh } // multipartObjectInfo - Multipart object info @@ -248,67 +276,5 @@ type multipartObjectInfo struct { ModifiedTime time.Time IsDir bool Err error -} - -// multipartObjectInfoChannel - multipart object info channel -type multipartObjectInfoChannel struct { - ch <-chan multipartObjectInfo - objInfo *multipartObjectInfo - closed bool - timeoutCh <-chan struct{} - timedOut bool -} - -func (oic *multipartObjectInfoChannel) Read() (multipartObjectInfo, bool) { - if oic.closed { - return multipartObjectInfo{}, false - } - if oic.objInfo == nil { - // First read. - if oi, ok := <-oic.ch; ok { - oic.objInfo = &oi - } else { - oic.closed = true - return multipartObjectInfo{}, false - } - } - - retObjInfo := *oic.objInfo - status := true - oic.objInfo = nil - - // Read once more to know whether it was last read. - if oi, ok := <-oic.ch; ok { - oic.objInfo = &oi - } else { - oic.closed = true - } - - return retObjInfo, status -} - -// IsClosed - return whether channel is closed or not. -func (oic multipartObjectInfoChannel) IsClosed() bool { - if oic.objInfo != nil { - return false - } - return oic.closed -} - -// IsTimedOut - return whether channel is closed due to timeout. -func (oic multipartObjectInfoChannel) IsTimedOut() bool { - if oic.timedOut { - return true - } - - select { - case _, ok := <-oic.timeoutCh: - if ok { - oic.timedOut = true - return true - } - return false - default: - return false - } + End bool } diff --git a/fs-multipart.go b/fs-multipart.go index c6c8ea726..4ceec5af7 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2015 Minio, Inc. + * Minio Cloud Storage, (C) 2015,2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -394,11 +394,11 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa return newObject, nil } -func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { +func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch <-chan multipartObjectInfo) { fs.listMultipartObjectMapMutex.Lock() defer fs.listMultipartObjectMapMutex.Unlock() - channels := []multipartObjectInfoChannel{ch} + channels := []<-chan multipartObjectInfo{ch} if _, ok := fs.listMultipartObjectMap[params]; ok { channels = append(fs.listMultipartObjectMap[params], ch) } @@ -406,27 +406,23 @@ func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams fs.listMultipartObjectMap[params] = channels } -func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { +func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) <-chan multipartObjectInfo { fs.listMultipartObjectMapMutex.Lock() defer fs.listMultipartObjectMapMutex.Unlock() if channels, ok := fs.listMultipartObjectMap[params]; ok { - for i, channel := range channels { - if !channel.IsTimedOut() { - chs := channels[i+1:] - if len(chs) > 0 { - fs.listMultipartObjectMap[params] = chs - } else { - delete(fs.listMultipartObjectMap, params) - } - - return &channel - } + var channel <-chan multipartObjectInfo + channel, channels = channels[0], channels[1:] + if len(channels) > 0 { + fs.listMultipartObjectMap[params] = channels + } else { + // do not store empty channel list + delete(fs.listMultipartObjectMap, params) } - // As all channels are timed out, delete the map entry - delete(fs.listMultipartObjectMap, params) + return channel } + return nil } @@ -484,8 +480,10 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } metaBucketDir := filepath.Join(fs.diskPath, minioMetaDir, bucket) + // Lookup of if listMultipartObjectChannel is available for given // parameters, else create a new one. + savedChannel := true multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{ bucket: bucket, delimiter: delimiter, @@ -493,58 +491,94 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa prefix: prefixPath, uploadIDMarker: uploadIDMarker, }) + if multipartObjectInfoCh == nil { - ch := scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) - multipartObjectInfoCh = &ch + multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) + savedChannel = false } + var objInfo *multipartObjectInfo nextKeyMarker := "" nextUploadIDMarker := "" for i := 0; i < maxUploads; { - multipartObjInfo, ok := multipartObjectInfoCh.Read() - if !ok { - // Closed channel. - return result, nil + // read the channel + if oi, ok := <-multipartObjectInfoCh; ok { + objInfo = &oi + } else { + // closed channel + if i == 0 { + // first read + if !savedChannel { + // its valid to have a closed new channel for first read + multipartObjectInfoCh = nil + break + } + + // invalid saved channel amd create new channel + multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, + uploadIDMarker, recursive) + } else { + // TODO: FIX: there is a chance of infinite loop if we get closed channel always + // the channel got closed due to timeout + // create a new channel + multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, nextKeyMarker, + nextUploadIDMarker, recursive) + } + + // make it as new channel + savedChannel = false + continue } - if multipartObjInfo.Err != nil { - if os.IsNotExist(multipartObjInfo.Err) { + if objInfo.Err != nil { + if os.IsNotExist(objInfo.Err) { return ListMultipartsInfo{}, nil } - return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) + + return ListMultipartsInfo{}, probe.NewError(objInfo.Err) } - if strings.Contains(multipartObjInfo.Name, "$multiparts") || - strings.Contains(multipartObjInfo.Name, "$tmpobject") { + // backward compatibility check + if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { continue } // Directories are listed only if recursive is false - if multipartObjInfo.IsDir { - result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) + if objInfo.IsDir { + result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) } else { result.Uploads = append(result.Uploads, uploadMetadata{ - Object: multipartObjInfo.Name, - UploadID: multipartObjInfo.UploadID, - Initiated: multipartObjInfo.ModifiedTime, + Object: objInfo.Name, + UploadID: objInfo.UploadID, + Initiated: objInfo.ModifiedTime, }) } - nextKeyMarker = multipartObjInfo.Name - nextUploadIDMarker = multipartObjInfo.UploadID + + nextKeyMarker = objInfo.Name + nextUploadIDMarker = objInfo.UploadID i++ + + if objInfo.End { + // as we received last object, do not save this channel for later use + multipartObjectInfoCh = nil + break + } } - if !multipartObjectInfoCh.IsClosed() { + if multipartObjectInfoCh != nil { + // we haven't received last object result.IsTruncated = true result.NextKeyMarker = nextKeyMarker result.NextUploadIDMarker = nextUploadIDMarker + + // save this channel for later use fs.saveListMultipartObjectCh(listMultipartObjectParams{ bucket: bucket, delimiter: delimiter, keyMarker: nextKeyMarker, prefix: objectPrefix, uploadIDMarker: nextUploadIDMarker, - }, *multipartObjectInfoCh) + }, multipartObjectInfoCh) } return result, nil diff --git a/fs.go b/fs.go index dbf927f65..66d832e3c 100644 --- a/fs.go +++ b/fs.go @@ -49,7 +49,7 @@ type Filesystem struct { rwLock *sync.RWMutex listObjectMap map[listObjectParams][]*treeWalker listObjectMapMutex *sync.Mutex - listMultipartObjectMap map[listMultipartObjectParams][]multipartObjectInfoChannel + listMultipartObjectMap map[listMultipartObjectParams][]<-chan multipartObjectInfo listMultipartObjectMapMutex *sync.Mutex } @@ -69,7 +69,7 @@ func newFS(diskPath string) (ObjectAPI, *probe.Error) { fs.listObjectMapMutex = &sync.Mutex{} // Initialize list multipart map. - fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]multipartObjectInfoChannel) + fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]<-chan multipartObjectInfo) fs.listMultipartObjectMapMutex = &sync.Mutex{} // Return here.