Compare commits

...

2 commits

Author SHA1 Message Date
Sean Gillespie aab99a342a
Brief sketch using a bool to indicate a bail 2018-08-27 10:03:52 -07:00
Sean Gillespie 9cf2429644
plub diag.Sink everywhere 2018-08-24 15:27:23 -07:00
6 changed files with 64 additions and 65 deletions

View file

@ -194,7 +194,7 @@ func (res *planResult) Chdir() (func(), error) {
// Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the // Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the
// resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that // resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that
// failed, if the error is non-nil; and finally the state of the resource modified in the failing step. // failed, if the error is non-nil; and finally the state of the resource modified in the failing step.
func (res *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bool) error { func (res *planResult) Walk(cancelCtx *Context, sink diag.Sink, events deploy.Events, preview bool) error {
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
done := make(chan bool) done := make(chan bool)
@ -206,7 +206,7 @@ func (res *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bo
Refresh: res.Options.Refresh, Refresh: res.Options.Refresh,
RefreshOnly: res.Options.isRefresh, RefreshOnly: res.Options.isRefresh,
} }
err = res.Plan.Execute(ctx, opts, preview) err = res.Plan.Execute(ctx, sink, opts, preview)
close(done) close(done)
}() }()
@ -240,7 +240,7 @@ func printPlan(ctx *Context, result *planResult, dryRun bool) (ResourceChanges,
// Walk the plan's steps and and pretty-print them out. // Walk the plan's steps and and pretty-print them out.
actions := newPlanActions(result.Options) actions := newPlanActions(result.Options)
if err := result.Walk(ctx, actions, true); err != nil { if err := result.Walk(ctx, result.Plugctx.Diag, actions, true); err != nil {
return nil, errors.New("an error occurred while advancing the preview") return nil, errors.New("an error occurred while advancing the preview")
} }

View file

@ -159,7 +159,7 @@ func update(ctx *Context, info *planContext, opts planOptions, dryRun bool) (Res
start := time.Now() start := time.Now()
actions := newUpdateActions(ctx, info.Update, opts) actions := newUpdateActions(ctx, info.Update, opts)
err = result.Walk(ctx, actions, false) err = result.Walk(ctx, result.Plugctx.Diag, actions, false)
resourceChanges = ResourceChanges(actions.Ops) resourceChanges = ResourceChanges(actions.Ops)
if len(resourceChanges) != 0 { if len(resourceChanges) != 0 {

View file

@ -226,7 +226,6 @@ func NewPlan(ctx *plugin.Context, target *Target, prev *Snapshot, source Source,
func (p *Plan) Ctx() *plugin.Context { return p.ctx } func (p *Plan) Ctx() *plugin.Context { return p.ctx }
func (p *Plan) Target() *Target { return p.target } func (p *Plan) Target() *Target { return p.target }
func (p *Plan) Diag() diag.Sink { return p.ctx.Diag }
func (p *Plan) Prev() *Snapshot { return p.prev } func (p *Plan) Prev() *Snapshot { return p.prev }
func (p *Plan) Olds() map[resource.URN]*resource.State { return p.olds } func (p *Plan) Olds() map[resource.URN]*resource.State { return p.olds }
func (p *Plan) Source() Source { return p.source } func (p *Plan) Source() Source { return p.source }
@ -272,7 +271,7 @@ func (p *Plan) generateEventURN(event SourceEvent) resource.URN {
// Execute executes a plan to completion, using the given cancellation context and running a preview // Execute executes a plan to completion, using the given cancellation context and running a preview
// or update. // or update.
func (p *Plan) Execute(ctx context.Context, opts Options, preview bool) error { func (p *Plan) Execute(ctx context.Context, sink diag.Sink, opts Options, preview bool) error {
planExec := &planExecutor{plan: p} planExec := &planExecutor{plan: p}
return planExec.Execute(ctx, opts, preview) return planExec.Execute(ctx, sink, opts, preview)
} }

View file

@ -49,13 +49,13 @@ func execError(message string, preview bool) error {
} }
// reportError reports a single error to the executor's diag stream with the indicated URN for context. // reportError reports a single error to the executor's diag stream with the indicated URN for context.
func (pe *planExecutor) reportError(urn resource.URN, err error) { func (pe *planExecutor) reportError(sink diag.Sink, urn resource.URN, err error) {
pe.plan.Diag().Errorf(diag.RawMessage(urn, err.Error())) sink.Errorf(diag.RawMessage(urn, err.Error()))
} }
// Execute executes a plan to completion, using the given cancellation context and running a preview // Execute executes a plan to completion, using the given cancellation context and running a preview
// or update. // or update.
func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview bool) error { func (pe *planExecutor) Execute(callerCtx context.Context, diagSink diag.Sink, opts Options, preview bool) error {
// Set up a goroutine that will signal cancellation to the plan's plugins if the caller context is cancelled. We do // Set up a goroutine that will signal cancellation to the plan's plugins if the caller context is cancelled. We do
// not hang this off of the context we create below because we do not want the failure of a single step to cause // not hang this off of the context we create below because we do not want the failure of a single step to cause
// other steps to fail. // other steps to fail.
@ -73,7 +73,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
// Before doing anything else, optionally refresh each resource in the base checkpoint. // Before doing anything else, optionally refresh each resource in the base checkpoint.
if opts.Refresh { if opts.Refresh {
if err := pe.refresh(callerCtx, opts, preview); err != nil { if err := pe.refresh(callerCtx, diagSink, opts, preview); err != nil {
return err return err
} }
if opts.RefreshOnly { if opts.RefreshOnly {
@ -93,7 +93,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
// Set up a step generator and executor for this plan. // Set up a step generator and executor for this plan.
pe.stepGen = newStepGenerator(pe.plan, opts) pe.stepGen = newStepGenerator(pe.plan, opts)
pe.stepExec = newStepExecutor(ctx, cancel, pe.plan, opts, preview) pe.stepExec = newStepExecutor(ctx, cancel, diagSink, opts, preview)
// We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to // We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to
// respond to cancellation requests promptly. // respond to cancellation requests promptly.
@ -133,7 +133,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
log.Infof("planExecutor.Execute(...): incoming event (nil? %v, %v)", event.Event == nil, event.Error) log.Infof("planExecutor.Execute(...): incoming event (nil? %v, %v)", event.Event == nil, event.Error)
if event.Error != nil { if event.Error != nil {
pe.reportError("", event.Error) pe.reportError(diagSink, "", event.Error)
cancel() cancel()
return false, event.Error return false, event.Error
} }
@ -153,9 +153,11 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
return false, nil return false, nil
} }
if eventErr := pe.handleSingleEvent(event.Event); eventErr != nil { if eventErr, bailed := pe.handleSingleEvent(diagSink, event.Event); eventErr != nil {
log.Infof("planExecutor.Execute(...): error handling event: %v", eventErr) log.Infof("planExecutor.Execute(...): error handling event: %v", eventErr)
pe.reportError(pe.plan.generateEventURN(event.Event), eventErr) if !bailed {
pe.reportError(diagSink, pe.plan.generateEventURN(event.Event), eventErr)
}
cancel() cancel()
return false, eventErr return false, eventErr
} }
@ -184,33 +186,34 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
// handleSingleEvent handles a single source event. For all incoming events, it produces a chain that needs // handleSingleEvent handles a single source event. For all incoming events, it produces a chain that needs
// to be executed and schedules the chain for execution. // to be executed and schedules the chain for execution.
func (pe *planExecutor) handleSingleEvent(event SourceEvent) error { func (pe *planExecutor) handleSingleEvent(sink diag.Sink, event SourceEvent) (error, bool) {
contract.Require(event != nil, "event != nil") contract.Require(event != nil, "event != nil")
var steps []Step var steps []Step
var err error var err error
var bailed bool
switch e := event.(type) { switch e := event.(type) {
case RegisterResourceEvent: case RegisterResourceEvent:
log.Infof("planExecutor.handleSingleEvent(...): received RegisterResourceEvent") log.Infof("planExecutor.handleSingleEvent(...): received RegisterResourceEvent")
steps, err = pe.stepGen.GenerateSteps(e) steps, err, bailed = pe.stepGen.GenerateSteps(sink, e)
case ReadResourceEvent: case ReadResourceEvent:
log.Infof("planExecutor.handleSingleEvent(...): received ReadResourceEvent") log.Infof("planExecutor.handleSingleEvent(...): received ReadResourceEvent")
steps, err = pe.stepGen.GenerateReadSteps(e) steps, err, bailed = pe.stepGen.GenerateReadSteps(e)
case RegisterResourceOutputsEvent: case RegisterResourceOutputsEvent:
log.Infof("planExecutor.handleSingleEvent(...): received register resource outputs") log.Infof("planExecutor.handleSingleEvent(...): received register resource outputs")
pe.stepExec.ExecuteRegisterResourceOutputs(e) pe.stepExec.ExecuteRegisterResourceOutputs(sink, e)
return nil return nil, false
} }
if err != nil { if err != nil {
return err return err, bailed
} }
pe.stepExec.Execute(steps) pe.stepExec.Execute(steps)
return nil return nil, false
} }
// refresh refreshes the state of the base checkpoint file for the current plan in memory. // refresh refreshes the state of the base checkpoint file for the current plan in memory.
func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview bool) error { func (pe *planExecutor) refresh(callerCtx context.Context, sink diag.Sink, opts Options, preview bool) error {
prev := pe.plan.prev prev := pe.plan.prev
if prev == nil || len(prev.Resources) == 0 { if prev == nil || len(prev.Resources) == 0 {
return nil return nil
@ -218,7 +221,7 @@ func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview
// Fire up a worker pool and issue a refresh step per resource in the old snapshot. // Fire up a worker pool and issue a refresh step per resource in the old snapshot.
ctx, cancel := context.WithCancel(callerCtx) ctx, cancel := context.WithCancel(callerCtx)
stepExec := newStepExecutor(ctx, cancel, pe.plan, opts, preview) stepExec := newStepExecutor(ctx, cancel, sink, opts, preview)
canceled := false canceled := false
steps := make([]Step, len(prev.Resources)) steps := make([]Step, len(prev.Resources))

View file

@ -52,7 +52,6 @@ type Chain = []Step
// resolved, we (the engine) can assume that any chain given to us by the step generator is already // resolved, we (the engine) can assume that any chain given to us by the step generator is already
// ready to execute. // ready to execute.
type stepExecutor struct { type stepExecutor struct {
plan *Plan // The plan currently being executed.
opts Options // The options for this current plan. opts Options // The options for this current plan.
preview bool // Whether or not we are doing a preview. preview bool // Whether or not we are doing a preview.
pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs. pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs.
@ -83,7 +82,7 @@ func (se *stepExecutor) Execute(chain Chain) {
} }
// ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine. // ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine.
func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputsEvent) { func (se *stepExecutor) ExecuteRegisterResourceOutputs(sink diag.Sink, e RegisterResourceOutputsEvent) {
// Look up the final state in the pending registration list. // Look up the final state in the pending registration list.
urn := e.URN() urn := e.URN()
value, has := se.pendingNews.Load(urn) value, has := se.pendingNews.Load(urn)
@ -110,7 +109,7 @@ func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputs
// of these are particularly appealing right now. // of these are particularly appealing right now.
outErr := errors.Wrap(eventerr, "resource complete event returned an error") outErr := errors.Wrap(eventerr, "resource complete event returned an error")
diagMsg := diag.RawMessage(reg.URN(), outErr.Error()) diagMsg := diag.RawMessage(reg.URN(), outErr.Error())
se.plan.Diag().Errorf(diagMsg) sink.Errorf(diagMsg)
se.cancelDueToError() se.cancelDueToError()
return return
} }
@ -145,7 +144,7 @@ func (se *stepExecutor) WaitForCompletion() {
// executeChain executes a chain, one step at a time. If any step in the chain fails to execute, or if the // executeChain executes a chain, one step at a time. If any step in the chain fails to execute, or if the
// context is canceled, the chain stops execution. // context is canceled, the chain stops execution.
func (se *stepExecutor) executeChain(workerID int, chain Chain) { func (se *stepExecutor) executeChain(sink diag.Sink, workerID int, chain Chain) {
for _, step := range chain { for _, step := range chain {
select { select {
case <-se.ctx.Done(): case <-se.ctx.Done():
@ -164,7 +163,7 @@ func (se *stepExecutor) executeChain(workerID int, chain Chain) {
// The errStepApplyFailed sentinel signals that the error that failed this chain was a step apply // The errStepApplyFailed sentinel signals that the error that failed this chain was a step apply
// error and that we shouldn't log it. Everything else should be logged to the diag system as usual. // error and that we shouldn't log it. Everything else should be logged to the diag system as usual.
diagMsg := diag.RawMessage(step.URN(), err.Error()) diagMsg := diag.RawMessage(step.URN(), err.Error())
se.plan.Diag().Errorf(diagMsg) sink.Errorf(diagMsg)
} }
return return
} }
@ -262,7 +261,7 @@ func (se *stepExecutor) log(workerID int, msg string, args ...interface{}) {
// worker is the base function for all step executor worker goroutines. It continuously polls for new chains // worker is the base function for all step executor worker goroutines. It continuously polls for new chains
// and executes any that it gets from the channel. // and executes any that it gets from the channel.
func (se *stepExecutor) worker(workerID int) { func (se *stepExecutor) worker(sink diag.Sink, workerID int) {
se.log(workerID, "worker coming online") se.log(workerID, "worker coming online")
defer se.workers.Done() defer se.workers.Done()
@ -276,7 +275,7 @@ func (se *stepExecutor) worker(workerID int) {
} }
se.log(workerID, "worker received chain for execution") se.log(workerID, "worker received chain for execution")
se.executeChain(workerID, chain) se.executeChain(sink, workerID, chain)
case <-se.ctx.Done(): case <-se.ctx.Done():
se.log(workerID, "worker exiting due to cancellation") se.log(workerID, "worker exiting due to cancellation")
return return
@ -284,10 +283,9 @@ func (se *stepExecutor) worker(workerID int) {
} }
} }
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan, opts Options, func newStepExecutor(ctx context.Context, cancel context.CancelFunc, sink diag.Sink, opts Options,
preview bool) *stepExecutor { preview bool) *stepExecutor {
exec := &stepExecutor{ exec := &stepExecutor{
plan: plan,
opts: opts, opts: opts,
preview: preview, preview: preview,
incomingChains: make(chan Chain), incomingChains: make(chan Chain),
@ -299,7 +297,7 @@ func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan,
fanout := opts.DegreeOfParallelism() fanout := opts.DegreeOfParallelism()
for i := 0; i < fanout; i++ { for i := 0; i < fanout; i++ {
exec.workers.Add(1) exec.workers.Add(1)
go exec.worker(i) go exec.worker(sink, i)
} }
return exec return exec

View file

@ -44,7 +44,7 @@ type stepGenerator struct {
// GenerateReadSteps is responsible for producing one or more steps required to service // GenerateReadSteps is responsible for producing one or more steps required to service
// a ReadResourceEvent coming from the language host. // a ReadResourceEvent coming from the language host.
func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, error) { func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, error, bool) {
urn := sg.plan.generateURN(event.Parent(), event.Type(), event.Name()) urn := sg.plan.generateURN(event.Parent(), event.Type(), event.Name())
newState := resource.NewState(event.Type(), newState := resource.NewState(event.Type(),
urn, urn,
@ -81,7 +81,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
return []Step{ return []Step{
NewReadReplacementStep(sg.plan, event, old, newState), NewReadReplacementStep(sg.plan, event, old, newState),
NewReplaceStep(sg.plan, old, newState, nil, true), NewReplaceStep(sg.plan, old, newState, nil, true),
}, nil }, nil, false
} }
if bool(logging.V(7)) && hasOld && old.ID == event.ID() { if bool(logging.V(7)) && hasOld && old.ID == event.ID() {
@ -91,7 +91,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
sg.reads[urn] = true sg.reads[urn] = true
return []Step{ return []Step{
NewReadStep(sg.plan, event, old, newState), NewReadStep(sg.plan, event, old, newState),
}, nil }, nil, false
} }
// GenerateSteps produces one or more steps required to achieve the goal state // GenerateSteps produces one or more steps required to achieve the goal state
@ -100,7 +100,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
// If the given resource is a custom resource, the step generator will invoke Diff // If the given resource is a custom resource, the step generator will invoke Diff
// and Check on the provider associated with that resource. If those fail, an error // and Check on the provider associated with that resource. If those fail, an error
// is returned. // is returned.
func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, error) { func (sg *stepGenerator) GenerateSteps(sink diag.Sink, event RegisterResourceEvent) ([]Step, error, bool) {
var invalid bool // will be set to true if this object fails validation. var invalid bool // will be set to true if this object fails validation.
goal := event.Goal() goal := event.Goal()
@ -109,7 +109,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
if sg.urns[urn] { if sg.urns[urn] {
invalid = true invalid = true
// TODO[pulumi/pulumi-framework#19]: improve this error message! // TODO[pulumi/pulumi-framework#19]: improve this error message!
sg.plan.Diag().Errorf(diag.GetDuplicateResourceURNError(urn), urn) sink.Errorf(diag.GetDuplicateResourceURNError(urn), urn)
} }
sg.urns[urn] = true sg.urns[urn] = true
@ -141,11 +141,11 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
ref, refErr := providers.ParseReference(goal.Provider) ref, refErr := providers.ParseReference(goal.Provider)
if refErr != nil { if refErr != nil {
return nil, errors.Errorf( return nil, errors.Errorf(
"bad provider reference '%v' for resource '%v': %v", goal.Provider, urn, refErr) "bad provider reference '%v' for resource '%v': %v", goal.Provider, urn, refErr), false
} }
p, ok := sg.plan.GetProvider(ref) p, ok := sg.plan.GetProvider(ref)
if !ok { if !ok {
return nil, errors.Errorf("unknown provider '%v' for resource '%v'", ref, urn) return nil, errors.Errorf("unknown provider '%v' for resource '%v'", ref, urn), false
} }
prov = p prov = p
} }
@ -174,8 +174,8 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
} }
if err != nil { if err != nil {
return nil, err return nil, err, false
} else if sg.issueCheckErrors(new, urn, failures) { } else if sg.issueCheckErrors(sink, new, urn, failures) {
invalid = true invalid = true
} }
new.Inputs = inputs new.Inputs = inputs
@ -186,25 +186,24 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
var analyzer plugin.Analyzer var analyzer plugin.Analyzer
analyzer, err = sg.plan.ctx.Host.Analyzer(a) analyzer, err = sg.plan.ctx.Host.Analyzer(a)
if err != nil { if err != nil {
return nil, err return nil, err, false
} else if analyzer == nil { } else if analyzer == nil {
return nil, errors.Errorf("analyzer '%v' could not be loaded from your $PATH", a) return nil, errors.Errorf("analyzer '%v' could not be loaded from your $PATH", a), false
} }
var failures []plugin.AnalyzeFailure var failures []plugin.AnalyzeFailure
failures, err = analyzer.Analyze(new.Type, inputs) failures, err = analyzer.Analyze(new.Type, inputs)
if err != nil { if err != nil {
return nil, err return nil, err, false
} }
for _, failure := range failures { for _, failure := range failures {
invalid = true invalid = true
sg.plan.Diag().Errorf( sink.Errorf(diag.GetAnalyzeResourceFailureError(urn), a, urn, failure.Property, failure.Reason)
diag.GetAnalyzeResourceFailureError(urn), a, urn, failure.Property, failure.Reason)
} }
} }
// If the resource isn't valid, don't proceed any further. // If the resource isn't valid, don't proceed any further.
if invalid { if invalid {
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed") return nil, errors.New("One or more resource validation errors occurred; refusing to proceed"), true
} }
// There are four cases we need to consider when figuring out what to do with this resource. // There are four cases we need to consider when figuring out what to do with this resource.
@ -231,7 +230,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
return []Step{ return []Step{
NewReplaceStep(sg.plan, old, new, nil, false), NewReplaceStep(sg.plan, old, new, nil, false),
NewCreateReplacementStep(sg.plan, event, old, new, nil, false), NewCreateReplacementStep(sg.plan, event, old, new, nil, false),
}, nil }, nil, false
} }
// Case 2: wasExternal // Case 2: wasExternal
@ -246,13 +245,13 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
logging.V(7).Infof("Planner recognized '%s' as old external resource, creating instead", urn) logging.V(7).Infof("Planner recognized '%s' as old external resource, creating instead", urn)
sg.creates[urn] = true sg.creates[urn] = true
if err != nil { if err != nil {
return nil, err return nil, err, false
} }
return []Step{ return []Step{
NewCreateReplacementStep(sg.plan, event, old, new, nil, true), NewCreateReplacementStep(sg.plan, event, old, new, nil, true),
NewReplaceStep(sg.plan, old, new, nil, true), NewReplaceStep(sg.plan, old, new, nil, true),
}, nil }, nil, false
} }
// Case 3: hasOld // Case 3: hasOld
@ -274,7 +273,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
// Determine whether the change resulted in a diff. // Determine whether the change resulted in a diff.
d, diffErr := sg.diff(urn, old.ID, oldInputs, oldOutputs, inputs, prov, allowUnknowns) d, diffErr := sg.diff(urn, old.ID, oldInputs, oldOutputs, inputs, prov, allowUnknowns)
if diffErr != nil { if diffErr != nil {
return nil, diffErr return nil, diffErr, false
} }
diff = d diff = d
} }
@ -282,7 +281,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
// Ensure that we received a sensible response. // Ensure that we received a sensible response.
if diff.Changes != plugin.DiffNone && diff.Changes != plugin.DiffSome { if diff.Changes != plugin.DiffNone && diff.Changes != plugin.DiffSome {
return nil, errors.Errorf( return nil, errors.Errorf(
"unrecognized diff state for %s: %d", urn, diff.Changes) "unrecognized diff state for %s: %d", urn, diff.Changes), false
} }
// If there were changes, check for a replacement vs. an in-place update. // If there were changes, check for a replacement vs. an in-place update.
@ -296,9 +295,9 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
var failures []plugin.CheckFailure var failures []plugin.CheckFailure
inputs, failures, err = prov.Check(urn, nil, goal.Properties, allowUnknowns) inputs, failures, err = prov.Check(urn, nil, goal.Properties, allowUnknowns)
if err != nil { if err != nil {
return nil, err return nil, err, false
} else if sg.issueCheckErrors(new, urn, failures) { } else if sg.issueCheckErrors(sink, new, urn, failures) {
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed") return nil, errors.New("One or more resource validation errors occurred; refusing to proceed"), true
} }
new.Inputs = inputs new.Inputs = inputs
} }
@ -358,14 +357,14 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
NewDeleteReplacementStep(sg.plan, old, false), NewDeleteReplacementStep(sg.plan, old, false),
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, false), NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, false),
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, false), NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, false),
), nil ), nil, false
} }
return []Step{ return []Step{
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, true), NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, true),
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, true), NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, true),
// note that the delete step is generated "later" on, after all creates/updates finish. // note that the delete step is generated "later" on, after all creates/updates finish.
}, nil }, nil, false
} }
// If we fell through, it's an update. // If we fell through, it's an update.
@ -373,7 +372,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
if logging.V(7) { if logging.V(7) {
logging.V(7).Infof("Planner decided to update '%v' (oldprops=%v inputs=%v", urn, oldInputs, new.Inputs) logging.V(7).Infof("Planner decided to update '%v' (oldprops=%v inputs=%v", urn, oldInputs, new.Inputs)
} }
return []Step{NewUpdateStep(sg.plan, event, old, new, diff.StableKeys)}, nil return []Step{NewUpdateStep(sg.plan, event, old, new, diff.StableKeys)}, nil, false
} }
// No need to update anything, the properties didn't change. // No need to update anything, the properties didn't change.
@ -381,7 +380,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
if logging.V(7) { if logging.V(7) {
logging.V(7).Infof("Planner decided not to update '%v' (same) (inputs=%v)", urn, new.Inputs) logging.V(7).Infof("Planner decided not to update '%v' (same) (inputs=%v)", urn, new.Inputs)
} }
return []Step{NewSameStep(sg.plan, event, old, new)}, nil return []Step{NewSameStep(sg.plan, event, old, new)}, nil, false
} }
// Case 4: Not Case 1, 2, or 3 // Case 4: Not Case 1, 2, or 3
@ -389,7 +388,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
// it's just being created. // it's just being created.
sg.creates[urn] = true sg.creates[urn] = true
logging.V(7).Infof("Planner decided to create '%v' (inputs=%v)", urn, new.Inputs) logging.V(7).Infof("Planner decided to create '%v' (inputs=%v)", urn, new.Inputs)
return []Step{NewCreateStep(sg.plan, event, new)}, nil return []Step{NewCreateStep(sg.plan, event, new)}, nil, false
} }
func (sg *stepGenerator) GenerateDeletes() []Step { func (sg *stepGenerator) GenerateDeletes() []Step {
@ -471,7 +470,7 @@ func (sg *stepGenerator) diff(urn resource.URN, id resource.ID, oldInputs, oldOu
} }
// issueCheckErrors prints any check errors to the diagnostics sink. // issueCheckErrors prints any check errors to the diagnostics sink.
func (sg *stepGenerator) issueCheckErrors(new *resource.State, urn resource.URN, func (sg *stepGenerator) issueCheckErrors(sink diag.Sink, new *resource.State, urn resource.URN,
failures []plugin.CheckFailure) bool { failures []plugin.CheckFailure) bool {
if len(failures) == 0 { if len(failures) == 0 {
return false return false
@ -479,10 +478,10 @@ func (sg *stepGenerator) issueCheckErrors(new *resource.State, urn resource.URN,
inputs := new.Inputs inputs := new.Inputs
for _, failure := range failures { for _, failure := range failures {
if failure.Property != "" { if failure.Property != "" {
sg.plan.Diag().Errorf(diag.GetResourcePropertyInvalidValueError(urn), sink.Errorf(diag.GetResourcePropertyInvalidValueError(urn),
new.Type, urn.Name(), failure.Property, inputs[failure.Property], failure.Reason) new.Type, urn.Name(), failure.Property, inputs[failure.Property], failure.Reason)
} else { } else {
sg.plan.Diag().Errorf( sink.Errorf(
diag.GetResourceInvalidError(urn), new.Type, urn.Name(), failure.Reason) diag.GetResourceInvalidError(urn), new.Type, urn.Name(), failure.Reason)
} }
} }