ilm: Remove object in HEAD/GET if having an applicable ILM rule (#11296)

Remove an object on the fly if there is a lifecycle rule with delete
expiry action for the corresponding object.
This commit is contained in:
Anis Elleuch 2021-02-01 18:52:11 +01:00 committed by GitHub
parent de4421d6a3
commit 65aa2bc614
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 331 additions and 188 deletions

View file

@ -591,15 +591,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
if hasLifecycleConfig && dobj.PurgeTransitioned == lifecycle.TransitionComplete { // clean up transitioned tier
action := lifecycle.DeleteAction
if dobj.VersionID != "" {
action = lifecycle.DeleteVersionAction
}
deleteTransitionedObject(ctx, newObjectLayerFn(), bucket, dobj.ObjectName, lifecycle.ObjectOpts{
Name: dobj.ObjectName,
VersionID: dobj.VersionID,
DeleteMarker: dobj.DeleteMarker,
}, action, true)
}, false, true)
}
}

View file

@ -65,6 +65,41 @@ func NewLifecycleSys() *LifecycleSys {
return &LifecycleSys{}
}
type expiryState struct {
expiryCh chan ObjectInfo
}
func (es *expiryState) queueExpiryTask(oi ObjectInfo) {
select {
case es.expiryCh <- oi:
default:
}
}
var (
globalExpiryState *expiryState
)
func newExpiryState() *expiryState {
es := &expiryState{
expiryCh: make(chan ObjectInfo, 10000),
}
go func() {
<-GlobalContext.Done()
close(es.expiryCh)
}()
return es
}
func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
globalExpiryState = newExpiryState()
go func() {
for oi := range globalExpiryState.expiryCh {
applyExpiryRule(ctx, objectAPI, oi, false)
}
}()
}
type transitionState struct {
// add future metrics here
transitionCh chan ObjectInfo
@ -246,7 +281,7 @@ func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
// 1. temporarily restored copies of objects (restored with the PostRestoreObject API) expired.
// 2. life cycle expiry date is met on the object.
// 3. Object is removed through DELETE api call
func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, action lifecycle.Action, isDeleteTierOnly bool) error {
func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, restoredObject, isDeleteTierOnly bool) error {
if lcOpts.TransitionStatus == "" && !isDeleteTierOnly {
return nil
}
@ -266,44 +301,41 @@ func deleteTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket
var opts ObjectOptions
opts.Versioned = globalBucketVersioningSys.Enabled(bucket)
opts.VersionID = lcOpts.VersionID
switch action {
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
if restoredObject {
// delete locally restored copy of object or object version
// from the source, while leaving metadata behind. The data on
// transitioned tier lies untouched and still accessible
opts.TransitionStatus = lcOpts.TransitionStatus
_, err = objectAPI.DeleteObject(ctx, bucket, object, opts)
return err
case lifecycle.DeleteAction, lifecycle.DeleteVersionAction:
// When an object is past expiry, delete the data from transitioned tier and
// metadata from source
if err := tgt.RemoveObject(context.Background(), arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: lcOpts.VersionID}); err != nil {
logger.LogIf(ctx, err)
}
if isDeleteTierOnly {
return nil
}
objInfo, err := objectAPI.DeleteObject(ctx, bucket, object, opts)
if err != nil {
return err
}
eventName := event.ObjectRemovedDelete
if lcOpts.DeleteMarker {
eventName = event.ObjectRemovedDeleteMarkerCreated
}
// Notify object deleted event.
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [ILM-EXPIRY]",
})
}
// When an object is past expiry, delete the data from transitioned tier and
// metadata from source
if err := tgt.RemoveObject(context.Background(), arn.Bucket, object, miniogo.RemoveObjectOptions{VersionID: lcOpts.VersionID}); err != nil {
logger.LogIf(ctx, err)
}
if isDeleteTierOnly {
return nil
}
objInfo, err := objectAPI.DeleteObject(ctx, bucket, object, opts)
if err != nil {
return err
}
eventName := event.ObjectRemovedDelete
if lcOpts.DeleteMarker {
eventName = event.ObjectRemovedDeleteMarkerCreated
}
// Notify object deleted event.
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [ILM-EXPIRY]",
})
// should never reach here
return nil
}

View file

@ -785,12 +785,12 @@ func (i *crawlItem) transformMetaDir() {
// actionMeta contains information used to apply actions.
type actionMeta struct {
oi ObjectInfo
successorModTime time.Time // The modtime of the successor version
numVersions int // The number of versions of this object
bitRotScan bool // indicates if bitrot check was requested.
oi ObjectInfo
bitRotScan bool // indicates if bitrot check was requested.
}
var applyActionsLogPrefix = color.Green("applyActions:")
// applyActions will apply lifecycle checks on to a scanned item.
// The resulting size on disk will always be returned.
// The metadata will be compared to consensus on the object layer before any changes are applied.
@ -800,7 +800,6 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
if i.debug {
logger.LogIf(ctx, err)
}
applyActionsLogPrefix := color.Green("applyActions:")
if i.heal {
if i.debug {
if meta.oi.VersionID != "" {
@ -839,8 +838,8 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
VersionID: meta.oi.VersionID,
DeleteMarker: meta.oi.DeleteMarker,
IsLatest: meta.oi.IsLatest,
NumVersions: meta.numVersions,
SuccessorModTime: meta.successorModTime,
NumVersions: meta.oi.NumVersions,
SuccessorModTime: meta.oi.SuccessorModTime,
RestoreOngoing: meta.oi.RestoreOngoing,
RestoreExpires: meta.oi.RestoreExpires,
TransitionStatus: meta.oi.TransitionStatus,
@ -884,96 +883,129 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
return size
}
}
size = obj.Size
// Recalculate action.
var applied bool
action = evalActionFromLifecycle(ctx, *i.lifeCycle, obj, i.debug)
if action != lifecycle.NoneAction {
applied = applyLifecycleAction(ctx, action, o, obj)
}
if applied {
return 0
}
return size
}
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj ObjectInfo, debug bool) (action lifecycle.Action) {
lcOpts := lifecycle.ObjectOpts{
Name: i.objectPath(),
Name: obj.Name,
UserTags: obj.UserTags,
ModTime: obj.ModTime,
VersionID: obj.VersionID,
DeleteMarker: obj.DeleteMarker,
IsLatest: obj.IsLatest,
NumVersions: meta.numVersions,
SuccessorModTime: meta.successorModTime,
NumVersions: obj.NumVersions,
SuccessorModTime: obj.SuccessorModTime,
RestoreOngoing: obj.RestoreOngoing,
RestoreExpires: obj.RestoreExpires,
TransitionStatus: obj.TransitionStatus,
}
action = i.lifeCycle.ComputeAction(lcOpts)
if i.debug {
action = lc.ComputeAction(lcOpts)
if debug {
console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", action)
}
switch action {
case lifecycle.DeleteAction, lifecycle.DeleteVersionAction:
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
default:
// No action.
return size
if action == lifecycle.NoneAction {
return action
}
opts := ObjectOptions{}
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredVersionAction:
// Defensive code, should never happen
if obj.VersionID == "" {
return size
return lifecycle.NoneAction
}
if rcfg, _ := globalBucketObjectLockSys.Get(i.bucket); rcfg.LockEnabled {
if rcfg, _ := globalBucketObjectLockSys.Get(obj.Bucket); rcfg.LockEnabled {
locked := enforceRetentionForDeletion(ctx, obj)
if locked {
if i.debug {
if debug {
if obj.VersionID != "" {
console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", i.objectPath(), obj.VersionID)
console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
} else {
console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", i.objectPath())
console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
}
}
return size
return lifecycle.NoneAction
}
}
}
return action
}
func applyTransitionAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) bool {
opts := ObjectOptions{}
if obj.TransitionStatus == "" {
opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
opts.VersionID = obj.VersionID
case lifecycle.DeleteAction, lifecycle.DeleteRestoredAction:
opts.Versioned = globalBucketVersioningSys.Enabled(i.bucket)
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
if obj.TransitionStatus == "" {
opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
opts.VersionID = obj.VersionID
opts.TransitionStatus = lifecycle.TransitionPending
if _, err = o.DeleteObject(ctx, obj.Bucket, obj.Name, opts); err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
}
// Assume it is still there.
logger.LogIf(ctx, err)
return size
}
}
globalTransitionState.queueTransitionTask(obj)
return 0
}
if obj.TransitionStatus != "" {
if err := deleteTransitionedObject(ctx, o, i.bucket, i.objectPath(), lcOpts, action, false); err != nil {
opts.TransitionStatus = lifecycle.TransitionPending
if _, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts); err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
return false
}
// Assume it is still there.
logger.LogIf(ctx, err)
return size
return false
}
// Notification already sent at *deleteTransitionedObject*, return '0' here.
return 0
}
globalTransitionState.queueTransitionTask(obj)
return true
}
func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool {
lcOpts := lifecycle.ObjectOpts{
Name: obj.Name,
UserTags: obj.UserTags,
ModTime: obj.ModTime,
VersionID: obj.VersionID,
DeleteMarker: obj.DeleteMarker,
IsLatest: obj.IsLatest,
NumVersions: obj.NumVersions,
SuccessorModTime: obj.SuccessorModTime,
RestoreOngoing: obj.RestoreOngoing,
RestoreExpires: obj.RestoreExpires,
TransitionStatus: obj.TransitionStatus,
}
obj, err = o.DeleteObject(ctx, i.bucket, i.objectPath(), opts)
if err := deleteTransitionedObject(ctx, objLayer, obj.Bucket, obj.Name, lcOpts, restoredObject, false); err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return false
}
logger.LogIf(ctx, err)
return false
}
// Notification already sent at *deleteTransitionedObject*, just return 'true' here.
return true
}
func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo) bool {
opts := ObjectOptions{}
opts.VersionID = obj.VersionID
if opts.VersionID == "" {
opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
}
obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts)
if err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
return false
}
// Assume it is still there.
logger.LogIf(ctx, err)
return size
return false
}
eventName := event.ObjectRemovedDelete
@ -984,11 +1016,33 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
// Notify object deleted event.
sendEvent(eventArgs{
EventName: eventName,
BucketName: i.bucket,
BucketName: obj.Bucket,
Object: obj,
Host: "Internal: [ILM-EXPIRY]",
})
return 0
return true
}
// Apply object, object version, restored object or restored object version action on the given object
func applyExpiryRule(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool {
if obj.TransitionStatus != "" {
return applyExpiryOnTransitionedObject(ctx, objLayer, obj, restoredObject)
}
return applyExpiryOnNonTransitionedObjects(ctx, objLayer, obj)
}
// Perform actions (removal of transitioning of objects), return true the action is successfully performed
func applyLifecycleAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) (success bool) {
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(ctx, objLayer, obj, false)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
success = applyExpiryRule(ctx, objLayer, obj, true)
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
success = applyTransitionAction(ctx, action, objLayer, obj)
}
return
}
// objectPath returns the prefix and object name.

View file

@ -107,18 +107,21 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
}
objInfo := ObjectInfo{
IsDir: HasSuffix(object, SlashSeparator),
Bucket: bucket,
Name: object,
VersionID: versionID,
IsLatest: fi.IsLatest,
DeleteMarker: fi.Deleted,
Size: fi.Size,
ModTime: fi.ModTime,
Legacy: fi.XLV1,
ContentType: fi.Metadata["content-type"],
ContentEncoding: fi.Metadata["content-encoding"],
IsDir: HasSuffix(object, SlashSeparator),
Bucket: bucket,
Name: object,
VersionID: versionID,
IsLatest: fi.IsLatest,
DeleteMarker: fi.Deleted,
Size: fi.Size,
ModTime: fi.ModTime,
Legacy: fi.XLV1,
ContentType: fi.Metadata["content-type"],
ContentEncoding: fi.Metadata["content-encoding"],
NumVersions: fi.NumVersions,
SuccessorModTime: fi.SuccessorModTime,
}
// Update expires
var (
t time.Time

View file

@ -237,6 +237,11 @@ type ObjectInfo struct {
backendType BackendType
VersionPurgeStatus VersionPurgeStatusType
// The total count of all versions of this object
NumVersions int
// The modtime of the successor object version if any
SuccessorModTime time.Time
}
// MultipartInfo captures metadata information about the uploadId

View file

@ -431,6 +431,16 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
objInfo := gr.ObjInfo
// Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
action := evalActionFromLifecycle(ctx, *lc, objInfo, false)
if action == lifecycle.DeleteAction || action == lifecycle.DeleteVersionAction {
globalExpiryState.queueExpiryTask(objInfo)
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
@ -590,6 +600,16 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
return
}
// Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
action := evalActionFromLifecycle(ctx, *lc, objInfo, false)
if action == lifecycle.DeleteAction || action == lifecycle.DeleteVersionAction {
globalExpiryState.queueExpiryTask(objInfo)
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
@ -2820,10 +2840,6 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
}
if goi.TransitionStatus == lifecycle.TransitionComplete { // clean up transitioned tier
action := lifecycle.DeleteAction
if goi.VersionID != "" {
action = lifecycle.DeleteVersionAction
}
deleteTransitionedObject(ctx, newObjectLayerFn(), bucket, object, lifecycle.ObjectOpts{
Name: object,
UserTags: goi.UserTags,
@ -2831,7 +2847,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
DeleteMarker: goi.DeleteMarker,
TransitionStatus: goi.TransitionStatus,
IsLatest: goi.IsLatest,
}, action, true)
}, false, true)
}
setPutObjHeaders(w, objInfo, true)

