From 30a3921d3e9df97ce6495056ed33c96975d5cb13 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Thu, 3 Jun 2021 14:26:51 -0700 Subject: [PATCH] [Tiering] Support remote tiers with object versioning (#12342) - Adds versioning support for S3 based remote tiers that have versioning enabled. This ensures that when reading or deleting we specify the specific version ID of the object. In case of deletion, this is important to ensure that the object version is actually deleted instead of simply being marked for deletion. - Stores the remote object's version id in the tier-journal. Tier-journal file version is not bumped up as serializing the new struct version is compatible with old journals without the remote object version id. - `storageRESTVersion` is bumped up as FileInfo struct now includes a `TransitionRemoteVersionID` member. - Azure and GCS support for this feature will be added subsequently. Co-authored-by: Krishnan Parthasarathi --- cmd/bucket-lifecycle.go | 36 +++++--- cmd/data-scanner.go | 2 +- cmd/erasure-metadata.go | 1 + cmd/erasure-object.go | 4 +- cmd/object-api-datatypes.go | 2 + cmd/storage-datatypes.go | 3 + cmd/storage-datatypes_gen.go | 34 ++++++-- cmd/storage-rest-common.go | 2 +- cmd/tier-journal.go | 31 +++++-- cmd/tier-journal_gen.go | 161 ++++++++++++++++++++++++++++++++++- cmd/tier-journal_gen_test.go | 113 ++++++++++++++++++++++++ cmd/tier-journal_test.go | 121 ++++++++++++++++++++++++++ cmd/tier-sweeper.go | 24 ++++-- cmd/warm-backend-azure.go | 16 ++-- cmd/warm-backend-gcs.go | 14 +-- cmd/warm-backend-s3.go | 19 +++-- cmd/warm-backend.go | 16 ++-- cmd/xl-storage-format-v2.go | 7 ++ 18 files changed, 536 insertions(+), 70 deletions(-) create mode 100644 cmd/tier-journal_test.go diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index ba3e62cef..05f229301 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -46,6 +46,8 @@ const ( TransitionStatus = "transition-status" // TransitionedObjectName name of 
transitioned object TransitionedObjectName = "transitioned-object" + // TransitionedVersionID is version of remote object + TransitionedVersionID = "transitioned-versionID" // TransitionTier name of transition storage class TransitionTier = "transition-tier" ) @@ -208,41 +210,46 @@ const ( // // 1. when a restored (via PostRestoreObject API) object expires. // 2. when a transitioned object expires (based on an ILM rule). -func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, remoteObject, tier string, action expireAction) error { +func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, action expireAction) error { var opts ObjectOptions - opts.Versioned = globalBucketVersioningSys.Enabled(bucket) + opts.Versioned = globalBucketVersioningSys.Enabled(oi.Bucket) opts.VersionID = lcOpts.VersionID switch action { case expireObj: // When an object is past expiry or when a transitioned object is being // deleted, 'mark' the data in the remote tier for delete. - if err := globalTierJournal.AddEntry(jentry{ObjName: remoteObject, TierName: tier}); err != nil { + entry := jentry{ + ObjName: oi.transitionedObjName, + VersionID: oi.transitionVersionID, + TierName: oi.TransitionTier, + } + if err := globalTierJournal.AddEntry(entry); err != nil { logger.LogIf(ctx, err) return err } // Delete metadata on source, now that data in remote tier has been // marked for deletion. 
- if _, err := objectAPI.DeleteObject(ctx, bucket, object, opts); err != nil { + if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil { logger.LogIf(ctx, err) return err } // Send audit for the lifecycle delete operation - auditLogLifecycle(ctx, bucket, object) + auditLogLifecycle(ctx, oi.Bucket, oi.Name) eventName := event.ObjectRemovedDelete if lcOpts.DeleteMarker { eventName = event.ObjectRemovedDeleteMarkerCreated } objInfo := ObjectInfo{ - Name: object, + Name: oi.Name, VersionID: lcOpts.VersionID, DeleteMarker: lcOpts.DeleteMarker, } // Notify object deleted event. sendEvent(eventArgs{ EventName: eventName, - BucketName: bucket, + BucketName: oi.Bucket, Object: objInfo, Host: "Internal: [ILM-EXPIRY]", }) @@ -252,7 +259,7 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket // from the source, while leaving metadata behind. The data on // transitioned tier lies untouched and still accessible opts.Transition.ExpireRestored = true - _, err := objectAPI.DeleteObject(ctx, bucket, object, opts) + _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts) return err default: return fmt.Errorf("Unknown expire action %v", action) @@ -286,11 +293,12 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) UserTags: oi.UserTags, } tierName := getLifeCycleTransitionTier(ctx, lc, oi.Bucket, lcOpts) - opts := ObjectOptions{Transition: TransitionOptions{ - Status: lifecycle.TransitionPending, - Tier: tierName, - ETag: oi.ETag, - }, + opts := ObjectOptions{ + Transition: TransitionOptions{ + Status: lifecycle.TransitionPending, + Tier: tierName, + ETag: oi.ETag, + }, VersionID: oi.VersionID, Versioned: globalBucketVersioningSys.Enabled(oi.Bucket), MTime: oi.ModTime, @@ -327,7 +335,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs gopts.length = length } - reader, err := tgtClient.Get(ctx, oi.transitionedObjName, gopts) + reader, err := 
tgtClient.Get(ctx, oi.transitionedObjName, remoteVersionID(oi.transitionVersionID), gopts) if err != nil { return nil, err } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 7bd21b9ba..78d6a2f4f 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1068,7 +1068,7 @@ func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, if restoredObject { action = expireRestoredObj } - if err := expireTransitionedObject(ctx, objLayer, obj.Bucket, obj.Name, lcOpts, obj.transitionedObjName, obj.TransitionTier, action); err != nil { + if err := expireTransitionedObject(ctx, objLayer, &obj, lcOpts, action); err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { return false } diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 0a39c5b62..b44f39444 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -155,6 +155,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { objInfo.TransitionStatus = fi.TransitionStatus objInfo.transitionedObjName = fi.TransitionedObjName + objInfo.transitionVersionID = fi.TransitionVersionID objInfo.TransitionTier = fi.TransitionTier // etag/md5Sum has already been extracted. 
We need to diff --git a/cmd/erasure-object.go index 966697989..51633c9a9 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1348,7 +1348,8 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st pw.CloseWithError(err) }() - err = tgtClient.Put(ctx, destObj, pr, fi.Size) + var rv remoteVersionID + rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size) pr.CloseWithError(err) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err)) @@ -1357,6 +1358,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st fi.TransitionStatus = lifecycle.TransitionComplete fi.TransitionedObjName = destObj fi.TransitionTier = opts.Transition.Tier + fi.TransitionVersionID = string(rv) eventName := event.ObjectTransitionComplete storageDisks := er.getDisks() diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 3538da6bd..511506d9d 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -117,6 +117,8 @@ type ObjectInfo struct { TransitionStatus string // Name of transitioned object on remote tier transitionedObjName string + // VersionID on the remote tier + transitionVersionID string // Name of remote tier object has transitioned to TransitionTier string diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 73daad7aa..e212de928 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -138,6 +138,9 @@ type FileInfo struct { TransitionedObjName string // TransitionTier is the storage class label assigned to remote tier. TransitionTier string + // TransitionVersionID stores a version ID of the object associated + // with the remote tier. + TransitionVersionID string // ExpireRestored indicates that the restored object is to be expired.
ExpireRestored bool diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 073255188..875bcec75 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -550,8 +550,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 23 { - err = msgp.ArrayError{Wanted: 23, Got: zb0001} + if zb0001 != 24 { + err = msgp.ArrayError{Wanted: 24, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -594,6 +594,11 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "TransitionTier") return } + z.TransitionVersionID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TransitionVersionID") + return + } z.ExpireRestored, err = dc.ReadBool() if err != nil { err = msgp.WrapError(err, "ExpireRestored") @@ -715,8 +720,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 23 - err = en.Append(0xdc, 0x0, 0x17) + // array header, size 24 + err = en.Append(0xdc, 0x0, 0x18) if err != nil { return } @@ -760,6 +765,11 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "TransitionTier") return } + err = en.WriteString(z.TransitionVersionID) + if err != nil { + err = msgp.WrapError(err, "TransitionVersionID") + return + } err = en.WriteBool(z.ExpireRestored) if err != nil { err = msgp.WrapError(err, "ExpireRestored") @@ -860,8 +870,8 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 23 - o = append(o, 0xdc, 0x0, 0x17) + // array header, size 24 + o = append(o, 0xdc, 0x0, 0x18) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) @@ -870,6 +880,7 
@@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.TransitionStatus) o = msgp.AppendString(o, z.TransitionedObjName) o = msgp.AppendString(o, z.TransitionTier) + o = msgp.AppendString(o, z.TransitionVersionID) o = msgp.AppendBool(o, z.ExpireRestored) o = msgp.AppendString(o, z.DataDir) o = msgp.AppendBool(o, z.XLV1) @@ -911,8 +922,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 23 { - err = msgp.ArrayError{Wanted: 23, Got: zb0001} + if zb0001 != 24 { + err = msgp.ArrayError{Wanted: 24, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -955,6 +966,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "TransitionTier") return } + z.TransitionVersionID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TransitionVersionID") + return + } z.ExpireRestored, bts, err = msgp.ReadBoolBytes(bts) if err != nil { err = msgp.WrapError(err, "ExpireRestored") @@ -1077,7 +1093,7 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *FileInfo) Msgsize() (s int) { - s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize + len(z.TransitionedObjName) + msgp.StringPrefixSize + len(z.TransitionTier) + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize + s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize 
+ len(z.TransitionedObjName) + msgp.StringPrefixSize + len(z.TransitionTier) + msgp.StringPrefixSize + len(z.TransitionVersionID) + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize if z.Metadata != nil { for za0001, za0002 := range z.Metadata { _ = za0002 diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 9ce15f03f..62f8ee7db 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - storageRESTVersion = "v35" // Inline bugfix needs all servers to be updated + storageRESTVersion = "v36" // Changes to FileInfo for tier-journal storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/tier-journal.go b/cmd/tier-journal.go index 87aa5af7d..97479f2c5 100644 --- a/cmd/tier-journal.go +++ b/cmd/tier-journal.go @@ -42,8 +42,9 @@ type tierJournal struct { } type jentry struct { - ObjName string `msg:"obj"` - TierName string `msg:"tier"` + ObjName string `msg:"obj"` + VersionID string `msg:"vid"` + TierName string `msg:"tier"` } const ( @@ -51,6 +52,10 @@ const ( tierJournalHdrLen = 2 // 2 bytes ) +var ( + errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version") +) + func initTierDeletionJournal(done <-chan struct{}) (*tierJournal, error) { diskPath := globalEndpoints.FirstLocalDiskPath() j := &tierJournal{ @@ -84,7 +89,7 @@ func (j *tierJournal) rotate() error { return j.Open() } -type walkFn func(objName, tierName string) error +type walkFn func(objName, rvID, tierName string) error func (j *tierJournal) ReadOnlyPath() string { return filepath.Join(j.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin") @@ -111,6 +116,7 @@ func (j *tierJournal) WalkEntries(fn walkFn) { } defer ro.Close() mr := msgp.NewReader(ro) + done := false for { var entry jentry @@ -123,9 +129,11 @@ func (j 
*tierJournal) WalkEntries(fn walkFn) { logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to decode journal entry %s", err)) break } - err = fn(entry.ObjName, entry.TierName) + err = fn(entry.ObjName, entry.VersionID, entry.TierName) if err != nil && !isErrObjectNotFound(err) { logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err)) + // We add the entry into the active journal to try again + // later. j.AddEntry(entry) } } @@ -134,12 +142,12 @@ func (j *tierJournal) WalkEntries(fn walkFn) { } } -func deleteObjectFromRemoteTier(objName, tierName string) error { +func deleteObjectFromRemoteTier(objName, rvID, tierName string) error { w, err := globalTierConfigMgr.getDriver(tierName) if err != nil { return err } - err = w.Remove(context.Background(), objName) + err = w.Remove(context.Background(), objName, remoteVersionID(rvID)) if err != nil { return err } @@ -263,8 +271,15 @@ func (j *tierJournal) OpenRO() (io.ReadCloser, error) { switch binary.LittleEndian.Uint16(data[:]) { case tierJournalVersion: + return file, nil default: - return nil, errors.New("unsupported pending deletes journal version") + return nil, errUnsupportedJournalVersion } - return file, nil +} + +// jentryV1 represents the entry in the journal before RemoteVersionID was +// added. It remains here for use in tests for the struct element addition. 
+type jentryV1 struct { + ObjName string `msg:"obj"` + TierName string `msg:"tier"` } diff --git a/cmd/tier-journal_gen.go b/cmd/tier-journal_gen.go index 17fba8488..f62c3af13 100644 --- a/cmd/tier-journal_gen.go +++ b/cmd/tier-journal_gen.go @@ -8,6 +8,159 @@ import ( // DecodeMsg implements msgp.Decodable func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "obj": + z.ObjName, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ObjName") + return + } + case "vid": + z.VersionID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "tier": + z.TierName, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TierName") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "obj" + err = en.Append(0x83, 0xa3, 0x6f, 0x62, 0x6a) + if err != nil { + return + } + err = en.WriteString(z.ObjName) + if err != nil { + err = msgp.WrapError(err, "ObjName") + return + } + // write "vid" + err = en.Append(0xa3, 0x76, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteString(z.VersionID) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + // write "tier" + err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72) + if err != nil { + return + } + err = en.WriteString(z.TierName) + if err != nil { + err = msgp.WrapError(err, "TierName") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { + o = 
msgp.Require(b, z.Msgsize()) + // map header, size 3 + // string "obj" + o = append(o, 0x83, 0xa3, 0x6f, 0x62, 0x6a) + o = msgp.AppendString(o, z.ObjName) + // string "vid" + o = append(o, 0xa3, 0x76, 0x69, 0x64) + o = msgp.AppendString(o, z.VersionID) + // string "tier" + o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72) + o = msgp.AppendString(o, z.TierName) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "obj": + z.ObjName, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjName") + return + } + case "vid": + z.VersionID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "tier": + z.TierName, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TierName") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z jentry) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 4 + msgp.StringPrefixSize + len(z.VersionID) + 5 + msgp.StringPrefixSize + len(z.TierName) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *jentryV1) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field var zb0001 uint32 @@ -48,7 +201,7 @@ func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { +func (z jentryV1) 
EncodeMsg(en *msgp.Writer) (err error) { // map header, size 2 // write "obj" err = en.Append(0x82, 0xa3, 0x6f, 0x62, 0x6a) @@ -74,7 +227,7 @@ func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { } // MarshalMsg implements msgp.Marshaler -func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { +func (z jentryV1) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) // map header, size 2 // string "obj" @@ -87,7 +240,7 @@ func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { } // UnmarshalMsg implements msgp.Unmarshaler -func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { +func (z *jentryV1) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field var zb0001 uint32 @@ -129,7 +282,7 @@ func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z jentry) Msgsize() (s int) { +func (z jentryV1) Msgsize() (s int) { s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 5 + msgp.StringPrefixSize + len(z.TierName) return } diff --git a/cmd/tier-journal_gen_test.go b/cmd/tier-journal_gen_test.go index d126ae391..5cff069a5 100644 --- a/cmd/tier-journal_gen_test.go +++ b/cmd/tier-journal_gen_test.go @@ -121,3 +121,116 @@ func BenchmarkDecodejentry(b *testing.B) { } } } + +func TestMarshalUnmarshaljentryV1(t *testing.T) { + v := jentryV1{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgjentryV1(b *testing.B) { + v := jentryV1{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } 
+} + +func BenchmarkAppendMsgjentryV1(b *testing.B) { + v := jentryV1{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaljentryV1(b *testing.B) { + v := jentryV1{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodejentryV1(t *testing.T) { + v := jentryV1{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodejentryV1 Msgsize() is inaccurate") + } + + vn := jentryV1{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodejentryV1(b *testing.B) { + v := jentryV1{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodejentryV1(b *testing.B) { + v := jentryV1{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/tier-journal_test.go b/cmd/tier-journal_test.go new file mode 100644 index 000000000..5c6dd0c75 --- /dev/null +++ b/cmd/tier-journal_test.go @@ -0,0 +1,121 @@ +// Copyright (c) 2015-2021 MinIO, Inc. 
+// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +// TestJEntryReadOldToNew1 - tests that adding the RemoteVersionID parameter to the +// jentry struct does not cause unexpected errors when reading the serialized +// old version into new version. +func TestJEntryReadOldToNew1(t *testing.T) { + readOldToNewCases := []struct { + je jentryV1 + exp jentry + }{ + {jentryV1{"obj1", "tier1"}, jentry{"obj1", "", "tier1"}}, + {jentryV1{"obj1", ""}, jentry{"obj1", "", ""}}, + {jentryV1{"", "tier1"}, jentry{"", "", "tier1"}}, + {jentryV1{"", ""}, jentry{"", "", ""}}, + } + + var b bytes.Buffer + for _, item := range readOldToNewCases { + bs, err := item.je.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + b.Write(bs) + } + + mr := msgp.NewReader(&b) + for i, item := range readOldToNewCases { + var je jentry + err := je.DecodeMsg(mr) + if err != nil { + t.Fatal(err) + } + if je != item.exp { + t.Errorf("Case %d: Expected: %v Got: %v", i, item.exp, je) + } + } +} + +// TestJEntryWriteNewToOldMix1 - tests that adding the RemoteVersionID parameter +// to the jentry struct does not cause unexpected errors when writing. 
This +// simulates the case when the active journal has entries in the older version +// struct and due to errors new entries are added in the new version of the +// struct. +func TestJEntryWriteNewToOldMix1(t *testing.T) { + oldStructVals := []jentryV1{ + {"obj1", "tier1"}, + {"obj2", "tier2"}, + {"obj3", "tier3"}, + } + newStructVals := []jentry{ + {"obj4", "", "tier1"}, + {"obj5", "ver2", "tier2"}, + {"obj6", "", "tier3"}, + } + + // Write old struct version values followed by new version values. + var b bytes.Buffer + for _, item := range oldStructVals { + bs, err := item.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + b.Write(bs) + } + for _, item := range newStructVals { + bs, err := item.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + b.Write(bs) + } + + // Read into new struct version and check. + mr := msgp.NewReader(&b) + for i := 0; i < len(oldStructVals)+len(newStructVals); i++ { + var je jentry + err := je.DecodeMsg(mr) + if err != nil { + t.Fatal(err) + } + var expectedJe jentry + if i < len(oldStructVals) { + // For old struct values, the RemoteVersionID will be + // empty + expectedJe = jentry{ + ObjName: oldStructVals[i].ObjName, + VersionID: "", + TierName: oldStructVals[i].TierName, + } + } else { + expectedJe = newStructVals[i-len(oldStructVals)] + } + if expectedJe != je { + t.Errorf("Case %d: Expected: %v, Got: %v", i, expectedJe, je) + } + } +} diff --git a/cmd/tier-sweeper.go b/cmd/tier-sweeper.go index 9b5944a94..24cbcedda 100644 --- a/cmd/tier-sweeper.go +++ b/cmd/tier-sweeper.go @@ -41,14 +41,15 @@ import ( // logger.LogIf(ctx, err) // } type objSweeper struct { - Object string - Bucket string - ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs - Versioned bool - Suspended bool - TransitionStatus string - TransitionTier string - RemoteObject string + Object string + Bucket string + ReqVersion string // version ID set by application, applies only to DeleteObject and 
DeleteObjects APIs + Versioned bool + Suspended bool + TransitionStatus string + TransitionTier string + TransitionVersionID string + RemoteObject string } // newObjSweeper returns an objSweeper for a given bucket and object. @@ -116,6 +117,7 @@ func (os *objSweeper) SetTransitionState(info ObjectInfo) { os.TransitionTier = info.TransitionTier os.TransitionStatus = info.TransitionStatus os.RemoteObject = info.transitionedObjName + os.TransitionVersionID = info.transitionVersionID } // shouldRemoveRemoteObject determines if a transitioned object should be @@ -142,7 +144,11 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) { delTier = true } if delTier { - return jentry{ObjName: os.RemoteObject, TierName: os.TransitionTier}, true + return jentry{ + ObjName: os.RemoteObject, + VersionID: os.TransitionVersionID, + TierName: os.TransitionTier, + }, true } return jentry{}, false } diff --git a/cmd/warm-backend-azure.go b/cmd/warm-backend-azure.go index 9b4e68979..01ec4033f 100644 --- a/cmd/warm-backend-azure.go +++ b/cmd/warm-backend-azure.go @@ -53,19 +53,23 @@ func (az *warmBackendAzure) tier() azblob.AccessTierType { } return azblob.AccessTierType("") } -func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) error { + +// FIXME: add support for remote version ID in Azure remote tier and remove +// this. Currently it's a no-op. 
+ +func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) { blobURL := az.serviceURL.NewContainerURL(az.Bucket).NewBlockBlobURL(az.getDest(object)) // set tier if specified - if az.StorageClass != "" { if _, err := blobURL.SetTier(ctx, az.tier(), azblob.LeaseAccessConditions{}); err != nil { - return azureToObjectError(err, az.Bucket, object) + return "", azureToObjectError(err, az.Bucket, object) } } - _, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{}) - return azureToObjectError(err, az.Bucket, object) + res, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{}) + return remoteVersionID(res.Version()), azureToObjectError(err, az.Bucket, object) } -func (az *warmBackendAzure) Get(ctx context.Context, object string, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { +func (az *warmBackendAzure) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { if opts.startOffset < 0 { return nil, InvalidRange{} } @@ -79,7 +83,7 @@ func (az *warmBackendAzure) Get(ctx context.Context, object string, opts WarmBac return rc, nil } -func (az *warmBackendAzure) Remove(ctx context.Context, object string) error { +func (az *warmBackendAzure) Remove(ctx context.Context, object string, rv remoteVersionID) error { blob := az.serviceURL.NewContainerURL(az.Bucket).NewBlobURL(az.getDest(object)) _, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) return azureToObjectError(err, az.Bucket, object) diff --git a/cmd/warm-backend-gcs.go b/cmd/warm-backend-gcs.go index a24f07997..28f2edad5 100644 --- a/cmd/warm-backend-gcs.go +++ b/cmd/warm-backend-gcs.go @@ -43,7 +43,11 @@ func (gcs *warmBackendGCS) getDest(object string) string { } return destObj } -func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length 
int64) error { + +// FIXME: add support for remote version ID in GCS remote tier and remove this. +// Currently it's a no-op. + +func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) (remoteVersionID, error) { object := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key)) //TODO: set storage class w := object.NewWriter(ctx) @@ -51,13 +55,13 @@ func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, w.ObjectAttrs.StorageClass = gcs.StorageClass } if _, err := io.Copy(w, data); err != nil { - return gcsToObjectError(err, gcs.Bucket, key) + return "", gcsToObjectError(err, gcs.Bucket, key) } - return w.Close() + return "", w.Close() } -func (gcs *warmBackendGCS) Get(ctx context.Context, key string, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { +func (gcs *warmBackendGCS) Get(ctx context.Context, key string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { // GCS storage decompresses a gzipped object by default and returns the data. 
// Refer to https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding // Need to set `Accept-Encoding` header to `gzip` when issuing a GetObject call, to be able @@ -73,7 +77,7 @@ func (gcs *warmBackendGCS) Get(ctx context.Context, key string, opts WarmBackend return r, nil } -func (gcs *warmBackendGCS) Remove(ctx context.Context, key string) error { +func (gcs *warmBackendGCS) Remove(ctx context.Context, key string, rv remoteVersionID) error { err := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key)).Delete(ctx) return gcsToObjectError(err, gcs.Bucket, key) } diff --git a/cmd/warm-backend-s3.go b/cmd/warm-backend-s3.go index de11dc469..a5c9ca830 100644 --- a/cmd/warm-backend-s3.go +++ b/cmd/warm-backend-s3.go @@ -56,14 +56,17 @@ func (s3 *warmBackendS3) getDest(object string) string { return destObj } -func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) error { - _, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{StorageClass: s3.StorageClass}) - return s3.ToObjectError(err, object) +func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) { + res, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{StorageClass: s3.StorageClass}) + return remoteVersionID(res.VersionID), s3.ToObjectError(err, object) } -func (s3 *warmBackendS3) Get(ctx context.Context, object string, opts WarmBackendGetOpts) (io.ReadCloser, error) { +func (s3 *warmBackendS3) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error) { gopts := minio.GetObjectOptions{} + if rv != "" { + gopts.VersionID = string(rv) + } if opts.startOffset >= 0 && opts.length > 0 { if err := gopts.SetRange(opts.startOffset, opts.startOffset+opts.length-1); err != nil { return nil, s3.ToObjectError(err, object) @@ -78,8 +81,12 @@ func (s3 *warmBackendS3) 
Get(ctx context.Context, object string, opts WarmBacken return r, nil } -func (s3 *warmBackendS3) Remove(ctx context.Context, object string) error { - err := s3.client.RemoveObject(ctx, s3.Bucket, s3.getDest(object), minio.RemoveObjectOptions{}) +func (s3 *warmBackendS3) Remove(ctx context.Context, object string, rv remoteVersionID) error { + ropts := minio.RemoveObjectOptions{} + if rv != "" { + ropts.VersionID = string(rv) + } + err := s3.client.RemoveObject(ctx, s3.Bucket, s3.getDest(object), ropts) return s3.ToObjectError(err, object) } diff --git a/cmd/warm-backend.go b/cmd/warm-backend.go index c1ccdfecc..441bf27af 100644 --- a/cmd/warm-backend.go +++ b/cmd/warm-backend.go @@ -36,9 +36,9 @@ type WarmBackendGetOpts struct { // WarmBackend provides interface to be implemented by remote tier backends type WarmBackend interface { - Put(ctx context.Context, object string, r io.Reader, length int64) error - Get(ctx context.Context, object string, opts WarmBackendGetOpts) (io.ReadCloser, error) - Remove(ctx context.Context, object string) error + Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) + Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error) + Remove(ctx context.Context, object string, rv remoteVersionID) error InUse(ctx context.Context) (bool, error) } @@ -48,7 +48,7 @@ const probeObject = "probeobject" // to perform all operations defined in the WarmBackend interface. 
func checkWarmBackend(ctx context.Context, w WarmBackend) error { var empty bytes.Reader - err := w.Put(ctx, probeObject, &empty, 0) + rv, err := w.Put(ctx, probeObject, &empty, 0) if err != nil { return tierPermErr{ Op: tierPut, @@ -56,7 +56,7 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error { } } - _, err = w.Get(ctx, probeObject, WarmBackendGetOpts{}) + _, err = w.Get(ctx, probeObject, rv, WarmBackendGetOpts{}) if err != nil { switch { case isErrBucketNotFound(err): @@ -71,7 +71,7 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error { } } - if err = w.Remove(ctx, probeObject); err != nil { + if err = w.Remove(ctx, probeObject, rv); err != nil { return tierPermErr{ Op: tierDelete, Err: err, @@ -115,6 +115,10 @@ func errIsTierPermError(err error) bool { return errors.As(err, &tpErr) } +// remoteVersionID represents the version id of an object in the remote tier. +// Its usage is remote tier cloud implementation specific. +type remoteVersionID string + // newWarmBackend instantiates the tier type specific WarmBackend, runs // checkWarmBackend on it. 
func newWarmBackend(ctx context.Context, tier madmin.TierConfig) (d WarmBackend, err error) { diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 9e7866c92..501d7d321 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -875,6 +875,9 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error { if fi.TransitionedObjName != "" { ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName) } + if fi.TransitionVersionID != "" { + ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID) + } if fi.TransitionTier != "" { ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier) } @@ -1020,6 +1023,9 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) { if o, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName]; ok { fi.TransitionedObjName = string(o) } + if rv, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID]; ok { + fi.TransitionVersionID = string(rv) + } if sc, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionTier]; ok { fi.TransitionTier = string(sc) } @@ -1188,6 +1194,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { case fi.TransitionStatus == lifecycle.TransitionComplete: z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus) z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName) + z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID) z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier) default: z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)