Compare commits

...

68 commits

Author SHA1 Message Date
Cyrus Najmabadi 009a7cc428 Approach without providerrefs. 2019-10-14 13:19:37 -07:00
Cyrus Najmabadi 1e759dcc69 Map the shape over. 2019-10-14 13:05:41 -07:00
Cyrus Najmabadi a773ca5636 One line' 2019-10-14 12:58:01 -07:00
Cyrus Najmabadi acb380aa49 Be explicit 2019-10-14 12:53:20 -07:00
Cyrus Najmabadi 3451075c07 rename 2019-10-14 12:52:07 -07:00
Cyrus Najmabadi a9ea68e882 Simplify impl 2019-10-14 12:47:27 -07:00
Cyrus Najmabadi 61b3a349c4 Simplify pipe connection 2019-10-14 11:49:51 -07:00
Cyrus Najmabadi a62a76ff69 Warn with old versions of pulumi 2019-10-14 11:28:15 -07:00
Cyrus Najmabadi e5c785d5ae make internal 2019-10-14 10:59:49 -07:00
Cyrus Najmabadi 5974ef8498 Fix 2019-10-12 16:06:02 -07:00
Cyrus Najmabadi 7b2a977c5c Docs 2019-10-12 15:57:35 -07:00
Cyrus Najmabadi 156a86464d Simplify 2019-10-12 15:53:37 -07:00
Cyrus Najmabadi 8544822f7f Unify code 2019-10-12 15:49:10 -07:00
Cyrus Najmabadi af7019ef3c Simplify strings 2019-10-12 15:34:48 -07:00
Cyrus Najmabadi 0eb593a3e0 Docs 2019-10-12 15:31:33 -07:00
Cyrus Najmabadi a7909d0eca Simplify 2019-10-12 15:17:44 -07:00
Cyrus Najmabadi 1d9d56b88b Update CL. 2019-10-12 14:44:55 -07:00
Cyrus Najmabadi a2ac118f64 Test safety 2019-10-11 15:03:27 -07:00
Cyrus Najmabadi 01db2c428e Update test. 2019-10-11 14:53:27 -07:00
CyrusNajmabadi ed3204516f
Update sdk/nodejs/runtime/invoke.ts 2019-10-11 14:48:05 -07:00
Cyrus Najmabadi ce3a2e84f9 Docs 2019-10-11 14:35:17 -07:00
Cyrus Najmabadi 750ff466f3 Docs 2019-10-11 14:34:59 -07:00
Cyrus Najmabadi 91031910cd Docs 2019-10-11 14:31:27 -07:00
CyrusNajmabadi 920942a91f
Update sdk/nodejs/cmd/pulumi-language-nodejs/proxy.go 2019-10-11 14:25:31 -07:00
Cyrus Najmabadi d8acb5da9c Docs 2019-10-11 14:24:43 -07:00
Cyrus Najmabadi 33ff46b225 Working windows pipes 2019-10-11 14:14:54 -07:00
Cyrus Najmabadi cbd4197ee3 Linting 2019-10-11 13:32:48 -07:00
Cyrus Najmabadi 745fdcddc2 Linting 2019-10-11 13:29:49 -07:00
Cyrus Najmabadi 0f87ac7793 Support existing pattern for providers calling into us 2019-10-11 13:17:08 -07:00
Cyrus Najmabadi af0ad59da5 Logging 2019-10-11 12:51:15 -07:00
Cyrus Najmabadi 52ef1ef13c Logging 2019-10-11 12:50:18 -07:00
Cyrus Najmabadi 001eff8a2f Cleanup 2019-10-11 12:32:10 -07:00
Cyrus Najmabadi 6d8ad3537f Revert 2019-10-11 12:30:28 -07:00
Cyrus Najmabadi 437a5be768 Docs 2019-10-11 12:29:52 -07:00
Cyrus Najmabadi 72ca005879 Break out windows/unix specific pipe creation. 2019-10-11 12:23:45 -07:00
Cyrus Najmabadi 68feff2b96 Break out pipes into windows/unix specific code. 2019-10-11 11:34:05 -07:00
Cyrus Najmabadi 30e5166295 Pass along a context. 2019-10-10 21:28:04 -07:00
Cyrus Najmabadi 43dfef68a4 Copyright 2019-10-10 21:23:06 -07:00
Cyrus Najmabadi 1e693e5180 docs 2019-10-10 21:21:47 -07:00
Cyrus Najmabadi 7374936d9f lint 2019-10-10 21:15:28 -07:00
Cyrus Najmabadi 288e6bc2a8 Fixup tests 2019-10-10 21:03:40 -07:00
Cyrus Najmabadi a3d8615ea2 Update tests 2019-10-10 19:51:45 -07:00
Cyrus Najmabadi 127f8df950 Add async/sync invoke tests. 2019-10-10 18:42:50 -07:00
Cyrus Najmabadi 47749a826c Keep api shape for back compat 2019-10-10 18:27:02 -07:00
Cyrus Najmabadi 6358d33359 Update name. 2019-10-10 18:10:17 -07:00
Cyrus Najmabadi 01deb91e28 Update name. 2019-10-10 18:09:24 -07:00
Cyrus Najmabadi 720d4ecfd8 Update name. 2019-10-10 18:08:42 -07:00
Cyrus Najmabadi c458d932eb Lint 2019-10-10 18:07:07 -07:00
Cyrus Najmabadi 23558e1284 Fix up sync invokes 2019-10-10 17:51:28 -07:00
Cyrus Najmabadi 26b7b4d142 Restore old sig 2019-10-10 17:18:41 -07:00
Cyrus Najmabadi 6994a1a07a Provide entrypoint that switches accordingly 2019-10-10 16:29:54 -07:00
Cyrus Najmabadi dd2e53d2f5 Properly grab provider ref in new sync invoke method. 2019-10-10 16:11:00 -07:00
Cyrus Najmabadi 16e1e2393b Actually check for unserializable code. 2019-10-10 16:02:52 -07:00
Cyrus Najmabadi 1aff4eea78 Clarity 2019-10-10 15:53:28 -07:00
Cyrus Najmabadi a47d661d9c Cleanup 2019-10-10 15:52:49 -07:00
Cyrus Najmabadi 29249aae43 Linting 2019-10-10 15:50:20 -07:00
Cyrus Najmabadi 05d88ea75b Thread along errors in serving 2019-10-10 15:49:16 -07:00
Cyrus Najmabadi e9ea17e36f Thread along errors in serving 2019-10-10 15:47:18 -07:00
Cyrus Najmabadi 452880c1ba Thread along errors in serving 2019-10-10 15:41:40 -07:00
Cyrus Najmabadi 7bf9c000d5 Thread along errors in serving 2019-10-10 15:33:44 -07:00
Cyrus Najmabadi 04bc60264e Work 2019-10-10 15:07:15 -07:00
Pat Gavlin e3712a6370 Add a way for the nodejs sdk to perform invokes in a purely synchronous fashion to the engine. 2019-10-10 13:50:18 -07:00
Cyrus Najmabadi cc8d3c549e Update CL 2019-10-10 13:34:50 -07:00
Cyrus Najmabadi b6b3dd2a8e Update CL 2019-10-10 13:34:35 -07:00
Cyrus Najmabadi 09d51bc9d1 Update CL 2019-10-10 13:33:50 -07:00
Cyrus Najmabadi 8d4290154d Update CL 2019-10-10 13:33:09 -07:00
Cyrus Najmabadi f16ca21445 Add the concept of a ProviderRef to refer to a provider. 2019-10-10 13:11:37 -07:00
Cyrus Najmabadi f21379799b Add ProviderRef concept 2019-10-10 12:36:39 -07:00
23 changed files with 1212 additions and 182 deletions

