add the ability to stream progress updates during up/refresh/destroy

This commit is contained in:
evanboyle 2020-09-11 07:57:03 -07:00
parent d1d8117853
commit 1d7608821b
9 changed files with 188 additions and 11 deletions

View file

@ -101,6 +101,14 @@ conflicts:
if err != nil && IsConcurrentUpdateError(err) { /* retry logic here */ }
```
## Developing the Godocs
This repo has extensive examples and godoc content. To test out your changes locally you can do the following:
1. check out the appropriate pulumi branch
2. cd $GOPATH/src/github.com/pulumi/pulumi/sdk/go/x/auto
3. godoc -http=:6060
4. Navigate to http://localhost:6060/pkg/github.com/pulumi/pulumi/sdk/v2/go/x/auto/
## Known Issues
The Automation API is currently in Alpha and has several known issues. Please upvote issues,

View file

@ -17,6 +17,7 @@ package auto
import (
"bytes"
"context"
"io"
"os"
"os/exec"
)
@ -26,6 +27,7 @@ const unknownErrorCode = -2
func runPulumiCommandSync(
ctx context.Context,
workdir string,
additionalOutput []io.Writer,
additionalEnv []string,
args ...string,
) (string, string, int, error) {
@ -35,10 +37,13 @@ func runPulumiCommandSync(
cmd := exec.CommandContext(ctx, "pulumi", args...)
cmd.Dir = workdir
cmd.Env = append(os.Environ(), additionalEnv...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
additionalOutput = append(additionalOutput, &stdout)
cmd.Stdout = io.MultiWriter(additionalOutput...)
cmd.Stderr = &stderr
code := unknownErrorCode
err := cmd.Run()
if exitError, ok := err.(*exec.ExitError); ok {

View file

@ -18,6 +18,9 @@ package auto
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
@ -734,6 +737,20 @@ func ExampleStack_Destroy() {
stack.Destroy(ctx, optdestroy.Message("a message to save with the destroy operation"))
}
func ExampleStack_Destroy_streamingProgress() {
	ctx := context.Background()
	fqsn := FullyQualifiedStackName("org", "project", "stack")
	// select an existing stack to destroy
	stack, _ := SelectStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
	// create a temp file that we can tail while our program runs
	tmpFile, _ := ioutil.TempFile(os.TempDir(), "")
	// optdestroy.ProgressStreams lets us stream incremental output to stdout, a file to tail, etc.,
	// giving us incremental status over time
	destinations := []io.Writer{os.Stdout, tmpFile}
	// this destroy will incrementally stream unstructured progress messages to stdout and our temp file
	stack.Destroy(ctx, optdestroy.ProgressStreams(destinations...))
}
func ExampleStack_Up() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
@ -742,6 +759,20 @@ func ExampleStack_Up() {
stack.Up(ctx, optup.Message("a message to save with the up operation"), optup.Parallel(10000))
}
func ExampleStack_Up_streamingProgress() {
	ctx := context.Background()
	fqsn := FullyQualifiedStackName("org", "project", "stack")
	// create a new stack to update
	stack, _ := NewStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
	// create a temp file that we can tail while our program runs
	tmpFile, _ := ioutil.TempFile(os.TempDir(), "")
	// optup.ProgressStreams lets us stream incremental output to stdout, a file to tail, etc.,
	// giving us incremental status over time
	destinations := []io.Writer{os.Stdout, tmpFile}
	// this update will incrementally stream unstructured progress messages to stdout and our temp file
	stack.Up(ctx, optup.ProgressStreams(destinations...))
}
func ExampleStack_Preview() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
@ -758,6 +789,20 @@ func ExampleStack_Refresh() {
stack.Refresh(ctx, optrefresh.Message("a message to save with the refresh operation"))
}
func ExampleStack_Refresh_streamingProgress() {
	ctx := context.Background()
	fqsn := FullyQualifiedStackName("org", "project", "stack")
	// select an existing stack and refresh the resources under management
	stack, _ := SelectStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
	// create a temp file that we can tail while our program runs
	tmpFile, _ := ioutil.TempFile(os.TempDir(), "")
	// optrefresh.ProgressStreams lets us stream incremental output to stdout, a file to tail, etc.,
	// giving us incremental status over time
	destinations := []io.Writer{os.Stdout, tmpFile}
	// this refresh will incrementally stream unstructured progress messages to stdout and our temp file
	stack.Refresh(ctx, optrefresh.ProgressStreams(destinations...))
}
func ExampleStack_GetAllConfig() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")

View file

@ -458,7 +458,7 @@ func (l *LocalWorkspace) runPulumiCmdSync(
env = append(env, strings.Join(e, "="))
}
}
return runPulumiCommandSync(ctx, l.WorkDir(), env, args...)
return runPulumiCommandSync(ctx, l.WorkDir(), nil /* additionalOutputs */, env, args...)
}
// NewLocalWorkspace creates and configures a LocalWorkspace. LocalWorkspaceOptions can be used to

View file

@ -15,6 +15,7 @@
package auto
import (
"bytes"
"context"
"fmt"
"math/rand"
@ -27,6 +28,9 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi/config"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
"github.com/stretchr/testify/assert"
)
@ -474,3 +478,74 @@ func TestNestedStackFails(t *testing.T) {
assert.Equal(t, "destroy", dRes.Summary.Kind)
assert.Equal(t, "succeeded", dRes.Summary.Result)
}
// TestProgressStreams verifies the ProgressStreams option for Up, Refresh, and Destroy:
// the CLI's stdout must be mirrored into each supplied writer, so after every operation
// the writer's contents should match the result's StdOut exactly.
func TestProgressStreams(t *testing.T) {
	ctx := context.Background()
	pName := "inline_progress_streams"
	// randomize the stack name so concurrent test runs don't collide
	sName := fmt.Sprintf("int_test%d", rangeIn(10000000, 99999999))
	fqsn := FullyQualifiedStackName(pulumiOrg, pName, sName)
	cfg := ConfigMap{
		"bar": ConfigValue{
			Value: "abc",
		},
		"buzz": ConfigValue{
			Value:  "secret",
			Secret: true,
		},
	}

	// initialize a stack backed by an inline program that exports plain, config, and secret values
	s, err := NewStackInlineSource(ctx, fqsn, func(ctx *pulumi.Context) error {
		c := config.New(ctx, "")
		ctx.Export("exp_static", pulumi.String("foo"))
		ctx.Export("exp_cfg", pulumi.String(c.Get("bar")))
		ctx.Export("exp_secret", c.GetSecret("buzz"))
		return nil
	})
	if err != nil {
		// t.Fatalf combines Errorf+FailNow
		t.Fatalf("failed to initialize stack, err: %v", err)
	}

	defer func() {
		// -- pulumi stack rm --
		// always clean up the stack, even if an intermediate step failed
		err = s.Workspace().RemoveStack(ctx, s.Name())
		assert.Nil(t, err, "failed to remove stack. Resources have leaked.")
	}()

	err = s.SetAllConfig(ctx, cfg)
	if err != nil {
		t.Fatalf("failed to set config, err: %v", err)
	}

	// -- pulumi up --
	var upOut bytes.Buffer
	res, err := s.Up(ctx, optup.ProgressStreams(&upOut))
	if err != nil {
		t.Fatalf("up failed, err: %v", err)
	}
	assert.Equal(t, upOut.String(), res.StdOut, "expected stdout writers to contain same contents")

	// -- pulumi refresh --
	var refOut bytes.Buffer
	ref, err := s.Refresh(ctx, optrefresh.ProgressStreams(&refOut))
	if err != nil {
		t.Fatalf("refresh failed, err: %v", err)
	}
	assert.Equal(t, refOut.String(), ref.StdOut, "expected stdout writers to contain same contents")

	// -- pulumi destroy --
	var desOut bytes.Buffer
	dRes, err := s.Destroy(ctx, optdestroy.ProgressStreams(&desOut))
	if err != nil {
		t.Fatalf("destroy failed, err: %v", err)
	}
	assert.Equal(t, desOut.String(), dRes.StdOut, "expected stdout writers to contain same contents")
}

View file

@ -16,6 +16,8 @@
// github.com/sdk/v2/go/x/auto Stack.Destroy(...optdestroy.Option)
package optdestroy
import "io"
// Parallel is the number of resource operations to run in parallel at once during the destroy
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@ -45,6 +47,13 @@ func TargetDependents() Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
func ProgressStreams(writers ...io.Writer) Option {
	setStreams := func(opts *Options) {
		opts.ProgressStreams = writers
	}
	return optionFunc(setStreams)
}
// Option is a parameter to be applied to a Stack.Destroy() operation
type Option interface {
ApplyOption(*Options)
@ -63,6 +72,8 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)

View file

@ -16,6 +16,8 @@
// github.com/sdk/v2/go/x/auto Stack.Refresh(...optrefresh.Option)
package optrefresh
import "io"
// Parallel is the number of resource operations to run in parallel at once during the refresh
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@ -45,6 +47,13 @@ func Target(urns []string) Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
func ProgressStreams(writers ...io.Writer) Option {
	setStreams := func(opts *Options) {
		opts.ProgressStreams = writers
	}
	return optionFunc(setStreams)
}
// Option is a parameter to be applied to a Stack.Refresh() operation
type Option interface {
ApplyOption(*Options)
@ -63,6 +72,8 @@ type Options struct {
ExpectNoChanges bool
// Specify an exclusive list of resource URNs to refresh
Target []string
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)

View file

@ -16,6 +16,8 @@
// github.com/sdk/v2/go/x/auto Stack.Up(...optup.Option)
package optup
import "io"
// Parallel is the number of resource operations to run in parallel at once during the update
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@ -59,6 +61,13 @@ func TargetDependents() Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
func ProgressStreams(writers ...io.Writer) Option {
	setStreams := func(opts *Options) {
		opts.ProgressStreams = writers
	}
	return optionFunc(setStreams)
}
// Option is a parameter to be applied to a Stack.Up() operation
type Option interface {
ApplyOption(*Options)
@ -81,6 +90,8 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)

View file

@ -91,6 +91,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"runtime"
"strings"
"sync"
@ -222,7 +223,7 @@ func (s *Stack) Preview(ctx context.Context, opts ...optpreview.Option) (Preview
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, nil /* additionalOutput */, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
}
@ -282,7 +283,7 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, upOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
@ -344,7 +345,7 @@ func (s *Stack) Refresh(ctx context.Context, opts ...optrefresh.Option) (Refresh
}
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, refreshOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to refresh stack"), stdout, stderr, code)
}
@ -401,7 +402,7 @@ func (s *Stack) Destroy(ctx context.Context, opts ...optdestroy.Option) (Destroy
}
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, destroyOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to destroy stack"), stdout, stderr, code)
}
@ -433,13 +434,17 @@ func (s *Stack) Outputs(ctx context.Context) (OutputMap, error) {
}
// standard outputs
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json")
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"stack", "output", "--json",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "could not get outputs"), outStdout, outStderr, code)
}
// secret outputs
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json", "--show-secrets")
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"stack", "output", "--json", "--show-secrets",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "could not get secret outputs"), outStdout, outStderr, code)
}
@ -475,7 +480,9 @@ func (s *Stack) History(ctx context.Context) ([]UpdateSummary, error) {
return nil, errors.Wrap(err, "failed to get stack history")
}
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, "history", "--json", "--show-secrets")
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"history", "--json", "--show-secrets",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "failed to get stack history"), stdout, stderr, errCode)
}
@ -630,7 +637,11 @@ type DestroyResult struct {
// secretSentinel represents the CLI response for an output marked as "secret"
const secretSentinel = "[secret]"
func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, string, int, error) {
func (s *Stack) runPulumiCmdSync(
ctx context.Context,
additionalOutput []io.Writer,
args ...string,
) (string, string, int, error) {
var env []string
if s.Workspace().PulumiHome() != "" {
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
@ -647,7 +658,7 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
return "", "", -1, errors.Wrap(err, "failed to exec command, error getting additional args")
}
args = append(args, additionalArgs...)
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), env, args...)
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), additionalOutput, env, args...)
if err != nil {
return stdout, stderr, errCode, err
}