Compare commits
3 commits
master
...
evan/autoS
Author | SHA1 | Date | |
---|---|---|---|
1d7608821b | |||
d1d8117853 | |||
0267c878bd |
|
@ -1,248 +0,0 @@
|
|||
// Copyright 2016-2020, Pulumi Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/pulumi/pulumi/pkg/v2/backend"
|
||||
"github.com/pulumi/pulumi/pkg/v2/backend/display"
|
||||
"github.com/pulumi/pulumi/pkg/v2/engine"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/cmdutil"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/result"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
|
||||
)
|
||||
|
||||
// intentionally disabling here for cleaner err declaration/assignment.
|
||||
// nolint: vetshadow
|
||||
func newHostCmd() *cobra.Command {
|
||||
var debug bool
|
||||
var expectNop bool
|
||||
var message string
|
||||
var execKind string
|
||||
var stack string
|
||||
var configArray []string
|
||||
var path bool
|
||||
|
||||
// Flags for engine.UpdateOptions.
|
||||
var policyPackPaths []string
|
||||
var policyPackConfigPaths []string
|
||||
var diffDisplay bool
|
||||
var eventLogPath string
|
||||
var parallel int
|
||||
var refresh bool
|
||||
var showConfig bool
|
||||
var showReplacementSteps bool
|
||||
var showSames bool
|
||||
var showReads bool
|
||||
var suppressOutputs bool
|
||||
var yes bool
|
||||
var secretsProvider string
|
||||
var targets []string
|
||||
var replaces []string
|
||||
var targetReplaces []string
|
||||
var targetDependents bool
|
||||
|
||||
var cmd = &cobra.Command{
|
||||
Use: "host",
|
||||
Short: "[PREVIEW] Launch the engine in 'host' mode without launching a language runtime",
|
||||
Long: "[PREVIEW] Launch the engine in 'host' mode without launching a language runtime.\n" +
|
||||
"\n" +
|
||||
"To support some automation scenarios, the engine may be launched in this mode to prevent it from\n" +
|
||||
"needing to spawn its own language runtime. In this case, the lifetime of interactions between the\n" +
|
||||
"language and the engine host must be managed manually.",
|
||||
// host is an internal command not intended for public use
|
||||
Hidden: true,
|
||||
Run: cmdutil.RunResultFunc(func(cmd *cobra.Command, args []string) result.Result {
|
||||
isPreview := len(args) > 0 && args[0] == "preview"
|
||||
|
||||
// Construct the options based on flags.
|
||||
opts, err := updateFlagsToOptions(false, true, true)
|
||||
if err != nil {
|
||||
return result.FromError(err)
|
||||
}
|
||||
|
||||
opts.Display = display.Options{
|
||||
Color: cmdutil.GetGlobalColorization(),
|
||||
ShowConfig: showConfig,
|
||||
ShowReplacementSteps: showReplacementSteps,
|
||||
ShowSameResources: showSames,
|
||||
ShowReads: showReads,
|
||||
SuppressOutputs: suppressOutputs,
|
||||
IsInteractive: false,
|
||||
Type: display.DisplayDiff,
|
||||
EventLogPath: eventLogPath,
|
||||
Debug: debug,
|
||||
}
|
||||
|
||||
opts.Engine = engine.UpdateOptions{
|
||||
Parallel: parallel,
|
||||
Debug: debug,
|
||||
Refresh: refresh,
|
||||
UseLegacyDiff: useLegacyDiff(),
|
||||
IsHostCommand: true,
|
||||
}
|
||||
|
||||
// we set up the stack in auto.NewStack before execing host
|
||||
s, err := requireStack(stack, false /*offerNew*/, opts.Display, false /*setCurrent*/)
|
||||
if err != nil {
|
||||
return result.FromError(err)
|
||||
}
|
||||
|
||||
proj, root, err := readProject()
|
||||
if err != nil {
|
||||
return result.FromError(err)
|
||||
}
|
||||
proj.Runtime = workspace.NewProjectRuntimeInfo("host", nil)
|
||||
|
||||
sm, err := getStackSecretsManager(s)
|
||||
if err != nil {
|
||||
return result.FromError(errors.Wrap(err, "getting secrets manager"))
|
||||
}
|
||||
|
||||
cfg, err := getStackConfiguration(s, sm)
|
||||
if err != nil {
|
||||
return result.FromError(errors.Wrap(err, "getting stack configuration"))
|
||||
}
|
||||
|
||||
m, err := getUpdateMetadata(message, root, execKind)
|
||||
if err != nil {
|
||||
return result.FromError(errors.Wrap(err, "gathering environment metadata"))
|
||||
}
|
||||
|
||||
operation := s.Update
|
||||
if isPreview {
|
||||
operation = s.Preview
|
||||
opts.Display.JSONDisplay = true
|
||||
}
|
||||
|
||||
// Now perform the update. This will stay alive until the user cancels the host.
|
||||
changes, res := operation(commandContext(), backend.UpdateOperation{
|
||||
Proj: proj,
|
||||
Root: root,
|
||||
Opts: opts,
|
||||
M: m,
|
||||
StackConfiguration: cfg,
|
||||
SecretsManager: sm,
|
||||
Scopes: cancellationScopesWithoutInterrupt,
|
||||
})
|
||||
switch {
|
||||
case res != nil && res.Error() == context.Canceled:
|
||||
return result.FromError(errors.New("update cancelled"))
|
||||
case res != nil:
|
||||
return PrintEngineResult(res)
|
||||
case expectNop && changes != nil && changes.HasChanges():
|
||||
return result.FromError(errors.New("error: no changes were expected but changes occurred"))
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
|
||||
cmd.PersistentFlags().BoolVarP(
|
||||
&debug, "debug", "d", false,
|
||||
"Print detailed debugging output during resource operations")
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&expectNop, "expect-no-changes", false,
|
||||
"Return an error if any changes occur during this update")
|
||||
cmd.PersistentFlags().StringVarP(
|
||||
&stack, "stack", "s", "",
|
||||
"The name of the stack to operate on. Defaults to the current stack")
|
||||
cmd.PersistentFlags().StringVar(
|
||||
&stackConfigFile, "config-file", "",
|
||||
"Use the configuration values in the specified file rather than detecting the file name")
|
||||
cmd.PersistentFlags().StringArrayVarP(
|
||||
&configArray, "config", "c", []string{},
|
||||
"Config to use during the update")
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&path, "config-path", false,
|
||||
"Config keys contain a path to a property in a map or list to set")
|
||||
cmd.PersistentFlags().StringVar(
|
||||
&secretsProvider, "secrets-provider", "default", "The type of the provider that should be used to encrypt and "+
|
||||
"decrypt secrets (possible choices: default, passphrase, awskms, azurekeyvault, gcpkms, hashivault). Only"+
|
||||
"used when creating a new stack from an existing template")
|
||||
|
||||
cmd.PersistentFlags().StringVarP(
|
||||
&message, "message", "m", "",
|
||||
"Optional message to associate with the update operation")
|
||||
|
||||
cmd.PersistentFlags().StringArrayVarP(
|
||||
&targets, "target", "t", []string{},
|
||||
"Specify a single resource URN to update. Other resources will not be updated."+
|
||||
" Multiple resources can be specified using --target urn1 --target urn2")
|
||||
cmd.PersistentFlags().StringArrayVar(
|
||||
&replaces, "replace", []string{},
|
||||
"Specify resources to replace. Multiple resources can be specified using --replace run1 --replace urn2")
|
||||
cmd.PersistentFlags().StringArrayVar(
|
||||
&targetReplaces, "target-replace", []string{},
|
||||
"Specify a single resource URN to replace. Other resources will not be updated."+
|
||||
" Shorthand for --target urn --replace urn.")
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&targetDependents, "target-dependents", false,
|
||||
"Allows updating of dependent targets discovered but not specified in --target list")
|
||||
|
||||
// Flags for engine.UpdateOptions.
|
||||
cmd.PersistentFlags().StringSliceVar(
|
||||
&policyPackPaths, "policy-pack", []string{},
|
||||
"[PREVIEW] Run one or more policy packs as part of this update")
|
||||
cmd.PersistentFlags().StringSliceVar(
|
||||
&policyPackConfigPaths, "policy-pack-config", []string{},
|
||||
`[PREVIEW] Path to JSON file containing the config for the policy pack of the corresponding "--policy-pack" flag`)
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&diffDisplay, "diff", false,
|
||||
"Display operation as a rich diff showing the overall change")
|
||||
cmd.PersistentFlags().IntVarP(
|
||||
¶llel, "parallel", "p", defaultParallel,
|
||||
"Allow P resource operations to run in parallel at once (1 for no parallelism). Defaults to unbounded.")
|
||||
cmd.PersistentFlags().BoolVarP(
|
||||
&refresh, "refresh", "r", false,
|
||||
"Refresh the state of the stack's resources before this update")
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&showConfig, "show-config", false,
|
||||
"Show configuration keys and variables")
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&showReplacementSteps, "show-replacement-steps", false,
|
||||
"Show detailed resource replacement creates and deletes instead of a single step")
|
||||
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&showSames, "show-sames", false,
|
||||
"Show resources that don't need be updated because they haven't changed, alongside those that do")
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&showReads, "show-reads", false,
|
||||
"Show resources that are being read in, alongside those being managed directly in the stack")
|
||||
|
||||
cmd.PersistentFlags().BoolVar(
|
||||
&suppressOutputs, "suppress-outputs", false,
|
||||
"Suppress display of stack outputs (in case they contain sensitive values)")
|
||||
cmd.PersistentFlags().BoolVarP(
|
||||
&yes, "yes", "y", false,
|
||||
"Automatically approve and perform the update after previewing it")
|
||||
|
||||
if hasDebugCommands() {
|
||||
cmd.PersistentFlags().StringVar(
|
||||
&eventLogPath, "event-log", "",
|
||||
"Log events to a file at this path")
|
||||
}
|
||||
|
||||
// internal flag
|
||||
cmd.PersistentFlags().StringVar(&execKind, "exec-kind", "", "")
|
||||
// ignore err, only happens if flag does not exist
|
||||
_ = cmd.PersistentFlags().MarkHidden("exec-kind")
|
||||
|
||||
return cmd
|
||||
}
|
|
@ -67,7 +67,7 @@ func getProjectPlugins() ([]workspace.PluginInfo, error) {
|
|||
// Get the required plugins and then ensure they have metadata populated about them. Because it's possible
|
||||
// a plugin required by the project hasn't yet been installed, we will simply skip any errors we encounter.
|
||||
var results []workspace.PluginInfo
|
||||
plugins, err := ctx.Host.GetRequiredPlugins(plugin.ProgInfo{
|
||||
plugins, err := plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
|
||||
Proj: proj,
|
||||
Pwd: pwd,
|
||||
Program: main,
|
||||
|
|
|
@ -34,6 +34,7 @@ func newPreviewCmd() *cobra.Command {
|
|||
var stack string
|
||||
var configArray []string
|
||||
var configPath bool
|
||||
var client string
|
||||
|
||||
// Flags for engine.UpdateOptions.
|
||||
var jsonDisplay bool
|
||||
|
@ -106,7 +107,7 @@ func newPreviewCmd() *cobra.Command {
|
|||
return result.FromError(err)
|
||||
}
|
||||
|
||||
proj, root, err := readProject()
|
||||
proj, root, err := readProjectForUpdate(client)
|
||||
if err != nil {
|
||||
return result.FromError(err)
|
||||
}
|
||||
|
@ -195,6 +196,9 @@ func newPreviewCmd() *cobra.Command {
|
|||
&configPath, "config-path", false,
|
||||
"Config keys contain a path to a property in a map or list to set")
|
||||
|
||||
cmd.PersistentFlags().StringVar(
|
||||
&client, "client", "", "The address of an existing language runtime host to connect to")
|
||||
|
||||
cmd.PersistentFlags().StringVarP(
|
||||
&message, "message", "m", "",
|
||||
"Optional message to associate with the preview operation")
|
||||
|
|
|
@ -211,7 +211,6 @@ func NewPulumiCmd() *cobra.Command {
|
|||
// Less common, and thus hidden, commands:
|
||||
cmd.AddCommand(newGenCompletionCmd(cmd))
|
||||
cmd.AddCommand(newGenMarkdownCmd(cmd))
|
||||
cmd.AddCommand(newHostCmd())
|
||||
|
||||
// We have a set of commands that are still experimental and that we add only when PULUMI_EXPERIMENTAL is set
|
||||
// to true.
|
||||
|
|
|
@ -51,6 +51,7 @@ func newUpCmd() *cobra.Command {
|
|||
var stack string
|
||||
var configArray []string
|
||||
var path bool
|
||||
var client string
|
||||
|
||||
// Flags for engine.UpdateOptions.
|
||||
var policyPackPaths []string
|
||||
|
@ -85,7 +86,7 @@ func newUpCmd() *cobra.Command {
|
|||
return result.FromError(err)
|
||||
}
|
||||
|
||||
proj, root, err := readProject()
|
||||
proj, root, err := readProjectForUpdate(client)
|
||||
if err != nil {
|
||||
return result.FromError(err)
|
||||
}
|
||||
|
@ -399,6 +400,9 @@ func newUpCmd() *cobra.Command {
|
|||
"decrypt secrets (possible choices: default, passphrase, awskms, azurekeyvault, gcpkms, hashivault). Only"+
|
||||
"used when creating a new stack from an existing template")
|
||||
|
||||
cmd.PersistentFlags().StringVar(
|
||||
&client, "client", "", "The address of an existing language runtime host to connect to")
|
||||
|
||||
cmd.PersistentFlags().StringVarP(
|
||||
&message, "message", "m", "",
|
||||
"Optional message to associate with the update operation")
|
||||
|
|
|
@ -399,6 +399,24 @@ func parseAndSaveConfigArray(s backend.Stack, configArray []string, path bool) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// readProjectForUpdate attempts to detect and read a Pulumi project for the current workspace. If
|
||||
// the project is successfully detected and read, it is returned along with the path to its
|
||||
// containing directory, which will be used as the root of the project's Pulumi program. If a
|
||||
// client address is present, the returned project will always have the runtime set to "client"
|
||||
// with the address option set to the client address.
|
||||
func readProjectForUpdate(clientAddress string) (*workspace.Project, string, error) {
|
||||
proj, root, err := readProject()
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if clientAddress != "" {
|
||||
proj.Runtime = workspace.NewProjectRuntimeInfo("client", map[string]interface{}{
|
||||
"address": clientAddress,
|
||||
})
|
||||
}
|
||||
return proj, root, nil
|
||||
}
|
||||
|
||||
// readProject attempts to detect and read a Pulumi project for the current workspace. If the
|
||||
// project is successfully detected and read, it is returned along with the path to its containing
|
||||
// directory, which will be used as the root of the project's Pulumi program.
|
||||
|
|
|
@ -25,9 +25,12 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/blang/semver"
|
||||
pbempty "github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/mitchellh/copystructure"
|
||||
combinations "github.com/mxschmitt/golang-combinations"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
|
||||
|
@ -44,11 +47,11 @@ import (
|
|||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/result"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil/rpcerror"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
|
||||
|
||||
combinations "github.com/mxschmitt/golang-combinations"
|
||||
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
|
||||
)
|
||||
|
||||
type JournalEntryKind int
|
||||
|
@ -347,14 +350,15 @@ type TestStep struct {
|
|||
}
|
||||
|
||||
type TestPlan struct {
|
||||
Project string
|
||||
Stack string
|
||||
Runtime string
|
||||
Config config.Map
|
||||
Decrypter config.Decrypter
|
||||
BackendClient deploy.BackendClient
|
||||
Options UpdateOptions
|
||||
Steps []TestStep
|
||||
Project string
|
||||
Stack string
|
||||
Runtime string
|
||||
RuntimeOptions map[string]interface{}
|
||||
Config config.Map
|
||||
Decrypter config.Decrypter
|
||||
BackendClient deploy.BackendClient
|
||||
Options UpdateOptions
|
||||
Steps []TestStep
|
||||
}
|
||||
|
||||
//nolint: goconst
|
||||
|
@ -392,7 +396,7 @@ func (p *TestPlan) GetProject() workspace.Project {
|
|||
|
||||
return workspace.Project{
|
||||
Name: projectName,
|
||||
Runtime: workspace.NewProjectRuntimeInfo(runtime, nil),
|
||||
Runtime: workspace.NewProjectRuntimeInfo(runtime, p.RuntimeOptions),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5990,3 +5994,112 @@ func TestSingleComponentDefaultProviderLifecycle(t *testing.T) {
|
|||
}
|
||||
p.Run(t, nil)
|
||||
}
|
||||
|
||||
type updateContext struct {
|
||||
*deploytest.ResourceMonitor
|
||||
|
||||
resmon chan *deploytest.ResourceMonitor
|
||||
programErr chan error
|
||||
snap chan *deploy.Snapshot
|
||||
updateResult chan result.Result
|
||||
}
|
||||
|
||||
func startUpdate(host plugin.Host) (*updateContext, error) {
|
||||
ctx := &updateContext{
|
||||
resmon: make(chan *deploytest.ResourceMonitor),
|
||||
programErr: make(chan error),
|
||||
snap: make(chan *deploy.Snapshot),
|
||||
updateResult: make(chan result.Result),
|
||||
}
|
||||
|
||||
port, _, err := rpcutil.Serve(0, nil, []func(*grpc.Server) error{
|
||||
func(srv *grpc.Server) error {
|
||||
pulumirpc.RegisterLanguageRuntimeServer(srv, ctx)
|
||||
return nil
|
||||
},
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &TestPlan{
|
||||
Options: UpdateOptions{host: host},
|
||||
Runtime: "client",
|
||||
RuntimeOptions: map[string]interface{}{
|
||||
"address": fmt.Sprintf("127.0.0.1:%d", port),
|
||||
},
|
||||
}
|
||||
|
||||
go func() {
|
||||
snap, res := TestOp(Update).Run(p.GetProject(), p.GetTarget(nil), p.Options, false, p.BackendClient, nil)
|
||||
ctx.snap <- snap
|
||||
close(ctx.snap)
|
||||
ctx.updateResult <- res
|
||||
close(ctx.updateResult)
|
||||
}()
|
||||
|
||||
ctx.ResourceMonitor = <-ctx.resmon
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (ctx *updateContext) Finish(err error) (*deploy.Snapshot, result.Result) {
|
||||
ctx.programErr <- err
|
||||
close(ctx.programErr)
|
||||
|
||||
return <-ctx.snap, <-ctx.updateResult
|
||||
}
|
||||
|
||||
func (ctx *updateContext) GetRequiredPlugins(_ context.Context,
|
||||
req *pulumirpc.GetRequiredPluginsRequest) (*pulumirpc.GetRequiredPluginsResponse, error) {
|
||||
return &pulumirpc.GetRequiredPluginsResponse{}, nil
|
||||
}
|
||||
|
||||
func (ctx *updateContext) Run(_ context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
|
||||
// Connect to the resource monitor and create an appropriate client.
|
||||
conn, err := grpc.Dial(
|
||||
req.MonitorAddress,
|
||||
grpc.WithInsecure(),
|
||||
rpcutil.GrpcChannelOptions(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not connect to resource monitor")
|
||||
}
|
||||
defer contract.IgnoreClose(conn)
|
||||
|
||||
// Fire up a resource monitor client
|
||||
ctx.resmon <- deploytest.NewResourceMonitor(pulumirpc.NewResourceMonitorClient(conn))
|
||||
close(ctx.resmon)
|
||||
|
||||
// Wait for the program to terminate.
|
||||
if err := <-ctx.programErr; err != nil {
|
||||
return &pulumirpc.RunResponse{Error: err.Error()}, nil
|
||||
}
|
||||
return &pulumirpc.RunResponse{}, nil
|
||||
}
|
||||
|
||||
func (ctx *updateContext) GetPluginInfo(_ context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
|
||||
return &pulumirpc.PluginInfo{
|
||||
Version: "1.0.0",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TestLanguageClient(t *testing.T) {
|
||||
loaders := []*deploytest.ProviderLoader{
|
||||
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
|
||||
return &deploytest.Provider{}, nil
|
||||
}),
|
||||
}
|
||||
|
||||
update, err := startUpdate(deploytest.NewPluginHost(nil, nil, nil, loaders...))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start update: %v", err)
|
||||
}
|
||||
|
||||
// Register resources, etc.
|
||||
_, _, _, err = update.RegisterResource("pkgA:m:typA", "resA", true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
snap, res := update.Finish(nil)
|
||||
assert.Nil(t, res)
|
||||
assert.Len(t, snap.Resources, 2)
|
||||
}
|
||||
|
|
|
@ -16,11 +16,10 @@ package engine
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
|
||||
"github.com/pulumi/pulumi/pkg/v2/resource/deploy/providers"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/diag"
|
||||
|
@ -50,6 +49,23 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host, config plugin.Conf
|
|||
return "", "", nil, err
|
||||
}
|
||||
|
||||
// If the project wants to connect to an existing language runtime, do so now.
|
||||
if projinfo.Proj.Runtime.Name() == "client" {
|
||||
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
|
||||
if !ok {
|
||||
return "", "", nil, errors.New("missing address of language runtime service")
|
||||
}
|
||||
address, ok := addressValue.(string)
|
||||
if !ok {
|
||||
return "", "", nil, errors.New("address of language runtime service must be a string")
|
||||
}
|
||||
host, err := connectToLanguageRuntime(ctx, address)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
ctx.Host = host
|
||||
}
|
||||
|
||||
return pwd, main, ctx, nil
|
||||
}
|
||||
|
||||
|
@ -127,10 +143,6 @@ func plan(ctx *Context, info *planContext, opts planOptions, dryRun bool) (*plan
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if opts.UpdateOptions.IsHostCommand {
|
||||
fmt.Fprintf(os.Stderr, "engine: %s\n", plugctx.Host.ServerAddr())
|
||||
}
|
||||
|
||||
opts.trustDependencies = proj.TrustResourceDependencies()
|
||||
// Now create the state source. This may issue an error if it can't create the source. This entails,
|
||||
// for example, loading any plugins which will be required to execute a program, among other things.
|
||||
|
|
49
pkg/engine/plugin_host.go
Normal file
49
pkg/engine/plugin_host.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
// Copyright 2016-2020, Pulumi Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/resource/plugin"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
|
||||
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
|
||||
)
|
||||
|
||||
type clientLanguageRuntimeHost struct {
|
||||
plugin.Host
|
||||
|
||||
languageRuntime plugin.LanguageRuntime
|
||||
}
|
||||
|
||||
func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
|
||||
// Dial the language runtime.
|
||||
conn, err := grpc.Dial(address, grpc.WithInsecure(),
|
||||
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()), rpcutil.GrpcChannelOptions())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not connect to language host")
|
||||
}
|
||||
|
||||
client := pulumirpc.NewLanguageRuntimeClient(conn)
|
||||
return &clientLanguageRuntimeHost{
|
||||
Host: ctx.Host,
|
||||
languageRuntime: plugin.NewLanguageRuntimeClient(ctx, "client", client),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
|
||||
return host.languageRuntime, nil
|
||||
}
|
|
@ -75,11 +75,16 @@ func newPluginSet() pluginSet {
|
|||
func gatherPluginsFromProgram(plugctx *plugin.Context, prog plugin.ProgInfo) (pluginSet, error) {
|
||||
logging.V(preparePluginLog).Infof("gatherPluginsFromProgram(): gathering plugins from language host")
|
||||
set := newPluginSet()
|
||||
langhostPlugins, err := plugctx.Host.GetRequiredPlugins(prog, plugin.AllPlugins)
|
||||
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, prog, plugin.AllPlugins)
|
||||
if err != nil {
|
||||
return set, err
|
||||
}
|
||||
for _, plug := range langhostPlugins {
|
||||
// Ignore language plugins named "client".
|
||||
if plug.Name == "client" && plug.Kind == workspace.LanguagePlugin {
|
||||
continue
|
||||
}
|
||||
|
||||
logging.V(preparePluginLog).Infof(
|
||||
"gatherPluginsFromProgram(): plugin %s %s (%s) is required by language host",
|
||||
plug.Name, plug.Version, plug.ServerURL)
|
||||
|
|
|
@ -132,9 +132,6 @@ type UpdateOptions struct {
|
|||
// true if the engine should use legacy diffing behavior during an update.
|
||||
UseLegacyDiff bool
|
||||
|
||||
// true if we're executing in the context of `pulumi host` command.
|
||||
IsHostCommand bool
|
||||
|
||||
// true if we should report events for steps that involve default providers.
|
||||
reportDefaultProviderSteps bool
|
||||
|
||||
|
@ -389,12 +386,19 @@ func newUpdateSource(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// If we are connecting to an existing client, stash the address of the engine in its arguments.
|
||||
var args []string
|
||||
if proj.Runtime.Name() == "client" {
|
||||
args = []string{plugctx.Host.ServerAddr()}
|
||||
}
|
||||
|
||||
// If that succeeded, create a new source that will perform interpretation of the compiled program.
|
||||
// TODO[pulumi/pulumi#88]: we are passing `nil` as the arguments map; we need to allow a way to pass these.
|
||||
return deploy.NewEvalSource(plugctx, &deploy.EvalRunInfo{
|
||||
Proj: proj,
|
||||
Pwd: pwd,
|
||||
Program: main,
|
||||
Args: args,
|
||||
Target: target,
|
||||
}, defaultProviderVersions, dryRun), nil
|
||||
}
|
||||
|
|
|
@ -54,6 +54,10 @@ func (rm *ResourceMonitor) Close() error {
|
|||
return rm.conn.Close()
|
||||
}
|
||||
|
||||
func NewResourceMonitor(resmon pulumirpc.ResourceMonitorClient) *ResourceMonitor {
|
||||
return &ResourceMonitor{resmon: resmon}
|
||||
}
|
||||
|
||||
type ResourceOptions struct {
|
||||
Parent resource.URN
|
||||
Protect bool
|
||||
|
|
|
@ -73,8 +73,6 @@ type Host interface {
|
|||
// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
|
||||
// and/or there are errors loading one or more plugins, a non-nil error is returned.
|
||||
EnsurePlugins(plugins []workspace.PluginInfo, kinds Flags) error
|
||||
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
|
||||
GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error)
|
||||
|
||||
// SignalCancellation asks all resource providers to gracefully shut down and abort any ongoing
|
||||
// operations. Operation aborted in this way will return an error (e.g., `Update` and `Create`
|
||||
|
@ -371,42 +369,6 @@ func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginInfo, kinds Fla
|
|||
return result
|
||||
}
|
||||
|
||||
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
|
||||
func (host *defaultHost) GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
|
||||
var plugins []workspace.PluginInfo
|
||||
|
||||
if kinds&LanguagePlugins != 0 {
|
||||
// First make sure the language plugin is present. We need this to load the required resource plugins.
|
||||
// TODO: we need to think about how best to version this. For now, it always picks the latest.
|
||||
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
|
||||
}
|
||||
plugins = append(plugins, workspace.PluginInfo{
|
||||
Name: info.Proj.Runtime.Name(),
|
||||
Kind: workspace.LanguagePlugin,
|
||||
})
|
||||
|
||||
if kinds&ResourcePlugins != 0 {
|
||||
// Use the language plugin to compute this project's set of plugin dependencies.
|
||||
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
|
||||
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
|
||||
// later than we do (right now, we do it up front, but at that point we don't know the version).
|
||||
deps, err := lang.GetRequiredPlugins(info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
|
||||
}
|
||||
plugins = append(plugins, deps...)
|
||||
}
|
||||
} else {
|
||||
// If we can't load the language plugin, we can't discover the resource plugins.
|
||||
contract.Assertf(kinds&ResourcePlugins != 0,
|
||||
"cannot load resource plugins without also loading the language plugin")
|
||||
}
|
||||
|
||||
return plugins, nil
|
||||
}
|
||||
|
||||
func (host *defaultHost) SignalCancellation() error {
|
||||
// NOTE: we're abusing loadPlugin in order to ensure proper synchronization.
|
||||
_, err := host.loadPlugin(func() (interface{}, error) {
|
||||
|
@ -478,3 +440,39 @@ const (
|
|||
|
||||
// AllPlugins uses flags to ensure that all plugin kinds are loaded.
|
||||
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins
|
||||
|
||||
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
|
||||
func GetRequiredPlugins(host Host, info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
|
||||
var plugins []workspace.PluginInfo
|
||||
|
||||
if kinds&LanguagePlugins != 0 {
|
||||
// First make sure the language plugin is present. We need this to load the required resource plugins.
|
||||
// TODO: we need to think about how best to version this. For now, it always picks the latest.
|
||||
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
|
||||
}
|
||||
plugins = append(plugins, workspace.PluginInfo{
|
||||
Name: info.Proj.Runtime.Name(),
|
||||
Kind: workspace.LanguagePlugin,
|
||||
})
|
||||
|
||||
if kinds&ResourcePlugins != 0 {
|
||||
// Use the language plugin to compute this project's set of plugin dependencies.
|
||||
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
|
||||
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
|
||||
// later than we do (right now, we do it up front, but at that point we don't know the version).
|
||||
deps, err := lang.GetRequiredPlugins(info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
|
||||
}
|
||||
plugins = append(plugins, deps...)
|
||||
}
|
||||
} else {
|
||||
// If we can't load the language plugin, we can't discover the resource plugins.
|
||||
contract.Assertf(kinds&ResourcePlugins != 0,
|
||||
"cannot load resource plugins without also loading the language plugin")
|
||||
}
|
||||
|
||||
return plugins, nil
|
||||
}
|
||||
|
|
|
@ -16,10 +16,7 @@ package plugin
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/blang/semver"
|
||||
pbempty "github.com/golang/protobuf/ptypes/empty"
|
||||
|
@ -30,17 +27,10 @@ import (
|
|||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil/rpcerror"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/version"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
|
||||
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
|
||||
)
|
||||
|
||||
// HostOnlyRuntimeMode can be used to launch the engine in "host"-only mode. That is to say, the engine and
|
||||
// resource monitors will be spawned, but no language runtime will be created. This allows automated
|
||||
// scenarios that drive the engine from outside of the engine itself, such as, for example, a Node.js
|
||||
// program spawning, deploying, and tearing down the engine's resources.
|
||||
const HostOnlyRuntimeMode = "host"
|
||||
|
||||
// langhost reflects a language host plugin, loaded dynamically for a single language/runtime pair.
|
||||
type langhost struct {
|
||||
ctx *Context
|
||||
|
@ -53,12 +43,7 @@ type langhost struct {
|
|||
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
|
||||
func NewLanguageRuntime(host Host, ctx *Context, runtime string,
|
||||
options map[string]interface{}) (LanguageRuntime, error) {
|
||||
// If this is host-only mode, exit without spawning anything.
|
||||
if runtime == HostOnlyRuntimeMode {
|
||||
return &langhost{ctx: ctx, runtime: runtime}, nil
|
||||
}
|
||||
|
||||
// For all other languages, load the plugin's path by using the standard workspace logic.
|
||||
_, path, err := workspace.GetPluginPath(
|
||||
workspace.LanguagePlugin, strings.Replace(runtime, tokens.QNameDelimiter, "_", -1), nil)
|
||||
if err != nil {
|
||||
|
@ -90,19 +75,21 @@ func NewLanguageRuntime(host Host, ctx *Context, runtime string,
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (h *langhost) Runtime() string { return h.runtime }
|
||||
func (h *langhost) IsHostOnlyMode() bool { return h.runtime == HostOnlyRuntimeMode }
|
||||
func NewLanguageRuntimeClient(ctx *Context, runtime string, client pulumirpc.LanguageRuntimeClient) LanguageRuntime {
|
||||
return &langhost{
|
||||
ctx: ctx,
|
||||
runtime: runtime,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *langhost) Runtime() string { return h.runtime }
|
||||
|
||||
// GetRequiredPlugins computes the complete set of anticipated plugins required by a program.
|
||||
func (h *langhost) GetRequiredPlugins(info ProgInfo) ([]workspace.PluginInfo, error) {
|
||||
proj := string(info.Proj.Name)
|
||||
logging.V(7).Infof("langhost[%v].GetRequiredPlugins(proj=%s,pwd=%s,program=%s) executing",
|
||||
h.runtime, proj, info.Pwd, info.Program)
|
||||
if h.IsHostOnlyMode() {
|
||||
logging.V(7).Infof("langhost[%v].GetRequiredPlugins(proj=%s,pwd=%s,program=%s) in host mode, exiting",
|
||||
h.runtime, proj, info.Pwd, info.Program)
|
||||
return nil, nil
|
||||
}
|
||||
resp, err := h.client.GetRequiredPlugins(h.ctx.Request(), &pulumirpc.GetRequiredPluginsRequest{
|
||||
Project: proj,
|
||||
Pwd: info.Pwd,
|
||||
|
@ -156,14 +143,6 @@ func (h *langhost) GetRequiredPlugins(info ProgInfo) ([]workspace.PluginInfo, er
|
|||
func (h *langhost) Run(info RunInfo) (string, bool, error) {
|
||||
logging.V(7).Infof("langhost[%v].Run(pwd=%v,program=%v,#args=%v,proj=%s,stack=%v,#config=%v,dryrun=%v) executing",
|
||||
h.runtime, info.Pwd, info.Program, len(info.Args), info.Project, info.Stack, len(info.Config), info.DryRun)
|
||||
// In host mode, we simply print the engine and monitor address to stderr and wait for a signal.
|
||||
if h.IsHostOnlyMode() {
|
||||
fmt.Fprintf(os.Stderr, "resmon: %s\n", info.MonitorAddress)
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigs
|
||||
return "", false, nil
|
||||
}
|
||||
config := make(map[string]string)
|
||||
for k, v := range info.Config {
|
||||
config[k.String()] = v
|
||||
|
@ -203,20 +182,15 @@ func (h *langhost) GetPluginInfo() (workspace.PluginInfo, error) {
|
|||
Kind: workspace.LanguagePlugin,
|
||||
}
|
||||
|
||||
var vers string
|
||||
if h.IsHostOnlyMode() {
|
||||
vers = version.Version
|
||||
} else {
|
||||
plugInfo.Path = h.plug.Bin
|
||||
plugInfo.Path = h.plug.Bin
|
||||
|
||||
resp, err := h.client.GetPluginInfo(h.ctx.Request(), &pbempty.Empty{})
|
||||
if err != nil {
|
||||
rpcError := rpcerror.Convert(err)
|
||||
logging.V(7).Infof("langhost[%v].GetPluginInfo() failed: err=%v", h.runtime, rpcError)
|
||||
return workspace.PluginInfo{}, rpcError
|
||||
}
|
||||
vers = resp.Version
|
||||
resp, err := h.client.GetPluginInfo(h.ctx.Request(), &pbempty.Empty{})
|
||||
if err != nil {
|
||||
rpcError := rpcerror.Convert(err)
|
||||
logging.V(7).Infof("langhost[%v].GetPluginInfo() failed: err=%v", h.runtime, rpcError)
|
||||
return workspace.PluginInfo{}, rpcError
|
||||
}
|
||||
vers := resp.Version
|
||||
|
||||
if vers != "" {
|
||||
sv, err := semver.ParseTolerant(vers)
|
||||
|
|
|
@ -101,6 +101,14 @@ conflicts:
|
|||
if err != nil && IsConcurrentUpdateError(err) { /* retry logic here */ }
|
||||
```
|
||||
|
||||
## Developing the Godocs
|
||||
This repo has extensive examples and godoc content. To test out your changes locally you can do the following:
|
||||
|
||||
1. enlist in the appropriate pulumi branch:
|
||||
2. cd $GOPATH/src/github.com/pulumi/pulumi/sdk/go/x/auto
|
||||
3. godoc -http=:6060
|
||||
4. Navigate to http://localhost:6060/pkg/github.com/pulumi/pulumi/sdk/v2/go/x/auto/
|
||||
|
||||
## Known Issues
|
||||
|
||||
The Automation API is currently in Alpha and has several known issues. Please upvote issues,
|
||||
|
|
|
@ -17,6 +17,7 @@ package auto
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
)
|
||||
|
@ -26,6 +27,7 @@ const unknownErrorCode = -2
|
|||
func runPulumiCommandSync(
|
||||
ctx context.Context,
|
||||
workdir string,
|
||||
additionalOutput []io.Writer,
|
||||
additionalEnv []string,
|
||||
args ...string,
|
||||
) (string, string, int, error) {
|
||||
|
@ -35,10 +37,13 @@ func runPulumiCommandSync(
|
|||
cmd := exec.CommandContext(ctx, "pulumi", args...)
|
||||
cmd.Dir = workdir
|
||||
cmd.Env = append(os.Environ(), additionalEnv...)
|
||||
|
||||
var stdout bytes.Buffer
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stdout = &stdout
|
||||
additionalOutput = append(additionalOutput, &stdout)
|
||||
cmd.Stdout = io.MultiWriter(additionalOutput...)
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
code := unknownErrorCode
|
||||
err := cmd.Run()
|
||||
if exitError, ok := err.(*exec.ExitError); ok {
|
||||
|
|
|
@ -337,7 +337,9 @@ func TestRuntimeErrorPython(t *testing.T) {
|
|||
|
||||
_, err = s.Up(ctx)
|
||||
assert.NotNil(t, err)
|
||||
assert.True(t, IsRuntimeError(err))
|
||||
if !assert.True(t, IsRuntimeError(err)) {
|
||||
t.Logf("%v is not a runtime error", err)
|
||||
}
|
||||
|
||||
// -- pulumi destroy --
|
||||
|
||||
|
|
|
@ -18,6 +18,9 @@ package auto
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -734,6 +737,20 @@ func ExampleStack_Destroy() {
|
|||
stack.Destroy(ctx, optdestroy.Message("a message to save with the destroy operation"))
|
||||
}
|
||||
|
||||
func ExampleStack_Destroy_streamingProgress() {
|
||||
ctx := context.Background()
|
||||
fqsn := FullyQualifiedStackName("org", "project", "stack")
|
||||
// select an existing stack to destroy
|
||||
stack, _ := SelectStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
|
||||
// create a temp file that we can tail during while our program runs
|
||||
tmp, _ := ioutil.TempFile(os.TempDir(), "")
|
||||
// optdestroy.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
|
||||
// this gives us incremental status over time
|
||||
progressStreams := []io.Writer{os.Stdout, tmp}
|
||||
// this destroy will incrementally stream unstructured progress messages to stdout and our temp file
|
||||
stack.Destroy(ctx, optdestroy.ProgressStreams(progressStreams...))
|
||||
}
|
||||
|
||||
func ExampleStack_Up() {
|
||||
ctx := context.Background()
|
||||
fqsn := FullyQualifiedStackName("org", "project", "stack")
|
||||
|
@ -742,6 +759,20 @@ func ExampleStack_Up() {
|
|||
stack.Up(ctx, optup.Message("a message to save with the up operation"), optup.Parallel(10000))
|
||||
}
|
||||
|
||||
func ExampleStack_Up_streamingProgress() {
|
||||
ctx := context.Background()
|
||||
fqsn := FullyQualifiedStackName("org", "project", "stack")
|
||||
// create a new stack to update
|
||||
stack, _ := NewStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
|
||||
// create a temp file that we can tail during while our program runs
|
||||
tmp, _ := ioutil.TempFile(os.TempDir(), "")
|
||||
// optup.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
|
||||
// this gives us incremental status over time
|
||||
progressStreams := []io.Writer{os.Stdout, tmp}
|
||||
// this update will incrementally stream unstructured progress messages to stdout and our temp file
|
||||
stack.Up(ctx, optup.ProgressStreams(progressStreams...))
|
||||
}
|
||||
|
||||
func ExampleStack_Preview() {
|
||||
ctx := context.Background()
|
||||
fqsn := FullyQualifiedStackName("org", "project", "stack")
|
||||
|
@ -758,6 +789,20 @@ func ExampleStack_Refresh() {
|
|||
stack.Refresh(ctx, optrefresh.Message("a message to save with the refresh operation"))
|
||||
}
|
||||
|
||||
func ExampleStack_Refresh_streamingProgress() {
|
||||
ctx := context.Background()
|
||||
fqsn := FullyQualifiedStackName("org", "project", "stack")
|
||||
// select an existing stack and refresh the resources under management
|
||||
stack, _ := SelectStackLocalSource(ctx, fqsn, filepath.Join(".", "program"))
|
||||
// create a temp file that we can tail during while our program runs
|
||||
tmp, _ := ioutil.TempFile(os.TempDir(), "")
|
||||
// optrefresh.ProgressStreams allows us to stream incremental output to stdout, a file to tail, etc.
|
||||
// this gives us incremental status over time
|
||||
progressStreams := []io.Writer{os.Stdout, tmp}
|
||||
// this refresh will incrementally stream unstructured progress messages to stdout and our temp file
|
||||
stack.Refresh(ctx, optrefresh.ProgressStreams(progressStreams...))
|
||||
}
|
||||
|
||||
func ExampleStack_GetAllConfig() {
|
||||
ctx := context.Background()
|
||||
fqsn := FullyQualifiedStackName("org", "project", "stack")
|
||||
|
|
|
@ -458,7 +458,7 @@ func (l *LocalWorkspace) runPulumiCmdSync(
|
|||
env = append(env, strings.Join(e, "="))
|
||||
}
|
||||
}
|
||||
return runPulumiCommandSync(ctx, l.WorkDir(), env, args...)
|
||||
return runPulumiCommandSync(ctx, l.WorkDir(), nil /* additionalOutputs */, env, args...)
|
||||
}
|
||||
|
||||
// NewLocalWorkspace creates and configures a LocalWorkspace. LocalWorkspaceOptions can be used to
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package auto
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
@ -27,6 +28,9 @@ import (
|
|||
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/pulumi/config"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
@ -474,3 +478,74 @@ func TestNestedStackFails(t *testing.T) {
|
|||
assert.Equal(t, "destroy", dRes.Summary.Kind)
|
||||
assert.Equal(t, "succeeded", dRes.Summary.Result)
|
||||
}
|
||||
|
||||
func TestProgressStreams(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pName := "inline_progress_streams"
|
||||
sName := fmt.Sprintf("int_test%d", rangeIn(10000000, 99999999))
|
||||
fqsn := FullyQualifiedStackName(pulumiOrg, pName, sName)
|
||||
cfg := ConfigMap{
|
||||
"bar": ConfigValue{
|
||||
Value: "abc",
|
||||
},
|
||||
"buzz": ConfigValue{
|
||||
Value: "secret",
|
||||
Secret: true,
|
||||
},
|
||||
}
|
||||
|
||||
// initialize
|
||||
s, err := NewStackInlineSource(ctx, fqsn, func(ctx *pulumi.Context) error {
|
||||
c := config.New(ctx, "")
|
||||
ctx.Export("exp_static", pulumi.String("foo"))
|
||||
ctx.Export("exp_cfg", pulumi.String(c.Get("bar")))
|
||||
ctx.Export("exp_secret", c.GetSecret("buzz"))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("failed to initialize stack, err: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// -- pulumi stack rm --
|
||||
err = s.Workspace().RemoveStack(ctx, s.Name())
|
||||
assert.Nil(t, err, "failed to remove stack. Resources have leaked.")
|
||||
}()
|
||||
|
||||
err = s.SetAllConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
t.Errorf("failed to set config, err: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// -- pulumi up --
|
||||
var upOut bytes.Buffer
|
||||
res, err := s.Up(ctx, optup.ProgressStreams(&upOut))
|
||||
if err != nil {
|
||||
t.Errorf("up failed, err: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
assert.Equal(t, upOut.String(), res.StdOut, "expected stdout writers to contain same contents")
|
||||
|
||||
// -- pulumi refresh --
|
||||
var refOut bytes.Buffer
|
||||
ref, err := s.Refresh(ctx, optrefresh.ProgressStreams(&refOut))
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("refresh failed, err: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
assert.Equal(t, refOut.String(), ref.StdOut, "expected stdout writers to contain same contents")
|
||||
|
||||
// -- pulumi destroy --
|
||||
var desOut bytes.Buffer
|
||||
dRes, err := s.Destroy(ctx, optdestroy.ProgressStreams(&desOut))
|
||||
if err != nil {
|
||||
t.Errorf("destroy failed, err: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
assert.Equal(t, desOut.String(), dRes.StdOut, "expected stdout writers to contain same contents")
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
// github.com/sdk/v2/go/x/auto Stack.Destroy(...optdestroy.Option)
|
||||
package optdestroy
|
||||
|
||||
import "io"
|
||||
|
||||
// Parallel is the number of resource operations to run in parallel at once during the destroy
|
||||
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
|
||||
func Parallel(n int) Option {
|
||||
|
@ -45,6 +47,13 @@ func TargetDependents() Option {
|
|||
})
|
||||
}
|
||||
|
||||
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
|
||||
func ProgressStreams(writers ...io.Writer) Option {
|
||||
return optionFunc(func(opts *Options) {
|
||||
opts.ProgressStreams = writers
|
||||
})
|
||||
}
|
||||
|
||||
// Option is a parameter to be applied to a Stack.Destroy() operation
|
||||
type Option interface {
|
||||
ApplyOption(*Options)
|
||||
|
@ -63,6 +72,8 @@ type Options struct {
|
|||
Target []string
|
||||
// Allows updating of dependent targets discovered but not specified in the Target list
|
||||
TargetDependents bool
|
||||
// ProgressStreams allows specifying one or more io.Writers to redirect incremental destroy output
|
||||
ProgressStreams []io.Writer
|
||||
}
|
||||
|
||||
type optionFunc func(*Options)
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
// github.com/sdk/v2/go/x/auto Stack.Refresh(...optrefresh.Option)
|
||||
package optrefresh
|
||||
|
||||
import "io"
|
||||
|
||||
// Parallel is the number of resource operations to run in parallel at once during the refresh
|
||||
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
|
||||
func Parallel(n int) Option {
|
||||
|
@ -45,6 +47,13 @@ func Target(urns []string) Option {
|
|||
})
|
||||
}
|
||||
|
||||
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
|
||||
func ProgressStreams(writers ...io.Writer) Option {
|
||||
return optionFunc(func(opts *Options) {
|
||||
opts.ProgressStreams = writers
|
||||
})
|
||||
}
|
||||
|
||||
// Option is a parameter to be applied to a Stack.Refresh() operation
|
||||
type Option interface {
|
||||
ApplyOption(*Options)
|
||||
|
@ -63,6 +72,8 @@ type Options struct {
|
|||
ExpectNoChanges bool
|
||||
// Specify an exclusive list of resource URNs to re
|
||||
Target []string
|
||||
// ProgressStreams allows specifying one or more io.Writers to redirect incremental refresh output
|
||||
ProgressStreams []io.Writer
|
||||
}
|
||||
|
||||
type optionFunc func(*Options)
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
// github.com/sdk/v2/go/x/auto Stack.Up(...optup.Option)
|
||||
package optup
|
||||
|
||||
import "io"
|
||||
|
||||
// Parallel is the number of resource operations to run in parallel at once during the update
|
||||
// (1 for no parallelism). Defaults to unbounded. (default 2147483647)
|
||||
func Parallel(n int) Option {
|
||||
|
@ -59,6 +61,13 @@ func TargetDependents() Option {
|
|||
})
|
||||
}
|
||||
|
||||
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
|
||||
func ProgressStreams(writers ...io.Writer) Option {
|
||||
return optionFunc(func(opts *Options) {
|
||||
opts.ProgressStreams = writers
|
||||
})
|
||||
}
|
||||
|
||||
// Option is a parameter to be applied to a Stack.Up() operation
|
||||
type Option interface {
|
||||
ApplyOption(*Options)
|
||||
|
@ -81,6 +90,8 @@ type Options struct {
|
|||
Target []string
|
||||
// Allows updating of dependent targets discovered but not specified in the Target list
|
||||
TargetDependents bool
|
||||
// ProgressStreams allows specifying one or more io.Writers to redirect incremental update output
|
||||
ProgressStreams []io.Writer
|
||||
}
|
||||
|
||||
type optionFunc func(*Options)
|
||||
|
|
|
@ -88,25 +88,29 @@
|
|||
package auto
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime/debug"
|
||||
"io"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
pbempty "github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/apitype"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/constant"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optpreview"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
|
||||
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
|
||||
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
|
||||
)
|
||||
|
||||
// Stack is an isolated, independently configurable instance of a Pulumi program.
|
||||
|
@ -202,23 +206,26 @@ func (s *Stack) Preview(ctx context.Context, opts ...optpreview.Option) (Preview
|
|||
if preOpts.TargetDependents {
|
||||
sharedArgs = append(sharedArgs, "--target-dependents")
|
||||
}
|
||||
if preOpts.Parallel > 0 {
|
||||
sharedArgs = append(sharedArgs, fmt.Sprintf("--parallel=%d", preOpts.Parallel))
|
||||
}
|
||||
|
||||
var stdout, stderr string
|
||||
var code int
|
||||
if s.Workspace().Program() != nil {
|
||||
hostArgs := []string{"preview", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoInline)}
|
||||
hostArgs = append(hostArgs, sharedArgs...)
|
||||
stdout, stderr, err = s.host(ctx, hostArgs, preOpts.Parallel)
|
||||
kind, args := constant.ExecKindAutoLocal, []string{"preview", "--json"}
|
||||
if program := s.Workspace().Program(); program != nil {
|
||||
server, err := startLanguageRuntimeServer(program)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
|
||||
}
|
||||
} else {
|
||||
args := []string{"preview", "--json", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoLocal)}
|
||||
args = append(args, sharedArgs...)
|
||||
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
|
||||
return res, err
|
||||
}
|
||||
defer contract.IgnoreClose(server)
|
||||
|
||||
kind, args = constant.ExecKindAutoInline, append(args, "--client="+server.address)
|
||||
}
|
||||
|
||||
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
|
||||
args = append(args, sharedArgs...)
|
||||
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, nil /* additionalOutput */, args...)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(stdout), &res)
|
||||
|
@ -259,30 +266,26 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
|
|||
if upOpts.TargetDependents {
|
||||
sharedArgs = append(sharedArgs, "--target-dependents")
|
||||
}
|
||||
if upOpts.Parallel > 0 {
|
||||
sharedArgs = append(sharedArgs, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
|
||||
}
|
||||
|
||||
var stdout, stderr string
|
||||
var code int
|
||||
if s.Workspace().Program() != nil {
|
||||
// TODO need to figure out how to get error code...
|
||||
stdout, stderr, err = s.host(
|
||||
ctx,
|
||||
append(sharedArgs, fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoInline)),
|
||||
upOpts.Parallel,
|
||||
)
|
||||
kind, args := constant.ExecKindAutoLocal, []string{"up", "--yes", "--skip-preview"}
|
||||
if program := s.Workspace().Program(); program != nil {
|
||||
server, err := startLanguageRuntimeServer(program)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
|
||||
}
|
||||
} else {
|
||||
args := []string{"up", "--yes", "--skip-preview", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoLocal)}
|
||||
args = append(args, sharedArgs...)
|
||||
if upOpts.Parallel > 0 {
|
||||
args = append(args, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
|
||||
return res, err
|
||||
}
|
||||
defer contract.IgnoreClose(server)
|
||||
|
||||
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
|
||||
}
|
||||
kind, args = constant.ExecKindAutoInline, append(args, "--client="+server.address)
|
||||
}
|
||||
|
||||
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
|
||||
args = append(args, sharedArgs...)
|
||||
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, upOpts.ProgressStreams, args...)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
|
||||
}
|
||||
|
||||
outs, err := s.Outputs(ctx)
|
||||
|
@ -342,7 +345,7 @@ func (s *Stack) Refresh(ctx context.Context, opts ...optrefresh.Option) (Refresh
|
|||
}
|
||||
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
|
||||
|
||||
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
|
||||
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, refreshOpts.ProgressStreams, args...)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to refresh stack"), stdout, stderr, code)
|
||||
}
|
||||
|
@ -399,7 +402,7 @@ func (s *Stack) Destroy(ctx context.Context, opts ...optdestroy.Option) (Destroy
|
|||
}
|
||||
args = append(args, fmt.Sprintf("--exec-kind=%s", execKind))
|
||||
|
||||
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
|
||||
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, destroyOpts.ProgressStreams, args...)
|
||||
if err != nil {
|
||||
return res, newAutoError(errors.Wrap(err, "failed to destroy stack"), stdout, stderr, code)
|
||||
}
|
||||
|
@ -431,13 +434,17 @@ func (s *Stack) Outputs(ctx context.Context) (OutputMap, error) {
|
|||
}
|
||||
|
||||
// standard outputs
|
||||
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json")
|
||||
outStdout, outStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
|
||||
"stack", "output", "--json",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, newAutoError(errors.Wrap(err, "could not get outputs"), outStdout, outStderr, code)
|
||||
}
|
||||
|
||||
// secret outputs
|
||||
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, "stack", "output", "--json", "--show-secrets")
|
||||
secretStdout, secretStderr, code, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
|
||||
"stack", "output", "--json", "--show-secrets",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, newAutoError(errors.Wrap(err, "could not get secret outputs"), outStdout, outStderr, code)
|
||||
}
|
||||
|
@ -473,7 +480,9 @@ func (s *Stack) History(ctx context.Context) ([]UpdateSummary, error) {
|
|||
return nil, errors.Wrap(err, "failed to get stack history")
|
||||
}
|
||||
|
||||
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, "history", "--json", "--show-secrets")
|
||||
stdout, stderr, errCode, err := s.runPulumiCmdSync(ctx, nil, /* additionalOutputs */
|
||||
"history", "--json", "--show-secrets",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, newAutoError(errors.Wrap(err, "failed to get stack history"), stdout, stderr, errCode)
|
||||
}
|
||||
|
@ -628,7 +637,11 @@ type DestroyResult struct {
|
|||
// secretSentinel represents the CLI response for an output marked as "secret"
|
||||
const secretSentinel = "[secret]"
|
||||
|
||||
func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, string, int, error) {
|
||||
func (s *Stack) runPulumiCmdSync(
|
||||
ctx context.Context,
|
||||
additionalOutput []io.Writer,
|
||||
args ...string,
|
||||
) (string, string, int, error) {
|
||||
var env []string
|
||||
if s.Workspace().PulumiHome() != "" {
|
||||
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
|
||||
|
@ -645,7 +658,7 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
|
|||
return "", "", -1, errors.Wrap(err, "failed to exec command, error getting additional args")
|
||||
}
|
||||
args = append(args, additionalArgs...)
|
||||
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), env, args...)
|
||||
stdout, stderr, errCode, err := runPulumiCommandSync(ctx, s.Workspace().WorkDir(), additionalOutput, env, args...)
|
||||
if err != nil {
|
||||
return stdout, stderr, errCode, err
|
||||
}
|
||||
|
@ -656,143 +669,149 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
|
|||
return stdout, stderr, errCode, nil
|
||||
}
|
||||
|
||||
func (s *Stack) host(ctx context.Context, additionalArgs []string, parallel int) (string, string, error) {
|
||||
proj, err := s.Workspace().ProjectSettings(ctx)
|
||||
if err != nil {
|
||||
return "", "", errors.Wrap(err, "could not start run program, failed to start host")
|
||||
}
|
||||
const (
|
||||
stateWaiting = iota
|
||||
stateRunning
|
||||
stateCanceled
|
||||
stateFinished
|
||||
)
|
||||
|
||||
var stdout bytes.Buffer
|
||||
var errBuff bytes.Buffer
|
||||
args := []string{"host"}
|
||||
args = append(args, additionalArgs...)
|
||||
workspaceArgs, err := s.Workspace().SerializeArgsForOp(ctx, s.Name())
|
||||
if err != nil {
|
||||
return "", "", errors.Wrap(err, "failed to exec command, error getting additional args")
|
||||
}
|
||||
args = append(args, workspaceArgs...)
|
||||
cmd := exec.CommandContext(ctx, "pulumi", args...)
|
||||
cmd.Dir = s.Workspace().WorkDir()
|
||||
if s.Workspace().PulumiHome() != "" {
|
||||
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
|
||||
cmd.Env = append(os.Environ(), homeEnv)
|
||||
}
|
||||
type languageRuntimeServer struct {
|
||||
m sync.Mutex
|
||||
c *sync.Cond
|
||||
|
||||
cmd.Stdout = &stdout
|
||||
stderr, _ := cmd.StderrPipe()
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return "", "", errors.Wrap(err, "failed to start host command")
|
||||
}
|
||||
scanner := bufio.NewScanner(stderr)
|
||||
scanner.Split(bufio.ScanLines)
|
||||
fn pulumi.RunFunc
|
||||
address string
|
||||
|
||||
resMonAddrChan := make(chan string)
|
||||
engineAddrChan := make(chan string)
|
||||
failChan := make(chan bool)
|
||||
go func() {
|
||||
numAddrs := 0
|
||||
for scanner.Scan() {
|
||||
m := scanner.Text()
|
||||
errBuff.WriteString(m)
|
||||
if strings.HasPrefix(m, "resmon: ") {
|
||||
numAddrs++
|
||||
// resmon: 127.0.0.1:23423
|
||||
resMonAddrChan <- strings.Split(m, " ")[1]
|
||||
}
|
||||
if strings.HasPrefix(m, "engine: ") {
|
||||
numAddrs++
|
||||
// engine: 127.0.0.1:23423
|
||||
engineAddrChan <- strings.Split(m, " ")[1]
|
||||
}
|
||||
}
|
||||
if numAddrs < 2 {
|
||||
failChan <- true
|
||||
}
|
||||
}()
|
||||
var monitorAddr string
|
||||
var engineAddr string
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return stdout.String(), errBuff.String(), ctx.Err()
|
||||
case <-failChan:
|
||||
return stdout.String(), errBuff.String(), errors.New("failed to launch host")
|
||||
case monitorAddr = <-resMonAddrChan:
|
||||
case engineAddr = <-engineAddrChan:
|
||||
}
|
||||
}
|
||||
|
||||
cfg, err := s.GetAllConfig(ctx)
|
||||
if err != nil {
|
||||
return stdout.String(), errBuff.String(), errors.Wrap(err, "failed to serialize config for inline program")
|
||||
}
|
||||
cfgMap := make(map[string]string)
|
||||
for k, v := range cfg {
|
||||
cfgMap[k] = v.Value
|
||||
}
|
||||
|
||||
runInfo := pulumi.RunInfo{
|
||||
EngineAddr: engineAddr,
|
||||
MonitorAddr: monitorAddr,
|
||||
Config: cfgMap,
|
||||
Project: proj.Name.String(),
|
||||
Stack: s.Name(),
|
||||
}
|
||||
if parallel > 0 {
|
||||
runInfo.Parallel = parallel
|
||||
}
|
||||
err = execUserCode(ctx, s.Workspace().Program(), runInfo)
|
||||
if err != nil {
|
||||
interruptErr := cmd.Process.Signal(os.Interrupt)
|
||||
if interruptErr != nil {
|
||||
return stdout.String(), errBuff.String(),
|
||||
errors.Wrap(err, "failed to run inline program and shutdown gracefully, could not kill host")
|
||||
}
|
||||
waitErr := cmd.Wait()
|
||||
if waitErr != nil {
|
||||
return stdout.String(), errBuff.String(),
|
||||
errors.Wrap(err, "failed to run inline program and shutdown gracefully")
|
||||
}
|
||||
return stdout.String(), errBuff.String(), errors.Wrap(err, "error running inline pulumi program")
|
||||
}
|
||||
|
||||
err = cmd.Process.Signal(os.Interrupt)
|
||||
if err != nil {
|
||||
return stdout.String(), errBuff.String(), errors.Wrap(err, "failed to shutdown host gracefully")
|
||||
}
|
||||
err = cmd.Wait()
|
||||
|
||||
if err != nil {
|
||||
return stdout.String(), errBuff.String(), err
|
||||
}
|
||||
|
||||
err = s.Workspace().PostCommandCallback(ctx, s.Name())
|
||||
if err != nil {
|
||||
return stdout.String(), errBuff.String(),
|
||||
errors.Wrap(err, "command ran successfully, but error running PostCommandCallback")
|
||||
}
|
||||
|
||||
return stdout.String(), errBuff.String(), nil
|
||||
state int
|
||||
cancel chan bool
|
||||
done chan error
|
||||
}
|
||||
|
||||
func execUserCode(ctx context.Context, fn pulumi.RunFunc, info pulumi.RunInfo) (err error) {
|
||||
// isNestedInvocation returns true if pulumi.RunWithContext is on the stack.
|
||||
func isNestedInvocation() bool {
|
||||
depth, callers := 0, make([]uintptr, 32)
|
||||
for {
|
||||
n := runtime.Callers(depth, callers)
|
||||
if n == 0 {
|
||||
return false
|
||||
}
|
||||
depth += n
|
||||
|
||||
frames := runtime.CallersFrames(callers)
|
||||
for f, more := frames.Next(); more; f, more = frames.Next() {
|
||||
if f.Function == "github.com/pulumi/pulumi/sdk/v2/go/pulumi.RunWithContext" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startLanguageRuntimeServer(fn pulumi.RunFunc) (*languageRuntimeServer, error) {
|
||||
if isNestedInvocation() {
|
||||
return nil, errors.New("nested stack operations are not supported https://github.com/pulumi/pulumi/issues/5058")
|
||||
}
|
||||
|
||||
s := &languageRuntimeServer{
|
||||
fn: fn,
|
||||
cancel: make(chan bool),
|
||||
}
|
||||
s.c = sync.NewCond(&s.m)
|
||||
|
||||
port, done, err := rpcutil.Serve(0, s.cancel, []func(*grpc.Server) error{
|
||||
func(srv *grpc.Server) error {
|
||||
pulumirpc.RegisterLanguageRuntimeServer(srv, s)
|
||||
return nil
|
||||
},
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.address, s.done = fmt.Sprintf("127.0.0.1:%d", port), done
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *languageRuntimeServer) Close() error {
|
||||
s.m.Lock()
|
||||
switch s.state {
|
||||
case stateCanceled:
|
||||
s.m.Unlock()
|
||||
return nil
|
||||
case stateWaiting:
|
||||
// Not started yet; go ahead and cancel
|
||||
default:
|
||||
for s.state != stateFinished {
|
||||
s.c.Wait()
|
||||
}
|
||||
}
|
||||
s.state = stateCanceled
|
||||
s.m.Unlock()
|
||||
|
||||
s.cancel <- true
|
||||
close(s.cancel)
|
||||
return <-s.done
|
||||
}
|
||||
|
||||
func (s *languageRuntimeServer) GetRequiredPlugins(ctx context.Context,
|
||||
req *pulumirpc.GetRequiredPluginsRequest) (*pulumirpc.GetRequiredPluginsResponse, error) {
|
||||
return &pulumirpc.GetRequiredPluginsResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *languageRuntimeServer) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
|
||||
s.m.Lock()
|
||||
if s.state == stateCanceled {
|
||||
s.m.Unlock()
|
||||
return nil, errors.Errorf("program canceled")
|
||||
}
|
||||
s.state = stateRunning
|
||||
s.m.Unlock()
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if pErr, ok := r.(error); ok {
|
||||
err = errors.Wrap(pErr, "go inline source runtime error, an unhandled error occurred:")
|
||||
} else {
|
||||
err = errors.New("go inline source runtime error, an unhandled error occurred: unknown error")
|
||||
}
|
||||
}
|
||||
s.m.Lock()
|
||||
s.state = stateFinished
|
||||
s.m.Unlock()
|
||||
s.c.Broadcast()
|
||||
}()
|
||||
stack := string(debug.Stack())
|
||||
if strings.Contains(stack, "github.com/pulumi/pulumi/sdk/go/pulumi/run.go") {
|
||||
return errors.New("nested stack operations are not supported https://github.com/pulumi/pulumi/issues/5058")
|
||||
|
||||
var engineAddress string
|
||||
if len(req.Args) > 0 {
|
||||
engineAddress = req.Args[0]
|
||||
}
|
||||
pulumiCtx, err := pulumi.NewContext(ctx, info)
|
||||
runInfo := pulumi.RunInfo{
|
||||
EngineAddr: engineAddress,
|
||||
MonitorAddr: req.GetMonitorAddress(),
|
||||
Config: req.GetConfig(),
|
||||
Project: req.GetProject(),
|
||||
Stack: req.GetStack(),
|
||||
Parallel: int(req.GetParallel()),
|
||||
}
|
||||
|
||||
pulumiCtx, err := pulumi.NewContext(ctx, runInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
return pulumi.RunWithContext(pulumiCtx, fn)
|
||||
|
||||
err = func() (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if pErr, ok := r.(error); ok {
|
||||
err = errors.Wrap(pErr, "go inline source runtime error, an unhandled error occurred:")
|
||||
} else {
|
||||
err = errors.New("go inline source runtime error, an unhandled error occurred: unknown error")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return pulumi.RunWithContext(pulumiCtx, s.fn)
|
||||
}()
|
||||
if err != nil {
|
||||
return &pulumirpc.RunResponse{Error: err.Error()}, nil
|
||||
}
|
||||
return &pulumirpc.RunResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *languageRuntimeServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
|
||||
return &pulumirpc.PluginInfo{
|
||||
Version: "1.0.0",
|
||||
}, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue