Merge pull request #105 from fkautz/pr_out_erasure_encoded_demo

This commit is contained in:
Harshavardhana 2014-12-11 00:43:02 -08:00
commit 063d668133
6 changed files with 421 additions and 2 deletions

View file

@ -32,7 +32,7 @@ build-split: build-strbyteconv
build-strbyteconv:
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/strbyteconv
build-storage: build-storage-fs build-storage-append
build-storage: build-storage-fs build-storage-append build-storage-encoded
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage
build-storage-fs:
@ -41,6 +41,9 @@ build-storage-fs:
build-storage-append:
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/appendstorage
build-storage-encoded:
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/encodedstorage
cover: build-erasure build-signify build-split build-crc32c build-cpu build-sha1 build-storage
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/gateway

View file

@ -0,0 +1,46 @@
package main
import (
"errors"
"io"
"os"
"path"
"github.com/minio-io/minio/pkgs/storage"
es "github.com/minio-io/minio/pkgs/storage/encodedstorage"
)
func erasureGetList(config inputConfig) (io.Reader, error) {
// do nothing
return nil, errors.New("Not Implemented")
}
func erasureGet(config inputConfig, objectPath string) (io.Reader, error) {
var objectStorage storage.ObjectStorage
rootDir := path.Join(config.rootDir, config.storageDriver)
objectStorage, err := es.NewStorage(rootDir, 10, 6, 1024*1024)
if err != nil {
return nil, err
}
object, err := objectStorage.Get(objectPath)
if err != nil {
return nil, err
}
return object, nil
}
func erasurePut(config inputConfig, objectPath string, reader io.Reader) error {
var err error
rootDir := path.Join(config.rootDir, config.storageDriver)
if err := os.MkdirAll(rootDir, 0700); err != nil {
return err
}
var objectStorage storage.ObjectStorage
if objectStorage, err = es.NewStorage(rootDir, 10, 6, 1024*1024); err != nil {
return err
}
if err = objectStorage.Put(objectPath, reader); err != nil {
return err
}
return nil
}

View file

@ -29,6 +29,12 @@ func get(c *cli.Context) {
}
}
}
case "erasure":
{
if objectReader, err = erasureGet(config, objectName); err != nil {
log.Fatal(err)
}
}
default:
{
log.Fatal("Unknown driver")

View file

@ -29,7 +29,15 @@ func put(c *cli.Context) {
switch config.storageDriver {
case "fs":
{
fsPut(config, c.Args().Get(0), inputFile)
if err := fsPut(config, c.Args().Get(0), inputFile); err != nil {
log.Fatal(err)
}
}
case "erasure":
{
if err := erasurePut(config, c.Args().Get(0), inputFile); err != nil {
log.Fatal(err)
}
}
default:
{

View file

@ -0,0 +1,232 @@
package encodedstorage
import (
"bytes"
"encoding/gob"
"errors"
"io"
"io/ioutil"
"os"
"path"
"strconv"
"github.com/minio-io/minio/pkgs/erasure"
"github.com/minio-io/minio/pkgs/split"
"github.com/minio-io/minio/pkgs/storage"
"github.com/minio-io/minio/pkgs/storage/appendstorage"
)
type encodedStorage struct {
RootDir string
K int
M int
BlockSize uint64
objects map[string]StorageEntry
diskStorage []storage.ObjectStorage
}
func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) {
// create storage files
storageNodes := make([]storage.ObjectStorage, 16)
for i := 0; i < 16; i++ {
storageNode, err := appendstorage.NewStorage(rootDir, i)
storageNodes[i] = storageNode
if err != nil {
return nil, err
}
}
objects := make(map[string]StorageEntry)
indexPath := path.Join(rootDir, "index")
if _, err := os.Stat(indexPath); err == nil {
indexFile, err := os.Open(indexPath)
defer indexFile.Close()
if err != nil {
return nil, err
}
encoder := gob.NewDecoder(indexFile)
err = encoder.Decode(&objects)
if err != nil {
return nil, err
}
}
newStorage := encodedStorage{
RootDir: rootDir,
K: k,
M: m,
BlockSize: blockSize,
objects: objects,
diskStorage: storageNodes,
}
return &newStorage, nil
}
func (eStorage *encodedStorage) Get(objectPath string) (io.Reader, error) {
entry, ok := eStorage.objects[objectPath]
if ok == false {
return nil, nil
}
reader, writer := io.Pipe()
go eStorage.readObject(objectPath, entry, writer)
return reader, nil
}
func (eStorage *encodedStorage) List(listPath string) ([]storage.ObjectDescription, error) {
return nil, errors.New("Not Implemented")
}
func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
// split
chunks := make(chan split.SplitMessage)
go split.SplitStream(object, eStorage.BlockSize, chunks)
// for each chunk
encoderParameters, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY)
if err != nil {
return err
}
encoder := erasure.NewEncoder(encoderParameters)
entry := StorageEntry{
Path: objectPath,
Md5sum: "md5sum",
Crc: 24,
Blocks: make([]StorageBlockEntry, 0),
}
i := 0
// encode
for chunk := range chunks {
if chunk.Err == nil {
// encode each
blocks, length := encoder.Encode(chunk.Data)
// store each
storeErrors := eStorage.storeBlocks(objectPath+"$"+strconv.Itoa(i), blocks)
for _, err := range storeErrors {
if err != nil {
return err
}
}
blockEntry := StorageBlockEntry{
Index: i,
Length: length,
}
entry.Blocks = append(entry.Blocks, blockEntry)
} else {
return chunk.Err
}
i++
}
eStorage.objects[objectPath] = entry
var gobBuffer bytes.Buffer
gobEncoder := gob.NewEncoder(&gobBuffer)
gobEncoder.Encode(eStorage.objects)
ioutil.WriteFile(path.Join(eStorage.RootDir, "index"), gobBuffer.Bytes(), 0600)
return nil
}
type storeRequest struct {
path string
data []byte
}
type storeResponse struct {
data []byte
err error
}
type StorageEntry struct {
Path string
Md5sum string
Crc uint32
Blocks []StorageBlockEntry
}
type StorageBlockEntry struct {
Index int
Length int
}
func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error {
returnChannels := make([]<-chan error, len(eStorage.diskStorage))
for i, store := range eStorage.diskStorage {
returnChannels[i] = storageRoutine(store, path, bytes.NewBuffer(blocks[i]))
}
returnErrors := make([]error, 0)
for _, returnChannel := range returnChannels {
for returnValue := range returnChannel {
if returnValue != nil {
returnErrors = append(returnErrors, returnValue)
}
}
}
return returnErrors
}
func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) {
params, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY)
if err != nil {
}
encoder := erasure.NewEncoder(params)
for i, chunk := range entry.Blocks {
blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i))
var blocks [][]byte
for _, slice := range blockSlices {
if slice.err != nil {
writer.CloseWithError(err)
return
}
blocks = append(blocks, slice.data)
}
data, err := encoder.Decode(blocks, chunk.Length)
if err != nil {
writer.CloseWithError(err)
return
}
bytesWritten := 0
for bytesWritten != len(data) {
written, err := writer.Write(data[bytesWritten:len(data)])
if err != nil {
writer.CloseWithError(err)
}
bytesWritten += written
}
}
writer.Close()
}
func (eStorage *encodedStorage) getBlockSlices(objectPath string) []storeResponse {
responses := make([]<-chan storeResponse, 0)
for i := 0; i < len(eStorage.diskStorage); i++ {
response := getSlice(eStorage.diskStorage[i], objectPath)
responses = append(responses, response)
}
results := make([]storeResponse, 0)
for _, response := range responses {
results = append(results, <-response)
}
return results
}
func getSlice(store storage.ObjectStorage, path string) <-chan storeResponse {
out := make(chan storeResponse)
go func() {
obj, err := store.Get(path)
if err != nil {
out <- storeResponse{data: nil, err: err}
} else {
data, err := ioutil.ReadAll(obj)
out <- storeResponse{data: data, err: err}
}
close(out)
}()
return out
}
func storageRoutine(store storage.ObjectStorage, path string, data io.Reader) <-chan error {
out := make(chan error)
go func() {
if err := store.Put(path, data); err != nil {
out <- err
}
close(out)
}()
return out
}

View file

@ -0,0 +1,124 @@
package encodedstorage
import (
"bytes"
"io/ioutil"
"os"
"strconv"
"testing"
"github.com/minio-io/minio/pkgs/storage"
. "gopkg.in/check.v1"
)
type EncodedStorageSuite struct{}
var _ = Suite(&EncodedStorageSuite{})
func Test(t *testing.T) { TestingT(t) }
func makeTempTestDir() (string, error) {
return ioutil.TempDir("/tmp", "minio-test-")
}
func (s *EncodedStorageSuite) TestFileStoragePutAtRootPath(c *C) {
rootDir, err := makeTempTestDir()
c.Assert(err, IsNil)
defer os.RemoveAll(rootDir)
var objectStorage storage.ObjectStorage
objectStorage, err = NewStorage(rootDir, 10, 6, 1024)
c.Assert(err, IsNil)
objectBuffer := bytes.NewBuffer([]byte("object1"))
objectStorage.Put("path1", objectBuffer)
// assert object1 was created in correct path
objectResult1, err := objectStorage.Get("path1")
c.Assert(err, IsNil)
object1, _ := ioutil.ReadAll(objectResult1)
c.Assert(string(object1), Equals, "object1")
// objectList, err := objectStorage.List("/")
// c.Assert(err, IsNil)
// c.Assert(objectList[0].Path, Equals, "path1")
}
func (s *EncodedStorageSuite) TestFileStoragePutDirPath(c *C) {
rootDir, err := makeTempTestDir()
c.Assert(err, IsNil)
defer os.RemoveAll(rootDir)
var objectStorage storage.ObjectStorage
objectStorage, err = NewStorage(rootDir, 10, 6, 1024)
c.Assert(err, IsNil)
objectBuffer1 := bytes.NewBuffer([]byte("object1"))
objectStorage.Put("path1/path2/path3", objectBuffer1)
// assert object1 was created in correct path
objectResult1, err := objectStorage.Get("path1/path2/path3")
c.Assert(err, IsNil)
object1, _ := ioutil.ReadAll(objectResult1)
c.Assert(string(object1), Equals, "object1")
// add second object
objectBuffer2 := bytes.NewBuffer([]byte("object2"))
err = objectStorage.Put("path2/path2/path2", objectBuffer2)
c.Assert(err, IsNil)
// add third object
objectBuffer3 := bytes.NewBuffer([]byte("object3"))
err = objectStorage.Put("object3", objectBuffer3)
c.Assert(err, IsNil)
// TODO support list
// objectList, err := objectStorage.List("/")
// c.Assert(err, IsNil)
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "object3", IsDir: false, Hash: ""})
// c.Assert(objectList[1], Equals, storage.ObjectDescription{Path: "path1", IsDir: true, Hash: ""})
// c.Assert(objectList[2], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
// c.Assert(len(objectList), Equals, 3)
//
// objectList, err = objectStorage.List("/path1")
// c.Assert(err, IsNil)
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
// c.Assert(len(objectList), Equals, 1)
//
// objectList, err = objectStorage.List("/path1/path2")
// c.Assert(err, IsNil)
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path3", IsDir: false, Hash: ""})
// c.Assert(len(objectList), Equals, 1)
//
// objectList, err = objectStorage.List("/path1/path2/path3")
// c.Assert(err, Not(IsNil))
// c.Assert(objectList, IsNil)
}
func (s *EncodedStorageSuite) TestObjectWithChunking(c *C) {
rootDir, err := makeTempTestDir()
c.Assert(err, IsNil)
defer os.RemoveAll(rootDir)
var objectStorage storage.ObjectStorage
objectStorage, err = NewStorage(rootDir, 10, 6, 1024)
c.Assert(err, IsNil)
var buffer bytes.Buffer
for i := 0; i <= 2048; i++ {
buffer.Write([]byte(strconv.Itoa(i)))
}
reader := bytes.NewReader(buffer.Bytes())
err = objectStorage.Put("object", reader)
c.Assert(err, IsNil)
objectStorage2, err := NewStorage(rootDir, 10, 6, 1024)
c.Assert(err, IsNil)
objectResult, err := objectStorage2.Get("object")
c.Assert(err, IsNil)
result, err := ioutil.ReadAll(objectResult)
c.Assert(err, IsNil)
c.Assert(bytes.Compare(result, buffer.Bytes()), Equals, 0)
}