Add healthcheck back for replication targets (#13168)

This allows objects to relinquish the read lock held during
replication earlier when the target is known to be down,
instead of waiting for a connection timeout when replication
is attempted.
This commit is contained in:
Poorna Krishnamoorthy 2021-09-08 18:34:50 -04:00 committed by GitHub
parent 198a838d00
commit 9af4e7b1da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 6 deletions

View file

@ -306,6 +306,21 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
return
}
if tgt.IsOffline() {
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, rcfg.RoleArn))
sendEvent(eventArgs{
BucketName: bucket,
Object: ObjectInfo{
Bucket: bucket,
Name: dobj.ObjectName,
VersionID: versionID,
DeleteMarker: dobj.DeleteMarker,
},
Host: "Internal: [Replication]",
EventName: event.ObjectReplicationNotTracked,
})
return
}
// Lock the object name before starting replication operation.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
@ -692,7 +707,16 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
})
return
}
if tgt.IsOffline() {
logger.LogIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, cfg.RoleArn))
sendEvent(eventArgs{
EventName: event.ObjectReplicationNotTracked,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
return
}
// Lock the object name before starting replication.
// Use separate lock that doesn't collide with regular objects.
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
@ -1318,7 +1342,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec
return nil, oi, false, nil
}
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
if tgt == nil {
if tgt == nil || tgt.IsOffline() {
return nil, oi, false, fmt.Errorf("target is offline or not configured")
}
// if proxying explicitly disabled on remote target

View file

@ -37,7 +37,7 @@ import (
)
const (
defaultHealthCheckDuration = 100 * time.Second
defaultHealthCheckDuration = 30 * time.Second
)
// BucketTargetSys represents bucket targets subsystem
@ -93,6 +93,9 @@ func (sys *BucketTargetSys) Delete(bucket string) {
return
}
for _, t := range tgts {
if tgt, ok := sys.arnRemotesMap[t.Arn]; ok && tgt.healthCancelFn != nil {
tgt.healthCancelFn()
}
delete(sys.arnRemotesMap, t.Arn)
}
delete(sys.targetsMap, bucket)
@ -224,6 +227,9 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
return BucketRemoteTargetNotFound{Bucket: bucket}
}
sys.targetsMap[bucket] = targets
if tgt, ok := sys.arnRemotesMap[arnStr]; ok && tgt.healthCancelFn != nil {
tgt.healthCancelFn()
}
delete(sys.arnRemotesMap, arnStr)
sys.updateBandwidthLimit(bucket, 0)
return nil
@ -286,6 +292,9 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
// remove target and arn association
if tgts, ok := sys.targetsMap[bucket]; ok {
for _, t := range tgts {
if tgt, ok := sys.arnRemotesMap[t.Arn]; ok && tgt.healthCancelFn != nil {
tgt.healthCancelFn()
}
delete(sys.arnRemotesMap, t.Arn)
}
}
@ -360,6 +369,10 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T
if tcfg.HealthCheckDuration >= 1 { // require minimum health check duration of 1 sec.
hcDuration = tcfg.HealthCheckDuration
}
cancelFn, err := api.HealthCheck(hcDuration)
if err != nil {
return nil, err
}
tc := &TargetClient{
Client: api,
healthCheckDuration: hcDuration,
@ -367,6 +380,7 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T
Bucket: tcfg.TargetBucket,
StorageClass: tcfg.StorageClass,
disableProxy: tcfg.DisableProxy,
healthCancelFn: cancelFn,
}
return tc, nil
}
@ -445,4 +459,5 @@ type TargetClient struct {
replicateSync bool
StorageClass string // storage class on remote
disableProxy bool
healthCancelFn context.CancelFunc // cancellation function for client healthcheck
}

2
go.mod
View file

@ -46,7 +46,7 @@ require (
github.com/minio/highwayhash v1.0.2
github.com/minio/kes v0.14.0
github.com/minio/madmin-go v1.1.0
github.com/minio/minio-go/v7 v7.0.13-0.20210823191913-cee488b95ff2
github.com/minio/minio-go/v7 v7.0.14-0.20210908194250-617d530ffac5
github.com/minio/parquet-go v1.0.0
github.com/minio/pkg v1.1.2
github.com/minio/selfupdate v0.3.1

4
go.sum
View file

@ -1036,8 +1036,8 @@ github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8
github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh2uJuAbEqdhMVl6CMIIZLUeiMiWtJR4JB8/5g2skw=
github.com/minio/minio-go/v7 v7.0.11-0.20210607181445-e162fdb8e584/go.mod h1:WoyW+ySKAKjY98B9+7ZbI8z8S3jaxaisdcvj9TGlazA=
github.com/minio/minio-go/v7 v7.0.13-0.20210715203016-9e713532886e/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs=
github.com/minio/minio-go/v7 v7.0.13-0.20210823191913-cee488b95ff2 h1:+/AXRNJS9cVfmPqyJVw0Mg2u4XNsXSOBBtLfk2IuK7o=
github.com/minio/minio-go/v7 v7.0.13-0.20210823191913-cee488b95ff2/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs=
github.com/minio/minio-go/v7 v7.0.14-0.20210908194250-617d530ffac5 h1:c6okzYcdOLPP9tHoOE/JxiWi5qSQpvFC6VqbA4FB/Iw=
github.com/minio/minio-go/v7 v7.0.14-0.20210908194250-617d530ffac5/go.mod h1:S23iSP5/gbMwtxeY5FM71R+TkAYyzEdoNEDDwpt8yWs=
github.com/minio/operator v0.0.0-20210812082324-26350f153661 h1:dGAJHpfmhNukFg0M0wDqH+G1OB2YPgZCcT6uv4n9YQk=
github.com/minio/operator v0.0.0-20210812082324-26350f153661/go.mod h1:zQqn6VGT46xlSpVXh1I/VZRv+eSgHtVu6URdg71YKX8=
github.com/minio/operator/logsearchapi v0.0.0-20210812082324-26350f153661 h1:tJw15hS3b1dVTf5PwA4roXZ/oRNnHyZ/8Y+yNTmQ5rA=