replication improvements for metrics (#12105)

* Various improvements in replication (#11949)

- collect real-time replication metrics for Prometheus.
- add pending_count and failed_count metrics for total pending/failed replication operations.

- add API to get replication metrics

- add MRF worker to handle spill-over replication operations

- multiple issues found with replication:
- fixes an issue where the client sends a bucket name with a trailing `/` in the SetRemoteTarget API call; the bucket name is now trimmed to avoid any extra `/`.

- hold write locks in GetObjectNInfo during replication
  to ensure that the object version stack is not overwritten
  while reading the content.

- add additional protection during WriteMetadata() to
  ensure that we always write a valid FileInfo{} and never
  write an empty FileInfo{} to the lowest layers.

Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
Co-authored-by: Harshavardhana <harsha@minio.io>
Authored by Poorna Krishnamoorthy on 2021-04-20 19:02:25 -07:00; committed by Harshavardhana
parent 4d94d68916
commit f0924b1248
42 changed files with 2656 additions and 475 deletions


@ -21,6 +21,7 @@ import (
"io"
"io/ioutil"
"net/http"
"path"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
@ -50,7 +51,7 @@ func (a adminAPIHandlers) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *
}
vars := mux.Vars(r)
bucket := vars["bucket"]
bucket := path.Clean(vars["bucket"])
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
@ -90,7 +91,8 @@ func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r *
}
vars := mux.Vars(r)
bucket := vars["bucket"]
bucket := path.Clean(vars["bucket"])
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return
@ -118,7 +120,7 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
bucket := path.Clean(vars["bucket"])
update := r.URL.Query().Get("update") == "true"
if !globalIsErasure {
@ -211,7 +213,7 @@ func (a adminAPIHandlers) ListRemoteTargetsHandler(w http.ResponseWriter, r *htt
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
bucket := path.Clean(vars["bucket"])
arnType := vars["type"]
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
@ -250,7 +252,7 @@ func (a adminAPIHandlers) RemoveRemoteTargetHandler(w http.ResponseWriter, r *ht
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
bucket := path.Clean(vars["bucket"])
arn := vars["arn"]
if !globalIsErasure {

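The repeated change above swaps vars["bucket"] for path.Clean(vars["bucket"]) in each admin handler, which is what trims a trailing `/` sent by a client. A standalone sketch of the behavior, standard library only:

package main

import (
	"fmt"
	"path"
)

func main() {
	// path.Clean removes trailing slashes and collapses redundant
	// separators, so a bucket name sent as "mybucket/" is
	// normalized before any lookup.
	fmt.Println(path.Clean("mybucket/"))  // mybucket
	fmt.Println(path.Clean("mybucket//")) // mybucket
	fmt.Println(path.Clean("my//bucket")) // my/bucket
}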

@ -1029,7 +1029,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
if !dataUsageInfo.LastUpdate.IsZero() {
size = dataUsageInfo.BucketsUsage[bucket.Name].Size
}
acctInfo.Buckets = append(acctInfo.Buckets, madmin.BucketUsageInfo{
acctInfo.Buckets = append(acctInfo.Buckets, madmin.BucketAccessInfo{
Name: bucket.Name,
Created: bucket.Created,
Size: size,


@ -208,7 +208,6 @@ func registerAPIRouter(router *mux.Router) {
// GetBucketReplicationConfig
bucket.Methods(http.MethodGet).HandlerFunc(
collectAPIStats("getbucketreplicationconfiguration", maxClients(httpTraceAll(api.GetBucketReplicationConfigHandler)))).Queries("replication", "")
// GetBucketVersioning
bucket.Methods(http.MethodGet).HandlerFunc(
collectAPIStats("getbucketversioning", maxClients(httpTraceAll(api.GetBucketVersioningHandler)))).Queries("versioning", "")
@ -275,8 +274,6 @@ func registerAPIRouter(router *mux.Router) {
// PutBucketReplicationConfig
bucket.Methods(http.MethodPut).HandlerFunc(
collectAPIStats("putbucketreplicationconfiguration", maxClients(httpTraceAll(api.PutBucketReplicationConfigHandler)))).Queries("replication", "")
// GetObjectRetention
// PutBucketEncryption
bucket.Methods(http.MethodPut).HandlerFunc(
collectAPIStats("putbucketencryption", maxClients(httpTraceAll(api.PutBucketEncryptionHandler)))).Queries("encryption", "")
@ -327,6 +324,12 @@ func registerAPIRouter(router *mux.Router) {
// ListObjectsV1 (Legacy)
bucket.Methods(http.MethodGet).HandlerFunc(
collectAPIStats("listobjectsv1", maxClients(httpTraceAll(api.ListObjectsV1Handler))))
// MinIO extension API for replication.
//
// GetBucketReplicationMetrics
router.Methods(http.MethodGet).HandlerFunc(
collectAPIStats("getbucketreplicationmetrics", maxClients(httpTraceAll(api.GetBucketReplicationMetricsHandler)))).Queries("replication-metrics", "")
}
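
Gorilla mux's Queries("replication-metrics", "") matcher is what routes a bare GET /<bucket>?replication-metrics query to the new handler. A minimal, self-contained sketch of that matching behavior; the route pattern here is illustrative, not MinIO's actual router setup:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/gorilla/mux"
)

func main() {
	r := mux.NewRouter()
	// An empty Queries value matches the bare query key.
	r.Methods(http.MethodGet).
		Path("/{bucket}").
		Queries("replication-metrics", "").
		HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			fmt.Fprintf(w, "metrics for %s", mux.Vars(req)["bucket"])
		})

	rec := httptest.NewRecorder()
	r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/mybucket?replication-metrics", nil))
	fmt.Println(rec.Body.String()) // metrics for mybucket
}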
/// Root operation


@ -20,6 +20,7 @@ import (
"bytes"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"encoding/xml"
"fmt"
"io"
@ -1603,3 +1604,59 @@ func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.Respons
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketReplicationMetricsHandler - GET Bucket replication metrics.
// ----------
// Gets the replication metrics for a bucket.
func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketReplicationMetrics")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// check if user has permissions to perform this operation
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
bucketStats := globalNotificationSys.GetClusterBucketStats(r.Context(), bucket)
bucketReplStats := BucketReplicationStats{}
// sum up metrics from each node in the cluster
for _, bucketStat := range bucketStats {
bucketReplStats.FailedCount += bucketStat.ReplicationStats.FailedCount
bucketReplStats.FailedSize += bucketStat.ReplicationStats.FailedSize
bucketReplStats.PendingCount += bucketStat.ReplicationStats.PendingCount
bucketReplStats.PendingSize += bucketStat.ReplicationStats.PendingSize
bucketReplStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
bucketReplStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
}
// add the initial usage captured at cluster startup
usageStat := globalReplicationStats.GetInitialUsage(bucket)
bucketReplStats.FailedCount += usageStat.FailedCount
bucketReplStats.FailedSize += usageStat.FailedSize
bucketReplStats.PendingCount += usageStat.PendingCount
bucketReplStats.PendingSize += usageStat.PendingSize
bucketReplStats.ReplicaSize += usageStat.ReplicaSize
bucketReplStats.ReplicatedSize += usageStat.ReplicatedSize
if err := json.NewEncoder(w).Encode(&bucketReplStats); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
w.(http.Flusher).Flush()
}

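Below is a rough client-side sketch of consuming the new endpoint. The field names follow the JSON tags on BucketReplicationStats (added in cmd/bucket-stats.go further down); the plain http.Get is a placeholder, since a real request must carry AWS Signature V4 authentication:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// replMetrics mirrors the JSON tags on BucketReplicationStats.
type replMetrics struct {
	PendingSize    uint64 `json:"pendingReplicationSize"`
	ReplicatedSize uint64 `json:"completedReplicationSize"`
	ReplicaSize    uint64 `json:"replicaSize"`
	FailedSize     uint64 `json:"failedReplicationSize"`
	PendingCount   uint64 `json:"pendingReplicationCount"`
	FailedCount    uint64 `json:"failedReplicationCount"`
}

func fetchReplicationMetrics(endpoint, bucket string) (*replMetrics, error) {
	// Placeholder request; production clients must sign with SigV4.
	resp, err := http.Get(endpoint + "/" + bucket + "?replication-metrics")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var m replMetrics
	if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
		return nil, err
	}
	return &m, nil
}

func main() {
	m, err := fetchReplicationMetrics("http://localhost:9000", "mybucket")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("pending: %d ops/%d bytes, failed: %d ops/%d bytes\n",
		m.PendingCount, m.PendingSize, m.FailedCount, m.FailedSize)
}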

@ -169,7 +169,10 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
}
meta.ReplicationConfigXML = configData
case bucketTargetsFile:
meta.BucketTargetsConfigJSON, meta.BucketTargetsConfigMetaJSON, err = encryptBucketMetadata(meta.Name, configData, crypto.Context{bucket: meta.Name, bucketTargetsFile: bucketTargetsFile})
meta.BucketTargetsConfigJSON, meta.BucketTargetsConfigMetaJSON, err = encryptBucketMetadata(meta.Name, configData, crypto.Context{
bucket: meta.Name,
bucketTargetsFile: bucketTargetsFile,
})
if err != nil {
return fmt.Errorf("Error encrypting bucket target metadata %w", err)
}


@ -416,6 +416,7 @@ func encryptBucketMetadata(bucket string, input []byte, kmsContext crypto.Contex
if err != nil {
return
}
outbuf := bytes.NewBuffer(nil)
objectKey := crypto.GenerateKey(key, rand.Reader)
sealedKey = objectKey.Seal(key, crypto.GenerateIV(rand.Reader), crypto.S3.String(), bucket, "")
@ -437,7 +438,6 @@ func decryptBucketMetadata(input []byte, bucket string, meta map[string]string,
return nil, errKMSNotConfigured
}
keyID, kmsKey, sealedKey, err := crypto.S3.ParseMetadata(meta)
if err != nil {
return nil, err
}
@ -449,8 +449,8 @@ func decryptBucketMetadata(input []byte, bucket string, meta map[string]string,
if err = objectKey.Unseal(extKey, sealedKey, crypto.S3.String(), bucket, ""); err != nil {
return nil, err
}
outbuf := bytes.NewBuffer(nil)
_, err = sio.Decrypt(outbuf, bytes.NewBuffer(input), sio.Config{Key: objectKey[:], MinVersion: sio.Version20})
return outbuf.Bytes(), err
}


@ -88,7 +88,7 @@ func (sys *BucketQuotaSys) check(ctx context.Context, bucket string, size int64)
return err
}
dui := v.(DataUsageInfo)
dui := v.(madmin.DataUsageInfo)
bui, ok := dui.BucketsUsage[bucket]
if !ok {
@ -115,7 +115,7 @@ func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
// have been deleted so as to bring bucket usage within quota.
func enforceFIFOQuotaBucket(ctx context.Context, objectAPI ObjectLayer, bucket string, bui BucketUsageInfo) {
func enforceFIFOQuotaBucket(ctx context.Context, objectAPI ObjectLayer, bucket string, bui madmin.BucketUsageInfo) {
// Check if the current bucket has quota restrictions, if not skip it
cfg, err := globalBucketQuotaSys.Get(bucket)
if err != nil {


@ -0,0 +1,192 @@
/*
* MinIO Cloud Storage, (C) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"sync"
"sync/atomic"
"github.com/minio/minio/pkg/bucket/replication"
)
func (b *BucketReplicationStats) hasReplicationUsage() bool {
return b.PendingSize > 0 ||
b.FailedSize > 0 ||
b.ReplicatedSize > 0 ||
b.ReplicaSize > 0 ||
b.PendingCount > 0 ||
b.FailedCount > 0
}
// ReplicationStats holds the global in-memory replication stats
type ReplicationStats struct {
sync.RWMutex
Cache map[string]*BucketReplicationStats
UsageCache map[string]*BucketReplicationStats // initial usage
}
// Delete deletes in-memory replication statistics for a bucket.
func (r *ReplicationStats) Delete(bucket string) {
if r == nil {
return
}
r.Lock()
defer r.Unlock()
delete(r.Cache, bucket)
delete(r.UsageCache, bucket)
}
// Update updates in-memory replication statistics with new values.
func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
if r == nil {
return
}
r.RLock()
b, ok := r.Cache[bucket]
if !ok {
b = &BucketReplicationStats{}
}
r.RUnlock()
switch status {
case replication.Pending:
if opType == replication.ObjectReplicationType {
atomic.AddUint64(&b.PendingSize, uint64(n))
}
atomic.AddUint64(&b.PendingCount, 1)
case replication.Completed:
switch prevStatus { // adjust counters based on previous state
case replication.Pending:
atomic.AddUint64(&b.PendingCount, ^uint64(0))
case replication.Failed:
atomic.AddUint64(&b.FailedCount, ^uint64(0))
}
if opType == replication.ObjectReplicationType {
atomic.AddUint64(&b.ReplicatedSize, uint64(n))
switch prevStatus {
case replication.Pending:
atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
case replication.Failed:
atomic.AddUint64(&b.FailedSize, ^uint64(n-1))
}
}
case replication.Failed:
// count failures only once - not on every retry
switch prevStatus { // adjust counters based on previous state
case replication.Pending:
atomic.AddUint64(&b.PendingCount, ^uint64(0))
}
if opType == replication.ObjectReplicationType {
if prevStatus == replication.Pending {
atomic.AddUint64(&b.FailedSize, uint64(n))
atomic.AddUint64(&b.FailedCount, 1)
atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
}
}
case replication.Replica:
if opType == replication.ObjectReplicationType {
atomic.AddUint64(&b.ReplicaSize, uint64(n))
}
}
r.Lock()
r.Cache[bucket] = b
r.Unlock()
}
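
The counter decrements above rely on two's-complement wraparound: atomic.AddUint64 with ^uint64(0) subtracts one, and with ^uint64(n-1) subtracts n. A standalone sketch:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var pendingCount uint64 = 10
	atomic.AddUint64(&pendingCount, ^uint64(0)) // equivalent to subtracting 1
	fmt.Println(pendingCount)                   // 9

	var pendingSize uint64 = 100
	var n int64 = 25
	atomic.AddUint64(&pendingSize, ^uint64(n-1)) // equivalent to subtracting n
	fmt.Println(pendingSize)                     // 75
}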
// GetInitialUsage returns the replication metrics available at the time of cluster initialization
func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats {
if r == nil {
return BucketReplicationStats{}
}
r.RLock()
defer r.RUnlock()
st, ok := r.UsageCache[bucket]
if !ok {
return BucketReplicationStats{}
}
return BucketReplicationStats{
PendingSize: atomic.LoadUint64(&st.PendingSize),
FailedSize: atomic.LoadUint64(&st.FailedSize),
ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
ReplicaSize: atomic.LoadUint64(&st.ReplicaSize),
PendingCount: atomic.LoadUint64(&st.PendingCount),
FailedCount: atomic.LoadUint64(&st.FailedCount),
}
}
// Get replication metrics for a bucket from this node since this node came up.
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
if r == nil {
return BucketReplicationStats{}
}
r.RLock()
defer r.RUnlock()
st, ok := r.Cache[bucket]
if !ok {
return BucketReplicationStats{}
}
return BucketReplicationStats{
PendingSize: atomic.LoadUint64(&st.PendingSize),
FailedSize: atomic.LoadUint64(&st.FailedSize),
ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
ReplicaSize: atomic.LoadUint64(&st.ReplicaSize),
PendingCount: atomic.LoadUint64(&st.PendingCount),
FailedCount: atomic.LoadUint64(&st.FailedCount),
}
}
// NewReplicationStats initializes in-memory replication statistics
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
st := &ReplicationStats{
Cache: make(map[string]*BucketReplicationStats),
UsageCache: make(map[string]*BucketReplicationStats),
}
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
if err != nil {
return st
}
// data usage has not captured any data yet.
if dataUsageInfo.LastUpdate.IsZero() {
return st
}
for bucket, usage := range dataUsageInfo.BucketsUsage {
b := &BucketReplicationStats{
PendingSize: usage.ReplicationPendingSize,
FailedSize: usage.ReplicationFailedSize,
ReplicatedSize: usage.ReplicatedSize,
ReplicaSize: usage.ReplicaSize,
PendingCount: usage.ReplicationPendingCount,
FailedCount: usage.ReplicationFailedCount,
}
if b.hasReplicationUsage() {
st.UsageCache[bucket] = b
}
}
return st
}


@ -291,6 +291,14 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
versionPurgeStatus = Complete
}
}
prevStatus := dobj.DeleteMarkerReplicationStatus
currStatus := replicationStatus
if dobj.VersionID != "" {
prevStatus = string(dobj.VersionPurgeStatus)
currStatus = string(versionPurgeStatus)
}
// to decrement pending count later.
globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType)
var eventName = event.ObjectReplicationComplete
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
@ -594,7 +602,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
})
return
}
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, writeLock, ObjectOptions{
VersionID: objInfo.VersionID,
})
if err != nil {
@ -604,10 +612,10 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
Object: objInfo,
Host: "Internal: [Replication]",
})
logger.LogIf(ctx, err)
logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err))
return
}
defer gr.Close() // hold read lock for entire transaction
defer gr.Close() // hold write lock for entire transaction
objInfo = gr.ObjInfo
size, err := objInfo.GetActualSize()
@ -644,7 +652,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
rtype = getReplicationAction(objInfo, oi)
if rtype == replicateNone {
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed.
// PutObject might have completed
return
}
}
@ -656,7 +664,8 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
srcOpts := miniogo.CopySrcOptions{
Bucket: dest.Bucket,
Object: object,
VersionID: objInfo.VersionID}
VersionID: objInfo.VersionID,
}
dstOpts := miniogo.PutObjectOptions{
Internal: miniogo.AdvancedPutOptions{
SourceVersionID: objInfo.VersionID,
@ -718,6 +727,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
}
}
prevReplStatus := objInfo.ReplicationStatus
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
@ -742,6 +752,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
}
}
opType := replication.MetadataReplicationType
if rtype == replicateAll {
opType = replication.ObjectReplicationType
}
globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
@ -780,38 +795,61 @@ type DeletedObjectVersionInfo struct {
}
var (
globalReplicationPool *ReplicationPool
globalReplicationPool *ReplicationPool
globalReplicationStats *ReplicationStats
)
// ReplicationPool describes replication pool
type ReplicationPool struct {
mu sync.Mutex
size int
replicaCh chan ObjectInfo
replicaDeleteCh chan DeletedObjectVersionInfo
killCh chan struct{}
wg sync.WaitGroup
ctx context.Context
objLayer ObjectLayer
mu sync.Mutex
size int
replicaCh chan ObjectInfo
replicaDeleteCh chan DeletedObjectVersionInfo
mrfReplicaCh chan ObjectInfo
mrfReplicaDeleteCh chan DeletedObjectVersionInfo
killCh chan struct{}
wg sync.WaitGroup
ctx context.Context
objLayer ObjectLayer
}
// NewReplicationPool creates a pool of replication workers of specified size
func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool {
pool := &ReplicationPool{
replicaCh: make(chan ObjectInfo, 10000),
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
ctx: ctx,
objLayer: o,
replicaCh: make(chan ObjectInfo, 1000),
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 1000),
mrfReplicaCh: make(chan ObjectInfo, 100000),
mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
ctx: ctx,
objLayer: o,
}
go func() {
<-ctx.Done()
close(pool.replicaCh)
close(pool.replicaDeleteCh)
}()
pool.Resize(sz)
// add long running worker for handling most recent failures/pending replications
go pool.AddMRFWorker()
return pool
}
// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
// to the other workers
func (p *ReplicationPool) AddMRFWorker() {
for {
select {
case <-p.ctx.Done():
return
case oi, ok := <-p.mrfReplicaCh:
if !ok {
return
}
replicateObject(p.ctx, oi, p.objLayer)
case doi, ok := <-p.mrfReplicaDeleteCh:
if !ok {
return
}
replicateDelete(p.ctx, doi, p.objLayer)
}
}
}
// AddWorker adds a replication worker to the pool
func (p *ReplicationPool) AddWorker() {
defer p.wg.Done()
@ -852,28 +890,39 @@ func (p *ReplicationPool) Resize(n int) {
}
}
func (p *ReplicationPool) queueReplicaTask(oi ObjectInfo) {
func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
if p == nil {
return
}
select {
case <-ctx.Done():
close(p.replicaCh)
close(p.mrfReplicaCh)
case p.replicaCh <- oi:
case p.mrfReplicaCh <- oi:
// queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations
default:
}
}
func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi DeletedObjectVersionInfo) {
if p == nil {
return
}
select {
case <-ctx.Done():
close(p.replicaDeleteCh)
close(p.mrfReplicaDeleteCh)
case p.replicaDeleteCh <- doi:
case p.mrfReplicaDeleteCh <- doi:
// queue all overflows into the mrfReplicaDeleteCh to handle incoming pending/failed operations
default:
}
}
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationWorkers())
globalReplicationStats = NewReplicationStats(ctx, objectAPI)
}
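
queueReplicaTask and queueReplicaDeleteTask above use a non-blocking select: a task goes to whichever channel has buffer space (the 100000-entry MRF channels absorb the spill-over) and is silently dropped when both are full. Note that Go's select picks pseudo-randomly among ready cases, so the MRF channel is overflow capacity rather than strict priority. A generic sketch of the pattern:

package main

import "fmt"

// enqueue mirrors the queueReplicaTask pattern: try both buffered
// channels, drop the task when neither has room.
func enqueue(primary, overflow chan int, task int) string {
	select {
	case primary <- task:
		return "queued"
	case overflow <- task:
		return "queued on overflow"
	default:
		return "dropped"
	}
}

func main() {
	primary := make(chan int, 1)   // small primary queue
	overflow := make(chan int, 2)  // larger spill-over queue
	for task := 0; task < 5; task++ {
		fmt.Println(task, enqueue(primary, overflow, task))
	}
}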
// get Reader from replication target if active-active replication is in place and
@ -1009,11 +1058,14 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op
return oi, proxy, err
}
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool) {
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
if sync {
replicateObject(ctx, objInfo, o)
} else {
globalReplicationPool.queueReplicaTask(objInfo)
globalReplicationPool.queueReplicaTask(GlobalContext, objInfo)
}
if sz, err := objInfo.GetActualSize(); err == nil {
globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
}
}
@ -1021,6 +1073,7 @@ func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo,
if sync {
replicateDelete(ctx, dv, o)
} else {
globalReplicationPool.queueReplicaDeleteTask(dv)
globalReplicationPool.queueReplicaDeleteTask(GlobalContext, dv)
}
globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}

cmd/bucket-stats.go (new file, 41 lines)

@ -0,0 +1,41 @@
/*
* MinIO Cloud Storage, (C) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
//go:generate msgp -file $GOFILE
// BucketStats bucket statistics
type BucketStats struct {
ReplicationStats BucketReplicationStats
}
// BucketReplicationStats represents inline replication statistics
// such as pending, failed and completed bytes in total for a bucket
type BucketReplicationStats struct {
// Pending size in bytes
PendingSize uint64 `json:"pendingReplicationSize"`
// Completed size in bytes
ReplicatedSize uint64 `json:"completedReplicationSize"`
// Total Replica size in bytes
ReplicaSize uint64 `json:"replicaSize"`
// Failed size in bytes
FailedSize uint64 `json:"failedReplicationSize"`
// Total number of pending operations including metadata updates
PendingCount uint64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates
FailedCount uint64 `json:"failedReplicationCount"`
}

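GetClusterBucketStats (used by the metrics handler above) ships these structs between peers as MessagePack via the msgp-generated methods in cmd/bucket-stats_gen.go below. A round-trip sketch, assuming it lives in the same cmd package as the generated code:

// Illustrative only; compiles inside package cmd next to the
// generated MarshalMsg/UnmarshalMsg methods.
func bucketStatsRoundTrip() (BucketStats, error) {
	in := BucketStats{
		ReplicationStats: BucketReplicationStats{
			PendingCount: 3,
			PendingSize:  1 << 20, // 1 MiB pending
		},
	}
	buf, err := in.MarshalMsg(nil) // append MessagePack bytes to a nil slice
	if err != nil {
		return BucketStats{}, err
	}
	var out BucketStats
	_, err = out.UnmarshalMsg(buf)
	return out, err
}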
cmd/bucket-stats_gen.go (new file, 342 lines)

@ -0,0 +1,342 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *BucketReplicationStats) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "PendingSize":
z.PendingSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "PendingSize")
return
}
case "ReplicatedSize":
z.ReplicatedSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
case "ReplicaSize":
z.ReplicaSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
case "FailedSize":
z.FailedSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "FailedSize")
return
}
case "PendingCount":
z.PendingCount, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "PendingCount")
return
}
case "FailedCount":
z.FailedCount, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "FailedCount")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *BucketReplicationStats) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// write "PendingSize"
err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.PendingSize)
if err != nil {
err = msgp.WrapError(err, "PendingSize")
return
}
// write "ReplicatedSize"
err = en.Append(0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.ReplicatedSize)
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
// write "ReplicaSize"
err = en.Append(0xab, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.ReplicaSize)
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
// write "FailedSize"
err = en.Append(0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.FailedSize)
if err != nil {
err = msgp.WrapError(err, "FailedSize")
return
}
// write "PendingCount"
err = en.Append(0xac, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteUint64(z.PendingCount)
if err != nil {
err = msgp.WrapError(err, "PendingCount")
return
}
// write "FailedCount"
err = en.Append(0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteUint64(z.FailedCount)
if err != nil {
err = msgp.WrapError(err, "FailedCount")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketReplicationStats) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// string "PendingSize"
o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.PendingSize)
// string "ReplicatedSize"
o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.ReplicatedSize)
// string "ReplicaSize"
o = append(o, 0xab, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.ReplicaSize)
// string "FailedSize"
o = append(o, 0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.FailedSize)
// string "PendingCount"
o = append(o, 0xac, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendUint64(o, z.PendingCount)
// string "FailedCount"
o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendUint64(o, z.FailedCount)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketReplicationStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "PendingSize":
z.PendingSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "PendingSize")
return
}
case "ReplicatedSize":
z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
case "ReplicaSize":
z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
case "FailedSize":
z.FailedSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "FailedSize")
return
}
case "PendingCount":
z.PendingCount, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "PendingCount")
return
}
case "FailedCount":
z.FailedCount, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "FailedCount")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketReplicationStats) Msgsize() (s int) {
s = 1 + 12 + msgp.Uint64Size + 15 + msgp.Uint64Size + 12 + msgp.Uint64Size + 11 + msgp.Uint64Size + 13 + msgp.Uint64Size + 12 + msgp.Uint64Size
return
}
// DecodeMsg implements msgp.Decodable
func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
err = z.ReplicationStats.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "ReplicationStats"
err = en.Append(0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
if err != nil {
return
}
err = z.ReplicationStats.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "ReplicationStats"
o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
o, err = z.ReplicationStats.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
bts, err = z.ReplicationStats.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketStats) Msgsize() (s int) {
s = 1 + 17 + z.ReplicationStats.Msgsize()
return
}


@ -0,0 +1,236 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalBucketReplicationStats(t *testing.T) {
v := BucketReplicationStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeBucketReplicationStats(t *testing.T) {
v := BucketReplicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeBucketReplicationStats Msgsize() is inaccurate")
}
vn := BucketReplicationStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalBucketStats(t *testing.T) {
v := BucketStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgBucketStats(b *testing.B) {
v := BucketStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgBucketStats(b *testing.B) {
v := BucketStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalBucketStats(b *testing.B) {
v := BucketStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeBucketStats(t *testing.T) {
v := BucketStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeBucketStats Msgsize() is inaccurate")
}
vn := BucketStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeBucketStats(b *testing.B) {
v := BucketStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeBucketStats(b *testing.B) {
v := BucketStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}


@ -31,6 +31,7 @@ import (
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio/cmd/crypto"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/versioning"
"github.com/minio/minio/pkg/madmin"
)
@ -328,6 +329,7 @@ func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objA
for _, bucket := range buckets {
cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket.Name)
if err != nil {
logger.LogIf(ctx, err)
continue
}
if cfg == nil || cfg.Empty() {
@ -339,6 +341,7 @@ func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objA
for _, tgt := range cfg.Targets {
tgtClient, err := sys.getRemoteTargetClient(&tgt)
if err != nil {
logger.LogIf(ctx, err)
continue
}
sys.arnRemotesMap[tgt.Arn] = tgtClient
@ -432,7 +435,10 @@ func parseBucketTargetConfig(bucket string, cdata, cmetadata []byte) (*madmin.Bu
return nil, err
}
if crypto.S3.IsEncrypted(meta) {
if data, err = decryptBucketMetadata(cdata, bucket, meta, crypto.Context{bucket: bucket, bucketTargetsFile: bucketTargetsFile}); err != nil {
if data, err = decryptBucketMetadata(cdata, bucket, meta, crypto.Context{
bucket: bucket,
bucketTargetsFile: bucketTargetsFile,
}); err != nil {
return nil, err
}
}


@ -29,14 +29,15 @@ import (
// API sub-system constants
const (
apiRequestsMax = "requests_max"
apiRequestsDeadline = "requests_deadline"
apiClusterDeadline = "cluster_deadline"
apiCorsAllowOrigin = "cors_allow_origin"
apiRemoteTransportDeadline = "remote_transport_deadline"
apiListQuorum = "list_quorum"
apiExtendListCacheLife = "extend_list_cache_life"
apiReplicationWorkers = "replication_workers"
apiRequestsMax = "requests_max"
apiRequestsDeadline = "requests_deadline"
apiClusterDeadline = "cluster_deadline"
apiCorsAllowOrigin = "cors_allow_origin"
apiRemoteTransportDeadline = "remote_transport_deadline"
apiListQuorum = "list_quorum"
apiExtendListCacheLife = "extend_list_cache_life"
apiReplicationWorkers = "replication_workers"
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE"
@ -87,7 +88,7 @@ var (
},
config.KV{
Key: apiReplicationWorkers,
Value: "100",
Value: "500",
},
}
)

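This commit raises the default replication worker count from 100 to 500. The value can also be overridden per deployment; following the MINIO_API_* naming convention of the other constants above, the environment variable is presumably MINIO_API_REPLICATION_WORKERS. A generic sketch of the lookup-with-default pattern:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// replicationWorkers reads the (assumed) override variable and
// falls back to the new default of 500.
func replicationWorkers() int {
	if v := os.Getenv("MINIO_API_REPLICATION_WORKERS"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			return n
		}
	}
	return 500
}

func main() {
	fmt.Println("replication workers:", replicationWorkers())
}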

@ -117,7 +117,7 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
}
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
results := make(chan madmin.DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
logger.LogIf(ctx, err)
@ -773,6 +773,8 @@ type sizeSummary struct {
pendingSize int64
failedSize int64
replicaSize int64
pendingCount uint64
failedCount uint64
}
type getSizeFn func(item scannerItem) (sizeSummary, error)
@ -1088,11 +1090,13 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
}
switch oi.ReplicationStatus {
case replication.Pending:
sizeS.pendingCount++
sizeS.pendingSize += oi.Size
globalReplicationPool.queueReplicaTask(oi)
globalReplicationPool.queueReplicaTask(ctx, oi)
case replication.Failed:
sizeS.failedSize += oi.Size
globalReplicationPool.queueReplicaTask(oi)
sizeS.failedCount++
globalReplicationPool.queueReplicaTask(ctx, oi)
case replication.Completed, "COMPLETE":
sizeS.replicatedSize += oi.Size
case replication.Replica:
@ -1111,7 +1115,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer,
} else {
versionID = oi.VersionID
}
globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{
globalReplicationPool.queueReplicaDeleteTask(ctx, DeletedObjectVersionInfo{
DeletedObject: DeletedObject{
ObjectName: oi.Name,
DeleteMarkerVersionID: dmVersionID,


@ -31,11 +31,9 @@ import (
"sync"
"time"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/env"
"github.com/willf/bloom"
)
@ -80,7 +78,7 @@ func newDataUpdateTracker() *dataUpdateTracker {
Current: dataUpdateFilter{
idx: 1,
},
debug: env.Get(envDataUsageScannerDebug, config.EnableOff) == config.EnableOn || serverDebugLog,
debug: serverDebugLog,
input: make(chan string, dataUpdateTrackerQueueSize),
save: make(chan struct{}, 1),
saveExited: make(chan struct{}),


@ -32,6 +32,7 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/madmin"
"github.com/tinylib/msgp/msgp"
)
@ -45,15 +46,26 @@ type sizeHistogram [dataUsageBucketLen]uint64
//msgp:tuple dataUsageEntry
type dataUsageEntry struct {
Children dataUsageHashMap
// These fields do not include any children.
Size int64
ReplicatedSize uint64
ReplicationPendingSize uint64
ReplicationFailedSize uint64
ReplicaSize uint64
Objects uint64
ObjSizes sizeHistogram
Children dataUsageHashMap
Size int64
Objects uint64
ObjSizes sizeHistogram
ReplicationStats replicationStats
}
//msgp:tuple replicationStats
type replicationStats struct {
PendingSize uint64
ReplicatedSize uint64
FailedSize uint64
ReplicaSize uint64
FailedCount uint64
PendingCount uint64
MissedThresholdSize uint64
AfterThresholdSize uint64
MissedThresholdCount uint64
AfterThresholdCount uint64
}
//msgp:tuple dataUsageEntryV2
@ -65,20 +77,40 @@ type dataUsageEntryV2 struct {
Children dataUsageHashMap
}
// dataUsageCache contains a cache of data usage entries latest version 3.
type dataUsageCache struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntry
//msgp:tuple dataUsageEntryV3
type dataUsageEntryV3 struct {
// These fields do not include any children.
Size int64
ReplicatedSize uint64
ReplicationPendingSize uint64
ReplicationFailedSize uint64
ReplicaSize uint64
Objects uint64
ObjSizes sizeHistogram
Children dataUsageHashMap
}
// dataUsageCache contains a cache of data usage entries version 2.
// dataUsageCache contains a cache of data usage entries latest version 4.
type dataUsageCache struct {
Info dataUsageCacheInfo
Cache map[string]dataUsageEntry
Disks []string
}
// dataUsageCacheV2 contains a cache of data usage entries version 2.
type dataUsageCacheV2 struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntryV2
}
// dataUsageCacheV3 contains a cache of data usage entries version 3.
type dataUsageCacheV3 struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntryV3
}
//msgp:ignore dataUsageEntryInfo
type dataUsageEntryInfo struct {
Name string
@ -89,8 +121,8 @@ type dataUsageEntryInfo struct {
type dataUsageCacheInfo struct {
// Name of the bucket. Also root element.
Name string
LastUpdate time.Time
NextCycle uint32
LastUpdate time.Time
// indicates if the disk is being healed and scanner
// should skip healing the disk
SkipHealing bool
@ -100,20 +132,25 @@ type dataUsageCacheInfo struct {
func (e *dataUsageEntry) addSizes(summary sizeSummary) {
e.Size += summary.totalSize
e.ReplicatedSize += uint64(summary.replicatedSize)
e.ReplicationFailedSize += uint64(summary.failedSize)
e.ReplicationPendingSize += uint64(summary.pendingSize)
e.ReplicaSize += uint64(summary.replicaSize)
e.ReplicationStats.ReplicatedSize += uint64(summary.replicatedSize)
e.ReplicationStats.FailedSize += uint64(summary.failedSize)
e.ReplicationStats.PendingSize += uint64(summary.pendingSize)
e.ReplicationStats.ReplicaSize += uint64(summary.replicaSize)
e.ReplicationStats.PendingCount += uint64(summary.pendingCount)
e.ReplicationStats.FailedCount += uint64(summary.failedCount)
}
// merge other data usage entry into this, excluding children.
func (e *dataUsageEntry) merge(other dataUsageEntry) {
e.Objects += other.Objects
e.Size += other.Size
e.ReplicationPendingSize += other.ReplicationPendingSize
e.ReplicationFailedSize += other.ReplicationFailedSize
e.ReplicatedSize += other.ReplicatedSize
e.ReplicaSize += other.ReplicaSize
e.ReplicationStats.PendingSize += other.ReplicationStats.PendingSize
e.ReplicationStats.FailedSize += other.ReplicationStats.FailedSize
e.ReplicationStats.ReplicatedSize += other.ReplicationStats.ReplicatedSize
e.ReplicationStats.ReplicaSize += other.ReplicationStats.ReplicaSize
e.ReplicationStats.PendingCount += other.ReplicationStats.PendingCount
e.ReplicationStats.FailedCount += other.ReplicationStats.FailedCount
for i, v := range other.ObjSizes[:] {
e.ObjSizes[i] += v
@ -238,25 +275,27 @@ func (d *dataUsageCache) keepRootChildren(list map[dataUsageHash]struct{}) {
}
}
// dui converts the flattened version of the path to DataUsageInfo.
// dui converts the flattened version of the path to madmin.DataUsageInfo.
// As a side effect d will be flattened, use a clone if this is not ok.
func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo {
func (d *dataUsageCache) dui(path string, buckets []BucketInfo) madmin.DataUsageInfo {
e := d.find(path)
if e == nil {
// No entry found, return empty.
return DataUsageInfo{}
return madmin.DataUsageInfo{}
}
flat := d.flatten(*e)
return DataUsageInfo{
LastUpdate: d.Info.LastUpdate,
ObjectsTotalCount: flat.Objects,
ObjectsTotalSize: uint64(flat.Size),
ReplicatedSize: flat.ReplicatedSize,
ReplicationFailedSize: flat.ReplicationFailedSize,
ReplicationPendingSize: flat.ReplicationPendingSize,
ReplicaSize: flat.ReplicaSize,
BucketsCount: uint64(len(e.Children)),
BucketsUsage: d.bucketsUsageInfo(buckets),
return madmin.DataUsageInfo{
LastUpdate: d.Info.LastUpdate,
ObjectsTotalCount: flat.Objects,
ObjectsTotalSize: uint64(flat.Size),
ReplicatedSize: flat.ReplicationStats.ReplicatedSize,
ReplicationFailedSize: flat.ReplicationStats.FailedSize,
ReplicationPendingSize: flat.ReplicationStats.PendingSize,
ReplicaSize: flat.ReplicationStats.ReplicaSize,
ReplicationPendingCount: flat.ReplicationStats.PendingCount,
ReplicationFailedCount: flat.ReplicationStats.FailedCount,
BucketsCount: uint64(len(e.Children)),
BucketsUsage: d.bucketsUsageInfo(buckets),
}
}
@ -373,22 +412,24 @@ func (h *sizeHistogram) toMap() map[string]uint64 {
// bucketsUsageInfo returns the buckets usage info as a map, with
// key as bucket name
func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]BucketUsageInfo {
var dst = make(map[string]BucketUsageInfo, len(buckets))
func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]madmin.BucketUsageInfo {
var dst = make(map[string]madmin.BucketUsageInfo, len(buckets))
for _, bucket := range buckets {
e := d.find(bucket.Name)
if e == nil {
continue
}
flat := d.flatten(*e)
dst[bucket.Name] = BucketUsageInfo{
Size: uint64(flat.Size),
ObjectsCount: flat.Objects,
ReplicationPendingSize: flat.ReplicationPendingSize,
ReplicatedSize: flat.ReplicatedSize,
ReplicationFailedSize: flat.ReplicationFailedSize,
ReplicaSize: flat.ReplicaSize,
ObjectSizesHistogram: flat.ObjSizes.toMap(),
dst[bucket.Name] = madmin.BucketUsageInfo{
Size: uint64(flat.Size),
ObjectsCount: flat.Objects,
ReplicationPendingSize: flat.ReplicationStats.PendingSize,
ReplicatedSize: flat.ReplicationStats.ReplicatedSize,
ReplicationFailedSize: flat.ReplicationStats.FailedSize,
ReplicationPendingCount: flat.ReplicationStats.PendingCount,
ReplicationFailedCount: flat.ReplicationStats.FailedCount,
ReplicaSize: flat.ReplicationStats.ReplicaSize,
ObjectSizesHistogram: flat.ObjSizes.toMap(),
}
}
return dst
@ -396,20 +437,22 @@ func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]Bucke
// bucketUsageInfo returns the buckets usage info.
// If not found all values returned are zero values.
func (d *dataUsageCache) bucketUsageInfo(bucket string) BucketUsageInfo {
func (d *dataUsageCache) bucketUsageInfo(bucket string) madmin.BucketUsageInfo {
e := d.find(bucket)
if e == nil {
return BucketUsageInfo{}
return madmin.BucketUsageInfo{}
}
flat := d.flatten(*e)
return BucketUsageInfo{
Size: uint64(flat.Size),
ObjectsCount: flat.Objects,
ReplicationPendingSize: flat.ReplicationPendingSize,
ReplicatedSize: flat.ReplicatedSize,
ReplicationFailedSize: flat.ReplicationFailedSize,
ReplicaSize: flat.ReplicaSize,
ObjectSizesHistogram: flat.ObjSizes.toMap(),
return madmin.BucketUsageInfo{
Size: uint64(flat.Size),
ObjectsCount: flat.Objects,
ReplicationPendingSize: flat.ReplicationStats.PendingSize,
ReplicationPendingCount: flat.ReplicationStats.PendingCount,
ReplicatedSize: flat.ReplicationStats.ReplicatedSize,
ReplicationFailedSize: flat.ReplicationStats.FailedSize,
ReplicationFailedCount: flat.ReplicationStats.FailedCount,
ReplicaSize: flat.ReplicationStats.ReplicaSize,
ObjectSizesHistogram: flat.ObjSizes.toMap(),
}
}
@ -533,6 +576,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
// Bumping the cache version will drop data from previous versions
// and write new data with the new version.
const (
dataUsageCacheVerV4 = 4
dataUsageCacheVerV3 = 3
dataUsageCacheVerV2 = 2
dataUsageCacheVerV1 = 1
@ -541,7 +585,7 @@ const (
// serialize the contents of the cache.
func (d *dataUsageCache) serializeTo(dst io.Writer) error {
// Add version and compress.
_, err := dst.Write([]byte{dataUsageCacheVerV3})
_, err := dst.Write([]byte{dataUsageCacheVerV4})
if err != nil {
return err
}
@ -609,6 +653,35 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
return err
}
defer dec.Close()
dold := &dataUsageCacheV3{}
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
return err
}
d.Info = dold.Info
d.Disks = dold.Disks
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
for k, v := range dold.Cache {
d.Cache[k] = dataUsageEntry{
Size: v.Size,
Objects: v.Objects,
ObjSizes: v.ObjSizes,
Children: v.Children,
ReplicationStats: replicationStats{
ReplicatedSize: v.ReplicatedSize,
ReplicaSize: v.ReplicaSize,
FailedSize: v.ReplicationFailedSize,
PendingSize: v.ReplicationPendingSize,
},
}
}
return nil
case dataUsageCacheVerV4:
// Zstd compressed.
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
if err != nil {
return err
}
defer dec.Close()
return d.DecodeMsg(msgp.NewReader(dec))
}

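The cache format is versioned with a single leading byte: serializeTo now writes dataUsageCacheVerV4, and deserialize switches on that byte, decoding older layouts (like dataUsageCacheV3 above) into the current struct. A minimal sketch of the same scheme, with a string payload standing in for the msgp-encoded, zstd-compressed cache:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

const (
	verV3 byte = 3
	verV4 byte = 4
)

// serialize prefixes the payload with the current format version.
func serialize(dst io.Writer, payload []byte) error {
	if _, err := dst.Write([]byte{verV4}); err != nil {
		return err
	}
	_, err := dst.Write(payload)
	return err
}

// deserialize dispatches on the version byte; an older version would
// be decoded with its legacy struct and upgraded in memory.
func deserialize(src io.Reader) ([]byte, error) {
	var ver [1]byte
	if _, err := io.ReadFull(src, ver[:]); err != nil {
		return nil, err
	}
	switch ver[0] {
	case verV3, verV4:
		return io.ReadAll(src)
	default:
		return nil, errors.New("unknown cache version")
	}
}

func main() {
	var buf bytes.Buffer
	if err := serialize(&buf, []byte("cache-payload")); err != nil {
		panic(err)
	}
	out, err := deserialize(&buf)
	fmt.Println(string(out), err) // cache-payload <nil>
}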
File diff suppressed because it is too large


@ -348,6 +348,119 @@ func BenchmarkDecodedataUsageCacheV2(b *testing.B) {
}
}
func TestMarshalUnmarshaldataUsageCacheV3(t *testing.T) {
v := dataUsageCacheV3{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgdataUsageCacheV3(b *testing.B) {
v := dataUsageCacheV3{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgdataUsageCacheV3(b *testing.B) {
v := dataUsageCacheV3{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaldataUsageCacheV3(b *testing.B) {
v := dataUsageCacheV3{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodedataUsageCacheV3(t *testing.T) {
v := dataUsageCacheV3{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodedataUsageCacheV3 Msgsize() is inaccurate")
}
vn := dataUsageCacheV3{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodedataUsageCacheV3(b *testing.B) {
v := dataUsageCacheV3{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodedataUsageCacheV3(b *testing.B) {
v := dataUsageCacheV3{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshaldataUsageEntry(t *testing.T) {
v := dataUsageEntry{}
bts, err := v.MarshalMsg(nil)
@ -574,6 +687,232 @@ func BenchmarkDecodedataUsageEntryV2(b *testing.B) {
}
}
func TestMarshalUnmarshaldataUsageEntryV3(t *testing.T) {
v := dataUsageEntryV3{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgdataUsageEntryV3(b *testing.B) {
v := dataUsageEntryV3{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgdataUsageEntryV3(b *testing.B) {
v := dataUsageEntryV3{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaldataUsageEntryV3(b *testing.B) {
v := dataUsageEntryV3{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodedataUsageEntryV3(t *testing.T) {
v := dataUsageEntryV3{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodedataUsageEntryV3 Msgsize() is inaccurate")
}
vn := dataUsageEntryV3{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodedataUsageEntryV3(b *testing.B) {
v := dataUsageEntryV3{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodedataUsageEntryV3(b *testing.B) {
v := dataUsageEntryV3{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalreplicationStats(t *testing.T) {
v := replicationStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgreplicationStats(b *testing.B) {
v := replicationStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgreplicationStats(b *testing.B) {
v := replicationStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalreplicationStats(b *testing.B) {
v := replicationStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodereplicationStats(t *testing.T) {
v := replicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodereplicationStats Msgsize() is inaccurate")
}
vn := replicationStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodereplicationStats(b *testing.B) {
v := replicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodereplicationStats(b *testing.B) {
v := replicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalsizeHistogram(t *testing.T) {
v := sizeHistogram{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}

View file

@ -25,11 +25,10 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/madmin"
)
const (
envDataUsageScannerDebug = "MINIO_DISK_USAGE_SCANNER_DEBUG"
dataUsageRoot = SlashSeparator
dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
@ -39,7 +38,7 @@ const (
)
// storeDataUsageInBackend will store all objects sent on the gui channel until closed.
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan DataUsageInfo) {
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan madmin.DataUsageInfo) {
for dataUsageInfo := range dui {
dataUsageJSON, err := json.Marshal(dataUsageInfo)
if err != nil {
@ -59,27 +58,27 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan
}
}
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (madmin.DataUsageInfo, error) {
r, err := objAPI.GetObjectNInfo(ctx, dataUsageBucket, dataUsageObjName, nil, http.Header{}, readLock, ObjectOptions{})
if err != nil {
if isErrObjectNotFound(err) || isErrBucketNotFound(err) {
return DataUsageInfo{}, nil
return madmin.DataUsageInfo{}, nil
}
return DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName)
return madmin.DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName)
}
defer r.Close()
var dataUsageInfo DataUsageInfo
var dataUsageInfo madmin.DataUsageInfo
var json = jsoniter.ConfigCompatibleWithStandardLibrary
if err = json.NewDecoder(r).Decode(&dataUsageInfo); err != nil {
return DataUsageInfo{}, err
return madmin.DataUsageInfo{}, err
}
// Backward compatibility: older releases persisted only BucketSizes, so derive BucketsUsage from it.
if len(dataUsageInfo.BucketsUsage) == 0 {
dataUsageInfo.BucketsUsage = make(map[string]BucketUsageInfo, len(dataUsageInfo.BucketSizes))
dataUsageInfo.BucketsUsage = make(map[string]madmin.BucketUsageInfo, len(dataUsageInfo.BucketSizes))
for bucket, size := range dataUsageInfo.BucketSizes {
dataUsageInfo.BucketsUsage[bucket] = BucketUsageInfo{Size: size}
dataUsageInfo.BucketsUsage[bucket] = madmin.BucketUsageInfo{Size: size}
}
}

View file

@ -135,7 +135,8 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
errFileVersionNotFound,
errDiskNotFound,
}...) {
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s returned an error (%w)", disks[index], err),
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
disks[index], bucket, object, err),
disks[index].String())
}
}

View file

@ -322,8 +322,12 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
return errDiskNotFound
}
// Pick one FileInfo for a disk at index.
files[index].Erasure.Index = index + 1
return disks[index].WriteMetadata(ctx, bucket, prefix, files[index])
fi := files[index]
fi.Erasure.Index = index + 1
if fi.IsValid() {
return disks[index].WriteMetadata(ctx, bucket, prefix, fi)
}
return errCorruptedFormat
}, index)
}
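This hunk makes writeUniqueFileInfo fail fast rather than persist a zero-value FileInfo. A minimal sketch of the same guard in isolation, assuming package cmd context; that FileInfo.IsValid reports the zero value as invalid is an assumption here (the real checks live in the storage layer):

```go
// Sketch only: fail-fast metadata write mirroring the hunk above.
// Assumes fi.IsValid() returns false for an empty FileInfo{}.
func writeValidatedMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string, fi FileInfo) error {
	if !fi.IsValid() {
		// Never let an empty FileInfo reach the lowest storage layers.
		return errCorruptedFormat
	}
	return disk.WriteMetadata(ctx, bucket, prefix, fi)
}
```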

View file

@ -71,7 +71,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
// Read metadata associated with the object from all disks.
storageDisks := er.getDisks()
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false)
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
// get Quorum for this object
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@ -1191,7 +1191,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {
@ -1255,7 +1255,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
disks := er.getDisks()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {

View file

@ -433,7 +433,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context) (StorageInfo, []er
return storageInfo, errs
}
func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -448,7 +448,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
}
if len(allBuckets) == 0 {
updates <- DataUsageInfo{} // no buckets found update data usage to reflect latest state
updates <- madmin.DataUsageInfo{} // no buckets found; update data usage to reflect the latest state
return nil
}

View file

@ -237,7 +237,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
}
// NSScanner returns data usage stats of the current FS deployment
func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error {
// Load bucket totals
var totalCache dataUsageCache
err := totalCache.load(ctx, fs, dataUsageCacheName)

View file

@ -47,7 +47,7 @@ func (a GatewayUnsupported) LocalStorageInfo(ctx context.Context) (StorageInfo,
}
// NSScanner - scanner is not implemented for gateway
func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error {
logger.CriticalIf(ctx, errors.New("not implemented"))
return NotImplemented{}
}

View file

@ -44,7 +44,7 @@ const (
healMetricNamespace MetricNamespace = "minio_heal"
interNodeMetricNamespace MetricNamespace = "minio_inter_node"
nodeMetricNamespace MetricNamespace = "minio_node"
minIOMetricNamespace MetricNamespace = "minio"
minioMetricNamespace MetricNamespace = "minio"
s3MetricNamespace MetricNamespace = "minio_s3"
)
@ -88,9 +88,11 @@ const (
readTotal MetricName = "read_total"
writeTotal MetricName = "write_total"
failedCount MetricName = "failed_count"
failedBytes MetricName = "failed_bytes"
freeBytes MetricName = "free_bytes"
pendingBytes MetricName = "pending_bytes"
pendingCount MetricName = "pending_count"
readBytes MetricName = "read_bytes"
rcharBytes MetricName = "rchar_bytes"
receivedBytes MetricName = "received_bytes"
@ -351,6 +353,16 @@ func getNodeDiskTotalBytesMD() MetricDescription {
Type: gaugeMetric,
}
}
func getUsageLastScanActivityMD() MetricDescription {
return MetricDescription{
Namespace: minioMetricNamespace,
Subsystem: usageSubsystem,
Name: lastActivityTime,
Help: "Time elapsed (in nano seconds) since last scan activity. This is set to 0 until first scan cycle",
Type: gaugeMetric,
}
}
func getBucketUsageTotalBytesMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
@ -405,6 +417,24 @@ func getBucketRepReceivedBytesMD() MetricDescription {
Type: gaugeMetric,
}
}
func getBucketRepPendingOperationsMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
Subsystem: replicationSubsystem,
Name: pendingCount,
Help: "Total number of objects pending replication",
Type: gaugeMetric,
}
}
func getBucketRepFailedOperationsMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
Subsystem: replicationSubsystem,
Name: failedCount,
Help: "Total number of objects which failed replication",
Type: gaugeMetric,
}
}
func getBucketObjectDistributionMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
@ -625,7 +655,7 @@ func getNodeOfflineTotalMD() MetricDescription {
}
func getMinIOVersionMD() MetricDescription {
return MetricDescription{
Namespace: minIOMetricNamespace,
Namespace: minioMetricNamespace,
Subsystem: softwareSubsystem,
Name: versionInfo,
Help: "MinIO Release tag for the server",
@ -634,7 +664,7 @@ func getMinIOVersionMD() MetricDescription {
}
func getMinIOCommitMD() MetricDescription {
return MetricDescription{
Namespace: minIOMetricNamespace,
Namespace: minioMetricNamespace,
Subsystem: softwareSubsystem,
Name: commitInfo,
Help: "Git commit hash for the MinIO release.",
@ -955,13 +985,14 @@ func getMinioHealingMetrics() MetricsGroup {
if !exists {
return
}
var dur time.Duration
if !bgSeq.lastHealActivity.IsZero() {
dur = time.Since(bgSeq.lastHealActivity)
if bgSeq.lastHealActivity.IsZero() {
return
}
metrics = append(metrics, Metric{
Description: getHealLastActivityTimeMD(),
Value: float64(dur),
Value: float64(time.Since(bgSeq.lastHealActivity)),
})
metrics = append(metrics, getObjectsScanned(bgSeq)...)
metrics = append(metrics, getScannedItems(bgSeq)...)
@ -1167,7 +1198,14 @@ func getBucketUsageMetrics() MetricsGroup {
return
}
metrics = append(metrics, Metric{
Description: getUsageLastScanActivityMD(),
Value: float64(time.Since(dataUsageInfo.LastUpdate)),
})
for bucket, usage := range dataUsageInfo.BucketsUsage {
stat := getLatestReplicationStats(bucket, usage)
metrics = append(metrics, Metric{
Description: getBucketUsageTotalBytesMD(),
Value: float64(usage.Size),
@ -1180,25 +1218,35 @@ func getBucketUsageMetrics() MetricsGroup {
VariableLabels: map[string]string{"bucket": bucket},
})
if usage.hasReplicationUsage() {
if stat.hasReplicationUsage() {
metrics = append(metrics, Metric{
Description: getBucketRepPendingBytesMD(),
Value: float64(usage.ReplicationPendingSize),
Value: float64(stat.PendingSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics = append(metrics, Metric{
Description: getBucketRepFailedBytesMD(),
Value: float64(usage.ReplicationFailedSize),
Value: float64(stat.FailedSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics = append(metrics, Metric{
Description: getBucketRepSentBytesMD(),
Value: float64(usage.ReplicatedSize),
Value: float64(stat.ReplicatedSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics = append(metrics, Metric{
Description: getBucketRepReceivedBytesMD(),
Value: float64(usage.ReplicaSize),
Value: float64(stat.ReplicaSize),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics = append(metrics, Metric{
Description: getBucketRepPendingOperationsMD(),
Value: float64(stat.PendingCount),
VariableLabels: map[string]string{"bucket": bucket},
})
metrics = append(metrics, Metric{
Description: getBucketRepFailedOperationsMD(),
Value: float64(stat.FailedCount),
VariableLabels: map[string]string{"bucket": bucket},
})
}
@ -1315,13 +1363,6 @@ func getClusterStorageMetrics() MetricsGroup {
}
}
func (b *BucketUsageInfo) hasReplicationUsage() bool {
return b.ReplicationPendingSize > 0 ||
b.ReplicationFailedSize > 0 ||
b.ReplicatedSize > 0 ||
b.ReplicaSize > 0
}
type minioClusterCollector struct {
desc *prometheus.Desc
}

View file

@ -23,6 +23,7 @@ import (
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
@ -430,6 +431,67 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
)
}
// getLatestReplicationStats returns the most current of the in-memory replication stats and the data usage info from the crawler.
func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
replStats := BucketReplicationStats{}
for _, bucketStat := range bucketStats {
replStats.FailedCount += bucketStat.ReplicationStats.FailedCount
replStats.FailedSize += bucketStat.ReplicationStats.FailedSize
replStats.PendingCount += bucketStat.ReplicationStats.PendingCount
replStats.PendingSize += bucketStat.ReplicationStats.PendingSize
replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
}
usageStat := globalReplicationStats.GetInitialUsage(bucket)
replStats.FailedCount += usageStat.FailedCount
replStats.FailedSize += usageStat.FailedSize
replStats.PendingCount += usageStat.PendingCount
replStats.PendingSize += usageStat.PendingSize
replStats.ReplicaSize += usageStat.ReplicaSize
replStats.ReplicatedSize += usageStat.ReplicatedSize
// use the in-memory replication stats if they are ahead of the usage info.
if replStats.ReplicatedSize >= u.ReplicatedSize {
s.ReplicatedSize = replStats.ReplicatedSize
} else {
s.ReplicatedSize = u.ReplicatedSize
}
if replStats.PendingSize > u.ReplicationPendingSize {
s.PendingSize = replStats.PendingSize
} else {
s.PendingSize = u.ReplicationPendingSize
}
if replStats.FailedSize > u.ReplicationFailedSize {
s.FailedSize = replStats.FailedSize
} else {
s.FailedSize = u.ReplicationFailedSize
}
if replStats.ReplicaSize > u.ReplicaSize {
s.ReplicaSize = replStats.ReplicaSize
} else {
s.ReplicaSize = u.ReplicaSize
}
if replStats.PendingCount > u.ReplicationPendingCount {
s.PendingCount = replStats.PendingCount
} else {
s.PendingCount = u.ReplicationPendingCount
}
if replStats.FailedCount > u.ReplicationFailedCount {
s.FailedCount = replStats.FailedCount
} else {
s.FailedCount = u.ReplicationFailedCount
}
return s
}
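The if/else cascade above is a field-wise maximum between the live in-memory counters and the last persisted scanner snapshot. A compact equivalent, as a sketch (assuming all counters are uint64, as the comparisons above suggest):

```go
// Sketch: field-wise max merge of live stats vs. scanner snapshot.
func maxU64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

func mergeReplicationStats(live BucketReplicationStats, u madmin.BucketUsageInfo) BucketReplicationStats {
	return BucketReplicationStats{
		ReplicatedSize: maxU64(live.ReplicatedSize, u.ReplicatedSize),
		ReplicaSize:    maxU64(live.ReplicaSize, u.ReplicaSize),
		PendingSize:    maxU64(live.PendingSize, u.ReplicationPendingSize),
		FailedSize:     maxU64(live.FailedSize, u.ReplicationFailedSize),
		PendingCount:   maxU64(live.PendingCount, u.ReplicationPendingCount),
		FailedCount:    maxU64(live.FailedCount, u.ReplicationFailedCount),
	}
}
```

The only asymmetry in the original is the `>=` on ReplicatedSize, which prefers the in-memory value on ties; a plain maximum yields the same result.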
// Populates prometheus with bucket usage metrics; these metrics are
// collected only when the scanner is enabled.
func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
@ -447,13 +509,13 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
if err != nil {
return
}
// data usage has not captured any data yet.
if dataUsageInfo.LastUpdate.IsZero() {
return
}
for bucket, usageInfo := range dataUsageInfo.BucketsUsage {
stat := getLatestReplicationStats(bucket, usageInfo)
// Total space used by bucket
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
@ -479,7 +541,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total capacity pending to be replicated",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
float64(usageInfo.ReplicationPendingSize),
float64(stat.PendingSize),
bucket,
)
ch <- prometheus.MustNewConstMetric(
@ -488,7 +550,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total capacity failed to replicate at least once",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
float64(usageInfo.ReplicationFailedSize),
float64(stat.FailedSize),
bucket,
)
ch <- prometheus.MustNewConstMetric(
@ -497,7 +559,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total capacity replicated to destination",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
float64(usageInfo.ReplicatedSize),
float64(stat.ReplicatedSize),
bucket,
)
ch <- prometheus.MustNewConstMetric(
@ -506,7 +568,25 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total capacity replicated to this instance",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
float64(usageInfo.ReplicaSize),
float64(stat.ReplicaSize),
bucket,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("bucket", "replication", "pending_count"),
"Total replication operations pending",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
float64(stat.PendingCount),
bucket,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName("bucket", "replication", "failed_count"),
"Total replication operations failed",
[]string{"bucket"}, nil),
prometheus.GaugeValue,
float64(stat.FailedCount),
bucket,
)
for k, v := range usageInfo.ObjectSizesHistogram {

View file

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@ -615,6 +616,8 @@ func (sys *NotificationSys) findEarliestCleanBloomFilter(ctx context.Context, di
return best
}
var errPeerNotReachable = errors.New("peer is not reachable")
// GetLocks - makes GetLocks RPC call on all peers.
func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*PeerLocks {
locksResp := make([]*PeerLocks, len(sys.peerClients))
@ -623,7 +626,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
index := index
g.Go(func() error {
if client == nil {
return nil
return errPeerNotReachable
}
serverLocksResp, err := sys.peerClients[index].GetLocks()
if err != nil {
@ -671,6 +674,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s
// DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers
func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) {
globalReplicationStats.Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName)
if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName)
@ -694,6 +698,37 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
}
}
// GetClusterBucketStats - calls GetBucketStats on all peers for a cluster-wide statistics view.
func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats {
ng := WithNPeers(len(sys.peerClients))
bucketStats := make([]BucketStats, len(sys.peerClients))
for index, client := range sys.peerClients {
index := index
client := client
ng.Go(ctx, func() error {
if client == nil {
return errPeerNotReachable
}
bs, err := client.GetBucketStats(bucketName)
if err != nil {
return err
}
bucketStats[index] = bs
return nil
}, index, *client.host)
}
for _, nErr := range ng.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
if nErr.Err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
}
}
bucketStats = append(bucketStats, BucketStats{
ReplicationStats: globalReplicationStats.Get(bucketName),
})
return bucketStats
}
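Callers can fold the per-peer results (plus the locally appended entry) into cluster totals; for example, a sketch reusing the identifiers above:

```go
// Sketch: cluster-wide count of failed replication operations for a bucket.
func clusterFailedCount(ctx context.Context, sys *NotificationSys, bucket string) uint64 {
	var total uint64
	for _, bs := range sys.GetClusterBucketStats(ctx, bucket) {
		total += bs.ReplicationStats.FailedCount
	}
	return total
}
```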
// Loads notification policies for all buckets into NotificationSys.
func (sys *NotificationSys) load(buckets []BucketInfo) {
for _, bucket := range buckets {

View file

@ -69,57 +69,6 @@ var ObjectsHistogramIntervals = []objectHistogramInterval{
{"GREATER_THAN_512_MB", humanize.MiByte * 512, math.MaxInt64},
}
// BucketUsageInfo - bucket usage info provides
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
type BucketUsageInfo struct {
Size uint64 `json:"size"`
ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"`
ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"`
ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"`
ReplicaSize uint64 `json:"objectReplicaTotalSize"`
ObjectsCount uint64 `json:"objectsCount"`
ObjectSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
}
// DataUsageInfo represents data usage stats of the underlying Object API
type DataUsageInfo struct {
// LastUpdate is the timestamp of when the data usage info was last updated.
// This does not indicate a full scan.
LastUpdate time.Time `json:"lastUpdate"`
// Objects total count across all buckets
ObjectsTotalCount uint64 `json:"objectsCount"`
// Objects total size across all buckets
ObjectsTotalSize uint64 `json:"objectsTotalSize"`
// Total size for objects that have not yet been replicated
ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"`
// Total size for objects that have witnessed one or more failures and will be retried
ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"`
// Total size for objects that have been replicated to destination
ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"`
// Total size for objects that are replicas
ReplicaSize uint64 `json:"objectsReplicaTotalSize"`
// Total number of buckets in this cluster
BucketsCount uint64 `json:"bucketsCount"`
// Buckets usage info provides the following information across all buckets
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
BucketsUsage map[string]BucketUsageInfo `json:"bucketsUsageInfo"`
// Deprecated, kept here for backward compatibility reasons.
BucketSizes map[string]uint64 `json:"bucketsSizes"`
}
// BucketInfo - represents bucket metadata.
type BucketInfo struct {
// Name of the bucket.

View file

@ -91,7 +91,7 @@ type ObjectLayer interface {
// Storage operations.
Shutdown(context.Context) error
NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error
NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error
BackendInfo() madmin.BackendInfo
StorageInfo(ctx context.Context) (StorageInfo, []error)

View file

@ -1339,7 +1339,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
setPutObjHeaders(w, objInfo, false)
@ -1654,7 +1654,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
setPutObjHeaders(w, objInfo, false)
@ -2737,7 +2737,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
setPutObjHeaders(w, objInfo, false)
if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
// Write success response.
@ -3017,7 +3017,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
if replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.MetadataReplicationType)
}
writeSuccessResponseHeadersOnly(w)
@ -3190,7 +3190,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
return
}
if replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.MetadataReplicationType)
}
writeSuccessNoContent(w)
@ -3373,7 +3373,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
}
if replicate {
scheduleReplication(ctx, objInfo.Clone(), objAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objAPI, sync, replication.MetadataReplicationType)
}
if objInfo.VersionID != "" {
@ -3447,7 +3447,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
}
if replicate {
scheduleReplication(ctx, oi.Clone(), objAPI, sync)
scheduleReplication(ctx, oi.Clone(), objAPI, sync, replication.MetadataReplicationType)
}
if oi.VersionID != "" {

View file

@ -434,6 +434,20 @@ func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err
return data, err
}
// GetBucketStats - load bucket statistics
func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
respBody, err := client.call(peerRESTMethodGetBucketStats, values, nil, -1)
if err != nil {
return BucketStats{}, err
}
var bs BucketStats
defer http.DrainBody(respBody)
return bs, msgp.Decode(respBody, &bs)
}
// LoadBucketMetadata - load bucket metadata
func (client *peerRESTClient) LoadBucketMetadata(bucket string) error {
values := make(url.Values)

View file

@ -17,7 +17,7 @@
package cmd
const (
peerRESTVersion = "v12"
peerRESTVersion = "v14" // Add GetBucketStats API
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@ -36,6 +36,7 @@ const (
peerRESTMethodDispatchNetInfo = "/dispatchnetinfo"
peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata"
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
peerRESTMethodGetBucketStats = "/getbucketstats"
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"

View file

@ -532,12 +532,35 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h
return
}
globalReplicationStats.Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName)
if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName)
}
}
// GetBucketStatsHandler - fetches current in-memory bucket stats; currently it
// returns only the bucket replication stats.
func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
bs := BucketStats{
ReplicationStats: globalReplicationStats.Get(bucketName),
}
defer w.(http.Flusher).Flush()
logger.LogIf(r.Context(), msgp.Encode(w, &bs))
}
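The handler streams a msgp-encoded BucketStats, mirroring the msgp.Decode on the client side added earlier. The same round-trip over an in-memory buffer, as a sketch (package cmd context and the github.com/tinylib/msgp/msgp import assumed):

```go
// Sketch: msgp encode/decode round-trip matching the handler/client pair.
var buf bytes.Buffer
bs := BucketStats{ReplicationStats: BucketReplicationStats{FailedCount: 1}}
if err := msgp.Encode(&buf, &bs); err != nil {
	log.Fatal(err)
}
var out BucketStats
if err := msgp.Decode(&buf, &out); err != nil {
	log.Fatal(err)
}
```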
// LoadBucketMetadataHandler - reloads in memory bucket metadata
func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -1069,6 +1092,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(httpTraceHdrs(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(httpTraceAll(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...)

View file

@ -1336,7 +1336,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
}
if mustReplicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType)
}
reqParams := extractReqParams(r)

View file

@ -904,6 +904,7 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F
if err = xlMeta.Load(buf); err != nil {
return err
}
if err = xlMeta.AddVersion(fi); err != nil {
return err
}

View file

@ -12,6 +12,7 @@ These metrics can be from any MinIO server once per collection.
|`minio_bucket_replication_pending_bytes` |Total bytes pending to replicate. |
|`minio_bucket_replication_received_bytes` |Total number of bytes replicated to this bucket from another source bucket. |
|`minio_bucket_replication_sent_bytes` |Total number of bytes replicated to the target bucket. |
|`minio_bucket_replication_pending_count` |Total number of replication operations pending for this bucket. |
|`minio_bucket_usage_object_total` |Total number of objects |
|`minio_bucket_usage_total_bytes` |Total bucket size in bytes |
|`minio_cache_hits_total` |Total number of disk cache hits |
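One way to verify the new counters are exported is to scrape the Prometheus endpoint and filter the replication series. A sketch; the URL path and any authentication are deployment-specific assumptions:

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Placeholder URL; adjust host, path, and auth for your deployment.
	resp, err := http.Get("http://localhost:9000/minio/v2/metrics/cluster")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if strings.Contains(sc.Text(), "minio_bucket_replication_") {
			fmt.Println(sc.Text())
		}
	}
	if err := sc.Err(); err != nil {
		log.Fatal(err)
	}
}
```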

View file

@ -258,15 +258,15 @@ Fetches accounting usage information for the current authenticated user
| Param | Type | Description |
|--------------------------------|----------------------|-------------------------|
| `AccountInfo.AccountName` | _string_ | Account name. |
| `AccountInfo.Buckets` | _[]BucketUsageInfo_ | Bucket usage info. |
| `AccountInfo.Buckets` | _[]BucketAccessInfo_ | Bucket usage info. |
| Param | Type | Description |
|----------------------------|-----------------|-----------------------------------------|
| `BucketUsageInfo.Name` | _string_ | The name of the current bucket
| `BucketUsageInfo.Size` | _uint64_ | The total size of the current bucket
| `BucketUsageInfo.Created` | _time.Time_ | Bucket creation time
| `BucketUsageInfo.Access` | _AccountAccess_ | Type of access of the current account
| `BucketAccessInfo.Name` | _string_ | The name of the current bucket
| `BucketAccessInfo.Size` | _uint64_ | The total size of the current bucket
| `BucketAccessInfo.Created` | _time.Time_ | Bucket creation time
| `BucketAccessInfo.Access` | _AccountAccess_ | Type of access of the current account
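After the rename, AccountInfo callers iterate a []BucketAccessInfo. A short sketch, assuming adm is an *madmin.AdminClient already constructed with madmin.New (endpoint and credentials are deployment-specific):

```go
// Sketch: list per-bucket usage and access via the renamed type.
info, err := adm.AccountInfo(context.Background())
if err != nil {
	log.Fatal(err)
}
for _, b := range info.Buckets { // []madmin.BucketAccessInfo after this change
	fmt.Printf("%s: size=%d created=%s access=%+v\n", b.Name, b.Size, b.Created, b.Access)
}
```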
| Param | Type | Description |

View file

@ -20,7 +20,6 @@ package madmin
import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"time"
)
@ -124,36 +123,71 @@ func (adm *AdminClient) StorageInfo(ctx context.Context) (StorageInfo, error) {
// Unmarshal the server's json response
var storageInfo StorageInfo
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return StorageInfo{}, err
}
err = json.Unmarshal(respBytes, &storageInfo)
if err != nil {
if err = json.NewDecoder(resp.Body).Decode(&storageInfo); err != nil {
return StorageInfo{}, err
}
return storageInfo, nil
}
// DataUsageInfo represents data usage of an Object API
// BucketUsageInfo - bucket usage info provides
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
type BucketUsageInfo struct {
Size uint64 `json:"size"`
ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"`
ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"`
ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"`
ReplicaSize uint64 `json:"objectReplicaTotalSize"`
ReplicationPendingCount uint64 `json:"objectsPendingReplicationCount"`
ReplicationFailedCount uint64 `json:"objectsFailedReplicationCount"`
ObjectsCount uint64 `json:"objectsCount"`
ObjectSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
}
// DataUsageInfo represents data usage stats of the underlying Object API
type DataUsageInfo struct {
// LastUpdate is the timestamp of when the data usage info was last updated.
// This does not indicate a full scan.
LastUpdate time.Time `json:"lastUpdate"`
ObjectsCount uint64 `json:"objectsCount"`
ObjectsTotalSize uint64 `json:"objectsTotalSize"`
LastUpdate time.Time `json:"lastUpdate"`
// ObjectsSizesHistogram contains information on objects across all buckets.
// See ObjectsHistogramIntervals.
ObjectsSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
// Objects total count across all buckets
ObjectsTotalCount uint64 `json:"objectsCount"`
// Objects total size across all buckets
ObjectsTotalSize uint64 `json:"objectsTotalSize"`
// Total size for objects that have not yet been replicated
ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"`
// Total size for objects that have witnessed one or more failures and will be retried
ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"`
// Total size for objects that have been replicated to destination
ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"`
// Total size for objects that are replicas
ReplicaSize uint64 `json:"objectsReplicaTotalSize"`
// Total number of objects pending replication
ReplicationPendingCount uint64 `json:"objectsPendingReplicationCount"`
// Total number of objects that failed replication
ReplicationFailedCount uint64 `json:"objectsFailedReplicationCount"`
// Total number of buckets in this cluster
BucketsCount uint64 `json:"bucketsCount"`
// BucketsSizes is "bucket name" -> size.
BucketsSizes map[string]uint64 `json:"bucketsSizes"`
// Buckets usage info provides the following information across all buckets
// - total size of the bucket
// - total objects in a bucket
// - object size histogram per bucket
BucketsUsage map[string]BucketUsageInfo `json:"bucketsUsageInfo"`
// Deprecated, kept here for backward compatibility reasons.
BucketSizes map[string]uint64 `json:"bucketsSizes"`
}
// DataUsageInfo - returns data usage of the current object API
@ -171,14 +205,7 @@ func (adm *AdminClient) DataUsageInfo(ctx context.Context) (DataUsageInfo, error
// Unmarshal the server's json response
var dataUsageInfo DataUsageInfo
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return DataUsageInfo{}, err
}
err = json.Unmarshal(respBytes, &dataUsageInfo)
if err != nil {
if err = json.NewDecoder(resp.Body).Decode(&dataUsageInfo); err != nil {
return DataUsageInfo{}, err
}
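With the per-bucket replication counters now part of madmin.DataUsageInfo, a monitoring client can read the replication backlog directly. A minimal sketch; the endpoint and credentials are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Placeholder endpoint and credentials.
	adm, err := madmin.New("localhost:9000", "ACCESS_KEY", "SECRET_KEY", false)
	if err != nil {
		log.Fatal(err)
	}
	dui, err := adm.DataUsageInfo(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for bucket, usage := range dui.BucketsUsage {
		fmt.Printf("%s: pending=%d failed=%d replicated=%d bytes\n",
			bucket, usage.ReplicationPendingCount, usage.ReplicationFailedCount, usage.ReplicatedSize)
	}
}
```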
@ -344,14 +371,7 @@ func (adm *AdminClient) ServerInfo(ctx context.Context) (InfoMessage, error) {
// Unmarshal the server's json response
var message InfoMessage
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return InfoMessage{}, err
}
err = json.Unmarshal(respBytes, &message)
if err != nil {
if err = json.NewDecoder(resp.Body).Decode(&message); err != nil {
return InfoMessage{}, err
}

View file

@ -20,7 +20,6 @@ package madmin
import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
)
@ -53,10 +52,9 @@ func (adm *AdminClient) ServerUpdate(ctx context.Context, updateURL string) (us
return us, httpRespToErrorResponse(resp)
}
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
if err = json.NewDecoder(resp.Body).Decode(&us); err != nil {
return us, err
}
err = json.Unmarshal(buf, &us)
return us, err
return us, nil
}

View file

@ -35,9 +35,9 @@ type AccountAccess struct {
Write bool `json:"write"`
}
// BucketUsageInfo represents bucket usage of a bucket, and its relevant
// BucketAccessInfo represents bucket usage of a bucket, and its relevant
// access type for an account
type BucketUsageInfo struct {
type BucketAccessInfo struct {
Name string `json:"name"`
Size uint64 `json:"size"`
Created time.Time `json:"created"`
@ -49,7 +49,7 @@ type BucketUsageInfo struct {
type AccountInfo struct {
AccountName string
Policy iampolicy.Policy
Buckets []BucketUsageInfo
Buckets []BucketAccessInfo
}
// AccountInfo returns the usage info for the authenticating account.