2018-05-22 21:43:36 +02:00
|
|
|
// Copyright 2016-2018, Pulumi Corporation.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2017-09-27 21:34:44 +02:00
|
|
|
|
2019-10-15 07:08:06 +02:00
|
|
|
import * as fs from "fs";
|
2018-05-17 00:37:34 +02:00
|
|
|
import * as grpc from "grpc";
|
2019-10-22 09:20:26 +02:00
|
|
|
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-12 00:00:46 +01:00
|
|
|
import { AsyncIterable } from "@pulumi/query/interfaces";
|
|
|
|
|
2019-10-15 07:08:06 +02:00
|
|
|
import * as asset from "../asset";
|
2019-11-19 21:51:14 +01:00
|
|
|
import { Config } from "../config";
|
Implement first-class providers. (#1695)
### First-Class Providers
These changes implement support for first-class providers. First-class
providers are provider plugins that are exposed as resources via the
Pulumi programming model so that they may be explicitly and multiply
instantiated. Each instance of a provider resource may be configured
differently, and configuration parameters may be source from the
outputs of other resources.
### Provider Plugin Changes
In order to accommodate the need to verify and diff provider
configuration and configure providers without complete configuration
information, these changes adjust the high-level provider plugin
interface. Two new methods for validating a provider's configuration
and diffing changes to the same have been added (`CheckConfig` and
`DiffConfig`, respectively), and the type of the configuration bag
accepted by `Configure` has been changed to a `PropertyMap`.
These changes have not yet been reflected in the provider plugin gRPC
interface. We will do this in a set of follow-up changes. Until then,
these methods are implemented by adapters:
- `CheckConfig` validates that all configuration parameters are string
or unknown properties. This is necessary because existing plugins
only accept string-typed configuration values.
- `DiffConfig` either returns "never replace" if all configuration
values are known or "must replace" if any configuration value is
unknown. The justification for this behavior is given
[here](https://github.com/pulumi/pulumi/pull/1695/files#diff-a6cd5c7f337665f5bb22e92ca5f07537R106)
- `Configure` converts the config bag to a legacy config map and
configures the provider plugin if all config values are known. If any
config value is unknown, the underlying plugin is not configured and
the provider may only perform `Check`, `Read`, and `Invoke`, all of
which return empty results. We justify this behavior becuase it is
only possible during a preview and provides the best experience we
can manage with the existing gRPC interface.
### Resource Model Changes
Providers are now exposed as resources that participate in a stack's
dependency graph. Like other resources, they are explicitly created,
may have multiple instances, and may have dependencies on other
resources. Providers are referred to using provider references, which
are a combination of the provider's URN and its ID. This design
addresses the need during a preview to refer to providers that have not
yet been physically created and therefore have no ID.
All custom resources that are not themselves providers must specify a
single provider via a provider reference. The named provider will be
used to manage that resource's CRUD operations. If a resource's
provider reference changes, the resource must be replaced. Though its
URN is not present in the resource's dependency list, the provider
should be treated as a dependency of the resource when topologically
sorting the dependency graph.
Finally, `Invoke` operations must now specify a provider to use for the
invocation via a provider reference.
### Engine Changes
First-class providers support requires a few changes to the engine:
- The engine must have some way to map from provider references to
provider plugins. It must be possible to add providers from a stack's
checkpoint to this map and to register new/updated providers during
the execution of a plan in response to CRUD operations on provider
resources.
- In order to support updating existing stacks using existing Pulumi
programs that may not explicitly instantiate providers, the engine
must be able to manage the "default" providers for each package
referenced by a checkpoint or Pulumi program. The configuration for
a "default" provider is taken from the stack's configuration data.
The former need is addressed by adding a provider registry type that is
responsible for managing all of the plugins required by a plan. In
addition to loading plugins froma checkpoint and providing the ability
to map from a provider reference to a provider plugin, this type serves
as the provider plugin for providers themselves (i.e. it is the
"provider provider").
The latter need is solved via two relatively self-contained changes to
plan setup and the eval source.
During plan setup, the old checkpoint is scanned for custom resources
that do not have a provider reference in order to compute the set of
packages that require a default provider. Once this set has been
computed, the required default provider definitions are conjured and
prepended to the checkpoint's resource list. Each resource that
requires a default provider is then updated to refer to the default
provider for its package.
While an eval source is running, each custom resource registration,
resource read, and invoke that does not name a provider is trapped
before being returned by the source iterator. If no default provider
for the appropriate package has been registered, the eval source
synthesizes an appropriate registration, waits for it to complete, and
records the registered provider's reference. This reference is injected
into the original request, which is then processed as usual. If a
default provider was already registered, the recorded reference is
used and no new registration occurs.
### SDK Changes
These changes only expose first-class providers from the Node.JS SDK.
- A new abstract class, `ProviderResource`, can be subclassed and used
to instantiate first-class providers.
- A new field in `ResourceOptions`, `provider`, can be used to supply
a particular provider instance to manage a `CustomResource`'s CRUD
operations.
- A new type, `InvokeOptions`, can be used to specify options that
control the behavior of a call to `pulumi.runtime.invoke`. This type
includes a `provider` field that is analogous to
`ResourceOptions.provider`.
2018-08-07 02:50:29 +02:00
|
|
|
import { InvokeOptions } from "../invoke";
|
2017-10-08 21:10:46 +02:00
|
|
|
import * as log from "../log";
|
2019-10-15 07:08:06 +02:00
|
|
|
import { Inputs, Output } from "../output";
|
2017-09-27 21:34:44 +02:00
|
|
|
import { debuggablePromise } from "./debuggable";
|
2019-10-15 07:08:06 +02:00
|
|
|
import { deserializeProperties, serializeProperties } from "./rpc";
|
|
|
|
import { excessiveDebugOutput, getMonitor, rpcKeepAlive, SyncInvokes, tryGetSyncInvokes } from "./settings";
|
|
|
|
|
|
|
|
import { ProviderResource, Resource } from "../resource";
|
|
|
|
import * as utils from "../utils";
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-12 00:00:46 +01:00
|
|
|
import { PushableAsyncIterable } from "./asyncIterableUtil";
|
2017-09-27 21:34:44 +02:00
|
|
|
|
2018-01-25 22:34:21 +01:00
|
|
|
const gstruct = require("google-protobuf/google/protobuf/struct_pb.js");
|
2019-01-31 20:19:36 +01:00
|
|
|
const providerproto = require("../proto/provider_pb.js");
|
2017-09-27 21:34:44 +02:00
|
|
|
|
|
|
|
/**
|
2019-10-15 07:08:06 +02:00
|
|
|
* `invoke` dynamically invokes the function, `tok`, which is offered by a provider plugin. `invoke`
|
|
|
|
* behaves differently in the case that options contains `{async:true}` or not.
|
|
|
|
*
|
|
|
|
* In the case where `{async:true}` is present in the options bag:
|
|
|
|
*
|
|
|
|
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider plugin.
|
|
|
|
* 2. the `props` inputs can be a bag of computed values (including, `T`s, `Promise<T>`s,
|
|
|
|
* `Output<T>`s etc.).
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* In the case where `{async:true}` is not present in the options bag:
|
|
|
|
*
|
|
|
|
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider call.
|
|
|
|
* However, that Promise will *also* have the respective values of the Provider result exposed
|
|
|
|
* directly on it as properties.
|
|
|
|
*
|
|
|
|
* 2. The inputs must be a bag of simple values, and the result is the result that the Provider
|
|
|
|
* produced.
|
|
|
|
*
|
|
|
|
* Simple values are:
|
|
|
|
* 1. `undefined`, `null`, string, number or boolean values.
|
|
|
|
* 2. arrays of simple values.
|
|
|
|
* 3. objects containing only simple values.
|
|
|
|
*
|
|
|
|
* Importantly, simple values do *not* include:
|
|
|
|
* 1. `Promise`s
|
|
|
|
* 2. `Output`s
|
|
|
|
* 3. `Asset`s or `Archive`s
|
|
|
|
* 4. `Resource`s.
|
|
|
|
*
|
|
|
|
* All of these contain async values that would prevent `invoke from being able to operate
|
|
|
|
* synchronously.
|
2017-09-27 21:34:44 +02:00
|
|
|
*/
|
2019-10-15 07:08:06 +02:00
|
|
|
export function invoke(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
|
|
|
|
if (opts.async) {
|
2019-11-19 21:51:14 +01:00
|
|
|
// User specifically requested async invoking. Respect that.
|
2019-10-15 07:08:06 +02:00
|
|
|
return invokeAsync(tok, props, opts);
|
|
|
|
}
|
|
|
|
|
2019-11-19 21:51:14 +01:00
|
|
|
const config = new Config("pulumi");
|
|
|
|
const noSyncCalls = config.getBoolean("noSyncCalls");
|
|
|
|
if (noSyncCalls) {
|
|
|
|
// User globally disabled sync invokes.
|
|
|
|
return invokeAsync(tok, props, opts);
|
|
|
|
}
|
|
|
|
|
|
|
|
const syncResult = invokeSync(tok, props, opts);
|
|
|
|
|
|
|
|
// Wrap the synchronous value in a Promise view as well so that consumers can treat it
|
|
|
|
// either as the real value or something they can use as a Promise.
|
|
|
|
return createLiftedPromise(syncResult);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Invokes the provided token *synchronously* no matter what.
|
|
|
|
* @internal
|
|
|
|
*/
|
|
|
|
export function invokeSync<T>(tok: string, props: Inputs, opts: InvokeOptions = {}): T {
|
2019-10-15 07:08:06 +02:00
|
|
|
const syncInvokes = tryGetSyncInvokes();
|
|
|
|
if (!syncInvokes) {
|
|
|
|
// We weren't launched from a pulumi CLI that supports sync-invokes. Let the user know they
|
|
|
|
// should update and fall back to synchronously blocking on the async invoke.
|
2019-11-19 21:51:14 +01:00
|
|
|
return invokeFallbackToAsync<T>(tok, props, opts);
|
2018-08-11 01:18:21 +02:00
|
|
|
}
|
|
|
|
|
2019-11-19 21:51:14 +01:00
|
|
|
return invokeSyncWorker<T>(tok, props, opts, syncInvokes);
|
2019-10-15 07:08:06 +02:00
|
|
|
}
|
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 20:36:11 +01:00
|
|
|
export async function streamInvoke(
|
2019-10-22 09:20:26 +02:00
|
|
|
tok: string,
|
|
|
|
props: Inputs,
|
|
|
|
opts: InvokeOptions = {},
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 20:36:11 +01:00
|
|
|
): Promise<StreamInvokeResponse<any>> {
|
|
|
|
const label = `StreamInvoking function: tok=${tok} asynchronously`;
|
|
|
|
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
|
|
|
|
|
|
|
// Wait for all values to be available, and then perform the RPC.
|
|
|
|
const done = rpcKeepAlive();
|
|
|
|
try {
|
|
|
|
const serialized = await serializeProperties(`streamInvoke:${tok}`, props);
|
|
|
|
log.debug(
|
|
|
|
`StreamInvoke RPC prepared: tok=${tok}` + excessiveDebugOutput
|
|
|
|
? `, obj=${JSON.stringify(serialized)}`
|
|
|
|
: ``,
|
|
|
|
);
|
|
|
|
|
|
|
|
// Fetch the monitor and make an RPC request.
|
|
|
|
const monitor: any = getMonitor();
|
|
|
|
|
|
|
|
const provider = await ProviderResource.register(getProvider(tok, opts));
|
|
|
|
const req = createInvokeRequest(tok, serialized, provider, opts);
|
|
|
|
|
|
|
|
// Call `streamInvoke`.
|
|
|
|
const call = monitor.streamInvoke(req, {});
|
|
|
|
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-12 00:00:46 +01:00
|
|
|
const queue = new PushableAsyncIterable();
|
|
|
|
call.on("data", function(thing: any) {
|
|
|
|
const live = deserializeResponse(tok, thing);
|
|
|
|
queue.push(live);
|
|
|
|
});
|
|
|
|
call.on("error", (err: any) => {
|
|
|
|
if (err.code === 1 && err.details === "Cancelled") {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
throw err;
|
|
|
|
});
|
|
|
|
call.on("end", () => {
|
|
|
|
queue.complete();
|
|
|
|
});
|
2019-10-22 09:20:26 +02:00
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 20:36:11 +01:00
|
|
|
// Return a cancellable handle to the stream.
|
|
|
|
return new StreamInvokeResponse(
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-12 00:00:46 +01:00
|
|
|
queue,
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 20:36:11 +01:00
|
|
|
() => call.cancel());
|
|
|
|
} finally {
|
|
|
|
done();
|
|
|
|
}
|
2019-10-22 09:20:26 +02:00
|
|
|
}
|
|
|
|
|
2019-11-19 21:51:14 +01:00
|
|
|
export function invokeFallbackToAsync<T>(tok: string, props: Inputs, opts: InvokeOptions): T {
|
|
|
|
return utils.promiseResult(invokeAsync(tok, props, opts));
|
2019-10-15 07:08:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
async function invokeAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
|
|
|
|
const label = `Invoking function: tok=${tok} asynchronously`;
|
|
|
|
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
|
|
|
|
2018-01-26 00:26:39 +01:00
|
|
|
// Wait for all values to be available, and then perform the RPC.
|
|
|
|
const done = rpcKeepAlive();
|
|
|
|
try {
|
2019-10-15 07:08:06 +02:00
|
|
|
const serialized = await serializeProperties(`invoke:${tok}`, props);
|
|
|
|
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
|
2018-01-26 00:26:39 +01:00
|
|
|
|
|
|
|
// Fetch the monitor and make an RPC request.
|
|
|
|
const monitor: any = getMonitor();
|
|
|
|
|
2019-10-15 07:08:06 +02:00
|
|
|
const provider = await ProviderResource.register(getProvider(tok, opts));
|
|
|
|
const req = createInvokeRequest(tok, serialized, provider, opts);
|
Implement first-class providers. (#1695)
### First-Class Providers
These changes implement support for first-class providers. First-class
providers are provider plugins that are exposed as resources via the
Pulumi programming model so that they may be explicitly and multiply
instantiated. Each instance of a provider resource may be configured
differently, and configuration parameters may be source from the
outputs of other resources.
### Provider Plugin Changes
In order to accommodate the need to verify and diff provider
configuration and configure providers without complete configuration
information, these changes adjust the high-level provider plugin
interface. Two new methods for validating a provider's configuration
and diffing changes to the same have been added (`CheckConfig` and
`DiffConfig`, respectively), and the type of the configuration bag
accepted by `Configure` has been changed to a `PropertyMap`.
These changes have not yet been reflected in the provider plugin gRPC
interface. We will do this in a set of follow-up changes. Until then,
these methods are implemented by adapters:
- `CheckConfig` validates that all configuration parameters are string
or unknown properties. This is necessary because existing plugins
only accept string-typed configuration values.
- `DiffConfig` either returns "never replace" if all configuration
values are known or "must replace" if any configuration value is
unknown. The justification for this behavior is given
[here](https://github.com/pulumi/pulumi/pull/1695/files#diff-a6cd5c7f337665f5bb22e92ca5f07537R106)
- `Configure` converts the config bag to a legacy config map and
configures the provider plugin if all config values are known. If any
config value is unknown, the underlying plugin is not configured and
the provider may only perform `Check`, `Read`, and `Invoke`, all of
which return empty results. We justify this behavior becuase it is
only possible during a preview and provides the best experience we
can manage with the existing gRPC interface.
### Resource Model Changes
Providers are now exposed as resources that participate in a stack's
dependency graph. Like other resources, they are explicitly created,
may have multiple instances, and may have dependencies on other
resources. Providers are referred to using provider references, which
are a combination of the provider's URN and its ID. This design
addresses the need during a preview to refer to providers that have not
yet been physically created and therefore have no ID.
All custom resources that are not themselves providers must specify a
single provider via a provider reference. The named provider will be
used to manage that resource's CRUD operations. If a resource's
provider reference changes, the resource must be replaced. Though its
URN is not present in the resource's dependency list, the provider
should be treated as a dependency of the resource when topologically
sorting the dependency graph.
Finally, `Invoke` operations must now specify a provider to use for the
invocation via a provider reference.
### Engine Changes
First-class providers support requires a few changes to the engine:
- The engine must have some way to map from provider references to
provider plugins. It must be possible to add providers from a stack's
checkpoint to this map and to register new/updated providers during
the execution of a plan in response to CRUD operations on provider
resources.
- In order to support updating existing stacks using existing Pulumi
programs that may not explicitly instantiate providers, the engine
must be able to manage the "default" providers for each package
referenced by a checkpoint or Pulumi program. The configuration for
a "default" provider is taken from the stack's configuration data.
The former need is addressed by adding a provider registry type that is
responsible for managing all of the plugins required by a plan. In
addition to loading plugins froma checkpoint and providing the ability
to map from a provider reference to a provider plugin, this type serves
as the provider plugin for providers themselves (i.e. it is the
"provider provider").
The latter need is solved via two relatively self-contained changes to
plan setup and the eval source.
During plan setup, the old checkpoint is scanned for custom resources
that do not have a provider reference in order to compute the set of
packages that require a default provider. Once this set has been
computed, the required default provider definitions are conjured and
prepended to the checkpoint's resource list. Each resource that
requires a default provider is then updated to refer to the default
provider for its package.
While an eval source is running, each custom resource registration,
resource read, and invoke that does not name a provider is trapped
before being returned by the source iterator. If no default provider
for the appropriate package has been registered, the eval source
synthesizes an appropriate registration, waits for it to complete, and
records the registered provider's reference. This reference is injected
into the original request, which is then processed as usual. If a
default provider was already registered, the recorded reference is
used and no new registration occurs.
### SDK Changes
These changes only expose first-class providers from the Node.JS SDK.
- A new abstract class, `ProviderResource`, can be subclassed and used
to instantiate first-class providers.
- A new field in `ResourceOptions`, `provider`, can be used to supply
a particular provider instance to manage a `CustomResource`'s CRUD
operations.
- A new type, `InvokeOptions`, can be used to specify options that
control the behavior of a call to `pulumi.runtime.invoke`. This type
includes a `provider` field that is analogous to
`ResourceOptions.provider`.
2018-08-07 02:50:29 +02:00
|
|
|
|
2018-02-05 23:44:23 +01:00
|
|
|
const resp: any = await debuggablePromise(new Promise((innerResolve, innerReject) =>
|
2018-06-07 20:21:38 +02:00
|
|
|
monitor.invoke(req, (err: grpc.StatusObject, innerResponse: any) => {
|
2018-01-26 00:26:39 +01:00
|
|
|
log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
|
|
|
|
if (err) {
|
2018-05-17 00:37:34 +02:00
|
|
|
// If the monitor is unavailable, it is in the process of shutting down or has already
|
2018-10-10 19:01:57 +02:00
|
|
|
// shut down. Don't emit an error and don't do any more RPCs, just exit.
|
2018-05-17 00:37:34 +02:00
|
|
|
if (err.code === grpc.status.UNAVAILABLE) {
|
|
|
|
log.debug("Resource monitor is terminating");
|
2018-10-10 19:01:57 +02:00
|
|
|
process.exit(0);
|
2018-05-17 00:37:34 +02:00
|
|
|
}
|
|
|
|
|
2018-06-07 20:21:38 +02:00
|
|
|
// If the RPC failed, rethrow the error with a native exception and the message that
|
|
|
|
// the engine provided - it's suitable for user presentation.
|
|
|
|
innerReject(new Error(err.details));
|
2018-01-26 00:26:39 +01:00
|
|
|
}
|
|
|
|
else {
|
|
|
|
innerResolve(innerResponse);
|
2017-09-27 21:34:44 +02:00
|
|
|
}
|
2018-11-25 03:57:17 +01:00
|
|
|
})), label);
|
2017-09-27 21:34:44 +02:00
|
|
|
|
2018-01-26 00:26:39 +01:00
|
|
|
// Finally propagate any other properties that were given to us as outputs.
|
2019-10-15 07:08:06 +02:00
|
|
|
return deserializeResponse(tok, resp);
|
2018-01-26 00:26:39 +01:00
|
|
|
}
|
|
|
|
finally {
|
|
|
|
done();
|
|
|
|
}
|
2017-09-27 21:34:44 +02:00
|
|
|
}
|
2019-10-15 07:08:06 +02:00
|
|
|
|
2019-11-19 21:51:14 +01:00
|
|
|
function invokeSyncWorker<T>(tok: string, props: any, opts: InvokeOptions, syncInvokes: SyncInvokes): T {
|
2019-10-15 07:08:06 +02:00
|
|
|
const label = `Invoking function: tok=${tok} synchronously`;
|
|
|
|
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
|
|
|
|
|
|
|
const serialized = serializePropertiesSync(props);
|
|
|
|
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
|
|
|
|
|
|
|
|
const providerRef = getProviderRefSync();
|
|
|
|
const req = createInvokeRequest(tok, serialized, providerRef, opts);
|
|
|
|
|
|
|
|
// Encode the request.
|
|
|
|
const reqBytes = Buffer.from(req.serializeBinary());
|
|
|
|
|
|
|
|
// Write the request length.
|
|
|
|
const reqLen = Buffer.alloc(4);
|
|
|
|
reqLen.writeUInt32BE(reqBytes.length, /*offset:*/ 0);
|
|
|
|
fs.writeSync(syncInvokes.requests, reqLen);
|
|
|
|
fs.writeSync(syncInvokes.requests, reqBytes);
|
|
|
|
|
|
|
|
// Read the response.
|
|
|
|
const respLenBytes = Buffer.alloc(4);
|
|
|
|
fs.readSync(syncInvokes.responses, respLenBytes, /*offset:*/ 0, /*length:*/ 4, /*position:*/ null);
|
|
|
|
const respLen = respLenBytes.readUInt32BE(/*offset:*/ 0);
|
|
|
|
const respBytes = Buffer.alloc(respLen);
|
|
|
|
fs.readSync(syncInvokes.responses, respBytes, /*offset:*/ 0, /*length:*/ respLen, /*position:*/ null);
|
|
|
|
|
|
|
|
// Decode the response.
|
|
|
|
const resp = providerproto.InvokeResponse.deserializeBinary(new Uint8Array(respBytes));
|
|
|
|
const resultValue = deserializeResponse(tok, resp);
|
|
|
|
|
2019-11-19 21:51:14 +01:00
|
|
|
return resultValue;
|
2019-10-15 07:08:06 +02:00
|
|
|
|
|
|
|
function getProviderRefSync() {
|
|
|
|
const provider = getProvider(tok, opts);
|
|
|
|
|
|
|
|
if (provider === undefined) {
|
|
|
|
return undefined;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (provider.__registrationId === undefined) {
|
2019-11-19 21:51:14 +01:00
|
|
|
// Have to do an explicit console.log here as the call to utils.promiseResult may hang
|
|
|
|
// node, and that may prevent our normal logging calls from making it back to the user.
|
|
|
|
console.log(
|
|
|
|
`Synchronous call made to "${tok}" with an unregistered provider. This is now deprecated and may cause the program to hang.
|
2019-10-15 07:08:06 +02:00
|
|
|
For more details see: https://www.pulumi.com/docs/troubleshooting/#synchronous-call`);
|
|
|
|
utils.promiseResult(ProviderResource.register(provider));
|
|
|
|
}
|
|
|
|
|
|
|
|
return provider.__registrationId;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 20:36:11 +01:00
|
|
|
// StreamInvokeResponse represents a (potentially infinite) streaming response to `streamInvoke`,
|
|
|
|
// with facilities to gracefully cancel and clean up the stream.
|
|
|
|
export class StreamInvokeResponse<T> implements AsyncIterable<T> {
|
|
|
|
constructor(
|
|
|
|
private source: AsyncIterable<T>,
|
|
|
|
private cancelSource: () => void,
|
|
|
|
) {}
|
|
|
|
|
|
|
|
// cancel signals the `streamInvoke` should be cancelled and cleaned up gracefully.
|
|
|
|
public cancel() {
|
|
|
|
this.cancelSource();
|
|
|
|
}
|
2019-10-22 09:20:26 +02:00
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 20:36:11 +01:00
|
|
|
[Symbol.asyncIterator]() {
|
|
|
|
return this.source[Symbol.asyncIterator]();
|
2019-10-22 09:20:26 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-15 07:08:06 +02:00
|
|
|
// Expose the properties of the actual result of invoke directly on the promise itself. Note this
|
|
|
|
// doesn't actually involve any asynchrony. The promise will be created synchronously and the
|
|
|
|
// values copied to it can be used immediately. We simply make a Promise so that any consumers that
|
|
|
|
// do a `.then()` on it continue to work even though we've switched from being async to sync.
|
|
|
|
function createLiftedPromise(value: any): Promise<any> {
|
|
|
|
const promise = Promise.resolve(value);
|
|
|
|
Object.assign(promise, value);
|
|
|
|
return promise;
|
|
|
|
}
|
|
|
|
|
|
|
|
function createInvokeRequest(tok: string, serialized: any, provider: string | undefined, opts: InvokeOptions) {
|
|
|
|
if (provider !== undefined && typeof provider !== "string") {
|
|
|
|
throw new Error("Incorrect provider type.");
|
|
|
|
}
|
|
|
|
|
|
|
|
const obj = gstruct.Struct.fromJavaScript(serialized);
|
|
|
|
|
|
|
|
const req = new providerproto.InvokeRequest();
|
|
|
|
req.setTok(tok);
|
|
|
|
req.setArgs(obj);
|
|
|
|
req.setProvider(provider);
|
|
|
|
req.setVersion(opts.version || "");
|
|
|
|
return req;
|
|
|
|
}
|
|
|
|
|
|
|
|
function getProvider(tok: string, opts: InvokeOptions) {
|
|
|
|
return opts.provider ? opts.provider :
|
|
|
|
opts.parent ? opts.parent.getProvider(tok) : undefined;
|
|
|
|
}
|
|
|
|
|
|
|
|
function serializePropertiesSync(prop: any): any {
|
|
|
|
if (prop === undefined ||
|
|
|
|
prop === null ||
|
|
|
|
typeof prop === "boolean" ||
|
|
|
|
typeof prop === "number" ||
|
|
|
|
typeof prop === "string") {
|
|
|
|
|
|
|
|
return prop;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (asset.Asset.isInstance(prop) || asset.Archive.isInstance(prop)) {
|
|
|
|
throw new Error("Assets and Archives cannot be passed in as arguments to a data source call.");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (prop instanceof Promise) {
|
|
|
|
throw new Error("Promises cannot be passed in as arguments to a data source call.");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (Output.isInstance(prop)) {
|
|
|
|
throw new Error("Outputs cannot be passed in as arguments to a data source call.");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (Resource.isInstance(prop)) {
|
|
|
|
throw new Error("Resources cannot be passed in as arguments to a data source call.");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (prop instanceof Array) {
|
|
|
|
const result: any[] = [];
|
|
|
|
for (let i = 0; i < prop.length; i++) {
|
|
|
|
// When serializing arrays, we serialize any undefined values as `null`. This matches JSON semantics.
|
|
|
|
const elem = serializePropertiesSync(prop[i]);
|
|
|
|
result.push(elem === undefined ? null : elem);
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
return serializeAllKeys(prop, {});
|
|
|
|
|
|
|
|
function serializeAllKeys(innerProp: any, obj: any) {
|
|
|
|
for (const k of Object.keys(innerProp)) {
|
|
|
|
// When serializing an object, we omit any keys with undefined values. This matches JSON semantics.
|
|
|
|
const v = serializePropertiesSync(innerProp[k]);
|
|
|
|
if (v !== undefined) {
|
|
|
|
obj[k] = v;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return obj;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-07 03:41:49 +01:00
|
|
|
function deserializeResponse(tok: string, resp: any): any {
|
2019-10-15 07:08:06 +02:00
|
|
|
const failures: any = resp.getFailuresList();
|
|
|
|
if (failures && failures.length) {
|
|
|
|
let reasons = "";
|
|
|
|
for (let i = 0; i < failures.length; i++) {
|
|
|
|
if (reasons !== "") {
|
|
|
|
reasons += "; ";
|
|
|
|
}
|
|
|
|
|
|
|
|
reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
|
|
|
|
}
|
|
|
|
|
|
|
|
throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
|
|
|
|
}
|
|
|
|
|
2019-11-07 03:41:49 +01:00
|
|
|
const ret = resp.getReturn();
|
|
|
|
return ret === undefined
|
|
|
|
? ret
|
|
|
|
: deserializeProperties(ret);
|
2019-10-15 07:08:06 +02:00
|
|
|
}
|