View file

@ -3,6 +3,43 @@ CHANGELOG
## HEAD (Unreleased)
## IMPORTANT - COMPAT
- Mitigate issue causing [crashes](https://github.com/pulumi/pulumi/issues/3260) and
[hangs](https://github.com/pulumi/pulumi/issues/3309) across many OSs (primarily CentOS and macOS)
and several versions of nodejs (primarily 12.12 and up).
The issue occurs when making a 'data source' call in Pulumi in a synchronous fashion. i.e. code
like:
```ts
// A call to some provider's `getXXX` data source function.
const lb = aws.lb.getLoadBalancer(...);
```
The issue is mitigated such that it should occur much less for users in practice. If your
data-source call does not pass in a `parent` or `provider` the issue should not appear anymore,
and you should not have to make any changes to your code.
If your data-source does pass in either of these values, this issue should only occur rarely. If
it no longer occurs for you, you should not have to make any changes to your code. However, if
you are still running into crashes or hangs on OSX, it is recommended to take the following
approach to fixing the issue:
```ts
// Update the code where you create your provider from the following:
const provider = new aws.Provider(...);
// to:
const provider = await ProviderRef.get(new aws.Provider(...));
```
This will now be a `ProviderRef` instead of a `Provider`. However, it should be accepted with all
the latest Pulumi libraries anywhere a `Provider` was previously accepted.
In a future version, Pulumi libraries *may* be updated to no longer accept a `Provider` to help
ensure this issue doesn't occur at all.
## 1.3.1 (2019-10-09)
- Revert "propagate resource inputs to resource state during preview". These changes had a critical issue that needs
@ -20,7 +57,7 @@ CHANGELOG
- Support renaming stack projects via `pulumi stack rename`.
[#3292](https://github.com/pulumi/pulumi/pull/3292)
- Make the location of `.pulumi` folder configurable with an environment variable.
[#3300](https://github.com/pulumi/pulumi/pull/3300) (Fixes [#2966](https://github.com/pulumi/pulumi/issues/2966))

2
go.mod
View file

@ -5,6 +5,7 @@ go 1.12
require (
cloud.google.com/go v0.39.0
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Microsoft/go-winio v0.4.14
github.com/Sirupsen/logrus v1.0.5 // indirect
github.com/aws/aws-sdk-go v1.19.16
github.com/blang/semver v3.5.1+incompatible
@ -42,7 +43,6 @@ require (
github.com/sergi/go-diff v1.0.0
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.3.0 // indirect
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c
github.com/spf13/cast v1.2.0
github.com/spf13/cobra v0.0.3

5
go.sum
View file

@ -31,6 +31,8 @@ github.com/Azure/go-autorest/tracing v0.1.0 h1:TRBxC5Pj/fIuh4Qob0ZpkggbfT8RC0Sub
github.com/Azure/go-autorest/tracing v0.1.0/go.mod h1:ROEEAFwXycQw7Sn3DXNtEedEvdeRAgDr0izn4z5Ij88=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190418212003-6ac0b49e7197/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2 h1:HTOmFEEYrWi4MW5ZKUx6xfeyM10Sx3kQF65xiQJMPYA=
github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
@ -381,6 +383,7 @@ github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c h1:fyKiXKO1/I/B6Y2U8T7WdQGWzwehOuGIrljPtt7YTTI=
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c/go.mod h1:sUM3LWHvSMaG192sy56D9F7CNvL7jUJVXoqM1QKLnog=
github.com/spf13/afero v1.1.0 h1:bopulORc2JeYaxfHLvJa5NzxviA9PoWhpiiJkru7Ji4=
@ -488,6 +491,8 @@ golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b h1:ag/x1USPSsqHud38I9BAC88qdNLDHHtQ4mlgQIZPPNA=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View file

@ -42,6 +42,8 @@ import (
"github.com/hashicorp/go-multierror"
pbempty "github.com/golang/protobuf/ptypes/empty"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/resource/config"
"github.com/pulumi/pulumi/pkg/util/cmdutil"
@ -362,75 +364,181 @@ func getPluginVersion(info packageJSON) (string, error) {
return version, nil
}
// When talking to the nodejs runtime we have three parties involved:
//
// Engine Monitor <==> Language Host (this code) <==> NodeJS sdk runtime.
//
// Instead of having the NodeJS sdk runtime communicating directly with the Engine Monitor, we
// instead have it communicate with us and we send all those messages to the real engine monitor
// itself. We do that by having ourselves launch our own grpc monitor server and passing the
// address of it to the NodeJS runtime. As far as the NodeJS sdk runtime is concerned, it is
// communicating directly with the engine.
//
// When NodeJS then communicates back with us over our server, we then forward the messages
// along untouched to the Engine Monitor. However, we also open an additional *non-grpc*
// channel to allow the sdk runtime to send us messages on. Specifically, this non-grpc channel
// is used entirely to allow the sdk runtime to make 'invoke' calls in a synchronous fashion.
// This is accomplished by avoiding grpc entirely (which has no facility for synchronous rpc
// calls), and instead operating over a pair of files coordinated between us and the sdk
// runtime. One file is used by it to send us messages, and one file is used by us to send
// messages back. Because these are just files, nodejs natively supports allowing both sides to
// read and write from each synchronously.
//
// When we receive the sync-invoke messages from the nodejs sdk we deserialize things off of the
// file and then make a synchronous call to the real engine `invoke` monitor endpoint. Unlike
// nodejs, we have no problem calling this synchronously, and can block until we get the
// response which we can then synchronously send to nodejs.
// RPC endpoint for LanguageRuntimeServer::Run
func (host *nodeLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
args := host.constructArguments(req)
config, err := host.constructConfig(req)
tracingSpan := opentracing.SpanFromContext(ctx)
// Make a connection to the real monitor that we will forward messages to.
conn, err := grpc.Dial(req.GetMonitorAddress(), grpc.WithInsecure())
if err != nil {
err = errors.Wrap(err, "failed to serialize configuration")
return nil, err
}
env := os.Environ()
env = append(env, pulumiConfigVar+"="+config)
// Make a client around that connection. We can then make our own server that will act as a
// monitor for the sdk and forward to the real monitor.
target := pulumirpc.NewResourceMonitorClient(conn)
if host.typescript {
env = append(env, "PULUMI_NODEJS_TYPESCRIPT=true")
// Channel to control the server lifetime. Once `Run` finishes, we'll shutdown the server.
serverCancel := make(chan bool)
defer func() {
serverCancel <- true
close(serverCancel)
}()
// Launch the rpc server giving it the real monitor to forward messages to.
port, serverDone, err := rpcutil.Serve(0, serverCancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, &monitorProxy{target})
return nil
},
}, tracingSpan)
if err != nil {
return nil, err
}
if logging.V(5) {
commandStr := strings.Join(args, " ")
logging.V(5).Infoln("Language host launching process: ", host.nodeBin, commandStr)
// Create the pipes we'll use to communicate synchronously with the nodejs process. Once we're
// done using the pipes clean them up so we don't leave anything around in the user file system.
pipes, pipesDone, err := createAndServePipes(ctx, target)
if err != nil {
return nil, err
}
// Now simply spawn a process to execute the requested program, wiring up stdout/stderr directly.
var errResult string
cmd := exec.Command(host.nodeBin, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = env
if err := cmd.Run(); err != nil {
// NodeJS stdout is complicated enough that we should explicitly flush stdout and stderr here. NodeJS does
// process writes using console.out and console.err synchronously, but it does not process writes using
// `process.stdout.write` or `process.stderr.write` synchronously, and it is possible that there exist unflushed
// writes on those file descriptors at the time that the Node process exits.
//
// Because of this, we explicitly flush stdout and stderr so that we are absolutely sure that we capture any
// error messages in the engine.
contract.IgnoreError(os.Stdout.Sync())
contract.IgnoreError(os.Stderr.Sync())
if exiterr, ok := err.(*exec.ExitError); ok {
// If the program ran, but exited with a non-zero error code. This will happen often,
// since user errors will trigger this. So, the error message should look as nice as
// possible.
if status, stok := exiterr.Sys().(syscall.WaitStatus); stok {
// Check if we got special exit code that means "we already gave the user an
// actionable message". In that case, we can simply bail out and terminate `pulumi`
// without showing any more messages.
if status.ExitStatus() == nodeJSProcessExitedAfterShowingUserActionableMessage {
return &pulumirpc.RunResponse{Error: "", Bail: true}, nil
}
// Channel producing the final response we want to issue to our caller. Will get the result of
// the actual nodejs process we launch, or any results caused by errors in our server/pipes.
responseChannel := make(chan *pulumirpc.RunResponse)
defer close(responseChannel)
err = errors.Errorf("Program exited with non-zero exit code: %d", status.ExitStatus())
} else {
err = errors.Wrapf(exiterr, "Program exited unexpectedly")
}
} else {
// Otherwise, we didn't even get to run the program. This ought to never happen unless there's
// a bug or system condition that prevented us from running the language exec. Issue a scarier error.
err = errors.Wrapf(err, "Problem executing program (could not run language executor)")
// Forward any rpc server or pipe errors to our output channel.
go func() {
err := <-serverDone
if err != nil {
responseChannel <- &pulumirpc.RunResponse{Error: err.Error()}
}
}()
go func() {
err := <-pipesDone
if err != nil {
responseChannel <- &pulumirpc.RunResponse{Error: err.Error()}
}
}()
// now, launch the nodejs process and actually run the user code in it.
go host.execNodejs(responseChannel, req, fmt.Sprintf("127.0.0.1:%d", port), pipes.directory())
// Wait for one of our launched goroutines to signal that we're done. This might be our proxy
// (in the case of errors), or the launched nodejs completing (either successfully, or with
// errors).
return <-responseChannel, nil
}
// Launch the nodejs process and wait for it to complete. Report success or any errors using the
// `responseChannel` arg.
func (host *nodeLanguageHost) execNodejs(
responseChannel chan<- *pulumirpc.RunResponse, req *pulumirpc.RunRequest,
address, pipesDirectory string) {
// Actually launch nodejs and process the result of it into an appropriate response object.
response := func() *pulumirpc.RunResponse {
args := host.constructArguments(req, address, pipesDirectory)
config, err := host.constructConfig(req)
if err != nil {
err = errors.Wrap(err, "failed to serialize configuration")
return &pulumirpc.RunResponse{Error: err.Error()}
}
errResult = err.Error()
}
env := os.Environ()
env = append(env, pulumiConfigVar+"="+config)
return &pulumirpc.RunResponse{Error: errResult}, nil
if host.typescript {
env = append(env, "PULUMI_NODEJS_TYPESCRIPT=true")
}
if logging.V(5) {
commandStr := strings.Join(args, " ")
logging.V(5).Infoln("Language host launching process: ", host.nodeBin, commandStr)
}
// Now simply spawn a process to execute the requested program, wiring up stdout/stderr directly.
var errResult string
cmd := exec.Command(host.nodeBin, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = env
if err := cmd.Run(); err != nil {
// NodeJS stdout is complicated enough that we should explicitly flush stdout and stderr here. NodeJS does
// process writes using console.out and console.err synchronously, but it does not process writes using
// `process.stdout.write` or `process.stderr.write` synchronously, and it is possible that there exist unflushed
// writes on those file descriptors at the time that the Node process exits.
//
// Because of this, we explicitly flush stdout and stderr so that we are absolutely sure that we capture any
// error messages in the engine.
contract.IgnoreError(os.Stdout.Sync())
contract.IgnoreError(os.Stderr.Sync())
if exiterr, ok := err.(*exec.ExitError); ok {
// If the program ran, but exited with a non-zero error code. This will happen often,
// since user errors will trigger this. So, the error message should look as nice as
// possible.
if status, stok := exiterr.Sys().(syscall.WaitStatus); stok {
// Check if we got special exit code that means "we already gave the user an
// actionable message". In that case, we can simply bail out and terminate `pulumi`
// without showing any more messages.
if status.ExitStatus() == nodeJSProcessExitedAfterShowingUserActionableMessage {
return &pulumirpc.RunResponse{Error: "", Bail: true}
}
err = errors.Errorf("Program exited with non-zero exit code: %d", status.ExitStatus())
} else {
err = errors.Wrapf(exiterr, "Program exited unexpectedly")
}
} else {
// Otherwise, we didn't even get to run the program. This ought to never happen unless there's
// a bug or system condition that prevented us from running the language exec. Issue a scarier error.
err = errors.Wrapf(err, "Problem executing program (could not run language executor)")
}
errResult = err.Error()
}
return &pulumirpc.RunResponse{Error: errResult}
}()
// notify our caller of the response we got from the nodejs process. Note: this is done
// unilaterally. this is how we signal to nodeLanguageHost.Run that we are done and it can
// return to its caller.
responseChannel <- response
}
// constructArguments constructs a command-line for `pulumi-language-nodejs`
// by enumerating all of the optional and non-optional arguments present
// in a RunRequest.
func (host *nodeLanguageHost) constructArguments(req *pulumirpc.RunRequest) []string {
func (host *nodeLanguageHost) constructArguments(req *pulumirpc.RunRequest, address, pipesDirectory string) []string {
args := []string{host.runPath}
maybeAppendArg := func(k, v string) {
if v != "" {
@ -438,8 +546,9 @@ func (host *nodeLanguageHost) constructArguments(req *pulumirpc.RunRequest) []st
}
}
maybeAppendArg("monitor", req.GetMonitorAddress())
maybeAppendArg("monitor", address)
maybeAppendArg("engine", host.engineAddress)
maybeAppendArg("sync", pipesDirectory)
maybeAppendArg("project", req.GetProject())
maybeAppendArg("stack", req.GetStack())
maybeAppendArg("pwd", req.GetPwd())

View file

@ -28,7 +28,7 @@ func TestArgumentConstruction(t *testing.T) {
t.Run("DryRun-NoArguments", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{DryRun: true}
args := host.constructArguments(rr)
args := host.constructArguments(rr, &monitorProxy{})
assert.Contains(tt, args, "--dry-run")
assert.NotContains(tt, args, "true")
})
@ -36,28 +36,28 @@ func TestArgumentConstruction(t *testing.T) {
t.Run("OptionalArgs-PassedIfSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{Project: "foo"}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, &monitorProxy{}), " ")
assert.Contains(tt, args, "--project foo")
})
t.Run("OptionalArgs-NotPassedIfNotSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, &monitorProxy{}), " ")
assert.NotContains(tt, args, "--stack")
})
t.Run("DotIfProgramNotSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, &monitorProxy{}), " ")
assert.Contains(tt, args, ".")
})
t.Run("ProgramIfProgramSpecified", func(tt *testing.T) {
host := &nodeLanguageHost{}
rr := &pulumirpc.RunRequest{Program: "foobar"}
args := strings.Join(host.constructArguments(rr), " ")
args := strings.Join(host.constructArguments(rr, &monitorProxy{}), " ")
assert.Contains(tt, args, "foobar")
})
}

View file

@ -0,0 +1,187 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/binary"
"io"
pbempty "github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"github.com/pulumi/pulumi/pkg/util/logging"
pulumirpc "github.com/pulumi/pulumi/sdk/proto/go"
)
// pipes is the platform agnostic abstraction over a pair of channels we can read and write binary
// data over. It is provided through the `createPipes` functions provided in `proxy_unix.go` (where
// it is implemented on top of fifo files), and in `proxy_windows.go` (where it is implemented on
// top of named pipes).
type pipes interface {
// The directory containing the two streams to read and write from. This will be passed to the
// nodejs process so it can connect to our read and writes streams for communication.
directory() string
// Attempt to create and connect to the read and write streams.
connect() error
// The stream that we will use to read in requests send to us by the nodejs process.
reader() io.Reader
// The stream we will write responses back to the nodejs process with.
writer() io.Writer
// called when we're done with the pipes and want to clean up any os resources we may have
// allocated (for example, actual files and directories on disk).
shutdown()
}
func createAndServePipes(ctx context.Context, target pulumirpc.ResourceMonitorClient) (pipes, chan error, error) {
pipes, err := createPipes()
if err != nil {
return nil, nil, err
}
pipesDone := servePipes(ctx, pipes, target)
return pipes, pipesDone, nil
}
func servePipes(ctx context.Context, pipes pipes, target pulumirpc.ResourceMonitorClient) chan error {
done := make(chan error)
go func() {
// Keep reading and writing from the pipes until we run into an error or are canceled.
err := func() error {
pbcodec := encoding.GetCodec(proto.Name)
err := pipes.connect()
if err != nil {
logging.V(10).Infof("Sync invoke: Error connecting to pipes: %s\n", err)
return err
}
for {
// read a 4-byte request length
logging.V(10).Infoln("Sync invoke: Reading length from request pipe")
var reqLen uint32
if err := binary.Read(pipes.reader(), binary.BigEndian, &reqLen); err != nil {
// This is benign on shutdown.
if err == io.EOF {
// We were asked to gracefully cancel. Just exit now.
logging.V(10).Infof("Sync invoke: Gracefully shutting down")
return nil
}
logging.V(10).Infof("Sync invoke: Received error reading length from pipe: %s\n", err)
return err
}
// read the request in full
logging.V(10).Infoln("Sync invoke: Reading message from request pipe")
reqBytes := make([]byte, reqLen)
if _, err := io.ReadFull(pipes.reader(), reqBytes); err != nil {
logging.V(10).Infof("Sync invoke: Received error reading message from pipe: %s\n", err)
return err
}
// decode and dispatch the request
logging.V(10).Infof("Sync invoke: Unmarshalling request")
var req pulumirpc.InvokeRequest
if err := pbcodec.Unmarshal(reqBytes, &req); err != nil {
logging.V(10).Infof("Sync invoke: Received error reading full from pipe: %s\n", err)
return err
}
logging.V(10).Infof("Sync invoke: Invoking: %s", req.GetTok())
res, err := target.Invoke(ctx, &req)
if err != nil {
logging.V(10).Infof("Sync invoke: Received error invoking: %s\n", err)
return err
}
// encode the response
logging.V(10).Infof("Sync invoke: Marshalling response")
resBytes, err := pbcodec.Marshal(res)
if err != nil {
logging.V(10).Infof("Sync invoke: Received error marshalling: %s\n", err)
return err
}
// write the 4-byte response length
logging.V(10).Infoln("Sync invoke: Writing length to request pipe")
if err := binary.Write(pipes.writer(), binary.BigEndian, uint32(len(resBytes))); err != nil {
logging.V(10).Infof("Sync invoke: Error writing length to pipe: %s\n", err)
return err
}
// write the response in full
logging.V(10).Infoln("Sync invoke: Writing message to request pipe")
if _, err := pipes.writer().Write(resBytes); err != nil {
logging.V(10).Infof("Sync invoke: Error writing message to pipe: %s\n", err)
return err
}
}
}()
// Signal our caller that we're done.
done <- err
close(done)
// cleanup any resources the pipes were holding onto.
pipes.shutdown()
}()
return done
}
// Forward all resource monitor calls that we're serving to nodejs back to the engine to actually
// perform.
type monitorProxy struct {
target pulumirpc.ResourceMonitorClient
}
func (p *monitorProxy) Invoke(
ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
return p.target.Invoke(ctx, req)
}
func (p *monitorProxy) ReadResource(ctx context.Context,
req *pulumirpc.ReadResourceRequest) (*pulumirpc.ReadResourceResponse, error) {
return p.target.ReadResource(ctx, req)
}
func (p *monitorProxy) RegisterResource(ctx context.Context,
req *pulumirpc.RegisterResourceRequest) (*pulumirpc.RegisterResourceResponse, error) {
return p.target.RegisterResource(ctx, req)
}
func (p *monitorProxy) RegisterResourceOutputs(
ctx context.Context, req *pulumirpc.RegisterResourceOutputsRequest) (*pbempty.Empty, error) {
return p.target.RegisterResourceOutputs(ctx, req)
}
func (p *monitorProxy) SupportsFeature(
ctx context.Context, req *pulumirpc.SupportsFeatureRequest) (*pulumirpc.SupportsFeatureResponse, error) {
return p.target.SupportsFeature(ctx, req)
}

View file

@ -0,0 +1,103 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//+build !windows
package main
import (
"io"
"io/ioutil"
"os"
"path"
"syscall"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/util/logging"
)
// Unix specific pipe implementation. Fairly simple as it sits on top of a pair of standard fifo
// files that we can communicate over. Slightly complex as this involves extra cleanup steps to
// ensure they're cleaned up when we're done.
func createPipes() (pipes, error) {
dir, err := ioutil.TempDir("", "pulumi-node-pipes")
if err != nil {
return nil, err
}
invokeReqPath, invokeResPath := path.Join(dir, "invoke_req"), path.Join(dir, "invoke_res")
return &unixPipes{
dir: dir,
reqPath: invokeReqPath,
resPath: invokeResPath,
}, nil
}
type unixPipes struct {
dir string
reqPath, resPath string
reqPipe, resPipe *os.File
}
func (p *unixPipes) directory() string {
return p.dir
}
func (p *unixPipes) reader() io.Reader {
return p.reqPipe
}
func (p *unixPipes) writer() io.Writer {
return p.resPipe
}
func (p *unixPipes) connect() error {
if err := syscall.Mkfifo(path.Join(p.dir, "invoke_req"), 0600); err != nil {
logging.V(10).Infof("createPipes: Received error opening request pipe: %s\n", err)
return err
}
if err := syscall.Mkfifo(path.Join(p.dir, "invoke_res"), 0600); err != nil {
logging.V(10).Infof("createPipes: Received error opening result pipe: %s\n", err)
return err
}
invokeReqPipe, err := os.OpenFile(p.reqPath, os.O_RDONLY, 0)
if err != nil {
return err
}
p.reqPipe = invokeReqPipe
invokeResPipe, err := os.OpenFile(p.resPath, os.O_WRONLY, 0)
if err != nil {
return err
}
p.resPipe = invokeResPipe
return nil
}
func (p *unixPipes) shutdown() {
if p.reqPipe != nil {
contract.IgnoreClose(p.reqPipe)
contract.IgnoreError(os.Remove(p.reqPath))
}
if p.resPipe != nil {
contract.IgnoreClose(p.resPipe)
contract.IgnoreError(os.Remove(p.resPath))
}
contract.IgnoreError(os.Remove(p.dir))
}

View file

@ -0,0 +1,113 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//+build windows
package main
import (
"fmt"
"io"
"net"
"os"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/pulumi/pulumi/pkg/util/logging"
)
// Windows specific pipe implementation. Slightly complex as it sits on top of a pair of named pipes
// that have to asynchronously accept connections to. But also fairly simple as windows will take
// care of cleaning things up once our processes complete.
func createPipes() (pipes, error) {
// Generate a random pipe name so that we don't collide with other pipes made by other pulumi
// instances.
rand := uint32(time.Now().UnixNano() + int64(os.Getpid()))
dir := fmt.Sprintf(`\\.\pipe\pulumi%v`, rand)
reqPipeName := dir + `\invoke_req`
resPipeName := dir + `\invoke_res`
reqListener, err := winio.ListenPipe(reqPipeName, nil)
if err != nil {
logging.V(10).Infof("createPipes: Received error trying to create request pipe %s: %s\n", reqPipeName, err)
return nil, err
}
resListener, err := winio.ListenPipe(resPipeName, nil)
if err != nil {
logging.V(10).Infof("createPipes: Received error trying to create response pipe %s: %s\n", resPipeName, err)
return nil, err
}
return &windowsPipes{
dir: dir,
reqListener: reqListener,
resListener: resListener,
}, nil
}
type windowsPipes struct {
dir string
reqListener, resListener net.Listener
reqConn, resConn net.Conn
}
func (p *windowsPipes) directory() string {
return p.dir
}
func (p *windowsPipes) reader() io.Reader {
return p.reqConn
}
func (p *windowsPipes) writer() io.Writer {
return p.resConn
}
func (p *windowsPipes) connect() error {
acceptDone := make(chan error)
defer close(acceptDone);
go func() {
reqConn, err := p.reqListener.Accept()
if err != nil {
acceptDone <- err
}
p.reqConn = reqConn
acceptDone <- nil
}()
go func() {
resConn, err := p.resListener.Accept()
if err != nil {
acceptDone <- err
}
p.resConn = resConn
acceptDone <- nil
}()
res1 := <-acceptDone
res2 := <-acceptDone
if res1 != nil {
return res1
}
return res2
}
func (p *windowsPipes) shutdown() {
// Don't need to do anything here. Named pipes are cleaned up when all processes that are using
// them terminate.
}

View file

@ -94,6 +94,7 @@ function usage(): void {
console.error(` --pwd=pwd change the working directory before running the program`);
console.error(` --monitor=addr [required] the RPC address for a resource monitor to connect to`);
console.error(` --engine=addr the RPC address for a resource engine to connect to`);
console.error(` --sync=path path to synchronous 'invoke' endpoints`);
console.error(` --tracing=url a Zipkin-compatible endpoint to send tracing data to`);
console.error(``);
console.error(` and [program] is a JavaScript program to run in Node.js, and [arg]... optional args to it.`);
@ -146,6 +147,7 @@ function main(args: string[]): void {
addToEnvIfDefined("PULUMI_NODEJS_PARALLEL", argv["parallel"]);
addToEnvIfDefined("PULUMI_NODEJS_MONITOR", argv["monitor"]);
addToEnvIfDefined("PULUMI_NODEJS_ENGINE", argv["engine"]);
addToEnvIfDefined("PULUMI_NODEJS_SYNC", argv["sync"]);
// Ensure that our v8 hooks have been initialized. Then actually load and run the user program.
v8Hooks.isInitializedAsync().then(() => {

View file

@ -13,9 +13,9 @@
// limitations under the License.
import { util } from "protobufjs";
import { ResourceError, RunError } from "./errors";
import { all, Input, Inputs, interpolate, Output, output } from "./output";
import { getStackResource } from "./runtime";
import { ResourceError } from "./errors";
import { Input, Inputs, interpolate, Output, output } from "./output";
import { getStackResource, unknownValue } from "./runtime";
import { readResource, registerResource, registerResourceOutputs } from "./runtime/resource";
import { getProject, getStack } from "./runtime/settings";
import * as utils from "./utils";
@ -684,6 +684,22 @@ export abstract class ProviderResource extends CustomResource {
/** @internal */
private readonly pkg: string;
/** @internal */
// tslint:disable-next-line: variable-name
public __registrationId?: string;
public static async register(provider: ProviderResource | undefined): Promise<string | undefined> {
if (provider === undefined) {
return undefined;
}
if (!provider.__registrationId) {
provider.__registrationId = `${await provider.urn}::${await provider.id}`;
}
return provider.__registrationId;
}
/**
* Creates and registers a new provider resource for a particular package.
*

View file

@ -12,54 +12,103 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as fs from "fs";
import * as grpc from "grpc";
import * as asset from "../asset";
import { InvokeOptions } from "../invoke";
import * as log from "../log";
import { Inputs } from "../output";
import { Inputs, Output } from "../output";
import { debuggablePromise } from "./debuggable";
import { deserializeProperties, serializeProperties, unknownValue } from "./rpc";
import { excessiveDebugOutput, getMonitor, rpcKeepAlive } from "./settings";
import { excessiveDebugOutput, getMonitor, rpcKeepAlive, SyncInvokes, tryGetSyncInvokes } from "./settings";
import { ProviderResource, Resource } from "../resource";
import * as utils from "../utils";
const gstruct = require("google-protobuf/google/protobuf/struct_pb.js");
const providerproto = require("../proto/provider_pb.js");
/**
* invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
* can be a bag of computed values (Ts or Promise<T>s), and the result is a Promise<any> that
* resolves when the invoke finishes.
* `invoke` dynamically invokes the function, `tok`, which is offered by a provider plugin. `invoke`
* behaves differently in the case that options contains `{async:true}` or not.
*
* In the case where `{async:true}` is present in the options bag:
*
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider plugin.
* 2. the `props` inputs can be a bag of computed values (including, `T`s, `Promise<T>`s,
* `Output<T>`s etc.).
*
*
* In the case where `{async:true}` is not present in the options bag:
*
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider call.
* However, that Promise will *also* have the respective values of the Provider result exposed
* directly on it as properties.
*
* 2. The inputs must be a bag of simple values, and the result is the result that the Provider
* produced.
*
* Simple values are:
* 1. `undefined`, `null`, string, number or boolean values.
* 2. arrays of simple values.
* 3. objects containing only simple values.
*
* Importantly, simple values do *not* include:
* 1. `Promise`s
* 2. `Output`s
* 3. `Asset`s or `Archive`s
* 4. `Resource`s.
*
* All of these contain async values that would prevent `invoke from being able to operate
* synchronously.
*/
export async function invoke(tok: string, props: Inputs, opts?: InvokeOptions): Promise<any> {
const label = `Invoking function: tok=${tok}`;
log.debug(label +
excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``);
opts = opts || {};
if (opts.parent && opts.provider === undefined) {
opts.provider = opts.parent.getProvider(tok);
export function invoke(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
if (opts.async) {
// Use specifically requested async invoking. Respect that.
return invokeAsync(tok, props, opts);
}
const syncInvokes = tryGetSyncInvokes();
if (!syncInvokes) {
// We weren't launched from a pulumi CLI that supports sync-invokes. Let the user know they
// should update and fall back to synchronously blocking on the async invoke.
return invokeFallbackToAsync(tok, props, opts);
}
return invokeSync(tok, props, opts, syncInvokes);
}
let issuedUpdateWarning: boolean | undefined;
export function invokeFallbackToAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
if (!issuedUpdateWarning) {
issuedUpdateWarning = true;
log.warn(
`Pulumi is out of date. To update to a more recent version see instructions at:
https://www.pulumi.com/docs/get-started/install/`);
}
const asyncResult = invokeAsync(tok, props, opts);
const syncResult = utils.promiseResult(asyncResult);
return createLiftedPromise(syncResult);
}
async function invokeAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
const label = `Invoking function: tok=${tok} asynchronously`;
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
// Wait for all values to be available, and then perform the RPC.
const done = rpcKeepAlive();
try {
const obj = gstruct.Struct.fromJavaScript(
await serializeProperties(`invoke:${tok}`, props));
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(obj)}` : ``);
const serialized = await serializeProperties(`invoke:${tok}`, props);
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
let providerRef: string | undefined;
if (opts.provider !== undefined) {
const providerURN = await opts.provider.urn.promise();
const providerID = await opts.provider.id.promise() || unknownValue;
providerRef = `${providerURN}::${providerID}`;
}
const provider = await ProviderResource.register(getProvider(tok, opts));
const req = createInvokeRequest(tok, serialized, provider, opts);
const req = new providerproto.InvokeRequest();
req.setTok(tok);
req.setArgs(obj);
req.setProvider(providerRef);
req.setVersion(opts.version || "");
const resp: any = await debuggablePromise(new Promise((innerResolve, innerReject) =>
monitor.invoke(req, (err: grpc.StatusObject, innerResponse: any) => {
log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
@ -80,18 +129,8 @@ export async function invoke(tok: string, props: Inputs, opts?: InvokeOptions):
}
})), label);
// If there were failures, propagate them.
const failures: any = resp.getFailuresList();
if (failures && failures.length) {
let reasons = "";
for (let i = 0; i < failures.length; i++) {
if (reasons !== "") {
reasons += "; ";
}
reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
}
throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
}
// If there were failures, throw them here.
propagateFailures(tok, resp);
// Finally propagate any other properties that were given to us as outputs.
return deserializeProperties(resp.getReturn());
@ -100,3 +139,149 @@ export async function invoke(tok: string, props: Inputs, opts?: InvokeOptions):
done();
}
}
function invokeSync(tok: string, props: any, opts: InvokeOptions, syncInvokes: SyncInvokes): Promise<any> {
const label = `Invoking function: tok=${tok} synchronously`;
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
const serialized = serializePropertiesSync(props);
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
const providerRef = getProviderRefSync();
const req = createInvokeRequest(tok, serialized, providerRef, opts);
// Encode the request.
const reqBytes = Buffer.from(req.serializeBinary());
// Write the request length.
const reqLen = Buffer.alloc(4);
reqLen.writeUInt32BE(reqBytes.length, /*offset:*/ 0);
fs.writeSync(syncInvokes.requests, reqLen);
fs.writeSync(syncInvokes.requests, reqBytes);
// Read the response.
const respLenBytes = Buffer.alloc(4);
fs.readSync(syncInvokes.responses, respLenBytes, /*offset:*/ 0, /*length:*/ 4, /*position:*/ null);
const respLen = respLenBytes.readUInt32BE(/*offset:*/ 0);
const respBytes = Buffer.alloc(respLen);
fs.readSync(syncInvokes.responses, respBytes, /*offset:*/ 0, /*length:*/ respLen, /*position:*/ null);
// Decode the response.
const resp = providerproto.InvokeResponse.deserializeBinary(new Uint8Array(respBytes));
// If there were failures, throw them here.
propagateFailures(tok, resp);
// Finally propagate any other properties that were given to us as outputs.
const resultValue = deserializeProperties(resp.getReturn());
return createLiftedPromise(resultValue);
function getProviderRefSync() {
const provider = getProvider(tok, opts);
if (provider === undefined) {
return undefined;
}
if (provider.__registrationId === undefined) {
// TODO(cyrusn): issue warning here that we are synchronously blocking an rpc call.
utils.promiseResult(ProviderResource.register(provider));
}
return provider.__registrationId;
}
}
// Expose the properties of the actual result of invoke directly on the promise itself. Note this
// doesn't actually involve any asynchrony. The promise will be created synchronously and the
// values copied to it can be used immediately. We simply make a Promise so that any consumers that
// do a `.then()` on it continue to work even though we've switched from being async to sync.
function createLiftedPromise(value: any): Promise<any> {
const promise = Promise.resolve(value);
Object.assign(promise, value);
return promise;
}
function createInvokeRequest(tok: string, serialized: any, provider: string | undefined, opts: InvokeOptions) {
const obj = gstruct.Struct.fromJavaScript(serialized);
const req = new providerproto.InvokeRequest();
req.setTok(tok);
req.setArgs(obj);
req.setProvider(provider);
req.setVersion(opts.version || "");
return req;
}
function getProvider(tok: string, opts: InvokeOptions) {
return opts.provider ? opts.provider :
opts.parent ? opts.parent.getProvider(tok) : undefined;
}
function serializePropertiesSync(prop: any): any {
if (prop === undefined ||
prop === null ||
typeof prop === "boolean" ||
typeof prop === "number" ||
typeof prop === "string") {
return prop;
}
if (asset.Asset.isInstance(prop) || asset.Archive.isInstance(prop)) {
throw new Error("Assets and Archives cannot be passed in as arguments to a data source call.");
}
if (prop instanceof Promise) {
throw new Error("Promises cannot be passed in as arguments to a data source call.");
}
if (Output.isInstance(prop)) {
throw new Error("Outputs cannot be passed in as arguments to a data source call.");
}
if (Resource.isInstance(prop)) {
throw new Error("Resources cannot be passed in as arguments to a data source call.");
}
if (prop instanceof Array) {
const result: any[] = [];
for (let i = 0; i < prop.length; i++) {
// When serializing arrays, we serialize any undefined values as `null`. This matches JSON semantics.
const elem = serializePropertiesSync(prop[i]);
result.push(elem === undefined ? null : elem);
}
return result;
}
return serializeAllKeys(prop, {});
function serializeAllKeys(innerProp: any, obj: any) {
for (const k of Object.keys(innerProp)) {
// When serializing an object, we omit any keys with undefined values. This matches JSON semantics.
const v = serializePropertiesSync(innerProp[k]);
if (v !== undefined) {
obj[k] = v;
}
}
return obj;
}
}
function propagateFailures(tok: string, resp: any) {
const failures: any = resp.getFailuresList();
if (failures && failures.length) {
let reasons = "";
for (let i = 0; i < failures.length; i++) {
if (reasons !== "") {
reasons += "; ";
}
reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
}
throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
}
}

View file

@ -25,6 +25,7 @@ import {
CustomResource,
CustomResourceOptions,
ID,
ProviderResource,
Resource,
ResourceOptions,
URN,
@ -329,14 +330,9 @@ async function prepareResource(label: string, res: Resource, custom: boolean,
let providerRef: string | undefined;
let importID: ID | undefined;
if (custom) {
if ((<CustomResourceOptions>opts).provider !== undefined) {
const provider = (<CustomResourceOptions>opts).provider!;
const providerURN = await provider.urn.promise();
const providerID = await provider.id.promise() || unknownValue;
providerRef = `${providerURN}::${providerID}`;
}
importID = (<CustomResourceOptions>opts).import;
const customOpts = <CustomResourceOptions>opts;
importID = customOpts.import;
providerRef = await ProviderResource.register(customOpts.provider)
}
// Collect the URNs for explicit/implicit dependencies for the engine so that it can understand
@ -412,7 +408,7 @@ async function getAllTransitivelyReferencedCustomResourceURNs(resources: Set<Res
// [Comp1, Cust1, Comp2, Cust2, Cust3]
const transitivelyReachableResources = getTransitivelyReferencedChildResourcesOfComponentResources(resources);
const transitivelyReachableCustomResources = [...transitivelyReachableResources].filter(r => CustomResource.isInstance(r));
const transitivelyReachableCustomResources = [...transitivelyReachableResources].filter(r => CustomResource.isInstance(r));
const promises = transitivelyReachableCustomResources.map(r => r.urn.promise());
const urns = await Promise.all(promises);
return new Set<string>(urns);
@ -535,7 +531,7 @@ export function registerResourceOutputs(res: Resource, outputs: Inputs | Promise
const label = `monitor.registerResourceOutputs(${urn}, ...)`;
await debuggablePromise(new Promise((resolve, reject) =>
(monitor as any).registerResourceOutputs(req, (err: grpc.ServiceError, innerResponse: any) => {
log.debug(`RegisterResourceOutputs RPC finished: urn=${urn}; `+
log.debug(`RegisterResourceOutputs RPC finished: urn=${urn}; ` +
`err: ${err}, resp: ${innerResponse}`);
if (err) {
// If the monitor is unavailable, it is in the process of shutting down or has already
@ -552,7 +548,7 @@ export function registerResourceOutputs(res: Resource, outputs: Inputs | Promise
resolve();
}
})), label);
}
}
}, false);
}

View file

@ -14,7 +14,7 @@
import * as asset from "../asset";
import * as log from "../log";
import { Input, Inputs, isSecretOutput, Output } from "../output";
import { Input, Inputs, Output } from "../output";
import { ComponentResource, CustomResource, Resource } from "../resource";
import { debuggablePromise, errorString } from "./debuggable";
import { excessiveDebugOutput, isDryRun, monitorSupportsSecrets } from "./settings";
@ -237,6 +237,10 @@ export const specialSecretSig = "1b47061264138c4ac30d75fd1eb44270";
* appropriate, in addition to translating certain "special" values so that they are ready to go on the wire.
*/
export async function serializeProperty(ctx: string, prop: Input<any>, dependentResources: Set<Resource>): Promise<any> {
// IMPORTANT:
// IMPORTANT: Keep this in sync with serializesPropertiesSync in invoke.ts
// IMPORTANT:
if (prop === undefined ||
prop === null ||
typeof prop === "boolean" ||

View file

@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as fs from "fs";
import * as grpc from "grpc";
import * as path from "path";
import { RunError } from "../errors";
import * as log from "../log";
import { ComponentResource, URN } from "../resource";
@ -41,6 +43,11 @@ export interface Options {
readonly testModeEnabled?: boolean; // true if we're in testing mode (allows execution without the CLI).
readonly queryMode?: boolean; // true if we're in query mode (does not allow resource registration).
readonly legacyApply?: boolean; // true if we will resolve missing outputs to inputs during preview.
/**
* Directory containing the send/receive files for making synchronous invokes to the engine.
*/
readonly syncDir?: string;
}
/**
@ -171,6 +178,25 @@ export function getMonitor(): Object | undefined {
return monitor;
}
/** @internal */
export interface SyncInvokes {
requests: number;
responses: number;
}
let syncInvokes: SyncInvokes | undefined;
/** @internal */
export function tryGetSyncInvokes(): SyncInvokes | undefined {
if (syncInvokes === undefined && options.syncDir) {
const requests = fs.openSync(path.join(options.syncDir, "invoke_req"), fs.constants.O_WRONLY|fs.constants.O_SYNC);
const responses = fs.openSync(path.join(options.syncDir, "invoke_res"), fs.constants.O_RDONLY|fs.constants.O_SYNC);
syncInvokes = { requests, responses };
}
return syncInvokes;
}
/**
* engine is a live connection to the engine, used for logging, etc. (lazily initialized).
*/
@ -226,6 +252,7 @@ function loadOptions(): Options {
engineAddr: process.env["PULUMI_NODEJS_ENGINE"],
testModeEnabled: (process.env["PULUMI_TEST_MODE"] === "true"),
legacyApply: (process.env["PULUMI_ENABLE_LEGACY_APPLY"] === "true"),
syncDir: process.env["PULUMI_NODEJS_SYNC"],
};
}

View file

@ -16,7 +16,7 @@
import * as assert from "assert";
import { asyncTest } from "./util";
import { promiseResult, liftProperties } from "../utils";
import { promiseResult } from "../utils";
describe("deasync", () => {
it("handles simple promise", () => {
@ -57,52 +57,52 @@ describe("deasync", () => {
assert.equal(result, actual);
});
it("lift properties", asyncTest(async () => {
const actual = { a: "foo", b: 4, c: true, d: [function() {}] };
// it("lift properties", asyncTest(async () => {
// const actual = { a: "foo", b: 4, c: true, d: [function() {}] };
const promise = new Promise<typeof actual>((resolve) => {
resolve(actual);
});
// const promise = new Promise<typeof actual>((resolve) => {
// resolve(actual);
// });
const combinedResult = liftProperties(promise);
// const combinedResult = liftProperties(promise);
// check that we've lifted the values properly.
for (const key of Object.keys(actual)) {
const value = (<any>actual)[key];
assert.deepStrictEqual(value, (<any>combinedResult)[key]);
}
// // check that we've lifted the values properly.
// for (const key of Object.keys(actual)) {
// const value = (<any>actual)[key];
// assert.deepStrictEqual(value, (<any>combinedResult)[key]);
// }
// also check that we have a proper promise to work with:
const promiseValue = await combinedResult;
for (const key of Object.keys(actual)) {
const value = (<any>actual)[key];
assert.deepStrictEqual(value, (<any>promiseValue)[key]);
}
// // also check that we have a proper promise to work with:
// const promiseValue = await combinedResult;
// for (const key of Object.keys(actual)) {
// const value = (<any>actual)[key];
// assert.deepStrictEqual(value, (<any>promiseValue)[key]);
// }
// also ensure that .then works
await combinedResult.then(v => {
for (const key of Object.keys(actual)) {
const value = (<any>actual)[key];
assert.deepStrictEqual(value, (<any>v)[key]);
}
});
}));
// // also ensure that .then works
// await combinedResult.then(v => {
// for (const key of Object.keys(actual)) {
// const value = (<any>actual)[key];
// assert.deepStrictEqual(value, (<any>v)[key]);
// }
// });
// }));
it("lift properties throws", () => {
const message = "etc";
const promise = new Promise<number>((resolve, reject) => {
reject(new Error(message));
});
// it("lift properties throws", () => {
// const message = "etc";
// const promise = new Promise<number>((resolve, reject) => {
// reject(new Error(message));
// });
try {
const result = liftProperties(promise);
assert.fail("Should not be able to reach here 1.")
}
catch (err) {
assert.equal(err.message, message);
return;
}
// try {
// const result = liftProperties(promise);
// assert.fail("Should not be able to reach here 1.")
// }
// catch (err) {
// assert.equal(err.message, message);
// return;
// }
assert.fail("Should not be able to reach here 2.")
});
// assert.fail("Should not be able to reach here 2.")
// });
});

View file

@ -6,12 +6,34 @@ let pulumi = require("../../../../../");
let args = {
a: "hello",
b: true,
c: [ 0.99, 42, { z: "x" } ],
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result = pulumi.runtime.invoke("invoke:index:echo", args);
result.then((v) => {
let result1 = pulumi.runtime.invoke("invoke:index:echo", args);
// When invoking synchronously: Ensure the properties come back synchronously and are present on the
// result.
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
// When invoking synchronously: Ensure the properties are available asynchronously through normal
// Promise semantics.
result1.then(v => {
assert.deepEqual(v, args);
});
let result2 = pulumi.runtime.invoke("invoke:index:echo", args, { async: true });
// When invoking asynchronously: Ensure the properties are *not* present on the result.
for (const key in args) {
assert.notDeepEqual(result2[key], args[key]);
}
// When invoking asynchronously: Ensure the properties are available asynchronously through normal
// Promise semantics.
result2.then(v => {
assert.deepEqual(v, args);
});

View file

@ -10,11 +10,13 @@ new MyResource("testResource", "0.19.1");
new MyResource("testResource2", "0.19.2");
new MyResource("testResource3");
pulumi.runtime.invoke("invoke:index:doit", {}, { version: "0.19.1" });
pulumi.runtime.invoke("invoke:index:doit_v2", {}, { version: "0.19.2" });
pulumi.runtime.invoke("invoke:index:doit_noversion", {});
pulumi.runtime.invoke("invoke:index:doit", {}, { version: "0.19.1" }, { async: true });
pulumi.runtime.invoke("invoke:index:doit_v2", {}, { version: "0.19.2" }, { async: true });
pulumi.runtime.invoke("invoke:index:doit_noversion", {}, { async: true });
new pulumi.CustomResource("test:index:ReadResource", "foo", {}, { id: "readme", version: "0.20.0" });
new pulumi.CustomResource("test:index:ReadResource", "foo_noversion", {}, { id: "readme" });

View file

@ -0,0 +1,35 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
const provider = new Provider("p");
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { provider });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { provider });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});

View file

@ -0,0 +1,42 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
class Resource extends pulumi.CustomResource {
constructor(name, opts) {
super("test:index:Resource", name, {}, opts)
}
}
const provider = new Provider("p");
const parent = new Resource("r", { provider })
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { parent });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { parent });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});

View file

@ -0,0 +1,37 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
(async () => {
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
const provider = await pulumi.ProviderRef.get(new Provider("p"));
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { provider });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { provider });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});
})();

View file

@ -0,0 +1,44 @@
// Test the ability to invoke provider functions via RPC.
let assert = require("assert");
let pulumi = require("../../../../../");
(async () => {
class Provider extends pulumi.ProviderResource {
constructor(name, opts) {
super("test", name, {}, opts);
}
}
class Resource extends pulumi.CustomResource {
constructor(name, opts) {
super("test:index:Resource", name, {}, opts)
}
}
const provider = await pulumi.ProviderRef.get(new Provider("p"));
const parent = new Resource("r", { provider })
let args = {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
};
let result1 = pulumi.runtime.invoke("test:index:echo", args, { parent });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { parent });
result2.then((v) => {
assert.deepEqual(v, args);
});
let result3 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});
})();

View file

@ -46,7 +46,7 @@ interface RunCase {
};
skipRootResourceEndpoints?: boolean;
showRootResourceRegistration?: boolean;
invoke?: (ctx: any, tok: string, args: any, version: string) => { failures: any, ret: any };
invoke?: (ctx: any, tok: string, args: any, version: string, provider: string) => { failures: any, ret: any };
readResource?: (ctx: any, t: string, name: string, id: string, par: string, state: any, version: string) => {
urn: URN | undefined, props: any | undefined,
};
@ -303,7 +303,8 @@ describe("rpc", () => {
"invoke": {
program: path.join(base, "009.invoke"),
expectResourceCount: 0,
invoke: (ctx: any, tok: string, args: any, version: string) => {
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "");
assert.strictEqual(tok, "invoke:index:echo");
assert.deepEqual(args, {
a: "hello",
@ -897,10 +898,92 @@ describe("rpc", () => {
});
},
},
"provider_invokes": {
program: path.join(base, "060.provider_invokes"),
expectResourceCount: 1,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any) => {
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
"provider_in_parent_invokes": {
program: path.join(base, "061.provider_in_parent_invokes"),
expectResourceCount: 2,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any, dependencies?: string[],
custom?: boolean, protect?: boolean, parent?: string, provider?: string) => {
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
"providerref_invokes": {
program: path.join(base, "062.providerref_invokes"),
expectResourceCount: 1,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any) => {
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
"providerref_in_parent_invokes": {
program: path.join(base, "063.providerref_in_parent_invokes"),
expectResourceCount: 2,
registerResource: (ctx: any, dryrun: boolean, t: string, name: string, res: any, dependencies?: string[],
custom?: boolean, protect?: boolean, parent?: string, provider?: string) => {
if (name === "c") {
assert.equal(provider, "");
}
return { urn: makeUrn(t, name), id: name === "p" ? "1" : undefined, props: undefined };
},
invoke: (ctx: any, tok: string, args: any, version: string, provider: string) => {
assert.strictEqual(provider, "pulumi:providers:test::p::1");
assert.strictEqual(tok, "test:index:echo");
assert.deepEqual(args, {
a: "hello",
b: true,
c: [0.99, 42, { z: "x" }],
id: "some-id",
urn: "some-urn",
});
return { failures: undefined, ret: args };
},
},
};
for (const casename of Object.keys(cases)) {
// if (casename !== "stack_exports") {
// if (casename.indexOf("provider_in_parent_invokes") < 0) {
// continue;
// }
@ -925,7 +1008,7 @@ describe("rpc", () => {
const args: any = req.getArgs().toJavaScript();
const version: string = req.getVersion();
const { failures, ret } =
opts.invoke(ctx, req.getTok(), args, version);
opts.invoke(ctx, req.getTok(), args, version, req.getProvider());
resp.setFailuresList(failures);
resp.setReturn(gstruct.Struct.fromJavaScript(ret));
}

View file

@ -94,31 +94,12 @@ export function promiseResult<T>(promise: Promise<T>): T {
}
/**
* Lifts the properties of the value a promise resolves to and adds them to the promise itself. This
* can be used to take a function that was previously async (i.e. Promise-returning) and make it
* synchronous in a backward compatible fashion. Specifically, because the function now returns a
* `Promise<T> & T` all properties on it will be available for sync consumers, while async consumers
* can still use `await` on it or call `.then(...)` on it.
* No longer supported. This function is now a no-op and will directly return the promise passed
* into it.
*
* This is an advanced compat function for libraries and should not generally be used by normal
* Pulumi application.
*/
export function liftProperties<T>(promise: Promise<T>, opts: InvokeOptions = {}): Promise<T> & T {
if (opts.async) {
// Caller just wants the async side of the result. That's what we have, so just return it
// as is.
//
// Note: this cast isn't actually safe (since 'promise' doesn't actually provide the T side
// of things). That's ok. By default the return signature will be correct, and users will
// only get into this code path when specifically trying to force asynchrony. Given that,
// it's fine to expect them to have to know what they're doing and that they shoud only use
// the Promise side of things.
return <Promise<T> & T>promise;
}
// Caller wants the async side and the sync side merged. Block on getting the underlying
// promise value, then take all the properties from it and copy over onto the promise itself and
// return the combined set of each.
const value = promiseResult(promise);
return Object.assign(promise, value);
return <any>promise;
}