refactor: refactor code to separate fs into object-layer and fs layer. (#1305)

This commit is contained in:
Krishna Srinivas 2016-04-08 23:07:38 +05:30 committed by Harshavardhana
parent 188bb92d8a
commit 3c48537f20
35 changed files with 1463 additions and 2543 deletions

View file

@@ -63,7 +63,7 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *h
setCommonHeaders(w)
// set object-related metadata headers
lastModified := objInfo.ModifiedTime.UTC().Format(http.TimeFormat)
lastModified := objInfo.ModTime.UTC().Format(http.TimeFormat)
w.Header().Set("Last-Modified", lastModified)
w.Header().Set("Content-Type", objInfo.ContentType)

View file

@@ -260,7 +260,7 @@ func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKe
continue
}
content.Key = object.Name
content.LastModified = object.ModifiedTime.UTC().Format(timeFormatAMZ)
content.LastModified = object.ModTime.UTC().Format(timeFormatAMZ)
if object.MD5Sum != "" {
content.ETag = "\"" + object.MD5Sum + "\""
}

View file

@@ -289,8 +289,6 @@ func (api objectStorageAPI) ListObjectsHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
case BucketNotFound:
writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path)
case ObjectNotFound:
writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
case ObjectNameInvalid:
writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
default:

View file

@@ -1,182 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015-2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/minio/minio/pkg/probe"
)
const (
// listObjectsLimit - maximum list objects limit.
listObjectsLimit = 1000
)
// isDirExist - returns whether the given directory exists or not.
func isDirExist(dirname string) (bool, error) {
fi, e := os.Lstat(dirname)
if e != nil {
if os.IsNotExist(e) {
return false, nil
}
return false, e
}
return fi.IsDir(), nil
}
func (fs *Filesystem) saveTreeWalk(params listObjectParams, walker *treeWalker) {
fs.listObjectMapMutex.Lock()
defer fs.listObjectMapMutex.Unlock()
walkers, _ := fs.listObjectMap[params]
walkers = append(walkers, walker)
fs.listObjectMap[params] = walkers
}
func (fs *Filesystem) lookupTreeWalk(params listObjectParams) *treeWalker {
fs.listObjectMapMutex.Lock()
defer fs.listObjectMapMutex.Unlock()
if walkChs, ok := fs.listObjectMap[params]; ok {
for i, walkCh := range walkChs {
if !walkCh.timedOut {
newWalkChs := walkChs[i+1:]
if len(newWalkChs) > 0 {
fs.listObjectMap[params] = newWalkChs
} else {
delete(fs.listObjectMap, params)
}
return walkCh
}
}
// As all channels are timed out, delete the map entry
delete(fs.listObjectMap, params)
}
return nil
}
// ListObjects - lists all objects for a given prefix, returns up to
// maxKeys number of objects per call.
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) {
result := ListObjectsInfo{}
// Input validation.
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return result, probe.NewError(e)
}
bucketDir := filepath.Join(fs.diskPath, bucket)
if !IsValidObjectPrefix(prefix) {
return result, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != "/" {
return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported. Only '/' is supported", delimiter))
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", marker, prefix))
}
}
// Return empty response for a valid request when maxKeys is 0.
if maxKeys == 0 {
return result, nil
}
// Overflowing maxKeys - reset to listObjectsLimit.
if maxKeys < 0 || maxKeys > listObjectsLimit {
maxKeys = listObjectsLimit
}
// Verify if prefix exists.
prefixDir := filepath.Dir(filepath.FromSlash(prefix))
rootDir := filepath.Join(bucketDir, prefixDir)
if status, e := isDirExist(rootDir); !status {
if e == nil {
// Prefix does not exist, not an error just respond empty
// list response.
return result, nil
}
// Rest errors should be treated as failure.
return result, probe.NewError(e)
}
recursive := true
if delimiter == "/" {
recursive = false
}
// Maximum 1000 objects are returned in a single call to listObjects.
// Further calls will set the right marker value to continue reading the rest of the objectList.
// lookupTreeWalk returns nil if the call to ListObjects is done for the first time.
// On further calls to ListObjects to retrieve more objects within the timeout period,
// lookupTreeWalk returns the channel from which the rest of the objects can be retrieved.
walker := fs.lookupTreeWalk(listObjectParams{bucket, delimiter, marker, prefix})
if walker == nil {
walker = startTreeWalk(fs.diskPath, bucket, filepath.FromSlash(prefix), filepath.FromSlash(marker), recursive)
}
nextMarker := ""
for i := 0; i < maxKeys; {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
return result, nil
}
// For any walk error return right away.
if walkResult.err != nil {
return ListObjectsInfo{}, probe.NewError(walkResult.err)
}
objInfo := walkResult.objectInfo
objInfo.Name = filepath.ToSlash(objInfo.Name)
// Skip temporary files.
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
continue
}
// For objects being directory and delimited we set Prefixes.
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
} else {
result.Objects = append(result.Objects, objInfo)
}
// We have listed everything return.
if walkResult.end {
return result, nil
}
nextMarker = objInfo.Name
i++
}
// We haven't exhausted the list yet, set IsTruncated to 'true' so
// that the client can send another request.
result.IsTruncated = true
result.NextMarker = nextMarker
fs.saveTreeWalk(listObjectParams{bucket, delimiter, nextMarker, prefix}, walker)
return result, nil
}
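
As a usage illustration (not part of this commit), a caller drains this paginated listing by feeding NextMarker back in until IsTruncated is false. A minimal sketch, with illustrative local types standing in for the real ListObjectsInfo and *probe.Error:

package main

// listResult and lister are illustrative stand-ins for the types above.
type listResult struct {
	Objects     []string
	IsTruncated bool
	NextMarker  string
}

type lister interface {
	ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (listResult, error)
}

// listAll pages through ListObjects, feeding NextMarker back in until
// the listing is no longer truncated.
func listAll(l lister, bucket, prefix string) ([]string, error) {
	var all []string
	marker := ""
	for {
		res, err := l.ListObjects(bucket, prefix, marker, "/", 1000)
		if err != nil {
			return nil, err
		}
		all = append(all, res.Objects...)
		if !res.IsTruncated {
			return all, nil
		}
		marker = res.NextMarker
	}
}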

View file

@@ -1,163 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"github.com/minio/minio/pkg/probe"
)
/// Bucket Operations
// DeleteBucket - delete a bucket.
func (fs Filesystem) DeleteBucket(bucket string) *probe.Error {
// Verify bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
bucket = getActualBucketname(fs.diskPath, bucket)
bucketDir := filepath.Join(fs.diskPath, bucket)
if e := os.Remove(bucketDir); e != nil {
// Error if there was no bucket in the first place.
if os.IsNotExist(e) {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
// On windows the string is slightly different, handle it here.
if strings.Contains(e.Error(), "directory is not empty") {
return probe.NewError(BucketNotEmpty{Bucket: bucket})
}
// Hopefully for all other operating systems, this is
// assumed to be consistent.
if strings.Contains(e.Error(), "directory not empty") {
return probe.NewError(BucketNotEmpty{Bucket: bucket})
}
return probe.NewError(e)
}
return nil
}
// ListBuckets - Get service.
func (fs Filesystem) ListBuckets() ([]BucketInfo, *probe.Error) {
files, e := ioutil.ReadDir(fs.diskPath)
if e != nil {
return []BucketInfo{}, probe.NewError(e)
}
var buckets []BucketInfo
for _, file := range files {
if !file.IsDir() {
// If not directory, ignore all file types.
continue
}
// If directories are found with odd names, skip them.
dirName := strings.ToLower(file.Name())
if !IsValidBucketName(dirName) {
continue
}
bucket := BucketInfo{
Name: dirName,
Created: file.ModTime(),
}
buckets = append(buckets, bucket)
}
// Remove duplicated entries.
buckets = removeDuplicateBuckets(buckets)
return buckets, nil
}
// removeDuplicateBuckets - remove duplicate buckets.
func removeDuplicateBuckets(buckets []BucketInfo) []BucketInfo {
length := len(buckets) - 1
for i := 0; i < length; i++ {
for j := i + 1; j <= length; j++ {
if buckets[i].Name == buckets[j].Name {
if buckets[i].Created.Sub(buckets[j].Created) > 0 {
buckets[i] = buckets[length]
} else {
buckets[j] = buckets[length]
}
buckets = buckets[0:length]
length--
j--
}
}
}
return buckets
}
// MakeBucket - PUT Bucket
func (fs Filesystem) MakeBucket(bucket string) *probe.Error {
if _, e := fs.checkBucketArg(bucket); e == nil {
return probe.NewError(BucketExists{Bucket: bucket})
} else if _, ok := e.(BucketNameInvalid); ok {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
bucketDir := filepath.Join(fs.diskPath, bucket)
// Make bucket.
if e := os.Mkdir(bucketDir, 0700); e != nil {
return probe.NewError(e)
}
return nil
}
// getActualBucketname - will convert incoming bucket names to
// corresponding actual bucketnames on the backend in a platform
// compatible way for all operating systems.
func getActualBucketname(fsPath, bucket string) string {
fd, e := os.Open(fsPath)
if e != nil {
return bucket
}
buckets, e := fd.Readdirnames(-1)
if e != nil {
return bucket
}
for _, b := range buckets {
// Verify if lowercase version of the bucket is equal
// to the incoming bucket, then use the proper name.
if strings.ToLower(b) == bucket {
return b
}
}
return bucket
}
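
The case-folding lookup above is what makes incoming lowercase bucket names resolve correctly on case-preserving filesystems. The same idea as a standalone sketch; resolveBucketDir is a hypothetical name, and unlike the original, the sketch also closes the directory handle:

package main

import (
	"os"
	"strings"
)

// resolveBucketDir restates getActualBucketname: given the lowercase
// bucket name from the API, return the on-disk directory entry whose
// lowercased name matches, or the input unchanged.
func resolveBucketDir(fsPath, bucket string) string {
	fd, err := os.Open(fsPath)
	if err != nil {
		return bucket
	}
	defer fd.Close()
	names, err := fd.Readdirnames(-1)
	if err != nil {
		return bucket
	}
	for _, name := range names {
		if strings.ToLower(name) == bucket {
			return name
		}
	}
	return bucket
}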
// GetBucketInfo - get bucket metadata.
func (fs Filesystem) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) {
if !IsValidBucketName(bucket) {
return BucketInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
bucket = getActualBucketname(fs.diskPath, bucket)
// Get bucket path.
bucketDir := filepath.Join(fs.diskPath, bucket)
fi, e := os.Stat(bucketDir)
if e != nil {
// Check if bucket exists.
if os.IsNotExist(e) {
return BucketInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return BucketInfo{}, probe.NewError(e)
}
bucketMetadata := BucketInfo{}
bucketMetadata.Name = fi.Name()
bucketMetadata.Created = fi.ModTime()
return bucketMetadata, nil
}

View file

@@ -1,256 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io/ioutil"
"os"
"strconv"
"strings"
"testing"
)
// The test not only asserts the correctness of the output,
// but also includes test cases for which the function should fail.
// For those failing cases, it is also asserted that the function fails as expected.
func TestGetBucketInfo(t *testing.T) {
// Make a temporary directory to use as the fs.
directory, e := ioutil.TempDir("", "minio-metadata-test")
if e != nil {
t.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
t.Fatal(err)
}
// Creating few buckets.
for i := 0; i < 4; i++ {
err = fs.MakeBucket("meta-test-bucket." + strconv.Itoa(i))
if err != nil {
t.Fatal(err)
}
}
testCases := []struct {
bucketName string
metaData BucketInfo
e error
shouldPass bool
}{
// Test cases with invalid bucket names.
{".test", BucketInfo{}, BucketNameInvalid{Bucket: ".test"}, false},
{"Test", BucketInfo{}, BucketNameInvalid{Bucket: "Test"}, false},
{"---", BucketInfo{}, BucketNameInvalid{Bucket: "---"}, false},
{"ad", BucketInfo{}, BucketNameInvalid{Bucket: "ad"}, false},
// Test cases with non-existent buckets.
{"volatile-bucket-1", BucketInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false},
{"volatile-bucket-2", BucketInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false},
// Test cases with existing buckets.
{"meta-test-bucket.0", BucketInfo{Name: "meta-test-bucket.0"}, nil, true},
{"meta-test-bucket.1", BucketInfo{Name: "meta-test-bucket.1"}, nil, true},
{"meta-test-bucket.2", BucketInfo{Name: "meta-test-bucket.2"}, nil, true},
{"meta-test-bucket.3", BucketInfo{Name: "meta-test-bucket.3"}, nil, true},
}
for i, testCase := range testCases {
// The err returned is of type *probe.Error.
bucketInfo, err := fs.GetBucketInfo(testCase.bucketName)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass, but failed with: <ERROR> %s", i+1, err.Cause.Error())
}
if err == nil && !testCase.shouldPass {
t.Errorf("Test %d: Expected to fail with <ERROR> \"%s\", but passed instead", i+1, testCase.e.Error())
}
// Failed as expected, but does it fail for the expected reason.
if err != nil && !testCase.shouldPass {
if testCase.e.Error() != err.Cause.Error() {
t.Errorf("Test %d: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead", i+1, testCase.e.Error(), err.Cause.Error())
}
}
// Since there are cases for which GetBucketInfo fails, this is necessary.
// Test passes as expected, but the output values are verified for correctness here.
if err == nil && testCase.shouldPass {
if testCase.bucketName != bucketInfo.Name {
t.Errorf("Test %d: Expected the bucket name to be \"%s\", but found \"%s\" instead", i+1, testCase.bucketName, bucketInfo.Name)
}
}
}
}
func TestListBuckets(t *testing.T) {
// Make a temporary directory to use as the fs.
directory, e := ioutil.TempDir("", "minio-benchmark")
if e != nil {
t.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
t.Fatal(err)
}
// Create a few buckets.
for i := 0; i < 10; i++ {
err = fs.MakeBucket("testbucket." + strconv.Itoa(i))
if err != nil {
t.Fatal(err)
}
}
// List, and ensure that they are all there.
metadatas, err := fs.ListBuckets()
if err != nil {
t.Fatal(err)
}
if len(metadatas) != 10 {
t.Errorf("incorrect length of metadatas (%d)\n", len(metadatas))
}
// Iterate over the buckets, ensuring that the name is correct.
for i := 0; i < len(metadatas); i++ {
if !strings.Contains(metadatas[i].Name, "testbucket") {
t.Fail()
}
}
}
func TestDeleteBucket(t *testing.T) {
// Make a temporary directory to use as the fs.
directory, e := ioutil.TempDir("", "minio-benchmark")
if e != nil {
t.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
t.Fatal(err)
}
// Deleting a bucket that doesn't exist should error.
err = fs.DeleteBucket("bucket")
if !strings.Contains(err.Cause.Error(), "Bucket not found:") {
t.Fail()
}
}
func BenchmarkListBuckets(b *testing.B) {
// Make a temporary directory to use as the fs.
directory, e := ioutil.TempDir("", "minio-benchmark")
if e != nil {
b.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
b.Fatal(err)
}
// Create a few buckets.
for i := 0; i < 20; i++ {
err = fs.MakeBucket("bucket." + strconv.Itoa(i))
if err != nil {
b.Fatal(err)
}
}
b.ResetTimer()
// List the buckets over and over and over.
for i := 0; i < b.N; i++ {
_, err = fs.ListBuckets()
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkDeleteBucket(b *testing.B) {
// Make a temporary directory to use as the fs.
directory, e := ioutil.TempDir("", "minio-benchmark")
if e != nil {
b.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Creating buckets takes time, so stop and start the timer.
b.StopTimer()
// Create and delete the bucket over and over.
err = fs.MakeBucket("bucket")
if err != nil {
b.Fatal(err)
}
b.StartTimer()
err = fs.DeleteBucket("bucket")
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkGetBucketInfo(b *testing.B) {
// Make a temporary directory to use as the fs.
directory, e := ioutil.TempDir("", "minio-benchmark")
if e != nil {
b.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
b.Fatal(err)
}
// Put up a bucket with some metadata.
err = fs.MakeBucket("bucket")
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Retrieve the metadata!
_, err := fs.GetBucketInfo("bucket")
if err != nil {
b.Fatal(err)
}
}
}

View file

@@ -54,7 +54,8 @@ func (d byDirentName) Len() int { return len(d) }
func (d byDirentName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDirentName) Less(i, j int) bool { return d[i].name < d[j].name }
// Using sort.Search() internally to jump to the file entry containing the prefix.
// Using sort.Search() internally to jump to the file entry containing
// the prefix.
func searchDirents(dirents []fsDirent, x string) int {
processFunc := func(i int) bool {
return dirents[i].name >= x
@@ -64,9 +65,9 @@ func searchDirents(dirents []fsDirent, x string) int {
// Tree walk result carries results of tree walking.
type treeWalkResult struct {
objectInfo ObjectInfo
err error
end bool
fileInfo FileInfo
err error
end bool
}
// Tree walk notify carries a channel which notifies tree walk
@@ -77,42 +78,42 @@ type treeWalker struct {
timedOut bool
}
// treeWalk walks FS directory tree recursively pushing ObjectInfo into the channel as and when it encounters files.
// treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
// Convert dirent to ObjectInfo
direntToObjectInfo := func(dirent fsDirent) (ObjectInfo, error) {
objectInfo := ObjectInfo{}
// Convert dirent to FileInfo
direntToFileInfo := func(dirent fsDirent) (FileInfo, error) {
fileInfo := FileInfo{}
// Convert to full object name.
objectInfo.Name = filepath.Join(prefixDir, dirent.name)
fileInfo.Name = filepath.Join(prefixDir, dirent.name)
if dirent.modTime.IsZero() && dirent.size == 0 {
// ModifiedTime and Size are zero, Stat() and figure out
// the actual values that need to be set.
fi, err := os.Stat(filepath.Join(bucketDir, prefixDir, dirent.name))
if err != nil {
return ObjectInfo{}, err
return FileInfo{}, err
}
// Fill size and modtime.
objectInfo.ModifiedTime = fi.ModTime()
objectInfo.Size = fi.Size()
objectInfo.IsDir = fi.IsDir()
fileInfo.ModTime = fi.ModTime()
fileInfo.Size = fi.Size()
fileInfo.Mode = fi.Mode()
} else {
// If ModifiedTime or Size are set then use them
// If ModTime or Size are set then use them
// without attempting another Stat operation.
objectInfo.ModifiedTime = dirent.modTime
objectInfo.Size = dirent.size
objectInfo.IsDir = dirent.IsDir()
fileInfo.ModTime = dirent.modTime
fileInfo.Size = dirent.size
fileInfo.Mode = dirent.mode
}
if objectInfo.IsDir {
if fileInfo.Mode.IsDir() {
// Add os.PathSeparator suffix again for directories as
// filepath.Join would have removed it.
objectInfo.Size = 0
objectInfo.Name += string(os.PathSeparator)
fileInfo.Size = 0
fileInfo.Name += string(os.PathSeparator)
}
return objectInfo, nil
return fileInfo, nil
}
var markerBase, markerDir string
@@ -158,13 +159,13 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b
}
continue
}
objectInfo, err := direntToObjectInfo(dirent)
fileInfo, err := direntToFileInfo(dirent)
if err != nil {
send(treeWalkResult{err: err})
return false
}
*count--
if !send(treeWalkResult{objectInfo: objectInfo}) {
if !send(treeWalkResult{fileInfo: fileInfo}) {
return false
}
}
@@ -182,7 +183,7 @@ func startTreeWalk(fsPath, bucket, prefix, marker string, recursive bool) *treeW
// if prefix is "one/two/th" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/" and marker="three/four/five.txt"
// and entryPrefixMatch="th"
ch := make(chan treeWalkResult, listObjectsLimit)
ch := make(chan treeWalkResult, fsListLimit)
walkNotify := treeWalker{ch: ch}
entryPrefixMatch := prefix
prefixDir := ""
@@ -196,8 +197,6 @@ func startTreeWalk(fsPath, bucket, prefix, marker string, recursive bool) *treeW
go func() {
defer close(ch)
send := func(walkResult treeWalkResult) bool {
// Add the bucket.
walkResult.objectInfo.Bucket = bucket
if count == 0 {
walkResult.end = true
}
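
For orientation, the consumer side of a treeWalker channel looks roughly like the following. The types are trimmed local mirrors of treeWalkResult above (the real loop lives in ListObjects), so the names here are illustrative only:

package main

import "fmt"

// walkResult mirrors the fields of treeWalkResult that a consumer
// reads; the FileInfo payload is reduced to a name for the sketch.
type walkResult struct {
	name string
	err  error
	end  bool
}

// drainWalker consumes results until the walker reports the end, an
// error occurs, or the producer closes the channel (e.g. on timeout).
func drainWalker(ch <-chan walkResult) error {
	for res := range ch {
		if res.err != nil {
			return res.err
		}
		fmt.Println(res.name)
		if res.end {
			return nil
		}
	}
	return nil // channel closed by the producer
}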

View file

@@ -98,7 +98,8 @@ func parseDirents(buf []byte) []fsDirent {
return dirents
}
// Read all directory entries, returns a list of lexically sorted entries.
// Read all directory entries, returns a list of lexically sorted
// entries.
func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
buf := make([]byte, readDirentBufSize)
f, err := os.Open(readDirPath)
@@ -165,6 +166,5 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi
}
sort.Sort(byDirentName(dirents))
return dirents, nil
}

View file

@@ -103,6 +103,5 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi
}
sort.Sort(byDirentName(dirents))
return dirents, nil
}

View file

@@ -1,280 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"errors"
"os"
"path/filepath"
"strings"
"time"
)
func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) <-chan multipartObjectInfo {
objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit)
// TODO: check if bucketDir is absolute path
scanDir := bucketDir
dirDepth := bucketDir
if prefixPath != "" {
if !filepath.IsAbs(prefixPath) {
tmpPrefixPath := filepath.Join(bucketDir, prefixPath)
if strings.HasSuffix(prefixPath, string(os.PathSeparator)) {
tmpPrefixPath += string(os.PathSeparator)
}
prefixPath = tmpPrefixPath
}
// TODO: check if prefixPath starts with bucketDir
// Case #1: if prefixPath is /mnt/mys3/mybucket/2012/photos/paris, then
// dirDepth is /mnt/mys3/mybucket/2012/photos
// Case #2: if prefixPath is /mnt/mys3/mybucket/2012/photos/, then
// dirDepth is /mnt/mys3/mybucket/2012/photos
dirDepth = filepath.Dir(prefixPath)
scanDir = dirDepth
} else {
prefixPath = bucketDir
}
if markerPath != "" {
if !filepath.IsAbs(markerPath) {
tmpMarkerPath := filepath.Join(bucketDir, markerPath)
if strings.HasSuffix(markerPath, string(os.PathSeparator)) {
tmpMarkerPath += string(os.PathSeparator)
}
markerPath = tmpMarkerPath
}
// TODO: check markerPath must be a file
if uploadIDMarker != "" {
markerPath = filepath.Join(markerPath, uploadIDMarker+multipartUploadIDSuffix)
}
// TODO: check if markerPath starts with bucketDir
// TODO: check if markerPath starts with prefixPath
// Case #1: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png, then
// scanDir is /mnt/mys3/mybucket/2012/photos
// Case #2: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png/1fbd117a-268a-4ed0-85c9-8cc3888cbf20.uploadid, then
// scanDir is /mnt/mys3/mybucket/2012/photos/gophercon.png
// Case #3: if markerPath is /mnt/mys3/mybucket/2012/photos/, then
// scanDir is /mnt/mys3/mybucket/2012/photos
scanDir = filepath.Dir(markerPath)
} else {
markerPath = bucketDir
}
// Ensure bucketDir ends with os.PathSeparator
if !strings.HasSuffix(bucketDir, string(os.PathSeparator)) {
bucketDir += string(os.PathSeparator)
}
// Remove trailing os.PathSeparator from scanDir if present
if strings.HasSuffix(scanDir, string(os.PathSeparator)) {
scanDir = filepath.Dir(scanDir)
}
// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel.
go func() {
defer close(objectInfoCh)
// send function - returns true if ObjectInfo is sent
// within (time.Second * 15) else false on timeout.
send := func(oi multipartObjectInfo) bool {
timer := time.After(time.Second * 15)
select {
case objectInfoCh <- oi:
return true
case <-timer:
return false
}
}
// filter function - filters directory entries matching multipart uploadids, prefix and marker
direntFilterFn := func(dirent fsDirent) bool {
// check if dirent is a directory, or a regular file whose name ends with the upload ID suffix string
if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) {
// return true if dirent's name starts with prefixPath and is lexically higher than markerPath
return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath
}
return false
}
// filter function - filters directory entries matching multipart uploadids
subDirentFilterFn := func(dirent fsDirent) bool {
// check if dirent is a directory, or a regular file whose name ends with the upload ID suffix string
return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix))
}
// lastObjInfo is used to save the last object info, which is sent at the end with End=true
var lastObjInfo *multipartObjectInfo
sendError := func(err error) {
if lastObjInfo != nil {
if !send(*lastObjInfo) {
// as we got an error sending lastObjInfo, we can't send the error either
return
}
}
send(multipartObjectInfo{Err: err, End: true})
return
}
for {
dirents, err := scandir(scanDir, direntFilterFn, false)
if err != nil {
sendError(err)
return
}
var dirent fsDirent
for len(dirents) > 0 {
dirent, dirents = dirents[0], dirents[1:]
if dirent.IsRegular() {
// Handle uploadid file
name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1)
if name == "" {
// This should not happen, i.e. an uploadid file should not be directly in the bucket directory
sendError(errors.New("Corrupted metadata"))
return
}
uploadID := strings.Split(filepath.Base(dirent.name), multipartUploadIDSuffix)[0]
// Solaris and older unixes may leave modTime
// empty; fall back to os.Stat() to fill missing values.
if dirent.modTime.IsZero() {
if fi, e := os.Stat(dirent.name); e == nil {
dirent.modTime = fi.ModTime()
} else {
sendError(e)
return
}
}
objInfo := multipartObjectInfo{
Name: name,
UploadID: uploadID,
ModifiedTime: dirent.modTime,
}
// a new object info arrived; send the previous one and remember the new one as the last
if lastObjInfo != nil {
if !send(*lastObjInfo) {
return
}
}
lastObjInfo = &objInfo
continue
}
// Fetch sub dirents.
subDirents, err := scandir(dirent.name, subDirentFilterFn, false)
if err != nil {
sendError(err)
return
}
subDirFound := false
uploadIDDirents := []fsDirent{}
// If subDirents has a directory, then current dirent needs to be sent
for _, subdirent := range subDirents {
if subdirent.IsDir() {
subDirFound = true
if recursive {
break
}
}
if !recursive && subdirent.IsRegular() {
uploadIDDirents = append(uploadIDDirents, subdirent)
}
}
// Send directory only for non-recursive listing
if !recursive && (subDirFound || len(subDirents) == 0) {
// Solaris and older unixes may leave modTime
// empty; fall back to os.Stat() to fill missing values.
if dirent.modTime.IsZero() {
if fi, e := os.Stat(dirent.name); e == nil {
dirent.modTime = fi.ModTime()
} else {
sendError(e)
return
}
}
objInfo := multipartObjectInfo{
Name: strings.Replace(dirent.name, bucketDir, "", 1),
ModifiedTime: dirent.modTime,
IsDir: true,
}
// a new object info arrived; send the previous one and remember the new one as the last
if lastObjInfo != nil {
if !send(*lastObjInfo) {
return
}
}
lastObjInfo = &objInfo
}
if recursive {
dirents = append(subDirents, dirents...)
} else {
dirents = append(uploadIDDirents, dirents...)
}
}
if !recursive {
break
}
markerPath = scanDir + string(os.PathSeparator)
if scanDir = filepath.Dir(scanDir); scanDir < dirDepth {
break
}
}
if lastObjInfo != nil {
// we got last object
lastObjInfo.End = true
if !send(*lastObjInfo) {
return
}
}
}()
return objectInfoCh
}
// multipartObjectInfo - Multipart object info
type multipartObjectInfo struct {
Name string
UploadID string
ModifiedTime time.Time
IsDir bool
Err error
End bool
}
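
The send closure above caps how long the scanning goroutine blocks on a reader that has gone away, so abandoned listings do not leak goroutines. The same pattern in isolation, with hypothetical names and the timeout made a parameter:

package main

import "time"

type item struct{} // placeholder payload for the sketch

// sendWithTimeout delivers v on ch unless no receiver shows up within
// d; returning false lets the producer shut down instead of leaking.
func sendWithTimeout(ch chan<- item, v item, d time.Duration) bool {
	select {
	case ch <- v:
		return true
	case <-time.After(d):
		return false
	}
}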

View file

@@ -1,685 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015,2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
"github.com/skyrings/skyring-common/tools/uuid"
)
const (
minioMetaDir = ".minio"
multipartUploadIDSuffix = ".uploadid"
)
// Removes a file and its parent directories up to a given level.
func removeFileTree(fileName string, level string) error {
if e := os.Remove(fileName); e != nil {
return e
}
for fileDir := filepath.Dir(fileName); fileDir > level; fileDir = filepath.Dir(fileDir) {
if status, e := isDirEmpty(fileDir); e != nil {
return e
} else if !status {
break
}
if e := os.Remove(fileDir); e != nil {
return e
}
}
return nil
}
// Takes an input stream and safely writes to disk, additionally
// verifies checksum.
func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error {
safeFile, e := safe.CreateFileWithSuffix(fileName, "-")
if e != nil {
return e
}
md5Hasher := md5.New()
multiWriter := io.MultiWriter(md5Hasher, safeFile)
if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil {
// Closes the file safely and removes it in a single atomic operation.
safeFile.CloseAndRemove()
return e
}
} else {
if _, e = io.Copy(multiWriter, data); e != nil {
// Closes the file safely and removes it in a single atomic operation.
safeFile.CloseAndRemove()
return e
}
}
dataMd5sum := hex.EncodeToString(md5Hasher.Sum(nil))
if md5sum != "" && !isMD5SumEqual(md5sum, dataMd5sum) {
// Closes the file safely and removes it in a single atomic operation.
safeFile.CloseAndRemove()
return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum}
}
// Safely close the file and atomically rename it to the actual filePath.
safeFile.Close()
// Safely wrote the file.
return nil
}
func isFileExist(filename string) (bool, error) {
fi, e := os.Lstat(filename)
if e != nil {
if os.IsNotExist(e) {
return false, nil
}
return false, e
}
return fi.Mode().IsRegular(), nil
}
// Create an s3 compatible MD5sum for complete multipart transaction.
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
var finalMD5Bytes []byte
for _, md5Str := range md5Strs {
md5Bytes, e := hex.DecodeString(md5Str)
if e != nil {
return "", probe.NewError(e)
}
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
}
md5Hasher := md5.New()
md5Hasher.Write(finalMD5Bytes)
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs))
return s3MD5, nil
}
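
makeS3MD5 implements the S3 multipart ETag convention: hex-decode each part's MD5, concatenate the raw bytes, MD5 the concatenation, and append "-<part count>". A standalone restatement, using a plain error instead of *probe.Error:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// multipartETag restates makeS3MD5 without the probe wrapper.
func multipartETag(partMD5s ...string) (string, error) {
	var raw []byte
	for _, s := range partMD5s {
		b, err := hex.DecodeString(s)
		if err != nil {
			return "", err
		}
		raw = append(raw, b...)
	}
	sum := md5.Sum(raw)
	return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(partMD5s)), nil
}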
func (fs Filesystem) newUploadID(bucket, object string) (string, error) {
metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object)
// create metaObjectDir if it does not exist
if status, e := isDirExist(metaObjectDir); e != nil {
return "", e
} else if !status {
if e := os.MkdirAll(metaObjectDir, 0755); e != nil {
return "", e
}
}
for {
uuid, e := uuid.New()
if e != nil {
return "", e
}
uploadID := uuid.String()
uploadIDFile := filepath.Join(metaObjectDir, uploadID+multipartUploadIDSuffix)
if _, e := os.Lstat(uploadIDFile); e != nil {
if !os.IsNotExist(e) {
return "", e
}
// uploadIDFile doesn't exist, so create empty file to reserve the name
if e := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); e != nil {
return "", e
}
return uploadID, nil
}
// uploadIDFile already exists.
// loop again to try with different uuid generated.
}
}
func (fs Filesystem) isUploadIDExist(bucket, object, uploadID string) (bool, error) {
return isFileExist(filepath.Join(fs.diskPath, minioMetaDir, bucket, object, uploadID+multipartUploadIDSuffix))
}
func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error {
metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object)
uploadIDPrefix := uploadID + "."
dirents, e := scandir(metaObjectDir,
func(dirent fsDirent) bool {
return dirent.IsRegular() && strings.HasPrefix(dirent.name, uploadIDPrefix)
},
true)
if e != nil {
return e
}
for _, dirent := range dirents {
if e := os.Remove(filepath.Join(metaObjectDir, dirent.name)); e != nil {
return e
}
}
if status, e := isDirEmpty(metaObjectDir); e != nil {
return e
} else if status {
if e := removeFileTree(metaObjectDir, filepath.Join(fs.diskPath, minioMetaDir, bucket)); e != nil {
return e
}
}
return nil
}
func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) {
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return "", e
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Object: object}
}
return bucket, nil
}
// NewMultipartUpload - initiate a new multipart session
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
bucket = bucketDirName
} else {
return "", probe.NewError(e)
}
if e := checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil {
return "", probe.NewError(e)
}
uploadID, e := fs.newUploadID(bucket, object)
if e != nil {
return "", probe.NewError(e)
}
return uploadID, nil
}
// PutObjectPart - create a part in a multipart session
func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) {
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
bucket = bucketDirName
} else {
return "", probe.NewError(e)
}
if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
//return "", probe.NewError(InternalError{Err: err})
return "", probe.NewError(e)
} else if !status {
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
}
// Part id cannot be zero or negative.
if partNumber <= 0 {
return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero"))
}
if partNumber > 10000 {
return "", probe.NewError(errors.New("invalid part id, should be not more than 10000"))
}
if e := checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil {
return "", probe.NewError(e)
}
partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex)
partFilePath := filepath.Join(fs.diskPath, minioMetaDir, bucket, object, partSuffix)
if e := safeWriteFile(partFilePath, data, size, md5Hex); e != nil {
return "", probe.NewError(e)
}
return md5Hex, nil
}
// AbortMultipartUpload - abort an incomplete multipart session
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
bucket = bucketDirName
} else {
return probe.NewError(e)
}
if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
//return probe.NewError(InternalError{Err: err})
return probe.NewError(e)
} else if !status {
return probe.NewError(InvalidUploadID{UploadID: uploadID})
}
if e := fs.cleanupUploadID(bucket, object, uploadID); e != nil {
return probe.NewError(e)
}
return nil
}
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) {
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
bucket = bucketDirName
} else {
return ObjectInfo{}, probe.NewError(e)
}
if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
//return probe.NewError(InternalError{Err: err})
return ObjectInfo{}, probe.NewError(e)
} else if !status {
return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
if e := checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil {
return ObjectInfo{}, probe.NewError(e)
}
metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object)
var md5Sums []string
for _, part := range parts {
partNumber := part.PartNumber
md5sum := strings.Trim(part.ETag, "\"")
partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum)
if status, err := isFileExist(partFile); err != nil {
return ObjectInfo{}, probe.NewError(err)
} else if !status {
return ObjectInfo{}, probe.NewError(InvalidPart{})
}
md5Sums = append(md5Sums, md5sum)
}
// Save the s3 md5.
s3MD5, err := makeS3MD5(md5Sums...)
if err != nil {
return ObjectInfo{}, err.Trace(md5Sums...)
}
completeObjectFile := filepath.Join(metaObjectDir, uploadID+".complete.")
safeFile, e := safe.CreateFileWithSuffix(completeObjectFile, "-")
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
for _, part := range parts {
partNumber := part.PartNumber
// Trim off the odd double quotes from ETag in the beginning and end.
md5sum := strings.TrimPrefix(part.ETag, "\"")
md5sum = strings.TrimSuffix(md5sum, "\"")
partFileStr := filepath.Join(metaObjectDir, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5sum))
var partFile *os.File
partFile, e = os.Open(partFileStr)
if e != nil {
// Remove the complete file safely.
safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e)
} else if _, e = io.Copy(safeFile, partFile); e != nil {
// Remove the complete file safely.
safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e)
}
partFile.Close() // Close part file after successful copy.
}
// All parts concatenated, safely close the temp file.
safeFile.Close()
// Stat to gather fresh stat info.
objSt, e := os.Stat(completeObjectFile)
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
bucketPath := filepath.Join(fs.diskPath, bucket)
objectPath := filepath.Join(bucketPath, object)
if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil {
os.Remove(completeObjectFile)
return ObjectInfo{}, probe.NewError(e)
}
if e = os.Rename(completeObjectFile, objectPath); e != nil {
os.Remove(completeObjectFile)
return ObjectInfo{}, probe.NewError(e)
}
fs.cleanupUploadID(bucket, object, uploadID) // TODO: handle and log the error
contentType := "application/octet-stream"
if objectExt := filepath.Ext(objectPath); objectExt != "" {
if content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]; ok {
contentType = content.ContentType
}
}
newObject := ObjectInfo{
Bucket: bucket,
Name: object,
ModifiedTime: objSt.ModTime(),
Size: objSt.Size(),
ContentType: contentType,
MD5Sum: s3MD5,
}
return newObject, nil
}
func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch <-chan multipartObjectInfo) {
fs.listMultipartObjectMapMutex.Lock()
defer fs.listMultipartObjectMapMutex.Unlock()
channels := []<-chan multipartObjectInfo{ch}
if _, ok := fs.listMultipartObjectMap[params]; ok {
channels = append(fs.listMultipartObjectMap[params], ch)
}
fs.listMultipartObjectMap[params] = channels
}
func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) <-chan multipartObjectInfo {
fs.listMultipartObjectMapMutex.Lock()
defer fs.listMultipartObjectMapMutex.Unlock()
if channels, ok := fs.listMultipartObjectMap[params]; ok {
var channel <-chan multipartObjectInfo
channel, channels = channels[0], channels[1:]
if len(channels) > 0 {
fs.listMultipartObjectMap[params] = channels
} else {
// do not store empty channel list
delete(fs.listMultipartObjectMap, params)
}
return channel
}
return nil
}
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
result := ListMultipartsInfo{}
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return result, probe.NewError(e)
}
if !IsValidObjectPrefix(objectPrefix) {
return result, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: objectPrefix})
}
prefixPath := filepath.FromSlash(objectPrefix)
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != "/" {
return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
}
if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) {
return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix))
}
markerPath := filepath.FromSlash(keyMarker)
if uploadIDMarker != "" {
if strings.HasSuffix(markerPath, string(os.PathSeparator)) {
return result, probe.NewError(fmt.Errorf("Invalid combination of uploadID marker '%s' and marker '%s'", uploadIDMarker, keyMarker))
}
id, e := uuid.Parse(uploadIDMarker)
if e != nil {
return result, probe.NewError(e)
}
if id.IsZero() {
return result, probe.NewError(fmt.Errorf("Invalid upload ID marker %s", uploadIDMarker))
}
}
// Return empty response if maxUploads is zero
if maxUploads == 0 {
return result, nil
}
// reset maxUploads to listObjectsLimit for an out-of-range limit
if maxUploads < 0 || maxUploads > listObjectsLimit {
maxUploads = listObjectsLimit
}
recursive := true
if delimiter == "/" {
recursive = false
}
metaBucketDir := filepath.Join(fs.diskPath, minioMetaDir, bucket)
// Look up whether a list multipart object channel is already saved for the
// given parameters, else create a new one.
savedChannel := true
multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{
bucket: bucket,
delimiter: delimiter,
keyMarker: markerPath,
prefix: prefixPath,
uploadIDMarker: uploadIDMarker,
})
if multipartObjectInfoCh == nil {
multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive)
savedChannel = false
}
var objInfo *multipartObjectInfo
nextKeyMarker := ""
nextUploadIDMarker := ""
for i := 0; i < maxUploads; {
// read the channel
if oi, ok := <-multipartObjectInfoCh; ok {
objInfo = &oi
} else {
// closed channel
if i == 0 {
// first read
if !savedChannel {
// it's valid to get a closed new channel on the first read
multipartObjectInfoCh = nil
break
}
// invalid saved channel, create a new channel
multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, keyMarker,
uploadIDMarker, recursive)
} else {
// TODO: FIX: there is a chance of an infinite loop if we always get a closed channel
// the channel got closed due to timeout
// create a new channel
multipartObjectInfoCh = scanMultipartDir(metaBucketDir, objectPrefix, nextKeyMarker,
nextUploadIDMarker, recursive)
}
// treat it as a new channel
savedChannel = false
continue
}
if objInfo.Err != nil {
if os.IsNotExist(objInfo.Err) {
return ListMultipartsInfo{}, nil
}
return ListMultipartsInfo{}, probe.NewError(objInfo.Err)
}
// backward compatibility check
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
continue
}
// Directories are listed only if recursive is false
if objInfo.IsDir {
result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name)
} else {
result.Uploads = append(result.Uploads, uploadMetadata{
Object: objInfo.Name,
UploadID: objInfo.UploadID,
Initiated: objInfo.ModifiedTime,
})
}
nextKeyMarker = objInfo.Name
nextUploadIDMarker = objInfo.UploadID
i++
if objInfo.End {
// as we received last object, do not save this channel for later use
multipartObjectInfoCh = nil
break
}
}
if multipartObjectInfoCh != nil {
// we haven't received last object
result.IsTruncated = true
result.NextKeyMarker = nextKeyMarker
result.NextUploadIDMarker = nextUploadIDMarker
// save this channel for later use
fs.saveListMultipartObjectCh(listMultipartObjectParams{
bucket: bucket,
delimiter: delimiter,
keyMarker: nextKeyMarker,
prefix: objectPrefix,
uploadIDMarker: nextUploadIDMarker,
}, multipartObjectInfoCh)
}
return result, nil
}
// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata
func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) {
if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
bucket = bucketDirName
} else {
return ListPartsInfo{}, probe.NewError(err)
}
if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
//return probe.NewError(InternalError{Err: err})
return ListPartsInfo{}, probe.NewError(err)
} else if !status {
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
// return empty ListPartsInfo
if maxParts == 0 {
return ListPartsInfo{}, nil
}
if maxParts < 0 || maxParts > 1000 {
maxParts = 1000
}
metaObjectDir := filepath.Join(fs.diskPath, minioMetaDir, bucket, object)
uploadIDPrefix := uploadID + "."
dirents, e := scandir(metaObjectDir,
func(dirent fsDirent) bool {
// Part file is a regular file and has to start with 'UPLOADID.'
if !(dirent.IsRegular() && strings.HasPrefix(dirent.name, uploadIDPrefix)) {
return false
}
// Valid part file has to be 'UPLOADID.PARTNUMBER.MD5SUM'
tokens := strings.Split(dirent.name, ".")
if len(tokens) != 3 {
return false
}
if partNumber, err := strconv.Atoi(tokens[1]); err == nil {
if partNumber >= 1 && partNumber <= 10000 && partNumber > partNumberMarker {
return true
}
}
return false
},
true)
if e != nil {
return ListPartsInfo{}, probe.NewError(e)
}
isTruncated := false
nextPartNumberMarker := 0
parts := []partInfo{}
for i := range dirents {
if i == maxParts {
isTruncated = true
break
}
// On some OSes modTime is empty; use os.Stat() to fill in missing values
if dirents[i].modTime.IsZero() {
if fi, e := os.Stat(filepath.Join(metaObjectDir, dirents[i].name)); e == nil {
dirents[i].modTime = fi.ModTime()
dirents[i].size = fi.Size()
} else {
return ListPartsInfo{}, probe.NewError(e)
}
}
tokens := strings.Split(dirents[i].name, ".")
partNumber, _ := strconv.Atoi(tokens[1])
md5sum := tokens[2]
parts = append(parts, partInfo{
PartNumber: partNumber,
LastModified: dirents[i].modTime,
ETag: md5sum,
Size: dirents[i].size,
})
}
if isTruncated {
nextPartNumberMarker = 0
}
return ListPartsInfo{
Bucket: bucket,
Object: object,
UploadID: uploadID,
PartNumberMarker: partNumberMarker,
NextPartNumberMarker: nextPartNumberMarker,
MaxParts: maxParts,
IsTruncated: isTruncated,
Parts: parts,
}, nil
}

View file

@@ -1,338 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"bytes"
"crypto/md5"
"io"
"os"
"path/filepath"
"strings"
"encoding/hex"
"runtime"
"github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
)
/// Object Operations
// GetObject - GET object
func (fs Filesystem) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) {
// Input validation.
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return nil, probe.NewError(e)
}
if !IsValidObjectName(object) {
return nil, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
objectPath := filepath.Join(fs.diskPath, bucket, object)
file, e := os.Open(objectPath)
if e != nil {
// If the object doesn't exist, the bucket might not exist either. Stat for
// the bucket and give a better error message if that is true.
if os.IsNotExist(e) {
_, e = os.Stat(filepath.Join(fs.diskPath, bucket))
if os.IsNotExist(e) {
return nil, probe.NewError(BucketNotFound{Bucket: bucket})
}
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return nil, probe.NewError(e)
}
// Initiate a cached stat operation on the file handler.
st, e := file.Stat()
if e != nil {
return nil, probe.NewError(e)
}
// Object path is a directory prefix, return object not found error.
if st.IsDir() {
return nil, probe.NewError(ObjectExistsAsPrefix{
Bucket: bucket,
Prefix: object,
})
}
// Seek to a starting offset.
_, e = file.Seek(startOffset, os.SEEK_SET)
if e != nil {
// When the "handle is invalid", the file might be a directory on Windows.
if runtime.GOOS == "windows" && strings.Contains(e.Error(), "handle is invalid") {
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return nil, probe.NewError(e)
}
return file, nil
}
// GetObjectInfo - get object info.
func (fs Filesystem) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) {
// Input validation.
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
if !IsValidObjectName(object) {
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
info, err := getObjectInfo(fs.diskPath, bucket, object)
if err != nil {
if os.IsNotExist(err.ToGoError()) {
return ObjectInfo{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return ObjectInfo{}, err.Trace(bucket, object)
}
if info.IsDir {
return ObjectInfo{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return info, nil
}
// getObjectInfo - get object stat info.
func getObjectInfo(rootPath, bucket, object string) (ObjectInfo, *probe.Error) {
// Do not use filepath.Join() since filepath.Join strips off any
// trailing '/' from object names; use them as-is in a static manner so
// that we can send a proper 'ObjectNotFound' reply back upon os.Stat().
var objectPath string
// For windows use its special os.PathSeparator == "\\"
if runtime.GOOS == "windows" {
objectPath = rootPath + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object
} else {
objectPath = rootPath + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object
}
stat, e := os.Stat(objectPath)
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
contentType := "application/octet-stream"
if runtime.GOOS == "windows" {
object = filepath.ToSlash(object)
}
if objectExt := filepath.Ext(object); objectExt != "" {
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
if ok {
contentType = content.ContentType
}
}
objInfo := ObjectInfo{
Bucket: bucket,
Name: object,
ModifiedTime: stat.ModTime(),
Size: stat.Size(),
ContentType: contentType,
IsDir: stat.Mode().IsDir(),
}
return objInfo, nil
}
// isMD5SumEqual - returns true if the given md5sums are equal, false otherwise.
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool {
// Verify the md5sum.
if expectedMD5Sum != "" && actualMD5Sum != "" {
// Decode md5sum to bytes from their hexadecimal
// representations.
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return false
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return false
}
// Verify md5sum bytes are equal after successful decoding.
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return false
}
return true
}
return false
}
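
Decoding to raw bytes before comparing makes the check case-insensitive for hex digits, which a plain string comparison would not be. A small self-contained demonstration:

package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
)

func main() {
	// Same MD5 digest, different hex digit case.
	a, _ := hex.DecodeString("D41D8CD98F00B204E9800998ECF8427E")
	b, _ := hex.DecodeString("d41d8cd98f00b204e9800998ecf8427e")
	fmt.Println(bytes.Equal(a, b)) // true
}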
// PutObject - create an object.
func (fs Filesystem) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) {
// Check bucket name valid.
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
bucketDir := filepath.Join(fs.diskPath, bucket)
// Verify object path legal.
if !IsValidObjectName(object) {
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if e = checkDiskFree(fs.diskPath, fs.minFreeDisk); e != nil {
return ObjectInfo{}, probe.NewError(e)
}
// Get object path.
objectPath := filepath.Join(bucketDir, object)
// md5Hex representation.
var md5Hex string
if len(metadata) != 0 {
md5Hex = metadata["md5Sum"]
}
// Write object.
safeFile, e := safe.CreateFileWithPrefix(objectPath, md5Hex+"$tmpobject")
if e != nil {
switch e := e.(type) {
case *os.PathError:
if e.Op == "mkdir" {
if strings.Contains(e.Error(), "not a directory") {
return ObjectInfo{}, probe.NewError(ObjectExistsAsPrefix{Bucket: bucket, Prefix: object})
}
}
return ObjectInfo{}, probe.NewError(e)
default:
return ObjectInfo{}, probe.NewError(e)
}
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, safeFile)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil {
safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e)
}
} else {
if _, e = io.Copy(multiWriter, data); e != nil {
safeFile.CloseAndRemove()
return ObjectInfo{}, probe.NewError(e)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
return ObjectInfo{}, probe.NewError(BadDigest{md5Hex, newMD5Hex})
}
}
// Set stat again to get the latest metadata.
st, e := os.Stat(safeFile.Name())
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
contentType := "application/octet-stream"
if objectExt := filepath.Ext(objectPath); objectExt != "" {
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
if ok {
contentType = content.ContentType
}
}
newObject := ObjectInfo{
Bucket: bucket,
Name: object,
ModifiedTime: st.ModTime(),
Size: st.Size(),
MD5Sum: newMD5Hex,
ContentType: contentType,
}
// Safely close and atomically rename the file.
safeFile.Close()
return newObject, nil
}
// deleteObjectPath - delete object path if it's empty.
func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error {
if basePath == deletePath {
return nil
}
// Verify if the path exists.
pathSt, e := os.Stat(deletePath)
if e != nil {
if os.IsNotExist(e) {
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return probe.NewError(e)
}
if pathSt.IsDir() {
// Verify if directory is empty.
empty, e := isDirEmpty(deletePath)
if e != nil {
return probe.NewError(e)
}
if !empty {
return nil
}
}
// Attempt to remove path.
if e := os.Remove(deletePath); e != nil {
return probe.NewError(e)
}
// Recursively go down the next path and delete again.
if err := deleteObjectPath(basePath, filepath.Dir(deletePath), bucket, object); err != nil {
return err.Trace(basePath, deletePath, bucket, object)
}
return nil
}
// DeleteObject - delete object.
func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error {
// Check bucket name valid
bucket, e := fs.checkBucketArg(bucket)
if e != nil {
return probe.NewError(e)
}
bucketDir := filepath.Join(fs.diskPath, bucket)
// Verify object path legal
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Do not use filepath.Join() since filepath.Join strips off any
// trailing '/' from object names; use them as-is in a static manner so
// that we can send a proper 'ObjectNotFound' reply back upon os.Stat().
var objectPath string
if runtime.GOOS == "windows" {
objectPath = fs.diskPath + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object
} else {
objectPath = fs.diskPath + string(os.PathSeparator) + bucket + string(os.PathSeparator) + object
}
// Delete object path if it's empty.
err := deleteObjectPath(bucketDir, objectPath, bucket, object)
if err != nil {
if os.IsNotExist(err.ToGoError()) {
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return err.Trace(bucketDir, objectPath, bucket, object)
}
return nil
}
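
deleteObjectPath removes the object and then recurses toward the bucket directory, deleting parent directories only while they remain empty, so shared prefixes survive. The same idea written iteratively, as a hypothetical helper:

package main

import (
	"io"
	"os"
	"path/filepath"
)

// isEmptyDir reports whether dir contains no entries.
func isEmptyDir(dir string) (bool, error) {
	f, err := os.Open(dir)
	if err != nil {
		return false, err
	}
	defer f.Close()
	if _, err = f.Readdirnames(1); err == io.EOF {
		return true, nil
	}
	return false, err
}

// pruneEmptyParents removes empty directories from path up to, but
// not including, stop; it assumes path is a descendant of stop.
func pruneEmptyParents(path, stop string) error {
	for dir := path; dir != stop; dir = filepath.Dir(dir) {
		empty, err := isEmptyDir(dir)
		if err != nil || !empty {
			return err
		}
		if err := os.Remove(dir); err != nil {
			return err
		}
	}
	return nil
}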

fs.go (547 changed lines)
View file

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,96 +17,513 @@
package main
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
)
// listObjectParams - list object params used for list object map
type listObjectParams struct {
const (
fsListLimit = 1000
)
// listParams - list object params used for list object map
type listParams struct {
bucket string
delimiter string
recursive bool
marker string
prefix string
}
// listMultipartObjectParams - list multipart object params used for list multipart object map
type listMultipartObjectParams struct {
bucket string
delimiter string
keyMarker string
prefix string
uploadIDMarker string
// fsStorage - implements StorageAPI interface.
type fsStorage struct {
diskPath string
diskInfo disk.Info
minFreeDisk int64
rwLock *sync.RWMutex
listObjectMap map[listParams][]*treeWalker
listObjectMapMutex *sync.Mutex
}
// Filesystem - local variables
type Filesystem struct {
diskPath string
minFreeDisk int64
rwLock *sync.RWMutex
listObjectMap map[listObjectParams][]*treeWalker
listObjectMapMutex *sync.Mutex
listMultipartObjectMap map[listMultipartObjectParams][]<-chan multipartObjectInfo
listMultipartObjectMapMutex *sync.Mutex
}
// newFS instantiates a new filesystem.
func newFS(diskPath string) (ObjectAPI, *probe.Error) {
fs := &Filesystem{
rwLock: &sync.RWMutex{},
// isDirEmpty - returns whether given directory is empty or not.
func isDirEmpty(dirname string) (status bool, err error) {
f, err := os.Open(dirname)
if err == nil {
defer f.Close()
if _, err = f.Readdirnames(1); err == io.EOF {
status = true
err = nil
}
}
fs.diskPath = diskPath
return status, err
}
/// Defaults
// Minimum free disk required for i/o operations to succeed.
fs.minFreeDisk = 5
// isDirExist - returns whether given directory exists or not.
func isDirExist(dirname string) (bool, error) {
fi, e := os.Lstat(dirname)
if e != nil {
if os.IsNotExist(e) {
return false, nil
}
return false, e
}
return fi.IsDir(), nil
}
// Initialize list object map.
fs.listObjectMap = make(map[listObjectParams][]*treeWalker)
fs.listObjectMapMutex = &sync.Mutex{}
// Initialize list multipart map.
fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]<-chan multipartObjectInfo)
fs.listMultipartObjectMapMutex = &sync.Mutex{}
// Return here.
// Initialize a new storage disk.
func newFS(diskPath string) (StorageAPI, error) {
if diskPath == "" {
return nil, errInvalidArgument
}
st, e := os.Stat(diskPath)
if e != nil {
return nil, e
}
if !st.IsDir() {
return nil, syscall.ENOTDIR
}
diskInfo, e := disk.GetInfo(diskPath)
if e != nil {
return nil, e
}
fs := fsStorage{
diskPath: diskPath,
diskInfo: diskInfo,
minFreeDisk: 5, // Minimum 5% disk should be free.
listObjectMap: make(map[listParams][]*treeWalker),
listObjectMapMutex: &sync.Mutex{},
rwLock: &sync.RWMutex{},
}
return fs, nil
}
func (fs Filesystem) checkBucketArg(bucket string) (string, error) {
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
// checkDiskFree verifies if the disk path has sufficient minimum free disk space.
func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
di, err := disk.GetInfo(diskPath)
if err != nil {
return err
}
bucket = getActualBucketname(fs.diskPath, bucket)
if status, e := isDirExist(filepath.Join(fs.diskPath, bucket)); !status {
if e == nil {
return "", BucketNotFound{Bucket: bucket}
} else if os.IsNotExist(e) {
return "", BucketNotFound{Bucket: bucket}
} else {
return "", e
}
}
return bucket, nil
}
func checkDiskFree(diskPath string, minFreeDisk int64) error {
di, e := disk.GetInfo(diskPath)
if e != nil {
return e
}
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
// Remove 5% from total space for cumulative disk
// space used for journalling, inodes etc.
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
if int64(availableDiskSpace) <= minFreeDisk {
return RootPathFull{Path: diskPath}
return errDiskPathFull
}
// Success.
return nil
}
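To make checkDiskFree's arithmetic concrete, here is a minimal runnable sketch with hypothetical numbers (not part of this commit): a 100 GiB disk with 4 GiB free has 95 GiB of usable capacity after the 5% reserve, so availability is about 4.2% and the check reports the disk as full.
package main

import "fmt"

func main() {
	// Hypothetical numbers: 100 GiB disk, 4 GiB free, 5% minimum free disk.
	var free, total float64 = 4, 100
	// Same formula as checkDiskFree: free over total minus the 5% reserve.
	available := (free / (total - 0.05*total)) * 100
	fmt.Printf("available=%.2f%% diskFull=%v\n", available, int64(available) <= 5)
	// Prints: available=4.21% diskFull=true
}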
// Make a volume entry.
func (s fsStorage) MakeVol(volume string) (err error) {
if volume == "" {
return errInvalidArgument
}
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
}
volumeDir := getVolumeDir(s.diskPath, volume)
if _, err = os.Stat(volumeDir); err == nil {
return errVolumeExists
}
// Make a volume entry.
if err = os.Mkdir(volumeDir, 0700); err != nil {
return err
}
return nil
}
// removeDuplicateVols - remove duplicate volumes.
func removeDuplicateVols(vols []VolInfo) []VolInfo {
length := len(vols) - 1
for i := 0; i < length; i++ {
for j := i + 1; j <= length; j++ {
if vols[i].Name == vols[j].Name {
// Pick the latest volume, if there is a duplicate.
if vols[i].Created.Sub(vols[j].Created) > 0 {
// vols[i] is newer, drop the older vols[j].
vols[j] = vols[length]
} else {
// vols[j] is newer (or equal), drop vols[i].
vols[i] = vols[length]
}
vols = vols[0:length]
length--
j--
}
}
}
return vols
}
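A hedged usage sketch of the dedup rule (volume name and timestamps hypothetical, assumes the "time" package is imported): two same-named entries collapse to the one with the later Created time.
// Sketch only, not part of this commit.
older := VolInfo{Name: "photos", Created: time.Now().Add(-time.Hour)}
newer := VolInfo{Name: "photos", Created: time.Now()}
vols := removeDuplicateVols([]VolInfo{older, newer})
// len(vols) == 1 and vols[0].Created equals newer.Created.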
// ListVols - list volumes.
func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
files, err := ioutil.ReadDir(s.diskPath)
if err != nil {
return nil, err
}
for _, file := range files {
if !file.IsDir() {
// If not directory, ignore all file types.
continue
}
volInfo := VolInfo{
Name: file.Name(),
Created: file.ModTime(),
}
volsInfo = append(volsInfo, volInfo)
}
// Remove duplicated volume entries.
volsInfo = removeDuplicateVols(volsInfo)
return volsInfo, nil
}
// getVolumeDir - will convert incoming volume names to
// corresponding valid volume names on the backend in a platform
// compatible way for all operating systems.
func getVolumeDir(diskPath, volume string) string {
volumes, e := ioutil.ReadDir(diskPath)
if e != nil {
return volume
}
for _, vol := range volumes {
// Verify if lowercase version of the volume
// is equal to the incoming volume, then use the proper name.
if strings.ToLower(vol.Name()) == volume {
return filepath.Join(diskPath, vol.Name())
}
}
return filepath.Join(diskPath, volume)
}
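The case-mapping is easiest to see against a throwaway directory; a sketch assuming it runs inside this package with the usual imports (paths hypothetical):
// Sketch only, not part of this commit.
disk, _ := ioutil.TempDir("", "getvolumedir-demo")
defer os.RemoveAll(disk)
os.Mkdir(filepath.Join(disk, "MyBucket"), 0700)
fmt.Println(getVolumeDir(disk, "mybucket")) // resolves to .../MyBucket
fmt.Println(getVolumeDir(disk, "missing"))  // falls through to .../missing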
// StatVol - get volume info.
func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
if volume == "" {
return VolInfo{}, errInvalidArgument
}
volumeDir := getVolumeDir(s.diskPath, volume)
// Stat a volume entry.
var st os.FileInfo
st, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return VolInfo{}, errVolumeNotFound
}
return VolInfo{}, err
}
return VolInfo{
Name: st.Name(),
Created: st.ModTime(),
}, nil
}
// DeleteVol - delete a volume.
func (s fsStorage) DeleteVol(volume string) error {
if volume == "" {
return errInvalidArgument
}
err := os.Remove(getVolumeDir(s.diskPath, volume))
if err != nil && os.IsNotExist(err) {
return errVolumeNotFound
}
return err
}
// Save the goroutine reference in the map
func (s *fsStorage) saveTreeWalk(params listParams, walker *treeWalker) {
s.listObjectMapMutex.Lock()
defer s.listObjectMapMutex.Unlock()
walkers := s.listObjectMap[params]
walkers = append(walkers, walker)
s.listObjectMap[params] = walkers
}
// Lookup the goroutine reference from the map.
func (s *fsStorage) lookupTreeWalk(params listParams) *treeWalker {
s.listObjectMapMutex.Lock()
defer s.listObjectMapMutex.Unlock()
if walkChs, ok := s.listObjectMap[params]; ok {
for i, walkCh := range walkChs {
if !walkCh.timedOut {
newWalkChs := walkChs[i+1:]
if len(newWalkChs) > 0 {
s.listObjectMap[params] = newWalkChs
} else {
delete(s.listObjectMap, params)
}
return walkCh
}
}
// As all channels are timed out, delete the map entry
delete(s.listObjectMap, params)
}
return nil
}
// GetRootPath - get root path.
func (fs Filesystem) GetRootPath() string {
return fs.diskPath
// List of special prefixes for files, includes old and new ones.
var specialPrefixes = []string{
"$multipart",
"$tmpobject",
"$tmpfile",
// Add new special prefixes if any used.
}
// List operation.
func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) ([]FileInfo, bool, error) {
if volume == "" {
return nil, true, errInvalidArgument
}
var fileInfos []FileInfo
volumeDir := getVolumeDir(s.diskPath, volume)
// Verify if volume directory exists
if exists, err := isDirExist(volumeDir); !exists {
if err == nil {
return nil, true, errVolumeNotFound
} else if os.IsNotExist(err) {
return nil, true, errVolumeNotFound
} else {
return nil, true, err
}
}
if marker != "" {
// Verify if marker has prefix.
if marker != "" && !strings.HasPrefix(marker, prefix) {
return nil, true, errInvalidArgument
}
}
// Return empty response for a valid request when count is 0.
if count == 0 {
return nil, true, nil
}
// Overflowing count - reset to fsListLimit.
if count < 0 || count > fsListLimit {
count = fsListLimit
}
// Verify if prefix exists.
prefixDir := filepath.Dir(filepath.FromSlash(prefix))
prefixRootDir := filepath.Join(volumeDir, prefixDir)
if status, err := isDirExist(prefixRootDir); !status {
if err == nil {
// Prefix does not exist; not an error, just return an empty list response.
return nil, true, nil
}
// Rest errors should be treated as failure.
return nil, true, err
}
// Maximum 1000 files returned in a single call.
// Further calls will set the right marker value to continue reading the rest of the files.
// lookupTreeWalk returns nil when ListFiles is called for the first time.
// On further calls to ListFiles to retrieve more files within the timeout period,
// lookupTreeWalk returns the channel from which the rest of the objects can be retrieved.
walker := s.lookupTreeWalk(listParams{volume, recursive, marker, prefix})
if walker == nil {
walker = startTreeWalk(s.diskPath, volume, filepath.FromSlash(prefix), filepath.FromSlash(marker), recursive)
}
nextMarker := ""
for i := 0; i < count; {
walkResult, ok := <-walker.ch
if !ok {
// Closed channel.
return fileInfos, true, nil
}
// For any walk error return right away.
if walkResult.err != nil {
return nil, true, walkResult.err
}
fileInfo := walkResult.fileInfo
fileInfo.Name = filepath.ToSlash(fileInfo.Name)
// TODO: Find a proper place to skip these files.
// Skip temporary files carrying any of the special prefixes.
skipFile := false
for _, specialPrefix := range specialPrefixes {
if strings.Contains(fileInfo.Name, specialPrefix) {
if walkResult.end {
return fileInfos, true, nil
}
skipFile = true
break
}
}
if skipFile {
continue
}
fileInfos = append(fileInfos, fileInfo)
// We have listed everything return.
if walkResult.end {
return fileInfos, true, nil
}
nextMarker = fileInfo.Name
i++
}
s.saveTreeWalk(listParams{volume, recursive, nextMarker, prefix}, walker)
return fileInfos, false, nil
}
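A hedged usage sketch of the continuation contract (volume, prefix, and the fsStorage value s are hypothetical): the caller feeds the last returned name back as the marker, and lookupTreeWalk resumes the saved walker instead of rescanning from the start.
// Sketch only, not part of this commit.
fis, eof, err := s.ListFiles("photos", "2016/", "", true, fsListLimit)
for err == nil && !eof && len(fis) > 0 {
	marker := fis[len(fis)-1].Name
	fis, eof, err = s.ListFiles("photos", "2016/", marker, true, fsListLimit)
}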
// ReadFile - read a file at a given offset.
func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) {
if volume == "" || path == "" {
return nil, errInvalidArgument
}
volumeDir := getVolumeDir(s.diskPath, volume)
// Verify if volume directory exists
var exists bool
if exists, err = isDirExist(volumeDir); !exists {
if err == nil {
return nil, errVolumeNotFound
} else if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else {
return nil, err
}
}
filePath := filepath.Join(volumeDir, path)
file, err := os.Open(filePath)
if err != nil {
if os.IsNotExist(err) {
return nil, errFileNotFound
}
return nil, err
}
st, err := file.Stat()
if err != nil {
return nil, err
}
// Reject non-regular files, since a subsequent Seek on them is undefined.
if !st.Mode().IsRegular() {
return nil, errIsNotRegular
}
// Seek to requested offset.
_, err = file.Seek(offset, os.SEEK_SET)
if err != nil {
return nil, err
}
return file, nil
}
// CreateFile - create a file at path.
func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
if volume == "" || path == "" {
return nil, errInvalidArgument
}
if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil {
return nil, e
}
volumeDir := getVolumeDir(s.diskPath, volume)
// Verify if volume directory exists
if exists, err := isDirExist(volumeDir); !exists {
if err == nil {
return nil, errVolumeNotFound
} else if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else {
return nil, err
}
}
filePath := filepath.Join(volumeDir, path)
// Fail if the path already exists as a directory.
if st, err := os.Stat(filePath); err == nil {
if st.IsDir() {
return nil, errIsNotRegular
}
}
return safe.CreateFileWithPrefix(filePath, "$tmpfile")
}
// StatFile - get file info.
func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) {
if volume == "" || path == "" {
return FileInfo{}, errInvalidArgument
}
volumeDir := getVolumeDir(s.diskPath, volume)
// Verify if volume directory exists
var exists bool
if exists, err = isDirExist(volumeDir); !exists {
if err == nil {
return FileInfo{}, errVolumeNotFound
} else if os.IsNotExist(err) {
return FileInfo{}, errVolumeNotFound
} else {
return FileInfo{}, err
}
}
filePath := filepath.Join(volumeDir, path)
st, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
return FileInfo{}, errFileNotFound
}
return FileInfo{}, err
}
if st.Mode().IsDir() {
return FileInfo{}, errIsNotRegular
}
file = FileInfo{
Volume: volume,
Name: path,
ModTime: st.ModTime(),
Size: st.Size(),
Mode: st.Mode(),
}
return file, nil
}
// deleteFile - delete the file at path and prune any parent directories that become empty.
func deleteFile(basePath, deletePath, volume, path string) error {
if basePath == deletePath {
return nil
}
// Verify if the path exists.
pathSt, e := os.Stat(deletePath)
if e != nil {
return e
}
if pathSt.IsDir() {
// Verify if directory is empty.
empty, e := isDirEmpty(deletePath)
if e != nil {
return e
}
if !empty {
return nil
}
}
// Attempt to remove path.
if e := os.Remove(deletePath); e != nil {
return e
}
// Recurse upward: delete the parent path if it has become empty.
if e := deleteFile(basePath, filepath.Dir(deletePath), volume, path); e != nil {
return e
}
return nil
}
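The recursion above bottoms out at basePath; an equivalent iterative walk is shown below purely as an illustration (pruneEmptyParents is not part of this commit).
// pruneEmptyParents removes deletePath, then each parent directory that
// has become empty, stopping at basePath -- the same walk deleteFile does.
func pruneEmptyParents(basePath, deletePath string) error {
	for deletePath != basePath {
		pathSt, e := os.Stat(deletePath)
		if e != nil {
			return e
		}
		if pathSt.IsDir() {
			empty, e := isDirEmpty(deletePath)
			if e != nil {
				return e
			}
			if !empty {
				return nil // stop at the first non-empty directory
			}
		}
		if e := os.Remove(deletePath); e != nil {
			return e
		}
		deletePath = filepath.Dir(deletePath)
	}
	return nil
}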
// DeleteFile - delete a file at path.
func (s fsStorage) DeleteFile(volume, path string) error {
if volume == "" || path == "" {
return errInvalidArgument
}
volumeDir := getVolumeDir(s.diskPath, volume)
// The following code is needed so that we retain any "/" suffix in
// the path argument. Do not use filepath.Join() since it would strip
// off such suffixes.
filePath := s.diskPath + string(os.PathSeparator) + volume + string(os.PathSeparator) + path
// Delete the file, and the parent directory as well if it's empty.
return deleteFile(volumeDir, filePath, volume, path)
}

View file

@@ -1,44 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io/ioutil"
"os"
. "gopkg.in/check.v1"
)
func (s *MyAPISuite) TestAPISuite(c *C) {
var storageList []string
create := func() ObjectAPI {
path, e := ioutil.TempDir(os.TempDir(), "minio-")
c.Check(e, IsNil)
storageList = append(storageList, path)
store, err := newFS(path)
c.Check(err, IsNil)
return store
}
APITestSuite(c, create)
defer removeRoots(c, storageList)
}
func removeRoots(c *C, roots []string) {
for _, root := range roots {
os.RemoveAll(root)
}
}

View file

@@ -24,29 +24,34 @@ import (
"strconv"
"strings"
"testing"
"github.com/minio/minio/pkg/probe"
)
func TestListObjects(t *testing.T) {
// Make a temporary directory to use as the fs.
// Make a temporary directory to use as the obj.
directory, e := ioutil.TempDir("", "minio-list-object-test")
if e != nil {
t.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
t.Fatal(err)
// Create the obj.
fs, e := newFS(directory)
if e != nil {
t.Fatal(e)
}
obj := newObjectLayer(fs)
var err *probe.Error
// This bucket is used for testing ListObject operations.
err = fs.MakeBucket("test-bucket-list-object")
err = obj.MakeBucket("test-bucket-list-object")
if err != nil {
t.Fatal(err)
}
// Will not store any objects in this bucket,
// Its to test ListObjects on an empty bucket.
err = fs.MakeBucket("empty-bucket")
err = obj.MakeBucket("empty-bucket")
if err != nil {
t.Fatal(err)
}
@@ -57,36 +62,36 @@ func TestListObjects(t *testing.T) {
}
defer os.Remove(tmpfile.Name()) // clean up
_, err = fs.PutObject("test-bucket-list-object", "Asia-maps", int64(len("asia-maps")), bytes.NewBufferString("asia-maps"), nil)
_, err = obj.PutObject("test-bucket-list-object", "Asia-maps", int64(len("asia-maps")), bytes.NewBufferString("asia-maps"), nil)
if err != nil {
t.Fatal(err)
}
_, err = fs.PutObject("test-bucket-list-object", "Asia/India/India-summer-photos-1", int64(len("contentstring")), bytes.NewBufferString("contentstring"), nil)
_, err = obj.PutObject("test-bucket-list-object", "Asia/India/India-summer-photos-1", int64(len("contentstring")), bytes.NewBufferString("contentstring"), nil)
if err != nil {
t.Fatal(err)
}
_, err = fs.PutObject("test-bucket-list-object", "Asia/India/Karnataka/Bangalore/Koramangala/pics", int64(len("contentstring")), bytes.NewBufferString("contentstring"), nil)
_, err = obj.PutObject("test-bucket-list-object", "Asia/India/Karnataka/Bangalore/Koramangala/pics", int64(len("contentstring")), bytes.NewBufferString("contentstring"), nil)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 2; i++ {
key := "newPrefix" + strconv.Itoa(i)
_, err = fs.PutObject("test-bucket-list-object", key, int64(len(key)), bytes.NewBufferString(key), nil)
_, err = obj.PutObject("test-bucket-list-object", key, int64(len(key)), bytes.NewBufferString(key), nil)
if err != nil {
t.Fatal(err)
}
}
_, err = fs.PutObject("test-bucket-list-object", "newzen/zen/recurse/again/again/again/pics", int64(len("recurse")), bytes.NewBufferString("recurse"), nil)
_, err = obj.PutObject("test-bucket-list-object", "newzen/zen/recurse/again/again/again/pics", int64(len("recurse")), bytes.NewBufferString("recurse"), nil)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 3; i++ {
key := "obj" + strconv.Itoa(i)
_, err = fs.PutObject("test-bucket-list-object", key, int64(len(key)), bytes.NewBufferString(key), nil)
_, err = obj.PutObject("test-bucket-list-object", key, int64(len(key)), bytes.NewBufferString(key), nil)
if err != nil {
t.Fatal(err)
}
@@ -529,7 +534,7 @@ func TestListObjects(t *testing.T) {
}
for i, testCase := range testCases {
result, err := fs.ListObjects(testCase.bucketName, testCase.prefix, testCase.marker, testCase.delimeter, testCase.maxKeys)
result, err := obj.ListObjects(testCase.bucketName, testCase.prefix, testCase.marker, testCase.delimeter, testCase.maxKeys)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass, but failed with: <ERROR> %s", i+1, err.Cause.Error())
}
@@ -565,28 +570,31 @@ func TestListObjects(t *testing.T) {
}
func BenchmarkListObjects(b *testing.B) {
// Make a temporary directory to use as the fs.
// Make a temporary directory to use as the obj.
directory, e := ioutil.TempDir("", "minio-list-benchmark")
if e != nil {
b.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
b.Fatal(err)
// Create the obj.
fs, e := newFS(directory)
if e != nil {
b.Fatal(e)
}
obj := newObjectLayer(fs)
var err *probe.Error
// Create a bucket.
err = fs.MakeBucket("ls-benchmark-bucket")
err = obj.MakeBucket("ls-benchmark-bucket")
if err != nil {
b.Fatal(err)
}
for i := 0; i < 20000; i++ {
key := "obj" + strconv.Itoa(i)
_, err = fs.PutObject("ls-benchmark-bucket", key, int64(len(key)), bytes.NewBufferString(key), nil)
_, err = obj.PutObject("ls-benchmark-bucket", key, int64(len(key)), bytes.NewBufferString(key), nil)
if err != nil {
b.Fatal(err)
}
@@ -596,7 +604,7 @@ func BenchmarkListObjects(b *testing.B) {
// List the buckets over and over and over.
for i := 0; i < b.N; i++ {
_, err = fs.ListObjects("ls-benchmark-bucket", "", "obj9000", "", -1)
_, err = obj.ListObjects("ls-benchmark-bucket", "", "obj9000", "", -1)
if err != nil {
b.Fatal(err)
}

385
object-api.go Normal file
View file

@@ -0,0 +1,385 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"path/filepath"
"strings"
"github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
)
type objectAPI struct {
storage StorageAPI
}
func newObjectLayer(storage StorageAPI) *objectAPI {
return &objectAPI{storage}
}
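How the two layers introduced by this refactor snap together; a sketch with a hypothetical export path:
// Sketch only, not part of this commit.
storage, e := newFS("/tmp/minio-export") // fs layer: volumes and files
if e != nil {
	log.Fatalln(e)
}
obj := newObjectLayer(storage) // object layer: buckets and objects
if err := obj.MakeBucket("photos"); err != nil {
	log.Fatalln(err)
}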
/// Bucket operations
// MakeBucket - make a bucket.
func (o objectAPI) MakeBucket(bucket string) *probe.Error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if e := o.storage.MakeVol(bucket); e != nil {
if e == errVolumeExists {
return probe.NewError(BucketExists{Bucket: bucket})
}
return probe.NewError(e)
}
return nil
}
// GetBucketInfo - get bucket info.
func (o objectAPI) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
vi, e := o.storage.StatVol(bucket)
if e != nil {
if e == errVolumeNotFound {
return BucketInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return BucketInfo{}, probe.NewError(e)
}
return BucketInfo{
Name: vi.Name,
Created: vi.Created,
}, nil
}
// ListBuckets - list buckets.
func (o objectAPI) ListBuckets() ([]BucketInfo, *probe.Error) {
var bucketInfos []BucketInfo
vols, e := o.storage.ListVols()
if e != nil {
return nil, probe.NewError(e)
}
for _, vol := range vols {
if !IsValidBucketName(vol.Name) {
continue
}
bucketInfos = append(bucketInfos, BucketInfo{vol.Name, vol.Created})
}
return bucketInfos, nil
}
// DeleteBucket - delete a bucket.
func (o objectAPI) DeleteBucket(bucket string) *probe.Error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if e := o.storage.DeleteVol(bucket); e != nil {
if e == errVolumeNotFound {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
return probe.NewError(e)
}
return nil
}
/// Object Operations
// GetObject - get an object.
func (o objectAPI) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return nil, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return nil, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
r, e := o.storage.ReadFile(bucket, object, startOffset)
if e != nil {
if e == errVolumeNotFound {
return nil, probe.NewError(BucketNotFound{Bucket: bucket})
} else if e == errFileNotFound {
return nil, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
}
return nil, probe.NewError(e)
}
return r, nil
}
// GetObjectInfo - get object info.
func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
fi, e := o.storage.StatFile(bucket, object)
if e != nil {
if e == errVolumeNotFound {
return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
} else if e == errFileNotFound || e == errIsNotRegular {
return ObjectInfo{}, probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
// Handle more lower level errors if needed.
} else {
return ObjectInfo{}, probe.NewError(e)
}
}
contentType := "application/octet-stream"
if objectExt := filepath.Ext(object); objectExt != "" {
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
if ok {
contentType = content.ContentType
}
}
return ObjectInfo{
Bucket: fi.Volume,
Name: fi.Name,
ModTime: fi.ModTime,
Size: fi.Size,
IsDir: fi.Mode.IsDir(),
ContentType: contentType,
MD5Sum: "", // Read from metadata.
}, nil
}
func (o objectAPI) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{
Bucket: bucket,
Object: object,
})
}
fileWriter, e := o.storage.CreateFile(bucket, object)
if e != nil {
if e == errVolumeNotFound {
return ObjectInfo{}, probe.NewError(BucketNotFound{
Bucket: bucket,
})
} else if e == errIsNotRegular {
return ObjectInfo{}, probe.NewError(ObjectExistsAsPrefix{
Bucket: bucket,
Prefix: object,
})
}
return ObjectInfo{}, probe.NewError(e)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Copy the incoming data through the multi writer, computing MD5 on the fly.
if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil {
fileWriter.(*safe.File).CloseAndRemove()
return ObjectInfo{}, probe.NewError(e)
}
} else {
if _, e = io.Copy(multiWriter, data); e != nil {
fileWriter.(*safe.File).CloseAndRemove()
return ObjectInfo{}, probe.NewError(e)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
// md5Hex representation.
var md5Hex string
if len(metadata) != 0 {
md5Hex = metadata["md5Sum"]
}
if md5Hex != "" {
if newMD5Hex != md5Hex {
fileWriter.(*safe.File).CloseAndRemove()
return ObjectInfo{}, probe.NewError(BadDigest{md5Hex, newMD5Hex})
}
}
e = fileWriter.Close()
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
fi, e := o.storage.StatFile(bucket, object)
if e != nil {
return ObjectInfo{}, probe.NewError(e)
}
contentType := "application/octet-stream"
if objectExt := filepath.Ext(object); objectExt != "" {
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
if ok {
contentType = content.ContentType
}
}
return ObjectInfo{
Bucket: fi.Volume,
Name: fi.Name,
ModTime: fi.ModTime,
Size: fi.Size,
ContentType: contentType,
MD5Sum: newMD5Hex,
}, nil
}
func (o objectAPI) DeleteObject(bucket, object string) *probe.Error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if e := o.storage.DeleteFile(bucket, object); e != nil {
if e == errVolumeNotFound {
return probe.NewError(BucketNotFound{Bucket: bucket})
}
return probe.NewError(e)
}
return nil
}
func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
}
// Verify if delimiter is anything other than '/', which we do not support.
if delimiter != "" && delimiter != "/" {
return ListObjectsInfo{}, probe.NewError(fmt.Errorf("delimiter '%s' is not supported. Only '/' is supported", delimiter))
}
// Verify if marker has prefix.
if marker != "" {
if !strings.HasPrefix(marker, prefix) {
return ListObjectsInfo{}, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", marker, prefix))
}
}
recursive := true
if delimiter == "/" {
recursive = false
}
fileInfos, eof, e := o.storage.ListFiles(bucket, prefix, marker, recursive, maxKeys)
if e != nil {
if e == errVolumeNotFound {
return ListObjectsInfo{}, probe.NewError(BucketNotFound{Bucket: bucket})
}
return ListObjectsInfo{}, probe.NewError(e)
}
if maxKeys == 0 {
return ListObjectsInfo{}, nil
}
result := ListObjectsInfo{IsTruncated: !eof}
for _, fileInfo := range fileInfos {
result.NextMarker = fileInfo.Name
if fileInfo.Mode.IsDir() {
result.Prefixes = append(result.Prefixes, fileInfo.Name)
continue
}
result.Objects = append(result.Objects, ObjectInfo{
Name: fileInfo.Name,
ModTime: fileInfo.ModTime,
Size: fileInfo.Size,
IsDir: fileInfo.Mode.IsDir(),
})
}
return result, nil
}
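A hedged usage sketch of the delimiter semantics (bucket and prefix hypothetical): with delimiter "/" the walk is non-recursive, so directories surface as common prefixes while files land in Objects.
// Sketch only, not part of this commit.
res, err := obj.ListObjects("photos", "2016/", "", "/", 1000)
if err != nil {
	log.Fatalln(err)
}
for _, p := range res.Prefixes {
	fmt.Println("common prefix:", p) // e.g. "2016/january/"
}
for _, oi := range res.Objects {
	fmt.Println("object:", oi.Name, oi.Size)
}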
func (o objectAPI) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListMultipartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectPrefix(objectPrefix) {
return ListMultipartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: objectPrefix})
}
return ListMultipartsInfo{}, probe.NewError(errors.New("Not implemented"))
}
func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return "", probe.NewError(errors.New("Not implemented"))
}
func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return "", probe.NewError(errors.New("Not implemented"))
}
func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return ListPartsInfo{}, probe.NewError(errors.New("Not implemented"))
}
func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return ObjectInfo{}, probe.NewError(errors.New("Not implemented"))
}
func (o objectAPI) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(object) {
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return probe.NewError(errors.New("Not implemented"))
}

View file

@@ -20,14 +20,13 @@ import (
"bytes"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"github.com/minio/minio/pkg/probe"
)
// Testing GetObjectInfo().
@@ -38,17 +37,21 @@ func TestGetObjectInfo(t *testing.T) {
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
t.Fatal(err)
// Create the obj.
fs, e := newFS(directory)
if e != nil {
t.Fatal(e)
}
obj := newObjectLayer(fs)
var err *probe.Error
// This bucket is used for testing getObjectInfo operations.
err = fs.MakeBucket("test-getobjectinfo")
err = obj.MakeBucket("test-getobjectinfo")
if err != nil {
t.Fatal(err)
}
_, err = fs.PutObject("test-getobjectinfo", "Asia/asiapics.jpg", int64(len("asiapics")), bytes.NewBufferString("asiapics"), nil)
_, err = obj.PutObject("test-getobjectinfo", "Asia/asiapics.jpg", int64(len("asiapics")), bytes.NewBufferString("asiapics"), nil)
if err != nil {
t.Fatal(err)
}
@@ -76,8 +79,8 @@ func TestGetObjectInfo(t *testing.T) {
{"abcdefgh", "abc", ObjectInfo{}, BucketNotFound{Bucket: "abcdefgh"}, false},
{"ijklmnop", "efg", ObjectInfo{}, BucketNotFound{Bucket: "ijklmnop"}, false},
// Test cases with valid but non-existing bucket names and invalid object name (Test number 8-9).
{"test-getobjectinfo", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "test-getobjectinfo", Object: ""}, false},
{"test-getobjectinfo", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "test-getobjectinfo", Object: ""}, false},
{"abcdefgh", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "abcdefgh", Object: ""}, false},
{"ijklmnop", "", ObjectInfo{}, ObjectNameInvalid{Bucket: "ijklmnop", Object: ""}, false},
// Test cases with non-existing object name with existing bucket (Test number 10-12).
{"test-getobjectinfo", "Africa", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Africa"}, false},
{"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false},
@@ -88,7 +91,7 @@ func TestGetObjectInfo(t *testing.T) {
{"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true},
}
for i, testCase := range testCases {
result, err := fs.GetObjectInfo(testCase.bucketName, testCase.objectName)
result, err := obj.GetObjectInfo(testCase.bucketName, testCase.objectName)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass, but failed with: <ERROR> %s", i+1, err.Cause.Error())
}
@@ -120,107 +123,25 @@ func TestGetObjectInfo(t *testing.T) {
}
}
// Testing getObjectInfo().
func TestGetObjectInfoCore(t *testing.T) {
directory, e := ioutil.TempDir("", "minio-get-objinfo-test")
if e != nil {
t.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
t.Fatal(err)
}
// This bucket is used for testing getObjectInfo operations.
err = fs.MakeBucket("test-getobjinfo")
if err != nil {
t.Fatal(err)
}
_, err = fs.PutObject("test-getobjinfo", "Asia/asiapics.jpg", int64(len("asiapics")), bytes.NewBufferString("asiapics"), nil)
if err != nil {
t.Fatal(err)
}
resultCases := []ObjectInfo{
// ObjectInfo - 1.
// ObjectName set to an existing directory in the test case.
{Bucket: "test-getobjinfo", Name: "Asia", Size: 0, ContentType: "application/octet-stream", IsDir: true},
// ObjectInfo -2.
// ObjectName set to an existing object in the test case.
{Bucket: "test-getobjinfo", Name: "Asia/asiapics.jpg", Size: int64(len("asiapics")), ContentType: "image/jpeg", IsDir: false},
// ObjectInfo-3.
// Object name set to a non-existing object in the test case.
{Bucket: "test-getobjinfo", Name: "Africa", Size: 0, ContentType: "image/jpeg", IsDir: false},
}
testCases := []struct {
bucketName string
objectName string
// Expected output of getObjectInfo.
result ObjectInfo
err error
// Flag indicating whether the test is expected to pass or not.
shouldPass bool
}{
// Testcase with object name set to an existing directory (Test number 1).
{"test-getobjinfo", "Asia", resultCases[0], nil, true},
// ObjectName set to an existing object (Test number 2).
{"test-getobjinfo", "Asia/asiapics.jpg", resultCases[1], nil, true},
// Object name set to a non-existing object (Test number 3).
{"test-getobjinfo", "Africa", resultCases[2], fmt.Errorf("%s", filepath.FromSlash("test-getobjinfo/Africa")), false},
}
rootPath := fs.(*Filesystem).GetRootPath()
for i, testCase := range testCases {
result, err := getObjectInfo(rootPath, testCase.bucketName, testCase.objectName)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass, but failed with: <ERROR> %s", i+1, err.Cause.Error())
}
if err == nil && !testCase.shouldPass {
t.Errorf("Test %d: Expected to fail with <ERROR> \"%s\", but passed instead", i+1, testCase.err.Error())
}
// Failed as expected, but does it fail for the expected reason.
if err != nil && !testCase.shouldPass {
if !strings.Contains(err.Cause.Error(), testCase.err.Error()) {
t.Errorf("Test %d: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead", i+1, testCase.err.Error(), err.Cause.Error())
}
}
// Test passes as expected, but the output values are verified for correctness here.
if err == nil && testCase.shouldPass {
if testCase.result.Bucket != result.Bucket {
t.Fatalf("Test %d: Expected Bucket name to be '%s', but found '%s' instead", i+1, testCase.result.Bucket, result.Bucket)
}
if testCase.result.Name != result.Name {
t.Errorf("Test %d: Expected Object name to be %s, but instead found it to be %s", i+1, testCase.result.Name, result.Name)
}
if testCase.result.ContentType != result.ContentType {
t.Errorf("Test %d: Expected Content Type of the object to be %v, but instead found it to be %v", i+1, testCase.result.ContentType, result.ContentType)
}
if testCase.result.IsDir != result.IsDir {
t.Errorf("Test %d: Expected IsDir flag of the object to be %v, but instead found it to be %v", i+1, testCase.result.IsDir, result.IsDir)
}
}
}
}
func BenchmarkGetObject(b *testing.B) {
// Make a temporary directory to use as the fs.
// Make a temporary directory to use as the obj.
directory, e := ioutil.TempDir("", "minio-benchmark-getobject")
if e != nil {
b.Fatal(e)
}
defer os.RemoveAll(directory)
// Create the fs.
fs, err := newFS(directory)
if err != nil {
b.Fatal(err)
// Create the obj.
fs, e := newFS(directory)
if e != nil {
b.Fatal(e)
}
obj := newObjectLayer(fs)
var err *probe.Error
// Make a bucket and put in a few objects.
err = fs.MakeBucket("bucket")
err = obj.MakeBucket("bucket")
if err != nil {
b.Fatal(err)
}
@@ -231,7 +152,7 @@ func BenchmarkGetObject(b *testing.B) {
metadata := make(map[string]string)
for i := 0; i < 10; i++ {
metadata["md5Sum"] = hex.EncodeToString(hasher.Sum(nil))
_, err = fs.PutObject("bucket", "object"+strconv.Itoa(i), int64(len(text)), bytes.NewBufferString(text), metadata)
_, err = obj.PutObject("bucket", "object"+strconv.Itoa(i), int64(len(text)), bytes.NewBufferString(text), metadata)
if err != nil {
b.Fatal(err)
}
@@ -240,7 +161,7 @@ func BenchmarkGetObject(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buffer = new(bytes.Buffer)
r, err := fs.GetObject("bucket", "object"+strconv.Itoa(i%10), 0)
r, err := obj.GetObject("bucket", "object"+strconv.Itoa(i%10), 0)
if err != nil {
b.Error(err)
}

View file

@@ -26,14 +26,13 @@ type BucketInfo struct {
// ObjectInfo - object info.
type ObjectInfo struct {
Bucket string
Name string
ModifiedTime time.Time
ContentType string
MD5Sum string
Size int64
IsDir bool
Err error
Bucket string
Name string
ModTime time.Time
ContentType string
MD5Sum string
Size int64
IsDir bool
}
// ListPartsInfo - various types of object resources.

View file

@@ -97,7 +97,7 @@ func (api objectStorageAPI) GetObjectHandler(w http.ResponseWriter, r *http.Requ
return
}
}
// Fetch object stat info.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil {
switch err.ToGoError().(type) {
@@ -117,7 +117,7 @@
}
// Verify 'If-Modified-Since' and 'If-Unmodified-Since'.
lastModified := objInfo.ModifiedTime
lastModified := objInfo.ModTime
if checkLastModified(w, r, lastModified) {
return
}
@@ -137,8 +137,15 @@
startOffset := hrange.start
readCloser, err := api.ObjectAPI.GetObject(bucket, object, startOffset)
if err != nil {
errorIf(err.Trace(), "GetObject failed.", nil)
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
switch err.ToGoError().(type) {
case BucketNotFound:
writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path)
case ObjectNotFound:
writeErrorResponse(w, r, errAllowableObjectNotFound(bucket, r), r.URL.Path)
default:
errorIf(err.Trace(), "GetObject failed.", nil)
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
}
return
}
defer readCloser.Close() // Close after this handler returns.
@@ -304,7 +311,7 @@ func (api objectStorageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Req
}
// Verify 'If-Modified-Since' and 'If-Unmodified-Since'.
lastModified := objInfo.ModifiedTime
lastModified := objInfo.ModTime
if checkLastModified(w, r, lastModified) {
return
}
@@ -399,7 +406,7 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
// Verify x-amz-copy-source-if-modified-since and
// x-amz-copy-source-if-unmodified-since.
lastModified := objInfo.ModifiedTime
lastModified := objInfo.ModTime
if checkCopySourceLastModified(w, r, lastModified) {
return
}
@@ -471,7 +478,7 @@ func (api objectStorageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Req
}
return
}
response := generateCopyObjectResponse(objInfo.MD5Sum, objInfo.ModifiedTime)
response := generateCopyObjectResponse(objInfo.MD5Sum, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
// write headers
setCommonHeaders(w)

View file

@@ -1 +0,0 @@
package main

View file

@@ -42,8 +42,8 @@ func APITestSuite(c *check.C, create func() ObjectAPI) {
testNonExistantObjectInBucket(c, create)
testGetDirectoryReturnsObjectNotFound(c, create)
testDefaultContentType(c, create)
testMultipartObjectCreation(c, create)
testMultipartObjectAbort(c, create)
// testMultipartObjectCreation(c, create)
// testMultipartObjectAbort(c, create)
}
func testMakeBucket(c *check.C, create func() ObjectAPI) {
@@ -390,22 +390,22 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectAPI)
_, err = fs.GetObject("bucket", "dir1", 0)
switch err := err.ToGoError().(type) {
case ObjectExistsAsPrefix:
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
c.Assert(err.Prefix, check.Equals, "dir1")
c.Assert(err.Object, check.Equals, "dir1")
default:
// force a failure with a line number
c.Assert(err.Error(), check.Equals, "Object exists on : bucket as prefix dir1")
c.Assert(err, check.Equals, "ObjectNotFound")
}
_, err = fs.GetObject("bucket", "dir1/", 0)
switch err := err.ToGoError().(type) {
case ObjectExistsAsPrefix:
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
c.Assert(err.Prefix, check.Equals, "dir1/")
c.Assert(err.Object, check.Equals, "dir1/")
default:
// force a failure with a line number
c.Assert(err.Error(), check.Equals, "Object exists on : bucket as prefix dir1")
c.Assert(err, check.Equals, "ObjectNotFound")
}
}

View file

@@ -268,8 +268,9 @@ func serverMain(c *cli.Context) {
_, e := os.Stat(fsPath)
fatalIf(probe.NewError(e), "Unable to validate the path", nil)
// Initialize filesystem storage layer.
objectAPI, err = newFS(fsPath)
fatalIf(err.Trace(fsPath), "Initializing filesystem failed.", nil)
storage, e := newFS(fsPath)
fatalIf(probe.NewError(e), "Initializing filesystem failed.", nil)
objectAPI = newObjectLayer(storage)
}
// Configure server.

View file

@@ -18,7 +18,6 @@ package main
import (
"bytes"
"crypto/md5"
"io"
"io/ioutil"
"net"
@@ -99,7 +98,9 @@ func (s *MyAPISuite) SetUpSuite(c *C) {
fs, err := newFS(fsroot)
c.Assert(err, IsNil)
apiServer := configureServer(addr, fs)
obj := newObjectLayer(fs)
apiServer := configureServer(addr, obj)
testAPIFSCacheServer = httptest.NewServer(apiServer.Handler)
}
@@ -1023,6 +1024,7 @@
verifyError(c, response, "InvalidRange", "The requested range cannot be satisfied.", http.StatusRequestedRangeNotSatisfiable)
}
/*
func (s *MyAPISuite) TestObjectMultipartAbort(c *C) {
request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultipartabort", 0, nil)
c.Assert(err, IsNil)
@@ -1309,6 +1311,7 @@
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
}
*/
func verifyError(c *C, response *http.Response, code, description string, statusCode int) {
data, err := ioutil.ReadAll(response.Body)

View file

@@ -27,7 +27,7 @@ type StorageAPI interface {
DeleteVol(volume string) (err error)
// File operations.
ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, isEOF bool, err error)
ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error)
ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error)
CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error)
StatFile(volume string, path string) (file FileInfo, err error)

View file

@@ -1,35 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io"
"os"
)
// isDirEmpty - returns whether given directory is empty or not.
func isDirEmpty(dirname string) (status bool, err error) {
f, err := os.Open(dirname)
if err == nil {
defer f.Close()
if _, err = f.Readdirnames(1); err == io.EOF {
status = true
err = nil
}
}
return status, err
}

View file

@@ -1,19 +1,3 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
@@ -21,17 +5,17 @@ import (
"time"
)
// VolInfo - volume info
type VolInfo struct {
Name string
Created time.Time
}
// FileInfo - file stat information.
type FileInfo struct {
Volume string
Name string
ModTime time.Time
Size int64
Type os.FileMode
}
// VolInfo - volume info
type VolInfo struct {
Name string
Created time.Time
Mode os.FileMode
}

34
storage-errors.go Normal file
View file

@@ -0,0 +1,34 @@
/*
* Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "errors"
// errDiskPathFull - cannot create volume or files when disk is full.
var errDiskPathFull = errors.New("Disk path full.")
// errFileNotFound - cannot find the file.
var errFileNotFound = errors.New("File not found.")
// errVolumeExists - cannot create same volume again.
var errVolumeExists = errors.New("Volume already exists.")
// errIsNotRegular - not a regular file type.
var errIsNotRegular = errors.New("Not a regular file type.")
// errVolumeNotFound - cannot find the volume.
var errVolumeNotFound = errors.New("Volume not found.")

View file

@@ -1,273 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"syscall"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/safe"
)
// ErrDiskPathFull - cannot create volume or files when disk is full.
var ErrDiskPathFull = errors.New("Disk path full.")
// ErrVolumeExists - cannot create same volume again.
var ErrVolumeExists = errors.New("Volume already exists.")
// ErrIsNotRegular - is not a regular file type.
var ErrIsNotRegular = errors.New("Not a regular file type.")
// localStorage implements StorageAPI on top of provided diskPath.
type localStorage struct {
diskPath string
fsInfo disk.Info
minFreeDisk int64
}
// Initialize a new local storage.
func newLocalStorage(diskPath string) (StorageAPI, error) {
if diskPath == "" {
return nil, errInvalidArgument
}
st, e := os.Stat(diskPath)
if e != nil {
return nil, e
}
if !st.IsDir() {
return nil, syscall.ENOTDIR
}
info, e := disk.GetInfo(diskPath)
if e != nil {
return nil, e
}
disk := localStorage{
diskPath: diskPath,
fsInfo: info,
minFreeDisk: 5, // Minimum 5% disk should be free.
}
return disk, nil
}
// Make a volume entry.
func (s localStorage) MakeVol(volume string) error {
if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil {
return e
}
volumeDir := getVolumeDir(s.diskPath, volume)
if _, e := os.Stat(volumeDir); e == nil {
return ErrVolumeExists
}
// Make a volume entry.
if e := os.Mkdir(volumeDir, 0700); e != nil {
return e
}
return nil
}
// removeDuplicateVols - remove duplicate volumes.
func removeDuplicateVols(vols []VolInfo) []VolInfo {
length := len(vols) - 1
for i := 0; i < length; i++ {
for j := i + 1; j <= length; j++ {
if vols[i].Name == vols[j].Name {
// Pick the latest volume from a duplicate entry.
if vols[i].Created.Sub(vols[j].Created) > 0 {
vols[j] = vols[length]
} else {
vols[i] = vols[length]
}
vols = vols[0:length]
length--
j--
}
}
}
return vols
}
// ListVols - list volumes.
func (s localStorage) ListVols() ([]VolInfo, error) {
files, e := ioutil.ReadDir(s.diskPath)
if e != nil {
return nil, e
}
var volsInfo []VolInfo
for _, file := range files {
if !file.IsDir() {
// If not directory, ignore all file types.
continue
}
volInfo := VolInfo{
Name: file.Name(),
Created: file.ModTime(),
}
volsInfo = append(volsInfo, volInfo)
}
// Remove duplicated volume entries.
volsInfo = removeDuplicateVols(volsInfo)
return volsInfo, nil
}
// getVolumeDir - will convert incoming volume names to
// corresponding valid volume names on the backend in a platform
// compatible way for all operating systems.
func getVolumeDir(diskPath, volume string) string {
volumes, e := ioutil.ReadDir(diskPath)
if e != nil {
return volume
}
for _, vol := range volumes {
// Verify if lowercase version of the volume
// is equal to the incoming volume, then use the proper name.
if strings.ToLower(vol.Name()) == volume {
return filepath.Join(diskPath, vol.Name())
}
}
return filepath.Join(diskPath, volume)
}
// StatVol - get volume info.
func (s localStorage) StatVol(volume string) (VolInfo, error) {
volumeDir := getVolumeDir(s.diskPath, volume)
// Stat a volume entry.
st, e := os.Stat(volumeDir)
if e != nil {
return VolInfo{}, e
}
volInfo := VolInfo{}
volInfo.Name = st.Name()
volInfo.Created = st.ModTime()
return volInfo, nil
}
// DeleteVol - delete a volume.
func (s localStorage) DeleteVol(volume string) error {
return os.Remove(getVolumeDir(s.diskPath, volume))
}
/// File operations.
// ListFiles - list files at given prefix and marker.
func (s localStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, isEOF bool, err error) {
// TODO
return files, true, nil
}
// ReadFile - read a file at a given offset.
func (s localStorage) ReadFile(volume string, path string, offset int64) (io.ReadCloser, error) {
filePath := filepath.Join(getVolumeDir(s.diskPath, volume), path)
file, e := os.Open(filePath)
if e != nil {
return nil, e
}
st, e := file.Stat()
if e != nil {
return nil, e
}
// Reject non-regular files, since a subsequent Seek on them is undefined.
if !st.Mode().IsRegular() {
return nil, ErrIsNotRegular
}
_, e = file.Seek(offset, os.SEEK_SET)
if e != nil {
return nil, e
}
return file, nil
}
// CreateFile - create a file at path.
func (s localStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil {
return nil, e
}
filePath := filepath.Join(getVolumeDir(s.diskPath, volume), path)
// Creates a safe file.
return safe.CreateFileWithPrefix(filePath, "$tmpfile")
}
// StatFile - get file info.
func (s localStorage) StatFile(volume, path string) (file FileInfo, err error) {
filePath := filepath.Join(getVolumeDir(s.diskPath, volume), path)
st, e := os.Stat(filePath)
if e != nil {
return FileInfo{}, e
}
file = FileInfo{
Volume: volume,
Name: st.Name(),
ModTime: st.ModTime(),
Size: st.Size(),
Type: st.Mode(),
}
return file, nil
}
// deleteFile - delete the file at path and prune any parent directories that become empty.
func deleteFile(basePath, deletePath, volume, path string) error {
if basePath == deletePath {
return nil
}
// Verify if the path exists.
pathSt, e := os.Stat(deletePath)
if e != nil {
return e
}
if pathSt.IsDir() {
// Verify if directory is empty.
empty, e := isDirEmpty(deletePath)
if e != nil {
return e
}
if !empty {
return nil
}
}
// Attempt to remove path.
if e := os.Remove(deletePath); e != nil {
return e
}
// Recurse upward: delete the parent path if it has become empty.
if e := deleteFile(basePath, filepath.Dir(deletePath), volume, path); e != nil {
return e
}
return nil
}
// DeleteFile - delete a file at path.
func (s localStorage) DeleteFile(volume, path string) error {
volumeDir := getVolumeDir(s.diskPath, volume)
// The following code is needed so that we retain any "/" suffix in
// the path argument. Do not use filepath.Join() since it would
// strip off such suffixes.
filePath := s.diskPath + string(os.PathSeparator) + volume + string(os.PathSeparator) + path
// Convert to platform friendly paths.
filePath = filepath.FromSlash(filePath)
// Delete the file, and the parent directory as well if it's empty.
return deleteFile(volumeDir, filePath, volume, path)
}

178
storage-network.go Normal file
View file

@@ -0,0 +1,178 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"errors"
"io"
"net"
"net/http"
"net/rpc"
"time"
)
type networkStorage struct {
address string
connection *rpc.Client
httpClient *http.Client
}
const (
connected = "200 Connected to Go RPC"
dialTimeoutSecs = 30 // 30 seconds.
)
// Initialize new network storage.
func newNetworkStorage(address string) (StorageAPI, error) {
// Dial the address with a timeout of 30 seconds; this includes DNS resolution.
conn, err := net.DialTimeout("tcp", address, dialTimeoutSecs*time.Second)
if err != nil {
return nil, err
}
// Initialize rpc client with dialed connection.
rpcClient := rpc.NewClient(conn)
// Initialize http client.
httpClient := &http.Client{
// Set a sensible timeout of 2 minutes for the whole request;
// it is proactively cancelled if it has not completed within
// that window.
Timeout: 2 * time.Minute,
Transport: http.DefaultTransport,
}
// Initialize network storage.
ndisk := &networkStorage{
address: address,
connection: rpcClient,
httpClient: httpClient,
}
// Returns successfully here.
return ndisk, nil
}
// MakeVol - make a volume.
func (n networkStorage) MakeVol(volume string) error {
reply := GenericReply{}
return n.connection.Call("Storage.MakeVolHandler", volume, &reply)
}
// ListVols - List all volumes.
func (n networkStorage) ListVols() (vols []VolInfo, err error) {
listVolsReply := ListVolsReply{}
err = n.connection.Call("Storage.ListVolsHandler", "", &listVolsReply)
if err != nil {
return nil, err
}
return listVolsReply.Vols, nil
}
// StatVol - get volume stat info.
func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
if err = n.connection.Call("Storage.StatVolHandler", volume, &volInfo); err != nil {
return VolInfo{}, err
}
return volInfo, nil
}
// DeleteVol - Delete a volume.
func (n networkStorage) DeleteVol(volume string) error {
reply := GenericReply{}
return n.connection.Call("Storage.DeleteVolHandler", volume, &reply)
}
// File operations.
// CreateFile - create file.
func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
createFileReply := CreateFileReply{}
if err = n.connection.Call("Storage.CreateFileHandler", CreateFileArgs{
Vol: volume,
Path: path,
}, &createFileReply); err != nil {
return nil, err
}
	contentType := "application/octet-stream"
	readCloser, writeCloser := io.Pipe()
	// Close the read end of the pipe only once the POST completes; a
	// deferred close here would tear the pipe down as soon as
	// CreateFile returns, before any data is streamed.
	go func() {
		defer readCloser.Close()
		n.httpClient.Post(createFileReply.URL, contentType, readCloser)
	}()
	return writeCloser, nil
}
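CreateFile hands the caller the write end of an io.Pipe whose read end streams to the server as an HTTP POST body. A minimal caller-side sketch (volume and object names are hypothetical):

func exampleStreamUpload(storage StorageAPI) error {
	wc, err := storage.CreateFile("testvolume", "dir/object.txt") // hypothetical names
	if err != nil {
		return err
	}
	if _, err = wc.Write([]byte("hello")); err != nil {
		wc.Close()
		return err
	}
	// Closing the write end signals EOF to the pipe reader, which lets
	// the background POST complete.
	return wc.Close()
}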
// StatFile - get latest Stat information for a file at path.
func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) {
if err = n.connection.Call("Storage.StatFileHandler", StatFileArgs{
Vol: volume,
Path: path,
}, &fileInfo); err != nil {
return FileInfo{}, err
}
return fileInfo, nil
}
// ReadFile - reads a file.
func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) {
readFileReply := ReadFileReply{}
if err = n.connection.Call("Storage.ReadFileHandler", ReadFileArgs{
Vol: volume,
Path: path,
Offset: offset,
}, &readFileReply); err != nil {
return nil, err
}
resp, err := n.httpClient.Get(readFileReply.URL)
if err != nil {
return nil, err
}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, errors.New("invalid response from storage server: " + resp.Status)
	}
return resp.Body, nil
}
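The read path is the mirror image: the RPC call yields a download URL and the HTTP response body is handed straight back to the caller. A usage sketch (hypothetical names; needs "os" in addition to the imports above):

func exampleReadAtOffset(storage StorageAPI) error {
	rc, err := storage.ReadFile("testvolume", "dir/object.txt", 5) // hypothetical names
	if err != nil {
		return err
	}
	defer rc.Close()
	_, err = io.Copy(os.Stdout, rc)
	return err
}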
// ListFiles - List all files in a volume.
func (n networkStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) {
listFilesReply := ListFilesReply{}
if err = n.connection.Call("Storage.ListFilesHandler", ListFilesArgs{
Vol: volume,
Prefix: prefix,
Marker: marker,
Recursive: recursive,
Count: count,
}, &listFilesReply); err != nil {
return nil, true, err
}
// List of files.
files = listFilesReply.Files
// EOF.
eof = listFilesReply.EOF
return files, eof, nil
}
// DeleteFile - Delete a file at path.
func (n networkStorage) DeleteFile(volume, path string) (err error) {
reply := GenericReply{}
if err = n.connection.Call("Storage.DeleteFileHandler", DeleteFileArgs{
Vol: volume,
Path: path,
}, &reply); err != nil {
return err
}
return nil
}
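The StorageAPI interface itself is defined elsewhere in this commit; inferred from the methods that localStorage and networkStorage both implement, it presumably has roughly this shape (an assumption, shown for orientation only):

type StorageAPI interface {
	// Volume operations.
	MakeVol(volume string) error
	ListVols() (vols []VolInfo, err error)
	StatVol(volume string) (volInfo VolInfo, err error)
	DeleteVol(volume string) error

	// File operations.
	ListFiles(volume, prefix, marker string, recursive bool, count int) ([]FileInfo, bool, error)
	ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
	CreateFile(volume, path string) (io.WriteCloser, error)
	StatFile(volume, path string) (FileInfo, error)
	DeleteFile(volume, path string) error
}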

78
storage-rpc-datatypes.go Normal file
View file

@ -0,0 +1,78 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
// GenericReply generic rpc reply.
type GenericReply struct{}
// GenericArgs generic rpc args.
type GenericArgs struct{}
// ListVolsReply list vols rpc reply.
type ListVolsReply struct {
Vols []VolInfo
}
// ListFilesArgs list file args.
type ListFilesArgs struct {
Vol string
Prefix string
Marker string
Recursive bool
Count int
}
// ListFilesReply list file reply.
type ListFilesReply struct {
Files []FileInfo
EOF bool
}
// ReadFileArgs read file args.
type ReadFileArgs struct {
Vol string
Path string
Offset int64
}
// ReadFileReply read file reply.
type ReadFileReply struct {
URL string
}
// CreateFileArgs create file args.
type CreateFileArgs struct {
Vol string
Path string
}
// CreateFileReply create file reply.
type CreateFileReply struct {
URL string
}
// StatFileArgs stat file args.
type StatFileArgs struct {
Vol string
Path string
}
// DeleteFileArgs delete file args.
type DeleteFileArgs struct {
Vol string
Path string
}
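These structs travel over net/rpc's default encoding/gob codec, so both sides must agree on field names and types. A raw round-trip without the networkStorage wrapper would look like this (hypothetical values; needs "net/rpc"):

func exampleRawListFiles(client *rpc.Client) ([]FileInfo, error) {
	args := ListFilesArgs{Vol: "testvolume", Prefix: "dir/", Recursive: true, Count: 100}
	reply := ListFilesReply{}
	if err := client.Call("Storage.ListFilesHandler", args, &reply); err != nil {
		return nil, err
	}
	return reply.Files, nil
}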

165
storage-rpc-server.go Normal file
View file

@ -0,0 +1,165 @@
package main
import (
"fmt"
"io"
"net/http"
"net/rpc"
"net/url"
"os"
"path"
"strconv"
router "github.com/gorilla/mux"
"github.com/minio/minio/pkg/probe"
"github.com/minio/minio/pkg/safe"
)
// Storage server implements rpc primitives to facilitate exporting a
// disk over a network.
type storageServer struct {
storage StorageAPI
}
/// Volume operations handlers
// MakeVolHandler - make vol handler is an RPC wrapper for the MakeVol operation.
func (s *storageServer) MakeVolHandler(arg *string, reply *GenericReply) error {
return s.storage.MakeVol(*arg)
}
// ListVolsHandler - list vols handler is an RPC wrapper for the ListVols operation.
func (s *storageServer) ListVolsHandler(arg *string, reply *ListVolsReply) error {
vols, err := s.storage.ListVols()
if err != nil {
return err
}
reply.Vols = vols
return nil
}
// StatVolHandler - stat vol handler is an RPC wrapper for the StatVol operation.
func (s *storageServer) StatVolHandler(arg *string, reply *VolInfo) error {
volInfo, err := s.storage.StatVol(*arg)
if err != nil {
return err
}
*reply = volInfo
return nil
}
// DeleteVolHandler - delete vol handler is an RPC wrapper for the
// DeleteVol operation.
func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error {
return s.storage.DeleteVol(*arg)
}
/// File operations
// ListFilesHandler - list files handler.
func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesReply) error {
files, eof, err := s.storage.ListFiles(arg.Vol, arg.Prefix, arg.Marker, arg.Recursive, arg.Count)
if err != nil {
return err
}
reply.Files = files
reply.EOF = eof
return nil
}
// ReadFileHandler - read file handler is an RPC wrapper that returns
// the URL from which a file can be downloaded.
func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *ReadFileReply) error {
	endpoint := "http://localhost:9000/minio/rpc/storage" // TODO fix this.
	// Include the "/download" segment so the URL matches the route
	// registered in registerStorageServer below.
	newURL, err := url.Parse(fmt.Sprintf("%s/download/%s", endpoint, path.Join(arg.Vol, arg.Path)))
if err != nil {
return err
}
q := newURL.Query()
q.Set("offset", fmt.Sprintf("%d", arg.Offset))
newURL.RawQuery = q.Encode()
reply.URL = newURL.String()
return nil
}
// CreateFileHandler - create file handler is an RPC wrapper that returns
// the URL to which a file can be uploaded.
func (s *storageServer) CreateFileHandler(arg *CreateFileArgs, reply *CreateFileReply) error {
	endpoint := "http://localhost:9000/minio/rpc/storage" // TODO fix this.
	// Include the "/upload" segment so the URL matches the route
	// registered in registerStorageServer below.
	newURL, err := url.Parse(fmt.Sprintf("%s/upload/%s", endpoint, path.Join(arg.Vol, arg.Path)))
if err != nil {
return err
}
reply.URL = newURL.String()
return nil
}
// StatFileHandler - stat file handler is an RPC wrapper for the StatFile operation.
func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) error {
fileInfo, err := s.storage.StatFile(arg.Vol, arg.Path)
if err != nil {
return err
}
*reply = fileInfo
return nil
}
// DeleteFileHandler - delete file handler is an RPC wrapper for the DeleteFile operation.
func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error {
return s.storage.DeleteFile(arg.Vol, arg.Path)
}
// StreamUploadHandler - stream upload handler.
func (s *storageServer) StreamUploadHandler(w http.ResponseWriter, r *http.Request) {
vars := router.Vars(r)
volume := vars["volume"]
	filePath := vars["path"] // renamed to avoid shadowing the "path" package
	writeCloser, err := s.storage.CreateFile(volume, filePath)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reader := r.Body
if _, err = io.Copy(writeCloser, reader); err != nil {
		// Guard the assertion: only a local disk hands back a
		// *safe.File; fall back to a plain close otherwise.
		if sf, ok := writeCloser.(*safe.File); ok {
			sf.CloseAndRemove()
		} else {
			writeCloser.Close()
		}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeCloser.Close()
}
// StreamDownloadHandler - stream download handler.
func (s *storageServer) StreamDownloadHandler(w http.ResponseWriter, r *http.Request) {
vars := router.Vars(r)
volume := vars["volume"]
	filePath := vars["path"] // renamed to avoid shadowing the "path" package
offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
	readCloser, err := s.storage.ReadFile(volume, filePath, offset)
if err != nil {
httpErr := http.StatusBadRequest
if os.IsNotExist(err) {
httpErr = http.StatusNotFound
}
http.Error(w, err.Error(), httpErr)
return
}
	defer readCloser.Close()
	io.Copy(w, readCloser)
}
func registerStorageServer(mux *router.Router, diskPath string) {
// Minio storage routes.
fs, e := newFS(diskPath)
fatalIf(probe.NewError(e), "Unable to initialize storage disk.", nil)
storageRPCServer := rpc.NewServer()
stServer := &storageServer{
storage: fs,
}
storageRPCServer.RegisterName("Storage", stServer)
storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
storageRouter.Path("/rpc/storage").Handler(storageRPCServer)
storageRouter.Methods("POST").Path("/rpc/storage/upload/{volume}/{path:.+}").HandlerFunc(stServer.StreamUploadHandler)
storageRouter.Methods("GET").Path("/rpc/storage/download/{volume}/{path:.+}").Queries("offset", "").HandlerFunc(stServer.StreamDownloadHandler)
}
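registerStorageServer only attaches routes; the caller still owns the HTTP listener. A minimal wiring sketch (the listen address is hypothetical):

func serveStorageExample(diskPath string) error {
	mux := router.NewRouter()
	registerStorageServer(mux, diskPath)
	return http.ListenAndServe(":9000", mux) // hypothetical address
}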

View file

@ -107,15 +107,16 @@ type DiskInfoRep struct {
// DiskInfo - get disk statistics.
func (web *webAPI) DiskInfo(r *http.Request, args *WebGenericArgs, reply *DiskInfoRep) error {
if !isJWTReqAuthenticated(r) {
return &json2.Error{Message: "Unauthorized request"}
}
info, e := disk.GetInfo(web.ObjectAPI.(*Filesystem).GetRootPath())
if e != nil {
return &json2.Error{Message: e.Error()}
}
reply.DiskInfo = info
reply.UIVersion = miniobrowser.UIVersion
// FIXME: bring in StatFS in StorageAPI interface and uncomment the below lines.
// if !isJWTReqAuthenticated(r) {
// return &json2.Error{Message: "Unauthorized request"}
// }
// info, e := disk.GetInfo(web.ObjectAPI.(*Filesystem).GetRootPath())
// if e != nil {
// return &json2.Error{Message: e.Error()}
// }
// reply.DiskInfo = info
// reply.UIVersion = miniobrowser.UIVersion
return nil
}
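The StatFS the FIXME asks for could take the following shape on the storage layer; the names and shape here are assumptions sketched from the comment, not part of this commit:

// Hypothetical sketch; not part of this commit.
type DiskStats struct {
	Total uint64 // total bytes on the backing disk
	Free  uint64 // free bytes available to the process
}

// Adding StatFS(volume string) (DiskStats, error) to StorageAPI would let
// DiskInfo answer from either a local or a networked disk without the
// *Filesystem type assertion used previously.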
@ -212,7 +213,7 @@ func (web *webAPI) ListObjects(r *http.Request, args *ListObjectsArgs, reply *Li
for _, obj := range lo.Objects {
reply.Objects = append(reply.Objects, WebObjectInfo{
Key: obj.Name,
LastModified: obj.ModifiedTime,
LastModified: obj.ModTime,
Size: obj.Size,
})
}