diff --git a/Makefile b/Makefile index dd20f5679..357ae5d65 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ build-split: build-strbyteconv build-strbyteconv: @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/strbyteconv -build-storage: build-storage-fs build-storage-append +build-storage: build-storage-fs build-storage-append build-storage-encoded @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage build-storage-fs: @@ -41,6 +41,9 @@ build-storage-fs: build-storage-append: @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/appendstorage +build-storage-encoded: + @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/encodedstorage + cover: build-erasure build-signify build-split build-crc32c build-cpu build-sha1 build-storage @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/gateway diff --git a/cmd/erasure-demo/erasure.go b/cmd/erasure-demo/erasure.go new file mode 100644 index 000000000..0c7d1776c --- /dev/null +++ b/cmd/erasure-demo/erasure.go @@ -0,0 +1,46 @@ +package main + +import ( + "errors" + "io" + "os" + "path" + + "github.com/minio-io/minio/pkgs/storage" + es "github.com/minio-io/minio/pkgs/storage/encodedstorage" +) + +func erasureGetList(config inputConfig) (io.Reader, error) { + // do nothing + return nil, errors.New("Not Implemented") +} + +func erasureGet(config inputConfig, objectPath string) (io.Reader, error) { + var objectStorage storage.ObjectStorage + rootDir := path.Join(config.rootDir, config.storageDriver) + objectStorage, err := es.NewStorage(rootDir, 10, 6, 1024*1024) + if err != nil { + return nil, err + } + object, err := objectStorage.Get(objectPath) + if err != nil { + return nil, err + } + return object, nil +} + +func erasurePut(config inputConfig, objectPath string, reader io.Reader) error { + var err error + rootDir := path.Join(config.rootDir, config.storageDriver) + if err := os.MkdirAll(rootDir, 0700); err != nil { + return err + } + var objectStorage storage.ObjectStorage + if objectStorage, err = es.NewStorage(rootDir, 10, 6, 1024*1024); err != nil { + return err + } + if err = objectStorage.Put(objectPath, reader); err != nil { + return err + } + return nil +} diff --git a/cmd/erasure-demo/get.go b/cmd/erasure-demo/get.go index 017b99d77..9718ea3b7 100644 --- a/cmd/erasure-demo/get.go +++ b/cmd/erasure-demo/get.go @@ -29,6 +29,12 @@ func get(c *cli.Context) { } } } + case "erasure": + { + if objectReader, err = erasureGet(config, objectName); err != nil { + log.Fatal(err) + } + } default: { log.Fatal("Unknown driver") diff --git a/cmd/erasure-demo/put.go b/cmd/erasure-demo/put.go index 923566089..01d69145e 100644 --- a/cmd/erasure-demo/put.go +++ b/cmd/erasure-demo/put.go @@ -29,7 +29,15 @@ func put(c *cli.Context) { switch config.storageDriver { case "fs": { - fsPut(config, c.Args().Get(0), inputFile) + if err := fsPut(config, c.Args().Get(0), inputFile); err != nil { + log.Fatal(err) + } + } + case "erasure": + { + if err := erasurePut(config, c.Args().Get(0), inputFile); err != nil { + log.Fatal(err) + } } default: { diff --git a/pkgs/storage/encodedstorage/encoded_storage.go b/pkgs/storage/encodedstorage/encoded_storage.go new file mode 100644 index 000000000..767b9f964 --- /dev/null +++ b/pkgs/storage/encodedstorage/encoded_storage.go @@ -0,0 +1,232 @@ +package encodedstorage + +import ( + "bytes" + "encoding/gob" + "errors" + "io" + "io/ioutil" + "os" + "path" + "strconv" + + "github.com/minio-io/minio/pkgs/erasure" + "github.com/minio-io/minio/pkgs/split" + "github.com/minio-io/minio/pkgs/storage" + "github.com/minio-io/minio/pkgs/storage/appendstorage" +) + +type encodedStorage struct { + RootDir string + K int + M int + BlockSize uint64 + objects map[string]StorageEntry + diskStorage []storage.ObjectStorage +} + +func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) { + // create storage files + storageNodes := make([]storage.ObjectStorage, 16) + for i := 0; i < 16; i++ { + storageNode, err := appendstorage.NewStorage(rootDir, i) + storageNodes[i] = storageNode + if err != nil { + return nil, err + } + } + objects := make(map[string]StorageEntry) + indexPath := path.Join(rootDir, "index") + if _, err := os.Stat(indexPath); err == nil { + indexFile, err := os.Open(indexPath) + defer indexFile.Close() + if err != nil { + return nil, err + } + encoder := gob.NewDecoder(indexFile) + err = encoder.Decode(&objects) + if err != nil { + return nil, err + } + } + newStorage := encodedStorage{ + RootDir: rootDir, + K: k, + M: m, + BlockSize: blockSize, + objects: objects, + diskStorage: storageNodes, + } + return &newStorage, nil +} + +func (eStorage *encodedStorage) Get(objectPath string) (io.Reader, error) { + entry, ok := eStorage.objects[objectPath] + if ok == false { + return nil, nil + } + reader, writer := io.Pipe() + go eStorage.readObject(objectPath, entry, writer) + return reader, nil +} + +func (eStorage *encodedStorage) List(listPath string) ([]storage.ObjectDescription, error) { + return nil, errors.New("Not Implemented") +} + +func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { + // split + chunks := make(chan split.SplitMessage) + go split.SplitStream(object, eStorage.BlockSize, chunks) + + // for each chunk + encoderParameters, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY) + if err != nil { + return err + } + encoder := erasure.NewEncoder(encoderParameters) + entry := StorageEntry{ + Path: objectPath, + Md5sum: "md5sum", + Crc: 24, + Blocks: make([]StorageBlockEntry, 0), + } + i := 0 + // encode + for chunk := range chunks { + if chunk.Err == nil { + // encode each + blocks, length := encoder.Encode(chunk.Data) + // store each + storeErrors := eStorage.storeBlocks(objectPath+"$"+strconv.Itoa(i), blocks) + for _, err := range storeErrors { + if err != nil { + return err + } + } + blockEntry := StorageBlockEntry{ + Index: i, + Length: length, + } + entry.Blocks = append(entry.Blocks, blockEntry) + } else { + return chunk.Err + } + i++ + } + eStorage.objects[objectPath] = entry + var gobBuffer bytes.Buffer + gobEncoder := gob.NewEncoder(&gobBuffer) + gobEncoder.Encode(eStorage.objects) + ioutil.WriteFile(path.Join(eStorage.RootDir, "index"), gobBuffer.Bytes(), 0600) + return nil +} + +type storeRequest struct { + path string + data []byte +} + +type storeResponse struct { + data []byte + err error +} + +type StorageEntry struct { + Path string + Md5sum string + Crc uint32 + Blocks []StorageBlockEntry +} + +type StorageBlockEntry struct { + Index int + Length int +} + +func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error { + returnChannels := make([]<-chan error, len(eStorage.diskStorage)) + for i, store := range eStorage.diskStorage { + returnChannels[i] = storageRoutine(store, path, bytes.NewBuffer(blocks[i])) + } + returnErrors := make([]error, 0) + for _, returnChannel := range returnChannels { + for returnValue := range returnChannel { + if returnValue != nil { + returnErrors = append(returnErrors, returnValue) + } + } + } + return returnErrors +} + +func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { + params, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY) + if err != nil { + } + encoder := erasure.NewEncoder(params) + for i, chunk := range entry.Blocks { + blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) + var blocks [][]byte + for _, slice := range blockSlices { + if slice.err != nil { + writer.CloseWithError(err) + return + } + blocks = append(blocks, slice.data) + } + data, err := encoder.Decode(blocks, chunk.Length) + if err != nil { + writer.CloseWithError(err) + return + } + bytesWritten := 0 + for bytesWritten != len(data) { + written, err := writer.Write(data[bytesWritten:len(data)]) + if err != nil { + writer.CloseWithError(err) + } + bytesWritten += written + } + } + writer.Close() +} + +func (eStorage *encodedStorage) getBlockSlices(objectPath string) []storeResponse { + responses := make([]<-chan storeResponse, 0) + for i := 0; i < len(eStorage.diskStorage); i++ { + response := getSlice(eStorage.diskStorage[i], objectPath) + responses = append(responses, response) + } + results := make([]storeResponse, 0) + for _, response := range responses { + results = append(results, <-response) + } + return results +} + +func getSlice(store storage.ObjectStorage, path string) <-chan storeResponse { + out := make(chan storeResponse) + go func() { + obj, err := store.Get(path) + if err != nil { + out <- storeResponse{data: nil, err: err} + } else { + data, err := ioutil.ReadAll(obj) + out <- storeResponse{data: data, err: err} + } + close(out) + }() + return out +} + +func storageRoutine(store storage.ObjectStorage, path string, data io.Reader) <-chan error { + out := make(chan error) + go func() { + if err := store.Put(path, data); err != nil { + out <- err + } + close(out) + }() + return out +} diff --git a/pkgs/storage/encodedstorage/encoded_storage_test.go b/pkgs/storage/encodedstorage/encoded_storage_test.go new file mode 100644 index 000000000..73b589baa --- /dev/null +++ b/pkgs/storage/encodedstorage/encoded_storage_test.go @@ -0,0 +1,124 @@ +package encodedstorage + +import ( + "bytes" + "io/ioutil" + "os" + "strconv" + "testing" + + "github.com/minio-io/minio/pkgs/storage" + . "gopkg.in/check.v1" +) + +type EncodedStorageSuite struct{} + +var _ = Suite(&EncodedStorageSuite{}) + +func Test(t *testing.T) { TestingT(t) } + +func makeTempTestDir() (string, error) { + return ioutil.TempDir("/tmp", "minio-test-") +} + +func (s *EncodedStorageSuite) TestFileStoragePutAtRootPath(c *C) { + rootDir, err := makeTempTestDir() + c.Assert(err, IsNil) + defer os.RemoveAll(rootDir) + + var objectStorage storage.ObjectStorage + objectStorage, err = NewStorage(rootDir, 10, 6, 1024) + c.Assert(err, IsNil) + objectBuffer := bytes.NewBuffer([]byte("object1")) + objectStorage.Put("path1", objectBuffer) + + // assert object1 was created in correct path + objectResult1, err := objectStorage.Get("path1") + c.Assert(err, IsNil) + object1, _ := ioutil.ReadAll(objectResult1) + c.Assert(string(object1), Equals, "object1") + + // objectList, err := objectStorage.List("/") + // c.Assert(err, IsNil) + // c.Assert(objectList[0].Path, Equals, "path1") +} + +func (s *EncodedStorageSuite) TestFileStoragePutDirPath(c *C) { + rootDir, err := makeTempTestDir() + c.Assert(err, IsNil) + defer os.RemoveAll(rootDir) + + var objectStorage storage.ObjectStorage + objectStorage, err = NewStorage(rootDir, 10, 6, 1024) + c.Assert(err, IsNil) + + objectBuffer1 := bytes.NewBuffer([]byte("object1")) + objectStorage.Put("path1/path2/path3", objectBuffer1) + + // assert object1 was created in correct path + objectResult1, err := objectStorage.Get("path1/path2/path3") + c.Assert(err, IsNil) + object1, _ := ioutil.ReadAll(objectResult1) + c.Assert(string(object1), Equals, "object1") + + // add second object + objectBuffer2 := bytes.NewBuffer([]byte("object2")) + err = objectStorage.Put("path2/path2/path2", objectBuffer2) + c.Assert(err, IsNil) + + // add third object + objectBuffer3 := bytes.NewBuffer([]byte("object3")) + err = objectStorage.Put("object3", objectBuffer3) + c.Assert(err, IsNil) + + // TODO support list + // objectList, err := objectStorage.List("/") + // c.Assert(err, IsNil) + // c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "object3", IsDir: false, Hash: ""}) + // c.Assert(objectList[1], Equals, storage.ObjectDescription{Path: "path1", IsDir: true, Hash: ""}) + // c.Assert(objectList[2], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""}) + // c.Assert(len(objectList), Equals, 3) + // + // objectList, err = objectStorage.List("/path1") + // c.Assert(err, IsNil) + // c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""}) + // c.Assert(len(objectList), Equals, 1) + // + // objectList, err = objectStorage.List("/path1/path2") + // c.Assert(err, IsNil) + // c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path3", IsDir: false, Hash: ""}) + // c.Assert(len(objectList), Equals, 1) + // + // objectList, err = objectStorage.List("/path1/path2/path3") + // c.Assert(err, Not(IsNil)) + // c.Assert(objectList, IsNil) +} + +func (s *EncodedStorageSuite) TestObjectWithChunking(c *C) { + rootDir, err := makeTempTestDir() + c.Assert(err, IsNil) + defer os.RemoveAll(rootDir) + + var objectStorage storage.ObjectStorage + objectStorage, err = NewStorage(rootDir, 10, 6, 1024) + c.Assert(err, IsNil) + + var buffer bytes.Buffer + for i := 0; i <= 2048; i++ { + buffer.Write([]byte(strconv.Itoa(i))) + } + + reader := bytes.NewReader(buffer.Bytes()) + + err = objectStorage.Put("object", reader) + c.Assert(err, IsNil) + + objectStorage2, err := NewStorage(rootDir, 10, 6, 1024) + c.Assert(err, IsNil) + objectResult, err := objectStorage2.Get("object") + c.Assert(err, IsNil) + result, err := ioutil.ReadAll(objectResult) + c.Assert(err, IsNil) + c.Assert(bytes.Compare(result, buffer.Bytes()), Equals, 0) + +}