Compare commits

...

2 commits

Author SHA1 Message Date
Sean Gillespie aab99a342a
Brief sketch using a bool to indicate a bail 2018-08-27 10:03:52 -07:00
Sean Gillespie 9cf2429644
plub diag.Sink everywhere 2018-08-24 15:27:23 -07:00
6 changed files with 64 additions and 65 deletions

View file

@ -194,7 +194,7 @@ func (res *planResult) Chdir() (func(), error) {
// Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the
// resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that
// failed, if the error is non-nil; and finally the state of the resource modified in the failing step.
func (res *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bool) error {
func (res *planResult) Walk(cancelCtx *Context, sink diag.Sink, events deploy.Events, preview bool) error {
ctx, cancelFunc := context.WithCancel(context.Background())
done := make(chan bool)
@ -206,7 +206,7 @@ func (res *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bo
Refresh: res.Options.Refresh,
RefreshOnly: res.Options.isRefresh,
}
err = res.Plan.Execute(ctx, opts, preview)
err = res.Plan.Execute(ctx, sink, opts, preview)
close(done)
}()
@ -240,7 +240,7 @@ func printPlan(ctx *Context, result *planResult, dryRun bool) (ResourceChanges,
// Walk the plan's steps and and pretty-print them out.
actions := newPlanActions(result.Options)
if err := result.Walk(ctx, actions, true); err != nil {
if err := result.Walk(ctx, result.Plugctx.Diag, actions, true); err != nil {
return nil, errors.New("an error occurred while advancing the preview")
}

View file

@ -159,7 +159,7 @@ func update(ctx *Context, info *planContext, opts planOptions, dryRun bool) (Res
start := time.Now()
actions := newUpdateActions(ctx, info.Update, opts)
err = result.Walk(ctx, actions, false)
err = result.Walk(ctx, result.Plugctx.Diag, actions, false)
resourceChanges = ResourceChanges(actions.Ops)
if len(resourceChanges) != 0 {

View file

@ -226,7 +226,6 @@ func NewPlan(ctx *plugin.Context, target *Target, prev *Snapshot, source Source,
func (p *Plan) Ctx() *plugin.Context { return p.ctx }
func (p *Plan) Target() *Target { return p.target }
func (p *Plan) Diag() diag.Sink { return p.ctx.Diag }
func (p *Plan) Prev() *Snapshot { return p.prev }
func (p *Plan) Olds() map[resource.URN]*resource.State { return p.olds }
func (p *Plan) Source() Source { return p.source }
@ -272,7 +271,7 @@ func (p *Plan) generateEventURN(event SourceEvent) resource.URN {
// Execute executes a plan to completion, using the given cancellation context and running a preview
// or update.
func (p *Plan) Execute(ctx context.Context, opts Options, preview bool) error {
func (p *Plan) Execute(ctx context.Context, sink diag.Sink, opts Options, preview bool) error {
planExec := &planExecutor{plan: p}
return planExec.Execute(ctx, opts, preview)
return planExec.Execute(ctx, sink, opts, preview)
}

View file

@ -49,13 +49,13 @@ func execError(message string, preview bool) error {
}
// reportError reports a single error to the executor's diag stream with the indicated URN for context.
func (pe *planExecutor) reportError(urn resource.URN, err error) {
pe.plan.Diag().Errorf(diag.RawMessage(urn, err.Error()))
func (pe *planExecutor) reportError(sink diag.Sink, urn resource.URN, err error) {
sink.Errorf(diag.RawMessage(urn, err.Error()))
}
// Execute executes a plan to completion, using the given cancellation context and running a preview
// or update.
func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview bool) error {
func (pe *planExecutor) Execute(callerCtx context.Context, diagSink diag.Sink, opts Options, preview bool) error {
// Set up a goroutine that will signal cancellation to the plan's plugins if the caller context is cancelled. We do
// not hang this off of the context we create below because we do not want the failure of a single step to cause
// other steps to fail.
@ -73,7 +73,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
// Before doing anything else, optionally refresh each resource in the base checkpoint.
if opts.Refresh {
if err := pe.refresh(callerCtx, opts, preview); err != nil {
if err := pe.refresh(callerCtx, diagSink, opts, preview); err != nil {
return err
}
if opts.RefreshOnly {
@ -93,7 +93,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
// Set up a step generator and executor for this plan.
pe.stepGen = newStepGenerator(pe.plan, opts)
pe.stepExec = newStepExecutor(ctx, cancel, pe.plan, opts, preview)
pe.stepExec = newStepExecutor(ctx, cancel, diagSink, opts, preview)
// We iterate the source in its own goroutine because iteration is blocking and we want the main loop to be able to
// respond to cancellation requests promptly.
@ -133,7 +133,7 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
log.Infof("planExecutor.Execute(...): incoming event (nil? %v, %v)", event.Event == nil, event.Error)
if event.Error != nil {
pe.reportError("", event.Error)
pe.reportError(diagSink, "", event.Error)
cancel()
return false, event.Error
}
@ -153,9 +153,11 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
return false, nil
}
if eventErr := pe.handleSingleEvent(event.Event); eventErr != nil {
if eventErr, bailed := pe.handleSingleEvent(diagSink, event.Event); eventErr != nil {
log.Infof("planExecutor.Execute(...): error handling event: %v", eventErr)
pe.reportError(pe.plan.generateEventURN(event.Event), eventErr)
if !bailed {
pe.reportError(diagSink, pe.plan.generateEventURN(event.Event), eventErr)
}
cancel()
return false, eventErr
}
@ -184,33 +186,34 @@ func (pe *planExecutor) Execute(callerCtx context.Context, opts Options, preview
// handleSingleEvent handles a single source event. For all incoming events, it produces a chain that needs
// to be executed and schedules the chain for execution.
func (pe *planExecutor) handleSingleEvent(event SourceEvent) error {
func (pe *planExecutor) handleSingleEvent(sink diag.Sink, event SourceEvent) (error, bool) {
contract.Require(event != nil, "event != nil")
var steps []Step
var err error
var bailed bool
switch e := event.(type) {
case RegisterResourceEvent:
log.Infof("planExecutor.handleSingleEvent(...): received RegisterResourceEvent")
steps, err = pe.stepGen.GenerateSteps(e)
steps, err, bailed = pe.stepGen.GenerateSteps(sink, e)
case ReadResourceEvent:
log.Infof("planExecutor.handleSingleEvent(...): received ReadResourceEvent")
steps, err = pe.stepGen.GenerateReadSteps(e)
steps, err, bailed = pe.stepGen.GenerateReadSteps(e)
case RegisterResourceOutputsEvent:
log.Infof("planExecutor.handleSingleEvent(...): received register resource outputs")
pe.stepExec.ExecuteRegisterResourceOutputs(e)
return nil
pe.stepExec.ExecuteRegisterResourceOutputs(sink, e)
return nil, false
}
if err != nil {
return err
return err, bailed
}
pe.stepExec.Execute(steps)
return nil
return nil, false
}
// refresh refreshes the state of the base checkpoint file for the current plan in memory.
func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview bool) error {
func (pe *planExecutor) refresh(callerCtx context.Context, sink diag.Sink, opts Options, preview bool) error {
prev := pe.plan.prev
if prev == nil || len(prev.Resources) == 0 {
return nil
@ -218,7 +221,7 @@ func (pe *planExecutor) refresh(callerCtx context.Context, opts Options, preview
// Fire up a worker pool and issue a refresh step per resource in the old snapshot.
ctx, cancel := context.WithCancel(callerCtx)
stepExec := newStepExecutor(ctx, cancel, pe.plan, opts, preview)
stepExec := newStepExecutor(ctx, cancel, sink, opts, preview)
canceled := false
steps := make([]Step, len(prev.Resources))

View file

@ -52,7 +52,6 @@ type Chain = []Step
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
// ready to execute.
type stepExecutor struct {
plan *Plan // The plan currently being executed.
opts Options // The options for this current plan.
preview bool // Whether or not we are doing a preview.
pendingNews sync.Map // Resources that have been created but are pending a RegisterResourceOutputs.
@ -83,7 +82,7 @@ func (se *stepExecutor) Execute(chain Chain) {
}
// ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine.
func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputsEvent) {
func (se *stepExecutor) ExecuteRegisterResourceOutputs(sink diag.Sink, e RegisterResourceOutputsEvent) {
// Look up the final state in the pending registration list.
urn := e.URN()
value, has := se.pendingNews.Load(urn)
@ -110,7 +109,7 @@ func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputs
// of these are particularly appealing right now.
outErr := errors.Wrap(eventerr, "resource complete event returned an error")
diagMsg := diag.RawMessage(reg.URN(), outErr.Error())
se.plan.Diag().Errorf(diagMsg)
sink.Errorf(diagMsg)
se.cancelDueToError()
return
}
@ -145,7 +144,7 @@ func (se *stepExecutor) WaitForCompletion() {
// executeChain executes a chain, one step at a time. If any step in the chain fails to execute, or if the
// context is canceled, the chain stops execution.
func (se *stepExecutor) executeChain(workerID int, chain Chain) {
func (se *stepExecutor) executeChain(sink diag.Sink, workerID int, chain Chain) {
for _, step := range chain {
select {
case <-se.ctx.Done():
@ -164,7 +163,7 @@ func (se *stepExecutor) executeChain(workerID int, chain Chain) {
// The errStepApplyFailed sentinel signals that the error that failed this chain was a step apply
// error and that we shouldn't log it. Everything else should be logged to the diag system as usual.
diagMsg := diag.RawMessage(step.URN(), err.Error())
se.plan.Diag().Errorf(diagMsg)
sink.Errorf(diagMsg)
}
return
}
@ -262,7 +261,7 @@ func (se *stepExecutor) log(workerID int, msg string, args ...interface{}) {
// worker is the base function for all step executor worker goroutines. It continuously polls for new chains
// and executes any that it gets from the channel.
func (se *stepExecutor) worker(workerID int) {
func (se *stepExecutor) worker(sink diag.Sink, workerID int) {
se.log(workerID, "worker coming online")
defer se.workers.Done()
@ -276,7 +275,7 @@ func (se *stepExecutor) worker(workerID int) {
}
se.log(workerID, "worker received chain for execution")
se.executeChain(workerID, chain)
se.executeChain(sink, workerID, chain)
case <-se.ctx.Done():
se.log(workerID, "worker exiting due to cancellation")
return
@ -284,10 +283,9 @@ func (se *stepExecutor) worker(workerID int) {
}
}
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan, opts Options,
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, sink diag.Sink, opts Options,
preview bool) *stepExecutor {
exec := &stepExecutor{
plan: plan,
opts: opts,
preview: preview,
incomingChains: make(chan Chain),
@ -299,7 +297,7 @@ func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan,
fanout := opts.DegreeOfParallelism()
for i := 0; i < fanout; i++ {
exec.workers.Add(1)
go exec.worker(i)
go exec.worker(sink, i)
}
return exec

View file

@ -44,7 +44,7 @@ type stepGenerator struct {
// GenerateReadSteps is responsible for producing one or more steps required to service
// a ReadResourceEvent coming from the language host.
func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, error) {
func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, error, bool) {
urn := sg.plan.generateURN(event.Parent(), event.Type(), event.Name())
newState := resource.NewState(event.Type(),
urn,
@ -81,7 +81,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
return []Step{
NewReadReplacementStep(sg.plan, event, old, newState),
NewReplaceStep(sg.plan, old, newState, nil, true),
}, nil
}, nil, false
}
if bool(logging.V(7)) && hasOld && old.ID == event.ID() {
@ -91,7 +91,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
sg.reads[urn] = true
return []Step{
NewReadStep(sg.plan, event, old, newState),
}, nil
}, nil, false
}
// GenerateSteps produces one or more steps required to achieve the goal state
@ -100,7 +100,7 @@ func (sg *stepGenerator) GenerateReadSteps(event ReadResourceEvent) ([]Step, err
// If the given resource is a custom resource, the step generator will invoke Diff
// and Check on the provider associated with that resource. If those fail, an error
// is returned.
func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, error) {
func (sg *stepGenerator) GenerateSteps(sink diag.Sink, event RegisterResourceEvent) ([]Step, error, bool) {
var invalid bool // will be set to true if this object fails validation.
goal := event.Goal()
@ -109,7 +109,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
if sg.urns[urn] {
invalid = true
// TODO[pulumi/pulumi-framework#19]: improve this error message!
sg.plan.Diag().Errorf(diag.GetDuplicateResourceURNError(urn), urn)
sink.Errorf(diag.GetDuplicateResourceURNError(urn), urn)
}
sg.urns[urn] = true
@ -141,11 +141,11 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
ref, refErr := providers.ParseReference(goal.Provider)
if refErr != nil {
return nil, errors.Errorf(
"bad provider reference '%v' for resource '%v': %v", goal.Provider, urn, refErr)
"bad provider reference '%v' for resource '%v': %v", goal.Provider, urn, refErr), false
}
p, ok := sg.plan.GetProvider(ref)
if !ok {
return nil, errors.Errorf("unknown provider '%v' for resource '%v'", ref, urn)
return nil, errors.Errorf("unknown provider '%v' for resource '%v'", ref, urn), false
}
prov = p
}
@ -174,8 +174,8 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
}
if err != nil {
return nil, err
} else if sg.issueCheckErrors(new, urn, failures) {
return nil, err, false
} else if sg.issueCheckErrors(sink, new, urn, failures) {
invalid = true
}
new.Inputs = inputs
@ -186,25 +186,24 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
var analyzer plugin.Analyzer
analyzer, err = sg.plan.ctx.Host.Analyzer(a)
if err != nil {
return nil, err
return nil, err, false
} else if analyzer == nil {
return nil, errors.Errorf("analyzer '%v' could not be loaded from your $PATH", a)
return nil, errors.Errorf("analyzer '%v' could not be loaded from your $PATH", a), false
}
var failures []plugin.AnalyzeFailure
failures, err = analyzer.Analyze(new.Type, inputs)
if err != nil {
return nil, err
return nil, err, false
}
for _, failure := range failures {
invalid = true
sg.plan.Diag().Errorf(
diag.GetAnalyzeResourceFailureError(urn), a, urn, failure.Property, failure.Reason)
sink.Errorf(diag.GetAnalyzeResourceFailureError(urn), a, urn, failure.Property, failure.Reason)
}
}
// If the resource isn't valid, don't proceed any further.
if invalid {
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed")
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed"), true
}
// There are four cases we need to consider when figuring out what to do with this resource.
@ -231,7 +230,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
return []Step{
NewReplaceStep(sg.plan, old, new, nil, false),
NewCreateReplacementStep(sg.plan, event, old, new, nil, false),
}, nil
}, nil, false
}
// Case 2: wasExternal
@ -246,13 +245,13 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
logging.V(7).Infof("Planner recognized '%s' as old external resource, creating instead", urn)
sg.creates[urn] = true
if err != nil {
return nil, err
return nil, err, false
}
return []Step{
NewCreateReplacementStep(sg.plan, event, old, new, nil, true),
NewReplaceStep(sg.plan, old, new, nil, true),
}, nil
}, nil, false
}
// Case 3: hasOld
@ -274,7 +273,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
// Determine whether the change resulted in a diff.
d, diffErr := sg.diff(urn, old.ID, oldInputs, oldOutputs, inputs, prov, allowUnknowns)
if diffErr != nil {
return nil, diffErr
return nil, diffErr, false
}
diff = d
}
@ -282,7 +281,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
// Ensure that we received a sensible response.
if diff.Changes != plugin.DiffNone && diff.Changes != plugin.DiffSome {
return nil, errors.Errorf(
"unrecognized diff state for %s: %d", urn, diff.Changes)
"unrecognized diff state for %s: %d", urn, diff.Changes), false
}
// If there were changes, check for a replacement vs. an in-place update.
@ -296,9 +295,9 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
var failures []plugin.CheckFailure
inputs, failures, err = prov.Check(urn, nil, goal.Properties, allowUnknowns)
if err != nil {
return nil, err
} else if sg.issueCheckErrors(new, urn, failures) {
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed")
return nil, err, false
} else if sg.issueCheckErrors(sink, new, urn, failures) {
return nil, errors.New("One or more resource validation errors occurred; refusing to proceed"), true
}
new.Inputs = inputs
}
@ -358,14 +357,14 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
NewDeleteReplacementStep(sg.plan, old, false),
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, false),
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, false),
), nil
), nil, false
}
return []Step{
NewCreateReplacementStep(sg.plan, event, old, new, diff.ReplaceKeys, true),
NewReplaceStep(sg.plan, old, new, diff.ReplaceKeys, true),
// note that the delete step is generated "later" on, after all creates/updates finish.
}, nil
}, nil, false
}
// If we fell through, it's an update.
@ -373,7 +372,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
if logging.V(7) {
logging.V(7).Infof("Planner decided to update '%v' (oldprops=%v inputs=%v", urn, oldInputs, new.Inputs)
}
return []Step{NewUpdateStep(sg.plan, event, old, new, diff.StableKeys)}, nil
return []Step{NewUpdateStep(sg.plan, event, old, new, diff.StableKeys)}, nil, false
}
// No need to update anything, the properties didn't change.
@ -381,7 +380,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
if logging.V(7) {
logging.V(7).Infof("Planner decided not to update '%v' (same) (inputs=%v)", urn, new.Inputs)
}
return []Step{NewSameStep(sg.plan, event, old, new)}, nil
return []Step{NewSameStep(sg.plan, event, old, new)}, nil, false
}
// Case 4: Not Case 1, 2, or 3
@ -389,7 +388,7 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, err
// it's just being created.
sg.creates[urn] = true
logging.V(7).Infof("Planner decided to create '%v' (inputs=%v)", urn, new.Inputs)
return []Step{NewCreateStep(sg.plan, event, new)}, nil
return []Step{NewCreateStep(sg.plan, event, new)}, nil, false
}
func (sg *stepGenerator) GenerateDeletes() []Step {
@ -471,7 +470,7 @@ func (sg *stepGenerator) diff(urn resource.URN, id resource.ID, oldInputs, oldOu
}
// issueCheckErrors prints any check errors to the diagnostics sink.
func (sg *stepGenerator) issueCheckErrors(new *resource.State, urn resource.URN,
func (sg *stepGenerator) issueCheckErrors(sink diag.Sink, new *resource.State, urn resource.URN,
failures []plugin.CheckFailure) bool {
if len(failures) == 0 {
return false
@ -479,10 +478,10 @@ func (sg *stepGenerator) issueCheckErrors(new *resource.State, urn resource.URN,
inputs := new.Inputs
for _, failure := range failures {
if failure.Property != "" {
sg.plan.Diag().Errorf(diag.GetResourcePropertyInvalidValueError(urn),
sink.Errorf(diag.GetResourcePropertyInvalidValueError(urn),
new.Type, urn.Name(), failure.Property, inputs[failure.Property], failure.Reason)
} else {
sg.plan.Diag().Errorf(
sink.Errorf(
diag.GetResourceInvalidError(urn), new.Type, urn.Name(), failure.Reason)
}
}