minio/pkg/donut/bucket.go
Harshavardhana e082f26e10 Improving EncoderStream to return error only upon non io.EOF.
io.EOF is okay since io.ReadFull will not have read any bytes at all.

Also making error channel receive only for go routine.
2015-07-25 15:57:30 -07:00

612 lines
18 KiB
Go

/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"fmt"
"io"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"crypto/md5"
"encoding/hex"
"encoding/json"
"github.com/minio/minio/pkg/crypto/sha256"
"github.com/minio/minio/pkg/crypto/sha512"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
const (
blockSize = 10 * 1024 * 1024
)
// internal struct carrying bucket specific information
type bucket struct {
name string
acl string
time time.Time
donutName string
nodes map[string]node
lock *sync.Mutex
}
// newBucket - instantiate a new bucket
func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bucket, BucketMetadata, error) {
errParams := map[string]string{
"bucketName": bucketName,
"donutName": donutName,
"aclType": aclType,
}
if strings.TrimSpace(bucketName) == "" || strings.TrimSpace(donutName) == "" {
return bucket{}, BucketMetadata{}, iodine.New(InvalidArgument{}, errParams)
}
b := bucket{}
t := time.Now().UTC()
b.name = bucketName
b.acl = aclType
b.time = t
b.donutName = donutName
b.nodes = nodes
b.lock = new(sync.Mutex)
metadata := BucketMetadata{}
metadata.Version = bucketMetadataVersion
metadata.Name = bucketName
metadata.ACL = BucketACL(aclType)
metadata.Created = t
metadata.Metadata = make(map[string]string)
metadata.BucketObjects = make(map[string]struct{})
return b, metadata, nil
}
// getBucketName -
func (b bucket) getBucketName() string {
return b.name
}
// getBucketMetadataReaders -
func (b bucket) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
var disks map[int]disk.Disk
var err error
for _, node := range b.nodes {
disks, err = node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
}
var bucketMetaDataReader io.ReadCloser
for order, disk := range disks {
bucketMetaDataReader, err = disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
if err != nil {
continue
}
readers[order] = bucketMetaDataReader
}
if err != nil {
return nil, iodine.New(err, nil)
}
return readers, nil
}
// getBucketMetadata -
func (b bucket) getBucketMetadata() (*AllBuckets, error) {
var err error
metadata := new(AllBuckets)
readers, err := b.getBucketMetadataReaders()
if err != nil {
return nil, iodine.New(err, nil)
}
for _, reader := range readers {
defer reader.Close()
}
for _, reader := range readers {
jenc := json.NewDecoder(reader)
if err = jenc.Decode(metadata); err == nil {
return metadata, nil
}
}
return nil, iodine.New(err, nil)
}
// GetObjectMetadata - get metadata for an object
func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) {
b.lock.Lock()
defer b.lock.Unlock()
return b.readObjectMetadata(objectName)
}
// ListObjects - list all objects
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
b.lock.Lock()
defer b.lock.Unlock()
if maxkeys <= 0 {
maxkeys = 1000
}
var isTruncated bool
var objects []string
bucketMetadata, err := b.getBucketMetadata()
if err != nil {
return ListObjectsResults{}, iodine.New(err, nil)
}
for objectName := range bucketMetadata.Buckets[b.getBucketName()].Multiparts {
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
if objectName > marker {
objects = append(objects, objectName)
}
}
}
for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjects {
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
if objectName > marker {
objects = append(objects, objectName)
}
}
}
if strings.TrimSpace(prefix) != "" {
objects = TrimPrefix(objects, prefix)
}
var prefixes []string
var filteredObjects []string
filteredObjects = objects
if strings.TrimSpace(delimiter) != "" {
filteredObjects = HasNoDelimiter(objects, delimiter)
prefixes = HasDelimiter(objects, delimiter)
prefixes = SplitDelimiter(prefixes, delimiter)
prefixes = SortUnique(prefixes)
}
var results []string
var commonPrefixes []string
for _, commonPrefix := range prefixes {
commonPrefixes = append(commonPrefixes, prefix+commonPrefix)
}
filteredObjects = RemoveDuplicates(filteredObjects)
sort.Strings(filteredObjects)
for _, objectName := range filteredObjects {
if len(results) >= maxkeys {
isTruncated = true
break
}
results = append(results, prefix+objectName)
}
results = RemoveDuplicates(results)
commonPrefixes = RemoveDuplicates(commonPrefixes)
sort.Strings(commonPrefixes)
listObjects := ListObjectsResults{}
listObjects.Objects = make(map[string]ObjectMetadata)
listObjects.CommonPrefixes = commonPrefixes
listObjects.IsTruncated = isTruncated
for _, objectName := range results {
objMetadata, err := b.readObjectMetadata(normalizeObjectName(objectName))
if err != nil {
return ListObjectsResults{}, iodine.New(err, nil)
}
listObjects.Objects[objectName] = objMetadata
}
return listObjects, nil
}
// ReadObject - open an object to read
func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, err error) {
b.lock.Lock()
defer b.lock.Unlock()
reader, writer := io.Pipe()
// get list of objects
bucketMetadata, err := b.getBucketMetadata()
if err != nil {
return nil, 0, iodine.New(err, nil)
}
// check if object exists
if _, ok := bucketMetadata.Buckets[b.getBucketName()].BucketObjects[objectName]; !ok {
return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil)
}
objMetadata, err := b.readObjectMetadata(normalizeObjectName(objectName))
if err != nil {
return nil, 0, iodine.New(err, nil)
}
// read and reply back to GetObject() request in a go-routine
go b.readObjectData(normalizeObjectName(objectName), writer, objMetadata)
return reader, objMetadata.Size, nil
}
// WriteObject - write a new object into bucket
func (b bucket) WriteObject(objectName string, objectData io.Reader, size int64, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
b.lock.Lock()
defer b.lock.Unlock()
if objectName == "" || objectData == nil {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
}
writers, err := b.getObjectWriters(normalizeObjectName(objectName), "data")
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
sumMD5 := md5.New()
sum256 := sha256.New()
sum512 := sha512.New()
objMetadata := ObjectMetadata{}
objMetadata.Version = objectMetadataVersion
objMetadata.Created = time.Now().UTC()
// if total writers are only '1' do not compute erasure
switch len(writers) == 1 {
case true:
mw := io.MultiWriter(writers[0], sumMD5, sum256, sum512)
totalLength, err := io.Copy(mw, objectData)
if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil)
}
objMetadata.Size = totalLength
case false:
// calculate data and parity dictated by total number of writers
k, m, err := b.getDataAndParity(len(writers))
if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil)
}
mwriter := io.MultiWriter(sumMD5, sum256, sum512)
// write encoded data with k, m and writers
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, size, mwriter)
if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil)
}
/// donutMetadata section
objMetadata.BlockSize = blockSize
objMetadata.ChunkCount = chunkCount
objMetadata.DataDisks = k
objMetadata.ParityDisks = m
objMetadata.ErasureTechnique = "Cauchy"
objMetadata.Size = int64(totalLength)
}
objMetadata.Bucket = b.getBucketName()
objMetadata.Object = objectName
dataMD5sum := sumMD5.Sum(nil)
dataSHA512sum := sum512.Sum(nil)
if signature != nil {
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sum256.Sum(nil)))
if err != nil {
// error occurred while doing signature calculation, we return and also cleanup any temporary writers.
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil)
}
if !ok {
// purge all writers, when control flow reaches here
//
// Signature mismatch occurred all temp files to be removed and all data purged.
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil)
}
}
objMetadata.MD5Sum = hex.EncodeToString(dataMD5sum)
objMetadata.SHA512Sum = hex.EncodeToString(dataSHA512sum)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := b.isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), objMetadata.MD5Sum); err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
}
objMetadata.Metadata = metadata
// write object specific metadata
if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil {
// purge all writers, when control flow reaches here
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil)
}
// close all writers, when control flow reaches here
for _, writer := range writers {
writer.Close()
}
return objMetadata, nil
}
// isMD5SumEqual - returns error if md5sum mismatches, other its `nil`
func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return iodine.New(BadDigest{}, nil)
}
return nil
}
return iodine.New(InvalidArgument{}, nil)
}
// writeObjectMetadata - write additional object metadata
func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadata) error {
if objMetadata.Object == "" {
return iodine.New(InvalidArgument{}, nil)
}
objMetadataWriters, err := b.getObjectWriters(objectName, objectMetadataConfig)
if err != nil {
return iodine.New(err, nil)
}
for _, objMetadataWriter := range objMetadataWriters {
jenc := json.NewEncoder(objMetadataWriter)
if err := jenc.Encode(&objMetadata); err != nil {
// Close writers and purge all temporary entries
CleanupWritersOnError(objMetadataWriters)
return iodine.New(err, nil)
}
}
for _, objMetadataWriter := range objMetadataWriters {
objMetadataWriter.Close()
}
return nil
}
// readObjectMetadata - read object metadata
func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
if objectName == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
}
var err error
objMetadata := ObjectMetadata{}
objMetadataReaders, err := b.getObjectReaders(objectName, objectMetadataConfig)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
for _, objMetadataReader := range objMetadataReaders {
defer objMetadataReader.Close()
}
for _, objMetadataReader := range objMetadataReaders {
jdec := json.NewDecoder(objMetadataReader)
if err = jdec.Decode(&objMetadata); err == nil {
return objMetadata, nil
}
}
return ObjectMetadata{}, iodine.New(err, nil)
}
// TODO - This a temporary normalization of objectNames, need to find a better way
//
// normalizedObjectName - all objectNames with "/" get normalized to a simple objectName
//
// example:
// user provided value - "this/is/my/deep/directory/structure"
// donut normalized value - "this-is-my-deep-directory-structure"
//
func normalizeObjectName(objectName string) string {
// replace every '/' with '-'
return strings.Replace(objectName, "/", "-", -1)
}
// getDataAndParity - calculate k, m (data and parity) values from number of disks
func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) {
if totalWriters <= 1 {
return 0, 0, iodine.New(InvalidArgument{}, nil)
}
quotient := totalWriters / 2 // not using float or abs to let integer round off to lower value
// quotient cannot be bigger than (255 / 2) = 127
if quotient > 127 {
return 0, 0, iodine.New(ParityOverflow{}, nil)
}
remainder := totalWriters % 2 // will be 1 for odd and 0 for even numbers
k = uint8(quotient + remainder)
m = uint8(quotient)
return k, m, nil
}
// writeObjectData -
func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, size int64, writer io.Writer) (int, int, error) {
encoder, err := newEncoder(k, m, "Cauchy")
chunkSize := int64(10 * 1024 * 1024)
if err != nil {
return 0, 0, iodine.New(err, nil)
}
chunkCount := 0
totalLength := 0
remaining := size
for remaining > 0 {
readSize := chunkSize
if remaining < chunkSize {
readSize = remaining
}
remaining = remaining - readSize
totalLength = totalLength + int(readSize)
encodedBlocks, inputData, err := encoder.EncodeStream(objectData, readSize)
if err != nil {
return 0, 0, iodine.New(err, nil)
}
_, err = writer.Write(inputData)
if err != nil {
return 0, 0, iodine.New(err, nil)
}
for blockIndex, block := range encodedBlocks {
errCh := make(chan error, 1)
go func(writer io.Writer, reader io.Reader, errCh chan<- error) {
defer close(errCh)
_, err := io.Copy(writer, reader)
errCh <- err
}(writers[blockIndex], bytes.NewReader(block), errCh)
if err := <-errCh; err != nil {
// Returning error is fine here CleanupErrors() would cleanup writers
return 0, 0, iodine.New(err, nil)
}
}
chunkCount = chunkCount + 1
}
return chunkCount, totalLength, nil
}
// readObjectData -
func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) {
readers, err := b.getObjectReaders(objectName, "data")
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
for _, reader := range readers {
defer reader.Close()
}
expectedMd5sum, err := hex.DecodeString(objMetadata.MD5Sum)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
expected512Sum, err := hex.DecodeString(objMetadata.SHA512Sum)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
hasher := md5.New()
sum512hasher := sha256.New()
mwriter := io.MultiWriter(writer, hasher, sum512hasher)
switch len(readers) > 1 {
case true:
if objMetadata.ErasureTechnique == "" {
writer.CloseWithError(iodine.New(MissingErasureTechnique{}, nil))
return
}
encoder, err := newEncoder(objMetadata.DataDisks, objMetadata.ParityDisks, objMetadata.ErasureTechnique)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
totalLeft := objMetadata.Size
for i := 0; i < objMetadata.ChunkCount; i++ {
decodedData, err := b.decodeEncodedData(totalLeft, int64(objMetadata.BlockSize), readers, encoder, writer)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
_, err = io.Copy(mwriter, bytes.NewReader(decodedData))
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
totalLeft = totalLeft - int64(objMetadata.BlockSize)
}
case false:
_, err := io.Copy(writer, readers[0])
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
}
// check if decodedData md5sum matches
if !bytes.Equal(expectedMd5sum, hasher.Sum(nil)) {
writer.CloseWithError(iodine.New(ChecksumMismatch{}, nil))
return
}
if !bytes.Equal(expected512Sum, sum512hasher.Sum(nil)) {
writer.CloseWithError(iodine.New(ChecksumMismatch{}, nil))
return
}
writer.Close()
return
}
// decodeEncodedData -
func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io.ReadCloser, encoder encoder, writer *io.PipeWriter) ([]byte, error) {
var curBlockSize int64
if blockSize < totalLeft {
curBlockSize = blockSize
} else {
curBlockSize = totalLeft
}
curChunkSize, err := encoder.GetEncodedBlockLen(int(curBlockSize))
if err != nil {
return nil, iodine.New(err, nil)
}
encodedBytes := make([][]byte, encoder.k+encoder.m)
for i, reader := range readers {
var bytesBuffer bytes.Buffer
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
if err != nil {
return nil, iodine.New(err, nil)
}
encodedBytes[i] = bytesBuffer.Bytes()
}
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize))
if err != nil {
return nil, iodine.New(err, nil)
}
return decodedData, nil
}
// getObjectReaders -
func (b bucket) getObjectReaders(objectName, objectMeta string) (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
var disks map[int]disk.Disk
var err error
nodeSlice := 0
for _, node := range b.nodes {
disks, err = node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
for order, disk := range disks {
var objectSlice io.ReadCloser
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)
objectSlice, err = disk.OpenFile(objectPath)
if err == nil {
readers[order] = objectSlice
}
}
nodeSlice = nodeSlice + 1
}
if err != nil {
return nil, iodine.New(err, nil)
}
return readers, nil
}
// getObjectWriters -
func (b bucket) getObjectWriters(objectName, objectMeta string) ([]io.WriteCloser, error) {
var writers []io.WriteCloser
nodeSlice := 0
for _, node := range b.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
writers = make([]io.WriteCloser, len(disks))
for order, disk := range disks {
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)
objectSlice, err := disk.CreateFile(objectPath)
if err != nil {
return nil, iodine.New(err, nil)
}
writers[order] = objectSlice
}
nodeSlice = nodeSlice + 1
}
return writers, nil
}