Recover from deployment failures

joeduffy 2017-06-12 07:16:08 -07:00
parent 2eb78e1036
commit 0d836ae0bd
5 changed files with 92 additions and 56 deletions

View file

@@ -18,10 +18,12 @@ package main
 import (
 	"bytes"
 	"fmt"
+	"time"

 	"github.com/spf13/cobra"

 	"github.com/pulumi/lumi/pkg/compiler/errors"
+	"github.com/pulumi/lumi/pkg/diag"
 	"github.com/pulumi/lumi/pkg/diag/colors"
 	"github.com/pulumi/lumi/pkg/resource"
 	"github.com/pulumi/lumi/pkg/resource/deploy"
@@ -59,7 +61,7 @@ func newDeployCmd() *cobra.Command {
 			if err != nil {
 				return err
 			}
-			planAndDeploy(cmd, info, deployOptions{
+			deployLatest(cmd, info, deployOptions{
 				Delete:    false,
 				DryRun:    dryRun,
 				Analyzers: analyzers,
@@ -114,6 +116,52 @@ type deployOptions struct {
 	Output string // the place to store the output, if any.
 }

+func deployLatest(cmd *cobra.Command, info *envCmdInfo, opts deployOptions) {
+	if result := plan(cmd, info, opts); result != nil {
+		if opts.DryRun {
+			// If a dry run, just print the plan, don't actually carry out the deployment.
+			printPlan(result, opts)
+		} else {
+			// Otherwise, we will actually deploy the latest bits.
+			var header bytes.Buffer
+			printPrelude(&header, result, opts, false)
+			header.WriteString(fmt.Sprintf("%vDeploying changes:%v\n", colors.SpecUnimportant, colors.Reset))
+			fmt.Printf(colors.Colorize(&header))
+
+			// Create an object to track progress and perform the actual operations.
+			start := time.Now()
+			progress := newProgress(opts.Summary)
+			summary, _, _, _ := result.Plan.Apply(progress)
+			contract.Assert(summary != nil)
+			empty := (summary.Steps() == 0) // if no step is returned, it was empty.
+
+			// Print a summary.
+			var footer bytes.Buffer
+			if empty {
+				cmdutil.Diag().Infof(diag.Message("no resources need to be updated"))
+			} else {
+				// Print out the total number of steps performed (and their kinds), the duration, and any summary info.
+				printSummary(&footer, progress.Ops, opts.ShowReplaceSteps, false)
+				footer.WriteString(fmt.Sprintf("%vDeployment duration: %v%v\n",
+					colors.SpecUnimportant, time.Since(start), colors.Reset))
+			}
+
+			if progress.MaybeCorrupt {
+				footer.WriteString(fmt.Sprintf(
+					"%vA catastrophic error occurred; resource states may be unknown%v\n",
+					colors.SpecAttention, colors.Reset))
+			}
+
+			// Now save the updated snapshot to the specified output file, if any, or the standard location otherwise.
+			// Note that if a failure has occurred, the Apply routine above will have returned a safe checkpoint.
+			targ := result.Info.Target
+			saveEnv(targ, summary.Snap(), opts.Output, true /*overwrite*/)
+
+			fmt.Printf(colors.Colorize(&footer))
+		}
+	}
+}
+
 // deployProgress pretty-prints the plan application process as it goes.
 type deployProgress struct {
 	Steps int
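
The refactored deployLatest owns the whole deploy path: plan, apply, then always checkpoint. Because Plan.Apply hands back a summary even when a step fails, the snapshot save no longer sits behind the happy path. A minimal sketch of that contract, with hypothetical stand-ins (checkpoint, apply, save) rather than lumi's real types:

package main

import (
	"errors"
	"fmt"
	"time"
)

// checkpoint stands in for the snapshot lumi persists with saveEnv.
type checkpoint struct{ resources []string }

// apply stands in for Plan.Apply: it may fail partway through, but it still
// returns a checkpoint describing exactly the steps that completed.
func apply() (*checkpoint, int, error) {
	return &checkpoint{resources: []string{"urn:created"}}, 1, errors.New("step 2 failed")
}

func save(c *checkpoint) { fmt.Printf("saved %d resource(s)\n", len(c.resources)) }

func deployLatest() error {
	start := time.Now()
	snap, steps, err := apply()
	// Persist the checkpoint unconditionally; on failure this is what lets a
	// later deployment resume from known state instead of a corrupt one.
	save(snap)
	if steps == 0 {
		fmt.Println("no resources need to be updated")
	} else {
		fmt.Printf("deployment duration: %v\n", time.Since(start))
	}
	return err
}

func main() {
	if err := deployLatest(); err != nil {
		fmt.Println("error:", err)
	}
}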

View file

@@ -46,7 +46,7 @@ func newDestroyCmd() *cobra.Command {
 			name := info.Target.Name
 			if dryRun || yes ||
 				confirmPrompt("This will permanently destroy all resources in the '%v' environment!", name) {
-				planAndDeploy(cmd, info, deployOptions{
+				deployLatest(cmd, info, deployOptions{
 					Delete:  true,
 					DryRun:  dryRun,
 					Summary: summary,
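
destroy reuses the same deployLatest path, gated by the guard shown above: a destructive run only proceeds under a dry run, an explicit yes flag, or an interactive confirmation. For reference, a self-contained sketch of such a guard (confirm is a hypothetical stand-in for lumi's confirmPrompt):

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// confirm prints a warning and requires the user to type "yes" to proceed.
func confirm(msg string, args ...interface{}) bool {
	fmt.Printf(msg+"\n", args...)
	fmt.Print("Please confirm by typing \"yes\": ")
	line, _ := bufio.NewReader(os.Stdin).ReadString('\n')
	return strings.TrimSpace(line) == "yes"
}

func main() {
	dryRun, yes := false, false
	if dryRun || yes || confirm("This will permanently destroy all resources in the '%v' environment!", "production") {
		fmt.Println("proceeding with destroy (Delete: true)")
	}
}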

View file

@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"sort"
 	"strconv"
-	"time"

 	"github.com/spf13/cobra"
@@ -62,7 +61,7 @@ func newPlanCmd() *cobra.Command {
 				return err
 			}
 			contract.Assertf(!dotOutput, "TODO[pulumi/lumi#235]: DOT files not yet supported")
-			planAndDeploy(cmd, info, deployOptions{
+			opts := deployOptions{
 				Delete:    false,
 				DryRun:    true,
 				Analyzers: analyzers,
@@ -71,7 +70,10 @@ func newPlanCmd() *cobra.Command {
 				ShowSames:        showSames,
 				Summary:          summary,
 				DOT:              dotOutput,
-			})
+			}
+			if result := plan(cmd, info, opts); result != nil {
+				printPlan(result, opts)
+			}
 			return nil
 		}),
 	}
@@ -150,53 +152,6 @@ type planResult struct {
 	Plan *deploy.Plan // the plan created by this command.
 }

-func planAndDeploy(cmd *cobra.Command, info *envCmdInfo, opts deployOptions) {
-	if result := plan(cmd, info, opts); result != nil {
-		// Now based on whether a dry run was specified, or not, either print or perform the planned operations.
-		if opts.DryRun {
-			printPlan(result, opts)
-		} else {
-			// If show unchanged was requested, print them first, along with a header.
-			var header bytes.Buffer
-			printPrelude(&header, result, opts, false)
-			header.WriteString(fmt.Sprintf("%vDeploying changes:%v\n", colors.SpecUnimportant, colors.Reset))
-			fmt.Printf(colors.Colorize(&header))
-
-			// Create an object to track progress and perform the actual operations.
-			start := time.Now()
-			progress := newProgress(opts.Summary)
-			summary, _, _, _ := result.Plan.Apply(progress)
-			contract.Assert(summary != nil)
-			empty := (summary.Steps() == 0) // if no step is returned, it was empty.
-
-			// Print a summary.
-			var footer bytes.Buffer
-			if empty {
-				cmdutil.Diag().Infof(diag.Message("no resources need to be updated"))
-			} else {
-				// Print out the total number of steps performed (and their kinds), the duration, and any summary info.
-				printSummary(&footer, progress.Ops, opts.ShowReplaceSteps, false)
-				footer.WriteString(fmt.Sprintf("%vDeployment duration: %v%v\n",
-					colors.SpecUnimportant, time.Since(start), colors.Reset))
-			}
-
-			if progress.MaybeCorrupt {
-				footer.WriteString(fmt.Sprintf(
-					"%vA catastrophic error occurred; resource states may be unknown%v\n",
-					colors.SpecAttention, colors.Reset))
-			}
-
-			// Now save the updated snapshot to the specified output file, if any, or the standard location otherwise.
-			// Note that if a failure has occurred, the Apply routine above will have returned a safe checkpoint.
-			targ := result.Info.Target
-			saveEnv(targ, summary.Snap(), opts.Output, true /*overwrite*/)
-
-			fmt.Printf(colors.Colorize(&footer))
-		}
-	}
-}
-
 func printPlan(result *planResult, opts deployOptions) {
 	// First print config/unchanged/etc. if necessary.
 	var prelude bytes.Buffer
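
After this change, plan is a pure dry run and deploy owns application; both funnel through the same plan() helper. A condensed sketch of that command wiring (computePlan and the command bodies are illustrative, not the real CLI):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// computePlan stands in for the shared plan() helper both commands call.
func computePlan() string { return "+2 to create, ~1 to update" }

func main() {
	root := &cobra.Command{Use: "lumi"}
	root.AddCommand(&cobra.Command{
		Use: "plan",
		Run: func(cmd *cobra.Command, args []string) {
			// Dry run: show what would happen, mutate nothing.
			fmt.Println(computePlan())
		},
	})
	root.AddCommand(&cobra.Command{
		Use: "deploy",
		Run: func(cmd *cobra.Command, args []string) {
			// Real run: compute the same plan, then apply it.
			fmt.Println("applying:", computePlan())
		},
	})
	_ = root.Execute()
}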

View file

@@ -87,6 +87,7 @@ func (p *Plan) Iterate() (*PlanIterator, error) {
 		replaces: make(map[resource.URN]bool),
 		deletes:  make(map[resource.URN]bool),
 		sames:    make(map[resource.URN]bool),
+		dones:    make(map[*resource.State]bool),
 	}, nil
 }
@@ -113,8 +114,9 @@ type PlanIterator struct {
 	deletes map[resource.URN]bool // URNs discovered to be deleted.
 	sames   map[resource.URN]bool // URNs discovered to be the same.

-	delqueue  []*resource.State // a queue of deletes left to perform.
-	resources []*resource.State // the resulting ordered resource states.
+	delqueue  []*resource.State        // a queue of deletes left to perform.
+	resources []*resource.State        // the resulting ordered resource states.
+	dones     map[*resource.State]bool // true for each old state we're done with.

 	srcdone bool // true if the source interpreter has been run to completion.
 	done    bool // true if the planning and associated iteration has finished.
@@ -130,6 +132,7 @@ func (iter *PlanIterator) Replaces() map[resource.URN]bool { return iter.replaces }
 func (iter *PlanIterator) Deletes() map[resource.URN]bool  { return iter.deletes }
 func (iter *PlanIterator) Sames() map[resource.URN]bool    { return iter.sames }
 func (iter *PlanIterator) Resources() []*resource.State    { return iter.resources }
+func (iter *PlanIterator) Dones() map[*resource.State]bool { return iter.dones }
 func (iter *PlanIterator) Done() bool                      { return iter.done }

 // Next advances the plan by a single step, and returns the next step to be performed. In doing so, it will perform
@@ -310,12 +313,36 @@ func (iter *PlanIterator) calculateDeletes() []*resource.State {
 // failure happens partway through, the untouched snapshot elements will be retained, while any updates will be
 // preserved. If no failure happens, the snapshot naturally reflects the final state of all resources.
 func (iter *PlanIterator) Snap() *Snapshot {
-	return NewSnapshot(iter.p.Target().Name, iter.resources, iter.p.new.Info())
+	var resources []*resource.State
+
+	// If we didn't finish the execution, we must produce a partial snapshot of old plus new states.
+	if !iter.done {
+		if prev := iter.p.prev; prev != nil {
+			for _, res := range prev.Resources {
+				if !iter.dones[res] {
+					resources = append(resources, res)
+				}
+			}
+		}
+	} else {
+		contract.Assert(len(iter.dones) == 0)
+	}
+
+	// Always add the new resources afterwards that got produced during the evaluation of the current plan.
+	resources = append(resources, iter.resources...)
+
+	return NewSnapshot(iter.p.Target().Name, resources, iter.p.new.Info())
 }

+// MarkStateSnapshot marks an old state snapshot as being processed. This is done to recover from failures partway
+// through the application of a deployment plan. Any old state that has not yet been processed needs to be kept.
+func (iter *PlanIterator) MarkStateSnapshot(state *resource.State) {
+	iter.dones[state] = true
+}
+
 // AppendStateSnapshot appends a resource's state to the current snapshot.
 func (iter *PlanIterator) AppendStateSnapshot(state *resource.State) {
-	iter.resources = append(iter.resources, state) // add this state to the pending list.
+	iter.resources = append(iter.resources, state)
 }

 // Provider fetches the provider for a given resource, possibly lazily allocating the plugins for it. If a provider
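
The heart of the recovery logic is the new Snap above. To make its behavior concrete, here is a self-contained distillation, with state standing in for resource.State and plain slices for the previous and newly produced states:

package main

import "fmt"

// state stands in for resource.State; identity is what matters here.
type state struct{ urn string }

// partialSnap mirrors PlanIterator.Snap: if the plan didn't run to completion,
// carry forward every old state not yet marked done, then append the states
// produced so far. Nothing the deployment didn't touch can fall out of the
// checkpoint.
func partialSnap(done bool, prev []*state, dones map[*state]bool, produced []*state) []*state {
	var resources []*state
	if !done {
		for _, res := range prev {
			if !dones[res] {
				resources = append(resources, res) // untouched old state: keep it
			}
		}
	}
	return append(resources, produced...)
}

func main() {
	db, vm := &state{"urn:db"}, &state{"urn:vm"}
	dbNew := &state{"urn:db (updated)"}
	// Suppose the deployment updated urn:db but failed before reaching urn:vm.
	snap := partialSnap(false, []*state{db, vm}, map[*state]bool{db: true}, []*state{dbNew})
	for _, s := range snap {
		fmt.Println(s.urn)
	}
}

Run as-is, this prints urn:vm and then urn:db (updated): the failed deployment's checkpoint keeps the untouched resource and records the one change that did land.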

View file

@@ -91,6 +91,7 @@ func (s *Step) Apply() (resource.Status, error) {
 		contract.Assert(s.old != nil)
 		contract.Assert(s.new != nil)
 		s.new.Update(s.old.ID(), s.old.Outputs())
+		s.iter.MarkStateSnapshot(s.old)
 		s.iter.AppendStateSnapshot(s.old)
 	case OpCreate, OpReplaceCreate:
@@ -111,6 +112,9 @@ func (s *Step) Apply() (resource.Status, error) {
 		}
 		s.outputs = outs
 		state := s.new.Update(id, outs)
+		if s.old != nil {
+			s.iter.MarkStateSnapshot(s.old)
+		}
 		s.iter.AppendStateSnapshot(state)
 	case OpDelete, OpReplaceDelete:
@@ -120,6 +124,7 @@ func (s *Step) Apply() (resource.Status, error) {
 		if rst, err := prov.Delete(s.old.Type(), s.old.ID()); err != nil {
 			return rst, err
 		}
+		s.iter.MarkStateSnapshot(s.old)
 	case OpUpdate:
 		// Invoke the Update RPC function for this provider:
@@ -140,6 +145,7 @@ func (s *Step) Apply() (resource.Status, error) {
 		}
 		s.outputs = outs
 		state := s.new.Update(id, outs)
+		s.iter.MarkStateSnapshot(s.old)
 		s.iter.AppendStateSnapshot(state)
 	default:
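
Each branch of Step.Apply now follows one bookkeeping discipline: mark the old state as handled the moment its operation completes, and append whatever belongs in the new snapshot. A compact restatement of that per-operation pattern (operation names abbreviated to strings; provider RPCs elided):

package main

import "fmt"

type state struct{ urn string }

// iterator mimics the two PlanIterator hooks the steps call into.
type iterator struct {
	dones     map[*state]bool
	resources []*state
}

func (it *iterator) markDone(old *state) { it.dones[old] = true }
func (it *iterator) appendNew(s *state)  { it.resources = append(it.resources, s) }

func applyStep(it *iterator, op string, old, next *state) {
	switch op {
	case "same":
		it.markDone(old)  // old state accounted for...
		it.appendNew(old) // ...and carried forward unchanged.
	case "create":
		if old != nil { // a replacement create also retires the old state
			it.markDone(old)
		}
		it.appendNew(next)
	case "delete":
		it.markDone(old) // accounted for, deliberately not carried forward
	case "update":
		it.markDone(old)
		it.appendNew(next)
	}
}

func main() {
	it := &iterator{dones: map[*state]bool{}}
	old, upd := &state{"urn:vm"}, &state{"urn:vm@v2"}
	applyStep(it, "update", old, upd)
	fmt.Println(len(it.dones), len(it.resources)) // 1 1
}

Whatever the operation's outcome, the marked set plus the appended list is exactly what Snap needs to cut a safe checkpoint at any point in the run.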