Compare commits
2 commits
master
...
swgillespi
Author | SHA1 | Date | |
---|---|---|---|
aab99a342a | |||
9cf2429644 |
|
@ -194,7 +194,7 @@ func (res *planResult) Chdir() (func(), error) {
|
||||||
// Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the
|
// Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the
|
||||||
// resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that
|
// resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that
|
||||||
// failed, if the error is non-nil; and finally the state of the resource modified in the failing step.
|
// failed, if the error is non-nil; and finally the state of the resource modified in the failing step.
|
||||||
func (res *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bool) error {
|
func (res *planResult) Walk(cancelCtx *Context, sink diag.Sink, events deploy.Events, preview bool) error {
|
||||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||||
|
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
|
@ -206,7 +206,7 @@ func (res *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bo
|
||||||
Refresh: res.Options.Refresh,
|
Refresh: res.Options.Refresh,
|
||||||
RefreshOnly: res.Options.isRefresh,
|
RefreshOnly: res.Options.isRefresh,
|
||||||
}
|
}
|
||||||
err = res.Plan.Execute(ctx, opts, preview)
|
err = res.Plan.Execute(ctx, sink, opts, preview)
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ func printPlan(ctx *Context, result *planResult, dryRun bool) (ResourceChanges,
|
||||||
|
|
||||||
// Walk the plan's steps and and pretty-print them out.
|
// Walk the plan's steps and and pretty-print them out.
|
||||||
actions := newPlanActions(result.Options)
|
actions := newPlanActions(result.Options)
|
||||||
if err := result.Walk(ctx, actions, true); err != nil {
|
if err := result.Walk(ctx, result.Plugctx.Diag, actions, true); err != nil {
|
||||||
return nil, errors.New("an error occurred while advancing the preview")
|
return nil, errors.New("an error occurred while advancing the preview")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,7 @@ func update(ctx *Context, info *planContext, opts planOptions, dryRun bool) (Res
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
actions := newUpdateActions(ctx, info.Update, opts)
|
actions := newUpdateActions(ctx, info.Update, opts)
|
||||||
|
|
||||||
err = result.Walk(ctx, actions, false)
|
err = result.Walk(ctx, result.Plugctx.Diag, actions, false)
|
||||||
resourceChanges = ResourceChanges(actions.Ops)
|
resourceChanges = ResourceChanges(actions.Ops)
|
||||||
|
|
||||||
if len(resourceChanges) != 0 {
|
if len(resourceChanges) != 0 {
|
||||||
|
|
|
@ -226,7 +226,6 @@ func NewPlan(ctx *plugin.Context, target *Target, prev *Snapshot, source Source,
|
||||||
|
|
||||||
func (p *Plan) Ctx() *plugin.Context { return p.ctx }
|
func (p *Plan) Ctx() *plugin.Context { return p.ctx }
|
||||||
func (p *Plan) Target() *Target { return p.target }
|
func (p *Plan) Target() *Target { return p.target }
|
||||||
func (p *Plan) Diag() diag.Sink { return p.ctx.Diag }
|
|
||||||
func (p *Plan) Prev() *Snapshot { return p.prev }
|
func (p *Plan) Prev() *Snapshot { return p.prev }
|
||||||
func (p *Plan) Olds() map[resource.URN]*resource.State { return p.olds }
|
func (p *Plan) Olds() map[resource.URN]*resource.State { return p.olds }
|
||||||
func (p *Plan) Source() Source { return p.source }
|
func (p *Plan) Source() Source { return p.source }
|
||||||
|
@ -272,7 +271,7 @@ func (p *Plan) generateEventURN(event SourceEvent) resource.URN {
|
||||||
|
|
||||||
// Execute executes a plan to completion, using the given cancellation context and running a preview
|
// Execute executes a plan to completion, using the given cancellation context and running a preview
|
||||||
// or update.
|
// or update.
|
||||||
func (p *Plan) Execute(ctx context.Context, opts Options, preview bool) error {
|
func (p *Plan) Execute(ctx context.Context, sink diag.Sink, opts Options, preview bool) error {
|
||||||
planExec := &planExecutor{plan: p}
|
planExec := &planExecutor{plan: p}
|
||||||
return planExec.Execute(ctx, opts, preview)
|
return planExec.Execute(ctx, sink, opts, preview)
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,13 +49,13 @@ func execError(message string, preview bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// reportError reports a single error to the executor's diag stream with the indicated URN for context.
|
// reportError reports a single error to the executor's diag stream with the indicated URN for context.
|
||||||
func (pe *planExecutor) reportError(urn resource.URN, err error) {
|
func (pe *planExecutor) reportError(sink diag.Sink, urn resource.URN, err error) {
|
||||||
pe.plan.Diag().Errorf(diag.RawMessage(urn, err.Error()))
|
sink.Errorf(diag.RawMessage(urn, err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute executes a plan to completion, using the given cancellation context and running a preview
|
// Execute executes a plan to completion, using the given cancellation context and running a preview
|
||||||
// or update.
|
// or update.
|
||||||
func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview bool) error {
|
func (pe *planExecutor) Execute(callerCtx context.Context, diagSink diag.Sink, opts Options, preview bool) error {
|
||||||
// Set up a goroutine that will signal cancellation to the plan's plugins if the caller context is cancelled. We do
|
// Set up a goroutine that will signal cancellation to the plan's plugins if the caller context is cancelled. We do
|
||||||
// not hang this off of the context we create below because we do not want the failure of a single step to cause
|
// not hang this off of the context we create below because we do not want the failure of a single step to cause
|
||||||
// other steps to fail.
|
// other steps to fail.
|
||||||
|
@ -73,7 +73,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
|
||||||
|
|
||||||
// Before doing anything else, optionally refresh each resource in the base checkpoint.
|
// Before doing anything else, optionally refresh each resource in the base checkpoint.
|
||||||
if opts.Refresh {
|
if opts.Refresh {
|
||||||
if err := pe.refresh(callerCtx, opts, preview); err != nil {
|
if err := pe.refresh(callerCtx, diagSink, opts, preview); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if opts.RefreshOnly {
|
if opts.RefreshOnly {
|
||||||
|
@ -93,7 +93,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
|
||||||
|
|
||||||
// Set up a step generator and executor for this plan.
|
// Set up a step generator and executor for this plan.
|
||||||
pe.stepGen = newStepGenerator(pe.plan, opts)
|
pe.stepGen = newStepGenerator(pe.plan, opts)
|
||||||
pe.stepExec = newStepExecutor(ctx, cancel, pe.plan, opts, preview)
|
pe.stepExec = newStepExecutor(ctx, cancel, diagSink, opts, preview)
|
||||||
|
|
||||||
// We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to
|
// We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to
|
||||||
// respond to cancellation requests promptly.
|
// respond to cancellation requests promptly.
|
||||||
|
@ -133,7 +133,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
|
||||||
log.Infof("planExecutor.Execute(...): incoming event (nil? %v, %v)", event.Event == nil, event.Error)
|
log.Infof("planExecutor.Execute(...): incoming event (nil? %v, %v)", event.Event == nil, event.Error)
|
||||||
|
|
||||||
if event.Error != nil {
|
if event.Error != nil {
|
||||||
pe.reportError("", event.Error)
|
pe.reportError(diagSink, "", event.Error)
|
||||||
cancel()
|
cancel()
|
||||||
return false, event.Error
|
return false, event.Error
|
||||||
}
|
}
|
||||||
|
@ -153,9 +153,11 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if eventErr := pe.handleSingleEvent(event.Event); eventErr != nil {
|
if eventErr, bailed := pe.handleSingleEvent(diagSink, event.Event); eventErr != nil {
|
||||||
log.Infof("planExecutor.Execute(...): error handling event: %v", eventErr)
|
log.Infof("planExecutor.Execute(...): error handling event: %v", eventErr)
|
||||||
pe.reportError(pe.plan.generateEventURN(event.Event), eventErr)
|
if !bailed {
|
||||||
|
pe.reportError(diagSink, pe.plan.generateEventURN(event.Event), eventErr)
|
||||||
|
}
|
||||||
cancel()
|
cancel()
|
||||||
return false, eventErr
|
return false, eventErr
|
||||||
}
|
}
|
||||||
|
@ -184,33 +186,34 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
|
||||||
|
|
||||||
// handleSingleEvent handles a single source event. For all incoming events, it produces a chain that needs
|
// handleSingleEvent handles a single source event. For all incoming events, it produces a chain that needs
|
||||||
// to be executed and schedules the chain for execution.
|
// to be executed and schedules the chain for execution.
|
||||||
func (pe *planExecutor) handleSingleEvent(event SourceEvent) error {
|
func (pe *planExecutor) handleSingleEvent(sink diag.Sink, event SourceEvent) (error, bool) {
|
||||||
contract.Require(event != nil, "event != nil")
|
contract.Require(event != nil, "event != nil")
|
||||||
|
|
||||||
var steps []Step
|
var steps []Step
|
||||||
var err error
|
var err error
|
||||||
|
var bailed bool
|
||||||
switch e := event.(type) {
|
switch e := event.(type) {
|
||||||
case RegisterResourceEvent:
|
case RegisterResourceEvent:
|
||||||
log.Infof("planExecutor.handleSingleEvent(...): received RegisterResourceEvent")
|
log.Infof("planExecutor.handleSingleEvent(...): received RegisterResourceEvent")
|
||||||
steps, err = pe.stepGen.GenerateSteps(e)
|
steps, err, bailed = pe.stepGen.GenerateSteps(sink, e)
|
||||||
case ReadResourceEvent:
|
case ReadResourceEvent:
|
||||||
log.Infof("planExecutor.handleSingleEvent(...): received ReadResourceEvent")
|
log.Infof("planExecutor.handleSingleEvent(...): received ReadResourceEvent")
|
||||||
steps, err = pe.stepGen.GenerateReadSteps(e)
|
steps, err, bailed = pe.stepGen.GenerateReadSteps(e)
|
||||||
case RegisterResourceOutputsEvent:
|
case RegisterResourceOutputsEvent:
|
||||||
log.Infof("planExecutor.handleSingleEvent(...): received register resource outputs")
|
log.Infof("planExecutor.handleSingleEvent(...): received register resource outputs")
|
||||||
pe.stepExec.ExecuteRegisterResourceOutputs(e)
|
pe.stepExec.ExecuteRegisterResourceOutputs(sink, e)
|
||||||
return nil
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err, bailed
|
||||||
}
|
}
|
||||||
pe.stepExec.Execute(steps)
|
pe.stepExec.Execute(steps)
|
||||||
return nil
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh refreshes the state of the base checkpoint file for the current plan in memory.
|
// refresh refreshes the state of the base checkpoint file for the current plan in memory.
|
||||||
func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview bool) error {
|
func (pe *planExecutor) refresh(callerCtx context.Context, sink diag.Sink, opts Options, preview bool) error {
|
||||||
prev := pe.plan.prev
|
prev := pe.plan.prev
|
||||||
if prev == nil || len(prev.Resources) == 0 {
|
if prev == nil || len(prev.Resources) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -218,7 +221,7 @@ func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview
|
||||||
|
|
||||||
// Fire up a worker pool and issue a refresh step per resource in the old snapshot.
|
// Fire up a worker pool and issue a refresh step per resource in the old snapshot.
|
||||||
ctx, cancel := context.WithCancel(callerCtx)
|
ctx, cancel := context.WithCancel(callerCtx)
|
||||||
stepExec := newStepExecutor(ctx, cancel, pe.plan, opts, preview)
|
stepExec := newStepExecutor(ctx, cancel, sink, opts, preview)
|
||||||
|
|
||||||
canceled := false
|
canceled := false
|
||||||
steps := make([]Step, len(prev.Resources))
|
steps := make([]Step, len(prev.Resources))
|
||||||
|
|
|
@ -52,7 +52,6 @@ type Chain = []Step
|
||||||
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
|
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
|
||||||
// ready to execute.
|
// ready to execute.
|
||||||
type stepExecutor struct {
|
type stepExecutor struct {
|
||||||
plan *Plan // The plan currently being executed.
|
|
||||||
opts Options // The options for this current plan.
|
opts Options // The options for this current plan.
|
||||||
preview bool // Whether or not we are doing a preview.
|
preview bool // Whether or not we are doing a preview.
|
||||||
pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs.
|
pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs.
|
||||||
|
@ -83,7 +82,7 @@ func (se *stepExecutor) Execute(chain Chain) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine.
|
// ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine.
|
||||||
func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputsEvent) {
|
func (se *stepExecutor) ExecuteRegisterResourceOutputs(sink diag.Sink, e RegisterResourceOutputsEvent) {
|
||||||
// Look up the final state in the pending registration list.
|
// Look up the final state in the pending registration list.
|
||||||
urn := e.URN()
|
urn := e.URN()
|
||||||
value, has := se.pendingNews.Load(urn)
|
value, has := se.pendingNews.Load(urn)
|
||||||
|
@ -110,7 +109,7 @@ func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputs
|
||||||
// of these are particularly appealing right now.
|
// of these are particularly appealing right now.
|
||||||
outErr := errors.Wrap(eventerr, "resource complete event returned an error")
|
outErr := errors.Wrap(eventerr, "resource complete event returned an error")
|
||||||
diagMsg := diag.RawMessage(reg.URN(), outErr.Error())
|
diagMsg := diag.RawMessage(reg.URN(), outErr.Error())
|
||||||
se.plan.Diag().Errorf(diagMsg)
|
sink.Errorf(diagMsg)
|
||||||
se.cancelDueToError()
|
se.cancelDueToError()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -145,7 +144,7 @@ func (se *stepExecutor) WaitForCompletion() {
|
||||||
|
|
||||||
// executeChain executes a chain, one step at a time. If any step in the chain fails to execute, or if the
|
// executeChain executes a chain, one step at a time. If any step in the chain fails to execute, or if the
|
||||||
// context is canceled, the chain stops execution.
|
// context is canceled, the chain stops execution.
|
||||||
func (se *stepExecutor) executeChain(workerID int, chain Chain) {
|
func (se *stepExecutor) executeChain(sink diag.Sink, workerID int, chain Chain) {
|
||||||
for _, step := range chain {
|
for _, step := range chain {
|
||||||
select {
|
select {
|
||||||
case <-se.ctx.Done():
|
case <-se.ctx.Done():
|
||||||
|
@ -164,7 +163,7 @@ func (se *stepExecutor) executeChain(workerID int, chain Chain) {
|
||||||
// The errStepApplyFailed sentinel signals that the error that failed this chain was a step apply
|
// The errStepApplyFailed sentinel signals that the error that failed this chain was a step apply
|
||||||
// error and that we shouldn't log it. Everything else should be logged to the diag system as usual.
|
// error and that we shouldn't log it. Everything else should be logged to the diag system as usual.
|
||||||
diagMsg := diag.RawMessage(step.URN(), err.Error())
|
diagMsg := diag.RawMessage(step.URN(), err.Error())
|
||||||
se.plan.Diag().Errorf(diagMsg)
|
sink.Errorf(diagMsg)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -262,7 +261,7 @@ func (se *stepExecutor) log(workerID int, msg string, args ...interface{}) {
|
||||||
|
|
||||||
// worker is the base function for all step executor worker goroutines. It continuously polls for new chains
|
// worker is the base function for all step executor worker goroutines. It continuously polls for new chains
|
||||||
// and executes any that it gets from the channel.
|
// and executes any that it gets from the channel.
|
||||||
func (se *stepExecutor) worker(workerID int) {
|
func (se *stepExecutor) worker(sink diag.Sink, workerID int) {
|
||||||
se.log(workerID, "worker coming online")
|
se.log(workerID, "worker coming online")
|
||||||
defer se.workers.Done()
|
defer se.workers.Done()
|
||||||
|
|
||||||
|
@ -276,7 +275,7 @@ func (se *stepExecutor) worker(workerID int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
se.log(workerID, "worker received chain for execution")
|
se.log(workerID, "worker received chain for execution")
|
||||||
se.executeChain(workerID, chain)
|
se.executeChain(sink, workerID, chain)
|
||||||
case <-se.ctx.Done():
|
case <-se.ctx.Done():
|
||||||
se.log(workerID, "worker exiting due to cancellation")
|
se.log(workerID, "worker exiting due to cancellation")
|
||||||
return
|
return
|
||||||
|
@ -284,10 +283,9 @@ func (se *stepExecutor) worker(workerID int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan, opts Options,
|
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, sink diag.Sink, opts Options,
|
||||||
preview bool) *stepExecutor {
|
preview bool) *stepExecutor {
|
||||||
exec := &stepExecutor{
|
exec := &stepExecutor{
|
||||||
plan: plan,
|
|
||||||
opts: opts,
|
opts: opts,
|
||||||
preview: preview,
|
preview: preview,
|
||||||
incomingChains: make(chan Chain),
|
incomingChains: make(chan Chain),
|
||||||
|
@ -299,7 +297,7 @@ func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan,
|
||||||
fanout := opts.DegreeOfParallelism()
|
fanout := opts.DegreeOfParallelism()
|
||||||
for i := 0; i < fanout; i++ {
|
for i := 0; i < fanout; i++ {
|
||||||
exec.workers.Add(1)
|
exec.workers.Add(1)
|
||||||
go exec.worker(i)
|
go exec.worker(sink, i)
|
||||||
}
|
}
|
||||||
|
|
||||||
return exec
|
return exec
|
||||||
|
|
|
@ -44,7 +44,7 @@ type stepGenerator struct {
|
||||||
|
|
||||||
// GenerateReadSteps is responsible for producing one or more steps required to service
|
// GenerateReadSteps is responsible for producing one or more steps required to service
|
||||||
// a ReadResourceEvent coming from the language host.
|
// a ReadResourceEvent coming from the language host.
|
||||||
func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, error) {
|
func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, error, bool) {
|
||||||
urn := sg.plan.generateURN(event.Parent(), event.Type(), event.Name())
|
urn := sg.plan.generateURN(event.Parent(), event.Type(), event.Name())
|
||||||
newState := resource.NewState(event.Type(),
|
newState := resource.NewState(event.Type(),
|
||||||
urn,
|
urn,
|
||||||
|
@ -81,7 +81,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
|
||||||
return []Step{
|
return []Step{
|
||||||
NewReadReplacementStep(sg.plan, event, old, newState),
|
NewReadReplacementStep(sg.plan, event, old, newState),
|
||||||
NewReplaceStep(sg.plan, old, newState, nil, true),
|
NewReplaceStep(sg.plan, old, newState, nil, true),
|
||||||
}, nil
|
}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if bool(logging.V(7)) && hasOld && old.ID == event.ID() {
|
if bool(logging.V(7)) && hasOld && old.ID == event.ID() {
|
||||||
|
@ -91,7 +91,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
|
||||||
sg.reads[urn] = true
|
sg.reads[urn] = true
|
||||||
return []Step{
|
return []Step{
|
||||||
NewReadStep(sg.plan, event, old, newState),
|
NewReadStep(sg.plan, event, old, newState),
|
||||||
}, nil
|
}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenerateSteps produces one or more steps required to achieve the goal state
|
// GenerateSteps produces one or more steps required to achieve the goal state
|
||||||
|
@ -100,7 +100,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
|
||||||
// If the given resource is a custom resource, the step generator will invoke Diff
|
// If the given resource is a custom resource, the step generator will invoke Diff
|
||||||
// and Check on the provider associated with that resource. If those fail, an error
|
// and Check on the provider associated with that resource. If those fail, an error
|
||||||
// is returned.
|
// is returned.
|
||||||
func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, error) {
|
func (sg *stepGenerator) GenerateSteps(sink diag.Sink, event RegisterResourceEvent) ([]Step, error, bool) {
|
||||||
var invalid bool // will be set to true if this object fails validation.
|
var invalid bool // will be set to true if this object fails validation.
|
||||||
|
|
||||||
goal := event.Goal()
|
goal := event.Goal()
|
||||||
|
@ -109,7 +109,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
if sg.urns[urn] {
|
if sg.urns[urn] {
|
||||||
invalid = true
|
invalid = true
|
||||||
// TODO[pulumi/pulumi-framework#19]: improve this error message!
|
// TODO[pulumi/pulumi-framework#19]: improve this error message!
|
||||||
sg.plan.Diag().Errorf(diag.GetDuplicateResourceURNError(urn), urn)
|
sink.Errorf(diag.GetDuplicateResourceURNError(urn), urn)
|
||||||
}
|
}
|
||||||
sg.urns[urn] = true
|
sg.urns[urn] = true
|
||||||
|
|
||||||
|
@ -141,11 +141,11 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
ref, refErr := providers.ParseReference(goal.Provider)
|
ref, refErr := providers.ParseReference(goal.Provider)
|
||||||
if refErr != nil {
|
if refErr != nil {
|
||||||
return nil, errors.Errorf(
|
return nil, errors.Errorf(
|
||||||
"bad provider reference '%v' for resource '%v': %v", goal.Provider, urn, refErr)
|
"bad provider reference '%v' for resource '%v': %v", goal.Provider, urn, refErr), false
|
||||||
}
|
}
|
||||||
p, ok := sg.plan.GetProvider(ref)
|
p, ok := sg.plan.GetProvider(ref)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.Errorf("unknown provider '%v' for resource '%v'", ref, urn)
|
return nil, errors.Errorf("unknown provider '%v' for resource '%v'", ref, urn), false
|
||||||
}
|
}
|
||||||
prov = p
|
prov = p
|
||||||
}
|
}
|
||||||
|
@ -174,8 +174,8 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err, false
|
||||||
} else if sg.issueCheckErrors(new, urn, failures) {
|
} else if sg.issueCheckErrors(sink, new, urn, failures) {
|
||||||
invalid = true
|
invalid = true
|
||||||
}
|
}
|
||||||
new.Inputs = inputs
|
new.Inputs = inputs
|
||||||
|
@ -186,25 +186,24 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
var analyzer plugin.Analyzer
|
var analyzer plugin.Analyzer
|
||||||
analyzer, err = sg.plan.ctx.Host.Analyzer(a)
|
analyzer, err = sg.plan.ctx.Host.Analyzer(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err, false
|
||||||
} else if analyzer == nil {
|
} else if analyzer == nil {
|
||||||
return nil, errors.Errorf("analyzer '%v' could not be loaded from your $PATH", a)
|
return nil, errors.Errorf("analyzer '%v' could not be loaded from your $PATH", a), false
|
||||||
}
|
}
|
||||||
var failures []plugin.AnalyzeFailure
|
var failures []plugin.AnalyzeFailure
|
||||||
failures, err = analyzer.Analyze(new.Type, inputs)
|
failures, err = analyzer.Analyze(new.Type, inputs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err, false
|
||||||
}
|
}
|
||||||
for _, failure := range failures {
|
for _, failure := range failures {
|
||||||
invalid = true
|
invalid = true
|
||||||
sg.plan.Diag().Errorf(
|
sink.Errorf(diag.GetAnalyzeResourceFailureError(urn), a, urn, failure.Property, failure.Reason)
|
||||||
diag.GetAnalyzeResourceFailureError(urn), a, urn, failure.Property, failure.Reason)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the resource isn't valid, don't proceed any further.
|
// If the resource isn't valid, don't proceed any further.
|
||||||
if invalid {
|
if invalid {
|
||||||
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed")
|
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed"), true
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are four cases we need to consider when figuring out what to do with this resource.
|
// There are four cases we need to consider when figuring out what to do with this resource.
|
||||||
|
@ -231,7 +230,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
return []Step{
|
return []Step{
|
||||||
NewReplaceStep(sg.plan, old, new, nil, false),
|
NewReplaceStep(sg.plan, old, new, nil, false),
|
||||||
NewCreateReplacementStep(sg.plan, event, old, new, nil, false),
|
NewCreateReplacementStep(sg.plan, event, old, new, nil, false),
|
||||||
}, nil
|
}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Case 2: wasExternal
|
// Case 2: wasExternal
|
||||||
|
@ -246,13 +245,13 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
logging.V(7).Infof("Planner recognized '%s' as old external resource, creating instead", urn)
|
logging.V(7).Infof("Planner recognized '%s' as old external resource, creating instead", urn)
|
||||||
sg.creates[urn] = true
|
sg.creates[urn] = true
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err, false
|
||||||
}
|
}
|
||||||
|
|
||||||
return []Step{
|
return []Step{
|
||||||
NewCreateReplacementStep(sg.plan, event, old, new, nil, true),
|
NewCreateReplacementStep(sg.plan, event, old, new, nil, true),
|
||||||
NewReplaceStep(sg.plan, old, new, nil, true),
|
NewReplaceStep(sg.plan, old, new, nil, true),
|
||||||
}, nil
|
}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Case 3: hasOld
|
// Case 3: hasOld
|
||||||
|
@ -274,7 +273,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
// Determine whether the change resulted in a diff.
|
// Determine whether the change resulted in a diff.
|
||||||
d, diffErr := sg.diff(urn, old.ID, oldInputs, oldOutputs, inputs, prov, allowUnknowns)
|
d, diffErr := sg.diff(urn, old.ID, oldInputs, oldOutputs, inputs, prov, allowUnknowns)
|
||||||
if diffErr != nil {
|
if diffErr != nil {
|
||||||
return nil, diffErr
|
return nil, diffErr, false
|
||||||
}
|
}
|
||||||
diff = d
|
diff = d
|
||||||
}
|
}
|
||||||
|
@ -282,7 +281,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
// Ensure that we received a sensible response.
|
// Ensure that we received a sensible response.
|
||||||
if diff.Changes != plugin.DiffNone && diff.Changes != plugin.DiffSome {
|
if diff.Changes != plugin.DiffNone && diff.Changes != plugin.DiffSome {
|
||||||
return nil, errors.Errorf(
|
return nil, errors.Errorf(
|
||||||
"unrecognized diff state for %s: %d", urn, diff.Changes)
|
"unrecognized diff state for %s: %d", urn, diff.Changes), false
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there were changes, check for a replacement vs. an in-place update.
|
// If there were changes, check for a replacement vs. an in-place update.
|
||||||
|
@ -296,9 +295,9 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
var failures []plugin.CheckFailure
|
var failures []plugin.CheckFailure
|
||||||
inputs, failures, err = prov.Check(urn, nil, goal.Properties, allowUnknowns)
|
inputs, failures, err = prov.Check(urn, nil, goal.Properties, allowUnknowns)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err, false
|
||||||
} else if sg.issueCheckErrors(new, urn, failures) {
|
} else if sg.issueCheckErrors(sink, new, urn, failures) {
|
||||||
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed")
|
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed"), true
|
||||||
}
|
}
|
||||||
new.Inputs = inputs
|
new.Inputs = inputs
|
||||||
}
|
}
|
||||||
|
@ -358,14 +357,14 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
NewDeleteReplacementStep(sg.plan, old, false),
|
NewDeleteReplacementStep(sg.plan, old, false),
|
||||||
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, false),
|
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, false),
|
||||||
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, false),
|
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, false),
|
||||||
), nil
|
), nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
return []Step{
|
return []Step{
|
||||||
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, true),
|
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, true),
|
||||||
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, true),
|
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, true),
|
||||||
// note that the delete step is generated "later" on, after all creates/updates finish.
|
// note that the delete step is generated "later" on, after all creates/updates finish.
|
||||||
}, nil
|
}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we fell through, it's an update.
|
// If we fell through, it's an update.
|
||||||
|
@ -373,7 +372,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
if logging.V(7) {
|
if logging.V(7) {
|
||||||
logging.V(7).Infof("Planner decided to update '%v' (oldprops=%v inputs=%v", urn, oldInputs, new.Inputs)
|
logging.V(7).Infof("Planner decided to update '%v' (oldprops=%v inputs=%v", urn, oldInputs, new.Inputs)
|
||||||
}
|
}
|
||||||
return []Step{NewUpdateStep(sg.plan, event, old, new, diff.StableKeys)}, nil
|
return []Step{NewUpdateStep(sg.plan, event, old, new, diff.StableKeys)}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// No need to update anything, the properties didn't change.
|
// No need to update anything, the properties didn't change.
|
||||||
|
@ -381,7 +380,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
if logging.V(7) {
|
if logging.V(7) {
|
||||||
logging.V(7).Infof("Planner decided not to update '%v' (same) (inputs=%v)", urn, new.Inputs)
|
logging.V(7).Infof("Planner decided not to update '%v' (same) (inputs=%v)", urn, new.Inputs)
|
||||||
}
|
}
|
||||||
return []Step{NewSameStep(sg.plan, event, old, new)}, nil
|
return []Step{NewSameStep(sg.plan, event, old, new)}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Case 4: Not Case 1, 2, or 3
|
// Case 4: Not Case 1, 2, or 3
|
||||||
|
@ -389,7 +388,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
|
||||||
// it's just being created.
|
// it's just being created.
|
||||||
sg.creates[urn] = true
|
sg.creates[urn] = true
|
||||||
logging.V(7).Infof("Planner decided to create '%v' (inputs=%v)", urn, new.Inputs)
|
logging.V(7).Infof("Planner decided to create '%v' (inputs=%v)", urn, new.Inputs)
|
||||||
return []Step{NewCreateStep(sg.plan, event, new)}, nil
|
return []Step{NewCreateStep(sg.plan, event, new)}, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sg *stepGenerator) GenerateDeletes() []Step {
|
func (sg *stepGenerator) GenerateDeletes() []Step {
|
||||||
|
@ -471,7 +470,7 @@ func (sg *stepGenerator) diff(urn resource.URN, id resource.ID, oldInputs, oldOu
|
||||||
}
|
}
|
||||||
|
|
||||||
// issueCheckErrors prints any check errors to the diagnostics sink.
|
// issueCheckErrors prints any check errors to the diagnostics sink.
|
||||||
func (sg *stepGenerator) issueCheckErrors(new *resource.State, urn resource.URN,
|
func (sg *stepGenerator) issueCheckErrors(sink diag.Sink, new *resource.State, urn resource.URN,
|
||||||
failures []plugin.CheckFailure) bool {
|
failures []plugin.CheckFailure) bool {
|
||||||
if len(failures) == 0 {
|
if len(failures) == 0 {
|
||||||
return false
|
return false
|
||||||
|
@ -479,10 +478,10 @@ func (sg *stepGenerator) issueCheckErrors(new *resource.State, urn resource.URN,
|
||||||
inputs := new.Inputs
|
inputs := new.Inputs
|
||||||
for _, failure := range failures {
|
for _, failure := range failures {
|
||||||
if failure.Property != "" {
|
if failure.Property != "" {
|
||||||
sg.plan.Diag().Errorf(diag.GetResourcePropertyInvalidValueError(urn),
|
sink.Errorf(diag.GetResourcePropertyInvalidValueError(urn),
|
||||||
new.Type, urn.Name(), failure.Property, inputs[failure.Property], failure.Reason)
|
new.Type, urn.Name(), failure.Property, inputs[failure.Property], failure.Reason)
|
||||||
} else {
|
} else {
|
||||||
sg.plan.Diag().Errorf(
|
sink.Errorf(
|
||||||
diag.GetResourceInvalidError(urn), new.Type, urn.Name(), failure.Reason)
|
diag.GetResourceInvalidError(urn), new.Type, urn.Name(), failure.Reason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue