diff --git a/cmd/destroy.go b/cmd/destroy.go index 3b8fda494..5cb7f36c6 100644 --- a/cmd/destroy.go +++ b/cmd/destroy.go @@ -3,6 +3,7 @@ package cmd import ( + "context" "fmt" "os" @@ -77,7 +78,7 @@ func newDestroyCmd() *cobra.Command { } } - return s.Destroy(proj, root, m, engine.UpdateOptions{ + err = s.Destroy(proj, root, m, engine.UpdateOptions{ Analyzers: analyzers, Force: force, Preview: preview, @@ -90,7 +91,11 @@ func newDestroyCmd() *cobra.Command { ShowSameResources: showSames, DiffDisplay: diffDisplay, Debug: debug, - }) + }, cancellationScopes) + if err == context.Canceled { + return errors.New("destroy cancelled") + } + return err }), } diff --git a/cmd/refresh.go b/cmd/refresh.go index 7e21d37cd..6d838798d 100644 --- a/cmd/refresh.go +++ b/cmd/refresh.go @@ -3,6 +3,7 @@ package cmd import ( + "context" "os" "github.com/pkg/errors" @@ -68,7 +69,7 @@ func newRefreshCmd() *cobra.Command { return errors.Wrap(err, "gathering environment metadata") } - return s.Refresh(proj, root, m, engine.UpdateOptions{ + err = s.Refresh(proj, root, m, engine.UpdateOptions{ Analyzers: analyzers, Force: force, Preview: preview, @@ -81,7 +82,11 @@ func newRefreshCmd() *cobra.Command { ShowSameResources: showSames, DiffDisplay: diffDisplay, Debug: debug, - }) + }, cancellationScopes) + if err == context.Canceled { + return errors.New("refresh cancelled") + } + return err }), } diff --git a/cmd/update.go b/cmd/update.go index 529785747..e289182db 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -3,6 +3,7 @@ package cmd import ( + "context" "os" "github.com/pkg/errors" @@ -72,7 +73,7 @@ func newUpdateCmd() *cobra.Command { return errors.Wrap(err, "gathering environment metadata") } - return s.Update(proj, root, m, engine.UpdateOptions{ + err = s.Update(proj, root, m, engine.UpdateOptions{ Analyzers: analyzers, Force: force, Preview: preview, @@ -85,7 +86,11 @@ func newUpdateCmd() *cobra.Command { ShowSameResources: showSames, DiffDisplay: diffDisplay, Debug: debug, - }) + }, cancellationScopes) + if err == context.Canceled { + return errors.New("update cancelled") + } + return err }), } diff --git a/cmd/util.go b/cmd/util.go index f37799225..7a4f3282c 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -3,8 +3,10 @@ package cmd import ( + "context" "fmt" "os" + "os/signal" "os/user" "path" "path/filepath" @@ -22,8 +24,10 @@ import ( "github.com/pulumi/pulumi/pkg/backend/local" "github.com/pulumi/pulumi/pkg/backend/state" "github.com/pulumi/pulumi/pkg/diag/colors" + "github.com/pulumi/pulumi/pkg/engine" "github.com/pulumi/pulumi/pkg/resource/config" "github.com/pulumi/pulumi/pkg/tokens" + "github.com/pulumi/pulumi/pkg/util/cancel" "github.com/pulumi/pulumi/pkg/util/cmdutil" "github.com/pulumi/pulumi/pkg/util/contract" "github.com/pulumi/pulumi/pkg/util/fsutil" @@ -592,3 +596,67 @@ func removeOldProjectConfiguration() error { return workspace.SaveProject(proj) } + +type cancellationScope struct { + context *cancel.Context + sigint chan os.Signal +} + +func (s *cancellationScope) Context() *cancel.Context { + return s.context +} + +func (s *cancellationScope) Close() { + signal.Stop(s.sigint) + close(s.sigint) +} + +type cancellationScopeSource int + +var cancellationScopes = backend.CancellationScopeSource(cancellationScopeSource(0)) + +func (cancellationScopeSource) NewScope(events chan<- engine.Event, isPreview bool) backend.CancellationScope { + cancelContext, cancelSource := cancel.NewContext(context.Background()) + + c := &cancellationScope{ + context: cancelContext, + sigint: make(chan os.Signal), + } + + go func() { + for range c.sigint { + // If we haven't yet received a SIGINT, call the cancellation func. Otherwise call the termination + // func. + if cancelContext.CancelErr() == nil { + message := "^C received; cancelling. If you would like to terminate immediately, press ^C again.\n" + if !isPreview { + message += colors.BrightRed + "Note that terminating immediately may lead to orphaned resources " + + "and other inconsistencies.\n" + colors.Reset + } + events <- engine.Event{ + Type: engine.StdoutColorEvent, + Payload: engine.StdoutEventPayload{ + Message: message, + Color: colors.Always, + }, + } + + cancelSource.Cancel() + } else { + message := colors.BrightRed + "^C received; terminating" + colors.Reset + events <- engine.Event{ + Type: engine.StdoutColorEvent, + Payload: engine.StdoutEventPayload{ + Message: message, + Color: colors.Always, + }, + } + + cancelSource.Terminate() + } + } + }() + signal.Notify(c.sigint, os.Interrupt) + + return c +} diff --git a/pkg/backend/backend.go b/pkg/backend/backend.go index 351ee4038..945b03ee4 100644 --- a/pkg/backend/backend.go +++ b/pkg/backend/backend.go @@ -9,6 +9,7 @@ import ( "github.com/pulumi/pulumi/pkg/operations" "github.com/pulumi/pulumi/pkg/resource/config" "github.com/pulumi/pulumi/pkg/tokens" + "github.com/pulumi/pulumi/pkg/util/cancel" "github.com/pulumi/pulumi/pkg/workspace" ) @@ -34,13 +35,13 @@ type Backend interface { // Update updates the target stack with the current workspace's contents (config and code). Update(stackName tokens.QName, proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error + m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions, scopes CancellationScopeSource) error // Refresh refreshes the stack's state from the cloud provider. Refresh(stackName tokens.QName, proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error + m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions, scopes CancellationScopeSource) error // Destroy destroys all of this stack's resources. Destroy(stackName tokens.QName, proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error + m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions, scopes CancellationScopeSource) error // GetHistory returns all updates for the stack. The returned UpdateInfo slice will be in // descending order (newest first). @@ -55,3 +56,17 @@ type Backend interface { // Logout logs you out of the backend and removes any stored credentials. Logout() error } + +// CancellationScope provides a scoped source of cancellation and termination requests. +type CancellationScope interface { + // Context returns the cancellation context used to observe cancellation and termination requests for this scope. + Context() *cancel.Context + // Close closes the cancellation scope. + Close() +} + +// CancellationScopeSource provides a source for cancellation scopes. +type CancellationScopeSource interface { + // NewScope creates a new cancellation scope. + NewScope(events chan<- engine.Event, isPreview bool) CancellationScope +} diff --git a/pkg/backend/cloud/backend.go b/pkg/backend/cloud/backend.go index 7808c6192..8b29010bf 100644 --- a/pkg/backend/cloud/backend.go +++ b/pkg/backend/cloud/backend.go @@ -467,7 +467,7 @@ func (b *cloudBackend) PreviewThenPrompt( updateKind client.UpdateKind, stack backend.Stack, pkg *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, - displayOpts backend.DisplayOptions) error { + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { // create a channel to hear about the update events from the engine. this will be used so that // we can build up the diff display in case the user asks to see the details of the diff @@ -497,7 +497,7 @@ func (b *cloudBackend) PreviewThenPrompt( err := b.updateStack( updateKind, stack, pkg, root, m, - opts, displayOpts, eventsChannel, true /*dryRun*/) + opts, displayOpts, eventsChannel, true /*dryRun*/, scopes) if err != nil || opts.Preview { // if we're just previewing, then we can stop at this point. @@ -554,7 +554,7 @@ func (b *cloudBackend) PreviewThenPromptThenExecute( updateKind client.UpdateKind, stackName tokens.QName, pkg *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, - displayOpts backend.DisplayOptions) error { + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { // First get the stack. stack, err := getStack(b, stackName) @@ -565,7 +565,7 @@ func (b *cloudBackend) PreviewThenPromptThenExecute( if !opts.Force { // If we're not forcing, then preview the operation to the user and ask them if // they want to proceed. - err = b.PreviewThenPrompt(updateKind, stack, pkg, root, m, opts, displayOpts) + err = b.PreviewThenPrompt(updateKind, stack, pkg, root, m, opts, displayOpts, scopes) if err != nil || opts.Preview { return err } @@ -576,22 +576,28 @@ func (b *cloudBackend) PreviewThenPromptThenExecute( var unused chan engine.Event return b.updateStack( updateKind, stack, pkg, - root, m, opts, displayOpts, unused, false /*dryRun*/) + root, m, opts, displayOpts, unused, false /*dryRun*/, scopes) } func (b *cloudBackend) Update(stackName tokens.QName, pkg *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return b.PreviewThenPromptThenExecute(client.UpdateKindUpdate, stackName, pkg, root, m, opts, displayOpts) + m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions, + scopes backend.CancellationScopeSource) error { + + return b.PreviewThenPromptThenExecute(client.UpdateKindUpdate, stackName, pkg, root, m, opts, displayOpts, scopes) } func (b *cloudBackend) Refresh(stackName tokens.QName, pkg *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return b.PreviewThenPromptThenExecute(client.UpdateKindRefresh, stackName, pkg, root, m, opts, displayOpts) + m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions, + scopes backend.CancellationScopeSource) error { + + return b.PreviewThenPromptThenExecute(client.UpdateKindRefresh, stackName, pkg, root, m, opts, displayOpts, scopes) } func (b *cloudBackend) Destroy(stackName tokens.QName, pkg *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return b.PreviewThenPromptThenExecute(client.UpdateKindDestroy, stackName, pkg, root, m, opts, displayOpts) + m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions, + scopes backend.CancellationScopeSource) error { + + return b.PreviewThenPromptThenExecute(client.UpdateKindDestroy, stackName, pkg, root, m, opts, displayOpts, scopes) } func (b *cloudBackend) createAndStartUpdate( @@ -646,7 +652,8 @@ func (b *cloudBackend) createAndStartUpdate( func (b *cloudBackend) updateStack( action client.UpdateKind, stack backend.Stack, pkg *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, - displayOpts backend.DisplayOptions, callerEventsOpt chan<- engine.Event, dryRun bool) error { + displayOpts backend.DisplayOptions, callerEventsOpt chan<- engine.Event, dryRun bool, + scopes backend.CancellationScopeSource) error { // Print a banner so it's clear this is going to the cloud. actionLabel := getActionLabel(string(action), dryRun) @@ -684,7 +691,7 @@ func (b *cloudBackend) updateStack( if stack.(Stack).RunLocally() { return b.runEngineAction( action, stack.Name(), pkg, root, opts, displayOpts, - update, token, callerEventsOpt, dryRun) + update, token, callerEventsOpt, dryRun, scopes) } // Otherwise, wait for the update to complete while rendering its events to stdout/stderr. @@ -727,7 +734,7 @@ func (b *cloudBackend) runEngineAction( action client.UpdateKind, stackName tokens.QName, pkg *workspace.Project, root string, opts engine.UpdateOptions, displayOpts backend.DisplayOptions, update client.UpdateIdentifier, token string, - callerEventsOpt chan<- engine.Event, dryRun bool) error { + callerEventsOpt chan<- engine.Event, dryRun bool, scopes backend.CancellationScopeSource) error { u, err := b.newUpdate(stackName, pkg, root, update, token) if err != nil { @@ -741,6 +748,10 @@ func (b *cloudBackend) runEngineAction( getActionLabel(string(action), dryRun), displayEvents, displayDone, displayOpts) engineEvents := make(chan engine.Event) + + scope := scopes.NewScope(engineEvents, dryRun) + defer scope.Close() + go func() { // Pull in all events from the engine and send to them to the two listeners. for e := range engineEvents { @@ -754,17 +765,18 @@ func (b *cloudBackend) runEngineAction( // Depending on the action, kick off the relevant engine activity. Note that we don't immediately check and // return error conditions, because we will do so below after waiting for the display channels to close. + engineCtx := &engine.Context{Cancel: scope.Context(), Events: engineEvents} switch action { case client.UpdateKindUpdate: if dryRun { - err = engine.Preview(u, engineEvents, opts) + err = engine.Preview(u, engineCtx, opts) } else { - _, err = engine.Update(u, engineEvents, opts, dryRun) + _, err = engine.Update(u, engineCtx, opts, dryRun) } case client.UpdateKindRefresh: - _, err = engine.Refresh(u, engineEvents, opts, dryRun) + _, err = engine.Refresh(u, engineCtx, opts, dryRun) case client.UpdateKindDestroy: - _, err = engine.Destroy(u, engineEvents, opts, dryRun) + _, err = engine.Destroy(u, engineCtx, opts, dryRun) default: contract.Failf("Unrecognized action type: %s", action) } diff --git a/pkg/backend/cloud/stack.go b/pkg/backend/cloud/stack.go index 54d48f94f..5d7f426e5 100644 --- a/pkg/backend/cloud/stack.go +++ b/pkg/backend/cloud/stack.go @@ -86,19 +86,19 @@ func (s *cloudStack) Remove(force bool) (bool, error) { return backend.RemoveStack(s, force) } -func (s *cloudStack) Update(proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return backend.UpdateStack(s, proj, root, m, opts, displayOpts) +func (s *cloudStack) Update(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return backend.UpdateStack(s, proj, root, m, opts, displayOpts, scopes) } -func (s *cloudStack) Refresh(proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return backend.RefreshStack(s, proj, root, m, opts, displayOpts) +func (s *cloudStack) Refresh(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return backend.RefreshStack(s, proj, root, m, opts, displayOpts, scopes) } -func (s *cloudStack) Destroy(proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return backend.DestroyStack(s, proj, root, m, opts, displayOpts) +func (s *cloudStack) Destroy(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return backend.DestroyStack(s, proj, root, m, opts, displayOpts, scopes) } func (s *cloudStack) GetLogs(query operations.LogQuery) ([]operations.LogEntry, error) { diff --git a/pkg/backend/local/backend.go b/pkg/backend/local/backend.go index eb12cbd7d..f449f39c8 100644 --- a/pkg/backend/local/backend.go +++ b/pkg/backend/local/backend.go @@ -138,8 +138,8 @@ func (b *localBackend) GetStackCrypter(stackName tokens.QName) (config.Crypter, } func (b *localBackend) Update( - stackName tokens.QName, proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { + stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { // The Pulumi Service will pick up changes to a stack's tags on each update. (e.g. changing the description // in Pulumi.yaml.) While this isn't necessary for local updates, we do the validation here to keep @@ -153,27 +153,32 @@ func (b *localBackend) Update( } return b.performEngineOp("updating", backend.DeployUpdate, - stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Update) + stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Update, scopes) } -func (b *localBackend) Refresh(stackName tokens.QName, proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { +func (b *localBackend) Refresh( + stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return b.performEngineOp("refreshing", backend.RefreshUpdate, - stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Refresh) + stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Refresh, scopes) } -func (b *localBackend) Destroy(stackName tokens.QName, proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { +func (b *localBackend) Destroy(stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata, + opts engine.UpdateOptions, displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return b.performEngineOp("destroying", backend.DestroyUpdate, - stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Destroy) + stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Destroy, scopes) } type engineOpFunc func( - engine.UpdateInfo, chan<- engine.Event, engine.UpdateOptions, bool) (engine.ResourceChanges, error) + engine.UpdateInfo, *engine.Context, engine.UpdateOptions, bool) (engine.ResourceChanges, error) func (b *localBackend) performEngineOp(op string, kind backend.UpdateKind, stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata, - opts engine.UpdateOptions, displayOpts backend.DisplayOptions, dryRun bool, performEngineOp engineOpFunc) error { + opts engine.UpdateOptions, displayOpts backend.DisplayOptions, dryRun bool, performEngineOp engineOpFunc, + scopes backend.CancellationScopeSource) error { + if !opts.Force && !dryRun { return errors.Errorf("--force or --preview must be passed when %s a local stack", op) } @@ -184,13 +189,18 @@ func (b *localBackend) performEngineOp(op string, kind backend.UpdateKind, } events := make(chan engine.Event) + + cancelScope := scopes.NewScope(events, dryRun) + defer cancelScope.Close() + done := make(chan bool) go DisplayEvents(op, events, done, displayOpts) // Perform the update start := time.Now().Unix() - changes, updateErr := performEngineOp(update, events, opts, dryRun) + engineCtx := &engine.Context{Cancel: cancelScope.Context(), Events: events} + changes, updateErr := performEngineOp(update, engineCtx, opts, dryRun) end := time.Now().Unix() <-done diff --git a/pkg/backend/local/stack.go b/pkg/backend/local/stack.go index 9902ab89a..d3a5c23db 100644 --- a/pkg/backend/local/stack.go +++ b/pkg/backend/local/stack.go @@ -49,19 +49,19 @@ func (s *localStack) Remove(force bool) (bool, error) { return backend.RemoveStack(s, force) } -func (s *localStack) Update(proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return backend.UpdateStack(s, proj, root, m, opts, displayOpts) +func (s *localStack) Update(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return backend.UpdateStack(s, proj, root, m, opts, displayOpts, scopes) } -func (s *localStack) Refresh(proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return backend.RefreshStack(s, proj, root, m, opts, displayOpts) +func (s *localStack) Refresh(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return backend.RefreshStack(s, proj, root, m, opts, displayOpts, scopes) } -func (s *localStack) Destroy(proj *workspace.Project, root string, - m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error { - return backend.DestroyStack(s, proj, root, m, opts, displayOpts) +func (s *localStack) Destroy(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions, + displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error { + return backend.DestroyStack(s, proj, root, m, opts, displayOpts, scopes) } func (s *localStack) GetLogs(query operations.LogQuery) ([]operations.LogEntry, error) { diff --git a/pkg/backend/stack.go b/pkg/backend/stack.go index 06e0aee31..39aab053d 100644 --- a/pkg/backend/stack.go +++ b/pkg/backend/stack.go @@ -24,14 +24,14 @@ type Stack interface { Backend() Backend // the backend this stack belongs to. // Update this stack. - Update(proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error + Update(proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions, + displayOpts DisplayOptions, scopes CancellationScopeSource) error // Refresh this stack's state from the cloud provider. - Refresh(proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error + Refresh(proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions, + displayOpts DisplayOptions, scopes CancellationScopeSource) error // Destroy this stack's resources. - Destroy(proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error + Destroy(proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions, + displayOpts DisplayOptions, scopes CancellationScopeSource) error Remove(force bool) (bool, error) // remove this stack. GetLogs(query operations.LogQuery) ([]operations.LogEntry, error) // list log entries for this stack. @@ -45,21 +45,21 @@ func RemoveStack(s Stack, force bool) (bool, error) { } // UpdateStack updates the target stack with the current workspace's contents (config and code). -func UpdateStack(s Stack, proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error { - return s.Backend().Update(s.Name(), proj, root, m, opts, displayOpts) +func UpdateStack(s Stack, proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions, + displayOpts DisplayOptions, scopes CancellationScopeSource) error { + return s.Backend().Update(s.Name(), proj, root, m, opts, displayOpts, scopes) } // RefreshStack refresh's the stack's state from the cloud provider. -func RefreshStack(s Stack, proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error { - return s.Backend().Refresh(s.Name(), proj, root, m, opts, displayOpts) +func RefreshStack(s Stack, proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions, + displayOpts DisplayOptions, scopes CancellationScopeSource) error { + return s.Backend().Refresh(s.Name(), proj, root, m, opts, displayOpts, scopes) } // DestroyStack destroys all of this stack's resources. -func DestroyStack(s Stack, proj *workspace.Project, root string, - m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error { - return s.Backend().Destroy(s.Name(), proj, root, m, opts, displayOpts) +func DestroyStack(s Stack, proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions, + displayOpts DisplayOptions, scopes CancellationScopeSource) error { + return s.Backend().Destroy(s.Name(), proj, root, m, opts, displayOpts, scopes) } // GetStackCrypter fetches the encrypter/decrypter for a stack. diff --git a/pkg/engine/destroy.go b/pkg/engine/destroy.go index 9f15cf1a4..ede6c4cc5 100644 --- a/pkg/engine/destroy.go +++ b/pkg/engine/destroy.go @@ -10,20 +10,20 @@ import ( ) func Destroy( - u UpdateInfo, events chan<- Event, opts UpdateOptions, dryRun bool) (ResourceChanges, error) { + u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, error) { contract.Require(u != nil, "u") - defer func() { events <- cancelEvent() }() + defer func() { ctx.Events <- cancelEvent() }() - ctx, err := newPlanContext(u) + info, err := newPlanContext(u) if err != nil { return nil, err } - defer ctx.Close() + defer info.Close() - emitter := makeEventEmitter(events, u) - return update(ctx, planOptions{ + emitter := makeEventEmitter(ctx.Events, u) + return update(ctx, info, planOptions{ UpdateOptions: opts, SourceFunc: newDestroySource, Events: emitter, diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 905313f43..abc204f7f 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -4,6 +4,7 @@ package engine import ( "github.com/pulumi/pulumi/pkg/resource/deploy" + "github.com/pulumi/pulumi/pkg/util/cancel" "github.com/pulumi/pulumi/pkg/workspace" ) @@ -39,3 +40,9 @@ type SnapshotMutation interface { // persisted. See the comments on SnapshotProvider.BeginMutation for more details. End(snapshot *deploy.Snapshot) error } + +// Context provides cancellation, termination, and eventing options for an engine operation. +type Context struct { + Cancel *cancel.Context + Events chan<- Event +} diff --git a/pkg/engine/plan.go b/pkg/engine/plan.go index 1bb105b52..d4261de1f 100644 --- a/pkg/engine/plan.go +++ b/pkg/engine/plan.go @@ -157,7 +157,7 @@ func (res *planResult) Chdir() (func(), error) { // Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the // resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that // failed, if the error is non-nil; and finally the state of the resource modified in the failing step. -func (res *planResult) Walk(events deploy.Events, preview bool) (deploy.PlanSummary, +func (res *planResult) Walk(ctx *Context, events deploy.Events, preview bool) (deploy.PlanSummary, deploy.Step, resource.Status, error) { opts := deploy.Options{ Events: events, @@ -172,33 +172,57 @@ func (res *planResult) Walk(events deploy.Events, preview bool) (deploy.PlanSumm step, err := iter.Next() if err != nil { - closeerr := iter.Close() // ignore close errors; the Next error trumps - contract.IgnoreError(closeerr) + closeErr := iter.Close() // ignore close errors; the Next error trumps + contract.IgnoreError(closeErr) return nil, nil, resource.StatusOK, err } - for step != nil { - // Perform any per-step actions. - rst, err := iter.Apply(step, preview) + // Iterate the plan in a goroutine while listening for termination. + var rst resource.Status + done := make(chan bool) + go func() { + defer func() { + // Close the iterator. If we have already observed another error, that error trumps the close error. + closeErr := iter.Close() + if err == nil { + err = closeErr + } + close(done) + }() - // If an error occurred, exit early. - if err != nil { - closeerr := iter.Close() // ignore close errors; the action error trumps - contract.IgnoreError(closeerr) - return iter, step, rst, err - } - contract.Assert(rst == resource.StatusOK) + for step != nil { + // Check for cancellation and termination. + if cancelErr := ctx.Cancel.CancelErr(); cancelErr != nil { + rst, err = resource.StatusOK, cancelErr + return + } - step, err = iter.Next() - if err != nil { - closeerr := iter.Close() // ignore close errors; the action error trumps - contract.IgnoreError(closeerr) - return iter, step, resource.StatusOK, err + // Perform any per-step actions. + rst, err = iter.Apply(step, preview) + + // If an error occurred, exit early. + if err != nil { + return + } + contract.Assert(rst == resource.StatusOK) + + step, err = iter.Next() + if err != nil { + return + } } + + // Finally, return a summary and the resulting plan information. + rst, err = resource.StatusOK, nil + }() + + select { + case <-ctx.Cancel.Terminated(): + return iter, step, rst, ctx.Cancel.TerminateErr() + + case <-done: + return iter, step, rst, err } - - // Finally, return a summary and the resulting plan information. - return iter, nil, resource.StatusOK, iter.Close() } func (res *planResult) Close() error { @@ -206,12 +230,12 @@ func (res *planResult) Close() error { } // printPlan prints the plan's result to the plan's Options.Events stream. -func printPlan(result *planResult, dryRun bool) (ResourceChanges, error) { +func printPlan(ctx *Context, result *planResult, dryRun bool) (ResourceChanges, error) { result.Options.Events.preludeEvent(dryRun, result.Ctx.Update.GetTarget().Config) // Walk the plan's steps and and pretty-print them out. actions := newPreviewActions(result.Options) - _, _, _, err := result.Walk(actions, true) + _, _, _, err := result.Walk(ctx, actions, true) if err != nil { return nil, errors.New("an error occurred while advancing the preview") } diff --git a/pkg/engine/preview.go b/pkg/engine/preview.go index 810db5f3c..51a95cd4c 100644 --- a/pkg/engine/preview.go +++ b/pkg/engine/preview.go @@ -9,20 +9,20 @@ import ( "github.com/pulumi/pulumi/pkg/util/contract" ) -func Preview(u UpdateInfo, events chan<- Event, opts UpdateOptions) error { +func Preview(u UpdateInfo, ctx *Context, opts UpdateOptions) error { contract.Require(u != nil, "u") - contract.Require(events != nil, "events") + contract.Require(ctx != nil, "ctx") - defer func() { events <- cancelEvent() }() + defer func() { ctx.Events <- cancelEvent() }() - ctx, err := newPlanContext(u) + info, err := newPlanContext(u) if err != nil { return err } - defer ctx.Close() + defer info.Close() - emitter := makeEventEmitter(events, u) - return preview(ctx, planOptions{ + emitter := makeEventEmitter(ctx.Events, u) + return preview(ctx, info, planOptions{ UpdateOptions: opts, SourceFunc: newUpdateSource, Events: emitter, @@ -30,8 +30,8 @@ func Preview(u UpdateInfo, events chan<- Event, opts UpdateOptions) error { }) } -func preview(ctx *planContext, opts planOptions) error { - result, err := plan(ctx, opts, true /*dryRun*/) +func preview(ctx *Context, info *planContext, opts planOptions) error { + result, err := plan(info, opts, true /*dryRun*/) if err != nil { return err } @@ -45,7 +45,7 @@ func preview(ctx *planContext, opts planOptions) error { } defer done() - if _, err := printPlan(result, true /*dryRun*/); err != nil { + if _, err := printPlan(ctx, result, true /*dryRun*/); err != nil { return err } } diff --git a/pkg/engine/refresh.go b/pkg/engine/refresh.go index 4786fe86b..c98001538 100644 --- a/pkg/engine/refresh.go +++ b/pkg/engine/refresh.go @@ -9,19 +9,20 @@ import ( "github.com/pulumi/pulumi/pkg/workspace" ) -func Refresh(u UpdateInfo, events chan<- Event, opts UpdateOptions, dryRun bool) (ResourceChanges, error) { +func Refresh(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, error) { + contract.Require(u != nil, "u") - defer func() { events <- cancelEvent() }() + defer func() { ctx.Events <- cancelEvent() }() - ctx, err := newPlanContext(u) + info, err := newPlanContext(u) if err != nil { return nil, err } - defer ctx.Close() + defer info.Close() - emitter := makeEventEmitter(events, u) - return update(ctx, planOptions{ + emitter := makeEventEmitter(ctx.Events, u) + return update(ctx, info, planOptions{ UpdateOptions: opts, SkipOutputs: true, // refresh is exclusively about outputs SourceFunc: newRefreshSource, diff --git a/pkg/engine/update.go b/pkg/engine/update.go index 9e47a7af2..cc2afcc55 100644 --- a/pkg/engine/update.go +++ b/pkg/engine/update.go @@ -35,22 +35,20 @@ type UpdateOptions struct { // ResourceChanges contains the aggregate resource changes by operation type. type ResourceChanges map[deploy.StepOp]int -func Update( - u UpdateInfo, events chan<- Event, opts UpdateOptions, dryRun bool) (ResourceChanges, error) { - +func Update(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, error) { contract.Require(u != nil, "update") - contract.Require(events != nil, "events") + contract.Require(ctx != nil, "ctx") - defer func() { events <- cancelEvent() }() + defer func() { ctx.Events <- cancelEvent() }() - ctx, err := newPlanContext(u) + info, err := newPlanContext(u) if err != nil { return nil, err } - defer ctx.Close() + defer info.Close() - emitter := makeEventEmitter(events, u) - return update(ctx, planOptions{ + emitter := makeEventEmitter(ctx.Events, u) + return update(ctx, info, planOptions{ UpdateOptions: opts, SourceFunc: newUpdateSource, Events: emitter, @@ -87,7 +85,7 @@ func newUpdateSource( }, dryRun), nil } -func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges, error) { +func update(ctx *Context, info *planContext, opts planOptions, dryRun bool) (ResourceChanges, error) { result, err := plan(info, opts, dryRun) if err != nil { return nil, err @@ -106,7 +104,7 @@ func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges, if dryRun { // If a dry run, just print the plan, don't actually carry out the deployment. - resourceChanges, err = printPlan(result, dryRun) + resourceChanges, err = printPlan(ctx, result, dryRun) if err != nil { return resourceChanges, err } @@ -116,8 +114,8 @@ func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges, // Walk the plan, reporting progress and executing the actual operations as we go. start := time.Now() - actions := newUpdateActions(info.Update, opts) - summary, _, _, err := result.Walk(actions, false) + actions := newUpdateActions(ctx, info.Update, opts) + summary, _, _, err := result.Walk(ctx, actions, false) if err != nil && summary == nil { // Something went wrong, and no changes were made. return resourceChanges, err @@ -139,6 +137,7 @@ func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges, // updateActions pretty-prints the plan application process as it goes. type updateActions struct { + Context *Context Steps int Ops map[deploy.StepOp]int Seen map[resource.URN]deploy.Step @@ -147,12 +146,13 @@ type updateActions struct { Opts planOptions } -func newUpdateActions(u UpdateInfo, opts planOptions) *updateActions { +func newUpdateActions(context *Context, u UpdateInfo, opts planOptions) *updateActions { return &updateActions{ - Ops: make(map[deploy.StepOp]int), - Seen: make(map[resource.URN]deploy.Step), - Update: u, - Opts: opts, + Context: context, + Ops: make(map[deploy.StepOp]int), + Seen: make(map[resource.URN]deploy.Step), + Update: u, + Opts: opts, } } @@ -171,6 +171,12 @@ func (acts *updateActions) OnResourceStepPost(ctx interface{}, assertSeen(acts.Seen, step) + // If we've already been terminated, exit without writing the checkpoint. We explicitly want to leave the + // checkpoint in an inconsistent state in this event. + if acts.Context.Cancel.TerminateErr() != nil { + return nil + } + // Report the result of the step. stepop := step.Op() if err != nil { diff --git a/pkg/util/cancel/context.go b/pkg/util/cancel/context.go new file mode 100644 index 000000000..61f15de09 --- /dev/null +++ b/pkg/util/cancel/context.go @@ -0,0 +1,83 @@ +package cancel + +import ( + "context" + + "github.com/pulumi/pulumi/pkg/util/contract" +) + +// Context provides the ability to observe cancellation and termination requests from a Source. A termination request +// automatically triggers a corresponding cancellation request. This can be used to implement cancellation with two +// priority levels. +type Context struct { + terminate context.Context + cancel context.Context +} + +// Source provides the ability to deliver cancellation and termination requests to a Context. A termination request +// automatically triggers a corresponding cancellation request. This can be used to implement cancellation with two +// priority levels. +type Source struct { + context *Context + + terminate context.CancelFunc + cancel context.CancelFunc +} + +// NewContext creates a new cancellation context and source parented to the given context. The returned cancellation +// context will be terminated when the supplied root context is canceled. +func NewContext(ctx context.Context) (*Context, *Source) { + contract.Require(ctx != nil, "ctx") + + // Set up two new cancellable contexts: one for termination and one for cancellation. The cancellation context is a + // child context of the termination context and will therefore be automatically cancelled when termination is + // requested. Both are children of the supplied context--cancelling the supplied context will cause termination. + terminationContext, terminate := context.WithCancel(ctx) + cancellationContext, cancel := context.WithCancel(terminationContext) + + c := &Context{ + terminate: terminationContext, + cancel: cancellationContext, + } + s := &Source{ + context: c, + terminate: terminate, + cancel: cancel, + } + return c, s +} + +// Canceled returns a channel that will be closed when the context is canceled or terminated. +func (c *Context) Canceled() <-chan struct{} { + return c.cancel.Done() +} + +// CancelErr returns a non-nil error iff the context has been canceled or terminated. +func (c *Context) CancelErr() error { + return c.cancel.Err() +} + +// Terminated returns a channel that will be closed when the context is terminated. +func (c *Context) Terminated() <-chan struct{} { + return c.terminate.Done() +} + +// TerminateErr returns a non-nil error iff the context has been terminated. +func (c *Context) TerminateErr() error { + return c.terminate.Err() +} + +// Context returns the Context to which this source will deliver cancellation and termination requests. +func (s *Source) Context() *Context { + return s.context +} + +// Cancel cancels this source's context. +func (s *Source) Cancel() { + s.cancel() +} + +// Terminate terminates this source's context (which also cancels this context). +func (s *Source) Terminate() { + s.terminate() +}