This commit is contained in:
evanboyle 2020-09-08 17:27:41 -07:00
parent 2585b86aa4
commit b924cd12df
3 changed files with 103 additions and 0 deletions

View file

@ -15,6 +15,7 @@
package auto
import (
"bufio"
"bytes"
"context"
"os"
@ -46,3 +47,49 @@ func runPulumiCommandSync(
}
return stdout.String(), stderr.String(), code, err
}
func runPulumiCommandAsync(
ctx context.Context,
workdir string,
additionalEnv []string,
updates chan string,
done chan error,
args ...string,
) (string, string, int, error) {
// all commands should be run in non-interactive mode.
// this causes commands to fail rather than prompting for input (and thus hanging indefinitely)
args = append(args, "--non-interactive")
cmd := exec.CommandContext(ctx, "pulumi", args...)
cmd.Dir = workdir
cmd.Env = append(os.Environ(), additionalEnv...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stderr = &stderr
out, err := cmd.StdoutPipe()
if err != nil {
return "", "", unknownErrorCode, err
}
go func() {
scanner := bufio.NewScanner(out)
for scanner.Scan() {
m := scanner.Text()
updates <- m
stdout.WriteString(m)
}
}()
code := unknownErrorCode
err = cmd.Start()
if err != nil {
done <- err
}
err = cmd.Wait()
if err != nil {
done <- err
}
if exitError, ok := err.(*exec.ExitError); ok {
code = exitError.ExitCode()
}
return stdout.String(), stderr.String(), code, err
}

View file

@ -59,6 +59,15 @@ func TargetDependents() Option {
})
}
// StreamUpdates will incrementally send update output to the specified events chan, sending an event to the done chan
// when the update is complete.
func StreamUpdates(events chan string, done chan error) Option {
return optionFunc(func(opts *Options) {
opts.UpdateChan = events
opts.DoneChan = done
})
}
// Option is a parameter to be applied to a Stack.Up() operation
type Option interface {
ApplyOption(*Options)
@ -81,6 +90,10 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// Channel to send stdout events during the update
UpdateChan chan string
// Channel to send completion signal at the end of the update
DoneChan chan error
}
type optionFunc func(*Options)

View file

@ -238,6 +238,8 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
return res, errors.Wrap(err, "failed to run update")
}
async := false
upOpts := &optup.Options{}
for _, o := range opts {
o.ApplyOption(upOpts)
@ -259,6 +261,9 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
if upOpts.TargetDependents {
sharedArgs = append(sharedArgs, "--target-dependents")
}
if upOpts.UpdateChan != nil {
async = true
}
var stdout, stderr string
var code int
@ -279,6 +284,10 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
args = append(args, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
}
if async {
stdout, stderr, code, err = s.runPulumiCmdAsync(ctx, upOpts.UpdateChan, upOpts.DoneChan, args...)
}
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
@ -656,6 +665,40 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
return stdout, stderr, errCode, nil
}
// TODO pull out common prep code
func (s *Stack) runPulumiCmdAsync(
ctx context.Context,
updates chan string,
done chan error,
args ...string,
) (string, string, int, error) {
var env []string
if s.Workspace().PulumiHome() != "" {
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
env = append(env, homeEnv)
}
if envvars := s.Workspace().GetEnvVars(); envvars != nil {
for k, v := range envvars {
e := []string{k, v}
env = append(env, strings.Join(e, "="))
}
}
additionalArgs, err := s.Workspace().SerializeArgsForOp(ctx, s.Name())
if err != nil {
return "", "", -1, errors.Wrap(err, "failed to exec command, error getting additional args")
}
args = append(args, additionalArgs...)
stdout, stderr, errCode, err := runPulumiCommandAsync(ctx, s.Workspace().WorkDir(), env, updates, done, args...)
if err != nil {
return stdout, stderr, errCode, err
}
err = s.Workspace().PostCommandCallback(ctx, s.Name())
if err != nil {
return stdout, stderr, errCode, errors.Wrap(err, "command ran successfully, but error running PostCommandCallback")
}
return stdout, stderr, errCode, nil
}
func (s *Stack) host(ctx context.Context, additionalArgs []string, parallel int) (string, string, error) {
proj, err := s.Workspace().ProjectSettings(ctx)
if err != nil {