fix: implement splunk specific listObjects when delimiter=guidSplunk (#9186)

This commit is contained in:
Krishna Srinivas 2020-03-22 19:23:47 -07:00 committed by GitHub
parent da04cb91ce
commit 45b1c66195
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 289 additions and 5 deletions

View file

@ -126,6 +126,13 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) {
return d.disk.DeleteVol(volume)
}
func (d *naughtyDisk) WalkSplunk(volume, path, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.WalkSplunk(volume, path, marker, endWalkCh)
}
func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err

View file

@ -117,6 +117,13 @@ func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive
return p.storage.Walk(volume, dirPath, marker, recursive, leafFile, readMetadataFn, endWalkCh)
}
func (p *posixDiskIDCheck) WalkSplunk(volume, dirPath string, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.WalkSplunk(volume, dirPath, marker, endWalkCh)
}
func (p *posixDiskIDCheck) ListDir(volume, dirPath string, count int, leafFile string) ([]string, error) {
if p.isDiskStale() {
return nil, errDiskNotFound

View file

@ -645,6 +645,129 @@ func (s *posix) DeleteVol(volume string) (err error) {
return nil
}
const guidSplunk = "guidSplunk"
// ListDirSplunk - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing SlashSeparator.
func (s *posix) ListDirSplunk(volume, dirPath string, count int) (entries []string, err error) {
guidIndex := strings.Index(dirPath, guidSplunk)
if guidIndex != -1 {
return nil, nil
}
const receiptJSON = "receipt.json"
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
}()
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat((volumeDir))
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
dirPath = pathJoin(volumeDir, dirPath)
if count > 0 {
entries, err = readDirN(dirPath, count)
} else {
entries, err = readDir(dirPath)
}
for i, entry := range entries {
if entry != receiptJSON {
continue
}
if _, serr := os.Stat(pathJoin(dirPath, entry, xlMetaJSONFile)); serr == nil {
entries[i] = strings.TrimSuffix(entry, SlashSeparator)
}
}
return entries, err
}
// WalkSplunk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
// Implemented specifically for Splunk backend structure and List call with
// delimiter as "guidSplunk"
func (s *posix) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
ch = make(chan FileInfo)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
entries, err := s.ListDirSplunk(volume, dirPath, -1)
if err != nil {
return
}
if len(entries) == 0 {
return true, nil
}
sort.Strings(entries)
return false, filterMatchingPrefix(entries, dirEntry)
}
walkResultCh := startTreeWalk(context.Background(), volume, dirPath, marker, true, listDir, endWalkCh)
for {
walkResult, ok := <-walkResultCh
if !ok {
return
}
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
fi = FileInfo{
Volume: volume,
Name: walkResult.entry,
Mode: os.ModeDir,
}
} else {
// Dynamic time delay.
t := UTCNow()
buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, xlMetaJSONFile))
sleepDuration(time.Since(t), 10.0)
if err != nil {
continue
}
fi = readMetadata(buf, volume, walkResult.entry)
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// Walk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string,

View file

