deprecate listDir usage for healing (#9792)

listDir was incorrectly used for healing, which
is slower; instead, use Walk() to heal the entire
set.
Harshavardhana 2020-06-09 17:09:19 -07:00 committed by GitHub
parent 1a0b7f58f9
commit 342ade03f6
7 changed files with 87 additions and 222 deletions


@@ -115,7 +115,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
 			// Heal all erasure sets that need
 			for i, erasureSetToHeal := range erasureSetInZoneToHeal {
 				for _, setIndex := range erasureSetToHeal {
-					err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex])
+					err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].drivesPerSet)
 					if err != nil {
 						logger.LogIf(ctx, err)
 					}
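The only change in this hunk is threading drivesPerSet down from the zone: below the set abstraction the healer otherwise has no way to know how many drives constitute full quorum. Not part of the commit, but the comparison this parameter enables is simple enough to state as a standalone sketch (needsHeal is a hypothetical helper name; the real loop inlines the comparison):

```go
package main

import "fmt"

// needsHeal reports whether an entry that was observed on quorumCount
// drives, out of a set of drivesPerSet drives, should be queued for healing.
func needsHeal(quorumCount, drivesPerSet int) bool {
	// An entry present on every drive in the set is already healthy.
	return quorumCount < drivesPerSet
}

func main() {
	fmt.Println(needsHeal(4, 4)) // false: full quorum, skip the entry
	fmt.Println(needsHeal(3, 4)) // true: one drive is missing it, heal
}
```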


@@ -79,7 +79,7 @@ func getLocalBackgroundHealStatus() madmin.BgHealState {
 }
 
 // healErasureSet lists and heals all objects in a specific erasure set
-func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error {
+func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects, drivesPerSet int) error {
 	buckets, err := xlObj.ListBuckets(ctx)
 	if err != nil {
 		return err
@@ -108,12 +108,38 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error {
 			path: bucket.Name,
 		}
 
 		// List all objects in the current bucket and heal them
-		listDir := listDirFactory(ctx, xlObj.getLoadBalancedDisks()...)
-		walkResultCh := startTreeWalk(ctx, bucket.Name, "", "", true, listDir, nil)
-		for walkEntry := range walkResultCh {
+		var entryChs []FileInfoCh
+		for _, disk := range xlObj.getLoadBalancedDisks() {
+			if disk == nil {
+				// Disk can be offline
+				continue
+			}
+			entryCh, err := disk.Walk(bucket.Name, "", "", true, xlMetaJSONFile, readMetadata, ctx.Done())
+			if err != nil {
+				// Disk walk returned error, ignore it.
+				continue
+			}
+			entryChs = append(entryChs, FileInfoCh{
+				Ch: entryCh,
+			})
+		}
+
+		entriesValid := make([]bool, len(entryChs))
+		entries := make([]FileInfo, len(entryChs))
+
+		for {
+			entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid)
+			if !ok {
+				return nil
+			}
+
+			if quorumCount == drivesPerSet {
+				// Skip good entries.
+				continue
+			}
+
 			bgSeq.sourceCh <- healSource{
-				path: pathJoin(bucket.Name, walkEntry.entry),
+				path: pathJoin(bucket.Name, entry.Name),
 			}
 		}
 	}
@@ -173,7 +199,7 @@ func execLeaderTasks(ctx context.Context, z *xlZones) {
 	for _, zone := range z.zones {
 		// Heal set by set
 		for i, set := range zone.sets {
-			if err := healErasureSet(ctx, i, set); err != nil {
+			if err := healErasureSet(ctx, i, set, zone.drivesPerSet); err != nil {
 				logger.LogIf(ctx, err)
 				continue
 			}
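This is the core of the commit: instead of driving a treeWalk over merged listDir results, every online disk streams its already-sorted namespace through Walk, and healErasureSet merges the streams, skipping any entry that all drivesPerSet drives agree on. Below is a minimal, self-contained sketch of that merge-and-skip pattern, using plain sorted slices in place of MinIO's FileInfoCh channels (names here are illustrative, not the actual types):

```go
package main

import "fmt"

// pick returns the lexically smallest head across the per-disk streams,
// how many streams carried that same head, and false once all streams
// are drained.
func pick(heads [][]string) (string, int, bool) {
	best := ""
	found := false
	for _, h := range heads {
		if len(h) == 0 {
			continue
		}
		if !found || h[0] < best {
			best, found = h[0], true
		}
	}
	if !found {
		return "", 0, false
	}
	count := 0
	for i, h := range heads {
		if len(h) > 0 && h[0] == best {
			heads[i] = h[1:] // consume every agreeing head
			count++
		}
	}
	return best, count, true
}

func main() {
	drivesPerSet := 3
	// Disk 2 is missing "b/2.txt", so it should be queued for healing.
	perDisk := [][]string{
		{"a/1.txt", "b/2.txt"},
		{"a/1.txt", "b/2.txt"},
		{"a/1.txt"},
	}
	for {
		entry, quorum, ok := pick(perDisk)
		if !ok {
			break
		}
		if quorum == drivesPerSet {
			continue // skip good entries, as the new healErasureSet does
		}
		fmt.Println("heal:", entry) // prints: heal: b/2.txt
	}
}
```

The speedup comes from each disk emitting its entries in one sorted pass, so the consumer performs a k-way merge rather than a list-merge-sort round trip at every directory level.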


@@ -27,6 +27,24 @@ import (
 	"time"
 )
 
+// Returns function "listDir" of the type listDirFunc.
+// disks - used for doing disk.ListDir()
+func listDirFactory(ctx context.Context, disk StorageAPI) ListDirFunc {
+	// Returns sorted merged entries from all the disks.
+	listDir := func(volume, dirPath, dirEntry string) (bool, []string) {
+		entries, err := disk.ListDir(volume, dirPath, -1, xlMetaJSONFile)
+		if err != nil {
+			return false, nil
+		}
+		if len(entries) == 0 {
+			return true, nil
+		}
+		sort.Strings(entries)
+		return false, filterMatchingPrefix(entries, dirEntry)
+	}
+	return listDir
+}
+
 // Fixed volume name that could be used across tests
 const volume = "testvolume"
@@ -219,8 +237,7 @@ func TestTreeWalkTimeout(t *testing.T) {
 	}
 }
 
-// Test ListDir - listDir should list entries from the first disk, if the first disk is down,
-// it should list from the next disk.
+// Test ListDir - listDir is expected to only list one disk.
 func TestListDir(t *testing.T) {
 	file1 := "file1"
 	file2 := "file2"
@@ -248,7 +265,8 @@ func TestListDir(t *testing.T) {
 	}
 
 	// create listDir function.
-	listDir := listDirFactory(context.Background(), disk1, disk2)
+	listDir1 := listDirFactory(context.Background(), disk1)
+	listDir2 := listDirFactory(context.Background(), disk2)
 
 	// Create file1 in fsDir1 and file2 in fsDir2.
 	disks := []StorageAPI{disk1, disk2}
@@ -260,35 +278,23 @@ func TestListDir(t *testing.T) {
 	}
 
 	// Should list "file1" from fsDir1.
-	_, entries := listDir(volume, "", "")
-	if len(entries) != 2 {
-		t.Fatal("Expected the number of entries to be 2")
-	}
-	if entries[0] != file1 {
-		t.Fatal("Expected the entry to be file1")
-	}
-	if entries[1] != file2 {
-		t.Fatal("Expected the entry to be file2")
-	}
-
-	// Remove fsDir1, list should return entries from fsDir2
-	err = os.RemoveAll(fsDir1)
-	if err != nil {
-		t.Error(err)
-	}
-
-	// Should list "file2" from fsDir2.
-	_, entries = listDir(volume, "", "")
+	_, entries := listDir1(volume, "", "")
+	if len(entries) != 1 {
+		t.Fatal("Expected the number of entries to be 1")
+	}
+	if entries[0] != file1 {
+		t.Fatal("Expected the entry to be file1")
+	}
+	_, entries = listDir2(volume, "", "")
+	if len(entries) != 1 {
+		t.Fatal("Expected the number of entries to be 1")
+	}
 	if entries[0] != file2 {
 		t.Fatal("Expected the entry to be file2")
 	}
 
 	err = os.RemoveAll(fsDir2)
 	if err != nil {
 		t.Error(err)
 	}
 }
 
 // TestRecursiveWalk - tests if treeWalk returns entries correctly with and
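With this commit, listDirFactory survives only in the tests and takes exactly one disk, which is why TestListDir now builds one closure per disk and expects one entry from each. A toy analogue of that single-disk closure, written against the standard library rather than MinIO's StorageAPI (all names here are hypothetical):

```go
package main

import (
	"fmt"
	"os"
	"sort"
)

// listDirFactory returns a closure that lists exactly one directory tree,
// reporting (emptyDir, sorted entries), mirroring the shape of the test helper.
func listDirFactory(root string) func(dirPath string) (bool, []string) {
	return func(dirPath string) (bool, []string) {
		dirents, err := os.ReadDir(root + "/" + dirPath)
		if err != nil {
			return false, nil
		}
		if len(dirents) == 0 {
			return true, nil // empty directory
		}
		var entries []string
		for _, d := range dirents {
			entries = append(entries, d.Name())
		}
		sort.Strings(entries)
		return false, entries
	}
}

func main() {
	dir1, _ := os.MkdirTemp("", "disk1")
	defer os.RemoveAll(dir1)
	os.WriteFile(dir1+"/file1", nil, 0o644)

	listDir1 := listDirFactory(dir1)
	_, entries := listDir1("")
	fmt.Println(entries) // [file1] - one disk, one entry
}
```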


@@ -814,9 +814,9 @@ func (f *FileInfoCh) Push(fi FileInfo) {
 // we found this entry. Additionally also returns a boolean
 // to indicate if the caller needs to call this function
 // again to list the next entry. It is callers responsibility
-// if the caller wishes to list N entries to call leastEntry
+// if the caller wishes to list N entries to call lexicallySortedEntry
 // N times until this boolean is 'false'.
-func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
+func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
 	for i := range entryChs {
 		entries[i], entriesValid[i] = entryChs[i].Pop()
 	}
@@ -852,7 +852,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
 		return lentry, 0, isTruncated
 	}
 
-	leastEntryCount := 0
+	lexicallySortedEntryCount := 0
 	for i, valid := range entriesValid {
 		if !valid {
 			continue
@@ -861,7 +861,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
 		// Entries are duplicated across disks,
 		// we should simply skip such entries.
 		if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) {
-			leastEntryCount++
+			lexicallySortedEntryCount++
 			continue
 		}
@@ -870,7 +870,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
 		entryChs[i].Push(entries[i])
 	}
 
-	return lentry, leastEntryCount, isTruncated
+	return lentry, lexicallySortedEntryCount, isTruncated
 }
 
 // mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
@@ -879,7 +879,7 @@ func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, ndisks int) (entries Fil
 	entriesInfos := make([]FileInfo, len(entryChs))
 	entriesValid := make([]bool, len(entryChs))
 	for {
-		fi, quorumCount, valid := leastEntry(entryChs, entriesInfos, entriesValid)
+		fi, quorumCount, valid := lexicallySortedEntry(entryChs, entriesInfos, entriesValid)
 		if !valid {
 			// We have reached EOF across all entryChs, break the loop.
 			break
@@ -1003,7 +1003,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
 			break
 		}
 
-		result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
+		result, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid)
 		if !ok {
 			eof = true
 			break
@@ -1690,7 +1690,7 @@ func (s *xlSets) Walk(ctx context.Context, bucket, prefix string, results chan<-
 	defer close(results)
 
 	for {
-		entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
+		entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid)
 		if !ok {
 			return
 		}
@@ -1716,7 +1716,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, opts ma
 	entriesValid := make([]bool, len(entryChs))
 	entries := make([]FileInfo, len(entryChs))
 	for {
-		entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
+		entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid)
 		if !ok {
 			break
 		}
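The rename from leastEntry to lexicallySortedEntry is cosmetic, but the selection discipline it names is worth seeing in isolation: pop one head per stream, keep the lexically least, count how many streams duplicate it (that count becomes quorumCount), and push every non-matching head back for the next round. A condensed, runnable sketch with deliberately simplified types follows; the real function also compares ModTime when deduplicating:

```go
package main

import "fmt"

type FileInfo struct{ Name string }

// FileInfoCh wraps an entry channel plus a one-slot push-back buffer.
type FileInfoCh struct {
	Ch    chan FileInfo
	prev  FileInfo
	valid bool
}

func (f *FileInfoCh) Pop() (FileInfo, bool) {
	if f.valid { // a pushed-back entry takes priority over the channel
		f.valid = false
		return f.prev, true
	}
	fi, ok := <-f.Ch
	return fi, ok
}

func (f *FileInfoCh) Push(fi FileInfo) {
	f.prev, f.valid = fi, true
}

func lexicallySortedEntry(chs []*FileInfoCh) (FileInfo, int, bool) {
	entries := make([]FileInfo, len(chs))
	valid := make([]bool, len(chs))
	var least FileInfo
	found := false
	for i := range chs {
		entries[i], valid[i] = chs[i].Pop()
		if valid[i] && (!found || entries[i].Name < least.Name) {
			least, found = entries[i], true
		}
	}
	if !found {
		return FileInfo{}, 0, false // all streams drained
	}
	count := 0
	for i := range chs {
		if !valid[i] {
			continue
		}
		if entries[i].Name == least.Name {
			count++ // duplicate across disks: counts toward quorum
			continue
		}
		chs[i].Push(entries[i]) // not the least: replay it next round
	}
	return least, count, true
}

func main() {
	mk := func(names ...string) *FileInfoCh {
		ch := make(chan FileInfo, len(names))
		for _, n := range names {
			ch <- FileInfo{Name: n}
		}
		close(ch)
		return &FileInfoCh{Ch: ch}
	}
	chs := []*FileInfoCh{mk("a", "c"), mk("a", "b", "c")}
	for {
		e, q, ok := lexicallySortedEntry(chs)
		if !ok {
			break
		}
		fmt.Println(e.Name, q) // a 2, then b 1, then c 2
	}
}
```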


@@ -18,174 +18,10 @@ package cmd
 
 import (
 	"context"
-	"sort"
 )
 
-// Returns function "listDir" of the type listDirFunc.
-// disks - used for doing disk.ListDir()
-func listDirFactory(ctx context.Context, disks ...StorageAPI) ListDirFunc {
-	// Returns sorted merged entries from all the disks.
-	listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, mergedEntries []string) {
-		for _, disk := range disks {
-			if disk == nil {
-				continue
-			}
-			var entries []string
-			var newEntries []string
-			var err error
-			entries, err = disk.ListDir(bucket, prefixDir, -1, xlMetaJSONFile)
-			if err != nil || len(entries) == 0 {
-				continue
-			}
-
-			// Find elements in entries which are not in mergedEntries
-			for _, entry := range entries {
-				idx := sort.SearchStrings(mergedEntries, entry)
-				// if entry is already present in mergedEntries don't add.
-				if idx < len(mergedEntries) && mergedEntries[idx] == entry {
-					continue
-				}
-				newEntries = append(newEntries, entry)
-			}
-
-			if len(newEntries) > 0 {
-				// Merge the entries and sort it.
-				mergedEntries = append(mergedEntries, newEntries...)
-				sort.Strings(mergedEntries)
-			}
-		}
-		if len(mergedEntries) == 0 {
-			return true, nil
-		}
-		return false, filterMatchingPrefix(mergedEntries, prefixEntry)
-	}
-	return listDir
-}
-
-// listObjects - wrapper function implemented over file tree walk.
-func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
-	// Default is recursive, if delimiter is set then list non recursive.
-	recursive := true
-	if delimiter == SlashSeparator {
-		recursive = false
-	}
-
-	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
-	if walkResultCh == nil {
-		endWalkCh = make(chan struct{})
-		listDir := listDirFactory(ctx, xl.getLoadBalancedDisks()...)
-		walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh)
-	}
-
-	var objInfos []ObjectInfo
-	var eof bool
-	var nextMarker string
-	for i := 0; i < maxKeys; {
-		walkResult, ok := <-walkResultCh
-		if !ok {
-			// Closed channel.
-			eof = true
-			break
-		}
-		entry := walkResult.entry
-		var objInfo ObjectInfo
-		if HasSuffix(entry, SlashSeparator) {
-			// Object name needs to be full path.
-			objInfo.Bucket = bucket
-			objInfo.Name = entry
-			objInfo.IsDir = true
-		} else {
-			// Set the Mode to a "regular" file.
-			var err error
-			objInfo, err = xl.getObjectInfo(ctx, bucket, entry, ObjectOptions{})
-			if err != nil {
-				// Ignore errFileNotFound as the object might have got
-				// deleted in the interim period of listing and getObjectInfo(),
-				// ignore quorum error as it might be an entry from an outdated disk.
-				if IsErrIgnored(err, []error{
-					errFileNotFound,
-					errXLReadQuorum,
-				}...) {
-					continue
-				}
-				return loi, toObjectErr(err, bucket, prefix)
-			}
-		}
-		nextMarker = objInfo.Name
-		objInfos = append(objInfos, objInfo)
-		i++
-		if walkResult.end {
-			eof = true
-			break
-		}
-	}
-
-	params := listParams{bucket, recursive, nextMarker, prefix}
-	if !eof {
-		xl.listPool.Set(params, walkResultCh, endWalkCh)
-	}
-
-	result := ListObjectsInfo{}
-	for _, objInfo := range objInfos {
-		if objInfo.IsDir && delimiter == SlashSeparator {
-			result.Prefixes = append(result.Prefixes, objInfo.Name)
-			continue
-		}
-		result.Objects = append(result.Objects, objInfo)
-	}
-
-	if !eof {
-		result.IsTruncated = true
-		if len(objInfos) > 0 {
-			result.NextMarker = objInfos[len(objInfos)-1].Name
-		}
-	}
-
-	return result, nil
-}
-
-// ListObjects - list all objects at prefix, delimited by '/'.
+// ListObjects - list all objects at prefix, delimited by '/', shouldn't be called.
 func (xl xlObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
-	if err := checkListObjsArgs(ctx, bucket, prefix, marker, xl); err != nil {
-		return loi, err
-	}
-
-	// With max keys of zero we have reached eof, return right here.
-	if maxKeys == 0 {
-		return loi, nil
-	}
-
-	// Marker is set validate pre-condition.
-	if marker != "" {
-		// Marker not common with prefix is not implemented.Send an empty response
-		if !HasPrefix(marker, prefix) {
-			return ListObjectsInfo{}, e
-		}
-	}
-
-	// For delimiter and prefix as '/' we do not list anything at all
-	// since according to s3 spec we stop at the 'delimiter' along
-	// with the prefix. On a flat namespace with 'prefix' as '/'
-	// we don't have any entries, since all the keys are of form 'keyName/...'
-	if delimiter == SlashSeparator && prefix == SlashSeparator {
-		return loi, nil
-	}
-
-	// Over flowing count - reset to maxObjectList.
-	if maxKeys < 0 || maxKeys > maxObjectList {
-		maxKeys = maxObjectList
-	}
-
-	// Initiate a list operation, if successful filter and return quickly.
-	listObjInfo, err := xl.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
-	if err == nil {
-		// We got the entries successfully return.
-		return listObjInfo, nil
-	}
-
-	// Return error at the end.
-	return loi, toObjectErr(err, bucket, prefix)
+	// Shouldn't be called
+	return loi, NotImplemented{}
 }
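For contrast with the streaming Walk above, here is roughly the per-directory merge the deleted listDirFactory performed: each disk's listing is deduplicated against the running result via binary search, appended, and the whole slice re-sorted, once per disk per directory level. A trimmed, runnable sketch of that pattern (an illustration of the cost, not the deleted code itself):

```go
package main

import (
	"fmt"
	"sort"
)

// mergeListings merges per-disk directory listings the way the removed
// listDir closure did: binary-search dedup, append, then a full re-sort
// after every disk's contribution.
func mergeListings(perDisk [][]string) (merged []string) {
	for _, entries := range perDisk {
		var fresh []string
		for _, entry := range entries {
			idx := sort.SearchStrings(merged, entry)
			if idx < len(merged) && merged[idx] == entry {
				continue // already present, skip the duplicate
			}
			fresh = append(fresh, entry)
		}
		if len(fresh) > 0 {
			merged = append(merged, fresh...)
			sort.Strings(merged) // re-sort after every disk
		}
	}
	return merged
}

func main() {
	fmt.Println(mergeListings([][]string{
		{"a", "b"}, {"b", "c"}, {"a", "c"},
	})) // [a b c]
}
```

Repeated at every level of a deep namespace, that list-dedup-sort cycle is the slowness the commit message refers to; the Walk-based merge touches each entry once per disk instead.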


@@ -65,9 +65,6 @@ type xlObjects struct {
 	// Byte pools used for temporary i/o buffers.
 	bp *bpool.BytePoolCap
 
-	// TODO: ListObjects pool management, should be removed in future.
-	listPool *TreeWalkPool
-
 	mrfUploadCh chan partialUpload
 }


@@ -624,7 +624,7 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
 			break
 		}
 
-		result, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
+		result, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
 		if !ok {
 			eof = true
 			break
@@ -852,9 +852,9 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
 // we found this entry. Additionally also returns a boolean
 // to indicate if the caller needs to call this function
 // again to list the next entry. It is callers responsibility
-// if the caller wishes to list N entries to call leastEntry
+// if the caller wishes to list N entries to call lexicallySortedEntry
 // N times until this boolean is 'false'.
-func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
+func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
 	for i, entryChs := range zoneEntryChs {
 		for j := range entryChs {
 			zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
@@ -902,7 +902,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE
 		return lentry, 0, zoneIndex, isTruncated
 	}
 
-	leastEntryCount := 0
+	lexicallySortedEntryCount := 0
 	for i, entriesValid := range zoneEntriesValid {
 		for j, valid := range entriesValid {
 			if !valid {
@@ -912,7 +912,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE
 			// Entries are duplicated across disks,
 			// we should simply skip such entries.
 			if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) {
-				leastEntryCount++
+				lexicallySortedEntryCount++
 				continue
 			}
@@ -922,7 +922,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE
 		}
 	}
 
-	return lentry, leastEntryCount, zoneIndex, isTruncated
+	return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated
 }
 
 // mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys.
@@ -935,7 +935,7 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int)
 		zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
 	}
 	for {
-		fi, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
+		fi, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
 		if !ok {
 			// We have reached EOF across all entryChs, break the loop.
 			break
@@ -1476,7 +1476,7 @@ func (z *xlZones) Walk(ctx context.Context, bucket, prefix string, results chan<
 	defer close(results)
 
 	for {
-		entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs,
+		entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs,
 			zonesEntriesInfos, zonesEntriesValid)
 		if !ok {
 			return
@@ -1519,7 +1519,7 @@ func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, opts m
 	}
 
 	for {
-		entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
+		entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
 		if !ok {
 			break
 		}
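lexicallySortedEntryZone is the two-level version of the same selection: it scans a slice of per-zone stream sets and additionally returns the zone index of the winning entry, which is how Walk and HealObjects route each entry back to the zone that owns it. A minimal sketch with plain slices standing in for the per-zone FileInfoCh matrices (shapes and names illustrative, not MinIO's actual types):

```go
package main

import "fmt"

// lexicallySortedEntryZone picks the smallest head across all zones' disk
// streams, returning (name, quorum across all streams, winning zone, ok).
func lexicallySortedEntryZone(zoneHeads [][][]string) (string, int, int, bool) {
	best, bestZone, found := "", -1, false
	for z, heads := range zoneHeads {
		for _, h := range heads {
			if len(h) > 0 && (!found || h[0] < best) {
				best, bestZone, found = h[0], z, true
			}
		}
	}
	if !found {
		return "", 0, -1, false // every stream in every zone is drained
	}
	quorum := 0
	for z, heads := range zoneHeads {
		for i, h := range heads {
			if len(h) > 0 && h[0] == best {
				zoneHeads[z][i] = h[1:] // consume agreeing heads
				quorum++
			}
		}
	}
	return best, quorum, bestZone, true
}

func main() {
	zones := [][][]string{
		{{"a", "b"}, {"a"}}, // zone 0: two disks
		{{"c"}},             // zone 1: one disk
	}
	for {
		name, quorum, zone, ok := lexicallySortedEntryZone(zones)
		if !ok {
			break
		}
		fmt.Println(name, quorum, zone) // a 2 0, then b 1 0, then c 1 1
	}
}
```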