metrics: Add replication latency metrics

Add a new prometheus metric for bucket replication latency

e.g.:
minio_bucket_replication_latency_ns{
    bucket="testbucket",
    operation="upload",
    range="LESS_THAN_1_MiB",
    server="127.0.0.1:9001",
    targetArn="arn:minio:replication::45da043c-14f5-4da4-9316-aba5f77bf730:testbucket"} 2.2015663e+07
This commit is contained in:
Anis Elleuch 2021-10-25 14:43:55 +02:00 committed by Anis Elleuch
parent 8774d10bdf
commit 60a5aa7fa2
14 changed files with 854 additions and 25 deletions

View file

@ -73,7 +73,7 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
}
// Update updates in-memory replication statistics with new values.
func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration time.Duration, status, prevStatus replication.StatusType, opType replication.Type) {
if r == nil {
return
}
@ -99,6 +99,11 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, pr
case replication.Failed:
atomic.AddInt64(&b.FailedSize, -1*n)
}
if duration > 0 {
r.Lock()
b.Latency.update(n, duration)
r.Unlock()
}
}
case replication.Failed:
if opType == replication.ObjectReplicationType {

View file

@ -34,6 +34,7 @@ import (
type replicatedTargetInfo struct {
Arn string
Size int64
Duration time.Duration
ReplicationAction replicationAction // full or metadata only
OpType replication.Type // whether incoming replication, existing object, healing etc..
ReplicationStatus replication.StatusType

View file

@ -433,7 +433,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
// to decrement pending count later.
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, replicationStatus,
globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, 0, replicationStatus,
prevStatus, replication.DeleteReplicationType)
}
}
@ -938,7 +938,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.Duration, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
}
}
}
@ -963,6 +963,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
// replicateObjectToTarget replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
startTime := time.Now()
objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket
object := objInfo.Name
@ -1100,6 +1101,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
rinfo.ReplicationResynced = true
}
rinfo.Duration = time.Since(startTime)
}()
// use core client to avoid doing multipart on PUT
c := &miniogo.Core{Client: tgt.Client}
@ -1634,7 +1636,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
}
if sz, err := objInfo.GetActualSize(); err == nil {
for arn := range dsc.targetsMap {
globalReplicationStats.Update(objInfo.Bucket, arn, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
globalReplicationStats.Update(objInfo.Bucket, arn, sz, 0, objInfo.ReplicationStatus, replication.StatusType(""), opType)
}
}
}
@ -1642,10 +1644,10 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
globalReplicationPool.queueReplicaDeleteTask(dv)
for arn := range dv.ReplicationState.Targets {
globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}
for arn := range dv.ReplicationState.PurgeTargets {
globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}
}

View file

@ -19,10 +19,103 @@ package cmd
import (
"sync/atomic"
"time"
"github.com/minio/minio/internal/ewma"
)
//go:generate msgp -file $GOFILE
type sizeTag uint8
const (
sizeLessThan1KiB = iota
sizeLessThan1MiB
sizeLessThan10MiB
sizeLessThan100MiB
sizeLessThan1GiB
sizeGreaterThan1GiB
// Add new entries here
sizeLastElemMarker
)
func sizeToTag(size int64) sizeTag {
switch {
case size < 1024:
return sizeLessThan1KiB
case size < 1024*1024:
return sizeLessThan1MiB
case size < 10*1024*1024:
return sizeLessThan10MiB
case size < 100*1024*1024:
return sizeLessThan100MiB
case size < 1024*1024*1024:
return sizeLessThan1GiB
default:
return sizeGreaterThan1GiB
}
}
func sizeTagToString(tag sizeTag) string {
switch tag {
case sizeLessThan1KiB:
return "LESS_THAN_1_KiB"
case sizeLessThan1MiB:
return "LESS_THAN_1_MiB"
case sizeLessThan10MiB:
return "LESS_THAN_10_MiB"
case sizeLessThan100MiB:
return "LESS_THAN_100_MiB"
case sizeLessThan1GiB:
return "LESS_THAN_1_GiB"
case sizeGreaterThan1GiB:
return "GREATER_THAN_1_GiB"
default:
return "unknown"
}
}
// ReplicationLatency holds information of bucket operations latency, such us uploads
type ReplicationLatency struct {
// Single & Multipart PUTs latency
UploadHistogram [sizeLastElemMarker]ewma.SimpleEWMA
}
// Merge two replication latency into a new one, thread-unsafe
func (rl ReplicationLatency) merge(other ReplicationLatency) (newReplLatency ReplicationLatency) {
for tag := range rl.UploadHistogram {
m := ewma.SimpleEWMA{}
m.Set((rl.UploadHistogram[tag].Value() + other.UploadHistogram[tag].Value()) / 2)
newReplLatency.UploadHistogram[tag] = m
}
return
}
// Get upload latency, thread-unsafe
func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) {
ret = make(map[string]uint64)
for k, v := range rl.UploadHistogram {
ret[sizeTagToString(sizeTag(k))] = uint64(v.Value())
}
return
}
// Update replication upload latency with a new value
func (rl *ReplicationLatency) update(size int64, duration time.Duration) {
rl.UploadHistogram[sizeToTag(size)].Add(float64(duration))
}
// Safe clone replication latency
func (rl ReplicationLatency) clone() (newReplLatency ReplicationLatency) {
for tag := range rl.UploadHistogram {
m := ewma.SimpleEWMA{}
m.Set(rl.UploadHistogram[tag].Value())
newReplLatency.UploadHistogram[tag] = m
}
return
}
// BucketStats bucket statistics
type BucketStats struct {
ReplicationStats BucketReplicationStats
@ -65,6 +158,7 @@ func (brs BucketReplicationStats) Clone() BucketReplicationStats {
FailedCount: atomic.LoadInt64(&st.FailedCount),
PendingSize: atomic.LoadInt64(&st.PendingSize),
PendingCount: atomic.LoadInt64(&st.PendingCount),
Latency: st.Latency.clone(),
}
}
// update total counts across targets
@ -93,6 +187,8 @@ type BucketReplicationStat struct {
PendingCount int64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates
FailedCount int64 `json:"failedReplicationCount"`
// Replication latency information
Latency ReplicationLatency `json:"replicationLatency"`
}
func (bs *BucketReplicationStat) hasReplicationUsage() bool {

View file

@ -60,6 +60,47 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "FailedCount")
return
}
case "Latency":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
for zb0002 > 0 {
zb0002--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
if zb0003 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003}
return
}
for za0001 := range z.Latency.UploadHistogram {
err = z.Latency.UploadHistogram[za0001].DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001)
return
}
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
}
}
default:
err = dc.Skip()
if err != nil {
@ -73,9 +114,9 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// map header, size 7
// write "PendingSize"
err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
err = en.Append(0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
@ -134,15 +175,38 @@ func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "FailedCount")
return
}
// write "Latency"
err = en.Append(0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79)
if err != nil {
return
}
// map header, size 1
// write "UploadHistogram"
err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(sizeLastElemMarker))
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
for za0001 := range z.Latency.UploadHistogram {
err = z.Latency.UploadHistogram[za0001].EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// map header, size 7
// string "PendingSize"
o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
o = append(o, 0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendInt64(o, z.PendingSize)
// string "ReplicatedSize"
o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
@ -159,6 +223,19 @@ func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) {
// string "FailedCount"
o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendInt64(o, z.FailedCount)
// string "Latency"
o = append(o, 0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79)
// map header, size 1
// string "UploadHistogram"
o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker))
for za0001 := range z.Latency.UploadHistogram {
o, err = z.Latency.UploadHistogram[za0001].MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001)
return
}
}
return
}
@ -216,6 +293,47 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "FailedCount")
return
}
case "Latency":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
for zb0002 > 0 {
zb0002--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
if zb0003 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003}
return
}
for za0001 := range z.Latency.UploadHistogram {
bts, err = z.Latency.UploadHistogram[za0001].UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram", za0001)
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -230,7 +348,10 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketReplicationStat) Msgsize() (s int) {
s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size
s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size + 8 + 1 + 16 + msgp.ArrayHeaderSize
for za0001 := range z.Latency.UploadHistogram {
s += z.Latency.UploadHistogram[za0001].Msgsize()
}
return
}
@ -707,3 +828,147 @@ func (z *BucketStats) Msgsize() (s int) {
s = 1 + 17 + z.ReplicationStats.Msgsize()
return
}
// DecodeMsg implements msgp.Decodable
func (z *ReplicationLatency) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
if zb0002 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0002}
return
}
for za0001 := range z.UploadHistogram {
err = z.UploadHistogram[za0001].DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram", za0001)
return
}
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *ReplicationLatency) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "UploadHistogram"
err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(sizeLastElemMarker))
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
for za0001 := range z.UploadHistogram {
err = z.UploadHistogram[za0001].EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *ReplicationLatency) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "UploadHistogram"
o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker))
for za0001 := range z.UploadHistogram {
o, err = z.UploadHistogram[za0001].MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram", za0001)
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *ReplicationLatency) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
if zb0002 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0002}
return
}
for za0001 := range z.UploadHistogram {
bts, err = z.UploadHistogram[za0001].UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram", za0001)
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *ReplicationLatency) Msgsize() (s int) {
s = 1 + 16 + msgp.ArrayHeaderSize
for za0001 := range z.UploadHistogram {
s += z.UploadHistogram[za0001].Msgsize()
}
return
}

