Avoid returning duplicated prefixes containing stale files (#11644)
When a prefix is present in multiple sets and multiple zones, and it only contains stale files, it will be shown as empty. This fix reduces the chance of these use cases appearing by testing on the set index & zone index of a found prefix: if a prefix does not have a quorum in a given set, do not attribute a quorum to it; instead, push it again to visit it later, since there is a chance that it can have a quorum in another set or zone. Now, if the same prefix does not have quorum in any set in any zone, it will never have a quorum and won't be shown. But sometimes a prefix can have quorum because it contains stale objects; this use case is not fixed. Co-authored-by: Anis Elleuch <anis@min.io>
This commit is contained in:
parent
38f12c9841
commit
bf9297723c
|
@ -83,6 +83,7 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
z.serverSets[i].zoneIndex = i
|
||||
}
|
||||
ctx, z.shutdown = context.WithCancel(ctx)
|
||||
go intDataUpdateTracker.start(ctx, localDrives...)
|
||||
|
@ -688,12 +689,13 @@ func (z *erasureServerSets) listObjectsNonSlash(ctx context.Context, bucket, pre
|
|||
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
|
||||
}
|
||||
|
||||
var prevEntryName string
|
||||
for {
|
||||
if len(objInfos) == maxKeys {
|
||||
break
|
||||
}
|
||||
|
||||
result, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
result, isDir, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
if !ok {
|
||||
eof = true
|
||||
break
|
||||
|
@ -704,6 +706,11 @@ func (z *erasureServerSets) listObjectsNonSlash(ctx context.Context, bucket, pre
|
|||
continue
|
||||
}
|
||||
|
||||
if isDir && result.Name == prevEntryName {
|
||||
continue
|
||||
}
|
||||
prevEntryName = result.Name
|
||||
|
||||
var objInfo ObjectInfo
|
||||
|
||||
index := strings.Index(strings.TrimPrefix(result.Name, prefix), delimiter)
|
||||
|
@ -933,7 +940,7 @@ func (z *erasureServerSets) listObjects(ctx context.Context, bucket, prefix, mar
|
|||
// again to list the next entry. It is callers responsibility
|
||||
// if the caller wishes to list N entries to call lexicallySortedEntry
|
||||
// N times until this boolean is 'false'.
|
||||
func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
|
||||
func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, bool, int, int, bool) {
|
||||
for i, entryChs := range zoneEntryChs {
|
||||
for j := range entryChs {
|
||||
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
|
||||
|
@ -955,7 +962,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
}
|
||||
|
||||
var lentry FileInfo
|
||||
var found bool
|
||||
var found, isDir bool
|
||||
var zoneIndex = -1
|
||||
var setIndex = -1
|
||||
for i, entriesValid := range zoneEntriesValid {
|
||||
|
@ -990,7 +997,11 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
// We haven't been able to find any least entry,
|
||||
// this would mean that we don't have valid entry.
|
||||
if !found {
|
||||
return lentry, 0, zoneIndex, isTruncated
|
||||
return lentry, false, 0, zoneIndex, isTruncated
|
||||
}
|
||||
|
||||
if HasSuffix(lentry.Name, slashSeparator) {
|
||||
isDir = true
|
||||
}
|
||||
|
||||
if HasSuffix(lentry.Name, globalDirSuffix) {
|
||||
|
@ -1005,17 +1016,26 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
}
|
||||
|
||||
zoneEntryName := zoneEntries[i][j].Name
|
||||
zoneEntryObjDir := false
|
||||
if HasSuffix(zoneEntryName, globalDirSuffix) {
|
||||
zoneEntryName = strings.TrimSuffix(zoneEntryName, globalDirSuffix) + slashSeparator
|
||||
zoneEntryObjDir = true
|
||||
}
|
||||
|
||||
// Entries are duplicated across disks,
|
||||
// we should simply skip such entries.
|
||||
if HasSuffix(lentry.Name, slashSeparator) || lentry.ModTime.Equal(zoneEntries[i][j].ModTime) && setIndex == zoneEntryChs[i][j].SetIndex {
|
||||
if lentry.Name == zoneEntryName {
|
||||
if lentry.Name == zoneEntryName {
|
||||
if isDir && zoneEntryObjDir {
|
||||
// Deduplicate directory & object-directory
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// Entries are duplicated across disks, we should simply skip such entries.
|
||||
if zoneIndex == zoneEntryChs[i][j].ZoneIndex && setIndex == zoneEntryChs[i][j].SetIndex {
|
||||
if isDir || lentry.ModTime.Equal(zoneEntries[i][j].ModTime) {
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push all entries which are lexically higher
|
||||
|
@ -1024,7 +1044,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
}
|
||||
}
|
||||
|
||||
return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated
|
||||
return lentry, isDir, lexicallySortedEntryCount, zoneIndex, isTruncated
|
||||
}
|
||||
|
||||
// Calculate least entry across serverSets and across multiple FileInfoVersions
|
||||
|
@ -1034,7 +1054,7 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
|
|||
// again to list the next entry. It is callers responsibility
|
||||
// if the caller wishes to list N entries to call lexicallySortedEntry
|
||||
// N times until this boolean is 'false'.
|
||||
func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) {
|
||||
func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, bool, int, int, bool) {
|
||||
for i, entryChs := range zoneEntryChs {
|
||||
for j := range entryChs {
|
||||
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
|
||||
|
@ -1056,7 +1076,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
}
|
||||
|
||||
var lentry FileInfoVersions
|
||||
var found bool
|
||||
var found, isDir bool
|
||||
var zoneIndex = -1
|
||||
var setIndex = -1
|
||||
for i, entriesValid := range zoneEntriesValid {
|
||||
|
@ -1082,7 +1102,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
|
||||
if str1 < str2 {
|
||||
lentry = zoneEntries[i][j]
|
||||
zoneIndex = i
|
||||
zoneIndex = zoneEntryChs[i][j].ZoneIndex
|
||||
setIndex = zoneEntryChs[i][j].SetIndex
|
||||
}
|
||||
}
|
||||
|
@ -1091,7 +1111,11 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
// We haven't been able to find any least entry,
|
||||
// this would mean that we don't have valid entry.
|
||||
if !found {
|
||||
return lentry, 0, zoneIndex, isTruncated
|
||||
return lentry, false, 0, zoneIndex, isTruncated
|
||||
}
|
||||
|
||||
if HasSuffix(lentry.Name, slashSeparator) {
|
||||
isDir = true
|
||||
}
|
||||
|
||||
if HasSuffix(lentry.Name, globalDirSuffix) {
|
||||
|
@ -1106,17 +1130,26 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
}
|
||||
|
||||
zoneEntryName := zoneEntries[i][j].Name
|
||||
zoneEntryObjDir := false
|
||||
if HasSuffix(zoneEntryName, globalDirSuffix) {
|
||||
zoneEntryName = strings.TrimSuffix(zoneEntryName, globalDirSuffix) + slashSeparator
|
||||
zoneEntryObjDir = true
|
||||
}
|
||||
|
||||
// Entries are duplicated across disks,
|
||||
// we should simply skip such entries.
|
||||
if HasSuffix(lentry.Name, slashSeparator) || lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) && setIndex == zoneEntryChs[i][j].SetIndex {
|
||||
if lentry.Name == zoneEntryName {
|
||||
if lentry.Name == zoneEntryName {
|
||||
// Deduplicate directory and object directories in case of non-recursive listing
|
||||
if isDir && zoneEntryObjDir {
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// Entries are duplicated across disks, we should simply skip such entries.
|
||||
if zoneIndex == zoneEntryChs[i][j].ZoneIndex && setIndex == zoneEntryChs[i][j].SetIndex {
|
||||
if isDir || lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) {
|
||||
lexicallySortedEntryCount++
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push all entries which are lexically higher
|
||||
|
@ -1125,7 +1158,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
|
|||
}
|
||||
}
|
||||
|
||||
return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated
|
||||
return lentry, isDir, lexicallySortedEntryCount, zoneIndex, isTruncated
|
||||
}
|
||||
|
||||
// mergeServerSetsEntriesVersionsCh - merges FileInfoVersions channel to entries upto maxKeys.
|
||||
|
@ -1138,8 +1171,9 @@ func mergeServerSetsEntriesVersionsCh(serverSetsEntryChs [][]FileInfoVersionsCh,
|
|||
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
|
||||
}
|
||||
|
||||
var prevEntryName string
|
||||
for {
|
||||
fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
fi, isDir, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
if !ok {
|
||||
// We have reached EOF across all entryChs, break the loop.
|
||||
break
|
||||
|
@ -1150,6 +1184,11 @@ func mergeServerSetsEntriesVersionsCh(serverSetsEntryChs [][]FileInfoVersionsCh,
|
|||
continue
|
||||
}
|
||||
|
||||
if isDir && prevEntryName == fi.Name {
|
||||
continue
|
||||
}
|
||||
prevEntryName = fi.Name
|
||||
|
||||
entries.FilesVersions = append(entries.FilesVersions, fi)
|
||||
i++
|
||||
if i == maxKeys {
|
||||
|
@ -1170,8 +1209,9 @@ func mergeServerSetsEntriesCh(serverSetsEntryChs [][]FileInfoCh, maxKeys int, se
|
|||
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
|
||||
}
|
||||
|
||||
var prevEntryName string
|
||||
for {
|
||||
fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
fi, isDir, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
if !ok {
|
||||
// We have reached EOF across all entryChs, break the loop.
|
||||
break
|
||||
|
@ -1182,6 +1222,11 @@ func mergeServerSetsEntriesCh(serverSetsEntryChs [][]FileInfoCh, maxKeys int, se
|
|||
continue
|
||||
}
|
||||
|
||||
if isDir && prevEntryName == fi.Name {
|
||||
continue
|
||||
}
|
||||
prevEntryName = fi.Name
|
||||
|
||||
entries.Files = append(entries.Files, fi)
|
||||
i++
|
||||
if i == maxKeys {
|
||||
|
@ -1792,17 +1837,25 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
|
|||
go func() {
|
||||
defer close(results)
|
||||
|
||||
var prevEntryName string
|
||||
for {
|
||||
entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
entry, isDir, quorumCount, zoneIdx, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
if !ok {
|
||||
// We have reached EOF across all entryChs, break the loop.
|
||||
return
|
||||
}
|
||||
|
||||
if quorumCount >= serverSetsListTolerancePerSet[zoneIdx] {
|
||||
for _, version := range entry.Versions {
|
||||
results <- version.ToObjectInfo(bucket, version.Name)
|
||||
}
|
||||
if quorumCount < serverSetsListTolerancePerSet[zoneIdx] {
|
||||
continue
|
||||
}
|
||||
|
||||
if isDir && prevEntryName == entry.Name {
|
||||
continue
|
||||
}
|
||||
prevEntryName = entry.Name
|
||||
|
||||
for _, version := range entry.Versions {
|
||||
results <- version.ToObjectInfo(bucket, version.Name)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -1825,16 +1878,24 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
|
|||
go func() {
|
||||
defer close(results)
|
||||
|
||||
var prevEntryName string
|
||||
for {
|
||||
entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
entry, isDir, quorumCount, zoneIdx, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
if !ok {
|
||||
// We have reached EOF across all entryChs, break the loop.
|
||||
return
|
||||
}
|
||||
|
||||
if quorumCount >= serverSetsListTolerancePerSet[zoneIdx] {
|
||||
results <- entry.ToObjectInfo(bucket, entry.Name)
|
||||
if quorumCount < serverSetsListTolerancePerSet[zoneIdx] {
|
||||
continue
|
||||
}
|
||||
|
||||
if isDir && entry.Name == prevEntryName {
|
||||
continue
|
||||
}
|
||||
prevEntryName = entry.Name
|
||||
|
||||
results <- entry.ToObjectInfo(bucket, entry.Name)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -1869,7 +1930,7 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri
|
|||
// actions they may want to take as if `prefix` is missing.
|
||||
err := toObjectErr(errFileNotFound, bucket, prefix)
|
||||
for {
|
||||
entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
entry, _, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
|
|
@ -99,6 +99,8 @@ type erasureSets struct {
|
|||
|
||||
mrfMU sync.Mutex
|
||||
mrfOperations map[healSource]int
|
||||
|
||||
zoneIndex int
|
||||
}
|
||||
|
||||
func isEndpointConnected(diskMap map[string]StorageAPI, endpoint string) bool {
|
||||
|
@ -832,10 +834,11 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
|
|||
|
||||
// FileInfoVersionsCh - file info versions channel
|
||||
type FileInfoVersionsCh struct {
|
||||
Ch chan FileInfoVersions
|
||||
Prev FileInfoVersions
|
||||
Valid bool
|
||||
SetIndex int
|
||||
Ch chan FileInfoVersions
|
||||
Prev FileInfoVersions
|
||||
Valid bool
|
||||
SetIndex int
|
||||
ZoneIndex int
|
||||
}
|
||||
|
||||
// Pop - pops a cached entry if any, or from the cached channel.
|
||||
|
@ -856,10 +859,11 @@ func (f *FileInfoVersionsCh) Push(fi FileInfoVersions) {
|
|||
|
||||
// FileInfoCh - file info channel
|
||||
type FileInfoCh struct {
|
||||
Ch chan FileInfo
|
||||
Prev FileInfo
|
||||
Valid bool
|
||||
SetIndex int
|
||||
Ch chan FileInfo
|
||||
Prev FileInfo
|
||||
Valid bool
|
||||
SetIndex int
|
||||
ZoneIndex int
|
||||
}
|
||||
|
||||
// Pop - pops a cached entry if any, or from the cached channel.
|
||||
|
@ -973,8 +977,9 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
|
|||
|
||||
mutex.Lock()
|
||||
entryChs = append(entryChs, FileInfoVersionsCh{
|
||||
Ch: entryCh,
|
||||
SetIndex: i,
|
||||
Ch: entryCh,
|
||||
SetIndex: i,
|
||||
ZoneIndex: s.zoneIndex,
|
||||
})
|
||||
mutex.Unlock()
|
||||
}(i, disk)
|
||||
|
@ -1010,8 +1015,9 @@ func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, mark
|
|||
}
|
||||
mutex.Lock()
|
||||
entryChs = append(entryChs, FileInfoCh{
|
||||
Ch: entryCh,
|
||||
SetIndex: i,
|
||||
Ch: entryCh,
|
||||
SetIndex: i,
|
||||
ZoneIndex: s.zoneIndex,
|
||||
})
|
||||
mutex.Unlock()
|
||||
}(i, disk)
|
||||
|
|
Loading…
Reference in a new issue