Plumb basic cancellation through the engine. (#1231)

hese changes plumb basic support for cancellation through the engine.
Two types of cancellation are supported for all engine operations:
- Cancellation, which waits for the operation to drive itself to a safe
  point before the operation returns, and
- Termination, which does not wait for the operation to drive itself
  to a safe opint for the operation returns.

When updating local or managed stacks, a single ^C triggers cancellation
of any running operation; a second ^C will trigger termination.

Fixes #513, #1077.
This commit is contained in:
Pat Gavlin 2018-04-19 18:59:14 -07:00 committed by GitHub
parent e8485c2388
commit 4fa69bfd72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 376 additions and 135 deletions

View file

@ -3,6 +3,7 @@
package cmd
import (
"context"
"fmt"
"os"
@ -77,7 +78,7 @@ func newDestroyCmd() *cobra.Command {
}
}
return s.Destroy(proj, root, m, engine.UpdateOptions{
err = s.Destroy(proj, root, m, engine.UpdateOptions{
Analyzers: analyzers,
Force: force,
Preview: preview,
@ -90,7 +91,11 @@ func newDestroyCmd() *cobra.Command {
ShowSameResources: showSames,
DiffDisplay: diffDisplay,
Debug: debug,
})
}, cancellationScopes)
if err == context.Canceled {
return errors.New("destroy cancelled")
}
return err
}),
}

View file

@ -3,6 +3,7 @@
package cmd
import (
"context"
"os"
"github.com/pkg/errors"
@ -68,7 +69,7 @@ func newRefreshCmd() *cobra.Command {
return errors.Wrap(err, "gathering environment metadata")
}
return s.Refresh(proj, root, m, engine.UpdateOptions{
err = s.Refresh(proj, root, m, engine.UpdateOptions{
Analyzers: analyzers,
Force: force,
Preview: preview,
@ -81,7 +82,11 @@ func newRefreshCmd() *cobra.Command {
ShowSameResources: showSames,
DiffDisplay: diffDisplay,
Debug: debug,
})
}, cancellationScopes)
if err == context.Canceled {
return errors.New("refresh cancelled")
}
return err
}),
}

View file

@ -3,6 +3,7 @@
package cmd
import (
"context"
"os"
"github.com/pkg/errors"
@ -72,7 +73,7 @@ func newUpdateCmd() *cobra.Command {
return errors.Wrap(err, "gathering environment metadata")
}
return s.Update(proj, root, m, engine.UpdateOptions{
err = s.Update(proj, root, m, engine.UpdateOptions{
Analyzers: analyzers,
Force: force,
Preview: preview,
@ -85,7 +86,11 @@ func newUpdateCmd() *cobra.Command {
ShowSameResources: showSames,
DiffDisplay: diffDisplay,
Debug: debug,
})
}, cancellationScopes)
if err == context.Canceled {
return errors.New("update cancelled")
}
return err
}),
}

View file