View file

@ -494,6 +494,7 @@ func serverMain(ctx *cli.Context) {
initAutoHeal(GlobalContext, newObject)
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
initBackgroundExpiry(GlobalContext, newObject)
}
initDataCrawler(GlobalContext, newObject)

View file

@ -155,6 +155,9 @@ type FileInfo struct {
VersionPurgeStatus VersionPurgeStatusType
Data []byte // optionally carries object data
NumVersions int
SuccessorModTime time.Time
}
// VersionPurgeStatusKey denotes purge status in metadata

View file

@ -245,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err)
return
}
if zb0001 != 18 {
err = msgp.ArrayError{Wanted: 18, Got: zb0001}
if zb0001 != 20 {
err = msgp.ArrayError{Wanted: 20, Got: zb0001}
return
}
z.Volume, err = dc.ReadString()
@ -380,13 +380,23 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Data")
return
}
z.NumVersions, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "NumVersions")
return
}
z.SuccessorModTime, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "SuccessorModTime")
return
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 18
err = en.Append(0xdc, 0x0, 0x12)
// array header, size 20
err = en.Append(0xdc, 0x0, 0x14)
if err != nil {
return
}
@ -499,14 +509,24 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Data")
return
}
err = en.WriteInt(z.NumVersions)
if err != nil {
err = msgp.WrapError(err, "NumVersions")
return
}
err = en.WriteTime(z.SuccessorModTime)
if err != nil {
err = msgp.WrapError(err, "SuccessorModTime")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 18
o = append(o, 0xdc, 0x0, 0x12)
// array header, size 20
o = append(o, 0xdc, 0x0, 0x14)
o = msgp.AppendString(o, z.Volume)
o = msgp.AppendString(o, z.Name)
o = msgp.AppendString(o, z.VersionID)
@ -540,6 +560,8 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus)
o = msgp.AppendString(o, string(z.VersionPurgeStatus))
o = msgp.AppendBytes(o, z.Data)
o = msgp.AppendInt(o, z.NumVersions)
o = msgp.AppendTime(o, z.SuccessorModTime)
return
}
@ -551,8 +573,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err)
return
}
if zb0001 != 18 {
err = msgp.ArrayError{Wanted: 18, Got: zb0001}
if zb0001 != 20 {
err = msgp.ArrayError{Wanted: 20, Got: zb0001}
return
}
z.Volume, bts, err = msgp.ReadStringBytes(bts)
@ -686,6 +708,16 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Data")
return
}
z.NumVersions, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "NumVersions")
return
}
z.SuccessorModTime, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "SuccessorModTime")
return
}
o = bts
return
}
@ -703,7 +735,7 @@ func (z *FileInfo) Msgsize() (s int) {
for za0003 := range z.Parts {
s += z.Parts[za0003].Msgsize()
}
s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data)
s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) + msgp.IntSize + msgp.TimeSize
return
}

View file

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v25" // Add more small file optimization
storageRESTVersion = "v26" // Add NumVersions/SuccessorModTime fields in FileInfo
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)

View file

@ -777,17 +777,13 @@ next:
scheduleReplicationDelete(ctx, dobj, objectAPI, replicateSync)
}
if goi.TransitionStatus == lifecycle.TransitionComplete && err == nil && goi.VersionID == "" {
action := lifecycle.DeleteAction
if goi.VersionID != "" {
action = lifecycle.DeleteVersionAction
}
deleteTransitionedObject(ctx, newObjectLayerFn(), args.BucketName, objectName, lifecycle.ObjectOpts{
Name: objectName,
UserTags: goi.UserTags,
VersionID: goi.VersionID,
DeleteMarker: goi.DeleteMarker,
IsLatest: goi.IsLatest,
}, action, true)
}, false, true)
}
logger.LogIf(ctx, err)

View file

@ -652,6 +652,18 @@ func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, modTim
return versions, latestModTime, nil
}
func getModTimeFromVersion(v xlMetaV2Version) time.Time {
switch v.Type {
case ObjectType:
return time.Unix(0, v.ObjectV2.ModTime)
case DeleteType:
return time.Unix(0, v.DeleteMarker.ModTime)
case LegacyType:
return v.ObjectV1.Stat.ModTime
}
return time.Time{}
}
// ToFileInfo converts xlMetaV2 into a common FileInfo datastructure
// for consumption across callers.
func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) {
@ -663,52 +675,6 @@ func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
}
}
var latestModTime time.Time
var latestIndex int
for i, version := range z.Versions {
if !version.Valid() {
logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", version))
return FileInfo{}, errFileNotFound
}
var modTime time.Time
switch version.Type {
case ObjectType:
modTime = time.Unix(0, version.ObjectV2.ModTime)
case DeleteType:
modTime = time.Unix(0, version.DeleteMarker.ModTime)
case LegacyType:
modTime = version.ObjectV1.Stat.ModTime
}
if modTime.After(latestModTime) {
latestModTime = modTime
latestIndex = i
}
}
if versionID == "" {
if len(z.Versions) >= 1 {
if !z.Versions[latestIndex].Valid() {
logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", z.Versions[latestIndex]))
return FileInfo{}, errFileNotFound
}
switch z.Versions[latestIndex].Type {
case ObjectType:
fi, err = z.Versions[latestIndex].ObjectV2.ToFileInfo(volume, path)
fi.IsLatest = true
return fi, err
case DeleteType:
fi, err = z.Versions[latestIndex].DeleteMarker.ToFileInfo(volume, path)
fi.IsLatest = true
return fi, err
case LegacyType:
fi, err = z.Versions[latestIndex].ObjectV1.ToFileInfo(volume, path)
fi.IsLatest = true
return fi, err
}
}
return FileInfo{}, errFileNotFound
}
for _, version := range z.Versions {
if !version.Valid() {
logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", version))
@ -716,29 +682,75 @@ func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
return FileInfo{}, errFileNotFound
}
return FileInfo{}, errFileVersionNotFound
}
}
orderedVersions := make([]xlMetaV2Version, len(z.Versions))
copy(orderedVersions, z.Versions)
sort.Slice(orderedVersions, func(i, j int) bool {
mtime1 := getModTimeFromVersion(orderedVersions[i])
mtime2 := getModTimeFromVersion(orderedVersions[j])
return mtime1.After(mtime2)
})
if versionID == "" {
if len(orderedVersions) >= 1 {
switch orderedVersions[0].Type {
case ObjectType:
fi, err = orderedVersions[0].ObjectV2.ToFileInfo(volume, path)
case DeleteType:
fi, err = orderedVersions[0].DeleteMarker.ToFileInfo(volume, path)
case LegacyType:
fi, err = orderedVersions[0].ObjectV1.ToFileInfo(volume, path)
}
fi.IsLatest = true
fi.NumVersions = len(orderedVersions)
return fi, err
}
return FileInfo{}, errFileNotFound
}
var i = -1
var version xlMetaV2Version
findVersion:
for i, version = range orderedVersions {
switch version.Type {
case ObjectType:
if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) {
fi, err = version.ObjectV2.ToFileInfo(volume, path)
fi.IsLatest = latestModTime.Equal(fi.ModTime)
return fi, err
break findVersion
}
case LegacyType:
if version.ObjectV1.VersionID == versionID {
fi, err = version.ObjectV1.ToFileInfo(volume, path)
fi.IsLatest = latestModTime.Equal(fi.ModTime)
return fi, err
break findVersion
}
case DeleteType:
if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) {
fi, err = version.DeleteMarker.ToFileInfo(volume, path)
fi.IsLatest = latestModTime.Equal(fi.ModTime)
return fi, err
break findVersion
}
}
}
if err != nil {
return fi, err
}
if i >= 0 {
// A version is found, fill dynamic fields
fi.IsLatest = i == 0
fi.NumVersions = len(z.Versions)
if i < len(orderedVersions)-1 {
fi.SuccessorModTime = getModTimeFromVersion(orderedVersions[i+1])
}
return fi, nil
}
if versionID == "" {
return FileInfo{}, errFileNotFound
}

View file

@ -377,21 +377,14 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
}
var totalSize int64
var numVersions = len(fivs.Versions)
sizeS := sizeSummary{}
for i, version := range fivs.Versions {
var successorModTime time.Time
if i > 0 {
successorModTime = fivs.Versions[i-1].ModTime
}
for _, version := range fivs.Versions {
oi := version.ToObjectInfo(item.bucket, item.objectPath())
if objAPI != nil {
totalSize += item.applyActions(ctx, objAPI, actionMeta{
numVersions: numVersions,
successorModTime: successorModTime,
oi: oi,
bitRotScan: healOpts.Bitrot,
oi: oi,
bitRotScan: healOpts.Bitrot,
})
item.healReplication(ctx, objAPI, oi, &sizeS)
}