From 2fa9320df24d0d4d8ef5db208913c076c664a049 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 22 Mar 2015 15:14:06 -0700 Subject: [PATCH] De-couple donut into smaller files, useful for ease in external integration --- pkg/storage/donut/donutdriver.go | 278 ------------------------------- pkg/storage/donut/donutwriter.go | 88 ++++++++++ pkg/storage/donut/erasure.go | 137 +++++++++++++++ pkg/storage/donut/local.go | 76 +++++++++ 4 files changed, 301 insertions(+), 278 deletions(-) create mode 100644 pkg/storage/donut/donutwriter.go create mode 100644 pkg/storage/donut/erasure.go create mode 100644 pkg/storage/donut/local.go diff --git a/pkg/storage/donut/donutdriver.go b/pkg/storage/donut/donutdriver.go index 798dca56a..c68ea2f08 100644 --- a/pkg/storage/donut/donutdriver.go +++ b/pkg/storage/donut/donutdriver.go @@ -1,21 +1,11 @@ package donut import ( - "bytes" - "encoding/json" "errors" "io" - "io/ioutil" - "os" - "path" "sort" "strconv" "strings" - "time" - - "github.com/minio-io/minio/pkg/encoding/erasure" - "github.com/minio-io/minio/pkg/utils/split" - "path/filepath" ) type donutDriver struct { @@ -140,271 +130,3 @@ func (driver donutDriver) ListObjects(bucketName string) ([]string, error) { } return nil, errors.New("Bucket not found") } - -func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { - totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) - totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"]) - blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) - params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) - encoder := erasure.NewEncoder(params) - for _, reader := range readers { - defer reader.Close() - } - for i := 0; i < totalChunks; i++ { - encodedBytes := make([][]byte, 16) - for i, reader := range readers { - var bytesBuffer bytes.Buffer - io.Copy(&bytesBuffer, reader) - encodedBytes[i] = bytesBuffer.Bytes() - } - curBlockSize := totalLeft - if blockSize < totalLeft { - curBlockSize = blockSize - } - decodedData, err := encoder.Decode(encodedBytes, curBlockSize) - if err != nil { - writer.CloseWithError(err) - return - } - io.Copy(writer, bytes.NewBuffer(decodedData)) - totalLeft = totalLeft - blockSize - } - writer.Close() -} - -// erasure writer - -type erasureWriter struct { - writers []Writer - metadata map[string]string - donutMetadata map[string]string // not exposed - erasureWriter *io.PipeWriter - isClosed <-chan bool -} - -func newErasureWriter(writers []Writer) ObjectWriter { - r, w := io.Pipe() - isClosed := make(chan bool) - writer := erasureWriter{ - writers: writers, - metadata: make(map[string]string), - erasureWriter: w, - isClosed: isClosed, - } - go erasureGoroutine(r, writer, isClosed) - return writer -} - -func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) { - chunks := split.Stream(r, 10*1024*1024) - params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) - encoder := erasure.NewEncoder(params) - chunkCount := 0 - totalLength := 0 - for chunk := range chunks { - if chunk.Err == nil { - totalLength = totalLength + len(chunk.Data) - encodedBlocks, _ := encoder.Encode(chunk.Data) - for blockIndex, block := range encodedBlocks { - io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block)) - } - } - chunkCount = chunkCount + 1 - } - metadata := make(map[string]string) - metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024) - metadata["chunkCount"] = strconv.Itoa(chunkCount) - metadata["created"] = time.Now().Format(time.RFC3339Nano) - metadata["erasureK"] = "8" - metadata["erasureM"] = "8" - metadata["erasureTechnique"] = "Cauchy" - metadata["totalLength"] = strconv.Itoa(totalLength) - for _, nodeWriter := range eWriter.writers { - if nodeWriter != nil { - nodeWriter.SetMetadata(eWriter.metadata) - nodeWriter.SetDonutMetadata(metadata) - nodeWriter.Close() - } - } - isClosed <- true -} - -func (d erasureWriter) Write(data []byte) (int, error) { - io.Copy(d.erasureWriter, bytes.NewBuffer(data)) - return len(data), nil -} - -func (d erasureWriter) Close() error { - d.erasureWriter.Close() - <-d.isClosed - return nil -} - -func (d erasureWriter) CloseWithError(err error) error { - for _, writer := range d.writers { - if writer != nil { - writer.CloseWithError(err) - } - } - return nil -} - -func (d erasureWriter) SetMetadata(metadata map[string]string) error { - for k := range d.metadata { - delete(d.metadata, k) - } - for k, v := range metadata { - d.metadata[k] = v - } - return nil -} - -func (d erasureWriter) GetMetadata() (map[string]string, error) { - metadata := make(map[string]string) - for k, v := range d.metadata { - metadata[k] = v - } - return metadata, nil -} - -type localDirectoryNode struct { - root string -} - -func (node localDirectoryNode) GetBuckets() ([]string, error) { - return nil, errors.New("Not Implemented") -} - -func (node localDirectoryNode) GetWriter(bucket, object string) (Writer, error) { - objectPath := path.Join(node.root, bucket, object) - err := os.MkdirAll(objectPath, 0700) - if err != nil { - return nil, err - } - return newDonutFileWriter(objectPath) -} - -func (node localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) { - return os.Open(path.Join(node.root, bucket, object, "data")) -} - -func (node localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) { - return node.getMetadata(bucket, object, "metadata.json") -} -func (node localDirectoryNode) GetDonutMetadata(bucket, object string) (map[string]string, error) { - return node.getMetadata(bucket, object, "donutMetadata.json") -} - -func (node localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) { - file, err := os.Open(path.Join(node.root, bucket, object, fileName)) - defer file.Close() - if err != nil { - return nil, err - } - metadata := make(map[string]string) - decoder := json.NewDecoder(file) - if err := decoder.Decode(&metadata); err != nil { - return nil, err - } - return metadata, nil -} - -func (node localDirectoryNode) ListObjects(bucketName string) ([]string, error) { - prefix := path.Join(node.root, bucketName) - var objects []string - if err := filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() && strings.HasSuffix(path, "data") { - object := strings.TrimPrefix(path, prefix+"/") - object = strings.TrimSuffix(object, "/data") - objects = append(objects, object) - } - return nil - }); err != nil { - return nil, err - } - sort.Strings(objects) - return objects, nil -} - -func newDonutFileWriter(objectDir string) (Writer, error) { - dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return nil, err - } - return donutFileWriter{ - root: objectDir, - file: dataFile, - metadata: make(map[string]string), - donutMetadata: make(map[string]string), - }, nil -} - -type donutFileWriter struct { - root string - file *os.File - metadata map[string]string - donutMetadata map[string]string - err error -} - -func (d donutFileWriter) Write(data []byte) (int, error) { - return d.file.Write(data) -} - -func (d donutFileWriter) Close() error { - if d.err != nil { - return d.err - } - metadata, _ := json.Marshal(d.metadata) - ioutil.WriteFile(path.Join(d.root, "metadata.json"), metadata, 0600) - donutMetadata, _ := json.Marshal(d.donutMetadata) - ioutil.WriteFile(path.Join(d.root, "donutMetadata.json"), donutMetadata, 0600) - - return d.file.Close() -} - -func (d donutFileWriter) CloseWithError(err error) error { - if d.err != nil { - d.err = err - } - return d.Close() -} - -func (d donutFileWriter) SetMetadata(metadata map[string]string) error { - for k := range d.metadata { - delete(d.metadata, k) - } - for k, v := range metadata { - d.metadata[k] = v - } - return nil -} - -func (d donutFileWriter) GetMetadata() (map[string]string, error) { - metadata := make(map[string]string) - for k, v := range d.metadata { - metadata[k] = v - } - return metadata, nil -} - -func (d donutFileWriter) SetDonutMetadata(metadata map[string]string) error { - for k := range d.donutMetadata { - delete(d.donutMetadata, k) - } - for k, v := range metadata { - d.donutMetadata[k] = v - } - return nil -} - -func (d donutFileWriter) GetDonutMetadata() (map[string]string, error) { - donutMetadata := make(map[string]string) - for k, v := range d.donutMetadata { - donutMetadata[k] = v - } - return donutMetadata, nil -} diff --git a/pkg/storage/donut/donutwriter.go b/pkg/storage/donut/donutwriter.go new file mode 100644 index 000000000..4ef68f358 --- /dev/null +++ b/pkg/storage/donut/donutwriter.go @@ -0,0 +1,88 @@ +package donut + +import ( + "encoding/json" + "io/ioutil" + "os" + "path" +) + +func newDonutFileWriter(objectDir string) (Writer, error) { + dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + return donutFileWriter{ + root: objectDir, + file: dataFile, + metadata: make(map[string]string), + donutMetadata: make(map[string]string), + }, nil +} + +type donutFileWriter struct { + root string + file *os.File + metadata map[string]string + donutMetadata map[string]string + err error +} + +func (d donutFileWriter) Write(data []byte) (int, error) { + return d.file.Write(data) +} + +func (d donutFileWriter) Close() error { + if d.err != nil { + return d.err + } + metadata, _ := json.Marshal(d.metadata) + ioutil.WriteFile(path.Join(d.root, "metadata.json"), metadata, 0600) + donutMetadata, _ := json.Marshal(d.donutMetadata) + ioutil.WriteFile(path.Join(d.root, "donutMetadata.json"), donutMetadata, 0600) + + return d.file.Close() +} + +func (d donutFileWriter) CloseWithError(err error) error { + if d.err != nil { + d.err = err + } + return d.Close() +} + +func (d donutFileWriter) SetMetadata(metadata map[string]string) error { + for k := range d.metadata { + delete(d.metadata, k) + } + for k, v := range metadata { + d.metadata[k] = v + } + return nil +} + +func (d donutFileWriter) GetMetadata() (map[string]string, error) { + metadata := make(map[string]string) + for k, v := range d.metadata { + metadata[k] = v + } + return metadata, nil +} + +func (d donutFileWriter) SetDonutMetadata(metadata map[string]string) error { + for k := range d.donutMetadata { + delete(d.donutMetadata, k) + } + for k, v := range metadata { + d.donutMetadata[k] = v + } + return nil +} + +func (d donutFileWriter) GetDonutMetadata() (map[string]string, error) { + donutMetadata := make(map[string]string) + for k, v := range d.donutMetadata { + donutMetadata[k] = v + } + return donutMetadata, nil +} diff --git a/pkg/storage/donut/erasure.go b/pkg/storage/donut/erasure.go new file mode 100644 index 000000000..3c361351f --- /dev/null +++ b/pkg/storage/donut/erasure.go @@ -0,0 +1,137 @@ +package donut + +import ( + "bytes" + "io" + "strconv" + "time" + + "github.com/minio-io/minio/pkg/encoding/erasure" + "github.com/minio-io/minio/pkg/utils/split" +) + +func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { + totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) + totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"]) + blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) + params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + encoder := erasure.NewEncoder(params) + for _, reader := range readers { + defer reader.Close() + } + for i := 0; i < totalChunks; i++ { + encodedBytes := make([][]byte, 16) + for i, reader := range readers { + var bytesBuffer bytes.Buffer + io.Copy(&bytesBuffer, reader) + encodedBytes[i] = bytesBuffer.Bytes() + } + curBlockSize := totalLeft + if blockSize < totalLeft { + curBlockSize = blockSize + } + decodedData, err := encoder.Decode(encodedBytes, curBlockSize) + if err != nil { + writer.CloseWithError(err) + return + } + io.Copy(writer, bytes.NewBuffer(decodedData)) + totalLeft = totalLeft - blockSize + } + writer.Close() +} + +// erasure writer + +type erasureWriter struct { + writers []Writer + metadata map[string]string + donutMetadata map[string]string // not exposed + erasureWriter *io.PipeWriter + isClosed <-chan bool +} + +func newErasureWriter(writers []Writer) ObjectWriter { + r, w := io.Pipe() + isClosed := make(chan bool) + writer := erasureWriter{ + writers: writers, + metadata: make(map[string]string), + erasureWriter: w, + isClosed: isClosed, + } + go erasureGoroutine(r, writer, isClosed) + return writer +} + +func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) { + chunks := split.Stream(r, 10*1024*1024) + params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + encoder := erasure.NewEncoder(params) + chunkCount := 0 + totalLength := 0 + for chunk := range chunks { + if chunk.Err == nil { + totalLength = totalLength + len(chunk.Data) + encodedBlocks, _ := encoder.Encode(chunk.Data) + for blockIndex, block := range encodedBlocks { + io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block)) + } + } + chunkCount = chunkCount + 1 + } + metadata := make(map[string]string) + metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024) + metadata["chunkCount"] = strconv.Itoa(chunkCount) + metadata["created"] = time.Now().Format(time.RFC3339Nano) + metadata["erasureK"] = "8" + metadata["erasureM"] = "8" + metadata["erasureTechnique"] = "Cauchy" + metadata["totalLength"] = strconv.Itoa(totalLength) + for _, nodeWriter := range eWriter.writers { + if nodeWriter != nil { + nodeWriter.SetMetadata(eWriter.metadata) + nodeWriter.SetDonutMetadata(metadata) + nodeWriter.Close() + } + } + isClosed <- true +} + +func (d erasureWriter) Write(data []byte) (int, error) { + io.Copy(d.erasureWriter, bytes.NewBuffer(data)) + return len(data), nil +} + +func (d erasureWriter) Close() error { + d.erasureWriter.Close() + <-d.isClosed + return nil +} + +func (d erasureWriter) CloseWithError(err error) error { + for _, writer := range d.writers { + if writer != nil { + writer.CloseWithError(err) + } + } + return nil +} + +func (d erasureWriter) SetMetadata(metadata map[string]string) error { + for k := range d.metadata { + delete(d.metadata, k) + } + for k, v := range metadata { + d.metadata[k] = v + } + return nil +} + +func (d erasureWriter) GetMetadata() (map[string]string, error) { + metadata := make(map[string]string) + for k, v := range d.metadata { + metadata[k] = v + } + return metadata, nil +} diff --git a/pkg/storage/donut/local.go b/pkg/storage/donut/local.go new file mode 100644 index 000000000..9462ac62c --- /dev/null +++ b/pkg/storage/donut/local.go @@ -0,0 +1,76 @@ +package donut + +import ( + "errors" + "io" + "os" + "path" + "sort" + "strings" + + "encoding/json" + "path/filepath" +) + +type localDirectoryNode struct { + root string +} + +func (d localDirectoryNode) GetBuckets() ([]string, error) { + return nil, errors.New("Not Implemented") +} + +func (d localDirectoryNode) GetWriter(bucket, object string) (Writer, error) { + objectPath := path.Join(d.root, bucket, object) + err := os.MkdirAll(objectPath, 0700) + if err != nil { + return nil, err + } + return newDonutFileWriter(objectPath) +} + +func (d localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) { + return os.Open(path.Join(d.root, bucket, object, "data")) +} + +func (d localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) { + return d.getMetadata(bucket, object, "metadata.json") +} +func (d localDirectoryNode) GetDonutMetadata(bucket, object string) (map[string]string, error) { + return d.getMetadata(bucket, object, "donutMetadata.json") +} + +func (d localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) { + file, err := os.Open(path.Join(d.root, bucket, object, fileName)) + defer file.Close() + if err != nil { + return nil, err + } + metadata := make(map[string]string) + decoder := json.NewDecoder(file) + if err := decoder.Decode(&metadata); err != nil { + return nil, err + } + return metadata, nil + +} + +func (d localDirectoryNode) ListObjects(bucketName string) ([]string, error) { + prefix := path.Join(d.root, bucketName) + var objects []string + if err := filepath.Walk(prefix, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() && strings.HasSuffix(path, "data") { + object := strings.TrimPrefix(path, prefix+"/") + object = strings.TrimSuffix(object, "/data") + objects = append(objects, object) + } + return nil + }); err != nil { + return nil, err + } + sort.Strings(objects) + return objects, nil +}