diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 8cebd402a..e2131d8f2 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -21,6 +21,7 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/minio/minio/internal/bucket/replication" ) @@ -67,7 +68,7 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) { if !ok { bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} } - atomic.StoreInt64(&bs.ReplicaSize, n) + atomic.AddInt64(&bs.ReplicaSize, n) r.Cache[bucket] = bs } @@ -122,44 +123,13 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats if r == nil { return BucketReplicationStats{} } - r.ulock.RLock() - - brs := BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} - + defer r.ulock.RUnlock() st, ok := r.UsageCache[bucket] if ok { return st.Clone() } - r.ulock.RUnlock() - - dataUsageInfo, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) - if err != nil { - return brs - } - // data usage has not captured any data yet. - if dataUsageInfo.LastUpdate.IsZero() { - return brs - } - usage, ok := dataUsageInfo.BucketsUsage[bucket] - if ok && usage.ReplicationInfo != nil { - brs.ReplicaSize = int64(usage.ReplicaSize) - for arn, uinfo := range usage.ReplicationInfo { - brs.Stats[arn] = &BucketReplicationStat{ - FailedSize: int64(uinfo.ReplicationFailedSize), - ReplicatedSize: int64(uinfo.ReplicatedSize), - ReplicaSize: int64(uinfo.ReplicaSize), - FailedCount: int64(uinfo.ReplicationFailedCount), - } - } - if brs.hasReplicationUsage() { - r.ulock.Lock() - defer r.ulock.Unlock() - r.UsageCache[bucket] = &brs - } - - } - return brs + return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} } // Get replication metrics for a bucket from this node since this node came up. @@ -180,38 +150,51 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats { // NewReplicationStats initialize in-memory replication statistics func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats { - st := &ReplicationStats{ + return &ReplicationStats{ Cache: make(map[string]*BucketReplicationStats), UsageCache: make(map[string]*BucketReplicationStats), } - - dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) - if err != nil { - return st - } - - // data usage has not captured any data yet. - if dataUsageInfo.LastUpdate.IsZero() { - return st - } - - for bucket, usage := range dataUsageInfo.BucketsUsage { - b := &BucketReplicationStats{ - Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), - } - for arn, uinfo := range usage.ReplicationInfo { - b.Stats[arn] = &BucketReplicationStat{ - FailedSize: int64(uinfo.ReplicationFailedSize), - ReplicatedSize: int64(uinfo.ReplicatedSize), - ReplicaSize: int64(uinfo.ReplicaSize), - FailedCount: int64(uinfo.ReplicationFailedCount), - } - } - b.ReplicaSize += int64(usage.ReplicaSize) - if b.hasReplicationUsage() { - st.UsageCache[bucket] = b - } - } - - return st +} + +// load replication metrics at cluster start from initial data usage +func (r *ReplicationStats) loadInitialReplicationMetrics(ctx context.Context) { + rTimer := time.NewTimer(time.Minute * 1) + defer rTimer.Stop() + for { + select { + case <-ctx.Done(): + return + case <-rTimer.C: + dui, err := loadDataUsageFromBackend(GlobalContext, newObjectLayerFn()) + if err != nil { + continue + } + // data usage has not captured any data yet. + if dui.LastUpdate.IsZero() { + continue + } + m := make(map[string]*BucketReplicationStats) + for bucket, usage := range dui.BucketsUsage { + b := &BucketReplicationStats{ + Stats: make(map[string]*BucketReplicationStat, len(usage.ReplicationInfo)), + } + for arn, uinfo := range usage.ReplicationInfo { + b.Stats[arn] = &BucketReplicationStat{ + FailedSize: int64(uinfo.ReplicationFailedSize), + ReplicatedSize: int64(uinfo.ReplicatedSize), + ReplicaSize: int64(uinfo.ReplicaSize), + FailedCount: int64(uinfo.ReplicationFailedCount), + } + } + b.ReplicaSize += int64(usage.ReplicaSize) + if b.hasReplicationUsage() { + m[bucket] = b + } + } + r.ulock.Lock() + defer r.ulock.Unlock() + r.UsageCache = m + return + } + } } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 564cb1663..0c22c0689 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1497,6 +1497,7 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { FailedWorkers: globalAPIConfig.getReplicationFailedWorkers(), }) globalReplicationStats = NewReplicationStats(ctx, objectAPI) + go globalReplicationStats.loadInitialReplicationMetrics(ctx) } // get Reader from replication target if active-active replication is in place and diff --git a/cmd/metrics.go b/cmd/metrics.go index 31ce9a2e9..195d72909 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -442,8 +442,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic for _, bucketStat := range bucketStats { totReplicaSize += bucketStat.ReplicationStats.ReplicaSize for arn, stat := range bucketStat.ReplicationStats.Stats { - oldst, ok := stats[arn] - if !ok { + oldst := stats[arn] + if oldst == nil { oldst = &BucketReplicationStat{} } stats[arn] = &BucketReplicationStat{ @@ -459,8 +459,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic if usageStat.Stats != nil { totReplicaSize += usageStat.ReplicaSize for arn, stat := range usageStat.Stats { - st, ok := stats[arn] - if !ok { + st := stats[arn] + if st == nil { st = &BucketReplicationStat{ ReplicatedSize: stat.ReplicatedSize, FailedSize: stat.FailedSize, @@ -484,13 +484,13 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic // normalize computed real time stats with latest usage stat for arn, tgtstat := range stats { st := BucketReplicationStat{} - bu, ok := usageStat.Stats[arn] + bu, ok := u.ReplicationInfo[arn] if !ok { - bu = &BucketReplicationStat{} + bu = BucketTargetUsageInfo{} } // use in memory replication stats if it is ahead of usage info. - st.ReplicatedSize = bu.ReplicatedSize - if tgtstat.ReplicatedSize >= bu.ReplicatedSize { + st.ReplicatedSize = int64(bu.ReplicatedSize) + if tgtstat.ReplicatedSize >= int64(bu.ReplicatedSize) { st.ReplicatedSize = tgtstat.ReplicatedSize } s.ReplicatedSize += st.ReplicatedSize diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 67df89ecd..3d0ed77ec 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3278,9 +3278,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() { scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType) } - if objInfo.ReplicationStatus == replication.Replica { + if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { actualSize, _ := objInfo.GetActualSize() - globalReplicationStats.UpdateReplicaStat(bucket, actualSize) + defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) } // Write success response.