Fail refreshes with init errors. (#1882)
And ensure that refreshes continue on errors. Fixes #1881.
This commit is contained in:
parent
373bc25cfd
commit
df1a5e653d
|
@ -1119,6 +1119,14 @@ func TestExternalRefresh(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRefreshInitFailure(t *testing.T) {
|
func TestRefreshInitFailure(t *testing.T) {
|
||||||
|
p := &TestPlan{}
|
||||||
|
|
||||||
|
provURN := p.NewProviderURN("pkgA", "default", "")
|
||||||
|
resURN := p.NewURN("pkgA:m:typA", "resA", "")
|
||||||
|
res2URN := p.NewURN("pkgA:m:typA", "resB", "")
|
||||||
|
|
||||||
|
res2Outputs := resource.PropertyMap{"foo": resource.NewStringProperty("bar")}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Refresh will persist any initialization errors that are returned by `Read`. This provider
|
// Refresh will persist any initialization errors that are returned by `Read`. This provider
|
||||||
// will error out or not based on the value of `refreshShouldFail`.
|
// will error out or not based on the value of `refreshShouldFail`.
|
||||||
|
@ -1134,11 +1142,13 @@ func TestRefreshInitFailure(t *testing.T) {
|
||||||
ReadF: func(
|
ReadF: func(
|
||||||
urn resource.URN, id resource.ID, props resource.PropertyMap,
|
urn resource.URN, id resource.ID, props resource.PropertyMap,
|
||||||
) (resource.PropertyMap, resource.Status, error) {
|
) (resource.PropertyMap, resource.Status, error) {
|
||||||
if refreshShouldFail {
|
if refreshShouldFail && urn == resURN {
|
||||||
err := &plugin.InitError{
|
err := &plugin.InitError{
|
||||||
Reasons: []string{"Refresh reports continued to fail to initialize"},
|
Reasons: []string{"Refresh reports continued to fail to initialize"},
|
||||||
}
|
}
|
||||||
return resource.PropertyMap{}, resource.StatusPartialFailure, err
|
return resource.PropertyMap{}, resource.StatusPartialFailure, err
|
||||||
|
} else if urn == res2URN {
|
||||||
|
return res2Outputs, resource.StatusOK, nil
|
||||||
}
|
}
|
||||||
return resource.PropertyMap{}, resource.StatusOK, nil
|
return resource.PropertyMap{}, resource.StatusOK, nil
|
||||||
},
|
},
|
||||||
|
@ -1154,26 +1164,31 @@ func TestRefreshInitFailure(t *testing.T) {
|
||||||
})
|
})
|
||||||
host := deploytest.NewPluginHost(nil, nil, program, loaders...)
|
host := deploytest.NewPluginHost(nil, nil, program, loaders...)
|
||||||
|
|
||||||
p := &TestPlan{
|
p.Options.host = host
|
||||||
Options: UpdateOptions{host: host},
|
|
||||||
}
|
|
||||||
|
|
||||||
provURN := p.NewProviderURN("pkgA", "default", "")
|
|
||||||
resURN := p.NewURN("pkgA:m:typA", "resA", "")
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// Create an old snapshot with a single initialization failure.
|
// Create an old snapshot with a single initialization failure.
|
||||||
//
|
//
|
||||||
old := &deploy.Snapshot{
|
old := &deploy.Snapshot{
|
||||||
Resources: []*resource.State{{
|
Resources: []*resource.State{
|
||||||
Type: resURN.Type(),
|
{
|
||||||
URN: resURN,
|
Type: resURN.Type(),
|
||||||
Custom: true,
|
URN: resURN,
|
||||||
ID: "0",
|
Custom: true,
|
||||||
Inputs: resource.PropertyMap{},
|
ID: "0",
|
||||||
Outputs: resource.PropertyMap{},
|
Inputs: resource.PropertyMap{},
|
||||||
InitErrors: []string{"Resource failed to initialize"},
|
Outputs: resource.PropertyMap{},
|
||||||
}},
|
InitErrors: []string{"Resource failed to initialize"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: res2URN.Type(),
|
||||||
|
URN: res2URN,
|
||||||
|
Custom: true,
|
||||||
|
ID: "1",
|
||||||
|
Inputs: resource.PropertyMap{},
|
||||||
|
Outputs: resource.PropertyMap{},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
@ -1188,6 +1203,8 @@ func TestRefreshInitFailure(t *testing.T) {
|
||||||
// break
|
// break
|
||||||
case resURN:
|
case resURN:
|
||||||
assert.Empty(t, resource.InitErrors)
|
assert.Empty(t, resource.InitErrors)
|
||||||
|
case res2URN:
|
||||||
|
assert.Equal(t, res2Outputs, resource.Outputs)
|
||||||
default:
|
default:
|
||||||
t.Fatalf("unexpected resource %v", urn)
|
t.Fatalf("unexpected resource %v", urn)
|
||||||
}
|
}
|
||||||
|
@ -1197,7 +1214,7 @@ func TestRefreshInitFailure(t *testing.T) {
|
||||||
// Refresh DOES fail, causing the new initialization error to appear.
|
// Refresh DOES fail, causing the new initialization error to appear.
|
||||||
//
|
//
|
||||||
refreshShouldFail = true
|
refreshShouldFail = true
|
||||||
p.Steps = []TestStep{{Op: Refresh}}
|
p.Steps = []TestStep{{Op: Refresh, ExpectFailure: true}}
|
||||||
snap = p.Run(t, old)
|
snap = p.Run(t, old)
|
||||||
for _, resource := range snap.Resources {
|
for _, resource := range snap.Resources {
|
||||||
switch urn := resource.URN; urn {
|
switch urn := resource.URN; urn {
|
||||||
|
@ -1205,6 +1222,8 @@ func TestRefreshInitFailure(t *testing.T) {
|
||||||
// break
|
// break
|
||||||
case resURN:
|
case resURN:
|
||||||
assert.Equal(t, []string{"Refresh reports continued to fail to initialize"}, resource.InitErrors)
|
assert.Equal(t, []string{"Refresh reports continued to fail to initialize"}, resource.InitErrors)
|
||||||
|
case res2URN:
|
||||||
|
assert.Equal(t, res2Outputs, resource.Outputs)
|
||||||
default:
|
default:
|
||||||
t.Fatalf("unexpected resource %v", urn)
|
t.Fatalf("unexpected resource %v", urn)
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
|
||||||
|
|
||||||
// Set up a step generator and executor for this plan.
|
// Set up a step generator and executor for this plan.
|
||||||
pe.stepGen = newStepGenerator(pe.plan, opts)
|
pe.stepGen = newStepGenerator(pe.plan, opts)
|
||||||
pe.stepExec = newStepExecutor(ctx, cancel, pe.plan, opts, preview)
|
pe.stepExec = newStepExecutor(ctx, cancel, pe.plan, opts, preview, false)
|
||||||
|
|
||||||
// We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to
|
// We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to
|
||||||
// respond to cancellation requests promptly.
|
// respond to cancellation requests promptly.
|
||||||
|
@ -224,7 +224,7 @@ func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview
|
||||||
|
|
||||||
// Fire up a worker pool and issue each refresh in turn.
|
// Fire up a worker pool and issue each refresh in turn.
|
||||||
ctx, cancel := context.WithCancel(callerCtx)
|
ctx, cancel := context.WithCancel(callerCtx)
|
||||||
stepExec := newStepExecutor(ctx, cancel, pe.plan, opts, preview)
|
stepExec := newStepExecutor(ctx, cancel, pe.plan, opts, preview, true)
|
||||||
for i := range steps {
|
for i := range steps {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
break
|
break
|
||||||
|
|
|
@ -607,9 +607,7 @@ func (s *RefreshStep) Apply(preview bool) (resource.Status, StepCompleteFunc, er
|
||||||
return rst, nil, err
|
return rst, nil, err
|
||||||
}
|
}
|
||||||
if initErr, isInitErr := err.(*plugin.InitError); isInitErr {
|
if initErr, isInitErr := err.(*plugin.InitError); isInitErr {
|
||||||
// We clear error in this case because we do not want the refresh to fail in the face of initialization
|
initErrors = initErr.Reasons
|
||||||
// errors.
|
|
||||||
initErrors, err = initErr.Reasons, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,10 +52,11 @@ type Chain = []Step
|
||||||
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
|
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
|
||||||
// ready to execute.
|
// ready to execute.
|
||||||
type stepExecutor struct {
|
type stepExecutor struct {
|
||||||
plan *Plan // The plan currently being executed.
|
plan *Plan // The plan currently being executed.
|
||||||
opts Options // The options for this current plan.
|
opts Options // The options for this current plan.
|
||||||
preview bool // Whether or not we are doing a preview.
|
preview bool // Whether or not we are doing a preview.
|
||||||
pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs.
|
pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs.
|
||||||
|
continueOnError bool // True if we want to continue the plan after a step error.
|
||||||
|
|
||||||
workers sync.WaitGroup // WaitGroup tracking the worker goroutines that are owned by this step executor.
|
workers sync.WaitGroup // WaitGroup tracking the worker goroutines that are owned by this step executor.
|
||||||
incomingChains chan Chain // Incoming chains that we are to execute
|
incomingChains chan Chain // Incoming chains that we are to execute
|
||||||
|
@ -173,7 +174,9 @@ func (se *stepExecutor) executeChain(workerID int, chain Chain) {
|
||||||
|
|
||||||
func (se *stepExecutor) cancelDueToError() {
|
func (se *stepExecutor) cancelDueToError() {
|
||||||
se.sawError.Store(true)
|
se.sawError.Store(true)
|
||||||
se.cancel()
|
if !se.continueOnError {
|
||||||
|
se.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
@ -285,14 +288,15 @@ func (se *stepExecutor) worker(workerID int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan, opts Options,
|
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan, opts Options,
|
||||||
preview bool) *stepExecutor {
|
preview, continueOnError bool) *stepExecutor {
|
||||||
exec := &stepExecutor{
|
exec := &stepExecutor{
|
||||||
plan: plan,
|
plan: plan,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
preview: preview,
|
preview: preview,
|
||||||
incomingChains: make(chan Chain),
|
continueOnError: continueOnError,
|
||||||
ctx: ctx,
|
incomingChains: make(chan Chain),
|
||||||
cancel: cancel,
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.sawError.Store(false)
|
exec.sawError.Store(false)
|
||||||
|
|
Loading…
Reference in a new issue