fix: various performance improvements to tiering (#12965)

- deletes should always Sweep() for tiering at the
  end and does not need an extra getObjectInfo() call
- puts, copy and multipart writes should conditionally
  do getObjectInfo() when tiering targets are configured
- introduce 'TransitionedObject' struct for ease of usage
  and understanding.
- multiple-pools optimization deletes don't need to hold
  read locks verifying objects across namespace and pools.
This commit is contained in:
Harshavardhana 2021-08-17 07:50:00 -07:00 committed by GitHub
parent 654a6e9871
commit ef4d023c85
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 213 additions and 164 deletions

View file

@ -187,7 +187,7 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp
if objInfo.IsRemote() { if objInfo.IsRemote() {
// Check if object is being restored. For more information on x-amz-restore header see // Check if object is being restored. For more information on x-amz-restore header see
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
w.Header()[xhttp.AmzStorageClass] = []string{objInfo.TransitionTier} w.Header()[xhttp.AmzStorageClass] = []string{objInfo.TransitionedObject.Tier}
} }
if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil { if lc, err := globalLifecycleSys.Get(objInfo.Bucket); err == nil {

View file

@ -450,6 +450,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
if api.CacheAPI() != nil { if api.CacheAPI() != nil {
getObjectInfoFn = api.CacheAPI().GetObjectInfo getObjectInfoFn = api.CacheAPI().GetObjectInfo
} }
var ( var (
hasLockEnabled, replicateSync bool hasLockEnabled, replicateSync bool
goi ObjectInfo goi ObjectInfo
@ -460,6 +461,9 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
hasLockEnabled = true hasLockEnabled = true
} }
versioned := globalBucketVersioningSys.Enabled(bucket)
suspended := globalBucketVersioningSys.Suspended(bucket)
dErrs := make([]DeleteError, len(deleteObjects.Objects)) dErrs := make([]DeleteError, len(deleteObjects.Objects))
oss := make([]*objSweeper, len(deleteObjects.Objects)) oss := make([]*objSweeper, len(deleteObjects.Objects))
for index, object := range deleteObjects.Objects { for index, object := range deleteObjects.Objects {
@ -491,16 +495,24 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
} }
} }
oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(multiDelete(object)) opts := ObjectOptions{
// Mutations of objects on versioning suspended buckets VersionID: object.VersionID,
// affect its null version. Through opts below we select Versioned: versioned,
// the null version's remote object to delete if VersionSuspended: suspended,
// transitioned.
opts := oss[index].GetOpts()
goi, gerr = getObjectInfoFn(ctx, bucket, object.ObjectName, opts)
if gerr == nil {
oss[index].SetTransitionState(goi)
} }
if replicateDeletes || object.VersionID != "" && hasLockEnabled || !globalTierConfigMgr.Empty() {
if !globalTierConfigMgr.Empty() && object.VersionID == "" && opts.VersionSuspended {
opts.VersionID = nullVersionID
}
goi, gerr = getObjectInfoFn(ctx, bucket, object.ObjectName, opts)
}
if !globalTierConfigMgr.Empty() {
oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(opts.VersionID).WithVersioning(versioned, suspended)
oss[index].SetTransitionState(goi.TransitionedObject)
}
if replicateDeletes { if replicateDeletes {
replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectName: object.ObjectName, ObjectName: object.ObjectName,
@ -522,18 +534,16 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
} }
} }
} }
if object.VersionID != "" { if object.VersionID != "" && hasLockEnabled {
if hasLockEnabled { if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone {
if apiErrCode := enforceRetentionBypassForDelete(ctx, r, bucket, object, goi, gerr); apiErrCode != ErrNone { apiErr := errorCodes.ToAPIErr(apiErrCode)
apiErr := errorCodes.ToAPIErr(apiErrCode) dErrs[index] = DeleteError{
dErrs[index] = DeleteError{ Code: apiErr.Code,
Code: apiErr.Code, Message: apiErr.Description,
Message: apiErr.Description, Key: object.ObjectName,
Key: object.ObjectName, VersionID: object.VersionID,
VersionID: object.VersionID,
}
continue
} }
continue
} }
} }
@ -555,8 +565,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
deleteList := toNames(objectsToDelete) deleteList := toNames(objectsToDelete)
dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{ dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{
Versioned: globalBucketVersioningSys.Enabled(bucket), Versioned: versioned,
VersionSuspended: globalBucketVersioningSys.Suspended(bucket), VersionSuspended: suspended,
}) })
deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
for i := range errs { for i := range errs {
@ -620,6 +630,10 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
// Notify deleted event for objects. // Notify deleted event for objects.
for _, dobj := range deletedObjects { for _, dobj := range deletedObjects {
if dobj.ObjectName == "" {
continue
}
eventName := event.ObjectRemovedDelete eventName := event.ObjectRemovedDelete
objInfo := ObjectInfo{ objInfo := ObjectInfo{
Name: dobj.ObjectName, Name: dobj.ObjectName,

View file

@ -232,9 +232,9 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob
// When an object is past expiry or when a transitioned object is being // When an object is past expiry or when a transitioned object is being
// deleted, 'mark' the data in the remote tier for delete. // deleted, 'mark' the data in the remote tier for delete.
entry := jentry{ entry := jentry{
ObjName: oi.transitionedObjName, ObjName: oi.TransitionedObject.Name,
VersionID: oi.transitionVersionID, VersionID: oi.TransitionedObject.VersionID,
TierName: oi.TransitionTier, TierName: oi.TransitionedObject.Tier,
} }
if err := globalTierJournal.AddEntry(entry); err != nil { if err := globalTierJournal.AddEntry(entry); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -316,7 +316,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo)
// getTransitionedObjectReader returns a reader from the transitioned tier. // getTransitionedObjectReader returns a reader from the transitioned tier.
func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, oi ObjectInfo, opts ObjectOptions) (gr *GetObjectReader, err error) { func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, oi ObjectInfo, opts ObjectOptions) (gr *GetObjectReader, err error) {
tgtClient, err := globalTierConfigMgr.getDriver(oi.TransitionTier) tgtClient, err := globalTierConfigMgr.getDriver(oi.TransitionedObject.Tier)
if err != nil { if err != nil {
return nil, fmt.Errorf("transition storage class not configured") return nil, fmt.Errorf("transition storage class not configured")
} }
@ -333,7 +333,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs
gopts.length = length gopts.length = length
} }
reader, err := tgtClient.Get(ctx, oi.transitionedObjName, remoteVersionID(oi.transitionVersionID), gopts) reader, err := tgtClient.Get(ctx, oi.TransitionedObject.Name, remoteVersionID(oi.TransitionedObject.VersionID), gopts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -557,7 +557,7 @@ func (fi FileInfo) IsRemote() bool {
// IsRemote returns true if this object version's contents are in its remote // IsRemote returns true if this object version's contents are in its remote
// tier. // tier.
func (oi ObjectInfo) IsRemote() bool { func (oi ObjectInfo) IsRemote() bool {
if oi.TransitionStatus != lifecycle.TransitionComplete { if oi.TransitionedObject.Status != lifecycle.TransitionComplete {
return false return false
} }
return !isRestoredObjectOnDisk(oi.UserDefined) return !isRestoredObjectOnDisk(oi.UserDefined)
@ -685,7 +685,7 @@ func (oi ObjectInfo) ToLifecycleOpts() lifecycle.ObjectOpts {
SuccessorModTime: oi.SuccessorModTime, SuccessorModTime: oi.SuccessorModTime,
RestoreOngoing: oi.RestoreOngoing, RestoreOngoing: oi.RestoreOngoing,
RestoreExpires: oi.RestoreExpires, RestoreExpires: oi.RestoreExpires,
TransitionStatus: oi.TransitionStatus, TransitionStatus: oi.TransitionedObject.Status,
RemoteTiersImmediately: globalDebugRemoteTiersImmediately, RemoteTiersImmediately: globalDebugRemoteTiersImmediately,
} }
} }

View file

@ -913,7 +913,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac
SuccessorModTime: meta.oi.SuccessorModTime, SuccessorModTime: meta.oi.SuccessorModTime,
RestoreOngoing: meta.oi.RestoreOngoing, RestoreOngoing: meta.oi.RestoreOngoing,
RestoreExpires: meta.oi.RestoreExpires, RestoreExpires: meta.oi.RestoreExpires,
TransitionStatus: meta.oi.TransitionStatus, TransitionStatus: meta.oi.TransitionedObject.Status,
RemoteTiersImmediately: globalDebugRemoteTiersImmediately, RemoteTiersImmediately: globalDebugRemoteTiersImmediately,
}) })
if i.debug { if i.debug {
@ -976,7 +976,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac
// applyTierObjSweep removes remote object pending deletion and the free-version // applyTierObjSweep removes remote object pending deletion and the free-version
// tracking this information. // tracking this information.
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta actionMeta) { func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta actionMeta) {
if !meta.oi.tierFreeVersion { if !meta.oi.TransitionedObject.FreeVersion {
// nothing to be done // nothing to be done
return return
} }
@ -989,7 +989,7 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta
return err return err
} }
// Remove the remote object // Remove the remote object
err := deleteObjectFromRemoteTier(ctx, meta.oi.transitionedObjName, meta.oi.transitionVersionID, meta.oi.TransitionTier) err := deleteObjectFromRemoteTier(ctx, meta.oi.TransitionedObject.Name, meta.oi.TransitionedObject.VersionID, meta.oi.TransitionedObject.Tier)
if ignoreNotFoundErr(err) != nil { if ignoreNotFoundErr(err) != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return return
@ -1065,7 +1065,7 @@ func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj Ob
func applyTransitionAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) bool { func applyTransitionAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) bool {
srcOpts := ObjectOptions{} srcOpts := ObjectOptions{}
if obj.TransitionStatus == "" { if obj.TransitionedObject.Status == "" {
srcOpts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) srcOpts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
srcOpts.VersionID = obj.VersionID srcOpts.VersionID = obj.VersionID
// mark transition as pending // mark transition as pending
@ -1137,7 +1137,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay
// Apply object, object version, restored object or restored object version action on the given object // Apply object, object version, restored object or restored object version action on the given object
func applyExpiryRule(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject, applyOnVersion bool) bool { func applyExpiryRule(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject, applyOnVersion bool) bool {
if obj.TransitionStatus != "" { if obj.TransitionedObject.Status != "" {
return applyExpiryOnTransitionedObject(ctx, objLayer, obj, restoredObject) return applyExpiryOnTransitionedObject(ctx, objLayer, obj, restoredObject)
} }
return applyExpiryOnNonTransitionedObjects(ctx, objLayer, obj, applyOnVersion) return applyExpiryOnNonTransitionedObjects(ctx, objLayer, obj, applyOnVersion)

View file

@ -162,11 +162,13 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
objInfo.ReplicationStatus = replication.StatusType(fi.DeleteMarkerReplicationStatus) objInfo.ReplicationStatus = replication.StatusType(fi.DeleteMarkerReplicationStatus)
} }
objInfo.TransitionStatus = fi.TransitionStatus objInfo.TransitionedObject = TransitionedObject{
objInfo.transitionedObjName = fi.TransitionedObjName Name: fi.TransitionedObjName,
objInfo.transitionVersionID = fi.TransitionVersionID VersionID: fi.TransitionVersionID,
objInfo.tierFreeVersion = fi.TierFreeVersion() Status: fi.TransitionStatus,
objInfo.TransitionTier = fi.TransitionTier FreeVersion: fi.TierFreeVersion(),
Tier: fi.TransitionTier,
}
// etag/md5Sum has already been extracted. We need to // etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of // remove to avoid it from appearing as part of

View file

@ -273,12 +273,7 @@ type poolObjInfo struct {
Err error Err error
} }
// getPoolIdxExisting returns the (first) found object pool index containing an object. func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (idx int, err error) {
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one zone, and 0, nil is always returned in that case.
func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) {
if z.SinglePool() { if z.SinglePool() {
return 0, nil return 0, nil
} }
@ -294,7 +289,7 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
pinfo := poolObjInfo{ pinfo := poolObjInfo{
PoolIndex: i, PoolIndex: i,
} }
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{}) pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts)
poolObjInfos[i] = pinfo poolObjInfos[i] = pinfo
}(i, pool) }(i, pool)
} }
@ -331,6 +326,19 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
return -1, toObjectErr(errFileNotFound, bucket, object) return -1, toObjectErr(errFileNotFound, bucket, object)
} }
func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{NoLock: true})
}
// getPoolIdxExisting returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one zone, and 0, nil is always returned in that case.
func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) {
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{})
}
// getPoolIdx returns the found previous object and its corresponding pool idx, // getPoolIdx returns the found previous object and its corresponding pool idx,
// if none are found falls back to most available space pool. // if none are found falls back to most available space pool.
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) { func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@ -837,27 +845,6 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
objSets.Add(objects[i].ObjectName) objSets.Add(objects[i].ObjectName)
} }
poolObjIdxMap := map[int][]ObjectToDelete{}
origIndexMap := map[int][]int{}
if !z.SinglePool() {
for j, obj := range objects {
idx, err := z.getPoolIdxExisting(ctx, bucket, obj.ObjectName)
if isErrObjectNotFound(err) {
derrs[j] = err
continue
}
if err != nil {
// Unhandled errors return right here.
for i := range derrs {
derrs[i] = err
}
return dobjects, derrs
}
poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj)
origIndexMap[idx] = append(origIndexMap[idx], j)
}
}
// Acquire a bulk write lock across 'objects' // Acquire a bulk write lock across 'objects'
multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...) multiDeleteLock := z.NewNSLock(bucket, objSets.ToSlice()...)
lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout) lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
@ -874,17 +861,65 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts) return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
} }
for idx, pool := range z.serverPools { // Fetch location of up to 10 objects concurrently.
objs := poolObjIdxMap[idx] poolObjIdxMap := map[int][]ObjectToDelete{}
orgIndexes := origIndexMap[idx] origIndexMap := map[int][]int{}
deletedObjects, errs := pool.DeleteObjects(ctx, bucket, objs, opts)
for i, derr := range errs { var mu sync.Mutex
if derr != nil { eg := errgroup.WithNErrs(len(objects)).WithConcurrency(10)
derrs[orgIndexes[i]] = derr cctx, cancel := eg.WithCancelOnError(ctx)
defer cancel()
for j, obj := range objects {
j := j
obj := obj
eg.Go(func() error {
idx, err := z.getPoolIdxExistingNoLock(cctx, bucket, obj.ObjectName)
if isErrObjectNotFound(err) {
derrs[j] = err
return nil
} }
dobjects[orgIndexes[i]] = deletedObjects[i] if err != nil {
} // unhandled errors return right here.
return err
}
mu.Lock()
poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj)
origIndexMap[idx] = append(origIndexMap[idx], j)
mu.Unlock()
return nil
}, j)
} }
if err := eg.WaitErr(); err != nil {
for i := range derrs {
derrs[i] = err
}
return dobjects, derrs
}
// Delete concurrently in all server pools.
var wg sync.WaitGroup
wg.Add(len(z.serverPools))
for idx, pool := range z.serverPools {
go func(idx int, pool *erasureSets) {
defer wg.Done()
objs := poolObjIdxMap[idx]
if len(objs) > 0 {
orgIndexes := origIndexMap[idx]
deletedObjects, errs := pool.DeleteObjects(ctx, bucket, objs, opts)
mu.Lock()
for i, derr := range errs {
if derr != nil {
derrs[orgIndexes[i]] = derr
}
dobjects[orgIndexes[i]] = deletedObjects[i]
}
mu.Unlock()
}
}(idx, pool)
}
wg.Wait()
return dobjects, derrs return dobjects, derrs
} }

