fix: crash in single drive mode for lifecycle (#12077)

also make sure to close the channel on the producer
side, not in a separate goroutine; closing from a
separate goroutine can lead to races between a writer
and a closer.

fixes #12073
Harshavardhana 2021-04-16 14:09:25 -07:00 committed by GitHub
parent ca9b48b3b4
commit 0a9d8dfb0b
3 changed files with 23 additions and 18 deletions
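
For context, the pattern this fix applies is sketched below: the producer closes the channel itself, inside the same select that sends, guarded by a sync.Once, instead of a separate goroutine whose close can race an in-flight send (panicking with "send on closed channel") or run twice ("close of closed channel"). This is a minimal standalone sketch; the queue type and names are hypothetical, not code from this commit.

// Minimal sketch of producer-side close guarded by sync.Once.
// The queue type and names are hypothetical, not from the MinIO code base.
package main

import (
	"context"
	"fmt"
	"sync"
)

type queue struct {
	once sync.Once
	ch   chan int
}

// enqueue is the only sender, so it is also the only closer; the
// sync.Once keeps concurrent producers from closing the channel twice.
func (q *queue) enqueue(ctx context.Context, v int) {
	select {
	case <-ctx.Done():
		q.once.Do(func() { close(q.ch) })
	case q.ch <- v:
	default: // buffer full and context still live: drop the task
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	q := &queue{ch: make(chan int, 1)}

	q.enqueue(ctx, 1) // fills the one-slot buffer
	cancel()          // simulate shutdown (GlobalContext.Done in the real code)
	q.enqueue(ctx, 2) // buffer full and ctx done: closes the channel exactly once

	for v := range q.ch {
		fmt.Println(v) // prints 1; the loop ends because the channel is closed
	}
}

Because close happens in the same select as the send, it can never overlap a send from the same producer, and the Once makes it idempotent across producers.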

cmd/bucket-lifecycle.go

@@ -24,6 +24,7 @@ import (
 	"net/http"
 	"runtime"
 	"strings"
+	"sync"
 	"time"
 
 	miniogo "github.com/minio/minio-go/v7"
@@ -71,11 +72,16 @@ type expiryTask struct {
 }
 
 type expiryState struct {
+	once     sync.Once
 	expiryCh chan expiryTask
 }
 
 func (es *expiryState) queueExpiryTask(oi ObjectInfo, rmVersion bool) {
 	select {
+	case <-GlobalContext.Done():
+		es.once.Do(func() {
+			close(es.expiryCh)
+		})
 	case es.expiryCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion}:
 	default:
 	}
@@ -86,14 +92,9 @@ var (
 )
 
 func newExpiryState() *expiryState {
-	es := &expiryState{
+	return &expiryState{
 		expiryCh: make(chan expiryTask, 10000),
 	}
-	go func() {
-		<-GlobalContext.Done()
-		close(es.expiryCh)
-	}()
-	return es
 }
 
 func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
@@ -106,12 +107,17 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
 }
 
 type transitionState struct {
+	once sync.Once
 	// add future metrics here
 	transitionCh chan ObjectInfo
 }
 
 func (t *transitionState) queueTransitionTask(oi ObjectInfo) {
 	select {
+	case <-GlobalContext.Done():
+		t.once.Do(func() {
+			close(t.transitionCh)
+		})
 	case t.transitionCh <- oi:
 	default:
 	}
@@ -123,19 +129,13 @@ var (
 )
 
 func newTransitionState() *transitionState {
 	// fix minimum concurrent transition to 1 for single CPU setup
 	if globalTransitionConcurrent == 0 {
 		globalTransitionConcurrent = 1
 	}
-
-	ts := &transitionState{
+	return &transitionState{
 		transitionCh: make(chan ObjectInfo, 10000),
 	}
-	go func() {
-		<-GlobalContext.Done()
-		close(ts.transitionCh)
-	}()
-	return ts
 }
 
 // addWorker creates a new worker to process tasks

cmd/bucket-replication.go

@@ -815,6 +815,7 @@ var (
 
 // ReplicationPool describes replication pool
 type ReplicationPool struct {
+	once         sync.Once
 	mu           sync.Mutex
 	size         int
 	replicaCh    chan ReplicateObjectInfo
@@ -910,8 +911,10 @@ func (p *ReplicationPool) queueReplicaTask(ctx context.Context, ri ReplicateObjectInfo) {
 	}
 	select {
 	case <-ctx.Done():
-		close(p.replicaCh)
-		close(p.mrfReplicaCh)
+		p.once.Do(func() {
+			close(p.replicaCh)
+			close(p.mrfReplicaCh)
+		})
 	case p.replicaCh <- ri:
 	case p.mrfReplicaCh <- ri:
 		// queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations
@@ -925,8 +928,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi Delete
 	}
 	select {
 	case <-ctx.Done():
-		close(p.replicaDeleteCh)
-		close(p.mrfReplicaDeleteCh)
+		p.once.Do(func() {
+			close(p.replicaDeleteCh)
+			close(p.mrfReplicaDeleteCh)
+		})
 	case p.replicaDeleteCh <- doi:
 	case p.mrfReplicaDeleteCh <- doi:
 		// queue all overflows into the mrfReplicaDeleteCh to handle incoming pending/failed operations

cmd/server-main.go

@@ -516,9 +516,9 @@ func serverMain(ctx *cli.Context) {
 	if globalIsErasure {
 		initAutoHeal(GlobalContext, newObject)
 		initBackgroundTransition(GlobalContext, newObject)
-		initBackgroundExpiry(GlobalContext, newObject)
 	}
 
+	initBackgroundExpiry(GlobalContext, newObject)
 	initDataScanner(GlobalContext, newObject)
 
 	if err = initServer(GlobalContext, newObject); err != nil {