From 60a5aa7fa2d98e6378f5289bf2a2c2df65f0b498 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Mon, 25 Oct 2021 14:43:55 +0200 Subject: [PATCH] metrics: Add replication latency metrics Add a new prometheus metric for bucket replication latency e.g.: minio_bucket_replication_latency_ns{ bucket="testbucket", operation="upload", range="LESS_THAN_1_MiB", server="127.0.0.1:9001", targetArn="arn:minio:replication::45da043c-14f5-4da4-9316-aba5f77bf730:testbucket"} 2.2015663e+07 --- cmd/bucket-replication-stats.go | 7 +- cmd/bucket-replication-utils.go | 1 + cmd/bucket-replication.go | 12 +- cmd/bucket-stats.go | 96 +++++++++++ cmd/bucket-stats_gen.go | 275 +++++++++++++++++++++++++++++++- cmd/bucket-stats_gen_test.go | 113 +++++++++++++ cmd/metrics-v2.go | 40 +++-- cmd/metrics.go | 3 + cmd/xl-storage-disk-id-check.go | 4 +- go.mod | 1 - internal/ewma/ewma.go | 53 ++++++ internal/ewma/ewma_gen.go | 110 +++++++++++++ internal/ewma/ewma_gen_test.go | 123 ++++++++++++++ internal/ewma/ewma_test.go | 41 +++++ 14 files changed, 854 insertions(+), 25 deletions(-) create mode 100644 internal/ewma/ewma.go create mode 100644 internal/ewma/ewma_gen.go create mode 100644 internal/ewma/ewma_gen_test.go create mode 100644 internal/ewma/ewma_test.go diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index e2131d8f2..8a0cee240 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -73,7 +73,7 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) { } // Update updates in-memory replication statistics with new values. -func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, prevStatus replication.StatusType, opType replication.Type) { +func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration time.Duration, status, prevStatus replication.StatusType, opType replication.Type) { if r == nil { return } @@ -99,6 +99,11 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, pr case replication.Failed: atomic.AddInt64(&b.FailedSize, -1*n) } + if duration > 0 { + r.Lock() + b.Latency.update(n, duration) + r.Unlock() + } } case replication.Failed: if opType == replication.ObjectReplicationType { diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index e3a2965e0..a961db7ed 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -34,6 +34,7 @@ import ( type replicatedTargetInfo struct { Arn string Size int64 + Duration time.Duration ReplicationAction replicationAction // full or metadata only OpType replication.Type // whether incoming replication, existing object, healing etc.. ReplicationStatus replication.StatusType diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 85ac08e1e..60103ba33 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -433,7 +433,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj // to decrement pending count later. for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { - globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, replicationStatus, + globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, 0, replicationStatus, prevStatus, replication.DeleteReplicationType) } } @@ -938,7 +938,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje } for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { - globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType) + globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.Duration, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType) } } } @@ -963,6 +963,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje // replicateObjectToTarget replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { + startTime := time.Now() objInfo := ri.ObjectInfo.Clone() bucket := objInfo.Bucket object := objInfo.Name @@ -1100,6 +1101,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID) rinfo.ReplicationResynced = true } + rinfo.Duration = time.Since(startTime) }() // use core client to avoid doing multipart on PUT c := &miniogo.Core{Client: tgt.Client} @@ -1634,7 +1636,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, } if sz, err := objInfo.GetActualSize(); err == nil { for arn := range dsc.targetsMap { - globalReplicationStats.Update(objInfo.Bucket, arn, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) + globalReplicationStats.Update(objInfo.Bucket, arn, sz, 0, objInfo.ReplicationStatus, replication.StatusType(""), opType) } } } @@ -1642,10 +1644,10 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) { globalReplicationPool.queueReplicaDeleteTask(dv) for arn := range dv.ReplicationState.Targets { - globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) + globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } for arn := range dv.ReplicationState.PurgeTargets { - globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) + globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } } diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index 40144688f..ab65e699b 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -19,10 +19,103 @@ package cmd import ( "sync/atomic" + "time" + + "github.com/minio/minio/internal/ewma" ) //go:generate msgp -file $GOFILE +type sizeTag uint8 + +const ( + sizeLessThan1KiB = iota + sizeLessThan1MiB + sizeLessThan10MiB + sizeLessThan100MiB + sizeLessThan1GiB + sizeGreaterThan1GiB + // Add new entries here + + sizeLastElemMarker +) + +func sizeToTag(size int64) sizeTag { + switch { + case size < 1024: + return sizeLessThan1KiB + case size < 1024*1024: + return sizeLessThan1MiB + case size < 10*1024*1024: + return sizeLessThan10MiB + case size < 100*1024*1024: + return sizeLessThan100MiB + case size < 1024*1024*1024: + return sizeLessThan1GiB + default: + return sizeGreaterThan1GiB + } +} + +func sizeTagToString(tag sizeTag) string { + switch tag { + case sizeLessThan1KiB: + return "LESS_THAN_1_KiB" + case sizeLessThan1MiB: + return "LESS_THAN_1_MiB" + case sizeLessThan10MiB: + return "LESS_THAN_10_MiB" + case sizeLessThan100MiB: + return "LESS_THAN_100_MiB" + case sizeLessThan1GiB: + return "LESS_THAN_1_GiB" + case sizeGreaterThan1GiB: + return "GREATER_THAN_1_GiB" + default: + return "unknown" + } +} + +// ReplicationLatency holds information of bucket operations latency, such us uploads +type ReplicationLatency struct { + // Single & Multipart PUTs latency + UploadHistogram [sizeLastElemMarker]ewma.SimpleEWMA +} + +// Merge two replication latency into a new one, thread-unsafe +func (rl ReplicationLatency) merge(other ReplicationLatency) (newReplLatency ReplicationLatency) { + for tag := range rl.UploadHistogram { + m := ewma.SimpleEWMA{} + m.Set((rl.UploadHistogram[tag].Value() + other.UploadHistogram[tag].Value()) / 2) + newReplLatency.UploadHistogram[tag] = m + } + return +} + +// Get upload latency, thread-unsafe +func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) { + ret = make(map[string]uint64) + for k, v := range rl.UploadHistogram { + ret[sizeTagToString(sizeTag(k))] = uint64(v.Value()) + } + return +} + +// Update replication upload latency with a new value +func (rl *ReplicationLatency) update(size int64, duration time.Duration) { + rl.UploadHistogram[sizeToTag(size)].Add(float64(duration)) +} + +// Safe clone replication latency +func (rl ReplicationLatency) clone() (newReplLatency ReplicationLatency) { + for tag := range rl.UploadHistogram { + m := ewma.SimpleEWMA{} + m.Set(rl.UploadHistogram[tag].Value()) + newReplLatency.UploadHistogram[tag] = m + } + return +} + // BucketStats bucket statistics type BucketStats struct { ReplicationStats BucketReplicationStats @@ -65,6 +158,7 @@ func (brs BucketReplicationStats) Clone() BucketReplicationStats { FailedCount: atomic.LoadInt64(&st.FailedCount), PendingSize: atomic.LoadInt64(&st.PendingSize), PendingCount: atomic.LoadInt64(&st.PendingCount), + Latency: st.Latency.clone(), } } // update total counts across targets @@ -93,6 +187,8 @@ type BucketReplicationStat struct { PendingCount int64 `json:"pendingReplicationCount"` // Total number of failed operations including metadata updates FailedCount int64 `json:"failedReplicationCount"` + // Replication latency information + Latency ReplicationLatency `json:"replicationLatency"` } func (bs *BucketReplicationStat) hasReplicationUsage() bool { diff --git a/cmd/bucket-stats_gen.go b/cmd/bucket-stats_gen.go index c0bec1059..e8c2e486a 100644 --- a/cmd/bucket-stats_gen.go +++ b/cmd/bucket-stats_gen.go @@ -60,6 +60,47 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "FailedCount") return } + case "Latency": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } + if zb0003 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003} + return + } + for za0001 := range z.Latency.UploadHistogram { + err = z.Latency.UploadHistogram[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + } + } default: err = dc.Skip() if err != nil { @@ -73,9 +114,9 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "PendingSize" - err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) + err = en.Append(0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) if err != nil { return } @@ -134,15 +175,38 @@ func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "FailedCount") return } + // write "Latency" + err = en.Append(0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + if err != nil { + return + } + // map header, size 1 + // write "UploadHistogram" + err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(sizeLastElemMarker)) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } + for za0001 := range z.Latency.UploadHistogram { + err = z.Latency.UploadHistogram[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001) + return + } + } return } // MarshalMsg implements msgp.Marshaler func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "PendingSize" - o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) + o = append(o, 0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) o = msgp.AppendInt64(o, z.PendingSize) // string "ReplicatedSize" o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) @@ -159,6 +223,19 @@ func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) { // string "FailedCount" o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) o = msgp.AppendInt64(o, z.FailedCount) + // string "Latency" + o = append(o, 0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + // map header, size 1 + // string "UploadHistogram" + o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker)) + for za0001 := range z.Latency.UploadHistogram { + o, err = z.Latency.UploadHistogram[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001) + return + } + } return } @@ -216,6 +293,47 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "FailedCount") return } + case "Latency": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram") + return + } + if zb0003 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003} + return + } + for za0001 := range z.Latency.UploadHistogram { + bts, err = z.Latency.UploadHistogram[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Latency") + return + } + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -230,7 +348,10 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BucketReplicationStat) Msgsize() (s int) { - s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size + s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size + 8 + 1 + 16 + msgp.ArrayHeaderSize + for za0001 := range z.Latency.UploadHistogram { + s += z.Latency.UploadHistogram[za0001].Msgsize() + } return } @@ -707,3 +828,147 @@ func (z *BucketStats) Msgsize() (s int) { s = 1 + 17 + z.ReplicationStats.Msgsize() return } + +// DecodeMsg implements msgp.Decodable +func (z *ReplicationLatency) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + if zb0002 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0002} + return + } + for za0001 := range z.UploadHistogram { + err = z.UploadHistogram[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram", za0001) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ReplicationLatency) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "UploadHistogram" + err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(sizeLastElemMarker)) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + for za0001 := range z.UploadHistogram { + err = z.UploadHistogram[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram", za0001) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ReplicationLatency) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "UploadHistogram" + o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d) + o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker)) + for za0001 := range z.UploadHistogram { + o, err = z.UploadHistogram[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram", za0001) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ReplicationLatency) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "UploadHistogram": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram") + return + } + if zb0002 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0002} + return + } + for za0001 := range z.UploadHistogram { + bts, err = z.UploadHistogram[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "UploadHistogram", za0001) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ReplicationLatency) Msgsize() (s int) { + s = 1 + 16 + msgp.ArrayHeaderSize + for za0001 := range z.UploadHistogram { + s += z.UploadHistogram[za0001].Msgsize() + } + return +} diff --git a/cmd/bucket-stats_gen_test.go b/cmd/bucket-stats_gen_test.go index 33328d4f4..f50f1b964 100644 --- a/cmd/bucket-stats_gen_test.go +++ b/cmd/bucket-stats_gen_test.go @@ -347,3 +347,116 @@ func BenchmarkDecodeBucketStats(b *testing.B) { } } } + +func TestMarshalUnmarshalReplicationLatency(t *testing.T) { + v := ReplicationLatency{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeReplicationLatency(t *testing.T) { + v := ReplicationLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeReplicationLatency Msgsize() is inaccurate") + } + + vn := ReplicationLatency{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeReplicationLatency(b *testing.B) { + v := ReplicationLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 8b0300283..b38293e8d 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -99,17 +99,18 @@ const ( total MetricName = "total" freeInodes MetricName = "free_inodes" - failedCount MetricName = "failed_count" - failedBytes MetricName = "failed_bytes" - freeBytes MetricName = "free_bytes" - readBytes MetricName = "read_bytes" - rcharBytes MetricName = "rchar_bytes" - receivedBytes MetricName = "received_bytes" - sentBytes MetricName = "sent_bytes" - totalBytes MetricName = "total_bytes" - usedBytes MetricName = "used_bytes" - writeBytes MetricName = "write_bytes" - wcharBytes MetricName = "wchar_bytes" + failedCount MetricName = "failed_count" + failedBytes MetricName = "failed_bytes" + freeBytes MetricName = "free_bytes" + readBytes MetricName = "read_bytes" + rcharBytes MetricName = "rchar_bytes" + receivedBytes MetricName = "received_bytes" + latencyNanoSec MetricName = "latency_ns" + sentBytes MetricName = "sent_bytes" + totalBytes MetricName = "total_bytes" + usedBytes MetricName = "used_bytes" + writeBytes MetricName = "write_bytes" + wcharBytes MetricName = "wchar_bytes" usagePercent MetricName = "update_percent" @@ -409,6 +410,16 @@ func getBucketUsageObjectsTotalMD() MetricDescription { } } +func getBucketRepLatencyMD() MetricDescription { + return MetricDescription{ + Namespace: bucketMetricNamespace, + Subsystem: replicationSubsystem, + Name: latencyNanoSec, + Help: "Replication latency.", + Type: histogramMetric, + } +} + func getBucketRepFailedBytesMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -1482,6 +1493,13 @@ func getBucketUsageMetrics() MetricsGroup { Value: float64(stat.FailedCount), VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn}, }) + metrics = append(metrics, Metric{ + Description: getBucketRepLatencyMD(), + HistogramBucketLabel: "range", + Histogram: stat.Latency.getUploadLatency(), + VariableLabels: map[string]string{"bucket": bucket, "operation": "upload", "targetArn": arn}, + }) + } } diff --git a/cmd/metrics.go b/cmd/metrics.go index 1ec4000c8..8f483a08e 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -450,6 +450,7 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic FailedCount: stat.FailedCount + oldst.FailedCount, FailedSize: stat.FailedSize + oldst.FailedSize, ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, + Latency: stat.Latency.merge(oldst.Latency), } } } @@ -498,6 +499,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic // happen since data usage picture can lag behind actual usage state at the time of cluster start st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) + st.Latency = tgtstat.Latency + s.Stats[arn] = &st s.FailedSize += st.FailedSize s.FailedCount += st.FailedCount diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 1f08cb2ab..bdd646930 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -25,8 +25,8 @@ import ( "sync/atomic" "time" - "github.com/VividCortex/ewma" "github.com/minio/madmin-go" + "github.com/minio/minio/internal/ewma" ) //go:generate stringer -type=storageMetric -trimprefix=storageMetric $GOFILE @@ -71,7 +71,7 @@ type xlStorageDiskIDCheck struct { // please use `fieldalignment ./...` to check // if your changes are not causing any problems. storage StorageAPI - apiLatencies [storageMetricLast]ewma.MovingAverage + apiLatencies [storageMetricLast]*lockedSimpleEWMA diskID string apiCalls [storageMetricLast]uint64 } diff --git a/go.mod b/go.mod index 3c7b89238..e09e7f3fe 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/Azure/azure-pipeline-go v0.2.2 github.com/Azure/azure-storage-blob-go v0.10.0 github.com/Shopify/sarama v1.27.2 - github.com/VividCortex/ewma v1.1.1 github.com/alecthomas/participle v0.2.1 github.com/bcicen/jstream v1.0.1 github.com/beevik/ntp v0.3.0 diff --git a/internal/ewma/ewma.go b/internal/ewma/ewma.go new file mode 100644 index 000000000..b24b08e33 --- /dev/null +++ b/internal/ewma/ewma.go @@ -0,0 +1,53 @@ +// Package ewma implements exponentially weighted moving averages. +package ewma + +// Copyright (c) 2013 VividCortex, Inc. All rights reserved. +// Please see the LICENSE file for applicable license terms. + +//go:generate msgp -file $GOFILE + +const ( + // By default, we average over a one-minute period, which means the average + // age of the metrics in the period is 30 seconds. + avgMetricAge float64 = 30.0 + + // The formula for computing the decay factor from the average age comes + // from "Production and Operations Analysis" by Steven Nahmias. + decay float64 = 2 / (avgMetricAge + 1) +) + +// SimpleEWMA represents the exponentially weighted moving average of a +// series of numbers. It WILL have different behavior than the VariableEWMA +// for multiple reasons. It has no warm-up period and it uses a constant +// decay. These properties let it use less memory. It will also behave +// differently when it's equal to zero, which is assumed to mean +// uninitialized, so if a value is likely to actually become zero over time, +// then any non-zero value will cause a sharp jump instead of a small change. +// However, note that this takes a long time, and the value may just +// decays to a stable value that's close to zero, but which won't be mistaken +// for uninitialized. See http://play.golang.org/p/litxBDr_RC for example. +type SimpleEWMA struct { + // The current value of the average. After adding with Add(), this is + // updated to reflect the average of all values seen thus far. + // This is exported for msgp + Val float64 +} + +// Add adds a value to the series and updates the moving average. +func (e *SimpleEWMA) Add(value float64) { + if e.Val == 0 { // this is a proxy for "uninitialized" + e.Val = value + } else { + e.Val = (value * decay) + (e.Val * (1 - decay)) + } +} + +// Value returns the current value of the moving average. +func (e SimpleEWMA) Value() float64 { + return e.Val +} + +// Set sets the EWMA's value. +func (e *SimpleEWMA) Set(value float64) { + e.Val = value +} diff --git a/internal/ewma/ewma_gen.go b/internal/ewma/ewma_gen.go new file mode 100644 index 000000000..af5795186 --- /dev/null +++ b/internal/ewma/ewma_gen.go @@ -0,0 +1,110 @@ +package ewma + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *SimpleEWMA) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Val": + z.Val, err = dc.ReadFloat64() + if err != nil { + err = msgp.WrapError(err, "Val") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z SimpleEWMA) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "Val" + err = en.Append(0x81, 0xa3, 0x56, 0x61, 0x6c) + if err != nil { + return + } + err = en.WriteFloat64(z.Val) + if err != nil { + err = msgp.WrapError(err, "Val") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z SimpleEWMA) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "Val" + o = append(o, 0x81, 0xa3, 0x56, 0x61, 0x6c) + o = msgp.AppendFloat64(o, z.Val) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *SimpleEWMA) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Val": + z.Val, bts, err = msgp.ReadFloat64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Val") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z SimpleEWMA) Msgsize() (s int) { + s = 1 + 4 + msgp.Float64Size + return +} diff --git a/internal/ewma/ewma_gen_test.go b/internal/ewma/ewma_gen_test.go new file mode 100644 index 000000000..7bc22cf80 --- /dev/null +++ b/internal/ewma/ewma_gen_test.go @@ -0,0 +1,123 @@ +package ewma + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalSimpleEWMA(t *testing.T) { + v := SimpleEWMA{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgSimpleEWMA(b *testing.B) { + v := SimpleEWMA{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgSimpleEWMA(b *testing.B) { + v := SimpleEWMA{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalSimpleEWMA(b *testing.B) { + v := SimpleEWMA{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeSimpleEWMA(t *testing.T) { + v := SimpleEWMA{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeSimpleEWMA Msgsize() is inaccurate") + } + + vn := SimpleEWMA{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeSimpleEWMA(b *testing.B) { + v := SimpleEWMA{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeSimpleEWMA(b *testing.B) { + v := SimpleEWMA{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/internal/ewma/ewma_test.go b/internal/ewma/ewma_test.go new file mode 100644 index 000000000..e5d0378d2 --- /dev/null +++ b/internal/ewma/ewma_test.go @@ -0,0 +1,41 @@ +package ewma + +// Copyright (c) 2013 VividCortex, Inc. All rights reserved. +// Please see the LICENSE file for applicable license terms. + +import ( + "math" + "testing" +) + +const testMargin = 0.00000001 + +var samples = [100]float64{ + 4599, 5711, 4746, 4621, 5037, 4218, 4925, 4281, 5207, 5203, 5594, 5149, + 4948, 4994, 6056, 4417, 4973, 4714, 4964, 5280, 5074, 4913, 4119, 4522, + 4631, 4341, 4909, 4750, 4663, 5167, 3683, 4964, 5151, 4892, 4171, 5097, + 3546, 4144, 4551, 6557, 4234, 5026, 5220, 4144, 5547, 4747, 4732, 5327, + 5442, 4176, 4907, 3570, 4684, 4161, 5206, 4952, 4317, 4819, 4668, 4603, + 4885, 4645, 4401, 4362, 5035, 3954, 4738, 4545, 5433, 6326, 5927, 4983, + 5364, 4598, 5071, 5231, 5250, 4621, 4269, 3953, 3308, 3623, 5264, 5322, + 5395, 4753, 4936, 5315, 5243, 5060, 4989, 4921, 4480, 3426, 3687, 4220, + 3197, 5139, 6101, 5279, +} + +func withinMargin(a, b float64) bool { + return math.Abs(a-b) <= testMargin +} + +func TestSimpleEWMA(t *testing.T) { + var e SimpleEWMA + for _, f := range samples { + e.Add(f) + } + if !withinMargin(e.Value(), 4734.500946466118) { + t.Errorf("e.Value() is %v, wanted %v", e.Value(), 4734.500946466118) + } + e.Set(1.0) + if e.Value() != 1.0 { + t.Errorf("e.Value() is %v", e.Value()) + } +}