[sdk/nodejs] Support for calling methods (#7377)
This commit is contained in:
parent
bf886cd53a
commit
c1f3e1c84b
|
@ -1,6 +1,7 @@
|
|||
|
||||
### Improvements
|
||||
|
||||
|
||||
- [sdk/nodejs] - Support for calling resource methods
|
||||
[#7377](https://github.com/pulumi/pulumi/pull/7377)
|
||||
### Bug Fixes
|
||||
|
||||
|
|
|
@ -181,6 +181,22 @@ export abstract class Resource {
|
|||
// tslint:disable-next-line:variable-name
|
||||
private readonly __providers: Record<string, ProviderResource>;
|
||||
|
||||
/**
|
||||
* The specified provider or provider determined from the parent for custom resources.
|
||||
* @internal
|
||||
*/
|
||||
// Note: This is deliberately not named `__provider` as that conflicts with the property
|
||||
// used by the `dynamic.Resource` class.
|
||||
// tslint:disable-next-line:variable-name
|
||||
readonly __prov?: ProviderResource;
|
||||
|
||||
/**
|
||||
* The specified provider version.
|
||||
* @internal
|
||||
*/
|
||||
// tslint:disable-next-line:variable-name
|
||||
readonly __version?: string;
|
||||
|
||||
public static isInstance(obj: any): obj is Resource {
|
||||
return utils.isInstance<Resource>(obj, "__pulumiResource");
|
||||
}
|
||||
|
@ -313,6 +329,8 @@ export abstract class Resource {
|
|||
}
|
||||
|
||||
this.__protect = !!opts.protect;
|
||||
this.__prov = custom ? opts.provider : undefined;
|
||||
this.__version = opts.version;
|
||||
|
||||
// Collapse any `Alias`es down to URNs. We have to wait until this point to do so because we do not know the
|
||||
// default `name` and `type` to apply until we are inside the resource constructor.
|
||||
|
|
|
@ -23,7 +23,7 @@ import { InvokeOptions } from "../invoke";
|
|||
import * as log from "../log";
|
||||
import { Inputs, Output } from "../output";
|
||||
import { debuggablePromise } from "./debuggable";
|
||||
import { deserializeProperties, serializeProperties } from "./rpc";
|
||||
import { deserializeProperties, isRpcSecret, serializeProperties, serializePropertiesReturnDeps, unwrapRpcSecret } from "./rpc";
|
||||
import {
|
||||
excessiveDebugOutput,
|
||||
getMonitor,
|
||||
|
@ -31,7 +31,7 @@ import {
|
|||
terminateRpcs,
|
||||
} from "./settings";
|
||||
|
||||
import { ProviderResource, Resource } from "../resource";
|
||||
import { DependencyResource, ProviderResource, Resource } from "../resource";
|
||||
import * as utils from "../utils";
|
||||
import { PushableAsyncIterable } from "./asyncIterableUtil";
|
||||
|
||||
|
@ -101,27 +101,27 @@ export async function streamInvoke(
|
|||
const req = createInvokeRequest(tok, serialized, provider, opts);
|
||||
|
||||
// Call `streamInvoke`.
|
||||
const call = monitor.streamInvoke(req, {});
|
||||
const result = monitor.streamInvoke(req, {});
|
||||
|
||||
const queue = new PushableAsyncIterable();
|
||||
call.on("data", function(thing: any) {
|
||||
result.on("data", function(thing: any) {
|
||||
const live = deserializeResponse(tok, thing);
|
||||
queue.push(live);
|
||||
});
|
||||
call.on("error", (err: any) => {
|
||||
result.on("error", (err: any) => {
|
||||
if (err.code === 1) {
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
});
|
||||
call.on("end", () => {
|
||||
result.on("end", () => {
|
||||
queue.complete();
|
||||
});
|
||||
|
||||
// Return a cancellable handle to the stream.
|
||||
return new StreamInvokeResponse(
|
||||
queue,
|
||||
() => call.cancel());
|
||||
() => result.cancel());
|
||||
} finally {
|
||||
done();
|
||||
}
|
||||
|
@ -232,3 +232,182 @@ function deserializeResponse(tok: string, resp: any): any {
|
|||
? ret
|
||||
: deserializeProperties(ret);
|
||||
}
|
||||
|
||||
/**
|
||||
* `call` dynamically calls the function, `tok`, which is offered by a provider plugin.
|
||||
*/
|
||||
export function call<T>(tok: string, props: Inputs, res?: Resource): Output<T> {
|
||||
const label = `Calling function: tok=${tok}`;
|
||||
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
||||
|
||||
const [out, resolver] = createOutput<T>(`call(${tok})`);
|
||||
|
||||
debuggablePromise(Promise.resolve().then(async () => {
|
||||
const done = rpcKeepAlive();
|
||||
try {
|
||||
// Construct a provider reference from the given provider, if one is available on the resource.
|
||||
let provider: string | undefined = undefined;
|
||||
let version: string | undefined = undefined;
|
||||
if (res) {
|
||||
if (res.__prov) {
|
||||
provider = await ProviderResource.register(res.__prov);
|
||||
}
|
||||
version = res.__version;
|
||||
}
|
||||
|
||||
const [serialized, propertyDepsResources] = await serializePropertiesReturnDeps(`call:${tok}`, props);
|
||||
log.debug(`Call RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``);
|
||||
|
||||
const req = await createCallRequest(tok, serialized, propertyDepsResources, provider, version);
|
||||
|
||||
const monitor: any = getMonitor();
|
||||
const resp: any = await debuggablePromise(new Promise((innerResolve, innerReject) =>
|
||||
monitor.call(req, (err: grpc.ServiceError, innerResponse: any) => {
|
||||
log.debug(`Call RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
|
||||
if (err) {
|
||||
// If the monitor is unavailable, it is in the process of shutting down or has already
|
||||
// shut down. Don't emit an error and don't do any more RPCs, just exit.
|
||||
if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
|
||||
terminateRpcs();
|
||||
err.message = "Resource monitor is terminating";
|
||||
innerReject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
// If the RPC failed, rethrow the error with a native exception and the message that
|
||||
// the engine provided - it's suitable for user presentation.
|
||||
innerReject(new Error(err.details));
|
||||
}
|
||||
else {
|
||||
innerResolve(innerResponse);
|
||||
}
|
||||
})), label);
|
||||
|
||||
// Deserialize the response and resolve the output.
|
||||
const deserialized = deserializeResponse(tok, resp);
|
||||
let isSecret = false;
|
||||
const deps: Resource[] = [];
|
||||
|
||||
// Keep track of whether we need to mark the resulting output a secret.
|
||||
// and unwrap each individual value.
|
||||
for (const k of Object.keys(deserialized)) {
|
||||
const v = deserialized[k];
|
||||
if (isRpcSecret(v)) {
|
||||
isSecret = true;
|
||||
deserialized[k] = unwrapRpcSecret(v);
|
||||
}
|
||||
}
|
||||
|
||||
// Combine the individual dependencies into a single set of dependency resources.
|
||||
const rpcDeps = resp.getReturndependenciesMap();
|
||||
if (rpcDeps) {
|
||||
const urns = new Set<string>();
|
||||
for (const [k, returnDeps] of rpcDeps.entries()) {
|
||||
for (const urn of returnDeps.getUrnsList()) {
|
||||
urns.add(urn);
|
||||
}
|
||||
}
|
||||
for (const urn of urns) {
|
||||
deps.push(new DependencyResource(urn));
|
||||
}
|
||||
}
|
||||
|
||||
// If the value the engine handed back is or contains an unknown value, the resolver will mark its value as
|
||||
// unknown automatically, so we just pass true for isKnown here. Note that unknown values will only be
|
||||
// present during previews (i.e. isDryRun() will be true).
|
||||
resolver(deserialized, true, isSecret, deps);
|
||||
}
|
||||
catch (e) {
|
||||
resolver(<any>undefined, true, false, undefined, e);
|
||||
}
|
||||
finally {
|
||||
done();
|
||||
}
|
||||
}), label);
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
function createOutput<T>(label: string):
|
||||
[Output<T>, (v: T, isKnown: boolean, isSecret: boolean, deps?: Resource[], err?: Error | undefined) => void] {
|
||||
let resolveValue: (v: T) => void;
|
||||
let rejectValue: (err: Error) => void;
|
||||
let resolveIsKnown: (v: boolean) => void;
|
||||
let rejectIsKnown: (err: Error) => void;
|
||||
let resolveIsSecret: (v: boolean) => void;
|
||||
let rejectIsSecret: (err: Error) => void;
|
||||
let resolveDeps: (v: Resource[]) => void;
|
||||
let rejectDeps: (err: Error) => void;
|
||||
|
||||
const resolver = (v: T, isKnown: boolean, isSecret: boolean, deps: Resource[] = [], err?: Error) => {
|
||||
if (!!err) {
|
||||
rejectValue(err);
|
||||
rejectIsKnown(err);
|
||||
rejectIsSecret(err);
|
||||
rejectDeps(err);
|
||||
} else {
|
||||
resolveValue(v);
|
||||
resolveIsKnown(isKnown);
|
||||
resolveIsSecret(isSecret);
|
||||
resolveDeps(deps);
|
||||
}
|
||||
};
|
||||
|
||||
const out = new Output(
|
||||
[],
|
||||
debuggablePromise(
|
||||
new Promise<T>((resolve, reject) => {
|
||||
resolveValue = resolve;
|
||||
rejectValue = reject;
|
||||
}),
|
||||
`${label}Value`),
|
||||
debuggablePromise(
|
||||
new Promise<boolean>((resolve, reject) => {
|
||||
resolveIsKnown = resolve;
|
||||
rejectIsKnown = reject;
|
||||
}),
|
||||
`${label}IsKnown`),
|
||||
debuggablePromise(
|
||||
new Promise<boolean>((resolve, reject) => {
|
||||
resolveIsSecret = resolve;
|
||||
rejectIsSecret = reject;
|
||||
}),
|
||||
`${label}IsSecret`),
|
||||
debuggablePromise(
|
||||
new Promise<Resource[]>((resolve, reject) => {
|
||||
resolveDeps = resolve;
|
||||
rejectDeps = reject;
|
||||
}),
|
||||
`${label}Deps`));
|
||||
|
||||
return [out, resolver];
|
||||
}
|
||||
|
||||
async function createCallRequest(tok: string, serialized: Record<string, any>,
|
||||
serializedDeps: Map<string, Set<Resource>>, provider?: string, version?: string) {
|
||||
if (provider !== undefined && typeof provider !== "string") {
|
||||
throw new Error("Incorrect provider type.");
|
||||
}
|
||||
|
||||
const obj = gstruct.Struct.fromJavaScript(serialized);
|
||||
|
||||
const req = new providerproto.CallRequest();
|
||||
req.setTok(tok);
|
||||
req.setArgs(obj);
|
||||
req.setProvider(provider);
|
||||
req.setVersion(version || "");
|
||||
|
||||
const argDependencies = req.getArgdependenciesMap();
|
||||
for (const [key, propertyDeps] of serializedDeps) {
|
||||
const urns = new Set<string>();
|
||||
for (const dep of propertyDeps) {
|
||||
const urn = await dep.urn.promise();
|
||||
urns.add(urn);
|
||||
}
|
||||
const deps = new providerproto.CallRequest.ArgumentDependencies();
|
||||
deps.setUrnsList(Array.from(urns));
|
||||
argDependencies.set(key, deps);
|
||||
}
|
||||
|
||||
return req;
|
||||
}
|
||||
|
|
|
@ -152,6 +152,11 @@ export async function serializeProperties(label: string, props: Inputs) {
|
|||
return result;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export async function serializePropertiesReturnDeps(label: string, props: Inputs) {
|
||||
return serializeFilteredProperties(label, props, _ => true);
|
||||
}
|
||||
|
||||
/**
|
||||
* deserializeProperties fetches the raw outputs and deserializes them from a gRPC call result.
|
||||
*/
|
||||
|
|
3
tests/integration/construct_component_methods/nodejs/.gitignore
vendored
Normal file
3
tests/integration/construct_component_methods/nodejs/.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
/.pulumi/
|
||||
/bin/
|
||||
/node_modules/
|
|
@ -0,0 +1,3 @@
|
|||
name: construct_component_methods_nodejs
|
||||
description: A program that constructs remote component resources with methods.
|
||||
runtime: nodejs
|
|
@ -0,0 +1,31 @@
|
|||
// Copyright 2016-2021, Pulumi Corporation. All rights reserved.
|
||||
|
||||
import * as pulumi from "@pulumi/pulumi";
|
||||
|
||||
interface ComponentArgs {
|
||||
first: pulumi.Input<string>;
|
||||
second: pulumi.Input<string>;
|
||||
}
|
||||
|
||||
export class Component extends pulumi.ComponentResource {
|
||||
constructor(name: string, args: ComponentArgs, opts?: pulumi.ComponentResourceOptions) {
|
||||
super("testcomponent:index:Component", name, args, opts, true);
|
||||
}
|
||||
|
||||
getMessage(args: Component.GetMessageArgs): pulumi.Output<Component.GetMessageResult> {
|
||||
return pulumi.runtime.call("testcomponent:index:Component/getMessage", {
|
||||
"__self__": this,
|
||||
"name": args.name,
|
||||
}, this);
|
||||
}
|
||||
}
|
||||
|
||||
export namespace Component {
|
||||
export interface GetMessageArgs {
|
||||
name: pulumi.Input<string>;
|
||||
}
|
||||
|
||||
export interface GetMessageResult {
|
||||
message: string;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
// Copyright 2016-2021, Pulumi Corporation. All rights reserved.
|
||||
|
||||
import { Component } from "./component";
|
||||
|
||||
const component = new Component("component", {
|
||||
first: "Hello",
|
||||
second: "World",
|
||||
});
|
||||
|
||||
const result = component.getMessage({ name: "Alice" });
|
||||
|
||||
export const message = result.message;
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"name": "steps",
|
||||
"license": "Apache-2.0",
|
||||
"devDependencies": {
|
||||
"typescript": "^3.0.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@pulumi/pulumi": "latest"
|
||||
}
|
||||
}
|
|
@ -1000,6 +1000,32 @@ func TestConstructUnknownNode(t *testing.T) {
|
|||
testConstructUnknown(t, "nodejs", "@pulumi/pulumi")
|
||||
}
|
||||
|
||||
// Test methods on remote components.
|
||||
func TestConstructMethodsNode(t *testing.T) {
|
||||
tests := []struct {
|
||||
componentDir string
|
||||
}{
|
||||
{
|
||||
componentDir: "testcomponent",
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.componentDir, func(t *testing.T) {
|
||||
pathEnv := pathEnv(t, filepath.Join("construct_component_methods", test.componentDir))
|
||||
integration.ProgramTest(t, &integration.ProgramTestOptions{
|
||||
Env: []string{pathEnv},
|
||||
Dir: filepath.Join("construct_component_methods", "nodejs"),
|
||||
Dependencies: []string{"@pulumi/pulumi"},
|
||||
Quick: true,
|
||||
NoParallel: true, // avoid contention for Dir
|
||||
ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) {
|
||||
assert.Equal(t, "Hello World, Alice!", stackInfo.Outputs["message"])
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetResourceNode(t *testing.T) {
|
||||
integration.ProgramTest(t, &integration.ProgramTestOptions{
|
||||
Dir: filepath.Join("get_resource", "nodejs"),
|
||||
|
|
Loading…
Reference in a new issue