Compare commits

...

3 commits

Author SHA1 Message Date
evanboyle 1d7608821b add the ability to stream progress updates during up/refresh/destroy 2020-09-11 07:57:03 -07:00
Pat Gavlin d1d8117853
Update pkg/engine/plugin_host.go
Co-authored-by: Justin Van Patten <jvp@justinvp.com>
2020-09-10 15:03:37 -07:00
Pat Gavlin 0267c878bd Revise host mode.
The current implementation of host mode uses a `pulumi host` command and
an ad-hoc communication protocol between the engine and client to
connect a language host after the host has begun listening. The most
significant disadvantages of this approach are the communication
protocol (which currently requires the use of stdout), the host-specific
command, and the difficulty of accommodating the typical program-bound
lifetime for an update.

These changes reimplement host mode by adding engine support for
connecting to an existing language runtime service rather than launching
a plugin. This capability is provided via an engine-specific language
runtime, `client`, which accepts the address of the existing languge
runtime service as a runtime option. The CLI exposes this runtime via
the `--client` flag to the `up` and `preview` commands, which similarly
accepts the address of an existing language runtime service as an
argument. These changes also adjust the automation API to consume the
new host mode implementation.
2020-09-08 11:40:37 -07:00
24 changed files with 654 additions and 531 deletions

View file

@ -1,248 +0,0 @@
// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/pulumi/pulumi/pkg/v2/backend"
"github.com/pulumi/pulumi/pkg/v2/backend/display"
"github.com/pulumi/pulumi/pkg/v2/engine"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/result"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
)
// intentionally disabling here for cleaner err declaration/assignment.
// nolint: vetshadow
func newHostCmd() *cobra.Command {
var debug bool
var expectNop bool
var message string
var execKind string
var stack string
var configArray []string
var path bool
// Flags for engine.UpdateOptions.
var policyPackPaths []string
var policyPackConfigPaths []string
var diffDisplay bool
var eventLogPath string
var parallel int
var refresh bool
var showConfig bool
var showReplacementSteps bool
var showSames bool
var showReads bool
var suppressOutputs bool
var yes bool
var secretsProvider string
var targets []string
var replaces []string
var targetReplaces []string
var targetDependents bool
var cmd = &cobra.Command{
Use: "host",
Short: "[PREVIEW] Launch the engine in 'host' mode without launching a language runtime",
Long: "[PREVIEW] Launch the engine in 'host' mode without launching a language runtime.\n" +
"\n" +
"To support some automation scenarios, the engine may be launched in this mode to prevent it from\n" +
"needing to spawn its own language runtime. In this case, the lifetime of interactions between the\n" +
"language and the engine host must be managed manually.",
// host is an internal command not intended for public use
Hidden: true,
Run: cmdutil.RunResultFunc(func(cmd *cobra.Command, args []string) result.Result {
isPreview := len(args) > 0 && args[0] == "preview"
// Construct the options based on flags.
opts, err := updateFlagsToOptions(false, true, true)
if err != nil {
return result.FromError(err)
}
opts.Display = display.Options{
Color: cmdutil.GetGlobalColorization(),
ShowConfig: showConfig,
ShowReplacementSteps: showReplacementSteps,
ShowSameResources: showSames,
ShowReads: showReads,
SuppressOutputs: suppressOutputs,
IsInteractive: false,
Type: display.DisplayDiff,
EventLogPath: eventLogPath,
Debug: debug,
}
opts.Engine = engine.UpdateOptions{
Parallel: parallel,
Debug: debug,
Refresh: refresh,
UseLegacyDiff: useLegacyDiff(),
IsHostCommand: true,
}
// we set up the stack in auto.NewStack before execing host
s, err := requireStack(stack, false /*offerNew*/, opts.Display, false /*setCurrent*/)
if err != nil {
return result.FromError(err)
}
proj, root, err := readProject()
if err != nil {
return result.FromError(err)
}
proj.Runtime = workspace.NewProjectRuntimeInfo("host", nil)
sm, err := getStackSecretsManager(s)
if err != nil {
return result.FromError(errors.Wrap(err, "getting secrets manager"))
}
cfg, err := getStackConfiguration(s, sm)
if err != nil {
return result.FromError(errors.Wrap(err, "getting stack configuration"))
}
m, err := getUpdateMetadata(message, root, execKind)
if err != nil {
return result.FromError(errors.Wrap(err, "gathering environment metadata"))
}
operation := s.Update
if isPreview {
operation = s.Preview
opts.Display.JSONDisplay = true
}
// Now perform the update. This will stay alive until the user cancels the host.
changes, res := operation(commandContext(), backend.UpdateOperation{
Proj: proj,
Root: root,
Opts: opts,
M: m,
StackConfiguration: cfg,
SecretsManager: sm,
Scopes: cancellationScopesWithoutInterrupt,
})
switch {
case res != nil && res.Error() == context.Canceled:
return result.FromError(errors.New("update cancelled"))
case res != nil:
return PrintEngineResult(res)
case expectNop && changes != nil && changes.HasChanges():
return result.FromError(errors.New("error: no changes were expected but changes occurred"))
default:
return nil
}
}),
}
cmd.PersistentFlags().BoolVarP(
&debug, "debug", "d", false,
"Print detailed debugging output during resource operations")
cmd.PersistentFlags().BoolVar(
&expectNop, "expect-no-changes", false,
"Return an error if any changes occur during this update")
cmd.PersistentFlags().StringVarP(
&stack, "stack", "s", "",
"The name of the stack to operate on. Defaults to the current stack")
cmd.PersistentFlags().StringVar(
&stackConfigFile, "config-file", "",
"Use the configuration values in the specified file rather than detecting the file name")
cmd.PersistentFlags().StringArrayVarP(
&configArray, "config", "c", []string{},
"Config to use during the update")
cmd.PersistentFlags().BoolVar(
&path, "config-path", false,
"Config keys contain a path to a property in a map or list to set")
cmd.PersistentFlags().StringVar(
&secretsProvider, "secrets-provider", "default", "The type of the provider that should be used to encrypt and "+
"decrypt secrets (possible choices: default, passphrase, awskms, azurekeyvault, gcpkms, hashivault). Only"+
"used when creating a new stack from an existing template")
cmd.PersistentFlags().StringVarP(
&message, "message", "m", "",
"Optional message to associate with the update operation")
cmd.PersistentFlags().StringArrayVarP(
&targets, "target", "t", []string{},
"Specify a single resource URN to update. Other resources will not be updated."+
" Multiple resources can be specified using --target urn1 --target urn2")
cmd.PersistentFlags().StringArrayVar(
&replaces, "replace", []string{},
"Specify resources to replace. Multiple resources can be specified using --replace run1 --replace urn2")
cmd.PersistentFlags().StringArrayVar(
&targetReplaces, "target-replace", []string{},
"Specify a single resource URN to replace. Other resources will not be updated."+
" Shorthand for --target urn --replace urn.")
cmd.PersistentFlags().BoolVar(
&targetDependents, "target-dependents", false,
"Allows updating of dependent targets discovered but not specified in --target list")
// Flags for engine.UpdateOptions.
cmd.PersistentFlags().StringSliceVar(
&policyPackPaths, "policy-pack", []string{},
"[PREVIEW] Run one or more policy packs as part of this update")
cmd.PersistentFlags().StringSliceVar(
&policyPackConfigPaths, "policy-pack-config", []string{},
`[PREVIEW] Path to JSON file containing the config for the policy pack of the corresponding "--policy-pack" flag`)
cmd.PersistentFlags().BoolVar(
&diffDisplay, "diff", false,
"Display operation as a rich diff showing the overall change")
cmd.PersistentFlags().IntVarP(
&parallel, "parallel", "p", defaultParallel,
"Allow P resource operations to run in parallel at once (1 for no parallelism). Defaults to unbounded.")
cmd.PersistentFlags().BoolVarP(
&refresh, "refresh", "r", false,
"Refresh the state of the stack's resources before this update")
cmd.PersistentFlags().BoolVar(
&showConfig, "show-config", false,
"Show configuration keys and variables")
cmd.PersistentFlags().BoolVar(
&showReplacementSteps, "show-replacement-steps", false,
"Show detailed resource replacement creates and deletes instead of a single step")
cmd.PersistentFlags().BoolVar(
&showSames, "show-sames", false,
"Show resources that don't need be updated because they haven't changed, alongside those that do")
cmd.PersistentFlags().BoolVar(
&showReads, "show-reads", false,
"Show resources that are being read in, alongside those being managed directly in the stack")
cmd.PersistentFlags().BoolVar(
&suppressOutputs, "suppress-outputs", false,
"Suppress display of stack outputs (in case they contain sensitive values)")
cmd.PersistentFlags().BoolVarP(
&yes, "yes", "y", false,
"Automatically approve and perform the update after previewing it")
if hasDebugCommands() {
cmd.PersistentFlags().StringVar(
&eventLogPath, "event-log", "",
"Log events to a file at this path")
}
// internal flag
cmd.PersistentFlags().StringVar(&execKind, "exec-kind", "", "")
// ignore err, only happens if flag does not exist
_ = cmd.PersistentFlags().MarkHidden("exec-kind")
return cmd
}