@ -3,8 +3,10 @@
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"os/user"
"path"
"path/filepath"
@ -22,8 +24,10 @@ import (
"github.com/pulumi/pulumi/pkg/backend/local"
"github.com/pulumi/pulumi/pkg/backend/state"
"github.com/pulumi/pulumi/pkg/diag/colors"
"github.com/pulumi/pulumi/pkg/engine"
"github.com/pulumi/pulumi/pkg/resource/config"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/cancel"
"github.com/pulumi/pulumi/pkg/util/cmdutil"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/util/fsutil"
@ -592,3 +596,67 @@ func removeOldProjectConfiguration() error {
return workspace.SaveProject(proj)
}
type cancellationScope struct {
context *cancel.Context
sigint chan os.Signal
}
func (s *cancellationScope) Context() *cancel.Context {
return s.context
}
func (s *cancellationScope) Close() {
signal.Stop(s.sigint)
close(s.sigint)
}
type cancellationScopeSource int
var cancellationScopes = backend.CancellationScopeSource(cancellationScopeSource(0))
func (cancellationScopeSource) NewScope(events chan<- engine.Event, isPreview bool) backend.CancellationScope {
cancelContext, cancelSource := cancel.NewContext(context.Background())
c := &cancellationScope{
context: cancelContext,
sigint: make(chan os.Signal),
}
go func() {
for range c.sigint {
// If we haven't yet received a SIGINT, call the cancellation func. Otherwise call the termination
// func.
if cancelContext.CancelErr() == nil {
message := "^C received; cancelling. If you would like to terminate immediately, press ^C again.\n"
if !isPreview {
message += colors.BrightRed + "Note that terminating immediately may lead to orphaned resources " +
"and other inconsistencies.\n" + colors.Reset
}
events <- engine.Event{
Type: engine.StdoutColorEvent,
Payload: engine.StdoutEventPayload{
Message: message,
Color: colors.Always,
},
}
cancelSource.Cancel()
} else {
message := colors.BrightRed + "^C received; terminating" + colors.Reset
events <- engine.Event{
Type: engine.StdoutColorEvent,
Payload: engine.StdoutEventPayload{
Message: message,
Color: colors.Always,
},
}
cancelSource.Terminate()
}
}
}()
signal.Notify(c.sigint, os.Interrupt)
return c
}

View file

@ -9,6 +9,7 @@ import (
"github.com/pulumi/pulumi/pkg/operations"
"github.com/pulumi/pulumi/pkg/resource/config"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/cancel"
"github.com/pulumi/pulumi/pkg/workspace"
)
@ -34,13 +35,13 @@ type Backend interface {
// Update updates the target stack with the current workspace's contents (config and code).
Update(stackName tokens.QName, proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions, scopes CancellationScopeSource) error
// Refresh refreshes the stack's state from the cloud provider.
Refresh(stackName tokens.QName, proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions, scopes CancellationScopeSource) error
// Destroy destroys all of this stack's resources.
Destroy(stackName tokens.QName, proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions, scopes CancellationScopeSource) error
// GetHistory returns all updates for the stack. The returned UpdateInfo slice will be in
// descending order (newest first).
@ -55,3 +56,17 @@ type Backend interface {
// Logout logs you out of the backend and removes any stored credentials.
Logout() error
}
// CancellationScope provides a scoped source of cancellation and termination requests.
type CancellationScope interface {
// Context returns the cancellation context used to observe cancellation and termination requests for this scope.
Context() *cancel.Context
// Close closes the cancellation scope.
Close()
}
// CancellationScopeSource provides a source for cancellation scopes.
type CancellationScopeSource interface {
// NewScope creates a new cancellation scope.
NewScope(events chan<- engine.Event, isPreview bool) CancellationScope
}

View file

@ -467,7 +467,7 @@ func (b *cloudBackend) PreviewThenPrompt(
updateKind client.UpdateKind,
stack backend.Stack, pkg *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions) error {
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
// create a channel to hear about the update events from the engine. this will be used so that
// we can build up the diff display in case the user asks to see the details of the diff
@ -497,7 +497,7 @@ func (b *cloudBackend) PreviewThenPrompt(
err := b.updateStack(
updateKind, stack, pkg, root, m,
opts, displayOpts, eventsChannel, true /*dryRun*/)
opts, displayOpts, eventsChannel, true /*dryRun*/, scopes)
if err != nil || opts.Preview {
// if we're just previewing, then we can stop at this point.
@ -554,7 +554,7 @@ func (b *cloudBackend) PreviewThenPromptThenExecute(
updateKind client.UpdateKind,
stackName tokens.QName, pkg *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions) error {
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
// First get the stack.
stack, err := getStack(b, stackName)
@ -565,7 +565,7 @@ func (b *cloudBackend) PreviewThenPromptThenExecute(
if !opts.Force {
// If we're not forcing, then preview the operation to the user and ask them if
// they want to proceed.
err = b.PreviewThenPrompt(updateKind, stack, pkg, root, m, opts, displayOpts)
err = b.PreviewThenPrompt(updateKind, stack, pkg, root, m, opts, displayOpts, scopes)
if err != nil || opts.Preview {
return err
}
@ -576,22 +576,28 @@ func (b *cloudBackend) PreviewThenPromptThenExecute(
var unused chan engine.Event
return b.updateStack(
updateKind, stack, pkg,
root, m, opts, displayOpts, unused, false /*dryRun*/)
root, m, opts, displayOpts, unused, false /*dryRun*/, scopes)
}
func (b *cloudBackend) Update(stackName tokens.QName, pkg *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return b.PreviewThenPromptThenExecute(client.UpdateKindUpdate, stackName, pkg, root, m, opts, displayOpts)
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions,
scopes backend.CancellationScopeSource) error {
return b.PreviewThenPromptThenExecute(client.UpdateKindUpdate, stackName, pkg, root, m, opts, displayOpts, scopes)
}
func (b *cloudBackend) Refresh(stackName tokens.QName, pkg *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return b.PreviewThenPromptThenExecute(client.UpdateKindRefresh, stackName, pkg, root, m, opts, displayOpts)
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions,
scopes backend.CancellationScopeSource) error {
return b.PreviewThenPromptThenExecute(client.UpdateKindRefresh, stackName, pkg, root, m, opts, displayOpts, scopes)
}
func (b *cloudBackend) Destroy(stackName tokens.QName, pkg *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return b.PreviewThenPromptThenExecute(client.UpdateKindDestroy, stackName, pkg, root, m, opts, displayOpts)
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions,
scopes backend.CancellationScopeSource) error {
return b.PreviewThenPromptThenExecute(client.UpdateKindDestroy, stackName, pkg, root, m, opts, displayOpts, scopes)
}
func (b *cloudBackend) createAndStartUpdate(
@ -646,7 +652,8 @@ func (b *cloudBackend) createAndStartUpdate(
func (b *cloudBackend) updateStack(
action client.UpdateKind, stack backend.Stack, pkg *workspace.Project,
root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, callerEventsOpt chan<- engine.Event, dryRun bool) error {
displayOpts backend.DisplayOptions, callerEventsOpt chan<- engine.Event, dryRun bool,
scopes backend.CancellationScopeSource) error {
// Print a banner so it's clear this is going to the cloud.
actionLabel := getActionLabel(string(action), dryRun)
@ -684,7 +691,7 @@ func (b *cloudBackend) updateStack(
if stack.(Stack).RunLocally() {
return b.runEngineAction(
action, stack.Name(), pkg, root, opts, displayOpts,
update, token, callerEventsOpt, dryRun)
update, token, callerEventsOpt, dryRun, scopes)
}
// Otherwise, wait for the update to complete while rendering its events to stdout/stderr.
@ -727,7 +734,7 @@ func (b *cloudBackend) runEngineAction(
action client.UpdateKind, stackName tokens.QName, pkg *workspace.Project,
root string, opts engine.UpdateOptions, displayOpts backend.DisplayOptions,
update client.UpdateIdentifier, token string,
callerEventsOpt chan<- engine.Event, dryRun bool) error {
callerEventsOpt chan<- engine.Event, dryRun bool, scopes backend.CancellationScopeSource) error {
u, err := b.newUpdate(stackName, pkg, root, update, token)
if err != nil {
@ -741,6 +748,10 @@ func (b *cloudBackend) runEngineAction(
getActionLabel(string(action), dryRun), displayEvents, displayDone, displayOpts)
engineEvents := make(chan engine.Event)
scope := scopes.NewScope(engineEvents, dryRun)
defer scope.Close()
go func() {
// Pull in all events from the engine and send to them to the two listeners.
for e := range engineEvents {
@ -754,17 +765,18 @@ func (b *cloudBackend) runEngineAction(
// Depending on the action, kick off the relevant engine activity. Note that we don't immediately check and
// return error conditions, because we will do so below after waiting for the display channels to close.
engineCtx := &engine.Context{Cancel: scope.Context(), Events: engineEvents}
switch action {
case client.UpdateKindUpdate:
if dryRun {
err = engine.Preview(u, engineEvents, opts)
err = engine.Preview(u, engineCtx, opts)
} else {
_, err = engine.Update(u, engineEvents, opts, dryRun)
_, err = engine.Update(u, engineCtx, opts, dryRun)
}
case client.UpdateKindRefresh:
_, err = engine.Refresh(u, engineEvents, opts, dryRun)
_, err = engine.Refresh(u, engineCtx, opts, dryRun)
case client.UpdateKindDestroy:
_, err = engine.Destroy(u, engineEvents, opts, dryRun)
_, err = engine.Destroy(u, engineCtx, opts, dryRun)
default:
contract.Failf("Unrecognized action type: %s", action)
}

View file

@ -86,19 +86,19 @@ func (s *cloudStack) Remove(force bool) (bool, error) {
return backend.RemoveStack(s, force)
}
func (s *cloudStack) Update(proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return backend.UpdateStack(s, proj, root, m, opts, displayOpts)
func (s *cloudStack) Update(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return backend.UpdateStack(s, proj, root, m, opts, displayOpts, scopes)
}
func (s *cloudStack) Refresh(proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return backend.RefreshStack(s, proj, root, m, opts, displayOpts)
func (s *cloudStack) Refresh(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return backend.RefreshStack(s, proj, root, m, opts, displayOpts, scopes)
}
func (s *cloudStack) Destroy(proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return backend.DestroyStack(s, proj, root, m, opts, displayOpts)
func (s *cloudStack) Destroy(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return backend.DestroyStack(s, proj, root, m, opts, displayOpts, scopes)
}
func (s *cloudStack) GetLogs(query operations.LogQuery) ([]operations.LogEntry, error) {

View file

@ -138,8 +138,8 @@ func (b *localBackend) GetStackCrypter(stackName tokens.QName) (config.Crypter,
}
func (b *localBackend) Update(
stackName tokens.QName, proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
// The Pulumi Service will pick up changes to a stack's tags on each update. (e.g. changing the description
// in Pulumi.yaml.) While this isn't necessary for local updates, we do the validation here to keep
@ -153,27 +153,32 @@ func (b *localBackend) Update(
}
return b.performEngineOp("updating", backend.DeployUpdate,
stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Update)
stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Update, scopes)
}
func (b *localBackend) Refresh(stackName tokens.QName, proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
func (b *localBackend) Refresh(
stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return b.performEngineOp("refreshing", backend.RefreshUpdate,
stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Refresh)
stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Refresh, scopes)
}
func (b *localBackend) Destroy(stackName tokens.QName, proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
func (b *localBackend) Destroy(stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata,
opts engine.UpdateOptions, displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return b.performEngineOp("destroying", backend.DestroyUpdate,
stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Destroy)
stackName, proj, root, m, opts, displayOpts, opts.Preview, engine.Destroy, scopes)
}
type engineOpFunc func(
engine.UpdateInfo, chan<- engine.Event, engine.UpdateOptions, bool) (engine.ResourceChanges, error)
engine.UpdateInfo, *engine.Context, engine.UpdateOptions, bool) (engine.ResourceChanges, error)
func (b *localBackend) performEngineOp(op string, kind backend.UpdateKind,
stackName tokens.QName, proj *workspace.Project, root string, m backend.UpdateMetadata,
opts engine.UpdateOptions, displayOpts backend.DisplayOptions, dryRun bool, performEngineOp engineOpFunc) error {
opts engine.UpdateOptions, displayOpts backend.DisplayOptions, dryRun bool, performEngineOp engineOpFunc,
scopes backend.CancellationScopeSource) error {
if !opts.Force && !dryRun {
return errors.Errorf("--force or --preview must be passed when %s a local stack", op)
}
@ -184,13 +189,18 @@ func (b *localBackend) performEngineOp(op string, kind backend.UpdateKind,
}
events := make(chan engine.Event)
cancelScope := scopes.NewScope(events, dryRun)
defer cancelScope.Close()
done := make(chan bool)
go DisplayEvents(op, events, done, displayOpts)
// Perform the update
start := time.Now().Unix()
changes, updateErr := performEngineOp(update, events, opts, dryRun)
engineCtx := &engine.Context{Cancel: cancelScope.Context(), Events: events}
changes, updateErr := performEngineOp(update, engineCtx, opts, dryRun)
end := time.Now().Unix()
<-done

View file

@ -49,19 +49,19 @@ func (s *localStack) Remove(force bool) (bool, error) {
return backend.RemoveStack(s, force)
}
func (s *localStack) Update(proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return backend.UpdateStack(s, proj, root, m, opts, displayOpts)
func (s *localStack) Update(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return backend.UpdateStack(s, proj, root, m, opts, displayOpts, scopes)
}
func (s *localStack) Refresh(proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return backend.RefreshStack(s, proj, root, m, opts, displayOpts)
func (s *localStack) Refresh(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return backend.RefreshStack(s, proj, root, m, opts, displayOpts, scopes)
}
func (s *localStack) Destroy(proj *workspace.Project, root string,
m backend.UpdateMetadata, opts engine.UpdateOptions, displayOpts backend.DisplayOptions) error {
return backend.DestroyStack(s, proj, root, m, opts, displayOpts)
func (s *localStack) Destroy(proj *workspace.Project, root string, m backend.UpdateMetadata, opts engine.UpdateOptions,
displayOpts backend.DisplayOptions, scopes backend.CancellationScopeSource) error {
return backend.DestroyStack(s, proj, root, m, opts, displayOpts, scopes)
}
func (s *localStack) GetLogs(query operations.LogQuery) ([]operations.LogEntry, error) {

View file

@ -24,14 +24,14 @@ type Stack interface {
Backend() Backend // the backend this stack belongs to.
// Update this stack.
Update(proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error
Update(proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions,
displayOpts DisplayOptions, scopes CancellationScopeSource) error
// Refresh this stack's state from the cloud provider.
Refresh(proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error
Refresh(proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions,
displayOpts DisplayOptions, scopes CancellationScopeSource) error
// Destroy this stack's resources.
Destroy(proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error
Destroy(proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions,
displayOpts DisplayOptions, scopes CancellationScopeSource) error
Remove(force bool) (bool, error) // remove this stack.
GetLogs(query operations.LogQuery) ([]operations.LogEntry, error) // list log entries for this stack.
@ -45,21 +45,21 @@ func RemoveStack(s Stack, force bool) (bool, error) {
}
// UpdateStack updates the target stack with the current workspace's contents (config and code).
func UpdateStack(s Stack, proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error {
return s.Backend().Update(s.Name(), proj, root, m, opts, displayOpts)
func UpdateStack(s Stack, proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions,
displayOpts DisplayOptions, scopes CancellationScopeSource) error {
return s.Backend().Update(s.Name(), proj, root, m, opts, displayOpts, scopes)
}
// RefreshStack refresh's the stack's state from the cloud provider.
func RefreshStack(s Stack, proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error {
return s.Backend().Refresh(s.Name(), proj, root, m, opts, displayOpts)
func RefreshStack(s Stack, proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions,
displayOpts DisplayOptions, scopes CancellationScopeSource) error {
return s.Backend().Refresh(s.Name(), proj, root, m, opts, displayOpts, scopes)
}
// DestroyStack destroys all of this stack's resources.
func DestroyStack(s Stack, proj *workspace.Project, root string,
m UpdateMetadata, opts engine.UpdateOptions, displayOpts DisplayOptions) error {
return s.Backend().Destroy(s.Name(), proj, root, m, opts, displayOpts)
func DestroyStack(s Stack, proj *workspace.Project, root string, m UpdateMetadata, opts engine.UpdateOptions,
displayOpts DisplayOptions, scopes CancellationScopeSource) error {
return s.Backend().Destroy(s.Name(), proj, root, m, opts, displayOpts, scopes)
}
// GetStackCrypter fetches the encrypter/decrypter for a stack.

View file

@ -10,20 +10,20 @@ import (
)
func Destroy(
u UpdateInfo, events chan<- Event, opts UpdateOptions, dryRun bool) (ResourceChanges, error) {
u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, error) {
contract.Require(u != nil, "u")
defer func() { events <- cancelEvent() }()
defer func() { ctx.Events <- cancelEvent() }()
ctx, err := newPlanContext(u)
info, err := newPlanContext(u)
if err != nil {
return nil, err
}
defer ctx.Close()
defer info.Close()
emitter := makeEventEmitter(events, u)
return update(ctx, planOptions{
emitter := makeEventEmitter(ctx.Events, u)
return update(ctx, info, planOptions{
UpdateOptions: opts,
SourceFunc: newDestroySource,
Events: emitter,

View file

@ -4,6 +4,7 @@ package engine
import (
"github.com/pulumi/pulumi/pkg/resource/deploy"
"github.com/pulumi/pulumi/pkg/util/cancel"
"github.com/pulumi/pulumi/pkg/workspace"
)
@ -39,3 +40,9 @@ type SnapshotMutation interface {
// persisted. See the comments on SnapshotProvider.BeginMutation for more details.
End(snapshot *deploy.Snapshot) error
}
// Context provides cancellation, termination, and eventing options for an engine operation.
type Context struct {
Cancel *cancel.Context
Events chan<- Event
}

View file

@ -157,7 +157,7 @@ func (res *planResult) Chdir() (func(), error) {
// Walk enumerates all steps in the plan, calling out to the provided action at each step. It returns four things: the
// resulting Snapshot, no matter whether an error occurs or not; an error, if something went wrong; the step that
// failed, if the error is non-nil; and finally the state of the resource modified in the failing step.
func (res *planResult) Walk(events deploy.Events, preview bool) (deploy.PlanSummary,
func (res *planResult) Walk(ctx *Context, events deploy.Events, preview bool) (deploy.PlanSummary,
deploy.Step, resource.Status, error) {
opts := deploy.Options{
Events: events,
@ -172,33 +172,57 @@ func (res *planResult) Walk(events deploy.Events, preview bool) (deploy.PlanSumm
step, err := iter.Next()
if err != nil {
closeerr := iter.Close() // ignore close errors; the Next error trumps
contract.IgnoreError(closeerr)
closeErr := iter.Close() // ignore close errors; the Next error trumps
contract.IgnoreError(closeErr)
return nil, nil, resource.StatusOK, err
}
for step != nil {
// Perform any per-step actions.
rst, err := iter.Apply(step, preview)
// Iterate the plan in a goroutine while listening for termination.
var rst resource.Status
done := make(chan bool)
go func() {
defer func() {
// Close the iterator. If we have already observed another error, that error trumps the close error.
closeErr := iter.Close()
if err == nil {
err = closeErr
}
close(done)
}()
// If an error occurred, exit early.
if err != nil {
closeerr := iter.Close() // ignore close errors; the action error trumps
contract.IgnoreError(closeerr)
return iter, step, rst, err
}
contract.Assert(rst == resource.StatusOK)
for step != nil {
// Check for cancellation and termination.
if cancelErr := ctx.Cancel.CancelErr(); cancelErr != nil {
rst, err = resource.StatusOK, cancelErr
return
}
step, err = iter.Next()
if err != nil {
closeerr := iter.Close() // ignore close errors; the action error trumps
contract.IgnoreError(closeerr)
return iter, step, resource.StatusOK, err
// Perform any per-step actions.
rst, err = iter.Apply(step, preview)
// If an error occurred, exit early.
if err != nil {
return
}
contract.Assert(rst == resource.StatusOK)
step, err = iter.Next()
if err != nil {
return
}
}
// Finally, return a summary and the resulting plan information.
rst, err = resource.StatusOK, nil
}()
select {
case <-ctx.Cancel.Terminated():
return iter, step, rst, ctx.Cancel.TerminateErr()
case <-done:
return iter, step, rst, err
}
// Finally, return a summary and the resulting plan information.
return iter, nil, resource.StatusOK, iter.Close()
}
func (res *planResult) Close() error {
@ -206,12 +230,12 @@ func (res *planResult) Close() error {
}
// printPlan prints the plan's result to the plan's Options.Events stream.
func printPlan(result *planResult, dryRun bool) (ResourceChanges, error) {
func printPlan(ctx *Context, result *planResult, dryRun bool) (ResourceChanges, error) {
result.Options.Events.preludeEvent(dryRun, result.Ctx.Update.GetTarget().Config)
// Walk the plan's steps and and pretty-print them out.
actions := newPreviewActions(result.Options)
_, _, _, err := result.Walk(actions, true)
_, _, _, err := result.Walk(ctx, actions, true)
if err != nil {
return nil, errors.New("an error occurred while advancing the preview")
}

View file

@ -9,20 +9,20 @@ import (
"github.com/pulumi/pulumi/pkg/util/contract"
)
func Preview(u UpdateInfo, events chan<- Event, opts UpdateOptions) error {
func Preview(u UpdateInfo, ctx *Context, opts UpdateOptions) error {
contract.Require(u != nil, "u")
contract.Require(events != nil, "events")
contract.Require(ctx != nil, "ctx")
defer func() { events <- cancelEvent() }()
defer func() { ctx.Events <- cancelEvent() }()
ctx, err := newPlanContext(u)
info, err := newPlanContext(u)
if err != nil {
return err
}
defer ctx.Close()
defer info.Close()
emitter := makeEventEmitter(events, u)
return preview(ctx, planOptions{
emitter := makeEventEmitter(ctx.Events, u)
return preview(ctx, info, planOptions{
UpdateOptions: opts,
SourceFunc: newUpdateSource,
Events: emitter,
@ -30,8 +30,8 @@ func Preview(u UpdateInfo, events chan<- Event, opts UpdateOptions) error {
})
}
func preview(ctx *planContext, opts planOptions) error {
result, err := plan(ctx, opts, true /*dryRun*/)
func preview(ctx *Context, info *planContext, opts planOptions) error {
result, err := plan(info, opts, true /*dryRun*/)
if err != nil {
return err
}
@ -45,7 +45,7 @@ func preview(ctx *planContext, opts planOptions) error {
}
defer done()
if _, err := printPlan(result, true /*dryRun*/); err != nil {
if _, err := printPlan(ctx, result, true /*dryRun*/); err != nil {
return err
}
}

View file

@ -9,19 +9,20 @@ import (
"github.com/pulumi/pulumi/pkg/workspace"
)
func Refresh(u UpdateInfo, events chan<- Event, opts UpdateOptions, dryRun bool) (ResourceChanges, error) {
func Refresh(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, error) {
contract.Require(u != nil, "u")
defer func() { events <- cancelEvent() }()
defer func() { ctx.Events <- cancelEvent() }()
ctx, err := newPlanContext(u)
info, err := newPlanContext(u)
if err != nil {
return nil, err
}
defer ctx.Close()
defer info.Close()
emitter := makeEventEmitter(events, u)
return update(ctx, planOptions{
emitter := makeEventEmitter(ctx.Events, u)
return update(ctx, info, planOptions{
UpdateOptions: opts,
SkipOutputs: true, // refresh is exclusively about outputs
SourceFunc: newRefreshSource,

View file

@ -35,22 +35,20 @@ type UpdateOptions struct {
// ResourceChanges contains the aggregate resource changes by operation type.
type ResourceChanges map[deploy.StepOp]int
func Update(
u UpdateInfo, events chan<- Event, opts UpdateOptions, dryRun bool) (ResourceChanges, error) {
func Update(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, error) {
contract.Require(u != nil, "update")
contract.Require(events != nil, "events")
contract.Require(ctx != nil, "ctx")
defer func() { events <- cancelEvent() }()
defer func() { ctx.Events <- cancelEvent() }()
ctx, err := newPlanContext(u)
info, err := newPlanContext(u)
if err != nil {
return nil, err
}
defer ctx.Close()
defer info.Close()
emitter := makeEventEmitter(events, u)
return update(ctx, planOptions{
emitter := makeEventEmitter(ctx.Events, u)
return update(ctx, info, planOptions{
UpdateOptions: opts,
SourceFunc: newUpdateSource,
Events: emitter,
@ -87,7 +85,7 @@ func newUpdateSource(
}, dryRun), nil
}
func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges, error) {
func update(ctx *Context, info *planContext, opts planOptions, dryRun bool) (ResourceChanges, error) {
result, err := plan(info, opts, dryRun)
if err != nil {
return nil, err
@ -106,7 +104,7 @@ func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges,
if dryRun {
// If a dry run, just print the plan, don't actually carry out the deployment.
resourceChanges, err = printPlan(result, dryRun)
resourceChanges, err = printPlan(ctx, result, dryRun)
if err != nil {
return resourceChanges, err
}
@ -116,8 +114,8 @@ func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges,
// Walk the plan, reporting progress and executing the actual operations as we go.
start := time.Now()
actions := newUpdateActions(info.Update, opts)
summary, _, _, err := result.Walk(actions, false)
actions := newUpdateActions(ctx, info.Update, opts)
summary, _, _, err := result.Walk(ctx, actions, false)
if err != nil && summary == nil {
// Something went wrong, and no changes were made.
return resourceChanges, err
@ -139,6 +137,7 @@ func update(info *planContext, opts planOptions, dryRun bool) (ResourceChanges,
// updateActions pretty-prints the plan application process as it goes.
type updateActions struct {
Context *Context
Steps int
Ops map[deploy.StepOp]int
Seen map[resource.URN]deploy.Step
@ -147,12 +146,13 @@ type updateActions struct {
Opts planOptions
}
func newUpdateActions(u UpdateInfo, opts planOptions) *updateActions {
func newUpdateActions(context *Context, u UpdateInfo, opts planOptions) *updateActions {
return &updateActions{
Ops: make(map[deploy.StepOp]int),
Seen: make(map[resource.URN]deploy.Step),
Update: u,
Opts: opts,
Context: context,
Ops: make(map[deploy.StepOp]int),
Seen: make(map[resource.URN]deploy.Step),
Update: u,
Opts: opts,
}
}
@ -171,6 +171,12 @@ func (acts *updateActions) OnResourceStepPost(ctx interface{},
assertSeen(acts.Seen, step)
// If we've already been terminated, exit without writing the checkpoint. We explicitly want to leave the
// checkpoint in an inconsistent state in this event.
if acts.Context.Cancel.TerminateErr() != nil {
return nil
}
// Report the result of the step.
stepop := step.Op()
if err != nil {

View file

@ -0,0 +1,83 @@
package cancel
import (
"context"
"github.com/pulumi/pulumi/pkg/util/contract"
)
// Context provides the ability to observe cancellation and termination requests from a Source. A termination request
// automatically triggers a corresponding cancellation request. This can be used to implement cancellation with two
// priority levels.
type Context struct {
terminate context.Context
cancel context.Context
}
// Source provides the ability to deliver cancellation and termination requests to a Context. A termination request
// automatically triggers a corresponding cancellation request. This can be used to implement cancellation with two
// priority levels.
type Source struct {
context *Context
terminate context.CancelFunc
cancel context.CancelFunc
}
// NewContext creates a new cancellation context and source parented to the given context. The returned cancellation
// context will be terminated when the supplied root context is canceled.
func NewContext(ctx context.Context) (*Context, *Source) {
contract.Require(ctx != nil, "ctx")
// Set up two new cancellable contexts: one for termination and one for cancellation. The cancellation context is a
// child context of the termination context and will therefore be automatically cancelled when termination is
// requested. Both are children of the supplied context--cancelling the supplied context will cause termination.
terminationContext, terminate := context.WithCancel(ctx)
cancellationContext, cancel := context.WithCancel(terminationContext)
c := &Context{
terminate: terminationContext,
cancel: cancellationContext,
}
s := &Source{
context: c,
terminate: terminate,
cancel: cancel,
}
return c, s
}
// Canceled returns a channel that will be closed when the context is canceled or terminated.
func (c *Context) Canceled() <-chan struct{} {
return c.cancel.Done()
}
// CancelErr returns a non-nil error iff the context has been canceled or terminated.
func (c *Context) CancelErr() error {
return c.cancel.Err()
}
// Terminated returns a channel that will be closed when the context is terminated.
func (c *Context) Terminated() <-chan struct{} {
return c.terminate.Done()
}
// TerminateErr returns a non-nil error iff the context has been terminated.
func (c *Context) TerminateErr() error {
return c.terminate.Err()
}
// Context returns the Context to which this source will deliver cancellation and termination requests.
func (s *Source) Context() *Context {
return s.context
}
// Cancel cancels this source's context.
func (s *Source) Cancel() {
s.cancel()
}
// Terminate terminates this source's context (which also cancels this context).
func (s *Source) Terminate() {
s.terminate()
}