View file

@ -113,16 +113,8 @@ type ObjectInfo struct {
// to a delete marker on an object. // to a delete marker on an object.
DeleteMarker bool DeleteMarker bool
// tierFreeVersion is true if this is a free-version // Transitioned object information
tierFreeVersion bool TransitionedObject TransitionedObject
// TransitionStatus indicates if transition is complete/pending
TransitionStatus string
// Name of transitioned object on remote tier
transitionedObjName string
// VersionID on the the remote tier
transitionVersionID string
// Name of remote tier object has transitioned to
TransitionTier string
// RestoreExpires indicates date a restored object expires // RestoreExpires indicates date a restored object expires
RestoreExpires time.Time RestoreExpires time.Time
@ -200,7 +192,7 @@ func (o ObjectInfo) Clone() (cinfo ObjectInfo) {
VersionID: o.VersionID, VersionID: o.VersionID,
IsLatest: o.IsLatest, IsLatest: o.IsLatest,
DeleteMarker: o.DeleteMarker, DeleteMarker: o.DeleteMarker,
TransitionStatus: o.TransitionStatus, TransitionedObject: o.TransitionedObject,
RestoreExpires: o.RestoreExpires, RestoreExpires: o.RestoreExpires,
RestoreOngoing: o.RestoreOngoing, RestoreOngoing: o.RestoreOngoing,
ContentType: o.ContentType, ContentType: o.ContentType,
@ -354,6 +346,15 @@ type ListMultipartsInfo struct {
EncodingType string // Not supported yet. EncodingType string // Not supported yet.
} }
// TransitionedObject transitioned object tier and status.
type TransitionedObject struct {
Name string
VersionID string
Tier string
FreeVersion bool
Status string
}
// DeletedObjectInfo - container for list objects versions deleted objects. // DeletedObjectInfo - container for list objects versions deleted objects.
type DeletedObjectInfo struct { type DeletedObjectInfo struct {
// Name of the bucket. // Name of the bucket.

View file

@ -219,6 +219,7 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts
// get ObjectOptions for PUT calls from encryption headers and metadata // get ObjectOptions for PUT calls from encryption headers and metadata
func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) {
versioned := globalBucketVersioningSys.Enabled(bucket) versioned := globalBucketVersioningSys.Enabled(bucket)
versionSuspended := globalBucketVersioningSys.Suspended(bucket)
vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID))
if vid != "" && vid != nullVersionID { if vid != "" && vid != nullVersionID {
_, err := uuid.Parse(vid) _, err := uuid.Parse(vid)
@ -266,6 +267,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
UserDefined: metadata, UserDefined: metadata,
VersionID: vid, VersionID: vid,
Versioned: versioned, Versioned: versioned,
VersionSuspended: versionSuspended,
MTime: mtime, MTime: mtime,
}, nil }, nil
} }
@ -273,6 +275,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
opts, err = getOpts(ctx, r, bucket, object) opts, err = getOpts(ctx, r, bucket, object)
opts.VersionID = vid opts.VersionID = vid
opts.Versioned = versioned opts.Versioned = versioned
opts.VersionSuspended = versionSuspended
opts.UserDefined = metadata opts.UserDefined = metadata
return return
} }
@ -290,6 +293,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
UserDefined: metadata, UserDefined: metadata,
VersionID: vid, VersionID: vid,
Versioned: versioned, Versioned: versioned,
VersionSuspended: versionSuspended,
MTime: mtime, MTime: mtime,
}, nil }, nil
} }
@ -300,6 +304,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
} }
opts.VersionID = vid opts.VersionID = vid
opts.Versioned = versioned opts.Versioned = versioned
opts.VersionSuspended = versionSuspended
opts.MTime = mtime opts.MTime = mtime
return opts, nil return opts, nil
} }

View file

@ -1372,12 +1372,14 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
objInfo.ModTime = remoteObjInfo.LastModified objInfo.ModTime = remoteObjInfo.LastModified
} else { } else {
os := newObjSweeper(dstBucket, dstObject) os := newObjSweeper(dstBucket, dstObject).WithVersioning(dstOpts.Versioned, dstOpts.VersionSuspended)
// Get appropriate object info to identify the remote object to delete // Get appropriate object info to identify the remote object to delete
if !srcInfo.metadataOnly { if !srcInfo.metadataOnly {
goiOpts := os.GetOpts() goiOpts := os.GetOpts()
if goi, gerr := getObjectInfo(ctx, dstBucket, dstObject, goiOpts); gerr == nil { if !globalTierConfigMgr.Empty() {
os.SetTransitionState(goi) if goi, gerr := getObjectInfo(ctx, dstBucket, dstObject, goiOpts); gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
} }
} }
@ -1395,7 +1397,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
} }
// Remove the transitioned object whose object version is being overwritten. // Remove the transitioned object whose object version is being overwritten.
defer logger.LogIf(ctx, os.Sweep()) if !globalTierConfigMgr.Empty() {
logger.LogIf(ctx, os.Sweep())
}
} }
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
@ -1685,11 +1689,13 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Ensure that metadata does not contain sensitive information // Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata) crypto.RemoveSensitiveEntries(metadata)
os := newObjSweeper(bucket, object) os := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended)
// Get appropriate object info to identify the remote object to delete if !globalTierConfigMgr.Empty() {
goiOpts := os.GetOpts() // Get appropriate object info to identify the remote object to delete
if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { goiOpts := os.GetOpts()
os.SetTransitionState(goi) if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
} }
// Create the object.. // Create the object..
@ -1752,7 +1758,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}) })
// Remove the transitioned object whose object version is being overwritten. // Remove the transitioned object whose object version is being overwritten.
logger.LogIf(ctx, os.Sweep()) if !globalTierConfigMgr.Empty() {
logger.LogIf(ctx, os.Sweep())
}
} }
// PutObjectExtractHandler - PUT Object extract is an extended API // PutObjectExtractHandler - PUT Object extract is an extended API
@ -3061,11 +3069,15 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
os := newObjSweeper(bucket, object) versioned := globalBucketVersioningSys.Enabled(bucket)
// Get appropriate object info to identify the remote object to delete suspended := globalBucketVersioningSys.Suspended(bucket)
goiOpts := os.GetOpts() os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended)
if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil { if !globalTierConfigMgr.Empty() {
os.SetTransitionState(goi) // Get appropriate object info to identify the remote object to delete
goiOpts := os.GetOpts()
if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
} }
setEventStreamHeaders(w) setEventStreamHeaders(w)
@ -3134,7 +3146,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}) })
// Remove the transitioned object whose object version is being overwritten. // Remove the transitioned object whose object version is being overwritten.
logger.LogIf(ctx, os.Sweep()) if !globalTierConfigMgr.Empty() {
logger.LogIf(ctx, os.Sweep())
}
} }
/// Delete objectAPIHandlers /// Delete objectAPIHandlers
@ -3164,11 +3178,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
return return
} }
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
if globalDNSConfig != nil { if globalDNSConfig != nil {
_, err := globalDNSConfig.Get(bucket) _, err := globalDNSConfig.Get(bucket)
if err != nil && err != dns.ErrNotImplemented { if err != nil && err != dns.ErrNotImplemented {
@ -3187,16 +3196,20 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
gerr error gerr error
) )
var goiOpts ObjectOptions getObjectInfo := objectAPI.GetObjectInfo
os := newObjSweeper(bucket, object).WithVersion(singleDelete(*r)) if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
os := newObjSweeper(bucket, object).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended)
// Mutations of objects on versioning suspended buckets // Mutations of objects on versioning suspended buckets
// affect its null version. Through opts below we select // affect its null version. Through opts below we select
// the null version's remote object to delete if // the null version's remote object to delete if
// transitioned. // transitioned.
goiOpts = os.GetOpts() goiOpts := os.GetOpts()
goi, gerr = getObjectInfo(ctx, bucket, object, goiOpts) goi, gerr = getObjectInfo(ctx, bucket, object, goiOpts)
if gerr == nil { if gerr == nil {
os.SetTransitionState(goi) os.SetTransitionState(goi.TransitionedObject)
} }
replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
@ -3309,8 +3322,9 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
} }
// Remove the transitioned object whose object version is being overwritten. // Remove the transitioned object whose object version is being overwritten.
logger.LogIf(ctx, os.Sweep()) if !globalTierConfigMgr.Empty() {
os.Sweep()
}
} }
// PutObjectLegalHoldHandler - set legal hold configuration to object, // PutObjectLegalHoldHandler - set legal hold configuration to object,
@ -3907,13 +3921,14 @@ func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r *
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }
objInfo, err := getObjectInfo(ctx, bucket, object, opts) objInfo, err := getObjectInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }
if objInfo.TransitionStatus != lifecycle.TransitionComplete { if objInfo.TransitionedObject.Status != lifecycle.TransitionComplete {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL)
return return
} }

