Refactor of xl.PutObjectPart and erasureCreateFile. (#2193)

* XL: Refactor of xl.PutObjectPart and erasureCreateFile.

* GetCheckSum and AddCheckSum methods for xlMetaV1

* Simple unit test case for erasureCreateFile()
This commit is contained in:
Krishna Srinivas 2016-07-15 03:29:01 +05:30 committed by Harshavardhana
parent af6109f89a
commit b090c7112e
8 changed files with 168 additions and 125 deletions

View file

@ -28,22 +28,20 @@ import (
// erasureCreateFile - writes an entire stream by erasure coding to
// all the disks, writes also calculate individual block's checksum
// for future bit-rot protection.
func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo, writeQuorum int) (newEInfos []erasureInfo, size int64, err error) {
// Just pick one eInfo.
eInfo := pickValidErasureInfo(eInfos)
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (size int64, checkSums []string, err error) {
// Allocated blockSized buffer for reading.
buf := make([]byte, eInfo.BlockSize)
buf := make([]byte, blockSize)
hashWriters := newHashWriters(len(disks))
// Read until io.EOF, erasure codes data and writes to all disks.
for {
var blocks [][]byte
n, rErr := io.ReadFull(data, buf)
n, rErr := io.ReadFull(reader, buf)
// FIXME: this is a bug in Golang, n == 0 and err ==
// io.ErrUnexpectedEOF for io.ReadFull function.
if n == 0 && rErr == io.ErrUnexpectedEOF {
return nil, 0, rErr
return 0, nil, rErr
}
if rErr == io.EOF {
// We have reached EOF on the first byte read, io.Reader
@ -51,56 +49,38 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName
// data. Will create a 0byte file instead.
if size == 0 {
blocks = make([][]byte, len(disks))
rErr = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum)
rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum)
if rErr != nil {
return nil, 0, rErr
return 0, nil, rErr
}
} // else we have reached EOF after few reads, no need to
// add an additional 0bytes at the end.
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
return nil, 0, rErr
return 0, nil, rErr
}
if n > 0 {
// Returns encoded blocks.
var enErr error
blocks, enErr = encodeData(buf[0:n], eInfo.DataBlocks, eInfo.ParityBlocks)
blocks, enErr = encodeData(buf[0:n], dataBlocks, parityBlocks)
if enErr != nil {
return nil, 0, enErr
return 0, nil, enErr
}
// Write to all disks.
if err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum); err != nil {
return nil, 0, err
if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil {
return 0, nil, err
}
size += int64(n)
}
}
// Save the checksums.
checkSums := make([]checkSumInfo, len(disks))
for index := range disks {
blockIndex := eInfo.Distribution[index] - 1
checkSums[blockIndex] = checkSumInfo{
Name: partName,
Algorithm: "blake2b",
Hash: hex.EncodeToString(hashWriters[blockIndex].Sum(nil)),
}
checkSums = make([]string, len(disks))
for i := range checkSums {
checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
}
// Erasure info update for checksum for each disks.
newEInfos = make([]erasureInfo, len(disks))
for index, eInfo := range eInfos {
if eInfo.IsValid() {
blockIndex := eInfo.Distribution[index] - 1
newEInfos[index] = eInfo
newEInfos[index].Checksum = append(newEInfos[index].Checksum, checkSums[blockIndex])
}
}
// Return newEInfos.
return newEInfos, size, nil
return size, checkSums, nil
}
// encodeData - encodes incoming data buffer into
@ -128,7 +108,7 @@ func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, erro
}
// appendFile - append data buffer at path.
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash, writeQuorum int) (err error) {
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) (err error) {
var wg = &sync.WaitGroup{}
var wErrs = make([]error, len(disks))
// Write encoded data to quorum disks in parallel.
@ -140,16 +120,14 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
// Write encoded data in routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
// Pick the block from the distribution.
blockIndex := distribution[index] - 1
wErr := disk.AppendFile(volume, path, enBlocks[blockIndex])
wErr := disk.AppendFile(volume, path, enBlocks[index])
if wErr != nil {
wErrs[index] = wErr
return
}
// Calculate hash for each blocks.
hashWriters[blockIndex].Write(enBlocks[blockIndex])
hashWriters[index].Write(enBlocks[index])
// Successfully wrote.
wErrs[index] = nil

View file

@ -65,18 +65,36 @@ func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
return successDataBlocksCount >= dataBlocks
}
// Return ordered partsMetadata depeinding on distribution.
func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
for index := range partsMetadata {
blockIndex := distribution[index]
orderedPartsMetadata[blockIndex-1] = partsMetadata[index]
}
return orderedPartsMetadata
}
// getOrderedDisks - get ordered disks from erasure distribution.
// returns ordered slice of disks from their actual distribution.
func getOrderedDisks(distribution []int, disks []StorageAPI, blockCheckSums []checkSumInfo) (orderedDisks []StorageAPI, orderedBlockCheckSums []checkSumInfo) {
func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []StorageAPI) {
orderedDisks = make([]StorageAPI, len(disks))
orderedBlockCheckSums = make([]checkSumInfo, len(disks))
// From disks gets ordered disks.
for index := range disks {
blockIndex := distribution[index]
orderedDisks[blockIndex-1] = disks[index]
}
return orderedDisks
}
// Return ordered CheckSums depending on the distribution.
func getOrderedCheckSums(distribution []int, blockCheckSums []checkSumInfo) (orderedBlockCheckSums []checkSumInfo) {
orderedBlockCheckSums = make([]checkSumInfo, len(blockCheckSums))
for index := range blockCheckSums {
blockIndex := distribution[index]
orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
}
return orderedDisks, orderedBlockCheckSums
return orderedBlockCheckSums
}
// Return readable disks slice from which we can read parallelly.
@ -188,7 +206,8 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
// []orderedDisks will have first eInfo.DataBlocks disks as data
// disks and rest will be parity.
orderedDisks, orderedBlockCheckSums := getOrderedDisks(eInfo.Distribution, disks, blockCheckSums)
orderedDisks := getOrderedDisks(eInfo.Distribution, disks)
orderedBlockCheckSums := getOrderedCheckSums(eInfo.Distribution, blockCheckSums)
// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
// by verifying the hash of the contents of the file.

View file

@ -109,9 +109,8 @@ func testGetReadDisks(t *testing.T, xl xlObjects) {
// actual distribution.
func testGetOrderedDisks(t *testing.T, xl xlObjects) {
disks := xl.storageDisks
blockCheckSums := make([]checkSumInfo, len(disks))
distribution := []int{16, 14, 12, 10, 8, 6, 4, 2, 1, 3, 5, 7, 9, 11, 13, 15}
orderedDisks, _ := getOrderedDisks(distribution, disks, blockCheckSums)
orderedDisks := getOrderedDisks(distribution, disks)
// From the "distribution" above you can notice that:
// 1st data block is in the 9th disk (i.e distribution index 8)
// 2nd data block is in the 8th disk (i.e distribution index 7) and so on.

View file

@ -18,6 +18,9 @@ package main
import (
"bytes"
"crypto/rand"
"io/ioutil"
"os"
"testing"
)
@ -183,3 +186,70 @@ func TestErasureDecode(t *testing.T) {
}
}
}
// Simulates a faulty disk for AppendFile()
type AppendDiskDown struct {
*posix
}
func (a AppendDiskDown) AppendFile(volume string, path string, buf []byte) error {
return errFaultyDisk
}
// Test erasureCreateFile()
// TODO:
// * check when more disks are down.
// * verify written content by using erasureReadFile.
func TestErasureCreateFile(t *testing.T) {
// Initialize environment needed for the test.
dataBlocks := 7
parityBlocks := 7
blockSize := int64(blockSizeV1)
diskPaths := make([]string, dataBlocks+parityBlocks)
disks := make([]StorageAPI, len(diskPaths))
for i := range diskPaths {
var err error
diskPaths[i], err = ioutil.TempDir(os.TempDir(), "minio-")
if err != nil {
t.Fatal("Unable to create tmp dir", err)
}
defer removeAll(diskPaths[i])
disks[i], err = newPosix(diskPaths[i])
if err != nil {
t.Fatal(err)
}
err = disks[i].MakeVol("testbucket")
if err != nil {
t.Fatal(err)
}
}
// Prepare a slice of 1MB with random data.
data := make([]byte, 1*1024*1024)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}
// Test when all disks are up.
size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
}
// Two disks down.
disks[4] = AppendDiskDown{disks[4].(*posix)}
disks[5] = AppendDiskDown{disks[5].(*posix)}
// Test when two disks are down.
size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
}
}

View file

@ -131,10 +131,9 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
// Returns slice of online disks needed.
// - slice returing readable disks.
// - xlMetaV1
// - bool value indicating if healing is needed.
func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
onlineDisks = make([]StorageAPI, len(xl.storageDisks))
// - modTime of the Object
func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
onlineDisks = make([]StorageAPI, len(disks))
// List all the file commit ids from parts metadata.
modTimes := listObjectModtimes(partsMetadata, errs)
@ -145,7 +144,7 @@ func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onl
// Create a new online disks slice, which have common uuid.
for index, t := range modTimes {
if t == modTime {
onlineDisks[index] = xl.storageDisks[index]
onlineDisks[index] = disks[index]
} else {
onlineDisks[index] = nil
}

View file

@ -163,6 +163,27 @@ func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
sort.Sort(byObjectPartNumber(m.Parts))
}
// AddCheckSum - add checksum of a part.
func (m *xlMetaV1) AddCheckSum(partName, algorithm, checkSum string) {
for i, sum := range m.Erasure.Checksum {
if sum.Name == partName {
m.Erasure.Checksum[i] = checkSumInfo{partName, "blake2b", checkSum}
return
}
}
m.Erasure.Checksum = append(m.Erasure.Checksum, checkSumInfo{partName, "blake2b", checkSum})
}
// GetCheckSum - get checksum of a part.
func (m *xlMetaV1) GetCheckSum(partName string) (checkSum, algorithm string, err error) {
for _, sum := range m.Erasure.Checksum {
if sum.Name == partName {
return sum.Hash, sum.Algorithm, nil
}
}
return "", "", errUnexpected
}
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) {
if offset == 0 {
@ -187,10 +208,11 @@ func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset in
// pickValidXLMeta - picks one valid xlMeta content and returns from a
// slice of xlmeta content. If no value is found this function panics
// and dies.
func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 {
for _, xlMeta := range xlMetas {
if xlMeta.IsValid() {
return xlMeta
func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) xlMetaV1 {
// Pick latest valid metadata.
for _, meta := range metaArr {
if meta.IsValid() && meta.Stat.ModTime == modTime {
return meta
}
}
panic("Unable to look for valid XL metadata content")

View file

@ -354,10 +354,13 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
// List all online disks.
onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs)
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata)
xlMeta := pickValidXLMeta(partsMetadata, modTime)
onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks)
partsMetadata = getOrderedPartsMetadata(xlMeta.Erasure.Distribution, partsMetadata)
// Need a unique name for the part being written in minioMetaBucket to
// accommodate concurrent PutObjectPart requests
@ -379,17 +382,8 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Construct a tee reader for md5sum.
teeReader := io.TeeReader(data, md5Writer)
// Collect all the previous erasure infos across the disk.
var eInfos []erasureInfo
for index := range onlineDisks {
eInfos = append(eInfos, partsMetadata[index].Erasure)
}
// Erasure code data and write across all disks.
newEInfos, sizeWritten, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos, xl.writeQuorum)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
}
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum)
// For size == -1, perhaps client is sending in chunked encoding
// set the size as size that was actually written.
@ -421,7 +415,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
// Validates if upload ID exists again.
// Validate again if upload ID still exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
@ -433,34 +427,17 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", toObjectErr(err, minioMetaBucket, partPath)
}
// Read metadata (again) associated with the object from all disks.
// Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath)
if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
var updatedEInfos []erasureInfo
for index := range partsMetadata {
updatedEInfos = append(updatedEInfos, partsMetadata[index].Erasure)
}
for index, eInfo := range newEInfos {
if eInfo.IsValid() {
// Use a map to find union of checksums of parts that
// we concurrently written and committed before this
// part. N B For a different, concurrent upload of the
// same part, the last written content remains.
finalChecksums := unionChecksumInfos(newEInfos[index].Checksum, updatedEInfos[index].Checksum, partSuffix)
updatedEInfos[index] = eInfo
updatedEInfos[index].Checksum = finalChecksums
}
}
// Get current highest version based on re-read partsMetadata.
onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
xlMeta = pickValidXLMeta(partsMetadata)
// Get current highest version based on re-read partsMetadata.
onlineDisks, _ = xl.listOnlineDisks(partsMetadata, errs)
xlMeta = pickValidXLMeta(partsMetadata, modTime)
// Once part is successfully committed, proceed with updating XL metadata.
xlMeta.Stat.ModTime = time.Now().UTC()
@ -468,10 +445,12 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Add the current part.
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
// Update `xl.json` content for each disks.
for index := range partsMetadata {
for index, disk := range onlineDisks {
if disk == nil {
continue
}
partsMetadata[index].Parts = xlMeta.Parts
partsMetadata[index].Erasure = updatedEInfos[index]
partsMetadata[index].AddCheckSum(partSuffix, "blake2b", checkSums[index])
}
// Write all the checksum metadata.
@ -630,11 +609,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
_, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
// Calculate full object size.
var objectSize int64
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata)
xlMeta := pickValidXLMeta(partsMetadata, modTime)
// Save current xl meta for validation.
var currentXLMeta = xlMeta

