Automation API - add the ability to stream progress updates during up/refresh/destroy (#5367)

Evan Boyle 2020-09-14 18:56:04 -07:00 committed by GitHub
parent c05ac500da
commit c23ca46382
10 changed files with 196 additions and 17 deletions


@@ -21,6 +21,9 @@ CHANGELOG
API and the CLI must be updated together.
[#5317](https://github.com/pulumi/pulumi/pull/5317)
- Automation API - support streaming output for Up/Refresh/Destroy operations.
[#5367](https://github.com/pulumi/pulumi/pull/5367)
## 2.10.0 (2020-09-10)
- feat(autoapi): add Upsert methods for stacks
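The streaming entry above is the user-facing surface of this commit: each of Up, Refresh, and Destroy gains a ProgressStreams option. A minimal sketch of calling the new option (the org/project/stack names and the `./program` directory are placeholders):

```go
package main

import (
	"context"
	"os"
	"path/filepath"

	"github.com/pulumi/pulumi/sdk/v2/go/x/auto"
	"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
)

func main() {
	ctx := context.Background()
	stackName := auto.FullyQualifiedStackName("myOrg", "proj", "dev")
	// select an existing stack backed by a local program directory
	stack, err := auto.SelectStackLocalSource(ctx, stackName, filepath.Join(".", "program"))
	if err != nil {
		panic(err)
	}
	// mirror the engine's incremental output to stdout as the update runs
	if _, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout)); err != nil {
		panic(err)
	}
}
```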


@@ -101,6 +101,14 @@ conflicts:
if err != nil && IsConcurrentUpdateError(err) { /* retry logic here */ }
```
## Developing the Godocs
This repo has extensive examples and godoc content. To test out your changes locally you can do the following:
1. Check out the appropriate pulumi branch.
2. cd $GOPATH/src/github.com/pulumi/pulumi/sdk/go/x/auto
3. godoc -http=:6060
4. Navigate to http://localhost:6060/pkg/github.com/pulumi/pulumi/sdk/v2/go/x/auto/
## Known Issues
The Automation API is currently in Alpha and has several known issues. Please upvote issues,


@@ -17,6 +17,7 @@ package auto
import (
"bytes"
"context"
"io"
"os"
"os/exec"
)
@@ -26,6 +27,7 @@ const unknownErrorCode = -2
func runPulumiCommandSync(
ctx context.Context,
workdir string,
additionalOutput []io.Writer,
additionalEnv []string,
args ...string,
) (string, string, int, error) {
@@ -35,10 +37,13 @@ func runPulumiCommandSync(
cmd := exec.CommandContext(ctx, "pulumi", args...)
cmd.Dir = workdir
cmd.Env = append(os.Environ(), additionalEnv...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
additionalOutput = append(additionalOutput, &stdout)
cmd.Stdout = io.MultiWriter(additionalOutput...)
cmd.Stderr = &stderr
code := unknownErrorCode
err := cmd.Run()
if exitError, ok := err.(*exec.ExitError); ok {
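The core of the change is the io.MultiWriter above: the internal capture buffer is appended to the caller's writers, so stdout is still returned as a string while every chunk is also fanned out to each additional writer as it arrives. A standard-library-only sketch of that fan-out:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	var stdout bytes.Buffer
	additional := []io.Writer{os.Stdout} // caller-supplied progress streams
	// same shape as runPulumiCommandSync: tee into the buffer and the extras
	w := io.MultiWriter(append(additional, &stdout)...)
	fmt.Fprintln(w, "updating (dev)...")
	fmt.Printf("captured for the result struct: %q\n", stdout.String())
}
```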


@@ -18,6 +18,8 @@ package auto
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
@@ -205,7 +207,7 @@ func ExampleGitRepo() {
func ExampleGitRepo_personalAccessToken() {
ctx := context.Background()
pName := "go_remote_proj"
fqsn := FullyQualifiedStackName("myOrg", pName, "myStack")
stackName := FullyQualifiedStackName("myOrg", pName, "myStack")
// Get the source code repository PERSONAL_ACCESS_TOKEN
token, _ := os.LookupEnv("PERSONAL_ACCESS_TOKEN")
@@ -219,13 +221,13 @@ func ExampleGitRepo_personalAccessToken() {
}
// initialize a stack from the git repo, specifying our project override
NewStackRemoteSource(ctx, fqsn, repo)
NewStackRemoteSource(ctx, stackName, repo)
}
func ExampleGitRepo_privateKeyPath() {
ctx := context.Background()
pName := "go_remote_proj"
fqsn := FullyQualifiedStackName("myOrg", pName, "myStack")
stackName := FullyQualifiedStackName("myOrg", pName, "myStack")
repo := GitRepo{
URL: "https://github.com/pulumi/test-repo.git",
@@ -237,13 +239,13 @@ func ExampleGitRepo_privateKeyPath() {
}
// initialize a stack from the git repo, specifying our project override
NewStackRemoteSource(ctx, fqsn, repo)
NewStackRemoteSource(ctx, stackName, repo)
}
func ExampleGitRepo_usernameAndPassword() {
ctx := context.Background()
pName := "go_remote_proj"
fqsn := FullyQualifiedStackName("myOrg", pName, "myStack")
stackName := FullyQualifiedStackName("myOrg", pName, "myStack")
repo := GitRepo{
URL: "https://github.com/pulumi/test-repo.git",
@@ -256,7 +258,7 @@ func ExampleGitRepo_usernameAndPassword() {
}
// initialize a stack from the git repo, specifying our project override
NewStackRemoteSource(ctx, fqsn, repo)
NewStackRemoteSource(ctx, stackName, repo)
}
func ExampleLocalWorkspace() {
@@ -917,6 +919,20 @@ func ExampleStack_Destroy() {
stack.Destroy(ctx, optdestroy.Message("a message to save with the destroy operation"))
}
func ExampleStack_Destroy_streamingProgress() {
ctx := context.Background()
stackName := FullyQualifiedStackName("org", "project", "stack")
// select an existing stack to destroy
stack, _ := SelectStackLocalSource(ctx, stackName, filepath.Join(".", "program"))
// create a temp file that we can tail while our program runs
tmp, _ := ioutil.TempFile(os.TempDir(), "")
// optdestroy.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
// this gives us incremental status over time
progressStreams := []io.Writer{os.Stdout, tmp}
// this destroy will incrementally stream unstructured progress messages to stdout and our temp file
stack.Destroy(ctx, optdestroy.ProgressStreams(progressStreams...))
}
func ExampleStack_Up() {
ctx := context.Background()
stackName := FullyQualifiedStackName("org", "project", "stack")
@@ -925,6 +941,20 @@ func ExampleStack_Up() {
stack.Up(ctx, optup.Message("a message to save with the up operation"), optup.Parallel(10000))
}
func ExampleStack_Up_streamingProgress() {
ctx := context.Background()
stackName := FullyQualifiedStackName("org", "project", "stack")
// create a new stack to update
stack, _ := NewStackLocalSource(ctx, stackName, filepath.Join(".", "program"))
// create a temp file that we can tail while our program runs
tmp, _ := ioutil.TempFile(os.TempDir(), "")
// optup.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
// this gives us incremental status over time
progressStreams := []io.Writer{os.Stdout, tmp}
// this update will incrementally stream unstructured progress messages to stdout and our temp file
stack.Up(ctx, optup.ProgressStreams(progressStreams...))
}
func ExampleStack_Preview() {
ctx := context.Background()
stackName := FullyQualifiedStackName("org", "project", "stack")
@@ -941,6 +971,20 @@ func ExampleStack_Refresh() {
stack.Refresh(ctx, optrefresh.Message("a message to save with the refresh operation"))
}
func ExampleStack_Refresh_streamingProgress() {
ctx := context.Background()
stackName := FullyQualifiedStackName("org", "project", "stack")
// select an existing stack and refresh the resources under management
stack, _ := SelectStackLocalSource(ctx, stackName, filepath.Join(".", "program"))
// create a temp file that we can tail while our program runs
tmp, _ := ioutil.TempFile(os.TempDir(), "")
// optrefresh.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
// this gives us incremental status over time
progressStreams := []io.Writer{os.Stdout, tmp}
// this refresh will incrementally stream unstructured progress messages to stdout and our temp file
stack.Refresh(ctx, optrefresh.ProgressStreams(progressStreams...))
}
func ExampleStack_GetAllConfig() {
ctx := context.Background()
stackName := FullyQualifiedStackName("org", "project", "stack")


@@ -424,7 +424,7 @@ func (l *LocalWorkspace) runPulumiCmdSync(
env = append(env, strings.Join(e, "="))
}
}
return runPulumiCommandSync(ctx, l.WorkDir(), env, args...)
return runPulumiCommandSync(ctx, l.WorkDir(), nil /* additionalOutputs */, env, args...)
}
// NewLocalWorkspace creates and configures a LocalWorkspace. LocalWorkspaceOptions can be used to


@@ -15,6 +15,7 @@
package auto
import (
"bytes"
"context"
"fmt"
"math/rand"
@@ -28,6 +29,9 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi/config"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
"github.com/stretchr/testify/assert"
)
@@ -930,6 +934,77 @@ func TestNestedStackFails(t *testing.T) {
assert.Equal(t, "succeeded", dRes.Summary.Result)
}
func TestProgressStreams(t *testing.T) {
ctx := context.Background()
pName := "inline_progress_streams"
sName := fmt.Sprintf("int_test%d", rangeIn(10000000, 99999999))
stackName := FullyQualifiedStackName(pulumiOrg, pName, sName)
cfg := ConfigMap{
"bar": ConfigValue{
Value: "abc",
},
"buzz": ConfigValue{
Value: "secret",
Secret: true,
},
}
// initialize
s, err := NewStackInlineSource(ctx, stackName, pName, func(ctx *pulumi.Context) error {
c := config.New(ctx, "")
ctx.Export("exp_static", pulumi.String("foo"))
ctx.Export("exp_cfg", pulumi.String(c.Get("bar")))
ctx.Export("exp_secret", c.GetSecret("buzz"))
return nil
})
if err != nil {
t.Errorf("failed to initialize stack, err: %v", err)
t.FailNow()
}
defer func() {
// -- pulumi stack rm --
err = s.Workspace().RemoveStack(ctx, s.Name())
assert.Nil(t, err, "failed to remove stack. Resources have leaked.")
}()
err = s.SetAllConfig(ctx, cfg)
if err != nil {
t.Errorf("failed to set config, err: %v", err)
t.FailNow()
}
// -- pulumi up --
var upOut bytes.Buffer
res, err := s.Up(ctx, optup.ProgressStreams(&upOut))
if err != nil {
t.Errorf("up failed, err: %v", err)
t.FailNow()
}
assert.Equal(t, upOut.String(), res.StdOut, "expected stdout writers to contain same contents")
// -- pulumi refresh --
var refOut bytes.Buffer
ref, err := s.Refresh(ctx, optrefresh.ProgressStreams(&refOut))
if err != nil {
t.Errorf("refresh failed, err: %v", err)
t.FailNow()
}
assert.Equal(t, refOut.String(), ref.StdOut, "expected stdout writers to contain same contents")
// -- pulumi destroy --
var desOut bytes.Buffer
dRes, err := s.Destroy(ctx, optdestroy.ProgressStreams(&desOut))
if err != nil {
t.Errorf("destroy failed, err: %v", err)
t.FailNow()
}
assert.Equal(t, desOut.String(), dRes.StdOut, "expected stdout writers to contain same contents")
}
func getTestOrg() string {
testOrg := "pulumi-test"
if _, set := os.LookupEnv("PULUMI_TEST_ORG"); set {


@@ -16,6 +16,8 @@
// github.com/pulumi/pulumi/sdk/v2/go/x/auto Stack.Destroy(...optdestroy.Option)
package optdestroy
import "io"
// Parallel is the number of resource operations to run in parallel at once during the destroy
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@@ -45,6 +47,13 @@ func TargetDependents() Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
func ProgressStreams(writers ...io.Writer) Option {
return optionFunc(func(opts *Options) {
opts.ProgressStreams = writers
})
}
// Option is a parameter to be applied to a Stack.Destroy() operation
type Option interface {
ApplyOption(*Options)
@@ -63,6 +72,8 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)
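ProgressStreams slots into the package's existing functional-options pattern: optionFunc adapts a closure to the Option interface, and the destroy call applies each option to an Options struct before shelling out. A self-contained sketch of the same pattern, with hypothetical lowercase names so it doesn't shadow the real package:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// options stands in for optdestroy.Options (hypothetical local copy)
type options struct {
	ProgressStreams []io.Writer
}

type option interface{ applyOption(*options) }

type optionFunc func(*options)

func (o optionFunc) applyOption(opts *options) { o(opts) }

// progressStreams mirrors the shape of optdestroy.ProgressStreams
func progressStreams(writers ...io.Writer) option {
	return optionFunc(func(opts *options) { opts.ProgressStreams = writers })
}

func main() {
	opts := &options{}
	for _, o := range []option{progressStreams(os.Stdout)} {
		o.applyOption(opts)
	}
	fmt.Println("configured writers:", len(opts.ProgressStreams))
}
```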


@@ -16,6 +16,8 @@
// github.com/pulumi/pulumi/sdk/v2/go/x/auto Stack.Refresh(...optrefresh.Option)
package optrefresh
import "io"
// Parallel is the number of resource operations to run in parallel at once during the refresh
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@@ -45,6 +47,13 @@ func Target(urns []string) Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
func ProgressStreams(writers ...io.Writer) Option {
return optionFunc(func(opts *Options) {
opts.ProgressStreams = writers
})
}
// Option is a parameter to be applied to a Stack.Refresh() operation
type Option interface {
ApplyOption(*Options)
@@ -63,6 +72,8 @@ type Options struct {
ExpectNoChanges bool
// Specify an exclusive list of resource URNs to refresh
Target []string
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)


@@ -16,6 +16,8 @@
// github.com/pulumi/pulumi/sdk/v2/go/x/auto Stack.Up(...optup.Option)
package optup
import "io"
// Parallel is the number of resource operations to run in parallel at once during the update
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@@ -59,6 +61,13 @@ func TargetDependents() Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
func ProgressStreams(writers ...io.Writer) Option {
return optionFunc(func(opts *Options) {
opts.ProgressStreams = writers
})
}
// Option is a parameter to be applied to a Stack.Up() operation
type Option interface {
ApplyOption(*Options)
@@ -81,6 +90,8 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)


@@ -91,6 +91,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"runtime"
"strings"
"sync"
@@ -246,7 +247,7 @@ func (s *Stack) Preview(ctx context.Context, opts ...optpreview.Option) (Preview
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, nil /* additionalOutput */, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
}
@@ -306,7 +307,7 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, upOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
@@ -368,7 +369,7 @@ func (s *Stack) Refresh(ctx context.Context, opts ...optrefresh.Option) (Refresh
}
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, refreshOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to refresh stack"), stdout, stderr, code)
}
@@ -425,7 +426,7 @@ func (s *Stack) Destroy(ctx context.Context, opts ...optdestroy.Option) (Destroy
}
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, destroyOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to destroy stack"), stdout, stderr, code)
}
@@ -457,13 +458,17 @@ func (s *Stack) Outputs(ctx context.Context) (OutputMap, error) {
}
// standard outputs
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json")
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"stack", "output", "--json",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "could not get outputs"), outStdout, outStderr, code)
}
// secret outputs
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json", "--show-secrets")
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"stack", "output", "--json", "--show-secrets",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "could not get secret outputs"), outStdout, outStderr, code)
}
@@ -499,7 +504,9 @@ func (s *Stack) History(ctx context.Context) ([]UpdateSummary, error) {
return nil, errors.Wrap(err, "failed to get stack history")
}
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, "history", "--json", "--show-secrets")
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"history", "--json", "--show-secrets",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "failed to get stack history"), stdout, stderr, errCode)
}
@@ -654,7 +661,11 @@ type DestroyResult struct {
// secretSentinel represents the CLI response for an output marked as "secret"
const secretSentinel = "[secret]"
func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, string, int, error) {
func (s *Stack) runPulumiCmdSync(
ctx context.Context,
additionalOutput []io.Writer,
args ...string,
) (string, string, int, error) {
var env []string
if s.Workspace().PulumiHome() != "" {
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
@@ -671,7 +682,7 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
return "", "", -1, errors.Wrap(err, "failed to exec command, error getting additional args")
}
args = append(args, additionalArgs...)
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), env, args...)
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), additionalOutput, env, args...)
if err != nil {
return stdout, stderr, errCode, err
}
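Since the new parameter is a plain []io.Writer, callers aren't limited to stdout and temp files; any Writer works, which is useful for annotating or forwarding progress. A sketch with a hypothetical stampWriter that timestamps each chunk before passing it through (suitable for optup.ProgressStreams and friends):

```go
package main

import (
	"fmt"
	"io"
	"os"
	"time"
)

// stampWriter is a hypothetical io.Writer that prefixes each chunk it
// receives with a timestamp before forwarding to the wrapped writer.
type stampWriter struct{ dst io.Writer }

func (s stampWriter) Write(p []byte) (int, error) {
	if _, err := fmt.Fprintf(s.dst, "[%s] ", time.Now().Format(time.RFC3339)); err != nil {
		return 0, err
	}
	return s.dst.Write(p)
}

func main() {
	// e.g. stack.Up(ctx, optup.ProgressStreams(stampWriter{dst: os.Stdout}))
	var w io.Writer = stampWriter{dst: os.Stdout}
	fmt.Fprintln(w, "destroying (dev)...")
}
```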