From 19ecdc75a8e787eb0a576092088282c41d44cb36 Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Wed, 22 Sep 2021 13:48:45 -0400 Subject: [PATCH] replication: Simplify metrics calculation (#13274) Also doing some code cleanup --- cmd/bucket-replication.go | 6 +----- cmd/bucket-stats.go | 23 ----------------------- cmd/data-scanner.go | 16 ++++++---------- cmd/data-usage-cache.go | 2 ++ cmd/metrics.go | 31 +++++++++++++++---------------- 5 files changed, 24 insertions(+), 54 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 237a89e9c..7615958b1 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -893,10 +893,6 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje }(i, tgt) } wg.Wait() - // metadata update once.. - if objInfo.UserTags != "" { - objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags - } // FIXME: add support for missing replication events // - event.ObjectReplicationMissedThreshold // - event.ObjectReplicationReplicatedAfterThreshold @@ -962,7 +958,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje // replicateObjectToTarget replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { - objInfo := ri.ObjectInfo + objInfo := ri.ObjectInfo.Clone() bucket := objInfo.Bucket object := objInfo.Name var ( diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index ab3dcbf6d..40144688f 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -51,29 +51,6 @@ func (brs *BucketReplicationStats) Empty() bool { return len(brs.Stats) == 0 && brs.ReplicaSize == 0 } -// UpdateStat updates replication stats for the target arn -func (brs *BucketReplicationStats) UpdateStat(arn string, stat *BucketReplicationStat) { - var s BucketReplicationStat - if st, ok := brs.Stats[arn]; ok { - s = *st - } - // update target metric - atomic.AddInt64(&s.FailedSize, stat.FailedSize) - atomic.AddInt64(&s.FailedCount, stat.FailedCount) - atomic.AddInt64(&s.PendingCount, stat.PendingCount) - atomic.AddInt64(&s.PendingSize, stat.PendingSize) - atomic.AddInt64(&s.ReplicaSize, stat.ReplicaSize) - atomic.AddInt64(&s.ReplicatedSize, stat.ReplicatedSize) - // update total counts across targets - atomic.AddInt64(&brs.FailedSize, stat.FailedSize) - atomic.AddInt64(&brs.FailedCount, stat.FailedCount) - atomic.AddInt64(&brs.PendingCount, stat.PendingCount) - atomic.AddInt64(&brs.PendingSize, stat.PendingSize) - atomic.AddInt64(&brs.ReplicaSize, stat.ReplicaSize) - atomic.AddInt64(&brs.ReplicatedSize, stat.ReplicatedSize) - brs.Stats[arn] = &s -} - // Clone creates a new BucketReplicationStats copy func (brs BucketReplicationStats) Clone() BucketReplicationStats { c := BucketReplicationStats{ diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 40ca18031..63eb6ec85 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1126,29 +1126,25 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj case replication.Pending: tgtSizeS.pendingCount++ tgtSizeS.pendingSize += oi.Size + sizeS.pendingCount++ + sizeS.pendingSize += oi.Size case replication.Failed: tgtSizeS.failedSize += oi.Size tgtSizeS.failedCount++ + sizeS.failedSize += oi.Size + sizeS.failedCount++ case replication.Completed, "COMPLETE": tgtSizeS.replicatedSize += oi.Size + sizeS.replicatedSize += oi.Size } sizeS.replTargetStats[arn] = tgtSizeS } } switch oi.ReplicationStatus { - case replication.Pending: - sizeS.pendingCount++ - sizeS.pendingSize += oi.Size + case replication.Pending, replication.Failed: globalReplicationPool.queueReplicaTask(roi) return - case replication.Failed: - sizeS.failedSize += oi.Size - sizeS.failedCount++ - globalReplicationPool.queueReplicaTask(roi) - return - case replication.Completed, "COMPLETE": - sizeS.replicatedSize += oi.Size case replication.Replica: sizeS.replicaSize += oi.Size } diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 0fcb91919..1e0764ebe 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -1003,6 +1003,7 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { FailedSize: v.ReplicationStats.FailedSize, FailedCount: v.ReplicationStats.FailedCount, PendingSize: v.ReplicationStats.PendingSize, + PendingCount: v.ReplicationStats.PendingCount, } due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize } @@ -1061,6 +1062,7 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { FailedSize: v.ReplicationStats.FailedSize, FailedCount: v.ReplicationStats.FailedCount, PendingSize: v.ReplicationStats.PendingSize, + PendingCount: v.ReplicationStats.PendingCount, } due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize } diff --git a/cmd/metrics.go b/cmd/metrics.go index 25f78c439..31ce9a2e9 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -435,13 +435,12 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) { // get the most current of in-memory replication stats and data usage info from crawler. func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) { - s = BucketReplicationStats{} - bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) // accumulate cluster bucket stats stats := make(map[string]*BucketReplicationStat) + var totReplicaSize int64 for _, bucketStat := range bucketStats { - s.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize + totReplicaSize += bucketStat.ReplicationStats.ReplicaSize for arn, stat := range bucketStat.ReplicationStats.Stats { oldst, ok := stats[arn] if !ok { @@ -452,16 +451,13 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic FailedSize: stat.FailedSize + oldst.FailedSize, ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize, } - s.FailedCount += stats[arn].FailedCount - s.FailedSize += stats[arn].FailedSize - s.ReplicatedSize += stats[arn].ReplicatedSize } } - s.Stats = make(map[string]*BucketReplicationStat, len(stats)) // add initial usage stat to cluster stats usageStat := globalReplicationStats.GetInitialUsage(bucket) if usageStat.Stats != nil { + totReplicaSize += usageStat.ReplicaSize for arn, stat := range usageStat.Stats { st, ok := stats[arn] if !ok { @@ -478,34 +474,37 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic stats[arn] = st } } - s.ReplicaSize += usageStat.ReplicaSize + s = BucketReplicationStats{ + Stats: make(map[string]*BucketReplicationStat, len(stats)), + } + var latestTotReplicatedSize int64 + for _, st := range u.ReplicationInfo { + latestTotReplicatedSize += int64(st.ReplicatedSize) + } // normalize computed real time stats with latest usage stat - var usgReplicatedSize int64 for arn, tgtstat := range stats { st := BucketReplicationStat{} bu, ok := usageStat.Stats[arn] if !ok { bu = &BucketReplicationStat{} } - usgReplicatedSize += bu.ReplicatedSize // use in memory replication stats if it is ahead of usage info. st.ReplicatedSize = bu.ReplicatedSize if tgtstat.ReplicatedSize >= bu.ReplicatedSize { st.ReplicatedSize = tgtstat.ReplicatedSize } + s.ReplicatedSize += st.ReplicatedSize // Reset FailedSize and FailedCount to 0 for negative overflows which can // happen since data usage picture can lag behind actual usage state at the time of cluster start st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0)) st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0)) - st.ReplicaSize = int64(math.Max(float64(tgtstat.ReplicaSize), float64(u.ReplicaSize))) s.Stats[arn] = &st + s.FailedSize += st.FailedSize + s.FailedCount += st.FailedCount } // normalize overall stats - s.FailedSize = int64(math.Max(float64(s.FailedSize), 0)) - s.FailedCount = int64(math.Max(float64(s.FailedCount), 0)) - s.ReplicaSize = int64(math.Max(float64(s.ReplicaSize), float64(u.ReplicaSize))) - s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(usgReplicatedSize))) - + s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize))) + s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize))) return s }