View file

@ -347,3 +347,116 @@ func BenchmarkDecodeBucketStats(b *testing.B) {
}
}
}
func TestMarshalUnmarshalReplicationLatency(t *testing.T) {
v := ReplicationLatency{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeReplicationLatency(t *testing.T) {
v := ReplicationLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeReplicationLatency Msgsize() is inaccurate")
}
vn := ReplicationLatency{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View file

@ -99,17 +99,18 @@ const (
total MetricName = "total"
freeInodes MetricName = "free_inodes"
failedCount MetricName = "failed_count"
failedBytes MetricName = "failed_bytes"
freeBytes MetricName = "free_bytes"
readBytes MetricName = "read_bytes"
rcharBytes MetricName = "rchar_bytes"
receivedBytes MetricName = "received_bytes"
sentBytes MetricName = "sent_bytes"
totalBytes MetricName = "total_bytes"
usedBytes MetricName = "used_bytes"
writeBytes MetricName = "write_bytes"
wcharBytes MetricName = "wchar_bytes"
failedCount MetricName = "failed_count"
failedBytes MetricName = "failed_bytes"
freeBytes MetricName = "free_bytes"
readBytes MetricName = "read_bytes"
rcharBytes MetricName = "rchar_bytes"
receivedBytes MetricName = "received_bytes"
latencyNanoSec MetricName = "latency_ns"
sentBytes MetricName = "sent_bytes"
totalBytes MetricName = "total_bytes"
usedBytes MetricName = "used_bytes"
writeBytes MetricName = "write_bytes"
wcharBytes MetricName = "wchar_bytes"
usagePercent MetricName = "update_percent"
@ -409,6 +410,16 @@ func getBucketUsageObjectsTotalMD() MetricDescription {
}
}
func getBucketRepLatencyMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
Subsystem: replicationSubsystem,
Name: latencyNanoSec,
Help: "Replication latency.",
Type: histogramMetric,
}
}
func getBucketRepFailedBytesMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
@ -1482,6 +1493,13 @@ func getBucketUsageMetrics() MetricsGroup {
Value: float64(stat.FailedCount),
VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn},
})
metrics = append(metrics, Metric{
Description: getBucketRepLatencyMD(),
HistogramBucketLabel: "range",
Histogram: stat.Latency.getUploadLatency(),
VariableLabels: map[string]string{"bucket": bucket, "operation": "upload", "targetArn": arn},
})
}
}

View file

@ -450,6 +450,7 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
FailedCount: stat.FailedCount + oldst.FailedCount,
FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
Latency: stat.Latency.merge(oldst.Latency),
}
}
}
@ -498,6 +499,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.Latency = tgtstat.Latency
s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount

View file

@ -25,8 +25,8 @@ import (
"sync/atomic"
"time"
"github.com/VividCortex/ewma"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/ewma"
)
//go:generate stringer -type=storageMetric -trimprefix=storageMetric $GOFILE
@ -71,7 +71,7 @@ type xlStorageDiskIDCheck struct {
// please use `fieldalignment ./...` to check
// if your changes are not causing any problems.
storage StorageAPI
apiLatencies [storageMetricLast]ewma.MovingAverage
apiLatencies [storageMetricLast]*lockedSimpleEWMA
diskID string
apiCalls [storageMetricLast]uint64
}

1
go.mod
View file

