Drop Pending size and count from replication metrics (#12378)

Real-time metrics calculated in-memory rely on the initial
replication metrics saved with data usage. However, these can
lag behind the actual state of the cluster at the time of server
restart, leading to inaccurate Pending size/counts being reported
to Prometheus. Dropping the Pending metrics, since pending
replication can be monitored more reliably by applications using
replication notifications.

Signed-off-by: Poorna Krishnamoorthy <poorna@minio.io>
Poorna Krishnamoorthy, 2021-05-31 20:26:52 -07:00, committed by GitHub
parent ab7410af11
commit 3690de0c6b
4 changed files with 10 additions and 113 deletions
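
As a concrete illustration of the notification-based alternative mentioned in the message above: applications can subscribe to MinIO's replication events instead of polling the dropped gauges. A minimal sketch using minio-go v7's `ListenBucketNotification`; the endpoint, credentials, and bucket name are placeholders, and the `s3:Replication:*` event names assume a MinIO release with replication notifications:

```go
package main

import (
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	client, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Stream replication events for the bucket; each record identifies the
	// object and whether its replication completed or failed.
	events := []string{
		"s3:Replication:OperationCompletedReplication",
		"s3:Replication:OperationFailedReplication",
	}
	for info := range client.ListenBucketNotification(context.Background(), "mybucket", "", "", events) {
		if info.Err != nil {
			log.Println("notification error:", info.Err)
			continue
		}
		for _, record := range info.Records {
			log.Printf("%s: %s/%s", record.EventName, record.S3.Bucket.Name, record.S3.Object.Key)
		}
	}
}
```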


@@ -26,11 +26,9 @@ import (
 )
 func (b *BucketReplicationStats) hasReplicationUsage() bool {
-	return b.PendingSize > 0 ||
-		b.FailedSize > 0 ||
+	return b.FailedSize > 0 ||
 		b.ReplicatedSize > 0 ||
 		b.ReplicaSize > 0 ||
-		b.PendingCount > 0 ||
 		b.FailedCount > 0
 }
@@ -67,38 +65,23 @@ func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus rep
 	}
 	r.RUnlock()
 	switch status {
-	case replication.Pending:
-		if opType == replication.ObjectReplicationType {
-			atomic.AddUint64(&b.PendingSize, uint64(n))
-		}
-		atomic.AddUint64(&b.PendingCount, 1)
 	case replication.Completed:
 		switch prevStatus { // adjust counters based on previous state
-		case replication.Pending:
-			atomic.AddUint64(&b.PendingCount, ^uint64(0))
 		case replication.Failed:
 			atomic.AddUint64(&b.FailedCount, ^uint64(0))
 		}
 		if opType == replication.ObjectReplicationType {
 			atomic.AddUint64(&b.ReplicatedSize, uint64(n))
 			switch prevStatus {
-			case replication.Pending:
-				atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
 			case replication.Failed:
 				atomic.AddUint64(&b.FailedSize, ^uint64(n-1))
 			}
 		}
 	case replication.Failed:
 		// count failures only once - not on every retry
-		switch prevStatus { // adjust counters based on previous state
-		case replication.Pending:
-			atomic.AddUint64(&b.PendingCount, ^uint64(0))
-		}
 		if opType == replication.ObjectReplicationType {
-			if prevStatus == replication.Pending {
-				atomic.AddUint64(&b.FailedSize, uint64(n))
-				atomic.AddUint64(&b.FailedCount, 1)
-				atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
-			}
+			atomic.AddUint64(&b.FailedSize, uint64(n))
+			atomic.AddUint64(&b.FailedCount, 1)
 		}
 	case replication.Replica:
@@ -125,11 +108,9 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats
 		return BucketReplicationStats{}
 	}
 	return BucketReplicationStats{
-		PendingSize:    atomic.LoadUint64(&st.PendingSize),
 		FailedSize:     atomic.LoadUint64(&st.FailedSize),
 		ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
 		ReplicaSize:    atomic.LoadUint64(&st.ReplicaSize),
-		PendingCount:   atomic.LoadUint64(&st.PendingCount),
 		FailedCount:    atomic.LoadUint64(&st.FailedCount),
 	}
 }
@@ -149,11 +130,9 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
 	}
 	return BucketReplicationStats{
-		PendingSize:    atomic.LoadUint64(&st.PendingSize),
 		FailedSize:     atomic.LoadUint64(&st.FailedSize),
 		ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
 		ReplicaSize:    atomic.LoadUint64(&st.ReplicaSize),
-		PendingCount:   atomic.LoadUint64(&st.PendingCount),
 		FailedCount:    atomic.LoadUint64(&st.FailedCount),
 	}
 }
@@ -177,11 +156,9 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
 	for bucket, usage := range dataUsageInfo.BucketsUsage {
 		b := &BucketReplicationStats{
-			PendingSize:    usage.ReplicationPendingSize,
 			FailedSize:     usage.ReplicationFailedSize,
 			ReplicatedSize: usage.ReplicatedSize,
 			ReplicaSize:    usage.ReplicaSize,
-			PendingCount:   usage.ReplicationPendingCount,
 			FailedCount:    usage.ReplicationFailedCount,
 		}
 		if b.hasReplicationUsage() {
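
A note on the `^uint64(...)` arithmetic seen throughout these hunks: `sync/atomic` has no subtract operation for unsigned types, so the code adds the two's complement instead. `^uint64(0)` decrements by one and `^uint64(n-1)` subtracts `n`, as documented for `atomic.AddUint64`. A standalone sketch of the idiom, with illustrative variable names:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var count uint64 = 5
	var size uint64 = 100

	// Decrement by one: ^uint64(0) has all bits set, i.e. -1 mod 2^64.
	atomic.AddUint64(&count, ^uint64(0))

	// Subtract n: ^uint64(n-1) is the two's complement of n.
	n := uint64(30)
	atomic.AddUint64(&size, ^uint64(n-1))

	fmt.Println(count, size) // 4 70
}
```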


@@ -98,8 +98,6 @@ const (
 	failedCount    MetricName = "failed_count"
 	failedBytes    MetricName = "failed_bytes"
 	freeBytes      MetricName = "free_bytes"
-	pendingBytes   MetricName = "pending_bytes"
-	pendingCount   MetricName = "pending_count"
 	readBytes      MetricName = "read_bytes"
 	rcharBytes     MetricName = "rchar_bytes"
 	receivedBytes  MetricName = "received_bytes"
@@ -400,15 +398,7 @@ func getBucketUsageObjectsTotalMD() MetricDescription {
 		Type:      gaugeMetric,
 	}
 }
-func getBucketRepPendingBytesMD() MetricDescription {
-	return MetricDescription{
-		Namespace: bucketMetricNamespace,
-		Subsystem: replicationSubsystem,
-		Name:      pendingBytes,
-		Help:      "Total bytes pending to replicate.",
-		Type:      gaugeMetric,
-	}
-}
 func getBucketRepFailedBytesMD() MetricDescription {
 	return MetricDescription{
 		Namespace: bucketMetricNamespace,
@@ -436,15 +426,7 @@ func getBucketRepReceivedBytesMD() MetricDescription {
 		Type:      gaugeMetric,
 	}
 }
-func getBucketRepPendingOperationsMD() MetricDescription {
-	return MetricDescription{
-		Namespace: bucketMetricNamespace,
-		Subsystem: replicationSubsystem,
-		Name:      pendingCount,
-		Help:      "Total number of objects pending replication",
-		Type:      gaugeMetric,
-	}
-}
 func getBucketRepFailedOperationsMD() MetricDescription {
 	return MetricDescription{
 		Namespace: bucketMetricNamespace,
@@ -1318,11 +1300,6 @@ func getBucketUsageMetrics() MetricsGroup {
 			})
 			if stat.hasReplicationUsage() {
 				metrics = append(metrics, Metric{
-					Description:    getBucketRepPendingBytesMD(),
-					Value:          float64(stat.PendingSize),
-					VariableLabels: map[string]string{"bucket": bucket},
-				})
-				metrics = append(metrics, Metric{
 					Description:    getBucketRepFailedBytesMD(),
 					Value:          float64(stat.FailedSize),
@@ -1338,11 +1315,6 @@ func getBucketUsageMetrics() MetricsGroup {
 					Value:          float64(stat.ReplicaSize),
 					VariableLabels: map[string]string{"bucket": bucket},
 				})
-				metrics = append(metrics, Metric{
-					Description:    getBucketRepPendingOperationsMD(),
-					Value:          float64(stat.PendingCount),
-					VariableLabels: map[string]string{"bucket": bucket},
-				})
 				metrics = append(metrics, Metric{
 					Description:    getBucketRepFailedOperationsMD(),
 					Value:          float64(stat.FailedCount),
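
For readers mapping these `MetricDescription` values to the series names in the documentation table at the end of this commit: the namespace, subsystem, and name fields are joined with underscores, the same convention as Prometheus' `BuildFQName` helper. A quick sanity check, assuming `bucketMetricNamespace` is `"minio_bucket"` and `replicationSubsystem` is `"replication"`:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Mirrors the fields of getBucketRepFailedBytesMD above.
	fmt.Println(prometheus.BuildFQName("minio_bucket", "replication", "failed_bytes"))
	// Output: minio_bucket_replication_failed_bytes
}
```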


@@ -18,6 +18,7 @@
 package cmd

 import (
+	"math"
 	"net/http"
 	"strings"
 	"sync/atomic"
@@ -441,56 +442,23 @@ func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) (s Bucke
 	for _, bucketStat := range bucketStats {
 		replStats.FailedCount += bucketStat.ReplicationStats.FailedCount
 		replStats.FailedSize += bucketStat.ReplicationStats.FailedSize
-		replStats.PendingCount += bucketStat.ReplicationStats.PendingCount
-		replStats.PendingSize += bucketStat.ReplicationStats.PendingSize
 		replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
 		replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
 	}
 	usageStat := globalReplicationStats.GetInitialUsage(bucket)
 	replStats.FailedCount += usageStat.FailedCount
 	replStats.FailedSize += usageStat.FailedSize
-	replStats.PendingCount += usageStat.PendingCount
-	replStats.PendingSize += usageStat.PendingSize
 	replStats.ReplicaSize += usageStat.ReplicaSize
 	replStats.ReplicatedSize += usageStat.ReplicatedSize
 	// use in memory replication stats if it is ahead of usage info.
+	s.ReplicatedSize = u.ReplicatedSize
 	if replStats.ReplicatedSize >= u.ReplicatedSize {
 		s.ReplicatedSize = replStats.ReplicatedSize
-	} else {
-		s.ReplicatedSize = u.ReplicatedSize
 	}
-	if replStats.PendingSize > u.ReplicationPendingSize {
-		s.PendingSize = replStats.PendingSize
-	} else {
-		s.PendingSize = u.ReplicationPendingSize
-	}
-	if replStats.FailedSize > u.ReplicationFailedSize {
-		s.FailedSize = replStats.FailedSize
-	} else {
-		s.FailedSize = u.ReplicationFailedSize
-	}
-	if replStats.ReplicaSize > u.ReplicaSize {
-		s.ReplicaSize = replStats.ReplicaSize
-	} else {
-		s.ReplicaSize = u.ReplicaSize
-	}
-	if replStats.PendingCount > u.ReplicationPendingCount {
-		s.PendingCount = replStats.PendingCount
-	} else {
-		s.PendingCount = u.ReplicationPendingCount
-	}
-	if replStats.FailedCount > u.ReplicationFailedCount {
-		s.FailedCount = replStats.FailedCount
-	} else {
-		s.FailedCount = u.ReplicationFailedCount
-	}
+	// Reset FailedSize and FailedCount to 0 for negative overflows which can
+	// happen since data usage picture can lag behind actual usage state at the time of cluster start
+	s.FailedSize = uint64(math.Max(float64(replStats.FailedSize), 0))
+	s.FailedCount = uint64(math.Max(float64(replStats.FailedCount), 0))
+	s.ReplicaSize = uint64(math.Max(float64(replStats.ReplicaSize), float64(u.ReplicaSize)))
 	return s
 }
@@ -537,15 +505,6 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
 			float64(usageInfo.ObjectsCount),
 			bucket,
 		)
-		ch <- prometheus.MustNewConstMetric(
-			prometheus.NewDesc(
-				prometheus.BuildFQName("bucket", "replication", "pending_size"),
-				"Total capacity pending to be replicated",
-				[]string{"bucket"}, nil),
-			prometheus.GaugeValue,
-			float64(stat.PendingSize),
-			bucket,
-		)
 		ch <- prometheus.MustNewConstMetric(
 			prometheus.NewDesc(
 				prometheus.BuildFQName("bucket", "replication", "failed_size"),
@@ -573,15 +532,6 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
 			float64(stat.ReplicaSize),
 			bucket,
 		)
-		ch <- prometheus.MustNewConstMetric(
-			prometheus.NewDesc(
-				prometheus.BuildFQName("bucket", "replication", "pending_count"),
-				"Total replication operations pending",
-				[]string{"bucket"}, nil),
-			prometheus.GaugeValue,
-			float64(stat.PendingCount),
-			bucket,
-		)
 		ch <- prometheus.MustNewConstMetric(
 			prometheus.NewDesc(
 				prometheus.BuildFQName("bucket", "replication", "failed_count"),
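
The legacy handler above emits "const metrics": each value is computed at scrape time with `prometheus.MustNewConstMetric` and written straight to the collector channel, so dropping a metric is just deleting its emit block; nothing is registered up front. A minimal collector in the same style, as a sketch (the collector type, port, and value below are illustrative, not MinIO code):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// bucketCollector emits one gauge per scrape, mimicking how
// bucketUsageMetricsPrometheus writes to its channel.
type bucketCollector struct {
	desc *prometheus.Desc
}

func (c *bucketCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *bucketCollector) Collect(ch chan<- prometheus.Metric) {
	// In MinIO this value would come from the live replication stats.
	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, 42, "mybucket")
}

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(&bucketCollector{
		desc: prometheus.NewDesc(
			prometheus.BuildFQName("bucket", "replication", "failed_size"),
			"Total capacity failed to replicate at least once",
			[]string{"bucket"}, nil),
	})
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```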


@@ -9,10 +9,8 @@ These metrics can be obtained from any MinIO server once per collection.
 |:---------------------------------------------|:--------------------------------------------------------------------------------------------------------------------|
 | `minio_bucket_objects_size_distribution`     | Distribution of object sizes in the bucket, includes label for the bucket name.                                      |
 | `minio_bucket_replication_failed_bytes`      | Total number of bytes failed at least once to replicate.                                                             |
-| `minio_bucket_replication_pending_bytes`     | Total bytes pending to replicate.                                                                                    |
 | `minio_bucket_replication_received_bytes`    | Total number of bytes replicated to this bucket from another source bucket.                                          |
 | `minio_bucket_replication_sent_bytes`        | Total number of bytes replicated to the target bucket.                                                               |
-| `minio_bucket_replication_pending_count`     | Total number of replication operations pending for this bucket.                                                      |
 | `minio_bucket_replication_failed_count`      | Total number of replication operations failed for this bucket.                                                       |
 | `minio_bucket_usage_object_total`            | Total number of objects                                                                                              |
 | `minio_bucket_usage_total_bytes`             | Total bucket size in bytes                                                                                           |
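
After upgrading, dashboards and alerts referencing the removed `pending` series will simply show no data. A quick way to list which replication series a server still exports, assuming a local server started with `MINIO_PROMETHEUS_AUTH_TYPE=public` so the metrics endpoint is unauthenticated:

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// v2 metrics endpoint; use a bearer token instead if auth is enabled.
	resp, err := http.Get("http://localhost:9000/minio/v2/metrics/cluster")
	if err != nil {
		log.Fatalln(err)
	}
	defer resp.Body.Close()

	// Print only the bucket replication series listed in the table above.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if strings.HasPrefix(sc.Text(), "minio_bucket_replication_") {
			fmt.Println(sc.Text())
		}
	}
}
```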