View file

@ -18,11 +18,7 @@
package cmd package cmd
import ( import (
"net/http"
"strings"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
xhttp "github.com/minio/minio/internal/http"
) )
// objSweeper determines if a transitioned object needs to be removed from the remote tier. // objSweeper determines if a transitioned object needs to be removed from the remote tier.
@ -43,7 +39,7 @@ import (
type objSweeper struct { type objSweeper struct {
Object string Object string
Bucket string Bucket string
ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs VersionID string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs
Versioned bool Versioned bool
Suspended bool Suspended bool
TransitionStatus string TransitionStatus string
@ -55,46 +51,22 @@ type objSweeper struct {
// newObjSweeper returns an objSweeper for a given bucket and object. // newObjSweeper returns an objSweeper for a given bucket and object.
// It initializes the versioning information using bucket name. // It initializes the versioning information using bucket name.
func newObjSweeper(bucket, object string) *objSweeper { func newObjSweeper(bucket, object string) *objSweeper {
versioned := globalBucketVersioningSys.Enabled(bucket)
suspended := globalBucketVersioningSys.Suspended(bucket)
return &objSweeper{ return &objSweeper{
Object: object, Object: object,
Bucket: bucket, Bucket: bucket,
Versioned: versioned,
Suspended: suspended,
} }
} }
// versionIDer interface is used to fetch object versionIDer from disparate sources
// like http.Request and ObjectToDelete.
type versionIDer interface {
GetVersionID() string
}
// multiDelete is a type alias for ObjectToDelete to implement versionID
// interface
type multiDelete ObjectToDelete
// GetVersionID returns object version of an object to be deleted via
// multi-delete API.
func (md multiDelete) GetVersionID() string {
return md.VersionID
}
// singleDelete is a type alias for http.Request to implement versionID
// interface
type singleDelete http.Request
// GetVersionID returns object version of an object to be deleted via (simple)
// delete API. Note only when the versionID is set explicitly by the application
// will we return a non-empty versionID.
func (sd singleDelete) GetVersionID() string {
return strings.TrimSpace(sd.URL.Query().Get(xhttp.VersionID))
}
// WithVersion sets the version ID from v // WithVersion sets the version ID from v
func (os *objSweeper) WithVersion(v versionIDer) *objSweeper { func (os *objSweeper) WithVersion(vid string) *objSweeper {
os.ReqVersion = v.GetVersionID() os.VersionID = vid
return os
}
// WithVersioning sets bucket versioning for sweeper.
func (os *objSweeper) WithVersioning(versioned, suspended bool) *objSweeper {
os.Versioned = versioned
os.Suspended = suspended
return os return os
} }
@ -102,22 +74,22 @@ func (os *objSweeper) WithVersion(v versionIDer) *objSweeper {
// overwritten or deleted depending on bucket versioning status. // overwritten or deleted depending on bucket versioning status.
func (os *objSweeper) GetOpts() ObjectOptions { func (os *objSweeper) GetOpts() ObjectOptions {
opts := ObjectOptions{ opts := ObjectOptions{
VersionID: os.ReqVersion, VersionID: os.VersionID,
Versioned: os.Versioned, Versioned: os.Versioned,
VersionSuspended: os.Suspended, VersionSuspended: os.Suspended,
} }
if os.Suspended && os.ReqVersion == "" { if os.Suspended && os.VersionID == "" {
opts.VersionID = nullVersionID opts.VersionID = nullVersionID
} }
return opts return opts
} }
// SetTransitionState sets ILM transition related information from given info. // SetTransitionState sets ILM transition related information from given info.
func (os *objSweeper) SetTransitionState(info ObjectInfo) { func (os *objSweeper) SetTransitionState(info TransitionedObject) {
os.TransitionTier = info.TransitionTier os.TransitionTier = info.Tier
os.TransitionStatus = info.TransitionStatus os.TransitionStatus = info.Status
os.RemoteObject = info.transitionedObjName os.RemoteObject = info.Name
os.TransitionVersionID = info.transitionVersionID os.TransitionVersionID = info.VersionID
} }
// shouldRemoveRemoteObject determines if a transitioned object should be // shouldRemoveRemoteObject determines if a transitioned object should be
@ -140,7 +112,7 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) {
switch { switch {
case !os.Versioned, os.Suspended: // 1, 2.a, 2.b case !os.Versioned, os.Suspended: // 1, 2.a, 2.b
delTier = true delTier = true
case os.Versioned && os.ReqVersion != "": // 3.a case os.Versioned && os.VersionID != "": // 3.a
delTier = true delTier = true
} }
if delTier { if delTier {

View file

@ -115,6 +115,11 @@ func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig) er
return nil return nil
} }
// Empty returns if tier targets are empty
func (config *TierConfigMgr) Empty() bool {
return len(config.ListTiers()) == 0
}
// ListTiers lists remote tiers configured in this deployment. // ListTiers lists remote tiers configured in this deployment.
func (config *TierConfigMgr) ListTiers() []madmin.TierConfig { func (config *TierConfigMgr) ListTiers() []madmin.TierConfig {
config.RLock() config.RLock()