dedup common prefixes at lower layer

This commit is contained in:
Harshavardhana 2021-02-04 02:40:14 -08:00
parent 7f3f3ee1ee
commit ea1e72ad48
2 changed files with 34 additions and 11 deletions

View file

@ -1155,7 +1155,7 @@ func mergeServerSetsEntriesCh(serverSetsEntryChs [][]FileInfoCh, maxKeys int, se
serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfo, len(entryChs)))
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
}
var prevEntry string
for {
fi, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
if !ok {
@ -1168,17 +1168,12 @@ func mergeServerSetsEntriesCh(serverSetsEntryChs [][]FileInfoCh, maxKeys int, se
continue
}
if HasSuffix(fi.Name, slashSeparator) && fi.Name == prevEntry {
continue
}
entries.Files = append(entries.Files, fi)
i++
if i == maxKeys {
entries.IsTruncated = isTruncatedServerSets(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
break
}
prevEntry = fi.Name
}
return entries
}
@ -1319,15 +1314,11 @@ func (z *erasureServerSets) listObjectVersions(ctx context.Context, bucket, pref
loi.NextMarker = entries.FilesVersions[len(entries.FilesVersions)-1].Name
}
var prevPrefix string
for _, entry := range entries.FilesVersions {
for _, version := range entry.Versions {
objInfo := version.ToObjectInfo(bucket, entry.Name)
if HasSuffix(objInfo.Name, SlashSeparator) && !recursive {
if objInfo.Name != prevPrefix {
loi.Prefixes = append(loi.Prefixes, objInfo.Name)
prevPrefix = objInfo.Name
}
loi.Prefixes = append(loi.Prefixes, objInfo.Name)
continue
}
loi.Objects = append(loi.Objects, objInfo)

View file

@ -974,9 +974,16 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh)
dirObjects := make(map[string]struct{})
for walkResult := range walkResultCh {
var fiv FileInfoVersions
if HasSuffix(walkResult.entry, SlashSeparator) {
_, dirObj := dirObjects[walkResult.entry]
if dirObj {
continue
}
dirObjects[walkResult.entry] = struct{}{}
fiv = FileInfoVersions{
Volume: volume,
Name: walkResult.entry,
@ -998,6 +1005,18 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st
if err != nil {
continue
}
if HasSuffix(fiv.Name, globalDirSuffix) {
entry := strings.TrimSuffix(fiv.Name, globalDirSuffix) + slashSeparator
_, dirObj := dirObjects[entry]
if dirObj {
continue
}
if !recursive && len(fiv.Versions) >= 2 {
// Remove multiple versions for directory objects, in non-recursive
fiv.Versions = fiv.Versions[:1]
}
dirObjects[entry] = struct{}{}
}
}
select {
case ch <- fiv:
@ -1059,14 +1078,20 @@ func (s *xlStorage) Walk(ctx context.Context, volume, dirPath, marker string, re
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh)
dirObjects := make(map[string]struct{})
for walkResult := range walkResultCh {
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
_, dirObj := dirObjects[walkResult.entry]
if dirObj {
continue
}
fi = FileInfo{
Volume: volume,
Name: walkResult.entry,
Mode: os.ModeDir,
}
dirObjects[walkResult.entry] = struct{}{}
} else {
var err error
var xlMetaBuf []byte
@ -1082,6 +1107,13 @@ func (s *xlStorage) Walk(ctx context.Context, volume, dirPath, marker string, re
// Ignore delete markers.
continue
}
if HasSuffix(fi.Name, globalDirSuffix) {
entry := strings.TrimSuffix(fi.Name, globalDirSuffix) + slashSeparator
if _, dirObj := dirObjects[entry]; dirObj {
continue
}
dirObjects[entry] = struct{}{}
}
}
select {
case ch <- fi: