XL: Add new metadata for checksum. (#1743)

This commit is contained in:
Harshavardhana 2016-05-24 17:48:58 -07:00 committed by Harshavardhana
parent b38b9fea79
commit 1e393c6c5b
7 changed files with 48 additions and 31 deletions

View file

@ -25,14 +25,24 @@
"release": "DEVELOPMENT.GOGET"
},
"erasure": {
"algorithm": "klauspost/reedsolomon/vandermonde",
"index": 2,
"distribution": [ 1, 3, 4, 2, 5, 8, 7, 6, 9 ],
"blockSize": 4194304,
"parity": 5,
"data": 5
},
"checksum": {
"enable": false,
"data": 5,
"checksum": [
{
"name": "object.00001",
"algorithm": "sha512",
"hash": "d9910e1492446389cfae6fe979db0245f96ca97ca2c7a25cab45805882004479320d866a47ea1f7be6a62625dd4de6caf7816009ef9d62779346d01a221b335c",
},
{
"name": "object.00002",
"algorithm": "sha512",
"hash": "d9910e1492446389cfae6fe979db0245f96ca97ca2c7a25cab45805882004479320d866a47ea1f7be6a62625dd4de6caf7816009ef9d62779346d01a221b335c",
},
],
},
"stat": {
"version": 0,

View file

@ -145,6 +145,7 @@ func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wclose
// CreateFile - create a file.
func (e erasure) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
// Input validation.
if !isValidVolname(volume) {
return nil, errInvalidArgument
}

View file

@ -136,8 +136,7 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6
totalLeft = totalLeft - erasureBlockSize
continue
}
// Now get back the remaining offset if startOffset is
// negative.
// Now get back the remaining offset if startOffset is negative.
startOffset = startOffset + int64(len(dataBlocks))
}

View file

@ -52,8 +52,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
// Count for list errors encountered.
var listErrCount = 0
// Loop through and return the first success entry based on the
// selected random disk.
// Return the first success entry based on the selected random disk.
for listErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt, do not hit the same disk all the time.
randIndex := rand.Intn(len(xl.storageDisks) - 1)
@ -84,7 +83,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
}
// getRandomDisk - gives a random disk at any point in time from the
// available disk pool.
// available pool of disks.
func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
randIndex := rand.Intn(len(xl.storageDisks) - 1)
disk = xl.storageDisks[randIndex] // Pick a random disk.

View file

@ -45,7 +45,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
// Wait for all make vol to finish.
wg.Wait()
// Loop through all the concocted errors.
// Look for specific errors and count them to be verified later.
for _, err := range dErrs {
if err == nil {
continue
@ -201,7 +201,7 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
}()
}
// Loop through all statVols, calculate the actual usage values.
// From all bucketsInfo, calculate the actual usage values.
var total, free int64
var bucketInfo BucketInfo
for _, bucketInfo = range bucketsInfo {
@ -211,6 +211,7 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
free += bucketInfo.Free
total += bucketInfo.Total
}
// Update the aggregated values.
bucketInfo.Free = free
bucketInfo.Total = total
@ -241,10 +242,10 @@ func (xl xlObjects) listBuckets() ([]BucketInfo, error) {
}(index, disk)
}
// For all the list volumes running in parallel to finish.
// Wait for all the list volumes running in parallel to finish.
wg.Wait()
// Loop through success vols and get aggregated usage values.
// From success vols map calculate aggregated usage values.
var volsInfo []VolInfo
var total, free int64
for _, volsInfo = range successVols {
@ -296,6 +297,7 @@ func (xl xlObjects) ListBuckets() ([]BucketInfo, error) {
if err != nil {
return nil, toObjectErr(err)
}
// Sort by bucket name before returning.
sort.Sort(byBucketName(bucketInfos))
return bucketInfos, nil
}
@ -334,7 +336,8 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// Wait for all the delete vols to finish.
wg.Wait()
// Loop through concocted errors and return anything unusual.
// Count the errors for known errors, return quickly if we found
// an unknown error.
for _, err := range dErrs {
if err != nil {
// We ignore error if errVolumeNotFound or errDiskNotFound
@ -346,7 +349,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
}
}
// Return err if all disks report volume not found.
// Return errVolumeNotFound if all disks report volume not found.
if volumeNotFoundErrCnt == len(xl.storageDisks) {
return toObjectErr(errVolumeNotFound, bucket)
}

View file

@ -27,7 +27,11 @@ import (
)
// Erasure block size.
const erasureBlockSize = 4 * 1024 * 1024 // 4MiB.
const (
erasureBlockSize = 4 * 1024 * 1024 // 4MiB.
erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde"
erasureAlgorithmISAL = "isa-l/reedsolomon/cauchy"
)
// objectPartInfo Info of each part kept in the multipart metadata
// file after CompleteMultipartUpload() is called.
@ -47,15 +51,18 @@ type xlMetaV1 struct {
Version int64 `json:"version"`
} `json:"stat"`
Erasure struct {
DataBlocks int `json:"data"`
ParityBlocks int `json:"parity"`
BlockSize int64 `json:"blockSize"`
Index int `json:"index"`
Distribution []int `json:"distribution"`
Algorithm string `json:"algorithm"`
DataBlocks int `json:"data"`
ParityBlocks int `json:"parity"`
BlockSize int64 `json:"blockSize"`
Index int `json:"index"`
Distribution []int `json:"distribution"`
Checksum []struct {
Name string `json:"name"`
Algorithm string `json:"algorithm"`
Hash string `json:"hash"`
} `json:"checksum"`
} `json:"erasure"`
Checksum struct {
Enable bool `json:"enable"`
} `json:"checksum"`
Minio struct {
Release string `json:"release"`
} `json:"minio"`
@ -234,6 +241,7 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
xlMeta.Version = "1"
xlMeta.Format = "xl"
xlMeta.Minio.Release = minioReleaseTag
xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
xlMeta.Erasure.DataBlocks = xl.dataBlocks
xlMeta.Erasure.ParityBlocks = xl.parityBlocks
xlMeta.Erasure.BlockSize = erasureBlockSize
@ -278,7 +286,7 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
wg.Wait()
// FIXME: check for quorum.
// Loop through concocted errors and return the first one.
// Return the first error.
for _, err := range mErrs {
if err == nil {
continue

View file

@ -37,8 +37,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
return nil, toObjectErr(err, bucket, object)
}
totalObjectSize := xlMeta.Stat.Size // Total object size.
// Hold a read lock once more which can be released after the following go-routine ends.
// We hold RLock once more because the current function would return before the go routine below
// executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call).
@ -47,7 +45,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
defer nsMutex.RUnlock(bucket, object)
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
r, err := xl.erasureDisk.ReadFile(bucket, pathJoin(object, part.Name), offset, totalObjectSize)
r, err := xl.erasureDisk.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size)
if err != nil {
fileWriter.CloseWithError(err)
return
@ -96,8 +94,7 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er
// Count for errors encountered.
var xlJSONErrCount = 0
// Loop through and return the first success entry based on the
// selected random disk.
// Return the first success entry based on the selected random disk.
for xlJSONErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt, do not hit the same disk all the time.
disk := xl.getRandomDisk() // Pick a random disk.
@ -314,7 +311,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
wg.Wait()
var fileNotFoundCnt, deleteFileErr int
// Loop through all the concocted errors.
// Count for specific errors.
for _, err := range dErrs {
if err == nil {
continue