Avoid metadata update for incoming replication failure (#12054)

This is an optimization to save IOPS. Replication
failures are re-queued once more to re-attempt
replication. If the retry still does not succeed, the
replication status is set to `FAILED` and the object is
picked up again on the next scanner cycle.
Poorna Krishnamoorthy 2021-04-15 16:32:00 -07:00 committed by Harshavardhana
parent 23a37d0855
commit 7617d6963c
4 changed files with 54 additions and 30 deletions
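
Taken together, the pieces below implement a two-tier policy: a failed first (inline) attempt is retried once in memory without touching on-disk metadata, and only heal-type attempts or successes persist a status. A minimal sketch of that policy, using the types introduced in this patch; `handleResult`, `persist`, and `queue` are hypothetical stand-ins, not functions from the commit:

```go
// Hypothetical sketch of the retry policy introduced by this commit.
func handleResult(ri ReplicateObjectInfo, status replication.StatusType,
	persist func(ObjectInfo), queue func(ReplicateObjectInfo)) {
	// Persist the on-disk status only for heal-type attempts or successes;
	// a failed first inline attempt stays PENDING, saving one metadata write.
	if ri.OpType == replication.HealReplicationType || status == replication.Completed {
		persist(ri.ObjectInfo)
	}
	// Retry a failure exactly once in memory; after that, the scanner
	// picks the object up on its next cycle.
	if status == replication.Failed && ri.RetryCount < 1 {
		ri.OpType = replication.HealReplicationType
		ri.RetryCount++
		queue(ri)
	}
}
```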


@@ -571,12 +571,13 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
 // replicateObject replicates the specified version of the object to destination bucket
 // The source object is then updated to reflect the replication status.
-func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
+func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
 	z, ok := objectAPI.(*erasureServerPools)
 	if !ok {
 		return
 	}
+	objInfo := ri.ObjectInfo
 	bucket := objInfo.Bucket
 	object := objInfo.Name
@@ -741,28 +742,43 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
 		eventName = event.ObjectReplicationFailed
 	}
-	// This lower level implementation is necessary to avoid write locks from CopyObject.
-	poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
-	if err != nil {
-		logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
-	} else {
-		if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, objInfo.UserDefined, ObjectOptions{
-			VersionID: objInfo.VersionID,
-		}); err != nil {
+	// Leave metadata in `PENDING` state if inline replication fails to save iops
+	if ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed {
+		// This lower level implementation is necessary to avoid write locks from CopyObject.
+		poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size)
+		if err != nil {
 			logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
+		} else {
+			metadata := make(map[string]string, len(objInfo.UserDefined))
+			for k, v := range objInfo.UserDefined {
+				metadata[k] = v
+			}
+			if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, metadata, ObjectOptions{
+				VersionID: objInfo.VersionID,
+			}); err != nil {
+				logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
+			}
 		}
+		opType := replication.MetadataReplicationType
+		if rtype == replicateAll {
+			opType = replication.ObjectReplicationType
+		}
+		globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
+		sendEvent(eventArgs{
+			EventName:  eventName,
+			BucketName: bucket,
+			Object:     objInfo,
+			Host:       "Internal: [Replication]",
+		})
 	}
-	opType := replication.MetadataReplicationType
-	if rtype == replicateAll {
-		opType = replication.ObjectReplicationType
+
+	// re-queue failures once more - keep a retry count to avoid flooding the queue if
+	// the target site is down. Leave it to scanner to catch up instead.
+	if replicationStatus == replication.Failed && ri.RetryCount < 1 {
+		ri.OpType = replication.HealReplicationType
+		ri.RetryCount++
+		globalReplicationPool.queueReplicaTask(ctx, ri)
 	}
-	globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
-	sendEvent(eventArgs{
-		EventName:  eventName,
-		BucketName: bucket,
-		Object:     objInfo,
-		Host:       "Internal: [Replication]",
-	})
 }
 
 // filterReplicationStatusMetadata filters replication status metadata for COPY
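
The condition guarding the metadata write is the heart of the patch; a hedged restatement of the gate as a small predicate, not code from the commit:

```go
// shouldPersistStatus restates the gate above: write replication status
// to disk only when this was a heal/requeued attempt or when replication
// succeeded. A failed first attempt leaves the on-disk status PENDING.
func shouldPersistStatus(opType replication.Type, status replication.StatusType) bool {
	return opType == replication.HealReplicationType || status == replication.Completed
}
```

Note also that `objInfo.UserDefined` is now copied into a fresh `metadata` map before the `updateObjectMeta` call, presumably so the lower layer never holds a reference to the live map.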
@@ -803,9 +819,9 @@ var (
 type ReplicationPool struct {
 	mu                 sync.Mutex
 	size               int
-	replicaCh          chan ObjectInfo
+	replicaCh          chan ReplicateObjectInfo
 	replicaDeleteCh    chan DeletedObjectVersionInfo
-	mrfReplicaCh       chan ObjectInfo
+	mrfReplicaCh       chan ReplicateObjectInfo
 	mrfReplicaDeleteCh chan DeletedObjectVersionInfo
 	killCh             chan struct{}
 	wg                 sync.WaitGroup
@@ -816,9 +832,9 @@ type ReplicationPool struct {
 // NewReplicationPool creates a pool of replication workers of specified size
 func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool {
 	pool := &ReplicationPool{
-		replicaCh:          make(chan ObjectInfo, 1000),
+		replicaCh:          make(chan ReplicateObjectInfo, 1000),
 		replicaDeleteCh:    make(chan DeletedObjectVersionInfo, 1000),
-		mrfReplicaCh:       make(chan ObjectInfo, 100000),
+		mrfReplicaCh:       make(chan ReplicateObjectInfo, 100000),
 		mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000),
 		ctx:                ctx,
 		objLayer:           o,
@@ -890,7 +906,7 @@ func (p *ReplicationPool) Resize(n int) {
 	}
 }
 
-func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
+func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObjectInfo) {
 	if p == nil {
 		return
 	}
@@ -898,8 +914,8 @@ func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi ObjectInfo) {
 	case <-ctx.Done():
 		close(p.replicaCh)
 		close(p.mrfReplicaCh)
-	case p.replicaCh <- oi:
-	case p.mrfReplicaCh <- oi:
+	case p.replicaCh <- ri:
+	case p.mrfReplicaCh <- ri:
 		// queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations
 	default:
 	}
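
The queueing relies on Go's non-blocking `select`. One detail worth calling out: when several cases can proceed, Go picks one pseudo-randomly, so `mrfReplicaCh`'s much larger buffer acts as extra capacity rather than a strict second tier. A standalone, runnable sketch of the idiom, with a hypothetical `enqueue` helper and a plain `int` payload:

```go
package main

import (
	"context"
	"fmt"
)

// enqueue mirrors the select idiom above: try the buffered queues without
// blocking, and report a drop when both are full.
func enqueue(ctx context.Context, primary, overflow chan int, task int) bool {
	select {
	case <-ctx.Done():
		return false
	case primary <- task:
		return true
	case overflow <- task:
		return true
	default:
		return false // both buffers full: drop; the scanner retries later
	}
}

func main() {
	primary := make(chan int, 1)  // stands in for replicaCh (cap 1000)
	overflow := make(chan int, 2) // stands in for mrfReplicaCh (cap 100000)
	for i := 0; i < 5; i++ {
		fmt.Println(i, enqueue(context.Background(), primary, overflow, i))
	}
	// Prints true for the first three tasks, then false once both buffers fill.
}
```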
@@ -1060,9 +1076,9 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op
 
 func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
 	if sync {
-		replicateObject(ctx, objInfo, o)
+		replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o)
 	} else {
-		globalReplicationPool.queueReplicaTask(GlobalContext, objInfo)
+		globalReplicationPool.queueReplicaTask(GlobalContext, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
 	}
 	if sz, err := objInfo.GetActualSize(); err == nil {
 		globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
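
Since `scheduleReplication` keeps its signature and the wrapper is built at this dispatch boundary, existing callers are untouched. A hypothetical call site, for illustration only:

```go
// After a PUT that matches a replication rule (illustrative only):
scheduleReplication(ctx, objInfo, objectAPI, false, replication.ObjectReplicationType)
// sync == true would run replicateObject inline instead of queueing.
```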


@@ -1098,11 +1098,11 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo
 	case replication.Pending:
 		sizeS.pendingCount++
 		sizeS.pendingSize += oi.Size
-		globalReplicationPool.queueReplicaTask(ctx, oi)
+		globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType})
 	case replication.Failed:
 		sizeS.failedSize += oi.Size
 		sizeS.failedCount++
-		globalReplicationPool.queueReplicaTask(ctx, oi)
+		globalReplicationPool.queueReplicaTask(ctx, ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType})
 	case replication.Completed, "COMPLETE":
 		sizeS.replicatedSize += oi.Size
 	case replication.Replica:
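
This is where the gate in replicateObject pays off: scanner-driven re-queues always carry `replication.HealReplicationType`, so the outcome of a retried attempt is persisted. A hypothetical life cycle of one persistently failing object:

```go
// inline attempt:   OpType=ObjectReplicationType, fails -> no metadata write, re-queued once
// requeued attempt: OpType=HealReplicationType, fails   -> status persisted as FAILED
// scanner cycle:    sees PENDING/FAILED, queues OpType=HealReplicationType again
```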


@@ -220,6 +220,13 @@ func (o ObjectInfo) Clone() (cinfo ObjectInfo) {
 	return cinfo
 }
 
+// ReplicateObjectInfo represents object info to be replicated
+type ReplicateObjectInfo struct {
+	ObjectInfo
+	OpType     replication.Type
+	RetryCount uint32
+}
+
 // MultipartInfo captures metadata information about the uploadId
 // this data structure is used primarily for some internal purposes
 // for verifying upload type such as was the upload
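
Because `ObjectInfo` is embedded rather than named, its fields are promoted onto the wrapper. A usage sketch, where `oi` is assumed to be an existing ObjectInfo:

```go
ri := ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}
_ = ri.Bucket     // promoted from the embedded ObjectInfo
_ = ri.RetryCount // zero value 0 marks a first attempt
```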


@@ -122,6 +122,7 @@ const (
 	ObjectReplicationType Type = 1 + iota
 	DeleteReplicationType
 	MetadataReplicationType
+	HealReplicationType
 )
 
 // ObjectOpts provides information to deduce whether replication
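
Appending the constant at the end of the iota block keeps the existing values stable; with the `Type = 1 + iota` seed the block evaluates to 1 through 4:

```go
fmt.Println(
	int(replication.ObjectReplicationType),   // 1
	int(replication.DeleteReplicationType),   // 2
	int(replication.MetadataReplicationType), // 3
	int(replication.HealReplicationType),     // 4 (new)
)
```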