View file

@ -67,7 +67,7 @@ func getProjectPlugins() ([]workspace.PluginInfo, error) {
// Get the required plugins and then ensure they have metadata populated about them. Because it's possible
// a plugin required by the project hasn't yet been installed, we will simply skip any errors we encounter.
var results []workspace.PluginInfo
plugins, err := ctx.Host.GetRequiredPlugins(plugin.ProgInfo{
plugins, err := plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,

View file

@ -34,6 +34,7 @@ func newPreviewCmd() *cobra.Command {
var stack string
var configArray []string
var configPath bool
var client string
// Flags for engine.UpdateOptions.
var jsonDisplay bool
@ -106,7 +107,7 @@ func newPreviewCmd() *cobra.Command {
return result.FromError(err)
}
proj, root, err := readProject()
proj, root, err := readProjectForUpdate(client)
if err != nil {
return result.FromError(err)
}
@ -195,6 +196,9 @@ func newPreviewCmd() *cobra.Command {
&configPath, "config-path", false,
"Config keys contain a path to a property in a map or list to set")
cmd.PersistentFlags().StringVar(
&client, "client", "", "The address of an existing language runtime host to connect to")
cmd.PersistentFlags().StringVarP(
&message, "message", "m", "",
"Optional message to associate with the preview operation")

View file

@ -211,7 +211,6 @@ func NewPulumiCmd() *cobra.Command {
// Less common, and thus hidden, commands:
cmd.AddCommand(newGenCompletionCmd(cmd))
cmd.AddCommand(newGenMarkdownCmd(cmd))
cmd.AddCommand(newHostCmd())
// We have a set of commands that are still experimental and that we add only when PULUMI_EXPERIMENTAL is set
// to true.

View file

@ -51,6 +51,7 @@ func newUpCmd() *cobra.Command {
var stack string
var configArray []string
var path bool
var client string
// Flags for engine.UpdateOptions.
var policyPackPaths []string
@ -85,7 +86,7 @@ func newUpCmd() *cobra.Command {
return result.FromError(err)
}
proj, root, err := readProject()
proj, root, err := readProjectForUpdate(client)
if err != nil {
return result.FromError(err)
}
@ -399,6 +400,9 @@ func newUpCmd() *cobra.Command {
"decrypt secrets (possible choices: default, passphrase, awskms, azurekeyvault, gcpkms, hashivault). Only"+
"used when creating a new stack from an existing template")
cmd.PersistentFlags().StringVar(
&client, "client", "", "The address of an existing language runtime host to connect to")
cmd.PersistentFlags().StringVarP(
&message, "message", "m", "",
"Optional message to associate with the update operation")

View file

@ -399,6 +399,24 @@ func parseAndSaveConfigArray(s backend.Stack, configArray []string, path bool) e
return nil
}
// readProjectForUpdate attempts to detect and read a Pulumi project for the current workspace. If
// the project is successfully detected and read, it is returned along with the path to its
// containing directory, which will be used as the root of the project's Pulumi program. If a
// client address is present, the returned project will always have the runtime set to "client"
// with the address option set to the client address.
func readProjectForUpdate(clientAddress string) (*workspace.Project, string, error) {
proj, root, err := readProject()
if err != nil {
return nil, "", err
}
if clientAddress != "" {
proj.Runtime = workspace.NewProjectRuntimeInfo("client", map[string]interface{}{
"address": clientAddress,
})
}
return proj, root, nil
}
// readProject attempts to detect and read a Pulumi project for the current workspace. If the
// project is successfully detected and read, it is returned along with the path to its containing
// directory, which will be used as the root of the project's Pulumi program.

View file

@ -25,9 +25,12 @@ import (
"testing"
"github.com/blang/semver"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/mitchellh/copystructure"
combinations "github.com/mxschmitt/golang-combinations"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
@ -44,11 +47,11 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/result"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
combinations "github.com/mxschmitt/golang-combinations"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
type JournalEntryKind int
@ -347,14 +350,15 @@ type TestStep struct {
}
type TestPlan struct {
Project string
Stack string
Runtime string
Config config.Map
Decrypter config.Decrypter
BackendClient deploy.BackendClient
Options UpdateOptions
Steps []TestStep
Project string
Stack string
Runtime string
RuntimeOptions map[string]interface{}
Config config.Map
Decrypter config.Decrypter
BackendClient deploy.BackendClient
Options UpdateOptions
Steps []TestStep
}
//nolint: goconst
@ -392,7 +396,7 @@ func (p *TestPlan) GetProject() workspace.Project {
return workspace.Project{
Name: projectName,
Runtime: workspace.NewProjectRuntimeInfo(runtime, nil),
Runtime: workspace.NewProjectRuntimeInfo(runtime, p.RuntimeOptions),
}
}
@ -5990,3 +5994,112 @@ func TestSingleComponentDefaultProviderLifecycle(t *testing.T) {
}
p.Run(t, nil)
}
type updateContext struct {
*deploytest.ResourceMonitor
resmon chan *deploytest.ResourceMonitor
programErr chan error
snap chan *deploy.Snapshot
updateResult chan result.Result
}
func startUpdate(host plugin.Host) (*updateContext, error) {
ctx := &updateContext{
resmon: make(chan *deploytest.ResourceMonitor),
programErr: make(chan error),
snap: make(chan *deploy.Snapshot),
updateResult: make(chan result.Result),
}
port, _, err := rpcutil.Serve(0, nil, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterLanguageRuntimeServer(srv, ctx)
return nil
},
}, nil)
if err != nil {
return nil, err
}
p := &TestPlan{
Options: UpdateOptions{host: host},
Runtime: "client",
RuntimeOptions: map[string]interface{}{
"address": fmt.Sprintf("127.0.0.1:%d", port),
},
}
go func() {
snap, res := TestOp(Update).Run(p.GetProject(), p.GetTarget(nil), p.Options, false, p.BackendClient, nil)
ctx.snap <- snap
close(ctx.snap)
ctx.updateResult <- res
close(ctx.updateResult)
}()
ctx.ResourceMonitor = <-ctx.resmon
return ctx, nil
}
func (ctx *updateContext) Finish(err error) (*deploy.Snapshot, result.Result) {
ctx.programErr <- err
close(ctx.programErr)
return <-ctx.snap, <-ctx.updateResult
}
func (ctx *updateContext) GetRequiredPlugins(_ context.Context,
req *pulumirpc.GetRequiredPluginsRequest) (*pulumirpc.GetRequiredPluginsResponse, error) {
return &pulumirpc.GetRequiredPluginsResponse{}, nil
}
func (ctx *updateContext) Run(_ context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
// Connect to the resource monitor and create an appropriate client.
conn, err := grpc.Dial(
req.MonitorAddress,
grpc.WithInsecure(),
rpcutil.GrpcChannelOptions(),
)
if err != nil {
return nil, errors.Wrapf(err, "could not connect to resource monitor")
}
defer contract.IgnoreClose(conn)
// Fire up a resource monitor client
ctx.resmon <- deploytest.NewResourceMonitor(pulumirpc.NewResourceMonitorClient(conn))
close(ctx.resmon)
// Wait for the program to terminate.
if err := <-ctx.programErr; err != nil {
return &pulumirpc.RunResponse{Error: err.Error()}, nil
}
return &pulumirpc.RunResponse{}, nil
}
func (ctx *updateContext) GetPluginInfo(_ context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
return &pulumirpc.PluginInfo{
Version: "1.0.0",
}, nil
}
func TestLanguageClient(t *testing.T) {
loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}
update, err := startUpdate(deploytest.NewPluginHost(nil, nil, nil, loaders...))
if err != nil {
t.Fatalf("failed to start update: %v", err)
}
// Register resources, etc.
_, _, _, err = update.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
snap, res := update.Finish(nil)
assert.Nil(t, res)
assert.Len(t, snap.Resources, 2)
}

View file

@ -16,11 +16,10 @@ package engine
import (
"context"
"fmt"
"os"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy/providers"
"github.com/pulumi/pulumi/sdk/v2/go/common/diag"
@ -50,6 +49,23 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host, config plugin.Conf
return "", "", nil, err
}
// If the project wants to connect to an existing language runtime, do so now.
if projinfo.Proj.Runtime.Name() == "client" {
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
if !ok {
return "", "", nil, errors.New("missing address of language runtime service")
}
address, ok := addressValue.(string)
if !ok {
return "", "", nil, errors.New("address of language runtime service must be a string")
}
host, err := connectToLanguageRuntime(ctx, address)
if err != nil {
return "", "", nil, err
}
ctx.Host = host
}
return pwd, main, ctx, nil
}
@ -127,10 +143,6 @@ func plan(ctx *Context, info *planContext, opts planOptions, dryRun bool) (*plan
return nil, err
}
if opts.UpdateOptions.IsHostCommand {
fmt.Fprintf(os.Stderr, "engine: %s\n", plugctx.Host.ServerAddr())
}
opts.trustDependencies = proj.TrustResourceDependencies()
// Now create the state source. This may issue an error if it can't create the source. This entails,
// for example, loading any plugins which will be required to execute a program, among other things.

49
pkg/engine/plugin_host.go Normal file
View file

@ -0,0 +1,49 @@
// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
type clientLanguageRuntimeHost struct {
plugin.Host
languageRuntime plugin.LanguageRuntime
}
func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
// Dial the language runtime.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()), rpcutil.GrpcChannelOptions())
if err != nil {
return nil, errors.Wrap(err, "could not connect to language host")
}
client := pulumirpc.NewLanguageRuntimeClient(conn)
return &clientLanguageRuntimeHost{
Host: ctx.Host,
languageRuntime: plugin.NewLanguageRuntimeClient(ctx, "client", client),
}, nil
}
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

