controller: Implement controlled healing and trigger (#2381)

This patch introduces a new command, 'control':

- minio control

  Manages a minio server by connecting to its Go RPC API frontend.

- minio control heal

  Heals objects.
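
Under the hood the command is a thin net/rpc client. A minimal sketch of the
round trip it performs (assuming reservedBucket resolves to "/minio", so
healPath below is "/minio/control/heal"; HealObjectArgs and HealObjectReply
are defined in rpc-control.go):

	client, err := rpc.DialHTTPPath("tcp", "localhost:9000", "/minio/control/heal")
	fatalIf(err, "Unable to connect to localhost:9000")
	// Heal a single object; an empty reply is expected on success.
	err = client.Call("Heal.HealObject", &HealObjectArgs{"songs", "classical/piano.mp3"}, &HealObjectReply{})
	fatalIf(err, "RPC Heal.HealObject call failed")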
Authored by Krishna Srinivas on 2016-08-18 00:06:33 +05:30; committed by Harshavardhana
parent 0b7dfab17a
commit e2498edb45
16 changed files with 846 additions and 27 deletions

control-main.go (new file, +134)

@@ -0,0 +1,134 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main

import (
	"fmt"
	"net/rpc"
	"net/url"
	"strings"

	"github.com/minio/cli"
)

// "minio control" command.
var controlCmd = cli.Command{
	Name:   "control",
	Usage:  "Control and manage minio server.",
	Action: mainControl,
	Subcommands: []cli.Command{
		healCmd,
	},
}

func mainControl(c *cli.Context) {
	cli.ShowCommandHelp(c, "")
}

var healCmd = cli.Command{
	Name:   "heal",
	Usage:  "To heal objects.",
	Action: healControl,
	CustomHelpTemplate: `NAME:
  minio control {{.Name}} - {{.Usage}}

USAGE:
  minio control {{.Name}} URL

EXAMPLES:
  1. Heal an object.
     $ minio control heal http://localhost:9000/songs/classical/western/piano.mp3

  2. Heal all objects in a bucket recursively.
     $ minio control heal http://localhost:9000/songs

  3. Heal all objects with a given prefix recursively.
     $ minio control heal http://localhost:9000/songs/classical/
`,
}
// "minio control heal" entry point.
func healControl(c *cli.Context) {
// Parse bucket and object from url.URL.Path
parseBucketObject := func(path string) (bucketName string, objectName string) {
splits := strings.SplitN(path, string(slashSeparator), 3)
switch len(splits) {
case 0, 1:
bucketName = ""
objectName = ""
case 2:
bucketName = splits[1]
objectName = ""
case 3:
bucketName = splits[1]
objectName = splits[2]
}
return bucketName, objectName
}
if len(c.Args()) != 1 {
cli.ShowCommandHelpAndExit(c, "heal", 1)
}
parsedURL, err := url.ParseRequestURI(c.Args()[0])
fatalIf(err, "Unable to parse URL")
bucketName, objectName := parseBucketObject(parsedURL.Path)
if bucketName == "" {
cli.ShowCommandHelpAndExit(c, "heal", 1)
}
client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, healPath)
fatalIf(err, "Unable to connect to %s", parsedURL.Host)
// If object does not have trailing "/" then it's an object, hence heal it.
if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
fmt.Printf("Healing : /%s/%s", bucketName, objectName)
args := &HealObjectArgs{bucketName, objectName}
reply := &HealObjectReply{}
err = client.Call("Heal.HealObject", args, reply)
fatalIf(err, "RPC Heal.HealObject call failed")
fmt.Println()
return
}
// Recursively list and heal the objects.
prefix := objectName
marker := ""
for {
args := HealListArgs{bucketName, prefix, marker, "", 1000}
reply := &HealListReply{}
err = client.Call("Heal.ListObjects", args, reply)
fatalIf(err, "RPC Heal.ListObjects call failed")
// Heal the objects returned in the ListObjects reply.
for _, obj := range reply.Objects {
fmt.Printf("Healing : /%s/%s", bucketName, obj)
reply := &HealObjectReply{}
err = client.Call("Heal.HealObject", HealObjectArgs{bucketName, obj}, reply)
fatalIf(err, "RPC Heal.HealObject call failed")
fmt.Println()
}
if !reply.IsTruncated {
// End of listing.
break
}
marker = reply.NextMarker
}
}
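
For reference, what parseBucketObject yields for typical inputs (a hypothetical
illustration, assuming slashSeparator is "/"):

	// "/songs"                     -> bucket "songs", object ""
	// "/songs/classical/piano.mp3" -> bucket "songs", object "classical/piano.mp3"
	// SplitN with n=3 keeps everything after the second "/" in one piece:
	fmt.Println(strings.SplitN("/songs/classical/piano.mp3", "/", 3)) // [ songs classical/piano.mp3]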

erasure-healfile.go (new file, +84)

@@ -0,0 +1,84 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main

import "encoding/hex"

// Heals the erasure coded file. reedsolomon.Reconstruct() is used to reconstruct the missing parts.
func erasureHealFile(latestDisks []StorageAPI, outDatedDisks []StorageAPI, volume, path, healBucket, healPath string, size int64, blockSize int64, dataBlocks int, parityBlocks int, algo string) (checkSums []string, err error) {
	var offset int64
	remainingSize := size

	// Hash for bitrot protection.
	hashWriters := newHashWriters(len(outDatedDisks), bitRotAlgo)

	for remainingSize > 0 {
		curBlockSize := blockSize
		if remainingSize < curBlockSize {
			curBlockSize = remainingSize
		}

		// Calculate the block size that needs to be read from each disk.
		curEncBlockSize := getChunkSize(curBlockSize, dataBlocks)

		// Memory for reading data from disks and reconstructing missing data using erasure coding.
		enBlocks := make([][]byte, len(latestDisks))

		// Read data from the latest disks.
		// FIXME: no need to read from all the disks. dataBlocks+1 is enough.
		for index, disk := range latestDisks {
			if disk == nil {
				continue
			}
			enBlocks[index] = make([]byte, curEncBlockSize)
			_, err := disk.ReadFile(volume, path, offset, enBlocks[index])
			if err != nil {
				enBlocks[index] = nil
			}
		}

		// Reconstruct missing data.
		err := decodeData(enBlocks, dataBlocks, parityBlocks)
		if err != nil {
			return nil, err
		}

		// Write to the healPath file.
		for index, disk := range outDatedDisks {
			if disk == nil {
				continue
			}
			err := disk.AppendFile(healBucket, healPath, enBlocks[index])
			if err != nil {
				return nil, err
			}
			hashWriters[index].Write(enBlocks[index])
		}
		remainingSize -= curBlockSize
		offset += curEncBlockSize
	}

	// Checksums for the bit rot.
	checkSums = make([]string, len(outDatedDisks))
	for index, disk := range outDatedDisks {
		if disk == nil {
			continue
		}
		checkSums[index] = hex.EncodeToString(hashWriters[index].Sum(nil))
	}
	return checkSums, nil
}
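
erasureHealFile sizes each per-disk read with getChunkSize. A hedged sketch of
what that helper is assumed to compute, namely ceiling division of the block
across the data shards:

	// Assumed semantics: the smallest per-shard size such that dataBlocks
	// shards cover blockSize bytes (ceiling division).
	func getChunkSize(blockSize int64, dataBlocks int) int64 {
		return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
	}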

erasure-healfile_test.go (new file, +123)

@@ -0,0 +1,123 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main

import (
	"bytes"
	"crypto/rand"
	"os"
	"path"
	"testing"
)

// Test erasureHealFile()
func TestErasureHealFile(t *testing.T) {
	// Initialize environment needed for the test.
	dataBlocks := 7
	parityBlocks := 7
	blockSize := int64(blockSizeV1)
	setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
	if err != nil {
		t.Error(err)
		return
	}
	defer setup.Remove()

	disks := setup.disks

	// Prepare a slice of 1MB with random data.
	data := make([]byte, 1*1024*1024)
	_, err = rand.Read(data)
	if err != nil {
		t.Fatal(err)
	}

	// Create a test file.
	size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
	if err != nil {
		t.Fatal(err)
	}
	if size != int64(len(data)) {
		t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
	}

	latest := make([]StorageAPI, len(disks))   // Slice of latest disks
	outDated := make([]StorageAPI, len(disks)) // Slice of outdated disks

	// Test case when one part needs to be healed.
	dataPath := path.Join(setup.diskPaths[0], "testbucket", "testobject1")
	err = os.Remove(dataPath)
	if err != nil {
		t.Fatal(err)
	}
	copy(latest, disks)
	latest[0] = nil
	outDated[0] = disks[0]

	healCheckSums, err := erasureHealFile(latest, outDated, "testbucket", "testobject1", "testbucket", "testobject1", 1*1024*1024, blockSize, dataBlocks, parityBlocks, bitRotAlgo)
	if err != nil {
		t.Fatal(err)
	}
	// Checksum of the healed file should match.
	if checkSums[0] != healCheckSums[0] {
		t.Error("Healing failed, data does not match.")
	}

	// Test case when parityBlocks number of disks need to be healed.
	// Should succeed.
	copy(latest, disks)
	for index := 0; index < parityBlocks; index++ {
		dataPath := path.Join(setup.diskPaths[index], "testbucket", "testobject1")
		err = os.Remove(dataPath)
		if err != nil {
			t.Fatal(err)
		}
		latest[index] = nil
		outDated[index] = disks[index]
	}

	healCheckSums, err = erasureHealFile(latest, outDated, "testbucket", "testobject1", "testbucket", "testobject1", 1*1024*1024, blockSize, dataBlocks, parityBlocks, bitRotAlgo)
	if err != nil {
		t.Fatal(err)
	}
	// Checksums of the healed files should match.
	for index := 0; index < parityBlocks; index++ {
		if checkSums[index] != healCheckSums[index] {
			t.Error("Healing failed, data does not match.")
		}
	}
	for index := dataBlocks; index < len(disks); index++ {
		if healCheckSums[index] != "" {
			t.Errorf("expected healCheckSums[%d] to be empty", index)
		}
	}

	// Test case when parityBlocks+1 number of disks need to be healed.
	// Should fail.
	copy(latest, disks)
	for index := 0; index < parityBlocks+1; index++ {
		dataPath := path.Join(setup.diskPaths[index], "testbucket", "testobject1")
		err = os.Remove(dataPath)
		if err != nil {
			t.Fatal(err)
		}
		latest[index] = nil
		outDated[index] = disks[index]
	}

	healCheckSums, err = erasureHealFile(latest, outDated, "testbucket", "testobject1", "testbucket", "testobject1", 1*1024*1024, blockSize, dataBlocks, parityBlocks, bitRotAlgo)
	if err == nil {
		t.Error("Expected erasureHealFile() to fail when the number of available disks <= parityBlocks")
	}
}

fs-v1-multipart.go

@@ -68,8 +68,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	}
 	var walkResultCh chan treeWalkResult
 	var endWalkCh chan struct{}
+	heal := false // true only for xl.ListObjectsHeal()
 	if maxUploads > 0 {
-		walkResultCh, endWalkCh = fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		walkResultCh, endWalkCh = fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
 		if walkResultCh == nil {
 			endWalkCh = make(chan struct{})
 			isLeaf := fs.isMultipartUpload
@@ -144,7 +145,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 
 	if !eof {
 		// Save the go-routine state in the pool so that it can continue from where it left off on
 		// the next request.
-		fs.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix}, walkResultCh, endWalkCh)
+		fs.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkResultCh, endWalkCh)
 	}
 	result.IsTruncated = !eof

fs-v1.go

@@ -574,7 +574,8 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		recursive = false
 	}
 
-	walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix})
+	heal := false // true only for xl.ListObjectsHeal()
+	walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
 	if walkResultCh == nil {
 		endWalkCh = make(chan struct{})
 		isLeaf := func(bucket, object string) bool {
@@ -616,7 +617,7 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		}
 		i++
 	}
-	params := listParams{bucket, recursive, nextMarker, prefix}
+	params := listParams{bucket, recursive, nextMarker, prefix, heal}
 	if !eof {
 		fs.listPool.Set(params, walkResultCh, endWalkCh)
 	}
@@ -643,3 +644,13 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
 	return fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
 }
+
+// HealObject - no-op for fs. Valid only for XL.
+func (fs fsObjects) HealObject(bucket, object string) error {
+	return NotImplemented{}
+}
+
+// ListObjectsHeal - list objects for healing. Valid only for XL.
+func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+	return ListObjectsInfo{}, NotImplemented{}
+}
// HealObject - no-op for fs. Valid only for XL.
func (fs fsObjects) HealObject(bucket, object string) error {
return NotImplemented{}
}
// HealListObjects - list objects for healing. Valid only for XL
func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, NotImplemented{}
}

main.go

@@ -105,6 +105,7 @@ func registerApp() *cli.App {
 	registerCommand(serverCmd)
 	registerCommand(versionCmd)
 	registerCommand(updateCmd)
+	registerCommand(controlCmd)
 
 	// Set up app.
 	app := cli.NewApp()

object-errors.go

@@ -257,3 +257,10 @@ type PartTooSmall struct {
 func (e PartTooSmall) Error() string {
 	return fmt.Sprintf("Part size for %d should be atleast 5MB", e.PartNumber)
 }
+
+// NotImplemented - returned when a feature is not implemented.
+type NotImplemented struct{}
+
+func (e NotImplemented) Error() string {
+	return "Not Implemented"
+}

object-interface.go

@@ -30,12 +30,14 @@ type ObjectLayer interface {
 	ListBuckets() (buckets []BucketInfo, err error)
 	DeleteBucket(bucket string) error
 	ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
+	ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
 
 	// Object operations.
 	GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error)
 	GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
 	PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string) (md5 string, err error)
 	DeleteObject(bucket, object string) error
+	HealObject(bucket, object string) error
 
 	// Multipart operations.
 	ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error)

routers.go

@@ -95,6 +95,12 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
 	// Register all routers.
 	registerStorageRPCRouter(mux, storageRPC)
 
+	// FIXME: till net/rpc auth is brought in, "minio control" can be enabled only through
+	// this env variable.
+	if os.Getenv("MINIO_CONTROL") != "" {
+		registerControlRPCRouter(mux, objAPI)
+	}
+
 	// set environmental variable MINIO_BROWSER=off to disable minio web browser.
 	// By default minio web browser is enabled.
 	if !strings.EqualFold(os.Getenv("MINIO_BROWSER"), "off") {

rpc-control.go (new file, +85)

@@ -0,0 +1,85 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main

import (
	"net/rpc"

	router "github.com/gorilla/mux"
)

// Route paths for "minio control" commands.
const (
	controlRPCPath = reservedBucket + "/control"
	healPath       = controlRPCPath + "/heal"
)

// Register control RPC handlers.
func registerControlRPCRouter(mux *router.Router, objAPI ObjectLayer) {
	healRPCServer := rpc.NewServer()
	healRPCServer.RegisterName("Heal", &healHandler{objAPI})
	mux.Path(healPath).Handler(healRPCServer)
}

// Handler for object healing.
type healHandler struct {
	ObjectAPI ObjectLayer
}

// HealListArgs - argument for ListObjects RPC.
type HealListArgs struct {
	Bucket    string
	Prefix    string
	Marker    string
	Delimiter string
	MaxKeys   int
}

// HealListReply - reply by ListObjects RPC.
type HealListReply struct {
	IsTruncated bool
	NextMarker  string
	Objects     []string
}

// ListObjects - list objects.
func (h healHandler) ListObjects(arg *HealListArgs, reply *HealListReply) error {
	info, err := h.ObjectAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys)
	if err != nil {
		return err
	}
	reply.IsTruncated = info.IsTruncated
	reply.NextMarker = info.NextMarker
	for _, obj := range info.Objects {
		reply.Objects = append(reply.Objects, obj.Name)
	}
	return nil
}

// HealObjectArgs - argument for HealObject RPC.
type HealObjectArgs struct {
	Bucket string
	Object string
}

// HealObjectReply - reply by HealObject RPC.
type HealObjectReply struct{}

// HealObject - heal the object.
func (h healHandler) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error {
	return h.ObjectAPI.HealObject(arg.Bucket, arg.Object)
}
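
net/rpc dispatches the wire name "Heal.HealObject" to the HealObject method of
the value registered under "Heal". A minimal self-contained sketch of that
pattern (hypothetical Echo service, not part of this commit):

	package main

	import (
		"fmt"
		"net"
		"net/http"
		"net/rpc"
	)

	// EchoArgs/EchoReply - hypothetical RPC argument and reply types.
	type EchoArgs struct{ Msg string }
	type EchoReply struct{ Msg string }

	type echoHandler struct{}

	// Echo - replies with the message it received.
	func (e echoHandler) Echo(args *EchoArgs, reply *EchoReply) error {
		reply.Msg = args.Msg
		return nil
	}

	func main() {
		srv := rpc.NewServer()
		srv.RegisterName("Echo", echoHandler{}) // clients call "Echo.Echo"
		srv.HandleHTTP("/echo", "/debug/echo")  // serve RPC over HTTP CONNECT

		ln, err := net.Listen("tcp", "localhost:12345")
		if err != nil {
			panic(err)
		}
		go http.Serve(ln, nil)

		client, err := rpc.DialHTTPPath("tcp", "localhost:12345", "/echo")
		if err != nil {
			panic(err)
		}
		var reply EchoReply
		if err := client.Call("Echo.Echo", &EchoArgs{"hello"}, &reply); err != nil {
			panic(err)
		}
		fmt.Println(reply.Msg) // prints "hello"
	}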

tree-walk-pool.go

@@ -33,6 +33,7 @@ type listParams struct {
 	recursive bool
 	marker    string
 	prefix    string
+	heal      bool
 }
 
 // errWalkAbort - returned by doTreeWalk() if it returns prematurely.

xl-v1-healing.go

@@ -77,25 +77,6 @@ func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time
 	return modTimes
 }
 
-func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
-	onlineDiskCount := diskCount(onlineDisks)
-	// If online disks count is lesser than configured disks, most
-	// probably we need to heal the file, additionally verify if the
-	// count is lesser than readQuorum, if not we throw an error.
-	if onlineDiskCount < len(xl.storageDisks) {
-		// Online disks lesser than total storage disks, needs to be
-		// healed. unless we do not have readQuorum.
-		heal = true
-		// Verify if online disks count are lesser than readQuorum
-		// threshold, return an error.
-		if onlineDiskCount < xl.readQuorum {
-			errorIf(errXLReadQuorum, "Unable to establish read quorum, disks are offline.")
-			return false
-		}
-	}
-	return heal
-}
-
 // Returns slice of online disks needed.
 // - slice returing readable disks.
 // - modTime of the Object
@@ -118,3 +99,50 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error)
 	}
 	return onlineDisks, modTime
 }
+
+// Return disks with an outdated or missing object.
+func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (outDatedDisks []StorageAPI) {
+	outDatedDisks = make([]StorageAPI, len(disks))
+	latestDisks, _ := listOnlineDisks(disks, partsMetadata, errs)
+	for index, disk := range latestDisks {
+		if errs[index] == errFileNotFound {
+			outDatedDisks[index] = disks[index]
+			continue
+		}
+		if errs[index] != nil {
+			continue
+		}
+		if disk == nil {
+			outDatedDisks[index] = disks[index]
+		}
+	}
+	return outDatedDisks
+}
+
+// Return xlMetaV1 of the latest version of the object.
+func xlLatestMetadata(partsMetadata []xlMetaV1, errs []error) (latestMeta xlMetaV1) {
+	// List all the modTimes from parts metadata.
+	modTimes := listObjectModtimes(partsMetadata, errs)
+	// Reduce the list of modTimes to a single common value.
+	modTime := commonTime(modTimes)
+	return pickValidXLMeta(partsMetadata, modTime)
+}
+
+// Returns whether the object should be healed.
+func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
+	modTime := commonTime(listObjectModtimes(partsMetadata, errs))
+	for index := range partsMetadata {
+		if errs[index] == errFileNotFound {
+			return true
+		}
+		if errs[index] != nil {
+			continue
+		}
+		if modTime != partsMetadata[index].Stat.ModTime {
+			return true
+		}
+	}
+	return false
+}
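
xlShouldHeal and xlLatestMetadata hinge on commonTime. A hedged sketch of the
assumed semantics (the modTime occurring most often across disks, with the
times of errored disks ignored):

	// Assumed semantics: return the modTime that occurs most frequently
	// across the disks; zero times (from errored disks) are skipped.
	func commonTime(modTimes []time.Time) (modTime time.Time) {
		count := make(map[time.Time]int)
		maxCount := 0
		for _, t := range modTimes {
			if t.IsZero() {
				continue
			}
			count[t]++
			if count[t] > maxCount {
				maxCount = count[t]
				modTime = t
			}
		}
		return modTime
	}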

xl-v1-list-objects-heal.go (new file, +206)

@@ -0,0 +1,206 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main

import (
	"path"
	"sort"
	"strings"
)

func listDirHealFactory(disks ...StorageAPI) listDirFunc {
	// Returns sorted merged entries from all the disks.
	listDir := func(bucket, prefixDir, prefixEntry string) (mergedentries []string, delayIsLeaf bool, err error) {
		for _, disk := range disks {
			var entries []string
			var newEntries []string
			entries, err = disk.ListDir(bucket, prefixDir)
			if err != nil {
				// Skip the disk if listDir returns an error.
				continue
			}
			for i, entry := range entries {
				if strings.HasSuffix(entry, slashSeparator) {
					if _, err = disk.StatFile(bucket, path.Join(prefixDir, entry, xlMetaJSONFile)); err == nil {
						// If it is an object trim the trailing "/".
						entries[i] = strings.TrimSuffix(entry, slashSeparator)
					}
				}
			}
			if len(mergedentries) == 0 {
				// For the first successful disk.ListDir().
				mergedentries = entries
				sort.Strings(mergedentries)
				continue
			}
			// Find elements in entries which are not in mergedentries.
			for _, entry := range entries {
				idx := sort.SearchStrings(mergedentries, entry)
				// Bounds check: SearchStrings returns len(mergedentries) when
				// entry sorts after every merged entry.
				if idx < len(mergedentries) && mergedentries[idx] == entry {
					continue
				}
				newEntries = append(newEntries, entry)
			}
			if len(newEntries) > 0 {
				// Merge the entries and sort them.
				mergedentries = append(mergedentries, newEntries...)
				sort.Strings(mergedentries)
			}
		}
		return mergedentries, false, nil
	}
	return listDir
}
// listObjectsHeal - wrapper function implemented over file tree walk.
func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
	// Default is recursive, if delimiter is set then list non recursive.
	recursive := true
	if delimiter == slashSeparator {
		recursive = false
	}

	// "heal" is true for listObjectsHeal() and false for listObjects().
	heal := true
	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
	if walkResultCh == nil {
		endWalkCh = make(chan struct{})
		listDir := listDirHealFactory(xl.storageDisks...)
		walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, nil, endWalkCh)
	}

	var objInfos []ObjectInfo
	var eof bool
	var nextMarker string
	for i := 0; i < maxKeys; {
		walkResult, ok := <-walkResultCh
		if !ok {
			// Closed channel.
			eof = true
			break
		}
		// For any walk error return right away.
		if walkResult.err != nil {
			// File not found is a valid case.
			if walkResult.err == errFileNotFound {
				return ListObjectsInfo{}, nil
			}
			return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
		}
		entry := walkResult.entry
		var objInfo ObjectInfo
		if strings.HasSuffix(entry, slashSeparator) {
			// Object name needs to be full path.
			objInfo.Bucket = bucket
			objInfo.Name = entry
			objInfo.IsDir = true
		} else {
			objInfo.Bucket = bucket
			objInfo.Name = entry
		}
		nextMarker = objInfo.Name
		objInfos = append(objInfos, objInfo)
		i++
		if walkResult.end {
			eof = true
			break
		}
	}

	params := listParams{bucket, recursive, nextMarker, prefix, heal}
	if !eof {
		xl.listPool.Set(params, walkResultCh, endWalkCh)
	}

	result := ListObjectsInfo{IsTruncated: !eof}
	for _, objInfo := range objInfos {
		result.NextMarker = objInfo.Name
		if objInfo.IsDir {
			result.Prefixes = append(result.Prefixes, objInfo.Name)
			continue
		}
		result.Objects = append(result.Objects, ObjectInfo{
			Name:    objInfo.Name,
			ModTime: objInfo.ModTime,
			Size:    objInfo.Size,
			IsDir:   false,
		})
	}
	return result, nil
}
// ListObjectsHeal - list all objects to be healed at prefix, delimited by '/'.
func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
	}
	// Verify if bucket exists.
	if !xl.isBucketExist(bucket) {
		return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
	}
	if !IsValidObjectPrefix(prefix) {
		return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
	}
	// Verify if delimiter is anything other than '/', which we do not support.
	if delimiter != "" && delimiter != slashSeparator {
		return ListObjectsInfo{}, UnsupportedDelimiter{
			Delimiter: delimiter,
		}
	}
	// Verify if marker has the prefix.
	if marker != "" {
		if !strings.HasPrefix(marker, prefix) {
			return ListObjectsInfo{}, InvalidMarkerPrefixCombination{
				Marker: marker,
				Prefix: prefix,
			}
		}
	}

	// With max keys of zero we have reached eof, return right here.
	if maxKeys == 0 {
		return ListObjectsInfo{}, nil
	}

	// For delimiter and prefix as '/' we do not list anything at all
	// since according to s3 spec we stop at the 'delimiter' along
	// with the prefix. On a flat namespace with 'prefix' as '/'
	// we don't have any entries, since all the keys are of form 'keyName/...'
	if delimiter == slashSeparator && prefix == slashSeparator {
		return ListObjectsInfo{}, nil
	}

	// Overflowing count - reset to maxObjectList.
	if maxKeys < 0 || maxKeys > maxObjectList {
		maxKeys = maxObjectList
	}

	// Initiate a list operation; if successful, filter and return quickly.
	listObjInfo, err := xl.listObjectsHeal(bucket, prefix, marker, delimiter, maxKeys)
	if err == nil {
		// We got the entries successfully, return.
		return listObjInfo, nil
	}

	// Return error at the end.
	return ListObjectsInfo{}, toObjectErr(err, bucket, prefix)
}
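
The merge in listDirHealFactory relies on the sorted-slice membership idiom; a
minimal illustration of why the bounds check matters (hypothetical standalone
helper):

	// sort.SearchStrings returns len(sorted) when entry sorts after every
	// element, so indexing without the bounds check would panic.
	func contains(sorted []string, entry string) bool {
		idx := sort.SearchStrings(sorted, entry)
		return idx < len(sorted) && sorted[idx] == entry
	}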

xl-v1-list-objects.go

@@ -26,7 +26,8 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		recursive = false
 	}
 
-	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
+	heal := false // true only for xl.ListObjectsHeal
+	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
 	if walkResultCh == nil {
 		endWalkCh = make(chan struct{})
 		isLeaf := xl.isObject
@@ -81,7 +82,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		}
 	}
 
-	params := listParams{bucket, recursive, nextMarker, prefix}
+	params := listParams{bucket, recursive, nextMarker, prefix, heal}
 	if !eof {
 		xl.listPool.Set(params, walkResultCh, endWalkCh)
 	}

xl-v1-multipart.go

@@ -85,9 +85,10 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	}
 	var walkerCh chan treeWalkResult
 	var walkerDoneCh chan struct{}
+	heal := false // true only for xl.ListObjectsHeal
 	// Validate if we need to list further depending on maxUploads.
 	if maxUploads > 0 {
-		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
 		if walkerCh == nil {
 			walkerDoneCh = make(chan struct{})
 			isLeaf := xl.isMultipartUpload
@@ -179,7 +180,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	if !eof {
 		// Save the go-routine state in the pool so that it can continue from where it left off on
 		// the next request.
-		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix}, walkerCh, walkerDoneCh)
+		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh)
 	}
 	result.IsTruncated = !eof

xl-v1-object.go

@@ -213,6 +213,134 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	return nil
 }
 
+// HealObject - heal the object.
+// FIXME: If an object was deleted while a disk was down, and the disk later comes back
+// up, healing the object should delete it.
+func (xl xlObjects) HealObject(bucket, object string) error {
+	// Verify if bucket is valid.
+	if !IsValidBucketName(bucket) {
+		return BucketNameInvalid{Bucket: bucket}
+	}
+	// Verify if object is valid.
+	if !IsValidObjectName(object) {
+		// FIXME: return Invalid prefix.
+		return ObjectNameInvalid{Bucket: bucket, Object: object}
+	}
+
+	// Lock the object before healing.
+	nsMutex.RLock(bucket, object)
+	defer nsMutex.RUnlock(bucket, object)
+
+	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
+	if err := reduceErrs(errs, nil); err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
+	if !xlShouldHeal(partsMetadata, errs) {
+		// There is nothing to heal.
+		return nil
+	}
+
+	// List of disks having latest version of the object.
+	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
+	// List of disks having outdated version of the object or missing object.
+	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
+	// Latest xlMetaV1 for reference.
+	latestMeta := pickValidXLMeta(partsMetadata, modTime)
+
+	for index, disk := range outDatedDisks {
+		// Before healing outdated disks, we need to remove xl.json
+		// and part files from "bucket/object/" so that
+		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
+		if disk == nil {
+			// Not an outdated disk.
+			continue
+		}
+		if errs[index] != nil {
+			// If there was an error (most likely errFileNotFound), there is nothing to delete.
+			continue
+		}
+		// An outdated object with the same name exists and needs to be deleted.
+		outDatedMeta := partsMetadata[index]
+		// Delete all the parts.
+		for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
+			err := disk.DeleteFile(bucket,
+				pathJoin(object, outDatedMeta.Parts[partIndex].Name))
+			if err != nil {
+				return err
+			}
+		}
+		// Delete the xl.json file.
+		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
+		if err != nil {
+			return err
+		}
+	}
+
+	// Reorder so that we have data disks first and parity disks next.
+	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
+	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
+	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)
+
+	// We write at a temporary location and then rename to the final location.
+	tmpID := getUUID()
+
+	// Checksums of the part files. checkSumInfos[index] will contain checksums
+	// of all the part files in the outDatedDisks[index].
+	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
+
+	// Heal each part. erasureHealFile() will write the healed part to
+	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
+	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
+		partName := latestMeta.Parts[partIndex].Name
+		partSize := latestMeta.Parts[partIndex].Size
+		erasure := latestMeta.Erasure
+		sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName)
+		if err != nil {
+			return err
+		}
+		// Heal the part file.
+		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
+			bucket, pathJoin(object, partName),
+			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
+			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
+		if err != nil {
+			return err
+		}
+		for index, sum := range checkSums {
+			if outDatedDisks[index] == nil {
+				continue
+			}
+			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
+		}
+	}
+
+	// xl.json should be written to all the healed disks.
+	for index, disk := range outDatedDisks {
+		if disk == nil {
+			continue
+		}
+		partsMetadata[index] = latestMeta
+		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
+	}
+	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
+	if err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
+	// Rename from tmp location to the actual location.
+	for _, disk := range outDatedDisks {
+		if disk == nil {
+			continue
+		}
+		err := disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
 	// Verify if bucket is valid.
// Verify if bucket is valid. // Verify if bucket is valid.