@ -45,6 +45,8 @@ type StorageAPI interface {
// Walk in sorted order directly on disk.
Walk(volume, dirPath string, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error)
// Walk in sorted order directly on disk.
WalkSplunk(volume, dirPath string, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error)
// File operations.
ListDir(volume, dirPath string, count int, leafFile string) ([]string, error)

View file

@ -333,6 +333,40 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf
return int64(n), err
}
func (client *storageRESTClient) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTMarkerPath, marker)
respBody, err := client.call(storageRESTMethodWalkSplunk, values, nil, -1)
if err != nil {
return nil, err
}
ch := make(chan FileInfo)
go func() {
defer close(ch)
defer http.DrainBody(respBody)
decoder := gob.NewDecoder(respBody)
for {
var fi FileInfo
if gerr := decoder.Decode(&fi); gerr != nil {
// Upon error return
return
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
func (client *storageRESTClient) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
values := make(url.Values)

View file

@ -40,6 +40,7 @@ const (
storageRESTMethodReadFileStream = "/readfilestream"
storageRESTMethodListDir = "/listdir"
storageRESTMethodWalk = "/walk"
storageRESTMethodWalkSplunk = "/walksplunk"
storageRESTMethodDeleteFile = "/deletefile"
storageRESTMethodDeleteFileBulk = "/deletefilebulk"
storageRESTMethodDeletePrefixes = "/deleteprefixes"

View file

@ -428,6 +428,30 @@ func readMetadata(buf []byte, volume, entry string) FileInfo {
}
}
// WalkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath]
markerPath := vars[storageRESTMarkerPath]
w.Header().Set(xhttp.ContentType, "text/event-stream")
encoder := gob.NewEncoder(w)
fch, err := s.storage.WalkSplunk(volume, dirPath, markerPath, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
for fi := range fch {
encoder.Encode(&fi)
}
w.(http.Flusher).Flush()
}
// WalkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -446,12 +470,10 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
w.Header().Set(xhttp.ContentType, "text/event-stream")
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
defer done()
fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, r.Context().Done())
if err != nil {
logger.LogIf(r.Context(), err)
s.writeErrorResponse(w, err)
return
}
for fi := range fch {
@ -760,6 +782,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeletePrefixes).HandlerFunc(httpTraceHdrs(server.DeletePrefixesHandler)).
Queries(restQueries(storageRESTVolume)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).

View file

@ -96,7 +96,8 @@ type xlSets struct {
distributionAlgo string
// Merge tree walk
pool *MergeWalkPool
pool *MergeWalkPool
poolSplunk *MergeWalkPool
mrfMU sync.Mutex
mrfUploads map[string]int
@ -289,6 +290,7 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.XL.DistributionAlgo,
pool: NewMergeWalkPool(globalMergeLookupTimeout),
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
mrfUploads: make(map[string]int),
}
@ -968,6 +970,36 @@ func (s *xlSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker st
return entryChs
}
// Starts a walk channel across all disks and returns a slice of
// FileInfo channels which can be read from.
func (s *xlSets) startSplunkMergeWalksN(ctx context.Context, bucket, prefix, marker string, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
var entryChs []FileInfoCh
var success int
for _, set := range s.sets {
// Reset for the next erasure set.
success = ndisks
for _, disk := range set.getLoadBalancedDisks() {
if disk == nil {
// Disk can be offline
continue
}
entryCh, err := disk.WalkSplunk(bucket, prefix, marker, endWalkCh)
if err != nil {
// Disk walk returned error, ignore it.
continue
}
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
success--
if success == 0 {
break
}
}
}
return entryChs
}
func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
endWalkCh := make(chan struct{})
defer close(endWalkCh)

View file

@ -697,6 +697,58 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
return result, nil
}
func (z *xlZones) listObjectsSplunk(ctx context.Context, bucket, prefix, marker string, maxKeys int) (loi ListObjectsInfo, err error) {
if strings.Contains(prefix, guidSplunk) {
logger.LogIf(ctx, NotImplemented{})
return loi, NotImplemented{}
}
recursive := true
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
const ndisks = 3
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, ndisks)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, ndisks)
if len(entries.Files) == 0 {
return loi, nil
}
loi.IsTruncated = entries.IsTruncated
if loi.IsTruncated {
loi.NextMarker = entries.Files[len(entries.Files)-1].Name
}
for _, entry := range entries.Files {
objInfo := entry.ToObjectInfo()
splits := strings.Split(objInfo.Name, guidSplunk)
if len(splits) == 0 {
loi.Objects = append(loi.Objects, objInfo)
continue
}
loi.Prefixes = append(loi.Prefixes, splits[0]+guidSplunk)
}
if loi.IsTruncated {
for i, zone := range z.zones {
zone.poolSplunk.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, zonesEntryChs[i],
zonesEndWalkCh[i])
}
}
return loi, nil
}
func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
loi := ListObjectsInfo{}
@ -732,7 +784,9 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
}
if delimiter != SlashSeparator && delimiter != "" {
// "heal" option passed can be ignored as the heal-listing does not send non-standard delimiter.
if delimiter == guidSplunk {
return z.listObjectsSplunk(ctx, bucket, prefix, marker, maxKeys)
}
return z.listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys)
}