Further simplification of resource RPC requests. (#840)

This commit is contained in:
CyrusNajmabadi 2018-01-25 15:26:39 -08:00 committed by GitHub
parent 001d187c90
commit 1df66df250
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 135 deletions

View file

@ -23,13 +23,19 @@ function usage(): void {
console.error(` --parallel=p run up to p resource operations in parallel (default is serial)`);
console.error(` --dry-run true to simulate resource changes, but without making them`);
console.error(` --pwd=pwd change the working directory before running the program`);
console.error(` --monitor=addr the RPC address for a resource monitor to connect to`);
console.error(` --monitor=addr [required] the RPC address for a resource monitor to connect to`);
console.error(` --engine=addr the RPC address for a resource engine to connect to`);
console.error(` --tracing=url a Zipkin-compatible endpoint to send tracing data to`);
console.error(``);
console.error(` and [program] is a JavaScript program to run in Node.js, and [arg]... optional args to it.`);
}
function printErrorUsageAndExit(message: string): never {
console.error(message);
usage();
return process.exit(-1);
}
export function main(args: string[]): void {
// See usage above for the intended usage of this program, including flags and required args.
const config: {[key: string]: string} = {};
@ -38,10 +44,7 @@ export function main(args: string[]): void {
string: [ "project", "stack", "parallel", "pwd", "monitor", "engine", "tracing" ],
unknown: (arg: string) => {
if (arg.indexOf("-") === 0) {
console.error(`fatal: Unrecognized flag ${arg}`);
usage();
process.exit(-1);
return false;
return printErrorUsageAndExit(`error: Unrecognized flag ${arg}`);
}
return true;
},
@ -69,9 +72,8 @@ export function main(args: string[]): void {
if (argv["parallel"]) {
parallel = parseInt(argv["parallel"], 10);
if (isNaN(parallel)) {
console.error(`error: --parallel flag must specify a number: ${argv["parallel"]} is not a number`);
usage();
process.exit(-1);
return printErrorUsageAndExit(
`error: --parallel flag must specify a number: ${argv["parallel"]} is not a number`);
}
}
@ -79,12 +81,13 @@ export function main(args: string[]): void {
const dryRun: boolean = !!(argv["dry-run"]);
// If there is a monitor argument, connect to it.
let monitor: Object | undefined;
const monitorAddr: string | undefined = argv["monitor"];
if (monitorAddr) {
monitor = new resrpc.ResourceMonitorClient(monitorAddr, grpc.credentials.createInsecure());
const monitorAddr = argv["monitor"];
if (!monitorAddr) {
return printErrorUsageAndExit(`error: --monitor=addr must be provided.`);
}
const monitor = new resrpc.ResourceMonitorClient(monitorAddr, grpc.credentials.createInsecure());
// If there is an engine argument, connect to it too.
let engine: Object | undefined;
const engineAddr: string | undefined = argv["engine"];
@ -104,9 +107,7 @@ export function main(args: string[]): void {
// Pluck out the program and arguments.
if (argv._.length === 0) {
console.error("fatal: Missing program to execute");
usage();
process.exit(-1);
return printErrorUsageAndExit("error: Missing program to execute");
}
let program: string = argv._[0];
if (program.indexOf("/") !== 0) {

View file

@ -10,63 +10,50 @@ const gstruct = require("google-protobuf/google/protobuf/struct_pb.js");
const resproto = require("../proto/resource_pb.js");
/**
* invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs can be a bag of
* computed values (Ts or Promise<T>s), and the result is a Promise<any> that resolves when the invoke finishes.
* invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
* can be a bag of computed values (Ts or Promise<T>s), and the result is a Promise<any> that
* resolves when the invoke finishes.
*/
export function invoke(tok: string, props: ComputedValues): Promise<any> {
export async function invoke(tok: string, props: ComputedValues): Promise<any> {
log.debug(`Invoking function: tok=${tok}` +
excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``);
// Pre-allocate an error so we have a clean stack to print even if an asynchronous operation occurs.
const invokeError: Error = new Error(`Invoke of '${tok}' failed`);
// Wait for all values to be available, and then perform the RPC.
const done = rpcKeepAlive();
try {
const obj = gstruct.Struct.fromJavaScript(
await serializeProperties(`invoke:${tok}`, props));
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(obj)}` : ``);
const done: () => void = rpcKeepAlive();
return new Promise<any>(async (resolve, reject) => {
// Wait for all values to be available, and then perform the RPC.
try {
const obj = gstruct.Struct.fromJavaScript(
await serializeProperties(`invoke:${tok}`, props));
log.debug(`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(obj)}` : ``);
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
if (monitor) {
const req = new resproto.InvokeRequest();
req.setTok(tok);
req.setArgs(obj);
const resp: any = await debuggablePromise(new Promise((innerResolve, innerReject) => {
monitor.invoke(req, (err: Error, innerResponse: any) => {
log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
if (err) {
innerReject(err);
}
else {
innerResolve(innerResponse);
}
});
}));
// If there were failures, propagate them.
const failures: any = resp.getFailuresList();
if (failures && failures.length) {
throw new Error(`Invoke of '${tok}' failed: ${failures[0].reason} (${failures[0].property})`);
const req = new resproto.InvokeRequest();
req.setTok(tok);
req.setArgs(obj);
const resp: any = await debuggablePromise(new Promise((innerResolve, innerReject) => {
monitor.invoke(req, (err: Error, innerResponse: any) => {
log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
if (err) {
innerReject(err);
}
else {
innerResolve(innerResponse);
}
});
}));
// Finally propagate any other properties that were given to us as outputs.
resolve(deserializeProperties(resp.getReturn()));
}
else {
// If the monitor doesn't exist, still make sure to resolve all properties to undefined.
log.debug(`Not sending Invoke RPC to monitor -- it doesn't exist: invoke tok=${tok}`);
resolve(undefined);
}
// If there were failures, propagate them.
const failures: any = resp.getFailuresList();
if (failures && failures.length) {
throw new Error(`Invoke of '${tok}' failed: ${failures[0].reason} (${failures[0].property})`);
}
catch (err) {
reject(err);
}
finally {
done();
}
});
// Finally propagate any other properties that were given to us as outputs.
return deserializeProperties(resp.getReturn());
}
finally {
done();
}
}

View file

@ -48,75 +48,61 @@ export function registerResource(res: Resource, t: string, name: string, custom:
const dependsOn = opts.dependsOn || [];
await debuggablePromise(Promise.all(dependsOn.map(d => d.urn)), `dependsOn(${label})`);
// Make sure to assign all of these properties.
let urn: URN | undefined;
let id: ID | undefined;
let propsStruct: any | undefined;
let stable = false;
let stables = new Set<string>();
try {
const obj = gstruct.Struct.fromJavaScript(await serializeProperties(label, props));
log.debug(`RegisterResource RPC prepared: t=${t}, name=${name}` +
(excessiveDebugOutput ? `, obj=${JSON.stringify(obj)}` : ``));
const obj = gstruct.Struct.fromJavaScript(await serializeProperties(label, props));
log.debug(`RegisterResource RPC prepared: t=${t}, name=${name}` +
(excessiveDebugOutput ? `, obj=${JSON.stringify(obj)}` : ``));
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
if (!monitor) {
// If the monitor doesn't exist, still make sure to resolve all properties to undefined.
log.warn(`Not sending RPC to monitor -- it doesn't exist: t=${t}, name=${name}`);
return;
}
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
let parentURN: URN | undefined;
if (opts.parent) {
parentURN = await opts.parent.urn;
}
const req = new resproto.RegisterResourceRequest();
req.setType(t);
req.setName(name);
req.setParent(parentURN);
req.setCustom(custom);
req.setObject(obj);
req.setProtect(opts.protect);
const resp: any = await debuggablePromise(new Promise((resolve, reject) =>
monitor.registerResource(req, (err: Error, innerResponse: any) => {
log.debug(`RegisterResource RPC finished: t=${t}, name=${name}; ` +
`err: ${err}, resp: ${innerResponse}`);
if (err) {
log.error(`Failed to register new resource '${name}' [${t}]: ${err.stack}`);
reject(err);
}
else {
resolve(innerResponse);
}
})), opLabel);
urn = resp.getUrn();
id = resp.getId();
propsStruct = resp.getObject();
stable = resp.getStable();
const stablesList: string[] | undefined = resp.getStablesList();
stables = new Set<string>(stablesList);
let parentURN: URN | undefined;
if (opts.parent) {
parentURN = await opts.parent.urn;
}
finally {
// Always make sure to resolve the URN property, even if it is undefined due to a
// missing monitor.
resolveURN(urn);
// If an ID is present, then it's safe to say it's final, because the resource planner
// wouldn't hand it back to us otherwise (e.g., if the resource was being replaced, it
// would be missing). If it isn't available, ensure the ID gets resolved, just resolve
// it to undefined (indicating it isn't known).
if (resolveID) {
resolveID(id);
}
const req = new resproto.RegisterResourceRequest();
req.setType(t);
req.setName(name);
req.setParent(parentURN);
req.setCustom(custom);
req.setObject(obj);
req.setProtect(opts.protect);
// Propagate any other properties that were given to us as outputs.
resolveProperties(res, resolvers, t, name, props, propsStruct, stable, stables);
const resp: any = await debuggablePromise(new Promise((resolve, reject) =>
monitor.registerResource(req, (err: Error, innerResponse: any) => {
log.debug(`RegisterResource RPC finished: t=${t}, name=${name}; ` +
`err: ${err}, resp: ${innerResponse}`);
if (err) {
log.error(`Failed to register new resource '${name}' [${t}]: ${err.stack}`);
reject(err);
}
else {
resolve(innerResponse);
}
})), opLabel);
const urn = resp.getUrn();
const id = resp.getId();
const propsStruct = resp.getObject();
const stable = resp.getStable();
const stablesList: string[] | undefined = resp.getStablesList();
const stables = new Set<string>(stablesList);
// Always make sure to resolve the URN property, even if it is undefined due to a
// missing monitor.
resolveURN(urn);
// If an ID is present, then it's safe to say it's final, because the resource planner
// wouldn't hand it back to us otherwise (e.g., if the resource was being replaced, it
// would be missing). If it isn't available, ensure the ID gets resolved, just resolve
// it to undefined (indicating it isn't known).
if (resolveID) {
resolveID(id);
}
// Propagate any other properties that were given to us as outputs.
resolveProperties(res, resolvers, t, name, props, propsStruct, stable, stables);
});
}
@ -140,11 +126,6 @@ export function registerResourceOutputs(res: Resource, outputs: ComputedValues)
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
if (!monitor) {
// If the monitor doesn't exist, still make sure to resolve all properties to undefined.
log.warn(`Not sending RPC to monitor -- it doesn't exist: urn=${urn}`);
return;
}
const req = new resproto.RegisterResourceOutputsRequest();
req.setUrn(urn);

View file

@ -15,7 +15,7 @@ export interface Options {
readonly project?: string; // the name of the current project.
readonly stack?: string; // the name of the current stack being deployed into.
readonly engine?: Object; // a live connection to the engine, used for logging, etc.
readonly monitor?: Object; // a live connection to the resource monitor that tracks deployments.
readonly monitor: Object; // a live connection to the resource monitor that tracks deployments.
readonly parallel?: number; // the degree of parallelism for resource operations (default is serial).
readonly dryRun?: boolean; // whether we are performing a preview (true) or a real deployment (false).
readonly includeStackTraces?: boolean; // whether we include full stack traces in resource errors or not.
@ -24,7 +24,7 @@ export interface Options {
/**
* options are the current deployment options being used for this entire session.
*/
export let options: Options = {
export let options: Options = <any>{
dryRun: false,
includeStackTraces: true,
};
@ -44,7 +44,7 @@ export function hasMonitor(): boolean {
/**
* getMonitor returns the current resource monitoring service client for RPC communications.
*/
export function getMonitor(): Object | undefined {
export function getMonitor(): Object {
if (!configured) {
configured = true;
console.warn("warning: Pulumi Fabric monitor is missing; no resources will be created");