@ -7,7 +7,6 @@ require (
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/Shopify/sarama v1.27.2
github.com/VividCortex/ewma v1.1.1
github.com/alecthomas/participle v0.2.1
github.com/bcicen/jstream v1.0.1
github.com/beevik/ntp v0.3.0

53
internal/ewma/ewma.go Normal file
View file

@ -0,0 +1,53 @@
// Package ewma implements exponentially weighted moving averages.
package ewma
// Copyright (c) 2013 VividCortex, Inc. All rights reserved.
// Please see the LICENSE file for applicable license terms.
//go:generate msgp -file $GOFILE
const (
// By default, we average over a one-minute period, which means the average
// age of the metrics in the period is 30 seconds.
avgMetricAge float64 = 30.0
// The formula for computing the decay factor from the average age comes
// from "Production and Operations Analysis" by Steven Nahmias.
decay float64 = 2 / (avgMetricAge + 1)
)
// SimpleEWMA represents the exponentially weighted moving average of a
// series of numbers. It WILL have different behavior than the VariableEWMA
// for multiple reasons. It has no warm-up period and it uses a constant
// decay. These properties let it use less memory. It will also behave
// differently when it's equal to zero, which is assumed to mean
// uninitialized, so if a value is likely to actually become zero over time,
// then any non-zero value will cause a sharp jump instead of a small change.
// However, note that this takes a long time, and the value may just
// decays to a stable value that's close to zero, but which won't be mistaken
// for uninitialized. See http://play.golang.org/p/litxBDr_RC for example.
type SimpleEWMA struct {
// The current value of the average. After adding with Add(), this is
// updated to reflect the average of all values seen thus far.
// This is exported for msgp
Val float64
}
// Add adds a value to the series and updates the moving average.
func (e *SimpleEWMA) Add(value float64) {
if e.Val == 0 { // this is a proxy for "uninitialized"
e.Val = value
} else {
e.Val = (value * decay) + (e.Val * (1 - decay))
}
}
// Value returns the current value of the moving average.
func (e SimpleEWMA) Value() float64 {
return e.Val
}
// Set sets the EWMA's value.
func (e *SimpleEWMA) Set(value float64) {
e.Val = value
}

110
internal/ewma/ewma_gen.go Normal file
View file

@ -0,0 +1,110 @@
package ewma
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *SimpleEWMA) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Val":
z.Val, err = dc.ReadFloat64()
if err != nil {
err = msgp.WrapError(err, "Val")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z SimpleEWMA) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "Val"
err = en.Append(0x81, 0xa3, 0x56, 0x61, 0x6c)
if err != nil {
return
}
err = en.WriteFloat64(z.Val)
if err != nil {
err = msgp.WrapError(err, "Val")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z SimpleEWMA) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "Val"
o = append(o, 0x81, 0xa3, 0x56, 0x61, 0x6c)
o = msgp.AppendFloat64(o, z.Val)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *SimpleEWMA) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Val":
z.Val, bts, err = msgp.ReadFloat64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Val")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z SimpleEWMA) Msgsize() (s int) {
s = 1 + 4 + msgp.Float64Size
return
}

View file

@ -0,0 +1,123 @@
package ewma
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalSimpleEWMA(t *testing.T) {
v := SimpleEWMA{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgSimpleEWMA(b *testing.B) {
v := SimpleEWMA{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgSimpleEWMA(b *testing.B) {
v := SimpleEWMA{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalSimpleEWMA(b *testing.B) {
v := SimpleEWMA{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeSimpleEWMA(t *testing.T) {
v := SimpleEWMA{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeSimpleEWMA Msgsize() is inaccurate")
}
vn := SimpleEWMA{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeSimpleEWMA(b *testing.B) {
v := SimpleEWMA{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeSimpleEWMA(b *testing.B) {
v := SimpleEWMA{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View file

@ -0,0 +1,41 @@
package ewma
// Copyright (c) 2013 VividCortex, Inc. All rights reserved.
// Please see the LICENSE file for applicable license terms.
import (
"math"
"testing"
)
const testMargin = 0.00000001
var samples = [100]float64{
4599, 5711, 4746, 4621, 5037, 4218, 4925, 4281, 5207, 5203, 5594, 5149,
4948, 4994, 6056, 4417, 4973, 4714, 4964, 5280, 5074, 4913, 4119, 4522,
4631, 4341, 4909, 4750, 4663, 5167, 3683, 4964, 5151, 4892, 4171, 5097,
3546, 4144, 4551, 6557, 4234, 5026, 5220, 4144, 5547, 4747, 4732, 5327,
5442, 4176, 4907, 3570, 4684, 4161, 5206, 4952, 4317, 4819, 4668, 4603,
4885, 4645, 4401, 4362, 5035, 3954, 4738, 4545, 5433, 6326, 5927, 4983,
5364, 4598, 5071, 5231, 5250, 4621, 4269, 3953, 3308, 3623, 5264, 5322,
5395, 4753, 4936, 5315, 5243, 5060, 4989, 4921, 4480, 3426, 3687, 4220,
3197, 5139, 6101, 5279,
}
func withinMargin(a, b float64) bool {
return math.Abs(a-b) <= testMargin
}
func TestSimpleEWMA(t *testing.T) {
var e SimpleEWMA
for _, f := range samples {
e.Add(f)
}
if !withinMargin(e.Value(), 4734.500946466118) {
t.Errorf("e.Value() is %v, wanted %v", e.Value(), 4734.500946466118)
}
e.Set(1.0)
if e.Value() != 1.0 {
t.Errorf("e.Value() is %v", e.Value())
}
}