View file

@ -76,7 +76,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
}
// List all online disks.
onlineDisks, modTime := xl.listOnlineDisks(metaArr, errs)
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, metaArr, errs)
// Pick latest valid metadata.
var xlMeta xlMetaV1
@ -381,28 +381,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix)
tempObj := uniqueID
nsMutex.RLock(bucket, object)
// Read metadata associated with the object from all disks.
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
nsMutex.RUnlock(bucket, object)
// Do we have write quroum?.
if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
// errFileNotFound is handled specially since it's OK for the object to
// not exists in the namespace yet.
if errCount, reducedErr := reduceErrs(errs); reducedErr != nil && reducedErr != errFileNotFound {
if errCount < xl.writeQuorum {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
return "", toObjectErr(reducedErr, bucket, object)
}
// List all online disks.
onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs)
var mw io.Writer
// Initialize md5 writer.
md5Writer := md5.New()
@ -447,14 +425,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Initialize xl meta.
xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
// Collect all the previous erasure infos across the disk.
var eInfos []erasureInfo
for range onlineDisks {
eInfos = append(eInfos, xlMeta.Erasure)
}
onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks)
// Erasure code and write across all disks.
newEInfos, sizeWritten, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "part.1", teeReader, eInfos, xl.writeQuorum)
// Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xl.writeQuorum)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
}
@ -531,10 +505,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Add the final part.
xlMeta.AddObjectPart(1, "part.1", newMD5Hex, xlMeta.Stat.Size)
partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
// Update `xl.json` content on each disks.
for index := range partsMetadata {
partsMetadata[index] = xlMeta
partsMetadata[index].Erasure = newEInfos[index]
partsMetadata[index].AddCheckSum("part.1", "blake2b", checkSums[index])
}
// Write unique `xl.json` for each disk.