listing also match sets index for proper quorum
This commit is contained in:
parent
006c69f716
commit
254a78838d
|
@ -957,8 +957,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
var lentry FileInfo
|
||||
var found bool
|
||||
var zoneIndex = -1
|
||||
// TODO: following loop can be merged with above
|
||||
// loop, explore this possibility.
|
||||
var setIndex = -1
|
||||
for i, entriesValid := range zoneEntriesValid {
|
||||
for j, valid := range entriesValid {
|
||||
if !valid {
|
||||
|
@ -968,6 +967,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
lentry = zoneEntries[i][j]
|
||||
found = true
|
||||
zoneIndex = i
|
||||
setIndex = zoneEntryChs[i][j].SetIndex
|
||||
continue
|
||||
}
|
||||
str1 := zoneEntries[i][j].Name
|
||||
|
@ -982,6 +982,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
if str1 < str2 {
|
||||
lentry = zoneEntries[i][j]
|
||||
zoneIndex = i
|
||||
setIndex = zoneEntryChs[i][j].SetIndex
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1001,7 +1002,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
|
||||
// Entries are duplicated across disks,
|
||||
// we should simply skip such entries.
|
||||
if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) {
|
||||
if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) && setIndex == zoneEntryChs[i][j].SetIndex {
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
|
@ -1050,6 +1051,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
var lentry FileInfoVersions
|
||||
var found bool
|
||||
var zoneIndex = -1
|
||||
var setIndex = -1
|
||||
for i, entriesValid := range zoneEntriesValid {
|
||||
for j, valid := range entriesValid {
|
||||
if !valid {
|
||||
|
@ -1059,6 +1061,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
lentry = zoneEntries[i][j]
|
||||
found = true
|
||||
zoneIndex = i
|
||||
setIndex = zoneEntryChs[i][j].SetIndex
|
||||
continue
|
||||
}
|
||||
str1 := zoneEntries[i][j].Name
|
||||
|
@ -1073,6 +1076,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
if str1 < str2 {
|
||||
lentry = zoneEntries[i][j]
|
||||
zoneIndex = i
|
||||
setIndex = zoneEntryChs[i][j].SetIndex
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1092,7 +1096,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
|
||||
// Entries are duplicated across disks,
|
||||
// we should simply skip such entries.
|
||||
if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) {
|
||||
if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) && setIndex == zoneEntryChs[i][j].SetIndex {
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -832,9 +832,10 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
|
|||
|
||||
// FileInfoVersionsCh - file info versions channel
|
||||
type FileInfoVersionsCh struct {
|
||||
Ch chan FileInfoVersions
|
||||
Prev FileInfoVersions
|
||||
Valid bool
|
||||
Ch chan FileInfoVersions
|
||||
Prev FileInfoVersions
|
||||
Valid bool
|
||||
SetIndex int
|
||||
}
|
||||
|
||||
// Pop - pops a cached entry if any, or from the cached channel.
|
||||
|
@ -855,9 +856,10 @@ func (f *FileInfoVersionsCh) Push(fi FileInfoVersions) {
|
|||
|
||||
// FileInfoCh - file info channel
|
||||
type FileInfoCh struct {
|
||||
Ch chan FileInfo
|
||||
Prev FileInfo
|
||||
Valid bool
|
||||
Ch chan FileInfo
|
||||
Prev FileInfo
|
||||
Valid bool
|
||||
SetIndex int
|
||||
}
|
||||
|
||||
// Pop - pops a cached entry if any, or from the cached channel.
|
||||
|
@ -884,8 +886,8 @@ func (f *FileInfoCh) Push(fi FileInfo) {
|
|||
// if the caller wishes to list N entries to call lexicallySortedEntry
|
||||
// N times until this boolean is 'false'.
|
||||
func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) {
|
||||
for j := range entryChs {
|
||||
entries[j], entriesValid[j] = entryChs[j].Pop()
|
||||
for i := range entryChs {
|
||||
entries[i], entriesValid[i] = entryChs[i].Pop()
|
||||
}
|
||||
|
||||
var isTruncated = false
|
||||
|
@ -899,6 +901,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
|
|||
|
||||
var lentry FileInfoVersions
|
||||
var found bool
|
||||
var setIndex = -1
|
||||
for i, valid := range entriesValid {
|
||||
if !valid {
|
||||
continue
|
||||
|
@ -906,10 +909,12 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
|
|||
if !found {
|
||||
lentry = entries[i]
|
||||
found = true
|
||||
setIndex = i
|
||||
continue
|
||||
}
|
||||
if entries[i].Name < lentry.Name {
|
||||
lentry = entries[i]
|
||||
setIndex = i
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -927,7 +932,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
|
|||
|
||||
// Entries are duplicated across disks,
|
||||
// we should simply skip such entries.
|
||||
if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) {
|
||||
if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) && setIndex == entryChs[i].SetIndex {
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
|
@ -954,11 +959,11 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
|
|||
var entryChs []FileInfoVersionsCh
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
for _, set := range s.sets {
|
||||
for i, set := range s.sets {
|
||||
// Reset for the next erasure set.
|
||||
for _, disk := range set.getLoadBalancedNDisks(ndisks) {
|
||||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
go func(i int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
|
||||
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
|
||||
|
@ -968,10 +973,11 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
|
|||
|
||||
mutex.Lock()
|
||||
entryChs = append(entryChs, FileInfoVersionsCh{
|
||||
Ch: entryCh,
|
||||
Ch: entryCh,
|
||||
SetIndex: i,
|
||||
})
|
||||
mutex.Unlock()
|
||||
}(disk)
|
||||
}(i, disk)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -984,11 +990,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
|
|||
var entryChs []FileInfoCh
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
for _, set := range s.sets {
|
||||
for i, set := range s.sets {
|
||||
// Reset for the next erasure set.
|
||||
for _, disk := range set.getLoadBalancedNDisks(ndisks) {
|
||||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
go func(i int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
|
||||
var entryCh chan FileInfo
|
||||
|
@ -1004,10 +1010,11 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
|
|||
}
|
||||
mutex.Lock()
|
||||
entryChs = append(entryChs, FileInfoCh{
|
||||
Ch: entryCh,
|
||||
Ch: entryCh,
|
||||
SetIndex: i,
|
||||
})
|
||||
mutex.Unlock()
|
||||
}(disk)
|
||||
}(i, disk)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
|
|
@ -148,7 +148,8 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
|
|||
}
|
||||
mu.Lock()
|
||||
entryChs = append(entryChs, FileInfoVersionsCh{
|
||||
Ch: entryCh,
|
||||
Ch: entryCh,
|
||||
SetIndex: setIndex,
|
||||
})
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
|
Loading…
Reference in a new issue