misc: various fixes
- listObjects optimized to handle max-keys=1 when prefix is object Some applications albeit poorly written rather than using headObject rely on listObjects to check for existence of object, this unusual request always has prefix=(to actual object) and max-keys=1 handle this situation specially such that we can avoid readdir() on the top level parent to avoid sorting and skipping, ensuring that such type of listObjects() always behaves similar to a headObject() call. - add deadline for specific disks not globally
This commit is contained in:
parent
f3cd606976
commit
f4c4e5fc33
|
@ -115,7 +115,11 @@ func (er erasureObjects) renameAll(ctx context.Context, bucket, prefix string) {
|
|||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
disk.RenameFile(ctx, bucket, prefix, minioMetaTmpBucket, mustGetUUID())
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
disk.RenameFile(ctx, bucket, prefix, minioMetaTmpDeletedBucket, mustGetUUID())
|
||||
}(disk)
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -130,6 +134,10 @@ func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
|
|||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
disk.Delete(ctx, bucket, prefix, true)
|
||||
}(disk)
|
||||
}
|
||||
|
|
|
@ -963,6 +963,21 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
|
|||
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
||||
var loi ListObjectsInfo
|
||||
|
||||
if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
|
||||
// Optimization for certain applications like
|
||||
// - Cohesity
|
||||
// - Actifio, Splunk etc.
|
||||
// which send ListObjects requests where the actual object
|
||||
// itself is the prefix and max-keys=1 in such scenarios
|
||||
// we can simply verify locally if such an object exists
|
||||
// to avoid the need for ListObjects().
|
||||
objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
|
||||
if err == nil {
|
||||
loi.Objects = append(loi.Objects, objInfo)
|
||||
return loi, nil
|
||||
}
|
||||
}
|
||||
|
||||
merged, err := z.listPath(ctx, listPathOptions{
|
||||
Bucket: bucket,
|
||||
Prefix: prefix,
|
||||
|
@ -1292,18 +1307,6 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
|
|||
return nil
|
||||
}
|
||||
|
||||
// deleteAll will delete a bucket+prefix unconditionally across all disks.
|
||||
// Note that set distribution is ignored so it should only be used in cases where
|
||||
// data is not distributed across sets.
|
||||
// Errors are logged but individual disk failures are not returned.
|
||||
func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
|
||||
for _, servers := range z.serverPools {
|
||||
for _, set := range servers.sets {
|
||||
set.deleteAll(ctx, bucket, prefix)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// renameAll will rename bucket+prefix unconditionally across all disks to
|
||||
// minioMetaTmpBucket + unique uuid,
|
||||
// Note that set distribution is ignored so it should only be used in cases where
|
||||
|
|
|
@ -311,7 +311,7 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
|
|||
}
|
||||
|
||||
var reportNotFound bool
|
||||
if v := vars[storageRESTReportNotFound]; v != "" {
|
||||
if v := r.URL.Query().Get(storageRESTReportNotFound); v != "" {
|
||||
reportNotFound, err = strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
|
|
|
@ -18,7 +18,6 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
|
@ -238,14 +237,10 @@ func (m *metacache) delete(ctx context.Context) {
|
|||
logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id))
|
||||
}
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
|
||||
return
|
||||
if objAPI != nil {
|
||||
ez, ok := objAPI.(*erasureServerPools)
|
||||
if ok {
|
||||
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
|
||||
}
|
||||
}
|
||||
ez, ok := objAPI.(*erasureServerPools)
|
||||
if !ok {
|
||||
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools"))
|
||||
return
|
||||
}
|
||||
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue