minio/cmd/xl-v1-healing.go
Commit 39331b6b4e by Harshavardhana (2016-10-18): xl: GetCheckSumInfo() shouldn't fail if hash not available. (#2984)
In a multipart upload scenario, disks going down and coming back up
can lead to certain parts missing on the disk/server that went
down. This is a valid case, since these blocks can be missing and
should be healed through the heal operation. But we are not supposed
to fail prematurely, since we still have enough data on the other
disks within read quorum.

This fix relaxes the previous assumption and fixes a major corruption
issue reproduced by @vadmeste.

Fixes #2976
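
For context, the relaxed lookup lives alongside the xlMetaV1 metadata code rather than in this file; it would look roughly like the sketch below. The sketch assumes the erasureInfo/checkSumInfo shapes implied by the code further down, and the exact fallback value is an assumption; the point is only that a missing hash is no longer treated as a failure:

// GetCheckSumInfo - get checksum info of a given part. Sketch of the
// relaxed behavior: if no checksum entry exists for partName (for
// example the part is missing on a disk that went down), return a
// zero-value checkSumInfo instead of failing, so that healing can
// proceed using the remaining disks within read quorum.
func (e erasureInfo) GetCheckSumInfo(partName string) checkSumInfo {
	for _, sum := range e.Checksum {
		if sum.Name == partName {
			return sum
		}
	}
	// Hash not available - not an error; heal will regenerate it.
	return checkSumInfo{}
}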

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import "sync"

// Heals a bucket if it doesn't exist on one of the disks.
func (xl xlObjects) HealBucket(bucket string) error {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return traceError(BucketNameInvalid{Bucket: bucket})
	}

	// Verify if bucket exists.
	if !xl.isBucketExist(bucket) {
		return traceError(BucketNotFound{Bucket: bucket})
	}

	// Heal bucket - create buckets on disks where it does not exist.

	// Get a random ID for lock instrumentation.
	opsID := getOpsID()

	nsMutex.Lock(bucket, "", opsID)
	defer nsMutex.Unlock(bucket, "", opsID)

	// Initialize sync waitgroup.
	var wg = &sync.WaitGroup{}

	// Initialize list of errors.
	var dErrs = make([]error, len(xl.storageDisks))

	// Make a volume entry on all underlying storage disks.
	for index, disk := range xl.storageDisks {
		if disk == nil {
			dErrs[index] = traceError(errDiskNotFound)
			continue
		}
		wg.Add(1)
		// Make a volume inside a go-routine.
		go func(index int, disk StorageAPI) {
			defer wg.Done()
			if _, err := disk.StatVol(bucket); err != nil {
				if err != errVolumeNotFound {
					dErrs[index] = traceError(err)
					return
				}
				if err = disk.MakeVol(bucket); err != nil {
					dErrs[index] = traceError(err)
				}
			}
		}(index, disk)
	}

	// Wait for all make vol to finish.
	wg.Wait()

	// Do we have write quorum?
	if !isDiskQuorum(dErrs, xl.writeQuorum) {
		// Purge successfully created buckets if we don't have write quorum.
		xl.undoMakeBucket(bucket)
		return toObjectErr(traceError(errXLWriteQuorum), bucket)
	}
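
	// Disk-level failures such as errDiskNotFound, errFaultyDisk and
	// errDiskAccessDenied are expected and tolerated here, since write
	// quorum has already been verified above.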
	// Verify we have any other errors which should be returned as failure.
	if reducedErr := reduceErrs(dErrs, []error{
		errDiskNotFound,
		errFaultyDisk,
		errDiskAccessDenied,
	}); reducedErr != nil {
		return toObjectErr(reducedErr, bucket)
	}
	return nil
}

// HealObject heals a given object for all its missing entries.
// FIXME: If an object was deleted and one disk was down, and later the disk
// comes back up again, heal on the object should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return traceError(BucketNameInvalid{Bucket: bucket})
	}

	// Verify if object is valid.
	if !IsValidObjectName(object) {
		return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
	}

	// Get a random ID for lock instrumentation.
	opsID := getOpsID()

	// Lock the object before healing.
	nsMutex.RLock(bucket, object, opsID)
	defer nsMutex.RUnlock(bucket, object, opsID)
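
	// Read xl.json for the object from every disk; per-disk read
	// errors are collected in errs and reduced below.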
	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
	if err := reduceErrs(errs, nil); err != nil {
		return toObjectErr(err, bucket, object)
	}

	if !xlShouldHeal(partsMetadata, errs) {
		// There is nothing to heal.
		return nil
	}

	// List of disks having latest version of the object.
	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)

	// List of disks having outdated version of the object or missing object.
	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)

	// Latest xlMetaV1 for reference.
	latestMeta := pickValidXLMeta(partsMetadata, modTime)

	for index, disk := range outDatedDisks {
		// Before healing outdated disks, we need to remove xl.json
		// and part files from "bucket/object/" so that
		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
		if disk == nil {
			// Not an outdated disk.
			continue
		}
		if errs[index] != nil {
			// If there was an error (most likely errFileNotFound),
			// there is nothing to delete on this disk.
			continue
		}

		// Outdated object with the same name exists that needs to be deleted.
		outDatedMeta := partsMetadata[index]

		// Delete all the parts.
		for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
			err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
			if err != nil {
				return traceError(err)
			}
		}

		// Delete xl.json file.
		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
		if err != nil {
			return traceError(err)
		}
	}

	// Reorder so that we have data disks first and parity disks next.
	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)

	// We write at a temporary location and then rename to the final location.
	tmpID := getUUID()

	// Checksum of the part files. checkSumInfos[index] will contain checksums
	// of all the part files in the outDatedDisks[index].
	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))

	// Heal each part. erasureHealFile() will write the healed part to
	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
		partName := latestMeta.Parts[partIndex].Name
		partSize := latestMeta.Parts[partIndex].Size
		erasure := latestMeta.Erasure
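
		// Fetch the expected checksum for this part. Per the commit
		// above, GetCheckSumInfo() no longer fails when the hash is
		// unavailable (e.g. the part is missing on a disk that went
		// down), so healing can proceed within read quorum.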
		sumInfo := latestMeta.Erasure.GetCheckSumInfo(partName)

		// Heal the part file.
		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
			bucket, pathJoin(object, partName),
			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
		if err != nil {
			return err
		}
		for index, sum := range checkSums {
			if outDatedDisks[index] == nil {
				continue
			}
			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
		}
	}
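
	// At this point every part has been healed into the temporary
	// location and its fresh checksums collected in checkSumInfos.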

	// xl.json should be written to all the healed disks.
	for index, disk := range outDatedDisks {
		if disk == nil {
			continue
		}
		partsMetadata[index] = latestMeta
		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
	}

	// Generate and write `xl.json` generated from other disks.
	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Rename from tmp location to the actual location.
	for _, disk := range outDatedDisks {
		if disk == nil {
			continue
		}

		// Remove any lingering partial data from current namespace.
		err = disk.DeleteFile(bucket, retainSlash(object))
		if err != nil && err != errFileNotFound {
			return traceError(err)
		}

		// Attempt a rename now from healed data to final location.
		err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
		if err != nil {
			return traceError(err)
		}
	}
	return nil
}