diff --git a/cmd/lumi/deploy.go b/cmd/lumi/deploy.go index 26fa3beb7..e43afe214 100644 --- a/cmd/lumi/deploy.go +++ b/cmd/lumi/deploy.go @@ -18,10 +18,12 @@ package main import ( "bytes" "fmt" + "time" "github.com/spf13/cobra" "github.com/pulumi/lumi/pkg/compiler/errors" + "github.com/pulumi/lumi/pkg/diag" "github.com/pulumi/lumi/pkg/diag/colors" "github.com/pulumi/lumi/pkg/resource" "github.com/pulumi/lumi/pkg/resource/deploy" @@ -59,7 +61,7 @@ func newDeployCmd() *cobra.Command { if err != nil { return err } - planAndDeploy(cmd, info, deployOptions{ + deployLatest(cmd, info, deployOptions{ Delete: false, DryRun: dryRun, Analyzers: analyzers, @@ -114,6 +116,52 @@ type deployOptions struct { Output string // the place to store the output, if any. } +func deployLatest(cmd *cobra.Command, info *envCmdInfo, opts deployOptions) { + if result := plan(cmd, info, opts); result != nil { + if opts.DryRun { + // If a dry run, just print the plan, don't actually carry out the deployment. + printPlan(result, opts) + } else { + // Otherwise, we will actually deploy the latest bits. + var header bytes.Buffer + printPrelude(&header, result, opts, false) + header.WriteString(fmt.Sprintf("%vDeploying changes:%v\n", colors.SpecUnimportant, colors.Reset)) + fmt.Printf(colors.Colorize(&header)) + + // Create an object to track progress and perform the actual operations. + start := time.Now() + progress := newProgress(opts.Summary) + summary, _, _, _ := result.Plan.Apply(progress) + contract.Assert(summary != nil) + empty := (summary.Steps() == 0) // if no step is returned, it was empty. + + // Print a summary. + var footer bytes.Buffer + if empty { + cmdutil.Diag().Infof(diag.Message("no resources need to be updated")) + } else { + // Print out the total number of steps performed (and their kinds), the duration, and any summary info. + printSummary(&footer, progress.Ops, opts.ShowReplaceSteps, false) + footer.WriteString(fmt.Sprintf("%vDeployment duration: %v%v\n", + colors.SpecUnimportant, time.Since(start), colors.Reset)) + } + + if progress.MaybeCorrupt { + footer.WriteString(fmt.Sprintf( + "%vA catastrophic error occurred; resources states may be unknown%v\n", + colors.SpecAttention, colors.Reset)) + } + + // Now save the updated snapshot to the specified output file, if any, or the standard location otherwise. + // Note that if a failure has occurred, the Apply routine above will have returned a safe checkpoint. + targ := result.Info.Target + saveEnv(targ, summary.Snap(), opts.Output, true /*overwrite*/) + + fmt.Printf(colors.Colorize(&footer)) + } + } +} + // deployProgress pretty-prints the plan application process as it goes. type deployProgress struct { Steps int diff --git a/cmd/lumi/destroy.go b/cmd/lumi/destroy.go index c852b68c3..e59805414 100644 --- a/cmd/lumi/destroy.go +++ b/cmd/lumi/destroy.go @@ -46,7 +46,7 @@ func newDestroyCmd() *cobra.Command { name := info.Target.Name if dryRun || yes || confirmPrompt("This will permanently destroy all resources in the '%v' environment!", name) { - planAndDeploy(cmd, info, deployOptions{ + deployLatest(cmd, info, deployOptions{ Delete: true, DryRun: dryRun, Summary: summary, diff --git a/cmd/lumi/plan.go b/cmd/lumi/plan.go index 0153a1487..9cbf9041e 100644 --- a/cmd/lumi/plan.go +++ b/cmd/lumi/plan.go @@ -20,7 +20,6 @@ import ( "fmt" "sort" "strconv" - "time" "github.com/spf13/cobra" @@ -62,7 +61,7 @@ func newPlanCmd() *cobra.Command { return err } contract.Assertf(!dotOutput, "TODO[pulumi/lumi#235]: DOT files not yet supported") - planAndDeploy(cmd, info, deployOptions{ + opts := deployOptions{ Delete: false, DryRun: true, Analyzers: analyzers, @@ -71,7 +70,10 @@ func newPlanCmd() *cobra.Command { ShowSames: showSames, Summary: summary, DOT: dotOutput, - }) + } + if result := plan(cmd, info, opts); result != nil { + printPlan(result, opts) + } return nil }), } @@ -150,53 +152,6 @@ type planResult struct { Plan *deploy.Plan // the plan created by this command. } -func planAndDeploy(cmd *cobra.Command, info *envCmdInfo, opts deployOptions) { - if result := plan(cmd, info, opts); result != nil { - // Now based on whether a dry run was specified, or not, either print or perform the planned operations. - if opts.DryRun { - printPlan(result, opts) - } else { - // If show unchanged was requested, print them first, along with a header. - var header bytes.Buffer - printPrelude(&header, result, opts, false) - header.WriteString(fmt.Sprintf("%vDeploying changes:%v\n", colors.SpecUnimportant, colors.Reset)) - fmt.Printf(colors.Colorize(&header)) - - // Create an object to track progress and perform the actual operations. - start := time.Now() - progress := newProgress(opts.Summary) - summary, _, _, _ := result.Plan.Apply(progress) - contract.Assert(summary != nil) - empty := (summary.Steps() == 0) // if no step is returned, it was empty. - - // Print a summary. - var footer bytes.Buffer - - if empty { - cmdutil.Diag().Infof(diag.Message("no resources need to be updated")) - } else { - // Print out the total number of steps performed (and their kinds), the duration, and any summary info. - printSummary(&footer, progress.Ops, opts.ShowReplaceSteps, false) - footer.WriteString(fmt.Sprintf("%vDeployment duration: %v%v\n", - colors.SpecUnimportant, time.Since(start), colors.Reset)) - } - - if progress.MaybeCorrupt { - footer.WriteString(fmt.Sprintf( - "%vA catastrophic error occurred; resources states may be unknown%v\n", - colors.SpecAttention, colors.Reset)) - } - - // Now save the updated snapshot to the specified output file, if any, or the standard location otherwise. - // Note that if a failure has occurred, the Apply routine above will have returned a safe checkpoint. - targ := result.Info.Target - saveEnv(targ, summary.Snap(), opts.Output, true /*overwrite*/) - - fmt.Printf(colors.Colorize(&footer)) - } - } -} - func printPlan(result *planResult, opts deployOptions) { // First print config/unchanged/etc. if necessary. var prelude bytes.Buffer diff --git a/pkg/resource/deploy/plan_apply.go b/pkg/resource/deploy/plan_apply.go index 95ede5e3a..27676a4e8 100644 --- a/pkg/resource/deploy/plan_apply.go +++ b/pkg/resource/deploy/plan_apply.go @@ -87,6 +87,7 @@ func (p *Plan) Iterate() (*PlanIterator, error) { replaces: make(map[resource.URN]bool), deletes: make(map[resource.URN]bool), sames: make(map[resource.URN]bool), + dones: make(map[*resource.State]bool), }, nil } @@ -113,8 +114,9 @@ type PlanIterator struct { deletes map[resource.URN]bool // URNs discovered to be deleted. sames map[resource.URN]bool // URNs discovered to be the same. - delqueue []*resource.State // a queue of deletes left to perform. - resources []*resource.State // the resulting ordered resource states. + delqueue []*resource.State // a queue of deletes left to perform. + resources []*resource.State // the resulting ordered resource states. + dones map[*resource.State]bool // true for each old state we're done with. srcdone bool // true if the source interpreter has been run to completion. done bool // true if the planning and associated iteration has finished. @@ -130,6 +132,7 @@ func (iter *PlanIterator) Replaces() map[resource.URN]bool { return iter.replace func (iter *PlanIterator) Deletes() map[resource.URN]bool { return iter.deletes } func (iter *PlanIterator) Sames() map[resource.URN]bool { return iter.sames } func (iter *PlanIterator) Resources() []*resource.State { return iter.resources } +func (iter *PlanIterator) Dones() map[*resource.State]bool { return iter.dones } func (iter *PlanIterator) Done() bool { return iter.done } // Next advances the plan by a single step, and returns the next step to be performed. In doing so, it will perform @@ -310,12 +313,36 @@ func (iter *PlanIterator) calculateDeletes() []*resource.State { // failure happens partway through, the untouched snapshot elements will be retained, while any updates will be // preserved. If no failure happens, the snapshot naturally reflects the final state of all resources. func (iter *PlanIterator) Snap() *Snapshot { - return NewSnapshot(iter.p.Target().Name, iter.resources, iter.p.new.Info()) + var resources []*resource.State + + // If we didn't finish the execution, we must produce a partial snapshot of old plus new states. + if !iter.done { + if prev := iter.p.prev; prev != nil { + for _, res := range prev.Resources { + if !iter.dones[res] { + resources = append(resources, res) + } + } + } + } else { + contract.Assert(len(iter.dones) == 0) + } + + // Always add the new resoures afterwards that got produced during the evaluation of the current plan. + resources = append(resources, iter.resources...) + + return NewSnapshot(iter.p.Target().Name, resources, iter.p.new.Info()) +} + +// MarkStateSnapshot marks an old state snapshot as being processed. This is done to recover from failures partway +// through the application of a deployment plan. Any old state that has not yet been recovered needs to be kept. +func (iter *PlanIterator) MarkStateSnapshot(state *resource.State) { + iter.dones[state] = true } // AppendStateSnapshot appends a resource's state to the current snapshot. func (iter *PlanIterator) AppendStateSnapshot(state *resource.State) { - iter.resources = append(iter.resources, state) // add this state to the pending list. + iter.resources = append(iter.resources, state) } // Provider fetches the provider for a given resource, possibly lazily allocating the plugins for it. If a provider diff --git a/pkg/resource/deploy/step.go b/pkg/resource/deploy/step.go index 18ebc8909..c75f66fb9 100644 --- a/pkg/resource/deploy/step.go +++ b/pkg/resource/deploy/step.go @@ -91,6 +91,7 @@ func (s *Step) Apply() (resource.Status, error) { contract.Assert(s.old != nil) contract.Assert(s.new != nil) s.new.Update(s.old.ID(), s.old.Outputs()) + s.iter.MarkStateSnapshot(s.old) s.iter.AppendStateSnapshot(s.old) case OpCreate, OpReplaceCreate: @@ -111,6 +112,9 @@ func (s *Step) Apply() (resource.Status, error) { } s.outputs = outs state := s.new.Update(id, outs) + if s.old != nil { + s.iter.MarkStateSnapshot(s.old) + } s.iter.AppendStateSnapshot(state) case OpDelete, OpReplaceDelete: @@ -120,6 +124,7 @@ func (s *Step) Apply() (resource.Status, error) { if rst, err := prov.Delete(s.old.Type(), s.old.ID()); err != nil { return rst, err } + s.iter.MarkStateSnapshot(s.old) case OpUpdate: // Invoke the Update RPC function for this provider: @@ -140,6 +145,7 @@ func (s *Step) Apply() (resource.Status, error) { } s.outputs = outs state := s.new.Update(id, outs) + s.iter.MarkStateSnapshot(s.old) s.iter.AppendStateSnapshot(state) default: