Revise host mode. (#5317)

* Revise host mode.

The current implementation of host mode uses a `pulumi host` command and
an ad-hoc communication protocol between the engine and client to
connect a language host after the host has begun listening. The most
significant disadvantages of this approach are the communication
protocol (which currently requires the use of stdout), the host-specific
command, and the difficulty of accommodating the typical program-bound
lifetime for an update.

These changes reimplement host mode by adding engine support for
connecting to an existing language runtime service rather than launching
a plugin. This capability is provided via an engine-specific language
runtime, `client`, which accepts the address of the existing languge
runtime service as a runtime option. The CLI exposes this runtime via
the `--client` flag to the `up` and `preview` commands, which similarly
accepts the address of an existing language runtime service as an
argument. These changes also adjust the automation API to consume the
new host mode implementation.
This commit is contained in:
Pat Gavlin 2020-09-14 17:40:17 -07:00 committed by GitHub
parent 46c7c327dd
commit 855f1fd1cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 569 additions and 552 deletions

View file

@ -14,6 +14,11 @@ CHANGELOG
when interacting with a private repo
[#5333](https://github.com/pulumi/pulumi/pull/5333)
- Revise the design for connecting an existing language runtime to a CLI invocation.
Note that this is a protocol breaking change for the Automation API, so both the
API and the CLI must be updated together.
[#5317](https://github.com/pulumi/pulumi/pull/5317)
## 2.10.0 (2020-09-10)
- feat(autoapi): add Upsert methods for stacks

View file

@ -1,248 +0,0 @@
// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/pulumi/pulumi/pkg/v2/backend"
"github.com/pulumi/pulumi/pkg/v2/backend/display"
"github.com/pulumi/pulumi/pkg/v2/engine"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/result"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
)
// intentionally disabling here for cleaner err declaration/assignment.
// nolint: vetshadow
func newHostCmd() *cobra.Command {
var debug bool
var expectNop bool
var message string
var execKind string
var stack string
var configArray []string
var path bool
// Flags for engine.UpdateOptions.
var policyPackPaths []string
var policyPackConfigPaths []string
var diffDisplay bool
var eventLogPath string
var parallel int
var refresh bool
var showConfig bool
var showReplacementSteps bool
var showSames bool
var showReads bool
var suppressOutputs bool
var yes bool
var secretsProvider string
var targets []string
var replaces []string
var targetReplaces []string
var targetDependents bool
var cmd = &cobra.Command{
Use: "host",
Short: "[PREVIEW] Launch the engine in 'host' mode without launching a language runtime",
Long: "[PREVIEW] Launch the engine in 'host' mode without launching a language runtime.\n" +
"\n" +
"To support some automation scenarios, the engine may be launched in this mode to prevent it from\n" +
"needing to spawn its own language runtime. In this case, the lifetime of interactions between the\n" +
"language and the engine host must be managed manually.",
// host is an internal command not intended for public use
Hidden: true,
Run: cmdutil.RunResultFunc(func(cmd *cobra.Command, args []string) result.Result {
isPreview := len(args) > 0 && args[0] == "preview"
// Construct the options based on flags.
opts, err := updateFlagsToOptions(false, true, true)
if err != nil {
return result.FromError(err)
}
opts.Display = display.Options{
Color: cmdutil.GetGlobalColorization(),
ShowConfig: showConfig,
ShowReplacementSteps: showReplacementSteps,
ShowSameResources: showSames,
ShowReads: showReads,
SuppressOutputs: suppressOutputs,
IsInteractive: false,
Type: display.DisplayDiff,
EventLogPath: eventLogPath,
Debug: debug,
}
opts.Engine = engine.UpdateOptions{
Parallel: parallel,
Debug: debug,
Refresh: refresh,
UseLegacyDiff: useLegacyDiff(),
IsHostCommand: true,
}
// we set up the stack in auto.NewStack before execing host
s, err := requireStack(stack, false /*offerNew*/, opts.Display, false /*setCurrent*/)
if err != nil {
return result.FromError(err)
}
proj, root, err := readProject()
if err != nil {
return result.FromError(err)
}
proj.Runtime = workspace.NewProjectRuntimeInfo("host", nil)
sm, err := getStackSecretsManager(s)
if err != nil {
return result.FromError(errors.Wrap(err, "getting secrets manager"))
}
cfg, err := getStackConfiguration(s, sm)
if err != nil {
return result.FromError(errors.Wrap(err, "getting stack configuration"))
}
m, err := getUpdateMetadata(message, root, execKind)
if err != nil {
return result.FromError(errors.Wrap(err, "gathering environment metadata"))
}
operation := s.Update
if isPreview {
operation = s.Preview
opts.Display.JSONDisplay = true
}
// Now perform the update. This will stay alive until the user cancels the host.
changes, res := operation(commandContext(), backend.UpdateOperation{
Proj: proj,
Root: root,
Opts: opts,
M: m,
StackConfiguration: cfg,
SecretsManager: sm,
Scopes: cancellationScopesWithoutInterrupt,
})
switch {
case res != nil && res.Error() == context.Canceled:
return result.FromError(errors.New("update cancelled"))
case res != nil:
return PrintEngineResult(res)
case expectNop && changes != nil && changes.HasChanges():
return result.FromError(errors.New("error: no changes were expected but changes occurred"))
default:
return nil
}
}),
}
cmd.PersistentFlags().BoolVarP(
&debug, "debug", "d", false,
"Print detailed debugging output during resource operations")
cmd.PersistentFlags().BoolVar(
&expectNop, "expect-no-changes", false,
"Return an error if any changes occur during this update")
cmd.PersistentFlags().StringVarP(
&stack, "stack", "s", "",
"The name of the stack to operate on. Defaults to the current stack")
cmd.PersistentFlags().StringVar(
&stackConfigFile, "config-file", "",
"Use the configuration values in the specified file rather than detecting the file name")
cmd.PersistentFlags().StringArrayVarP(
&configArray, "config", "c", []string{},
"Config to use during the update")
cmd.PersistentFlags().BoolVar(
&path, "config-path", false,
"Config keys contain a path to a property in a map or list to set")
cmd.PersistentFlags().StringVar(
&secretsProvider, "secrets-provider", "default", "The type of the provider that should be used to encrypt and "+
"decrypt secrets (possible choices: default, passphrase, awskms, azurekeyvault, gcpkms, hashivault). Only"+
"used when creating a new stack from an existing template")
cmd.PersistentFlags().StringVarP(
&message, "message", "m", "",
"Optional message to associate with the update operation")
cmd.PersistentFlags().StringArrayVarP(
&targets, "target", "t", []string{},
"Specify a single resource URN to update. Other resources will not be updated."+
" Multiple resources can be specified using --target urn1 --target urn2")
cmd.PersistentFlags().StringArrayVar(
&replaces, "replace", []string{},
"Specify resources to replace. Multiple resources can be specified using --replace run1 --replace urn2")
cmd.PersistentFlags().StringArrayVar(
&targetReplaces, "target-replace", []string{},
"Specify a single resource URN to replace. Other resources will not be updated."+
" Shorthand for --target urn --replace urn.")
cmd.PersistentFlags().BoolVar(
&targetDependents, "target-dependents", false,
"Allows updating of dependent targets discovered but not specified in --target list")
// Flags for engine.UpdateOptions.
cmd.PersistentFlags().StringSliceVar(
&policyPackPaths, "policy-pack", []string{},
"[PREVIEW] Run one or more policy packs as part of this update")
cmd.PersistentFlags().StringSliceVar(
&policyPackConfigPaths, "policy-pack-config", []string{},
`[PREVIEW] Path to JSON file containing the config for the policy pack of the corresponding "--policy-pack" flag`)
cmd.PersistentFlags().BoolVar(
&diffDisplay, "diff", false,
"Display operation as a rich diff showing the overall change")
cmd.PersistentFlags().IntVarP(
&parallel, "parallel", "p", defaultParallel,
"Allow P resource operations to run in parallel at once (1 for no parallelism). Defaults to unbounded.")
cmd.PersistentFlags().BoolVarP(
&refresh, "refresh", "r", false,
"Refresh the state of the stack's resources before this update")
cmd.PersistentFlags().BoolVar(
&showConfig, "show-config", false,
"Show configuration keys and variables")
cmd.PersistentFlags().BoolVar(
&showReplacementSteps, "show-replacement-steps", false,
"Show detailed resource replacement creates and deletes instead of a single step")
cmd.PersistentFlags().BoolVar(
&showSames, "show-sames", false,
"Show resources that don't need be updated because they haven't changed, alongside those that do")
cmd.PersistentFlags().BoolVar(
&showReads, "show-reads", false,
"Show resources that are being read in, alongside those being managed directly in the stack")
cmd.PersistentFlags().BoolVar(
&suppressOutputs, "suppress-outputs", false,
"Suppress display of stack outputs (in case they contain sensitive values)")
cmd.PersistentFlags().BoolVarP(
&yes, "yes", "y", false,
"Automatically approve and perform the update after previewing it")
if hasDebugCommands() {
cmd.PersistentFlags().StringVar(
&eventLogPath, "event-log", "",
"Log events to a file at this path")
}
// internal flag
cmd.PersistentFlags().StringVar(&execKind, "exec-kind", "", "")
// ignore err, only happens if flag does not exist
_ = cmd.PersistentFlags().MarkHidden("exec-kind")
return cmd
}

View file

@ -67,7 +67,7 @@ func getProjectPlugins() ([]workspace.PluginInfo, error) {
// Get the required plugins and then ensure they have metadata populated about them. Because it's possible
// a plugin required by the project hasn't yet been installed, we will simply skip any errors we encounter.
var results []workspace.PluginInfo
plugins, err := ctx.Host.GetRequiredPlugins(plugin.ProgInfo{
plugins, err := plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,

View file

@ -34,6 +34,7 @@ func newPreviewCmd() *cobra.Command {
var stack string
var configArray []string
var configPath bool
var client string
// Flags for engine.UpdateOptions.
var jsonDisplay bool
@ -106,7 +107,7 @@ func newPreviewCmd() *cobra.Command {
return result.FromError(err)
}
proj, root, err := readProject()
proj, root, err := readProjectForUpdate(client)
if err != nil {
return result.FromError(err)
}
@ -195,6 +196,10 @@ func newPreviewCmd() *cobra.Command {
&configPath, "config-path", false,
"Config keys contain a path to a property in a map or list to set")
cmd.PersistentFlags().StringVar(
&client, "client", "", "The address of an existing language runtime host to connect to")
_ = cmd.PersistentFlags().MarkHidden("client")
cmd.PersistentFlags().StringVarP(
&message, "message", "m", "",
"Optional message to associate with the preview operation")

View file

@ -211,7 +211,6 @@ func NewPulumiCmd() *cobra.Command {
// Less common, and thus hidden, commands:
cmd.AddCommand(newGenCompletionCmd(cmd))
cmd.AddCommand(newGenMarkdownCmd(cmd))
cmd.AddCommand(newHostCmd())
// We have a set of commands that are still experimental and that we add only when PULUMI_EXPERIMENTAL is set
// to true.

View file

@ -51,6 +51,7 @@ func newUpCmd() *cobra.Command {
var stack string
var configArray []string
var path bool
var client string
// Flags for engine.UpdateOptions.
var policyPackPaths []string
@ -85,7 +86,7 @@ func newUpCmd() *cobra.Command {
return result.FromError(err)
}
proj, root, err := readProject()
proj, root, err := readProjectForUpdate(client)
if err != nil {
return result.FromError(err)
}
@ -399,6 +400,10 @@ func newUpCmd() *cobra.Command {
"decrypt secrets (possible choices: default, passphrase, awskms, azurekeyvault, gcpkms, hashivault). Only"+
"used when creating a new stack from an existing template")
cmd.PersistentFlags().StringVar(
&client, "client", "", "The address of an existing language runtime host to connect to")
_ = cmd.PersistentFlags().MarkHidden("client")
cmd.PersistentFlags().StringVarP(
&message, "message", "m", "",
"Optional message to associate with the update operation")

View file

@ -399,6 +399,24 @@ func parseAndSaveConfigArray(s backend.Stack, configArray []string, path bool) e
return nil
}
// readProjectForUpdate attempts to detect and read a Pulumi project for the current workspace. If
// the project is successfully detected and read, it is returned along with the path to its
// containing directory, which will be used as the root of the project's Pulumi program. If a
// client address is present, the returned project will always have the runtime set to "client"
// with the address option set to the client address.
func readProjectForUpdate(clientAddress string) (*workspace.Project, string, error) {
proj, root, err := readProject()
if err != nil {
return nil, "", err
}
if clientAddress != "" {
proj.Runtime = workspace.NewProjectRuntimeInfo("client", map[string]interface{}{
"address": clientAddress,
})
}
return proj, root, nil
}
// readProject attempts to detect and read a Pulumi project for the current workspace. If the
// project is successfully detected and read, it is returned along with the path to its containing
// directory, which will be used as the root of the project's Pulumi program.
@ -683,14 +701,11 @@ func (s *cancellationScope) Close() {
<-s.done
}
type cancellationScopeSource struct {
disableInterrupt bool
}
type cancellationScopeSource int
var cancellationScopes = backend.CancellationScopeSource(cancellationScopeSource{})
var cancellationScopesWithoutInterrupt = backend.CancellationScopeSource(cancellationScopeSource{true})
var cancellationScopes = backend.CancellationScopeSource(cancellationScopeSource(0))
func (cs cancellationScopeSource) NewScope(events chan<- engine.Event, isPreview bool) backend.CancellationScope {
func (cancellationScopeSource) NewScope(events chan<- engine.Event, isPreview bool) backend.CancellationScope {
cancelContext, cancelSource := cancel.NewContext(context.Background())
c := &cancellationScope{
@ -701,30 +716,28 @@ func (cs cancellationScopeSource) NewScope(events chan<- engine.Event, isPreview
go func() {
for range c.sigint {
if !cs.disableInterrupt {
// If we haven't yet received a SIGINT, call the cancellation func. Otherwise call the termination
// func.
if cancelContext.CancelErr() == nil {
message := "^C received; cancelling. If you would like to terminate immediately, press ^C again.\n"
if !isPreview {
message += colors.BrightRed + "Note that terminating immediately may lead to orphaned resources " +
"and other inconsistencies.\n" + colors.Reset
}
events <- engine.NewEvent(engine.StdoutColorEvent, engine.StdoutEventPayload{
Message: message,
Color: colors.Always,
})
cancelSource.Cancel()
} else {
message := colors.BrightRed + "^C received; terminating" + colors.Reset
events <- engine.NewEvent(engine.StdoutColorEvent, engine.StdoutEventPayload{
Message: message,
Color: colors.Always,
})
cancelSource.Terminate()
// If we haven't yet received a SIGINT, call the cancellation func. Otherwise call the termination
// func.
if cancelContext.CancelErr() == nil {
message := "^C received; cancelling. If you would like to terminate immediately, press ^C again.\n"
if !isPreview {
message += colors.BrightRed + "Note that terminating immediately may lead to orphaned resources " +
"and other inconsistencies.\n" + colors.Reset
}
engine.NewEvent(engine.StdoutColorEvent, engine.StdoutEventPayload{
Message: message,
Color: colors.Always,
})
cancelSource.Cancel()
} else {
message := colors.BrightRed + "^C received; terminating" + colors.Reset
engine.NewEvent(engine.StdoutColorEvent, engine.StdoutEventPayload{
Message: message,
Color: colors.Always,
})
cancelSource.Terminate()
}
}
close(c.done)

View file

@ -25,9 +25,12 @@ import (
"testing"
"github.com/blang/semver"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/mitchellh/copystructure"
combinations "github.com/mxschmitt/golang-combinations"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
@ -44,11 +47,11 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/result"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
combinations "github.com/mxschmitt/golang-combinations"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
type JournalEntryKind int
@ -347,14 +350,15 @@ type TestStep struct {
}
type TestPlan struct {
Project string
Stack string
Runtime string
Config config.Map
Decrypter config.Decrypter
BackendClient deploy.BackendClient
Options UpdateOptions
Steps []TestStep
Project string
Stack string
Runtime string
RuntimeOptions map[string]interface{}
Config config.Map
Decrypter config.Decrypter
BackendClient deploy.BackendClient
Options UpdateOptions
Steps []TestStep
}
//nolint: goconst
@ -392,7 +396,7 @@ func (p *TestPlan) GetProject() workspace.Project {
return workspace.Project{
Name: projectName,
Runtime: workspace.NewProjectRuntimeInfo(runtime, nil),
Runtime: workspace.NewProjectRuntimeInfo(runtime, p.RuntimeOptions),
}
}
@ -5990,3 +5994,114 @@ func TestSingleComponentDefaultProviderLifecycle(t *testing.T) {
}
p.Run(t, nil)
}
type updateContext struct {
*deploytest.ResourceMonitor
resmon chan *deploytest.ResourceMonitor
programErr chan error
snap chan *deploy.Snapshot
updateResult chan result.Result
}
func startUpdate(host plugin.Host) (*updateContext, error) {
ctx := &updateContext{
resmon: make(chan *deploytest.ResourceMonitor),
programErr: make(chan error),
snap: make(chan *deploy.Snapshot),
updateResult: make(chan result.Result),
}
stop := make(chan bool)
port, _, err := rpcutil.Serve(0, stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterLanguageRuntimeServer(srv, ctx)
return nil
},
}, nil)
if err != nil {
return nil, err
}
p := &TestPlan{
Options: UpdateOptions{host: host},
Runtime: "client",
RuntimeOptions: map[string]interface{}{
"address": fmt.Sprintf("127.0.0.1:%d", port),
},
}
go func() {
snap, res := TestOp(Update).Run(p.GetProject(), p.GetTarget(nil), p.Options, false, p.BackendClient, nil)
ctx.snap <- snap
close(ctx.snap)
ctx.updateResult <- res
close(ctx.updateResult)
stop <- true
}()
ctx.ResourceMonitor = <-ctx.resmon
return ctx, nil
}
func (ctx *updateContext) Finish(err error) (*deploy.Snapshot, result.Result) {
ctx.programErr <- err
close(ctx.programErr)
return <-ctx.snap, <-ctx.updateResult
}
func (ctx *updateContext) GetRequiredPlugins(_ context.Context,
req *pulumirpc.GetRequiredPluginsRequest) (*pulumirpc.GetRequiredPluginsResponse, error) {
return &pulumirpc.GetRequiredPluginsResponse{}, nil
}
func (ctx *updateContext) Run(_ context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
// Connect to the resource monitor and create an appropriate client.
conn, err := grpc.Dial(
req.MonitorAddress,
grpc.WithInsecure(),
rpcutil.GrpcChannelOptions(),
)
if err != nil {
return nil, errors.Wrapf(err, "could not connect to resource monitor")
}
defer contract.IgnoreClose(conn)
// Fire up a resource monitor client
ctx.resmon <- deploytest.NewResourceMonitor(pulumirpc.NewResourceMonitorClient(conn))
close(ctx.resmon)
// Wait for the program to terminate.
if err := <-ctx.programErr; err != nil {
return &pulumirpc.RunResponse{Error: err.Error()}, nil
}
return &pulumirpc.RunResponse{}, nil
}
func (ctx *updateContext) GetPluginInfo(_ context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
return &pulumirpc.PluginInfo{
Version: "1.0.0",
}, nil
}
func TestLanguageClient(t *testing.T) {
loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}
update, err := startUpdate(deploytest.NewPluginHost(nil, nil, nil, loaders...))
if err != nil {
t.Fatalf("failed to start update: %v", err)
}
// Register resources, etc.
_, _, _, err = update.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
snap, res := update.Finish(nil)
assert.Nil(t, res)
assert.Len(t, snap.Resources, 2)
}

View file

@ -16,11 +16,10 @@ package engine
import (
"context"
"fmt"
"os"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy/providers"
"github.com/pulumi/pulumi/sdk/v2/go/common/diag"
@ -32,6 +31,8 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
)
const clientRuntimeName = "client"
// ProjectInfoContext returns information about the current project, including its pwd, main, and plugin context.
func ProjectInfoContext(projinfo *Projinfo, host plugin.Host, config plugin.ConfigSource,
diag, statusDiag diag.Sink, tracingSpan opentracing.Span) (string, string, *plugin.Context, error) {
@ -50,6 +51,23 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host, config plugin.Conf
return "", "", nil, err
}
// If the project wants to connect to an existing language runtime, do so now.
if projinfo.Proj.Runtime.Name() == clientRuntimeName {
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
if !ok {
return "", "", nil, errors.New("missing address of language runtime service")
}
address, ok := addressValue.(string)
if !ok {
return "", "", nil, errors.New("address of language runtime service must be a string")
}
host, err := connectToLanguageRuntime(ctx, address)
if err != nil {
return "", "", nil, err
}
ctx.Host = host
}
return pwd, main, ctx, nil
}
@ -127,10 +145,6 @@ func plan(ctx *Context, info *planContext, opts planOptions, dryRun bool) (*plan
return nil, err
}
if opts.UpdateOptions.IsHostCommand {
fmt.Fprintf(os.Stderr, "engine: %s\n", plugctx.Host.ServerAddr())
}
opts.trustDependencies = proj.TrustResourceDependencies()
// Now create the state source. This may issue an error if it can't create the source. This entails,
// for example, loading any plugins which will be required to execute a program, among other things.

49
pkg/engine/plugin_host.go Normal file
View file

@ -0,0 +1,49 @@
// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
type clientLanguageRuntimeHost struct {
plugin.Host
languageRuntime plugin.LanguageRuntime
}
func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
// Dial the language runtime.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()), rpcutil.GrpcChannelOptions())
if err != nil {
return nil, errors.Wrap(err, "could not connect to language host")
}
client := pulumirpc.NewLanguageRuntimeClient(conn)
return &clientLanguageRuntimeHost{
Host: ctx.Host,
languageRuntime: plugin.NewLanguageRuntimeClient(ctx, clientRuntimeName, client),
}, nil
}
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

View file

@ -75,11 +75,16 @@ func newPluginSet() pluginSet {
func gatherPluginsFromProgram(plugctx *plugin.Context, prog plugin.ProgInfo) (pluginSet, error) {
logging.V(preparePluginLog).Infof("gatherPluginsFromProgram(): gathering plugins from language host")
set := newPluginSet()
langhostPlugins, err := plugctx.Host.GetRequiredPlugins(prog, plugin.AllPlugins)
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, prog, plugin.AllPlugins)
if err != nil {
return set, err
}
for _, plug := range langhostPlugins {
// Ignore language plugins named "client".
if plug.Name == clientRuntimeName && plug.Kind == workspace.LanguagePlugin {
continue
}
logging.V(preparePluginLog).Infof(
"gatherPluginsFromProgram(): plugin %s %s (%s) is required by language host",
plug.Name, plug.Version, plug.ServerURL)

View file

@ -132,9 +132,6 @@ type UpdateOptions struct {
// true if the engine should use legacy diffing behavior during an update.
UseLegacyDiff bool
// true if we're executing in the context of `pulumi host` command.
IsHostCommand bool
// true if we should report events for steps that involve default providers.
reportDefaultProviderSteps bool
@ -389,12 +386,19 @@ func newUpdateSource(
return nil, err
}
// If we are connecting to an existing client, stash the address of the engine in its arguments.
var args []string
if proj.Runtime.Name() == clientRuntimeName {
args = []string{plugctx.Host.ServerAddr()}
}
// If that succeeded, create a new source that will perform interpretation of the compiled program.
// TODO[pulumi/pulumi#88]: we are passing `nil` as the arguments map; we need to allow a way to pass these.
return deploy.NewEvalSource(plugctx, &deploy.EvalRunInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Args: args,
Target: target,
}, defaultProviderVersions, dryRun), nil
}

View file

@ -15,16 +15,22 @@
package deploytest
import (
"context"
"fmt"
"sync"
"github.com/blang/semver"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/pulumi/pulumi/sdk/v2/go/common/diag"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v2/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
type LoadProviderFunc func() (plugin.Provider, error)
@ -55,12 +61,53 @@ func NewProviderLoaderWithHost(pkg tokens.Package, version semver.Version,
}
}
type hostEngine struct {
sink diag.Sink
statusSink diag.Sink
address string
stop chan bool
}
func (e *hostEngine) Log(_ context.Context, req *pulumirpc.LogRequest) (*pbempty.Empty, error) {
var sev diag.Severity
switch req.Severity {
case pulumirpc.LogSeverity_DEBUG:
sev = diag.Debug
case pulumirpc.LogSeverity_INFO:
sev = diag.Info
case pulumirpc.LogSeverity_WARNING:
sev = diag.Warning
case pulumirpc.LogSeverity_ERROR:
sev = diag.Error
default:
return nil, errors.Errorf("Unrecognized logging severity: %v", req.Severity)
}
if req.Ephemeral {
e.statusSink.Logf(sev, diag.StreamMessage(resource.URN(req.Urn), req.Message, req.StreamId))
} else {
e.sink.Logf(sev, diag.StreamMessage(resource.URN(req.Urn), req.Message, req.StreamId))
}
return &pbempty.Empty{}, nil
}
func (e *hostEngine) GetRootResource(_ context.Context,
req *pulumirpc.GetRootResourceRequest) (*pulumirpc.GetRootResourceResponse, error) {
return nil, errors.New("unsupported")
}
func (e *hostEngine) SetRootResource(_ context.Context,
req *pulumirpc.SetRootResourceRequest) (*pulumirpc.SetRootResourceResponse, error) {
return nil, errors.New("unsupported")
}
type pluginHost struct {
providerLoaders []*ProviderLoader
languageRuntime plugin.LanguageRuntime
sink diag.Sink
statusSink diag.Sink
engine *hostEngine
providers map[plugin.Provider]struct{}
closed bool
m sync.Mutex
@ -69,11 +116,28 @@ type pluginHost struct {
func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRuntime,
providerLoaders ...*ProviderLoader) plugin.Host {
engine := &hostEngine{
sink: sink,
statusSink: statusSink,
stop: make(chan bool),
}
port, _, err := rpcutil.Serve(0, engine.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterEngineServer(srv, engine)
return nil
},
}, nil)
if err != nil {
panic(fmt.Errorf("could not start engine service: %v", err))
}
engine.address = fmt.Sprintf("127.0.0.1:%v", port)
return &pluginHost{
providerLoaders: providerLoaders,
languageRuntime: languageRuntime,
sink: sink,
statusSink: statusSink,
engine: engine,
providers: make(map[plugin.Provider]struct{}),
}
}
@ -143,11 +207,12 @@ func (host *pluginHost) Close() error {
host.m.Lock()
defer host.m.Unlock()
go func() { host.engine.stop <- true }()
host.closed = true
return nil
}
func (host *pluginHost) ServerAddr() string {
panic("Host RPC address not available")
return host.engine.address
}
func (host *pluginHost) Log(sev diag.Severity, urn resource.URN, msg string, streamID int32) {
if !host.isClosed() {

View file

@ -54,6 +54,10 @@ func (rm *ResourceMonitor) Close() error {
return rm.conn.Close()
}
func NewResourceMonitor(resmon pulumirpc.ResourceMonitorClient) *ResourceMonitor {
return &ResourceMonitor{resmon: resmon}
}
type ResourceOptions struct {
Parent resource.URN
Protect bool

View file

@ -73,8 +73,6 @@ type Host interface {
// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
EnsurePlugins(plugins []workspace.PluginInfo, kinds Flags) error
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error)
// SignalCancellation asks all resource providers to gracefully shut down and abort any ongoing
// operations. Operation aborted in this way will return an error (e.g., `Update` and `Create`
@ -371,42 +369,6 @@ func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginInfo, kinds Fla
return result
}
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func (host *defaultHost) GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
var plugins []workspace.PluginInfo
if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
plugins = append(plugins, workspace.PluginInfo{
Name: info.Proj.Runtime.Name(),
Kind: workspace.LanguagePlugin,
})
if kinds&ResourcePlugins != 0 {
// Use the language plugin to compute this project's set of plugin dependencies.
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
// later than we do (right now, we do it up front, but at that point we don't know the version).
deps, err := lang.GetRequiredPlugins(info)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
}
plugins = append(plugins, deps...)
}
} else {
// If we can't load the language plugin, we can't discover the resource plugins.
contract.Assertf(kinds&ResourcePlugins != 0,
"cannot load resource plugins without also loading the language plugin")
}
return plugins, nil
}
func (host *defaultHost) SignalCancellation() error {
// NOTE: we're abusing loadPlugin in order to ensure proper synchronization.
_, err := host.loadPlugin(func() (interface{}, error) {
@ -478,3 +440,39 @@ const (
// AllPlugins uses flags to ensure that all plugin kinds are loaded.
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func GetRequiredPlugins(host Host, info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
var plugins []workspace.PluginInfo
if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
plugins = append(plugins, workspace.PluginInfo{
Name: info.Proj.Runtime.Name(),
Kind: workspace.LanguagePlugin,
})
if kinds&ResourcePlugins != 0 {
// Use the language plugin to compute this project's set of plugin dependencies.
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
// later than we do (right now, we do it up front, but at that point we don't know the version).
deps, err := lang.GetRequiredPlugins(info)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
}
plugins = append(plugins, deps...)
}
} else {
// If we can't load the language plugin, we can't discover the resource plugins.
contract.Assertf(kinds&ResourcePlugins != 0,
"cannot load resource plugins without also loading the language plugin")
}
return plugins, nil
}

View file

@ -16,10 +16,7 @@ package plugin
import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/blang/semver"
pbempty "github.com/golang/protobuf/ptypes/empty"
@ -30,17 +27,10 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v2/go/common/version"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
// HostOnlyRuntimeMode can be used to launch the engine in "host"-only mode. That is to say, the engine and
// resource monitors will be spawned, but no language runtime will be created. This allows automated
// scenarios that drive the engine from outside of the engine itself, such as, for example, a Node.js
// program spawning, deploying, and tearing down the engine's resources.
const HostOnlyRuntimeMode = "host"
// langhost reflects a language host plugin, loaded dynamically for a single language/runtime pair.
type langhost struct {
ctx *Context
@ -53,12 +43,7 @@ type langhost struct {
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
func NewLanguageRuntime(host Host, ctx *Context, runtime string,
options map[string]interface{}) (LanguageRuntime, error) {
// If this is host-only mode, exit without spawning anything.
if runtime == HostOnlyRuntimeMode {
return &langhost{ctx: ctx, runtime: runtime}, nil
}
// For all other languages, load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.LanguagePlugin, strings.Replace(runtime, tokens.QNameDelimiter, "_", -1), nil)
if err != nil {
@ -90,19 +75,21 @@ func NewLanguageRuntime(host Host, ctx *Context, runtime string,
}, nil
}
func (h *langhost) Runtime() string { return h.runtime }
func (h *langhost) IsHostOnlyMode() bool { return h.runtime == HostOnlyRuntimeMode }
func NewLanguageRuntimeClient(ctx *Context, runtime string, client pulumirpc.LanguageRuntimeClient) LanguageRuntime {
return &langhost{
ctx: ctx,
runtime: runtime,
client: client,
}
}
func (h *langhost) Runtime() string { return h.runtime }
// GetRequiredPlugins computes the complete set of anticipated plugins required by a program.
func (h *langhost) GetRequiredPlugins(info ProgInfo) ([]workspace.PluginInfo, error) {
proj := string(info.Proj.Name)
logging.V(7).Infof("langhost[%v].GetRequiredPlugins(proj=%s,pwd=%s,program=%s) executing",
h.runtime, proj, info.Pwd, info.Program)
if h.IsHostOnlyMode() {
logging.V(7).Infof("langhost[%v].GetRequiredPlugins(proj=%s,pwd=%s,program=%s) in host mode, exiting",
h.runtime, proj, info.Pwd, info.Program)
return nil, nil
}
resp, err := h.client.GetRequiredPlugins(h.ctx.Request(), &pulumirpc.GetRequiredPluginsRequest{
Project: proj,
Pwd: info.Pwd,
@ -156,14 +143,6 @@ func (h *langhost) GetRequiredPlugins(info ProgInfo) ([]workspace.PluginInfo, er
func (h *langhost) Run(info RunInfo) (string, bool, error) {
logging.V(7).Infof("langhost[%v].Run(pwd=%v,program=%v,#args=%v,proj=%s,stack=%v,#config=%v,dryrun=%v) executing",
h.runtime, info.Pwd, info.Program, len(info.Args), info.Project, info.Stack, len(info.Config), info.DryRun)
// In host mode, we simply print the engine and monitor address to stderr and wait for a signal.
if h.IsHostOnlyMode() {
fmt.Fprintf(os.Stderr, "resmon: %s\n", info.MonitorAddress)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
return "", false, nil
}
config := make(map[string]string)
for k, v := range info.Config {
config[k.String()] = v
@ -203,20 +182,15 @@ func (h *langhost) GetPluginInfo() (workspace.PluginInfo, error) {
Kind: workspace.LanguagePlugin,
}
var vers string
if h.IsHostOnlyMode() {
vers = version.Version
} else {
plugInfo.Path = h.plug.Bin
plugInfo.Path = h.plug.Bin
resp, err := h.client.GetPluginInfo(h.ctx.Request(), &pbempty.Empty{})
if err != nil {
rpcError := rpcerror.Convert(err)
logging.V(7).Infof("langhost[%v].GetPluginInfo() failed: err=%v", h.runtime, rpcError)
return workspace.PluginInfo{}, rpcError
}
vers = resp.Version
resp, err := h.client.GetPluginInfo(h.ctx.Request(), &pbempty.Empty{})
if err != nil {
rpcError := rpcerror.Convert(err)
logging.V(7).Infof("langhost[%v].GetPluginInfo() failed: err=%v", h.runtime, rpcError)
return workspace.PluginInfo{}, rpcError
}
vers := resp.Version
if vers != "" {
sv, err := semver.ParseTolerant(vers)

View file

@ -350,7 +350,9 @@ func TestRuntimeErrorInlineGo(t *testing.T) {
_, err = s.Up(ctx)
assert.NotNil(t, err)
assert.True(t, IsRuntimeError(err))
if !assert.True(t, IsRuntimeError(err)) {
t.Logf("%v is not a runtime error", err)
}
// -- pulumi destroy --

View file

@ -88,25 +88,28 @@
package auto
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"runtime/debug"
"runtime"
"strings"
"sync"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/pulumi/pulumi/sdk/v2/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v2/go/common/constant"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v2/go/pulumi"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optdestroy"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optpreview"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optrefresh"
"github.com/pulumi/pulumi/sdk/v2/go/x/auto/optup"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
// Stack is an isolated, independently configurable instance of a Pulumi program.
@ -222,23 +225,26 @@ func (s *Stack) Preview(ctx context.Context, opts ...optpreview.Option) (Preview
if preOpts.TargetDependents {
sharedArgs = append(sharedArgs, "--target-dependents")
}
if preOpts.Parallel > 0 {
sharedArgs = append(sharedArgs, fmt.Sprintf("--parallel=%d", preOpts.Parallel))
}
var stdout, stderr string
var code int
if s.Workspace().Program() != nil {
hostArgs := []string{"preview", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoInline)}
hostArgs = append(hostArgs, sharedArgs...)
stdout, stderr, err = s.host(ctx, hostArgs, preOpts.Parallel)
kind, args := constant.ExecKindAutoLocal, []string{"preview", "--json"}
if program := s.Workspace().Program(); program != nil {
server, err := startLanguageRuntimeServer(program)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
}
} else {
args := []string{"preview", "--json", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoLocal)}
args = append(args, sharedArgs...)
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
return res, err
}
defer contract.IgnoreClose(server)
kind, args = constant.ExecKindAutoInline, append(args, "--client="+server.address)
}
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run preview"), stdout, stderr, code)
}
err = json.Unmarshal([]byte(stdout), &res)
@ -279,30 +285,26 @@ func (s *Stack) Up(ctx context.Context, opts ...optup.Option) (UpResult, error)
if upOpts.TargetDependents {
sharedArgs = append(sharedArgs, "--target-dependents")
}
if upOpts.Parallel > 0 {
sharedArgs = append(sharedArgs, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
}
var stdout, stderr string
var code int
if s.Workspace().Program() != nil {
// TODO need to figure out how to get error code...
stdout, stderr, err = s.host(
ctx,
append(sharedArgs, fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoInline)),
upOpts.Parallel,
)
kind, args := constant.ExecKindAutoLocal, []string{"up", "--yes", "--skip-preview"}
if program := s.Workspace().Program(); program != nil {
server, err := startLanguageRuntimeServer(program)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
} else {
args := []string{"up", "--yes", "--skip-preview", fmt.Sprintf("--exec-kind=%s", constant.ExecKindAutoLocal)}
args = append(args, sharedArgs...)
if upOpts.Parallel > 0 {
args = append(args, fmt.Sprintf("--parallel=%d", upOpts.Parallel))
return res, err
}
defer contract.IgnoreClose(server)
stdout, stderr, code, err = s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
kind, args = constant.ExecKindAutoInline, append(args, "--client="+server.address)
}
args = append(args, fmt.Sprintf("--exec-kind=%s", kind))
args = append(args, sharedArgs...)
stdout, stderr, code, err := s.runPulumiCmdSync(ctx, args...)
if err != nil {
return res, newAutoError(errors.Wrap(err, "failed to run update"), stdout, stderr, code)
}
outs, err := s.Outputs(ctx)
@ -676,143 +678,149 @@ func (s *Stack) runPulumiCmdSync(ctx context.Context, args ...string) (string, s
return stdout, stderr, errCode, nil
}
func (s *Stack) host(ctx context.Context, additionalArgs []string, parallel int) (string, string, error) {
proj, err := s.Workspace().ProjectSettings(ctx)
if err != nil {
return "", "", errors.Wrap(err, "could not start run program, failed to start host")
}
const (
stateWaiting = iota
stateRunning
stateCanceled
stateFinished
)
var stdout bytes.Buffer
var errBuff bytes.Buffer
args := []string{"host"}
args = append(args, additionalArgs...)
workspaceArgs, err := s.Workspace().SerializeArgsForOp(ctx, s.Name())
if err != nil {
return "", "", errors.Wrap(err, "failed to exec command, error getting additional args")
}
args = append(args, workspaceArgs...)
cmd := exec.CommandContext(ctx, "pulumi", args...)
cmd.Dir = s.Workspace().WorkDir()
if s.Workspace().PulumiHome() != "" {
homeEnv := fmt.Sprintf("%s=%s", pulumiHomeEnv, s.Workspace().PulumiHome())
cmd.Env = append(os.Environ(), homeEnv)
}
type languageRuntimeServer struct {
m sync.Mutex
c *sync.Cond
cmd.Stdout = &stdout
stderr, _ := cmd.StderrPipe()
err = cmd.Start()
if err != nil {
return "", "", errors.Wrap(err, "failed to start host command")
}
scanner := bufio.NewScanner(stderr)
scanner.Split(bufio.ScanLines)
fn pulumi.RunFunc
address string
resMonAddrChan := make(chan string)
engineAddrChan := make(chan string)
failChan := make(chan bool)
go func() {
numAddrs := 0
for scanner.Scan() {
m := scanner.Text()
errBuff.WriteString(m)
if strings.HasPrefix(m, "resmon: ") {
numAddrs++
// resmon: 127.0.0.1:23423
resMonAddrChan <- strings.Split(m, " ")[1]
}
if strings.HasPrefix(m, "engine: ") {
numAddrs++
// engine: 127.0.0.1:23423
engineAddrChan <- strings.Split(m, " ")[1]
}
}
if numAddrs < 2 {
failChan <- true
}
}()
var monitorAddr string
var engineAddr string
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
return stdout.String(), errBuff.String(), ctx.Err()
case <-failChan:
return stdout.String(), errBuff.String(), errors.New("failed to launch host")
case monitorAddr = <-resMonAddrChan:
case engineAddr = <-engineAddrChan:
}
}
cfg, err := s.GetAllConfig(ctx)
if err != nil {
return stdout.String(), errBuff.String(), errors.Wrap(err, "failed to serialize config for inline program")
}
cfgMap := make(map[string]string)
for k, v := range cfg {
cfgMap[k] = v.Value
}
runInfo := pulumi.RunInfo{
EngineAddr: engineAddr,
MonitorAddr: monitorAddr,
Config: cfgMap,
Project: proj.Name.String(),
Stack: s.Name(),
}
if parallel > 0 {
runInfo.Parallel = parallel
}
err = execUserCode(ctx, s.Workspace().Program(), runInfo)
if err != nil {
interruptErr := cmd.Process.Signal(os.Interrupt)
if interruptErr != nil {
return stdout.String(), errBuff.String(),
errors.Wrap(err, "failed to run inline program and shutdown gracefully, could not kill host")
}
waitErr := cmd.Wait()
if waitErr != nil {
return stdout.String(), errBuff.String(),
errors.Wrap(err, "failed to run inline program and shutdown gracefully")
}
return stdout.String(), errBuff.String(), errors.Wrap(err, "error running inline pulumi program")
}
err = cmd.Process.Signal(os.Interrupt)
if err != nil {
return stdout.String(), errBuff.String(), errors.Wrap(err, "failed to shutdown host gracefully")
}
err = cmd.Wait()
if err != nil {
return stdout.String(), errBuff.String(), err
}
err = s.Workspace().PostCommandCallback(ctx, s.Name())
if err != nil {
return stdout.String(), errBuff.String(),
errors.Wrap(err, "command ran successfully, but error running PostCommandCallback")
}
return stdout.String(), errBuff.String(), nil
state int
cancel chan bool
done chan error
}
func execUserCode(ctx context.Context, fn pulumi.RunFunc, info pulumi.RunInfo) (err error) {
// isNestedInvocation returns true if pulumi.RunWithContext is on the stack.
func isNestedInvocation() bool {
depth, callers := 0, make([]uintptr, 32)
for {
n := runtime.Callers(depth, callers)
if n == 0 {
return false
}
depth += n
frames := runtime.CallersFrames(callers)
for f, more := frames.Next(); more; f, more = frames.Next() {
if f.Function == "github.com/pulumi/pulumi/sdk/v2/go/pulumi.RunWithContext" {
return true
}
}
}
}
func startLanguageRuntimeServer(fn pulumi.RunFunc) (*languageRuntimeServer, error) {
if isNestedInvocation() {
return nil, errors.New("nested stack operations are not supported https://github.com/pulumi/pulumi/issues/5058")
}
s := &languageRuntimeServer{
fn: fn,
cancel: make(chan bool),
}
s.c = sync.NewCond(&s.m)
port, done, err := rpcutil.Serve(0, s.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterLanguageRuntimeServer(srv, s)
return nil
},
}, nil)
if err != nil {
return nil, err
}
s.address, s.done = fmt.Sprintf("127.0.0.1:%d", port), done
return s, nil
}
func (s *languageRuntimeServer) Close() error {
s.m.Lock()
switch s.state {
case stateCanceled:
s.m.Unlock()
return nil
case stateWaiting:
// Not started yet; go ahead and cancel
default:
for s.state != stateFinished {
s.c.Wait()
}
}
s.state = stateCanceled
s.m.Unlock()
s.cancel <- true
close(s.cancel)
return <-s.done
}
func (s *languageRuntimeServer) GetRequiredPlugins(ctx context.Context,
req *pulumirpc.GetRequiredPluginsRequest) (*pulumirpc.GetRequiredPluginsResponse, error) {
return &pulumirpc.GetRequiredPluginsResponse{}, nil
}
func (s *languageRuntimeServer) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
s.m.Lock()
if s.state == stateCanceled {
s.m.Unlock()
return nil, errors.Errorf("program canceled")
}
s.state = stateRunning
s.m.Unlock()
defer func() {
if r := recover(); r != nil {
if pErr, ok := r.(error); ok {
err = errors.Wrap(pErr, "go inline source runtime error, an unhandled error occurred:")
} else {
err = errors.New("go inline source runtime error, an unhandled error occurred: unknown error")
}
}
s.m.Lock()
s.state = stateFinished
s.m.Unlock()
s.c.Broadcast()
}()
stack := string(debug.Stack())
if strings.Contains(stack, "github.com/pulumi/pulumi/sdk/go/pulumi/run.go") {
return errors.New("nested stack operations are not supported https://github.com/pulumi/pulumi/issues/5058")
var engineAddress string
if len(req.Args) > 0 {
engineAddress = req.Args[0]
}
pulumiCtx, err := pulumi.NewContext(ctx, info)
runInfo := pulumi.RunInfo{
EngineAddr: engineAddress,
MonitorAddr: req.GetMonitorAddress(),
Config: req.GetConfig(),
Project: req.GetProject(),
Stack: req.GetStack(),
Parallel: int(req.GetParallel()),
}
pulumiCtx, err := pulumi.NewContext(ctx, runInfo)
if err != nil {
return err
return nil, err
}
return pulumi.RunWithContext(pulumiCtx, fn)
err = func() (err error) {
defer func() {
if r := recover(); r != nil {
if pErr, ok := r.(error); ok {
err = errors.Wrap(pErr, "go inline source runtime error, an unhandled error occurred:")
} else {
err = errors.New("go inline source runtime error, an unhandled error occurred: unknown error")
}
}
}()
return pulumi.RunWithContext(pulumiCtx, s.fn)
}()
if err != nil {
return &pulumirpc.RunResponse{Error: err.Error()}, nil
}
return &pulumirpc.RunResponse{}, nil
}
func (s *languageRuntimeServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
return &pulumirpc.PluginInfo{
Version: "1.0.0",
}, nil
}