New approach to move us to using deasync as little as possible (and with as little impact to users as possible). (#3325)

This commit is contained in:
CyrusNajmabadi 2019-10-14 22:08:06 -07:00 committed by GitHub
parent 893e51d0ce
commit 91addf2feb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1146 additions and 194 deletions

View file

@ -3,6 +3,14 @@ CHANGELOG
## HEAD (Unreleased)
- Fix hangs and crashes related to use of `getResource` (i.e. `aws.ec2.getSubnetIds(...)`) methods,
including frequent hangs on Node.js 12. This fixes https://github.com/pulumi/pulumi/issues/3260)
and [hangs](https://github.com/pulumi/pulumi/issues/3309).
Some less common existing styles of using `getResource` calls are also deprecated as part of this
change, and users should see https://www.pulumi.com/docs/troubleshooting/#synchronous-call for
details on adjusting their code if needed.
## 1.3.1 (2019-10-09)
- Revert "propagate resource inputs to resource state during preview". These changes had a critical issue that needs
@ -20,7 +28,7 @@ CHANGELOG
- Support renaming stack projects via `pulumi stack rename`.
[#3292](https://github.com/pulumi/pulumi/pull/3292)
- Make the location of `.pulumi` folder configurable with an environment variable.
[#3300](https://github.com/pulumi/pulumi/pull/3300) (Fixes [#2966](https://github.com/pulumi/pulumi/issues/2966))

2
go.mod
View file

@ -5,6 +5,7 @@ go 1.12
require (
cloud.google.com/go v0.39.0
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Microsoft/go-winio v0.4.14
github.com/Sirupsen/logrus v1.0.5 // indirect
github.com/aws/aws-sdk-go v1.19.16
github.com/blang/semver v3.5.1+incompatible
@ -42,7 +43,6 @@ require (
github.com/sergi/go-diff v1.0.0
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.3.0 // indirect
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c
github.com/spf13/cast v1.2.0
github.com/spf13/cobra v0.0.3

5
go.sum
View file

@ -31,6 +31,8 @@ github.com/Azure/go-autorest/tracing v0.1.0 h1:TRBxC5Pj/fIuh4Qob0ZpkggbfT8RC0Sub
github.com/Azure/go-autorest/tracing v0.1.0/go.mod h1:ROEEAFwXycQw7Sn3DXNtEedEvdeRAgDr0izn4z5Ij88=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190418212003-6ac0b49e7197/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2 h1:HTOmFEEYrWi4MW5ZKUx6xfeyM10Sx3kQF65xiQJMPYA=
github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
@ -381,6 +383,7 @@ github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c h1:fyKiXKO1/I/B6Y2U8T7WdQGWzwehOuGIrljPtt7YTTI=
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c/go.mod h1:sUM3LWHvSMaG192sy56D9F7CNvL7jUJVXoqM1QKLnog=
github.com/spf13/afero v1.1.0 h1:bopulORc2JeYaxfHLvJa5NzxviA9PoWhpiiJkru7Ji4=
@ -488,6 +491,8 @@ golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b h1:ag/x1USPSsqHud38I9BAC88qdNLDHHtQ4mlgQIZPPNA=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View file

@ -42,6 +42,8 @@ import (
"github.com/hashicorp/go-multierror"
pbempty "github.com/golang/protobuf/ptypes/empty"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/resource/config"
"github.com/pulumi/pulumi/pkg/util/cmdutil"
@ -362,75 +364,180 @@ func getPluginVersion(info packageJSON) (string, error) {
return version, nil
}
// When talking to the nodejs runtime we have three parties involved:
//
// Engine Monitor <==> Language Host (this code) <==> NodeJS sdk runtime.
//
// Instead of having the NodeJS sdk runtime communicating directly with the Engine Monitor, we
// instead have it communicate with us and we send all those messages to the real engine monitor
// itself. We do that by having ourselves launch our own grpc monitor server and passing the
// address of it to the NodeJS runtime. As far as the NodeJS sdk runtime is concerned, it is
// communicating directly with the engine.
//
// When NodeJS then communicates back with us over our server, we then forward the messages
// along untouched to the Engine Monitor. However, we also open an additional *non-grpc*
// channel to allow the sdk runtime to send us messages on. Specifically, this non-grpc channel
// is used entirely to allow the sdk runtime to make 'invoke' calls in a synchronous fashion.
// This is accomplished by avoiding grpc entirely (which has no facility for synchronous rpc
// calls), and instead operating over a pair of files coordinated between us and the sdk
// runtime. One file is used by it to send us messages, and one file is used by us to send
// messages back. Because these are just files, nodejs natively supports allowing both sides to
// read and write from each synchronously.
//
// When we receive the sync-invoke messages from the nodejs sdk we deserialize things off of the
// file and then make a synchronous call to the real engine `invoke` monitor endpoint. Unlike
// nodejs, we have no problem calling this synchronously, and can block until we get the
// response which we can then synchronously send to nodejs.
// RPC endpoint for LanguageRuntimeServer::Run
func (host *nodeLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
args := host.constructArguments(req)
config, err := host.constructConfig(req)
tracingSpan := opentracing.SpanFromContext(ctx)
// Make a connection to the real monitor that we will forward messages to.
conn, err := grpc.Dial(req.GetMonitorAddress(), grpc.WithInsecure())
if err != nil {
err = errors.Wrap(err, "failed to serialize configuration")
return nil, err
}
env := os.Environ()
env = append(env, pulumiConfigVar+"="+config)
// Make a client around that connection. We can then make our own server that will act as a
// monitor for the sdk and forward to the real monitor.
target := pulumirpc.NewResourceMonitorClient(conn)
if host.typescript {
env = append(env, "PULUMI_NODEJS_TYPESCRIPT=true")
// Channel to control the server lifetime. Once `Run` finishes, we'll shutdown the server.
serverCancel := make(chan bool)
defer func() {
serverCancel <- true
close(serverCancel)
}()
// Launch the rpc server giving it the real monitor to forward messages to.
port, serverDone, err := rpcutil.Serve(0, serverCancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, &monitorProxy{target})
return nil
},
}, tracingSpan)
if err != nil {
return nil, err
}
if logging.V(5) {
commandStr := strings.Join(args, " ")
logging.V(5).Infoln("Language host launching process: ", host.nodeBin, commandStr)
// Create the pipes we'll use to communicate synchronously with the nodejs process. Once we're
// done using the pipes clean them up so we don't leave anything around in the user file system.
pipes, pipesDone, err := createAndServePipes(ctx, target)
if err != nil {
return nil, err
}
// Now simply spawn a process to execute the requested program, wiring up stdout/stderr directly.
var errResult string
cmd := exec.Command(host.nodeBin, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = env
if err := cmd.Run(); err != nil {
// NodeJS stdout is complicated enough that we should explicitly flush stdout and stderr here. NodeJS does
// process writes using console.out and console.err synchronously, but it does not process writes using
// `process.stdout.write` or `process.stderr.write` synchronously, and it is possible that there exist unflushed
// writes on those file descriptors at the time that the Node process exits.
//
// Because of this, we explicitly flush stdout and stderr so that we are absolutely sure that we capture any
// error messages in the engine.
contract.IgnoreError(os.Stdout.Sync())
contract.IgnoreError(os.Stderr.Sync())
if exiterr, ok := err.(*exec.ExitError); ok {
// If the program ran, but exited with a non-zero error code. This will happen often,
// since user errors will trigger this. So, the error message should look as nice as
// possible.
if status, stok := exiterr.Sys().(syscall.WaitStatus); stok {
// Check if we got special exit code that means "we already gave the user an
// actionable message". In that case, we can simply bail out and terminate `pulumi`
// without showing any more messages.
if status.ExitStatus() == nodeJSProcessExitedAfterShowingUserActionableMessage {
return &pulumirpc.RunResponse{Error: "", Bail: true}, nil
}
// Channel producing the final response we want to issue to our caller. Will get the result of
// the actual nodejs process we launch, or any results caused by errors in our server/pipes.
responseChannel := make(chan *pulumirpc.RunResponse)
defer close(responseChannel)
err = errors.Errorf("Program exited with non-zero exit code: %d", status.ExitStatus())
} else {
err = errors.Wrapf(exiterr, "Program exited unexpectedly")
}
} else {
// Otherwise, we didn't even get to run the program. This ought to never happen unless there's
// a bug or system condition that prevented us from running the language exec. Issue a scarier error.
err = errors.Wrapf(err, "Problem executing program (could not run language executor)")
// Forward any rpc server or pipe errors to our output channel.
go func() {
err := <-serverDone
if err != nil {
responseChannel <- &pulumirpc.RunResponse{Error: err.Error()}
}
}()
go func() {
err := <-pipesDone
if err != nil {
responseChannel <- &pulumirpc.RunResponse{Error: err.Error()}
}
}()
// now, launch the nodejs process and actually run the user code in it.
go host.execNodejs(responseChannel, req, fmt.Sprintf("127.0.0.1:%d", port), pipes.directory())
// Wait for one of our launched goroutines to signal that we're done. This might be our proxy
// (in the case of errors), or the launched nodejs completing (either successfully, or with
// errors).
return <-responseChannel, nil
}
// Launch the nodejs process and wait for it to complete. Report success or any errors using the
// `responseChannel` arg.
func (host *nodeLanguageHost) execNodejs(
responseChannel chan<- *pulumirpc.RunResponse, req *pulumirpc.RunRequest,
address, pipesDirectory string) {
// Actually launch nodejs and process the result of it into an appropriate response object.
response := func() *pulumirpc.RunResponse {
args := host.constructArguments(req, address, pipesDirectory)
config, err := host.constructConfig(req)
if err != nil {
err = errors.Wrap(err, "failed to serialize configuration")
return &pulumirpc.RunResponse{Error: err.Error()}
}
errResult = err.Error()
}
env := os.Environ()
env = append(env, pulumiConfigVar+"="+config)
return &pulumirpc.RunResponse{Error: errResult}, nil
if host.typescript {
env = append(env, "PULUMI_NODEJS_TYPESCRIPT=true")
}
if logging.V(5) {
commandStr := strings.Join(args, " ")
logging.V(5).Infoln("Language host launching process: ", host.nodeBin, commandStr)
}
// Now simply spawn a process to execute the requested program, wiring up stdout/stderr directly.
var errResult string
cmd := exec.Command(host.nodeBin, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = env
if err := cmd.Run(); err != nil {
// NodeJS stdout is complicated enough that we should explicitly flush stdout and stderr here. NodeJS does
// process writes using console.out and console.err synchronously, but it does not process writes using
// `process.stdout.write` or `process.stderr.write` synchronously, and it is possible that there exist unflushed
// writes on those file descriptors at the time that the Node process exits.
//
// Because of this, we explicitly flush stdout and stderr so that we are absolutely sure that we capture any
// error messages in the engine.
contract.IgnoreError(os.Stdout.Sync())
contract.IgnoreError(os.Stderr.Sync())
if exiterr, ok := err.(*exec.ExitError); ok {
// If the program ran, but exited with a non-zero error code. This will happen often,
// since user errors will trigger this. So, the error message should look as nice as
// possible.
if status, stok := exiterr.Sys().(syscall.WaitStatus); stok {
// Check if we got special exit code that means "we already gave the user an
// actionable message". In that case, we can simply bail out and terminate `pulumi`
// without showing any more messages.
if status.ExitStatus() == nodeJSProcessExitedAfterShowingUserActionableMessage {
return &pulumirpc.RunResponse{Error: "", Bail: true}
}
err = errors.Errorf("Program exited with non-zero exit code: %d", status.ExitStatus())
} else {
err = errors.Wrapf(exiterr, "Program exited unexpectedly")
}
} else {
// Otherwise, we didn't even get to run the program. This ought to never happen unless there's
// a bug or system condition that prevented us from running the language exec. Issue a scarier error.
err = errors.Wrapf(err, "Problem executing program (could not run language executor)")
}
errResult = err.Error()
}
return &pulumirpc.RunResponse{Error: errResult}
}()
// notify our caller of the response we got from the nodejs process. Note: this is done
// unilaterally. this is how we signal to nodeLanguageHost.Run that we are done and it can
// return to its caller.
responseChannel <- response
}
// constructArguments constructs a command-line for `pulumi-language-nodejs`
// by enumerating all of the optional and non-optional arguments present
// in a RunRequest.
func (host *nodeLanguageHost) constructArguments(req *pulumirpc.RunRequest) []string {
func (host *nodeLanguageHost) constructArguments(req *pulumirpc.RunRequest, address, pipesDirectory string) []string {
args := []string{host.runPath}
maybeAppendArg := func(k, v string) {
if v != "" {
@ -438,8 +545,9 @@ func (host *nodeLanguageHost) constructArguments(req *pulumirpc.RunRequest) []st
}
}
maybeAppendArg("monitor", req.GetMonitorAddress())
maybeAppendArg("monitor", address)
maybeAppendArg("engine", host.engineAddress)
maybeAppendArg("sync", pipesDirectory)
maybeAppendArg("project", req.GetProject())
maybeAppendArg("stack", req.GetStack())
maybeAppendArg("pwd", req.GetPwd())

View file

@ -28,7 +28,7 @@ func TestArgumentConstruction(t *testing.T) {
t.Run("DryRun-NoArguments", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{DryRun: true}
args := host.constructArguments(rr)
args := host.constructArguments(rr, "", "")
assert.Contains(tt, args, "--dry-run")
assert.NotContains(tt, args, "true")
})
@ -36,28 +36,28 @@ func TestArgumentConstruction(t *testing.T) {
t.Run("OptionalArgs-PassedIfSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{Project: "foo"}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, "", ""), " ")
assert.Contains(tt, args, "--project foo")
})
t.Run("OptionalArgs-NotPassedIfNotSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, "", ""), " ")
assert.NotContains(tt, args, "--stack")
})
t.Run("DotIfProgramNotSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, "", ""), " ")
assert.Contains(tt, args, ".")
})
t.Run("ProgramIfProgramSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{Program: "foobar"}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, "", ""), " ")
assert.Contains(tt, args, "foobar")
})
}

View file

@ -0,0 +1,182 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/binary"
"io"
pbempty "github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"github.com/pulumi/pulumi/pkg/util/logging"
pulumirpc "github.com/pulumi/pulumi/sdk/proto/go"
)
// pipes is the platform agnostic abstraction over a pair of channels we can read and write binary
// data over. It is provided through the `createPipes` functions provided in `proxy_unix.go` (where
// it is implemented on top of fifo files), and in `proxy_windows.go` (where it is implemented on
// top of named pipes).
type pipes interface {
// The directory containing the two streams to read and write from. This will be passed to the
// nodejs process so it can connect to our read and writes streams for communication.
directory() string
// Attempt to create and connect to the read and write streams.
connect() error
// The stream that we will use to read in requests send to us by the nodejs process.
reader() io.Reader
// The stream we will write responses back to the nodejs process with.
writer() io.Writer
// called when we're done with the pipes and want to clean up any os resources we may have
// allocated (for example, actual files and directories on disk).
shutdown()
}
func createAndServePipes(ctx context.Context, target pulumirpc.ResourceMonitorClient) (pipes, chan error, error) {
pipes, err := createPipes()
if err != nil {
return nil, nil, err
}
pipesDone := servePipes(ctx, pipes, target)
return pipes, pipesDone, nil
}
func servePipes(ctx context.Context, pipes pipes, target pulumirpc.ResourceMonitorClient) chan error {
done := make(chan error)
go func() {
// Keep reading and writing from the pipes until we run into an error or are canceled.
err := func() error {
pbcodec := encoding.GetCodec(proto.Name)
err := pipes.connect()
if err != nil {
logging.V(10).Infof("Sync invoke: Error connecting to pipes: %s\n", err)
return err
}
for {
// read a 4-byte request length
logging.V(10).Infoln("Sync invoke: Reading length from request pipe")
var reqLen uint32
if err := binary.Read(pipes.reader(), binary.BigEndian, &reqLen); err != nil {
// This is benign on shutdown.
if err == io.EOF {
// We were asked to gracefully cancel. Just exit now.
logging.V(10).Infof("Sync invoke: Gracefully shutting down")
return nil
}
logging.V(10).Infof("Sync invoke: Received error reading length from pipe: %s\n", err)
return err
}
// read the request in full
logging.V(10).Infoln("Sync invoke: Reading message from request pipe")
reqBytes := make([]byte, reqLen)
if _, err := io.ReadFull(pipes.reader(), reqBytes); err != nil {
logging.V(10).Infof("Sync invoke: Received error reading message from pipe: %s\n", err)
return err
}
// decode and dispatch the request
logging.V(10).Infof("Sync invoke: Unmarshalling request")
var req pulumirpc.InvokeRequest
if err := pbcodec.Unmarshal(reqBytes, &req); err != nil {
logging.V(10).Infof("Sync invoke: Received error reading full from pipe: %s\n", err)
return err
}
logging.V(10).Infof("Sync invoke: Invoking: %s", req.GetTok())
res, err := target.Invoke(ctx, &req)
if err != nil {
logging.V(10).Infof("Sync invoke: Received error invoking: %s\n", err)
return err
}
// encode the response
logging.V(10).Infof("Sync invoke: Marshalling response")
resBytes, err := pbcodec.Marshal(res)
if err != nil {
logging.V(10).Infof("Sync invoke: Received error marshalling: %s\n", err)
return err
}
// write the 4-byte response length
logging.V(10).Infoln("Sync invoke: Writing length to request pipe")
if err := binary.Write(pipes.writer(), binary.BigEndian, uint32(len(resBytes))); err != nil {
logging.V(10).Infof("Sync invoke: Error writing length to pipe: %s\n", err)
return err
}
// write the response in full
logging.V(10).Infoln("Sync invoke: Writing message to request pipe")
if _, err := pipes.writer().Write(resBytes); err != nil {
logging.V(10).Infof("Sync invoke: Error writing message to pipe: %s\n", err)
return err
}
}
}()
// Signal our caller that we're done.
done <- err
close(done)
// cleanup any resources the pipes were holding onto.
pipes.shutdown()
}()
return done
}
// Forward all resource monitor calls that we're serving to nodejs back to the engine to actually
// perform.
type monitorProxy struct {
target pulumirpc.ResourceMonitorClient
}
func (p *monitorProxy) Invoke(
ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
return p.target.Invoke(ctx, req)
}
func (p *monitorProxy) ReadResource(
ctx context.Context, req *pulumirpc.ReadResourceRequest) (*pulumirpc.ReadResourceResponse, error) {
return p.target.ReadResource(ctx, req)
}
func (p *monitorProxy) RegisterResource(
ctx context.Context, req *pulumirpc.RegisterResourceRequest) (*pulumirpc.RegisterResourceResponse, error) {
return p.target.RegisterResource(ctx, req)
}
func (p *monitorProxy) RegisterResourceOutputs(
ctx context.Context, req *pulumirpc.RegisterResourceOutputsRequest) (*pbempty.Empty, error) {
return p.target.RegisterResourceOutputs(ctx, req)
}
func (p *monitorProxy) SupportsFeature(
ctx context.Context, req *pulumirpc.SupportsFeatureRequest) (*pulumirpc.SupportsFeatureResponse, error) {
return p.target.SupportsFeature(ctx, req)
}

View file

@ -0,0 +1,103 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//+build !windows
package main
import (
"io"
"io/ioutil"
"os"
"path"
"syscall"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/util/logging"
)
// Unix specific pipe implementation. Fairly simple as it sits on top of a pair of standard fifo
// files that we can communicate over. Slightly complex as this involves extra cleanup steps to
// ensure they're cleaned up when we're done.
func createPipes() (pipes, error) {
dir, err := ioutil.TempDir("", "pulumi-node-pipes")
if err != nil {
return nil, err
}
invokeReqPath, invokeResPath := path.Join(dir, "invoke_req"), path.Join(dir, "invoke_res")
return &unixPipes{
dir: dir,
reqPath: invokeReqPath,
resPath: invokeResPath,
}, nil
}
type unixPipes struct {
dir string
reqPath, resPath string
reqPipe, resPipe *os.File
}
func (p *unixPipes) directory() string {
return p.dir
}
func (p *unixPipes) reader() io.Reader {
return p.reqPipe
}
func (p *unixPipes) writer() io.Writer {
return p.resPipe
}
func (p *unixPipes) connect() error {
if err := syscall.Mkfifo(path.Join(p.dir, "invoke_req"), 0600); err != nil {
logging.V(10).Infof("createPipes: Received error opening request pipe: %s\n", err)
return err
}
if err := syscall.Mkfifo(path.Join(p.dir, "invoke_res"), 0600); err != nil {
logging.V(10).Infof("createPipes: Received error opening result pipe: %s\n", err)
return err
}
invokeReqPipe, err := os.OpenFile(p.reqPath, os.O_RDONLY, 0)
if err != nil {
return err
}
p.reqPipe = invokeReqPipe
invokeResPipe, err := os.OpenFile(p.resPath, os.O_WRONLY, 0)
if err != nil {
return err
}
p.resPipe = invokeResPipe
return nil
}
func (p *unixPipes) shutdown() {
if p.reqPipe != nil {
contract.IgnoreClose(p.reqPipe)
contract.IgnoreError(os.Remove(p.reqPath))
}
if p.resPipe != nil {
contract.IgnoreClose(p.resPipe)
contract.IgnoreError(os.Remove(p.resPath))
}
contract.IgnoreError(os.Remove(p.dir))
}

View file

@ -0,0 +1,113 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//+build windows
package main
import (
"fmt"
"io"
"net"
"os"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/pulumi/pulumi/pkg/util/logging"
)
// Windows specific pipe implementation. Slightly complex as it sits on top of a pair of named pipes
// that have to asynchronously accept connections to. But also fairly simple as windows will take
// care of cleaning things up once our processes complete.
func createPipes() (pipes, error) {
// Generate a random pipe name so that we don't collide with other pipes made by other pulumi
// instances.
rand := uint32(time.Now().UnixNano() + int64(os.Getpid()))
dir := fmt.Sprintf(`\\.\pipe\pulumi%v`, rand)
reqPipeName := dir + `\invoke_req`
resPipeName := dir + `\invoke_res`
reqListener, err := winio.ListenPipe(reqPipeName, nil)
if err != nil {
logging.V(10).Infof("createPipes: Received error trying to create request pipe %s: %s\n", reqPipeName, err)
return nil, err
}
resListener, err := winio.ListenPipe(resPipeName, nil)
if err != nil {
logging.V(10).Infof("createPipes: Received error trying to create response pipe %s: %s\n", resPipeName, err)
return nil, err
}
return &windowsPipes{
dir: dir,
reqListener: reqListener,
resListener: resListener,
}, nil
}
type windowsPipes struct {
dir string
reqListener, resListener net.Listener
reqConn, resConn net.Conn
}
func (p *windowsPipes) directory() string {
return p.dir
}
func (p *windowsPipes) reader() io.Reader {
return p.reqConn
}
func (p *windowsPipes) writer() io.Writer {
return p.resConn
}
func (p *windowsPipes) connect() error {
acceptDone := make(chan error)
defer close(acceptDone);
go func() {
reqConn, err := p.reqListener.Accept()
if err != nil {
acceptDone <- err
}
p.reqConn = reqConn
acceptDone <- nil
}()
go func() {
resConn, err := p.resListener.Accept()
if err != nil {
acceptDone <- err
}
p.resConn = resConn
acceptDone <- nil
}()
res1 := <-acceptDone
res2 := <-acceptDone
if res1 != nil {
return res1
}
return res2
}
func (p *windowsPipes) shutdown() {
// Don't need to do anything here. Named pipes are cleaned up when all processes that are using
// them terminate.
}

View file

@ -94,6 +94,7 @@ function usage(): void {
console.error(` --pwd=pwd change the working directory before running the program`);
console.error(` --monitor=addr [required] the RPC address for a resource monitor to connect to`);
console.error(` --engine=addr the RPC address for a resource engine to connect to`);
console.error(` --sync=path path to synchronous 'invoke' endpoints`);
console.error(` --tracing=url a Zipkin-compatible endpoint to send tracing data to`);
console.error(``);
console.error(` and [program] is a JavaScript program to run in Node.js, and [arg]... optional args to it.`);
@ -146,6 +147,7 @@ function main(args: string[]): void {
addToEnvIfDefined("PULUMI_NODEJS_PARALLEL", argv["parallel"]);
addToEnvIfDefined("PULUMI_NODEJS_MONITOR", argv["monitor"]);
addToEnvIfDefined("PULUMI_NODEJS_ENGINE", argv["engine"]);
addToEnvIfDefined("PULUMI_NODEJS_SYNC", argv["sync"]);
// Ensure that our v8 hooks have been initialized. Then actually load and run the user program.
v8Hooks.isInitializedAsync().then(() => {

View file

@ -13,9 +13,9 @@
// limitations under the License.
import { util } from "protobufjs";
import { ResourceError, RunError } from "./errors";
import { all, Input, Inputs, interpolate, Output, output } from "./output";
import { getStackResource } from "./runtime";
import { ResourceError } from "./errors";
import { Input, Inputs, interpolate, Output, output } from "./output";
import { getStackResource, unknownValue } from "./runtime";
import { readResource, registerResource, registerResourceOutputs } from "./runtime/resource";
import { getProject, getStack } from "./runtime/settings";
import * as utils from "./utils";
@ -701,6 +701,22 @@ export abstract class ProviderResource extends CustomResource {
/** @internal */
private readonly pkg: string;
/** @internal */
// tslint:disable-next-line: variable-name
public __registrationId?: string;
public static async register(provider: ProviderResource | undefined): Promise<string | undefined> {
if (provider === undefined) {
return undefined;
}
if (!provider.__registrationId) {
provider.__registrationId = `${await provider.urn.promise()}::${await provider.id.promise()}`;
}
return provider.__registrationId;
}
/**
* Creates and registers a new provider resource for a particular package.
*

View file

@ -12,54 +12,104 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as fs from "fs";
import * as grpc from "grpc";
import * as asset from "../asset";
import { InvokeOptions } from "../invoke";
import * as log from "../log";
import { Inputs } from "../output";
import { Inputs, Output } from "../output";
import { debuggablePromise } from "./debuggable";
import { deserializeProperties, serializeProperties, unknownValue } from "./rpc";
import { excessiveDebugOutput, getMonitor, rpcKeepAlive } from "./settings";
import { deserializeProperties, serializeProperties } from "./rpc";
import { excessiveDebugOutput, getMonitor, rpcKeepAlive, SyncInvokes, tryGetSyncInvokes } from "./settings";
import { ProviderResource, Resource } from "../resource";
import * as utils from "../utils";
const gstruct = require("google-protobuf/google/protobuf/struct_pb.js");
const providerproto = require("../proto/provider_pb.js");
/**
* invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
* can be a bag of computed values (Ts or Promise<T>s), and the result is a Promise<any> that
* resolves when the invoke finishes.
* `invoke` dynamically invokes the function, `tok`, which is offered by a provider plugin. `invoke`
* behaves differently in the case that options contains `{async:true}` or not.
*
* In the case where `{async:true}` is present in the options bag:
*
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider plugin.
* 2. the `props` inputs can be a bag of computed values (including, `T`s, `Promise<T>`s,
* `Output<T>`s etc.).
*
*
* In the case where `{async:true}` is not present in the options bag:
*
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider call.
* However, that Promise will *also* have the respective values of the Provider result exposed
* directly on it as properties.
*
* 2. The inputs must be a bag of simple values, and the result is the result that the Provider
* produced.
*
* Simple values are:
* 1. `undefined`, `null`, string, number or boolean values.
* 2. arrays of simple values.
* 3. objects containing only simple values.
*
* Importantly, simple values do *not* include:
* 1. `Promise`s
* 2. `Output`s
* 3. `Asset`s or `Archive`s
* 4. `Resource`s.
*
* All of these contain async values that would prevent `invoke from being able to operate
* synchronously.
*/
export async function invoke(tok: string, props: Inputs, opts?: InvokeOptions): Promise<any> {
const label = `Invoking function: tok=${tok}`;
log.debug(label +
excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``);
opts = opts || {};
if (opts.parent && opts.provider === undefined) {
opts.provider = opts.parent.getProvider(tok);
export function invoke(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
if (opts.async) {
// Use specifically requested async invoking. Respect that.
return invokeAsync(tok, props, opts);
}
const syncInvokes = tryGetSyncInvokes();
if (!syncInvokes) {
// We weren't launched from a pulumi CLI that supports sync-invokes. Let the user know they
// should update and fall back to synchronously blocking on the async invoke.
return invokeFallbackToAsync(tok, props, opts);
}
return invokeSync(tok, props, opts, syncInvokes);
}
let issuedUpdateWarning: boolean | undefined;
export function invokeFallbackToAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
if (!issuedUpdateWarning) {
issuedUpdateWarning = true;
log.warn(
`Pulumi is out of date. To update to a more recent version see instructions at:
https://www.pulumi.com/docs/get-started/install/`);
}
const asyncResult = invokeAsync(tok, props, opts);
const syncResult = utils.promiseResult(asyncResult);
return createLiftedPromise(syncResult);
}
async function invokeAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
const label = `Invoking function: tok=${tok} asynchronously`;
console.log(label);
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
// Wait for all values to be available, and then perform the RPC.
const done = rpcKeepAlive();
try {
const obj = gstruct.Struct.fromJavaScript(
await serializeProperties(`invoke:${tok}`, props));
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(obj)}` : ``);
const serialized = await serializeProperties(`invoke:${tok}`, props);
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
let providerRef: string | undefined;
if (opts.provider !== undefined) {
const providerURN = await opts.provider.urn.promise();
const providerID = await opts.provider.id.promise() || unknownValue;
providerRef = `${providerURN}::${providerID}`;
}
const provider = await ProviderResource.register(getProvider(tok, opts));
const req = createInvokeRequest(tok, serialized, provider, opts);
const req = new providerproto.InvokeRequest();
req.setTok(tok);
req.setArgs(obj);
req.setProvider(providerRef);
req.setVersion(opts.version || "");
const resp: any = await debuggablePromise(new Promise((innerResolve, innerReject) =>
monitor.invoke(req, (err: grpc.StatusObject, innerResponse: any) => {
log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
@ -80,23 +130,161 @@ export async function invoke(tok: string, props: Inputs, opts?: InvokeOptions):
}
})), label);
// If there were failures, propagate them.
const failures: any = resp.getFailuresList();
if (failures && failures.length) {
let reasons = "";
for (let i = 0; i < failures.length; i++) {
if (reasons !== "") {
reasons += "; ";
}
reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
}
throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
}
// Finally propagate any other properties that were given to us as outputs.
return deserializeProperties(resp.getReturn());
return deserializeResponse(tok, resp);
}
finally {
done();
}
}
function invokeSync(tok: string, props: any, opts: InvokeOptions, syncInvokes: SyncInvokes): Promise<any> {
const label = `Invoking function: tok=${tok} synchronously`;
console.log(label);
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
const serialized = serializePropertiesSync(props);
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
const providerRef = getProviderRefSync();
console.log("ProviderRef: " + providerRef);
const req = createInvokeRequest(tok, serialized, providerRef, opts);
// Encode the request.
const reqBytes = Buffer.from(req.serializeBinary());
// Write the request length.
const reqLen = Buffer.alloc(4);
reqLen.writeUInt32BE(reqBytes.length, /*offset:*/ 0);
fs.writeSync(syncInvokes.requests, reqLen);
fs.writeSync(syncInvokes.requests, reqBytes);
// Read the response.
const respLenBytes = Buffer.alloc(4);
fs.readSync(syncInvokes.responses, respLenBytes, /*offset:*/ 0, /*length:*/ 4, /*position:*/ null);
const respLen = respLenBytes.readUInt32BE(/*offset:*/ 0);
const respBytes = Buffer.alloc(respLen);
fs.readSync(syncInvokes.responses, respBytes, /*offset:*/ 0, /*length:*/ respLen, /*position:*/ null);
// Decode the response.
const resp = providerproto.InvokeResponse.deserializeBinary(new Uint8Array(respBytes));
const resultValue = deserializeResponse(tok, resp);
return createLiftedPromise(resultValue);
function getProviderRefSync() {
const provider = getProvider(tok, opts);
if (provider === undefined) {
return undefined;
}
if (provider.__registrationId === undefined) {
log.warn(
`Synchronous call made to "${tok}" with an unregistered provider.
For more details see: https://www.pulumi.com/docs/troubleshooting/#synchronous-call`);
utils.promiseResult(ProviderResource.register(provider));
}
return provider.__registrationId;
}
}
// Expose the properties of the actual result of invoke directly on the promise itself. Note this
// doesn't actually involve any asynchrony. The promise will be created synchronously and the
// values copied to it can be used immediately. We simply make a Promise so that any consumers that
// do a `.then()` on it continue to work even though we've switched from being async to sync.
function createLiftedPromise(value: any): Promise<any> {
const promise = Promise.resolve(value);
Object.assign(promise, value);
return promise;
}
function createInvokeRequest(tok: string, serialized: any, provider: string | undefined, opts: InvokeOptions) {
if (provider !== undefined && typeof provider !== "string") {
throw new Error("Incorrect provider type.");
}
const obj = gstruct.Struct.fromJavaScript(serialized);
const req = new providerproto.InvokeRequest();
req.setTok(tok);
req.setArgs(obj);
req.setProvider(provider);
req.setVersion(opts.version || "");
return req;
}
function getProvider(tok: string, opts: InvokeOptions) {
return opts.provider ? opts.provider :
opts.parent ? opts.parent.getProvider(tok) : undefined;
}
function serializePropertiesSync(prop: any): any {
if (prop === undefined ||
prop === null ||
typeof prop === "boolean" ||
typeof prop === "number" ||
typeof prop === "string") {
return prop;
}
if (asset.Asset.isInstance(prop) || asset.Archive.isInstance(prop)) {
throw new Error("Assets and Archives cannot be passed in as arguments to a data source call.");
}
if (prop instanceof Promise) {
throw new Error("Promises cannot be passed in as arguments to a data source call.");
}
if (Output.isInstance(prop)) {
throw new Error("Outputs cannot be passed in as arguments to a data source call.");
}
if (Resource.isInstance(prop)) {
throw new Error("Resources cannot be passed in as arguments to a data source call.");
}
if (prop instanceof Array) {
const result: any[] = [];
for (let i = 0; i < prop.length; i++) {
// When serializing arrays, we serialize any undefined values as `null`. This matches JSON semantics.
const elem = serializePropertiesSync(prop[i]);
result.push(elem === undefined ? null : elem);
}
return result;
}
return serializeAllKeys(prop, {});
function serializeAllKeys(innerProp: any, obj: any) {
for (const k of Object.keys(innerProp)) {
// When serializing an object, we omit any keys with undefined values. This matches JSON semantics.
const v = serializePropertiesSync(innerProp[k]);
if (v !== undefined) {
obj[k] = v;
}
}
return obj;
}
}
function deserializeResponse(tok: string, resp: any) {
const failures: any = resp.getFailuresList();
if (failures && failures.length) {
let reasons = "";
for (let i = 0; i < failures.length; i++) {
if (reasons !== "") {
reasons += "; ";
}
reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
}
throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
}
return deserializeProperties(resp.getReturn());
}

View file

@ -25,6 +25,7 @@ import {
CustomResource,
CustomResourceOptions,
ID,
ProviderResource,
Resource,
ResourceOptions,
URN,
@ -319,14 +320,9 @@ async function prepareResource(label: string, res: Resource, custom: boolean,
let providerRef: string | undefined;
let importID: ID | undefined;
if (custom) {
if ((<CustomResourceOptions>opts).provider !== undefined) {
const provider = (<CustomResourceOptions>opts).provider!;
const providerURN = await provider.urn.promise();
const providerID = await provider.id.promise() || unknownValue;
providerRef = `${providerURN}::${providerID}`;
}
importID = (<CustomResourceOptions>opts).import;
const customOpts = <CustomResourceOptions>opts;
importID = customOpts.import;
providerRef = await ProviderResource.register(customOpts.provider);
}
// Collect the URNs for explicit/implicit dependencies for the engine so that it can understand
@ -402,7 +398,7 @@ async function getAllTransitivelyReferencedCustomResourceURNs(resources: Set<Res
// [Comp1, Cust1, Comp2, Cust2, Cust3]
const transitivelyReachableResources = getTransitivelyReferencedChildResourcesOfComponentResources(resources);
const transitivelyReachableCustomResources = [...transitivelyReachableResources].filter(r => CustomResource.isInstance(r));
const transitivelyReachableCustomResources = [...transitivelyReachableResources].filter(r => CustomResource.isInstance(r));
const promises = transitivelyReachableCustomResources.map(r => r.urn.promise());
const urns = await Promise.all(promises);
return new Set<string>(urns);
@ -525,7 +521,7 @@ export function registerResourceOutputs(res: Resource, outputs: Inputs | Promise
const label = `monitor.registerResourceOutputs(${urn}, ...)`;
await debuggablePromise(new Promise((resolve, reject) =>
(monitor as any).registerResourceOutputs(req, (err: grpc.ServiceError, innerResponse: any) => {
log.debug(`RegisterResourceOutputs RPC finished: urn=${urn}; `+
log.debug(`RegisterResourceOutputs RPC finished: urn=${urn}; ` +
`err: ${err}, resp: ${innerResponse}`);
if (err) {
// If the monitor is unavailable, it is in the process of shutting down or has already
@ -542,7 +538,7 @@ export function registerResourceOutputs(res: Resource, outputs: Inputs | Promise
resolve();
}
})), label);
}
}
}, false);
}

View file

@ -14,7 +14,7 @@
import * as asset from "../asset";
import * as log from "../log";
import { Input, Inputs, isSecretOutput, Output } from "../output";
import { Input, Inputs, Output } from "../output";
import { ComponentResource, CustomResource, Resource } from "../resource";
import { debuggablePromise, errorString } from "./debuggable";
import { excessiveDebugOutput, isDryRun, monitorSupportsSecrets } from "./settings";
@ -237,6 +237,10 @@ export const specialSecretSig = "1b47061264138c4ac30d75fd1eb44270";
* appropriate, in addition to translating certain "special" values so that they are ready to go on the wire.
*/
export async function serializeProperty(ctx: string, prop: Input<any>, dependentResources: Set<Resource>): Promise<any> {
// IMPORTANT:
// IMPORTANT: Keep this in sync with serializesPropertiesSync in invoke.ts
// IMPORTANT:
if (prop === undefined ||
prop === null ||
typeof prop === "boolean" ||

View file

@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as fs from "fs";
import * as grpc from "grpc";
import * as path from "path";
import { RunError } from "../errors";
import * as log from "../log";
import { ComponentResource, URN } from "../resource";
@ -41,6 +43,11 @@ export interface Options {
readonly testModeEnabled?: boolean; // true if we're in testing mode (allows execution without the CLI).
readonly queryMode?: boolean; // true if we're in query mode (does not allow resource registration).
readonly legacyApply?: boolean; // true if we will resolve missing outputs to inputs during preview.
/**
* Directory containing the send/receive files for making synchronous invokes to the engine.
*/
readonly syncDir?: string;
}
/**
@ -171,6 +178,25 @@ export function getMonitor(): Object | undefined {
return monitor;
}
/** @internal */
export interface SyncInvokes {
requests: number;
responses: number;
}
let syncInvokes: SyncInvokes | undefined;
/** @internal */
export function tryGetSyncInvokes(): SyncInvokes | undefined {
if (syncInvokes === undefined && options.syncDir) {
const requests = fs.openSync(path.join(options.syncDir, "invoke_req"), fs.constants.O_WRONLY|fs.constants.O_SYNC);
const responses = fs.openSync(path.join(options.syncDir, "invoke_res"), fs.constants.O_RDONLY|fs.constants.O_SYNC);
syncInvokes = { requests, responses };
}
return syncInvokes;
}
/**
* engine is a live connection to the engine, used for logging, etc. (lazily initialized).
*/
@ -226,6 +252,7 @@ function loadOptions(): Options {
engineAddr: process.env["PULUMI_NODEJS_ENGINE"],
testModeEnabled: (process.env["PULUMI_TEST_MODE"] === "true"),
legacyApply: (process.env["PULUMI_ENABLE_LEGACY_APPLY"] === "true"),
syncDir: process.env["PULUMI_NODEJS_SYNC"],
};
}

View file

@ -16,7 +16,7 @@
import * as assert from "assert";
import { asyncTest } from "./util";
import { promiseResult, liftProperties } from "../utils";
import { promiseResult } from "../utils";
describe("deasync", () => {
it("handles simple promise", () => {
@ -56,53 +56,4 @@ describe("deasync", () => {
const result = promiseResult(promise);
assert.equal(result, actual);
});
it("lift properties", asyncTest(async () => {
const actual = { a: "foo", b: 4, c: true, d: [function() {}] };
const promise = new Promise<typeof actual>((resolve) => {
resolve(actual);
});
const combinedResult = liftProperties(promise);
// check that we've lifted the values properly.
for (const key of Object.keys(actual)) {
const value = (<any>actual)[key];
assert.deepStrictEqual(value, (<any>combinedResult)[key]);
}
// also check that we have a proper promise to work with:
const promiseValue = await combinedResult;
for (const key of Object.keys(actual)) {
const value = (<any>actual)[key];
assert.deepStrictEqual(value, (<any>promiseValue)[key]);
}
// also ensure that .then works
await combinedResult.then(v => {
for (const key of Object.keys(actual)) {
const value = (<any>actual)[key];
assert.deepStrictEqual(value, (<any>v)[key]);
}
});
}));
it("lift properties throws", () => {
const message = "etc";
const promise = new Promise<number>((resolve, reject) => {
reject(new Error(message));
});
try {
const result = liftProperties(promise);
assert.fail("Should not be able to reach here 1.")
}
catch (err) {
assert.equal(err.message, message);
return;
}
assert.fail("Should not be able to reach here 2.")
});
});

View file

@ -6,12 +6,34 @@ let pulumi = require("../../../../../");
let args = {
a: "hello",
b: true,
c: [ 0.99, 42, { z: "x" } ],
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result = pulumi.runtime.invoke("invoke:index:echo", args);
result.then((v) => {
let result1 = pulumi.runtime.invoke("invoke:index:echo", args);
// When invoking synchronously: Ensure the properties come back synchronously and are present on the
// result.
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
// When invoking synchronously: Ensure the properties are available asynchronously through normal
// Promise semantics.
result1.then(v => {
assert.deepEqual(v, args);
});
let result2 = pulumi.runtime.invoke("invoke:index:echo", args, { async: true });
// When invoking asynchronously: Ensure the properties are *not* present on the result.
for (const key in args) {
assert.notDeepEqual(result2[key], args[key]);
}
// When invoking asynchronously: Ensure the properties are available asynchronously through normal
// Promise semantics.
result2.then(v => {
assert.deepEqual(v, args);
});

View file

@ -10,11 +10,13 @@ new MyResource("testResource", "0.19.1");
new MyResource("testResource2", "0.19.2");
new MyResource("testResource3");
pulumi.runtime.invoke("invoke:index:doit", {}, { version: "0.19.1" });
pulumi.runtime.invoke("invoke:index:doit_v2", {}, { version: "0.19.2" });
pulumi.runtime.invoke("invoke:index:doit_noversion", {});
pulumi.runtime.invoke("invoke:index:doit", {}, { version: "0.19.1" }, { async: true });
pulumi.runtime.invoke("invoke:index:doit_v2", {}, { version: "0.19.2" }, { async: true });
pulumi.runtime.invoke("invoke:index:doit_noversion", {}, { async: true });
new pulumi.CustomResource("test:index:ReadResource", "foo", {}, { id: "readme", version: "0.20.0" });
new pulumi.CustomResource("test:index:ReadResource", "foo_noversion", {}, { id: "readme" });

View file

@ -0,0 +1,35 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
const provider = new Provider("p");
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { provider });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { provider });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});

View file

@ -0,0 +1,42 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
class Resource extends pulumi.CustomResource {
constructor(name, opts) {
super("test:index:Resource", name, {}, opts)
}
}
const provider = new Provider("p");
const parent = new Resource("r", { provider })
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { parent });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { parent });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});

View file

@ -0,0 +1,38 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
(async () => {
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
const provider = new Provider("p");
await pulumi.ProviderResource.register(provider);
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { provider });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { provider });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});
})();

View file

@ -0,0 +1,46 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
(async () => {
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
class Resource extends pulumi.CustomResource {
constructor(name, opts) {
super("test:index:Resource", name, {}, opts)
}
}
const provider = new Provider("p");
await pulumi.ProviderResource.register(provider);
const parent = new Resource("r", { provider })
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { parent });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { parent });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});
})();

View file

@ -46,7 +46,7 @@ interface RunCase {
};
skipRootResourceEndpoints?: boolean;
showRootResourceRegistration?: boolean;
invoke?: (ctx: any, tok: string, args: any, version: string) => { failures: any, ret: any };
invoke?: (ctx: any, tok: string, args: any, version: string, provider: string) => { failures: any, ret: any };
readResource?: (ctx: any, t: string, name: string, id: string, par: string, state: any, version: string) => {
urn: URN | undefined, props: any | undefined,
};
@ -303,7 +303,8 @@ describe("rpc", () => {
"invoke": {
program: path.join(base, "009.invoke"),
expectResourceCount: 0,
invoke: (ctx: any, tok: string, args: any, version: string) => {
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "");
assert.strictEqual(tok, "invoke:index:echo");
assert.deepEqual(args, {
a: "hello",
@ -897,10 +898,92 @@ describe("rpc", () => {
});
},
},
"provider_invokes": {
program: path.join(base, "060.provider_invokes"),
expectResourceCount: 1,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any) => {
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
"provider_in_parent_invokes": {
program: path.join(base, "061.provider_in_parent_invokes"),
expectResourceCount: 2,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any, dependencies?: string[],
custom?: boolean, protect?: boolean, parent?: string, provider?: string) => {
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
"providerref_invokes": {
program: path.join(base, "062.providerref_invokes"),
expectResourceCount: 1,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any) => {
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
"providerref_in_parent_invokes": {
program: path.join(base, "063.providerref_in_parent_invokes"),
expectResourceCount: 2,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any, dependencies?: string[],
custom?: boolean, protect?: boolean, parent?: string, provider?: string) => {
if (name === "c") {
assert.equal(provider, "");
}
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
};
for (const casename of Object.keys(cases)) {
// if (casename !== "stack_exports") {
// if (casename.indexOf("provider_in_parent_invokes") < 0) {
// continue;
// }
@ -925,7 +1008,7 @@ describe("rpc", () => {
const args: any = req.getArgs().toJavaScript();
const version: string = req.getVersion();
const { failures, ret } =
opts.invoke(ctx, req.getTok(), args, version);
opts.invoke(ctx, req.getTok(), args, version, req.getProvider());
resp.setFailuresList(failures);
resp.setReturn(gstruct.Struct.fromJavaScript(ret));
}

View file

@ -94,31 +94,12 @@ export function promiseResult<T>(promise: Promise<T>): T {
}
/**
* Lifts the properties of the value a promise resolves to and adds them to the promise itself. This
* can be used to take a function that was previously async (i.e. Promise-returning) and make it
* synchronous in a backward compatible fashion. Specifically, because the function now returns a
* `Promise<T> & T` all properties on it will be available for sync consumers, while async consumers
* can still use `await` on it or call `.then(...)` on it.
* No longer supported. This function is now a no-op and will directly return the promise passed
* into it.
*
* This is an advanced compat function for libraries and should not generally be used by normal
* Pulumi application.
*/
export function liftProperties<T>(promise: Promise<T>, opts: InvokeOptions = {}): Promise<T> & T {
if (opts.async) {
// Caller just wants the async side of the result. That's what we have, so just return it
// as is.
//
// Note: this cast isn't actually safe (since 'promise' doesn't actually provide the T side
// of things). That's ok. By default the return signature will be correct, and users will
// only get into this code path when specifically trying to force asynchrony. Given that,
// it's fine to expect them to have to know what they're doing and that they shoud only use
// the Promise side of things.
return <Promise<T> & T>promise;
}
// Caller wants the async side and the sync side merged. Block on getting the underlying
// promise value, then take all the properties from it and copy over onto the promise itself and
// return the combined set of each.
const value = promiseResult(promise);
return Object.assign(promise, value);
return <any>promise;
}