XL: Fix GetObject erasure decode issues. (#1793)

This commit is contained in:
Harshavardhana 2016-05-29 15:38:14 -07:00
parent 5e8de786b3
commit a4a0ea605b
12 changed files with 154 additions and 146 deletions

View file

@ -16,114 +16,83 @@
package main
import (
"bytes"
"errors"
"io"
)
import "errors"
// ReadFile - decoded erasure coded file.
func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.Reader, error) {
var totalLeft = totalSize
var bufWriter = new(bytes.Buffer)
for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before.
var curBlockSize int64
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
} else {
curBlockSize = totalLeft
func (e erasure) ReadFile(volume, path string, bufferOffset int64, startOffset int64, buffer []byte) (int64, error) {
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(int64(len(buffer)), e.DataBlocks)
offsetEncOffset := getEncodedBlockLen(startOffset, e.DataBlocks)
// Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(e.storageDisks))
// Counter to keep success data blocks.
var successDataBlocksCount = 0
var noReconstruct bool // Set for no reconstruction.
// Read from all the disks.
for index, disk := range e.storageDisks {
blockIndex := e.distribution[index] - 1
// Initialize shard slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, curEncBlockSize)
if disk == nil {
enBlocks[blockIndex] = nil
continue
}
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks)
// Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(e.storageDisks))
// Counter to keep success data blocks.
var successDataBlocksCount = 0
var noReconstruct bool // Set for no reconstruction.
// Read from all the disks.
for index, disk := range e.storageDisks {
blockIndex := e.distribution[index] - 1
// Initialize shard slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, curEncBlockSize)
if disk == nil {
enBlocks[blockIndex] = nil
} else {
var offset = int64(0)
// Read the necessary blocks.
_, err := disk.ReadFile(volume, path, offset, enBlocks[blockIndex])
if err != nil {
enBlocks[blockIndex] = nil
}
}
// Verify if we have successfully read all the data blocks.
if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil {
successDataBlocksCount++
// Set when we have all the data blocks and no
// reconstruction is needed, so that we can avoid
// erasure reconstruction.
noReconstruct = successDataBlocksCount == e.DataBlocks
if noReconstruct {
// Break out we have read all the data blocks.
break
}
}
}
// Check blocks if they are all zero in length, we have corruption return error.
if checkBlockSize(enBlocks) == 0 {
return nil, errDataCorrupt
}
// Verify if reconstruction is needed, proceed with reconstruction.
if !noReconstruct {
err := e.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
return nil, err
}
// Verify reconstructed blocks (parity).
ok, err := e.ReedSolomon.Verify(enBlocks)
if err != nil {
return nil, err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return nil, err
}
}
// Get data blocks from encoded blocks.
dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize))
// Verify if the offset is right for the block, if not move to
// the next block.
if startOffset > 0 {
startOffset = startOffset - int64(len(dataBlocks))
// Start offset is greater than or equal to zero, skip the dataBlocks.
if startOffset >= 0 {
totalLeft = totalLeft - erasureBlockSize
continue
}
// Now get back the remaining offset if startOffset is negative.
startOffset = startOffset + int64(len(dataBlocks))
}
// Copy data blocks.
_, err := bufWriter.Write(dataBlocks[startOffset:])
// Read the necessary blocks.
_, err := disk.ReadFile(volume, path, offsetEncOffset, enBlocks[blockIndex])
if err != nil {
return nil, err
enBlocks[blockIndex] = nil
}
// Verify if we have successfully read all the data blocks.
if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil {
successDataBlocksCount++
// Set when we have all the data blocks and no
// reconstruction is needed, so that we can avoid
// erasure reconstruction.
noReconstruct = successDataBlocksCount == e.DataBlocks
if noReconstruct {
// Break out we have read all the data blocks.
break
}
}
// Reset dataBlocks to relenquish memory.
dataBlocks = nil
// Save what's left after reading erasureBlockSize.
totalLeft = totalLeft - erasureBlockSize
}
return bufWriter, nil
// Check blocks if they are all zero in length, we have corruption return error.
if checkBlockSize(enBlocks) == 0 {
return 0, errDataCorrupt
}
// Verify if reconstruction is needed, proceed with reconstruction.
if !noReconstruct {
err := e.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
return 0, err
}
// Verify reconstructed blocks (parity).
ok, err := e.ReedSolomon.Verify(enBlocks)
if err != nil {
return 0, err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return 0, err
}
}
// Get data blocks from encoded blocks.
dataBlocks, err := getDataBlocks(enBlocks, e.DataBlocks, len(buffer))
if err != nil {
return 0, err
}
// Copy data blocks.
copy(buffer, dataBlocks[bufferOffset:])
// Relenquish memory.
dataBlocks = nil
return int64(len(buffer)), nil
}

View file

@ -16,13 +16,31 @@
package main
import "github.com/klauspost/reedsolomon"
// getDataBlocks - fetches the data block only part of the input encoded blocks.
func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte) {
for _, block := range enBlocks[:dataBlocks] {
data = append(data, block...)
func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte, err error) {
if len(enBlocks) < dataBlocks {
return nil, reedsolomon.ErrTooFewShards
}
data = data[:curBlockSize]
return data
size := 0
blocks := enBlocks[:dataBlocks]
for _, block := range blocks {
size += len(block)
}
if size < curBlockSize {
return nil, reedsolomon.ErrShortData
}
write := curBlockSize
for _, block := range blocks {
if write < len(block) {
data = append(data, block[:write]...)
return data, nil
}
data = append(data, block...)
write -= len(block)
}
return data, nil
}
// checkBlockSize return the size of a single block.

View file

@ -115,7 +115,7 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
// loadFormat - load format from disk.
func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
buffer := make([]byte, blockSize)
buffer := make([]byte, blockSizeV1)
offset := int64(0)
var n int64
n, err = disk.ReadFile(minioMetaBucket, formatConfigFile, offset, buffer)

View file

@ -57,7 +57,7 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
// readFSMetadata - returns the object metadata `fs.json` content.
func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
buffer := make([]byte, blockSize)
buffer := make([]byte, blockSizeV1)
n, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0), buffer)
if err != nil {
return fsMetaV1{}, err

View file

@ -305,7 +305,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
// Initialize md5 writer.
md5Writer := md5.New()
var buf = make([]byte, blockSize)
var buf = make([]byte, blockSizeV1)
for {
n, err := io.ReadFull(data, buf)
if err == io.EOF {
@ -476,7 +476,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
tempObj := path.Join(tmpMetaPrefix, uploadID, "object1")
var buffer = make([]byte, blockSize)
var buffer = make([]byte, blockSizeV1)
// Loop through all parts, validate them and then commit to disk.
for i, part := range parts {

View file

@ -160,8 +160,8 @@ func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length i
for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before.
var curBlockSize int64
if blockSize < totalLeft {
curBlockSize = blockSize
if blockSizeV1 < totalLeft {
curBlockSize = blockSizeV1
} else {
curBlockSize = totalLeft
}
@ -212,10 +212,6 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
}, nil
}
const (
blockSize = 4 * 1024 * 1024 // 4MiB.
)
// PutObject - create an object.
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
// Verify if bucket is valid.
@ -245,7 +241,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
}
} else {
// Allocate buffer.
buf := make([]byte, blockSize)
buf := make([]byte, blockSizeV1)
for {
n, rErr := data.Read(buf)
if rErr == io.EOF {

View file

@ -21,6 +21,11 @@ import (
"sync"
)
const (
// Block size used for all internal operations version 1.
blockSizeV1 = 10 * 1024 * 1024 // 10MiB.
)
// Common initialization needed for both object layers.
func initObjectLayer(storageDisks ...StorageAPI) error {
// This happens for the first time, but keep this here since this

View file

@ -42,7 +42,7 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
go func(index int, disk StorageAPI) {
defer wg.Done()
offset := int64(0)
var buffer = make([]byte, blockSize)
var buffer = make([]byte, blockSizeV1)
n, err := disk.ReadFile(bucket, xlMetaPath, offset, buffer)
if err != nil {
errs[index] = err

View file

@ -25,9 +25,8 @@ import (
"time"
)
// Erasure block size.
const (
erasureBlockSize = 4 * 1024 * 1024 // 4MiB.
// Erasure related constants.
erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde"
erasureAlgorithmISAL = "isa-l/reedsolomon/cauchy"
)
@ -140,8 +139,8 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
// Count for errors encountered.
var xlJSONErrCount = 0
// Allocate 4MiB buffer.
buffer := make([]byte, blockSize)
// Allocate 10MiB buffer.
buffer := make([]byte, blockSizeV1)
// Return the first successful lookup from a random list of disks.
for xlJSONErrCount < len(xl.storageDisks) {
@ -168,7 +167,7 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
xlMeta.Erasure.DataBlocks = dataBlocks
xlMeta.Erasure.ParityBlocks = parityBlocks
xlMeta.Erasure.BlockSize = erasureBlockSize
xlMeta.Erasure.BlockSize = blockSizeV1
xlMeta.Erasure.Distribution = randErasureDistribution(dataBlocks + parityBlocks)
return xlMeta
}

View file

@ -81,7 +81,7 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI
// Read `uploads.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
var buffer = make([]byte, blockSize)
var buffer = make([]byte, blockSizeV1) // Allocate blockSized buffer.
n, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0), buffer)
if rErr != nil {
errs[index] = rErr

View file

@ -146,7 +146,10 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
// Initialize md5 writer.
md5Writer := md5.New()
buf := make([]byte, blockSize)
// Allocate blocksized buffer for reading.
buf := make([]byte, blockSizeV1)
// Read until io.EOF, fill the allocated buf.
for {
var n int
n, err = io.ReadFull(data, buf)
@ -167,6 +170,8 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
return "", toObjectErr(errUnexpected, bucket, object)
}
}
// Calculate new md5sum.
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
@ -174,6 +179,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
}
}
// Rename temporary part file to its final location.
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil {

View file

@ -1,7 +1,6 @@
package main
import (
"bytes"
"crypto/md5"
"encoding/hex"
"io"
@ -51,27 +50,42 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
if err != nil {
return toObjectErr(err, bucket, object)
}
totalLeft := length
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
var buffer io.Reader
buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size)
if err != nil {
return err
}
if int64(buffer.(*bytes.Buffer).Len()) > totalLeft {
if _, err := io.CopyN(writer, buffer, totalLeft); err != nil {
totalLeft := part.Size
beginOffset := int64(0)
for totalLeft > 0 {
var curBlockSize int64
if xlMeta.Erasure.BlockSize < totalLeft {
curBlockSize = xlMeta.Erasure.BlockSize
} else {
curBlockSize = totalLeft
}
var buffer = make([]byte, curBlockSize)
var n int64
n, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, beginOffset, buffer)
if err != nil {
return err
}
return nil
if length > int64(len(buffer)) {
var m int
m, err = writer.Write(buffer)
if err != nil {
return err
}
length -= int64(m)
} else {
_, err = writer.Write(buffer[:length])
if err != nil {
return err
}
return nil
}
totalLeft -= partOffset + n
beginOffset += n
// Reset part offset to 0 to read rest of the parts from the beginning.
partOffset = 0
}
n, err := io.Copy(writer, buffer)
if err != nil {
return err
}
totalLeft -= n
// Reset part offset to 0 to read rest of the parts from the beginning.
partOffset = 0
}
return nil
}
@ -222,7 +236,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Initialize md5 writer.
md5Writer := md5.New()
buf := make([]byte, blockSize)
// Allocated blockSized buffer for reading.
buf := make([]byte, blockSizeV1)
for {
var n int
n, err = io.ReadFull(data, buf)