diff --git a/erasure-readfile.go b/erasure-readfile.go index a20f4dde1..5edaab574 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -16,114 +16,83 @@ package main -import ( - "bytes" - "errors" - "io" -) +import "errors" // ReadFile - decoded erasure coded file. -func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.Reader, error) { - var totalLeft = totalSize - var bufWriter = new(bytes.Buffer) - for totalLeft > 0 { - // Figure out the right blockSize as it was encoded before. - var curBlockSize int64 - if erasureBlockSize < totalLeft { - curBlockSize = erasureBlockSize - } else { - curBlockSize = totalLeft +func (e erasure) ReadFile(volume, path string, bufferOffset int64, startOffset int64, buffer []byte) (int64, error) { + // Calculate the current encoded block size. + curEncBlockSize := getEncodedBlockLen(int64(len(buffer)), e.DataBlocks) + offsetEncOffset := getEncodedBlockLen(startOffset, e.DataBlocks) + + // Allocate encoded blocks up to storage disks. + enBlocks := make([][]byte, len(e.storageDisks)) + + // Counter to keep success data blocks. + var successDataBlocksCount = 0 + var noReconstruct bool // Set for no reconstruction. + + // Read from all the disks. + for index, disk := range e.storageDisks { + blockIndex := e.distribution[index] - 1 + // Initialize shard slice and fill the data from each parts. + enBlocks[blockIndex] = make([]byte, curEncBlockSize) + if disk == nil { + enBlocks[blockIndex] = nil + continue } - - // Calculate the current encoded block size. - curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks) - - // Allocate encoded blocks up to storage disks. - enBlocks := make([][]byte, len(e.storageDisks)) - - // Counter to keep success data blocks. - var successDataBlocksCount = 0 - var noReconstruct bool // Set for no reconstruction. - - // Read from all the disks. - for index, disk := range e.storageDisks { - blockIndex := e.distribution[index] - 1 - // Initialize shard slice and fill the data from each parts. - enBlocks[blockIndex] = make([]byte, curEncBlockSize) - if disk == nil { - enBlocks[blockIndex] = nil - } else { - var offset = int64(0) - // Read the necessary blocks. - _, err := disk.ReadFile(volume, path, offset, enBlocks[blockIndex]) - if err != nil { - enBlocks[blockIndex] = nil - } - } - // Verify if we have successfully read all the data blocks. - if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil { - successDataBlocksCount++ - // Set when we have all the data blocks and no - // reconstruction is needed, so that we can avoid - // erasure reconstruction. - noReconstruct = successDataBlocksCount == e.DataBlocks - if noReconstruct { - // Break out we have read all the data blocks. - break - } - } - } - - // Check blocks if they are all zero in length, we have corruption return error. - if checkBlockSize(enBlocks) == 0 { - return nil, errDataCorrupt - } - - // Verify if reconstruction is needed, proceed with reconstruction. - if !noReconstruct { - err := e.ReedSolomon.Reconstruct(enBlocks) - if err != nil { - return nil, err - } - // Verify reconstructed blocks (parity). - ok, err := e.ReedSolomon.Verify(enBlocks) - if err != nil { - return nil, err - } - if !ok { - // Blocks cannot be reconstructed, corrupted data. - err = errors.New("Verification failed after reconstruction, data likely corrupted.") - return nil, err - } - } - - // Get data blocks from encoded blocks. - dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize)) - - // Verify if the offset is right for the block, if not move to - // the next block. - if startOffset > 0 { - startOffset = startOffset - int64(len(dataBlocks)) - // Start offset is greater than or equal to zero, skip the dataBlocks. - if startOffset >= 0 { - totalLeft = totalLeft - erasureBlockSize - continue - } - // Now get back the remaining offset if startOffset is negative. - startOffset = startOffset + int64(len(dataBlocks)) - } - - // Copy data blocks. - _, err := bufWriter.Write(dataBlocks[startOffset:]) + // Read the necessary blocks. + _, err := disk.ReadFile(volume, path, offsetEncOffset, enBlocks[blockIndex]) if err != nil { - return nil, err + enBlocks[blockIndex] = nil + } + // Verify if we have successfully read all the data blocks. + if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil { + successDataBlocksCount++ + // Set when we have all the data blocks and no + // reconstruction is needed, so that we can avoid + // erasure reconstruction. + noReconstruct = successDataBlocksCount == e.DataBlocks + if noReconstruct { + // Break out we have read all the data blocks. + break + } } - - // Reset dataBlocks to relenquish memory. - dataBlocks = nil - - // Save what's left after reading erasureBlockSize. - totalLeft = totalLeft - erasureBlockSize } - return bufWriter, nil + + // Check blocks if they are all zero in length, we have corruption return error. + if checkBlockSize(enBlocks) == 0 { + return 0, errDataCorrupt + } + + // Verify if reconstruction is needed, proceed with reconstruction. + if !noReconstruct { + err := e.ReedSolomon.Reconstruct(enBlocks) + if err != nil { + return 0, err + } + // Verify reconstructed blocks (parity). + ok, err := e.ReedSolomon.Verify(enBlocks) + if err != nil { + return 0, err + } + if !ok { + // Blocks cannot be reconstructed, corrupted data. + err = errors.New("Verification failed after reconstruction, data likely corrupted.") + return 0, err + } + } + + // Get data blocks from encoded blocks. + dataBlocks, err := getDataBlocks(enBlocks, e.DataBlocks, len(buffer)) + if err != nil { + return 0, err + } + + // Copy data blocks. + copy(buffer, dataBlocks[bufferOffset:]) + + // Relenquish memory. + dataBlocks = nil + + return int64(len(buffer)), nil } diff --git a/erasure-utils.go b/erasure-utils.go index b992983b8..86dca895a 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -16,13 +16,31 @@ package main +import "github.com/klauspost/reedsolomon" + // getDataBlocks - fetches the data block only part of the input encoded blocks. -func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte) { - for _, block := range enBlocks[:dataBlocks] { - data = append(data, block...) +func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte, err error) { + if len(enBlocks) < dataBlocks { + return nil, reedsolomon.ErrTooFewShards } - data = data[:curBlockSize] - return data + size := 0 + blocks := enBlocks[:dataBlocks] + for _, block := range blocks { + size += len(block) + } + if size < curBlockSize { + return nil, reedsolomon.ErrShortData + } + write := curBlockSize + for _, block := range blocks { + if write < len(block) { + data = append(data, block[:write]...) + return data, nil + } + data = append(data, block...) + write -= len(block) + } + return data, nil } // checkBlockSize return the size of a single block. diff --git a/format-config-v1.go b/format-config-v1.go index f9547adb1..c4342a2f2 100644 --- a/format-config-v1.go +++ b/format-config-v1.go @@ -115,7 +115,7 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1) // loadFormat - load format from disk. func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) { - buffer := make([]byte, blockSize) + buffer := make([]byte, blockSizeV1) offset := int64(0) var n int64 n, err = disk.ReadFile(minioMetaBucket, formatConfigFile, offset, buffer) diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go index c87d1061f..84d26c359 100644 --- a/fs-v1-metadata.go +++ b/fs-v1-metadata.go @@ -57,7 +57,7 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin // readFSMetadata - returns the object metadata `fs.json` content. func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) { - buffer := make([]byte, blockSize) + buffer := make([]byte, blockSizeV1) n, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0), buffer) if err != nil { return fsMetaV1{}, err diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index 768925e8f..7d6fc63f5 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -305,7 +305,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s // Initialize md5 writer. md5Writer := md5.New() - var buf = make([]byte, blockSize) + var buf = make([]byte, blockSizeV1) for { n, err := io.ReadFull(data, buf) if err == io.EOF { @@ -476,7 +476,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } tempObj := path.Join(tmpMetaPrefix, uploadID, "object1") - var buffer = make([]byte, blockSize) + var buffer = make([]byte, blockSizeV1) // Loop through all parts, validate them and then commit to disk. for i, part := range parts { diff --git a/fs-v1.go b/fs-v1.go index fdc1ba091..063818023 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -160,8 +160,8 @@ func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length i for totalLeft > 0 { // Figure out the right blockSize as it was encoded before. var curBlockSize int64 - if blockSize < totalLeft { - curBlockSize = blockSize + if blockSizeV1 < totalLeft { + curBlockSize = blockSizeV1 } else { curBlockSize = totalLeft } @@ -212,10 +212,6 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { }, nil } -const ( - blockSize = 4 * 1024 * 1024 // 4MiB. -) - // PutObject - create an object. func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { // Verify if bucket is valid. @@ -245,7 +241,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } } else { // Allocate buffer. - buf := make([]byte, blockSize) + buf := make([]byte, blockSizeV1) for { n, rErr := data.Read(buf) if rErr == io.EOF { diff --git a/object-common.go b/object-common.go index 0bb5dde98..27747313e 100644 --- a/object-common.go +++ b/object-common.go @@ -21,6 +21,11 @@ import ( "sync" ) +const ( + // Block size used for all internal operations version 1. + blockSizeV1 = 10 * 1024 * 1024 // 10MiB. +) + // Common initialization needed for both object layers. func initObjectLayer(storageDisks ...StorageAPI) error { // This happens for the first time, but keep this here since this diff --git a/xl-v1-healing.go b/xl-v1-healing.go index 76f664921..b29feaed3 100644 --- a/xl-v1-healing.go +++ b/xl-v1-healing.go @@ -42,7 +42,7 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro go func(index int, disk StorageAPI) { defer wg.Done() offset := int64(0) - var buffer = make([]byte, blockSize) + var buffer = make([]byte, blockSizeV1) n, err := disk.ReadFile(bucket, xlMetaPath, offset, buffer) if err != nil { errs[index] = err diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 790530bed..2911956b2 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -25,9 +25,8 @@ import ( "time" ) -// Erasure block size. const ( - erasureBlockSize = 4 * 1024 * 1024 // 4MiB. + // Erasure related constants. erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde" erasureAlgorithmISAL = "isa-l/reedsolomon/cauchy" ) @@ -140,8 +139,8 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err // Count for errors encountered. var xlJSONErrCount = 0 - // Allocate 4MiB buffer. - buffer := make([]byte, blockSize) + // Allocate 10MiB buffer. + buffer := make([]byte, blockSizeV1) // Return the first successful lookup from a random list of disks. for xlJSONErrCount < len(xl.storageDisks) { @@ -168,7 +167,7 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost xlMeta.Erasure.DataBlocks = dataBlocks xlMeta.Erasure.ParityBlocks = parityBlocks - xlMeta.Erasure.BlockSize = erasureBlockSize + xlMeta.Erasure.BlockSize = blockSizeV1 xlMeta.Erasure.Distribution = randErasureDistribution(dataBlocks + parityBlocks) return xlMeta } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index 67a5d6d4a..216a81242 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -81,7 +81,7 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI // Read `uploads.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() - var buffer = make([]byte, blockSize) + var buffer = make([]byte, blockSizeV1) // Allocate blockSized buffer. n, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0), buffer) if rErr != nil { errs[index] = rErr diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index c4c197d95..5698be491 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -146,7 +146,10 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s // Initialize md5 writer. md5Writer := md5.New() - buf := make([]byte, blockSize) + // Allocate blocksized buffer for reading. + buf := make([]byte, blockSizeV1) + + // Read until io.EOF, fill the allocated buf. for { var n int n, err = io.ReadFull(data, buf) @@ -167,6 +170,8 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s return "", toObjectErr(errUnexpected, bucket, object) } } + + // Calculate new md5sum. newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { @@ -174,6 +179,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s } } + // Rename temporary part file to its final location. partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) if err != nil { diff --git a/xl-v1-object.go b/xl-v1-object.go index 604ace929..a0a6abd33 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "crypto/md5" "encoding/hex" "io" @@ -51,27 +50,42 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i if err != nil { return toObjectErr(err, bucket, object) } - totalLeft := length for ; partIndex < len(xlMeta.Parts); partIndex++ { part := xlMeta.Parts[partIndex] - var buffer io.Reader - buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) - if err != nil { - return err - } - if int64(buffer.(*bytes.Buffer).Len()) > totalLeft { - if _, err := io.CopyN(writer, buffer, totalLeft); err != nil { + totalLeft := part.Size + beginOffset := int64(0) + for totalLeft > 0 { + var curBlockSize int64 + if xlMeta.Erasure.BlockSize < totalLeft { + curBlockSize = xlMeta.Erasure.BlockSize + } else { + curBlockSize = totalLeft + } + var buffer = make([]byte, curBlockSize) + var n int64 + n, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, beginOffset, buffer) + if err != nil { return err } - return nil + if length > int64(len(buffer)) { + var m int + m, err = writer.Write(buffer) + if err != nil { + return err + } + length -= int64(m) + } else { + _, err = writer.Write(buffer[:length]) + if err != nil { + return err + } + return nil + } + totalLeft -= partOffset + n + beginOffset += n + // Reset part offset to 0 to read rest of the parts from the beginning. + partOffset = 0 } - n, err := io.Copy(writer, buffer) - if err != nil { - return err - } - totalLeft -= n - // Reset part offset to 0 to read rest of the parts from the beginning. - partOffset = 0 } return nil } @@ -222,7 +236,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Initialize md5 writer. md5Writer := md5.New() - buf := make([]byte, blockSize) + // Allocated blockSized buffer for reading. + buf := make([]byte, blockSizeV1) for { var n int n, err = io.ReadFull(data, buf)