View file

@ -75,11 +75,16 @@ func newPluginSet() pluginSet {
func gatherPluginsFromProgram(plugctx *plugin.Context, prog plugin.ProgInfo) (pluginSet, error) {
logging.V(preparePluginLog).Infof("gatherPluginsFromProgram(): gathering plugins from language host")
set := newPluginSet()
langhostPlugins, err := plugctx.Host.GetRequiredPlugins(prog, plugin.AllPlugins)
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, prog, plugin.AllPlugins)
if err != nil {
return set, err
}
for _, plug := range langhostPlugins {
// Ignore language plugins named "client".
if plug.Name == "client" && plug.Kind == workspace.LanguagePlugin {
continue
}
logging.V(preparePluginLog).Infof(
"gatherPluginsFromProgram(): plugin %s %s (%s) is required by language host",
plug.Name, plug.Version, plug.ServerURL)

View file

@ -132,9 +132,6 @@ type UpdateOptions struct {
// true if the engine should use legacy diffing behavior during an update.
UseLegacyDiff bool
// true if we're executing in the context of `pulumi host` command.
IsHostCommand bool
// true if we should report events for steps that involve default providers.
reportDefaultProviderSteps bool
@ -389,12 +386,19 @@ func newUpdateSource(
return nil, err
}
// If we are connecting to an existing client, stash the address of the engine in its arguments.
var args []string
if proj.Runtime.Name() == "client" {
args = []string{plugctx.Host.ServerAddr()}
}
// If that succeeded, create a new source that will perform interpretation of the compiled program.
// TODO[pulumi/pulumi#88]: we are passing `nil` as the arguments map; we need to allow a way to pass these.
return deploy.NewEvalSource(plugctx, &deploy.EvalRunInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Args: args,
Target: target,
}, defaultProviderVersions, dryRun), nil
}

View file

@ -54,6 +54,10 @@ func (rm *ResourceMonitor) Close() error {
return rm.conn.Close()
}
func NewResourceMonitor(resmon pulumirpc.ResourceMonitorClient) *ResourceMonitor {
return &ResourceMonitor{resmon: resmon}
}
type ResourceOptions struct {
Parent resource.URN
Protect bool

View file

@ -73,8 +73,6 @@ type Host interface {
// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
EnsurePlugins(plugins []workspace.PluginInfo, kinds Flags) error
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error)
// SignalCancellation asks all resource providers to gracefully shut down and abort any ongoing
// operations. Operation aborted in this way will return an error (e.g., `Update` and `Create`
@ -371,42 +369,6 @@ func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginInfo, kinds Fla
return result
}
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func (host *defaultHost) GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
var plugins []workspace.PluginInfo
if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
plugins = append(plugins, workspace.PluginInfo{
Name: info.Proj.Runtime.Name(),
Kind: workspace.LanguagePlugin,
})
if kinds&ResourcePlugins != 0 {
// Use the language plugin to compute this project's set of plugin dependencies.
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
// later than we do (right now, we do it up front, but at that point we don't know the version).
deps, err := lang.GetRequiredPlugins(info)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
}
plugins = append(plugins, deps...)
}
} else {
// If we can't load the language plugin, we can't discover the resource plugins.
contract.Assertf(kinds&ResourcePlugins != 0,
"cannot load resource plugins without also loading the language plugin")
}
return plugins, nil
}
func (host *defaultHost) SignalCancellation() error {
// NOTE: we're abusing loadPlugin in order to ensure proper synchronization.
_, err := host.loadPlugin(func() (interface{}, error) {
@ -478,3 +440,39 @@ const (
// AllPlugins uses flags to ensure that all plugin kinds are loaded.
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func GetRequiredPlugins(host Host, info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
var plugins []workspace.PluginInfo
if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
plugins = append(plugins, workspace.PluginInfo{
Name: info.Proj.Runtime.Name(),
Kind: workspace.LanguagePlugin,
})
if kinds&ResourcePlugins != 0 {
// Use the language plugin to compute this project's set of plugin dependencies.
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
// later than we do (right now, we do it up front, but at that point we don't know the version).
deps, err := lang.GetRequiredPlugins(info)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
}
plugins = append(plugins, deps...)
}
} else {
// If we can't load the language plugin, we can't discover the resource plugins.
contract.Assertf(kinds&ResourcePlugins != 0,
"cannot load resource plugins without also loading the language plugin")
}
return plugins, nil
}

View file

@ -16,10 +16,7 @@ package plugin
import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/blang/semver"
pbempty "github.com/golang/protobuf/ptypes/empty"
@ -30,17 +27,10 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v2/go/common/version"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
// HostOnlyRuntimeMode can be used to launch the engine in "host"-only mode. That is to say, the engine and
// resource monitors will be spawned, but no language runtime will be created. This allows automated
// scenarios that drive the engine from outside of the engine itself, such as, for example, a Node.js
// program spawning, deploying, and tearing down the engine's resources.
const HostOnlyRuntimeMode = "host"
// langhost reflects a language host plugin, loaded dynamically for a single language/runtime pair.
type langhost struct {
ctx *Context
@ -53,12 +43,7 @@ type langhost struct {
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
func NewLanguageRuntime(host Host, ctx *Context, runtime string,
options map[string]interface{}) (LanguageRuntime, error) {
// If this is host-only mode, exit without spawning anything.
if runtime == HostOnlyRuntimeMode {
return &langhost{ctx: ctx, runtime: runtime}, nil
}
// For all other languages, load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.LanguagePlugin, strings.Replace(runtime, tokens.QNameDelimiter, "_", -1), nil)
if err != nil {
@ -90,19 +75,21 @@ func NewLanguageRuntime(host Host, ctx *Context, runtime string,
}, nil
}
func (h *langhost) Runtime() string { return h.runtime }
func (h *langhost) IsHostOnlyMode() bool { return h.runtime == HostOnlyRuntimeMode }
func NewLanguageRuntimeClient(ctx *Context, runtime string, client pulumirpc.LanguageRuntimeClient) LanguageRuntime {
return &langhost{
ctx: ctx,
runtime: runtime,
client: client,
}
}
func (h *langhost) Runtime() string { return h.runtime }
// GetRequiredPlugins computes the complete set of anticipated plugins required by a program.
func (h *langhost) GetRequiredPlugins(info ProgInfo) ([]workspace.PluginInfo, error) {
proj := string(info.Proj.Name)
logging.V(7).Infof("langhost[%v].GetRequiredPlugins(proj=%s,pwd=%s,program=%s) executing",
h.runtime, proj, info.Pwd, info.Program)
if h.IsHostOnlyMode() {
logging.V(7).Infof("langhost[%v].GetRequiredPlugins(proj=%s,pwd=%s,program=%s) in host mode, exiting",
h.runtime, proj, info.Pwd, info.Program)
return nil, nil
}
resp, err := h.client.GetRequiredPlugins(h.ctx.Request(), &pulumirpc.GetRequiredPluginsRequest{
Project: proj,
Pwd: info.Pwd,
@ -156,14 +143,6 @@ func (h *langhost) GetRequiredPlugins(info ProgInfo) ([]workspace.PluginInfo, er
func (h *langhost) Run(info RunInfo) (string, bool, error) {
logging.V(7).Infof("langhost[%v].Run(pwd=%v,program=%v,#args=%v,proj=%s,stack=%v,#config=%v,dryrun=%v) executing",
h.runtime, info.Pwd, info.Program, len(info.Args), info.Project, info.Stack, len(info.Config), info.DryRun)
// In host mode, we simply print the engine and monitor address to stderr and wait for a signal.
if h.IsHostOnlyMode() {
fmt.Fprintf(os.Stderr, "resmon: %s\n", info.MonitorAddress)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
return "", false, nil
}
config := make(map[string]string)
for k, v := range info.Config {
config[k.String()] = v
@ -203,20 +182,15 @@ func (h *langhost) GetPluginInfo() (workspace.PluginInfo, error) {
Kind: workspace.LanguagePlugin,
}
var vers string
if h.IsHostOnlyMode() {
vers = version.Version
} else {
plugInfo.Path = h.plug.Bin
plugInfo.Path = h.plug.Bin
resp, err := h.client.GetPluginInfo(h.ctx.Request(), &pbempty.Empty{})
if err != nil {
rpcError := rpcerror.Convert(err)
logging.V(7).Infof("langhost[%v].GetPluginInfo() failed: err=%v", h.runtime, rpcError)
return workspace.PluginInfo{}, rpcError
}
vers = resp.Version
resp, err := h.client.GetPluginInfo(h.ctx.Request(), &pbempty.Empty{})
if err != nil {
rpcError := rpcerror.Convert(err)
logging.V(7).Infof("langhost[%v].GetPluginInfo() failed: err=%v", h.runtime, rpcError)
return workspace.PluginInfo{}, rpcError
}
vers := resp.Version
if vers != "" {
sv, err := semver.ParseTolerant(vers)

View file

@ -101,6 +101,14 @@ conflicts:
if err != nil && IsConcurrentUpdateError(err) { /* retry logic here */ }
```
## Developing the Godocs
This repo has extensive examples and godoc content. To test out your changes locally you can do the following:
1. enlist in the appropriate pulumi branch:
2. cd $GOPATH/src/github.com/pulumi/pulumi/sdk/go/x/auto
3. godoc -http=:6060
4. Navigate to http://localhost:6060/pkg/github.com/pulumi/pulumi/sdk/v2/go/x/auto/
## Known Issues
The Automation API is currently in Alpha and has several known issues. Please upvote issues,

View file

@ -17,6 +17,7 @@ package auto
import (
"bytes"
"context"
"io"
"os"
"os/exec"
)
@ -26,6 +27,7 @@ const unknownErrorCode = -2
func runPulumiCommandSync(
ctx context.Context,
workdir string,
additionalOutput []io.Writer,
additionalEnv []string,
args ...string,
) (string, string, int, error) {
@ -35,10 +37,13 @@ func runPulumiCommandSync(
cmd := exec.CommandContext(ctx, "pulumi", args...)
cmd.Dir = workdir
cmd.Env = append(os.Environ(), additionalEnv...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
additionalOutput = append(additionalOutput, &stdout)
cmd.Stdout = io.MultiWriter(additionalOutput...)
cmd.Stderr = &stderr
code := unknownErrorCode
err := cmd.Run()
if exitError, ok := err.(*exec.ExitError); ok {

View file

@ -337,7 +337,9 @@ func TestRuntimeErrorPython(t *testing.T) {
_, err = s.Up(ctx)
assert.NotNil(t, err)
assert.True(t, IsRuntimeError(err))
if !assert.True(t, IsRuntimeError(err)) {
t.Logf("%v is not a runtime error", err)
}
// -- pulumi destroy --

View file

@ -18,6 +18,9 @@ package auto
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
@ -734,6 +737,20 @@ func ExampleStack_Destroy() {
stack.Destroy(ctx, optdestroy.Message("a message to save with the destroy operation"))
}
func ExampleStack_Destroy_streamingProgress() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
// select an existing stack to destroy
stack, _ := SelectStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
// create a temp file that we can tail during while our program runs
tmp, _ := ioutil.TempFile(os.TempDir(), "")
// optdestroy.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
// this gives us incremental status over time
progressStreams := []io.Writer{os.Stdout, tmp}
// this destroy will incrementally stream unstructured progress messages to stdout and our temp file
stack.Destroy(ctx, optdestroy.ProgressStreams(progressStreams...))
}
func ExampleStack_Up() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
@ -742,6 +759,20 @@ func ExampleStack_Up() {
stack.Up(ctx, optup.Message("a message to save with the up operation"), optup.Parallel(10000))
}
func ExampleStack_Up_streamingProgress() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
// create a new stack to update
stack, _ := NewStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
// create a temp file that we can tail during while our program runs
tmp, _ := ioutil.TempFile(os.TempDir(), "")
// optup.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
// this gives us incremental status over time
progressStreams := []io.Writer{os.Stdout, tmp}
// this update will incrementally stream unstructured progress messages to stdout and our temp file
stack.Up(ctx, optup.ProgressStreams(progressStreams...))
}
func ExampleStack_Preview() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
@ -758,6 +789,20 @@ func ExampleStack_Refresh() {
stack.Refresh(ctx, optrefresh.Message("a message to save with the refresh operation"))
}
func ExampleStack_Refresh_streamingProgress() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")
// select an existing stack and refresh the resources under management
stack, _ := SelectStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
// create a temp file that we can tail during while our program runs
tmp, _ := ioutil.TempFile(os.TempDir(), "")
// optrefresh.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
// this gives us incremental status over time
progressStreams := []io.Writer{os.Stdout, tmp}
// this refresh will incrementally stream unstructured progress messages to stdout and our temp file
stack.Refresh(ctx, optrefresh.ProgressStreams(progressStreams...))
}
func ExampleStack_GetAllConfig() {
ctx := context.Background()
fqsn := FullyQualifiedStackName("org", "project", "stack")

View file

@ -458,7 +458,7 @@ func (l *LocalWorkspace) runPulumiCmdSync(
env = append(env, strings.Join(e, "="))
}
}
return runPulumiCommandSync(ctx, l.WorkDir(), env, args...)
return runPulumiCommandSync(ctx, l.WorkDir(), nil /* additionalOutputs */, env, args...)
}
// NewLocalWorkspace creates and configures a LocalWorkspace. LocalWorkspaceOptions can be used to

View file

@ -15,6 +15,7 @@
package auto
import (
"bytes"
"context"
"fmt"
"math/rand"
@ -27,6 +28,9 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi/config"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
"github.com/stretchr/testify/assert"
)
@ -474,3 +478,74 @@ func TestNestedStackFails(t *testing.T) {
assert.Equal(t, "destroy", dRes.Summary.Kind)
assert.Equal(t, "succeeded", dRes.Summary.Result)
}
func TestProgressStreams(t *testing.T) {
ctx := context.Background()
pName := "inline_progress_streams"
sName := fmt.Sprintf("int_test%d", rangeIn(10000000, 99999999))
fqsn := FullyQualifiedStackName(pulumiOrg, pName, sName)
cfg := ConfigMap{
"bar": ConfigValue{
Value: "abc",
},
"buzz": ConfigValue{
Value: "secret",
Secret: true,
},
}
// initialize
s, err := NewStackInlineSource(ctx, fqsn, func(ctx *pulumi.Context) error {
c := config.New(ctx, "")
ctx.Export("exp_static", pulumi.String("foo"))
ctx.Export("exp_cfg", pulumi.String(c.Get("bar")))
ctx.Export("exp_secret", c.GetSecret("buzz"))
return nil
})
if err != nil {
t.Errorf("failed to initialize stack, err: %v", err)
t.FailNow()
}
defer func() {
// -- pulumi stack rm --
err = s.Workspace().RemoveStack(ctx, s.Name())
assert.Nil(t, err, "failed to remove stack. Resources have leaked.")
}()
err = s.SetAllConfig(ctx, cfg)
if err != nil {
t.Errorf("failed to set config, err: %v", err)
t.FailNow()
}
// -- pulumi up --
var upOut bytes.Buffer
res, err := s.Up(ctx, optup.ProgressStreams(&upOut))
if err != nil {
t.Errorf("up failed, err: %v", err)
t.FailNow()
}
assert.Equal(t, upOut.String(), res.StdOut, "expected stdout writers to contain same contents")
// -- pulumi refresh --
var refOut bytes.Buffer
ref, err := s.Refresh(ctx, optrefresh.ProgressStreams(&refOut))
if err != nil {
t.Errorf("refresh failed, err: %v", err)
t.FailNow()
}
assert.Equal(t, refOut.String(), ref.StdOut, "expected stdout writers to contain same contents")
// -- pulumi destroy --
var desOut bytes.Buffer
dRes, err := s.Destroy(ctx, optdestroy.ProgressStreams(&desOut))
if err != nil {
t.Errorf("destroy failed, err: %v", err)
t.FailNow()
}
assert.Equal(t, desOut.String(), dRes.StdOut, "expected stdout writers to contain same contents")
}

View file

@ -16,6 +16,8 @@
// github.com/sdk/v2/go/x/auto Stack.Destroy(...optdestroy.Option)
package optdestroy
import "io"
// Parallel is the number of resource operations to run in parallel at once during the destroy
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@ -45,6 +47,13 @@ func TargetDependents() Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
func ProgressStreams(writers ...io.Writer) Option {
return optionFunc(func(opts *Options) {
opts.ProgressStreams = writers
})
}
// Option is a parameter to be applied to a Stack.Destroy() operation
type Option interface {
ApplyOption(*Options)
@ -63,6 +72,8 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)

View file

@ -16,6 +16,8 @@
// github.com/sdk/v2/go/x/auto Stack.Refresh(...optrefresh.Option)
package optrefresh
import "io"
// Parallel is the number of resource operations to run in parallel at once during the refresh
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@ -45,6 +47,13 @@ func Target(urns []string) Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
func ProgressStreams(writers ...io.Writer) Option {
return optionFunc(func(opts *Options) {
opts.ProgressStreams = writers
})
}
// Option is a parameter to be applied to a Stack.Refresh() operation
type Option interface {
ApplyOption(*Options)
@ -63,6 +72,8 @@ type Options struct {
ExpectNoChanges bool
// Specify an exclusive list of resource URNs to re
Target []string
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)

View file

@ -16,6 +16,8 @@
// github.com/sdk/v2/go/x/auto Stack.Up(...optup.Option)
package optup
import "io"
// Parallel is the number of resource operations to run in parallel at once during the update
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
func Parallel(n int) Option {
@ -59,6 +61,13 @@ func TargetDependents() Option {
})
}
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
func ProgressStreams(writers ...io.Writer) Option {
return optionFunc(func(opts *Options) {
opts.ProgressStreams = writers
})
}
// Option is a parameter to be applied to a Stack.Up() operation
type Option interface {
ApplyOption(*Options)
@ -81,6 +90,8 @@ type Options struct {
Target []string
// Allows updating of dependent targets discovered but not specified in the Target list
TargetDependents bool
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
ProgressStreams []io.Writer
}
type optionFunc func(*Options)

View file

@ -88,25 +88,29 @@
package auto
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"runtime/debug"
"io"
"runtime"
"strings"
"sync"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/pulumi/pulumi/sdk/v2/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v2/go/common/constant"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optpreview"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
// Stack is an isolated, independently configurable instance of a Pulumi program.
@ -202,23 +206,26 @@ func (s *Stack) Preview(ctx context.Context, opts ...optpreview.Option) (Preview
if preOpts.TargetDependents {
sharedArgs = append(sharedArgs, "--target-dependents")
}
if preOpts.Parallel > 0 {
sharedArgs = append(sharedArgs, fmt.Sprintf("--parallel=%d", preOpts.Parallel))
}
var stdout, stderr string
var code int
if s.Workspace().Program() != nil {
hostArgs := []string{"preview", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoInline)}
hostArgs = append(hostArgs, sharedArgs...)
stdout, stderr, err = s.host(ctx, hostArgs, preOpts.Parallel)
kind, args := constant.ExecKindAutoLocal, []string{"preview", "--json"}
if program := s.Workspace().Program(); program != nil {
server, err := startLanguageRuntimeServer(program)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
}
} else {
args := []string{"preview", "--json", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoLocal)}
args = append(args, sharedArgs...)
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
return res, err
}
defer contract.IgnoreClose(server)
kind, args = constant.ExecKindAutoInline, append(args, "--client="+server.address)
}
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, nil /* additionalOutput */, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
}
err = json.Unmarshal([]byte(stdout), &res)
@ -259,30 +266,26 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
if upOpts.TargetDependents {
sharedArgs = append(sharedArgs, "--target-dependents")
}
if upOpts.Parallel > 0 {
sharedArgs = append(sharedArgs, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
}
var stdout, stderr string
var code int
if s.Workspace().Program() != nil {
// TODO need to figure out how to get error code...
stdout, stderr, err = s.host(
ctx,
append(sharedArgs, fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoInline)),
upOpts.Parallel,
)
kind, args := constant.ExecKindAutoLocal, []string{"up", "--yes", "--skip-preview"}
if program := s.Workspace().Program(); program != nil {
server, err := startLanguageRuntimeServer(program)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
} else {
args := []string{"up", "--yes", "--skip-preview", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoLocal)}
args = append(args, sharedArgs...)
if upOpts.Parallel > 0 {
args = append(args, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
return res, err
}
defer contract.IgnoreClose(server)
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
kind, args = constant.ExecKindAutoInline, append(args, "--client="+server.address)
}
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, upOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
outs, err := s.Outputs(ctx)
@ -342,7 +345,7 @@ func (s *Stack) Refresh(ctx context.Context, opts ...optrefresh.Option) (Refresh
}
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, refreshOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to refresh stack"), stdout, stderr, code)
}
@ -399,7 +402,7 @@ func (s *Stack) Destroy(ctx context.Context, opts ...optdestroy.Option) (Destroy
}
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, destroyOpts.ProgressStreams, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to destroy stack"), stdout, stderr, code)
}
@ -431,13 +434,17 @@ func (s *Stack) Outputs(ctx context.Context) (OutputMap, error) {
}
// standard outputs
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json")
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"stack", "output", "--json",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "could not get outputs"), outStdout, outStderr, code)
}
// secret outputs
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json", "--show-secrets")
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"stack", "output", "--json", "--show-secrets",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "could not get secret outputs"), outStdout, outStderr, code)
}
@ -473,7 +480,9 @@ func (s *Stack) History(ctx context.Context) ([]UpdateSummary, error) {
return nil, errors.Wrap(err, "failed to get stack history")
}
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, "history", "--json", "--show-secrets")
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
"history", "--json", "--show-secrets",
)
if err != nil {
return nil, newAutoError(errors.Wrap(err, "failed to get stack history"), stdout, stderr, errCode)
}
@ -628,7 +637,11 @@ type DestroyResult struct {
// secretSentinel represents the CLI response for an output marked as "secret"
const secretSentinel = "[secret]"
func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, string, int, error) {
func (s *Stack) runPulumiCmdSync(
ctx context.Context,
additionalOutput []io.Writer,
args ...string,
) (string, string, int, error) {
var env []string
if s.Workspace().PulumiHome() != "" {
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
@ -645,7 +658,7 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
return "", "", -1, errors.Wrap(err, "failed to exec command, error getting additional args")
}
args = append(args, additionalArgs...)
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), env, args...)
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), additionalOutput, env, args...)
if err != nil {
return stdout, stderr, errCode, err
}
@ -656,143 +669,149 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
return stdout, stderr, errCode, nil
}
func (s *Stack) host(ctx context.Context, additionalArgs []string, parallel int) (string, string, error) {
proj, err := s.Workspace().ProjectSettings(ctx)
if err != nil {
return "", "", errors.Wrap(err, "could not start run program, failed to start host")
}
const (
stateWaiting = iota
stateRunning
stateCanceled
stateFinished
)
var stdout bytes.Buffer
var errBuff bytes.Buffer
args := []string{"host"}
args = append(args, additionalArgs...)
workspaceArgs, err := s.Workspace().SerializeArgsForOp(ctx, s.Name())
if err != nil {
return "", "", errors.Wrap(err, "failed to exec command, error getting additional args")
}
args = append(args, workspaceArgs...)
cmd := exec.CommandContext(ctx, "pulumi", args...)
cmd.Dir = s.Workspace().WorkDir()
if s.Workspace().PulumiHome() != "" {
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
cmd.Env = append(os.Environ(), homeEnv)
}
type languageRuntimeServer struct {
m sync.Mutex
c *sync.Cond
cmd.Stdout = &stdout
stderr, _ := cmd.StderrPipe()
err = cmd.Start()
if err != nil {
return "", "", errors.Wrap(err, "failed to start host command")
}
scanner := bufio.NewScanner(stderr)
scanner.Split(bufio.ScanLines)
fn pulumi.RunFunc
address string
resMonAddrChan := make(chan string)
engineAddrChan := make(chan string)
failChan := make(chan bool)
go func() {
numAddrs := 0
for scanner.Scan() {
m := scanner.Text()
errBuff.WriteString(m)
if strings.HasPrefix(m, "resmon: ") {
numAddrs++
// resmon: 127.0.0.1:23423
resMonAddrChan <- strings.Split(m, " ")[1]
}
if strings.HasPrefix(m, "engine: ") {
numAddrs++
// engine: 127.0.0.1:23423
engineAddrChan <- strings.Split(m, " ")[1]
}
}
if numAddrs < 2 {
failChan <- true
}
}()
var monitorAddr string
var engineAddr string
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
return stdout.String(), errBuff.String(), ctx.Err()
case <-failChan:
return stdout.String(), errBuff.String(), errors.New("failed to launch host")
case monitorAddr = <-resMonAddrChan:
case engineAddr = <-engineAddrChan:
}
}
cfg, err := s.GetAllConfig(ctx)
if err != nil {
return stdout.String(), errBuff.String(), errors.Wrap(err, "failed to serialize config for inline program")
}
cfgMap := make(map[string]string)
for k, v := range cfg {
cfgMap[k] = v.Value
}
runInfo := pulumi.RunInfo{
EngineAddr: engineAddr,
MonitorAddr: monitorAddr,
Config: cfgMap,
Project: proj.Name.String(),
Stack: s.Name(),
}
if parallel > 0 {
runInfo.Parallel = parallel
}
err = execUserCode(ctx, s.Workspace().Program(), runInfo)
if err != nil {
interruptErr := cmd.Process.Signal(os.Interrupt)
if interruptErr != nil {
return stdout.String(), errBuff.String(),
errors.Wrap(err, "failed to run inline program and shutdown gracefully, could not kill host")
}
waitErr := cmd.Wait()
if waitErr != nil {
return stdout.String(), errBuff.String(),
errors.Wrap(err, "failed to run inline program and shutdown gracefully")
}
return stdout.String(), errBuff.String(), errors.Wrap(err, "error running inline pulumi program")
}
err = cmd.Process.Signal(os.Interrupt)
if err != nil {
return stdout.String(), errBuff.String(), errors.Wrap(err, "failed to shutdown host gracefully")
}
err = cmd.Wait()
if err != nil {
return stdout.String(), errBuff.String(), err
}
err = s.Workspace().PostCommandCallback(ctx, s.Name())
if err != nil {
return stdout.String(), errBuff.String(),
errors.Wrap(err, "command ran successfully, but error running PostCommandCallback")
}
return stdout.String(), errBuff.String(), nil
state int
cancel chan bool
done chan error
}
func execUserCode(ctx context.Context, fn pulumi.RunFunc, info pulumi.RunInfo) (err error) {
// isNestedInvocation returns true if pulumi.RunWithContext is on the stack.
func isNestedInvocation() bool {
depth, callers := 0, make([]uintptr, 32)
for {
n := runtime.Callers(depth, callers)
if n == 0 {
return false
}
depth += n
frames := runtime.CallersFrames(callers)
for f, more := frames.Next(); more; f, more = frames.Next() {
if f.Function == "github.com/pulumi/pulumi/sdk/v2/go/pulumi.RunWithContext" {
return true
}
}
}
}
func startLanguageRuntimeServer(fn pulumi.RunFunc) (*languageRuntimeServer, error) {
if isNestedInvocation() {
return nil, errors.New("nested stack operations are not supported https://github.com/pulumi/pulumi/issues/5058")
}
s := &languageRuntimeServer{
fn: fn,
cancel: make(chan bool),
}
s.c = sync.NewCond(&s.m)
port, done, err := rpcutil.Serve(0, s.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterLanguageRuntimeServer(srv, s)
return nil
},
}, nil)
if err != nil {
return nil, err
}
s.address, s.done = fmt.Sprintf("127.0.0.1:%d", port), done
return s, nil
}
func (s *languageRuntimeServer) Close() error {
s.m.Lock()
switch s.state {
case stateCanceled:
s.m.Unlock()
return nil
case stateWaiting:
// Not started yet; go ahead and cancel
default:
for s.state != stateFinished {
s.c.Wait()
}
}
s.state = stateCanceled
s.m.Unlock()
s.cancel <- true
close(s.cancel)
return <-s.done
}
func (s *languageRuntimeServer) GetRequiredPlugins(ctx context.Context,
req *pulumirpc.GetRequiredPluginsRequest) (*pulumirpc.GetRequiredPluginsResponse, error) {
return &pulumirpc.GetRequiredPluginsResponse{}, nil
}
func (s *languageRuntimeServer) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
s.m.Lock()
if s.state == stateCanceled {
s.m.Unlock()
return nil, errors.Errorf("program canceled")
}
s.state = stateRunning
s.m.Unlock()
defer func() {
if r := recover(); r != nil {
if pErr, ok := r.(error); ok {
err = errors.Wrap(pErr, "go inline source runtime error, an unhandled error occurred:")
} else {
err = errors.New("go inline source runtime error, an unhandled error occurred: unknown error")
}
}
s.m.Lock()
s.state = stateFinished
s.m.Unlock()
s.c.Broadcast()
}()
stack := string(debug.Stack())
if strings.Contains(stack, "github.com/pulumi/pulumi/sdk/go/pulumi/run.go") {
return errors.New("nested stack operations are not supported https://github.com/pulumi/pulumi/issues/5058")
var engineAddress string
if len(req.Args) > 0 {
engineAddress = req.Args[0]
}
pulumiCtx, err := pulumi.NewContext(ctx, info)
runInfo := pulumi.RunInfo{
EngineAddr: engineAddress,
MonitorAddr: req.GetMonitorAddress(),
Config: req.GetConfig(),
Project: req.GetProject(),
Stack: req.GetStack(),
Parallel: int(req.GetParallel()),
}
pulumiCtx, err := pulumi.NewContext(ctx, runInfo)
if err != nil {
return err
return nil, err
}
return pulumi.RunWithContext(pulumiCtx, fn)
err = func() (err error) {
defer func() {
if r := recover(); r != nil {
if pErr, ok := r.(error); ok {
err = errors.Wrap(pErr, "go inline source runtime error, an unhandled error occurred:")
} else {
err = errors.New("go inline source runtime error, an unhandled error occurred: unknown error")
}
}
}()
return pulumi.RunWithContext(pulumiCtx, s.fn)
}()
if err != nil {
return &pulumirpc.RunResponse{Error: err.Error()}, nil
}
return &pulumirpc.RunResponse{}, nil
}
func (s *languageRuntimeServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
return &pulumirpc.PluginInfo{
Version: "1.0.0",
}, nil
}