Switch away from native grpc impl. (#3728)

This commit is contained in:
CyrusNajmabadi 2020-01-15 16:42:42 -08:00 committed by GitHub
parent 84bd59ed5e
commit 9f65591139
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 88 additions and 139 deletions

View file

@ -3,6 +3,8 @@ CHANGELOG
## HEAD (Unreleased)
- @pulumi/pulumi Now requires Nodejs version 8.13.0 and upwards or 10.10.0 and upwards.
- All data-source invocations are now asynchronous (Promise-returning) by default.
- Lock dep ts-node to v8.5.4 [#3733](https://github.com/pulumi/pulumi/pull/3733)

View file

@ -211,7 +211,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string) (*plug
// The server is unavailable. This is the Linux bug. Wait a little and retry.
time.Sleep(time.Millisecond * 10)
continue // keep retrying
case codes.Unimplemented, codes.ResourceExhausted:
case codes.Unimplemented, codes.ResourceExhausted, codes.Internal:
// Since we sent "" as the method above, this is the expected response. Ready to go.
break outer
}

View file

@ -15,13 +15,14 @@
import * as minimist from "minimist";
import * as path from "path";
import * as grpc from "@grpc/grpc-js";
import * as dynamic from "../../dynamic";
import * as resource from "../../resource";
import * as runtime from "../../runtime";
import { version } from "../../version";
const requireFromString = require("require-from-string");
const grpc = require("grpc");
const anyproto = require("google-protobuf/google/protobuf/any_pb.js");
const emptyproto = require("google-protobuf/google/protobuf/empty_pb.js");
const structproto = require("google-protobuf/google/protobuf/struct_pb.js");
@ -32,6 +33,27 @@ const statusproto = require("../../proto/status_pb.js");
const providerKey: string = "__provider";
// We track all uncaught errors here. If we have any, we will make sure we always have a non-0 exit
// code.
const uncaughtErrors = new Set<Error>();
const uncaughtHandler = (err: Error) => {
if (!uncaughtErrors.has(err)) {
uncaughtErrors.add(err);
console.error(err.stack || err.message || ("" + err));
}
};
process.on("uncaughtException", uncaughtHandler);
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so just
// suppress the TS strictness here.
process.on("unhandledRejection", uncaughtHandler);
process.on("exit", (code: number) => {
// If there were any uncaught errors at all, we always want to exit with an error code.
if (code === 0 && uncaughtErrors.size > 0) {
process.exitCode = 1;
}
});
function getProvider(props: any): dynamic.ResourceProvider {
// TODO[pulumi/pulumi#414]: investigate replacing requireFromString with eval
return requireFromString(props[providerKey]).handler();
@ -187,7 +209,8 @@ async function createRPC(call: any, callback: any): Promise<void> {
callback(undefined, resp);
} catch (e) {
return callback(grpcResponseFromError(e));
const response = grpcResponseFromError(e);
return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
}
}
@ -238,7 +261,8 @@ async function updateRPC(call: any, callback: any): Promise<void> {
callback(undefined, resp);
} catch (e) {
return callback(grpcResponseFromError(e));
const response = grpcResponseFromError(e);
return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
}
}
@ -274,7 +298,7 @@ function resultIncludingProvider(result: any, props: any): any {
// rejected the resource, or an initialization error, where the API server has accepted the
// resource, but it failed to initialize (e.g., the app code is continually crashing and the
// resource has failed to become alive).
function grpcResponseFromError(e: {id: string, properties: any, message: string, reasons?: string[]}): any {
function grpcResponseFromError(e: {id: string, properties: any, message: string, reasons?: string[]}) {
// Create response object.
const resp = new statusproto.Status();
resp.setCode(grpc.status.UNKNOWN);
@ -308,7 +332,7 @@ function grpcResponseFromError(e: {id: string, properties: any, message: string,
};
}
export function main(args: string[]): void {
export async function main(args: string[]) {
// The program requires a single argument: the address of the RPC endpoint for the engine. It
// optionally also takes a second argument, a reference back to the engine, but this may be missing.
if (args.length === 0) {
@ -335,8 +359,15 @@ export function main(args: string[]): void {
delete: deleteRPC,
getPluginInfo: getPluginInfoRPC,
});
const port: number = server.bind(`0.0.0.0:0`, grpc.ServerCredentials.createInsecure());
const port: number = await new Promise<number>((resolve, reject) => {
server.bindAsync(`0.0.0.0:0`, grpc.ServerCredentials.createInsecure(), (err, p) => {
if (err) {
reject(err);
} else {
resolve(p);
}
});
});
server.start();
// Emit the address so the monitor can read it to connect. The gRPC server will keep the message loop alive.

View file

@ -10,9 +10,9 @@
},
"dependencies": {
"@pulumi/query": "^0.3.0",
"@grpc/grpc-js": "^0.6.15",
"deasync": "^0.1.15",
"google-protobuf": "^3.5.0",
"grpc": "1.24.2",
"minimist": "^1.2.0",
"normalize-package-data": "^2.4.0",
"protobufjs": "^6.8.6",
@ -34,13 +34,12 @@
"@types/semver": "^5.5.0",
"istanbul": "^0.4.5",
"mocha": "^3.5.0",
"node-gyp": "^3.6.2",
"tslint": "^5.11.0"
},
"pulumi": {
"comment": "Do not remove. Marks this as as a deployment-time-only package"
},
"engines": {
"node": ">=8.0.0"
"node": ">=8.13.0 || >=10.10.0"
}
}

View file

@ -16,7 +16,7 @@
// limitations under the License.
//
'use strict';
var grpc = require('grpc');
var grpc = require('@grpc/grpc-js');
var analyzer_pb = require('./analyzer_pb.js');
var plugin_pb = require('./plugin_pb.js');
var google_protobuf_empty_pb = require('google-protobuf/google/protobuf/empty_pb.js');

View file

@ -16,7 +16,7 @@
// limitations under the License.
//
'use strict';
var grpc = require('grpc');
var grpc = require('@grpc/grpc-js');
var engine_pb = require('./engine_pb.js');
var google_protobuf_empty_pb = require('google-protobuf/google/protobuf/empty_pb.js');

View file

@ -16,7 +16,7 @@
// limitations under the License.
//
'use strict';
var grpc = require('grpc');
var grpc = require('@grpc/grpc-js');
var language_pb = require('./language_pb.js');
var plugin_pb = require('./plugin_pb.js');
var google_protobuf_empty_pb = require('google-protobuf/google/protobuf/empty_pb.js');

View file

@ -16,7 +16,7 @@
// limitations under the License.
//
'use strict';
var grpc = require('grpc');
var grpc = require('@grpc/grpc-js');
var provider_pb = require('./provider_pb.js');
var plugin_pb = require('./plugin_pb.js');
var google_protobuf_empty_pb = require('google-protobuf/google/protobuf/empty_pb.js');

View file

@ -16,7 +16,7 @@
// limitations under the License.
//
'use strict';
var grpc = require('grpc');
var grpc = require('@grpc/grpc-js');
var resource_pb = require('./resource_pb.js');
var google_protobuf_empty_pb = require('google-protobuf/google/protobuf/empty_pb.js');
var google_protobuf_struct_pb = require('google-protobuf/google/protobuf/struct_pb.js');

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as grpc from "@grpc/grpc-js";
import * as fs from "fs";
import * as grpc from "grpc";
import { AsyncIterable } from "@pulumi/query/interfaces";

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as grpc from "@grpc/grpc-js";
import * as query from "@pulumi/query";
import * as grpc from "grpc";
import * as log from "../log";
import * as utils from "../utils";

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import * as grpc from "@grpc/grpc-js";
import * as fs from "fs";
import * as grpc from "grpc";
import * as path from "path";
import { RunError } from "../errors";
import * as log from "../log";

View file

@ -1,59 +0,0 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// tslint:disable
import * as assert from "assert";
import { asyncTest } from "./util";
import { promiseResult } from "../utils";
describe("deasync", () => {
it("handles simple promise", () => {
const actual = 4;
const promise = new Promise<number>((resolve) => {
resolve(actual);
});
const result = promiseResult(promise);
assert.equal(result, actual);
});
it("handles rejected promise", () => {
const message = "etc";
const promise = new Promise<number>((resolve, reject) => {
reject(new Error(message));
});
try {
const result = promiseResult(promise);
assert.fail("Should not be able to reach here 1.")
}
catch (err) {
assert.equal(err.message, message);
return;
}
assert.fail("Should not be able to reach here 2.")
});
it("handles pumping", () => {
const actual = 4;
const promise = new Promise<number>((resolve) => {
setTimeout(resolve, 500 /*ms*/, actual);
});
const result = promiseResult(promise);
assert.equal(result, actual);
});
});

View file

@ -20,21 +20,7 @@ let args = {
urn: "some-urn",
};
if (semver.lt(process.version, "12.11.0")) {
// These tests hang on runtimes later than 12.10.x due to their use of deasync.
let result1 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: false });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: false });
result2.then((v) => {
assert.deepEqual(v, args);
});
}
let result3 = pulumi.runtime.invoke("test:index:echo", args, { provider });
let result3 = pulumi.runtime.invoke("test:index:echo", args, { provider, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});

View file

@ -27,21 +27,7 @@ let args = {
urn: "some-urn",
};
if (semver.lt(process.version, "12.11.0")) {
// These tests hang on runtimes later than 12.10.x due to their use of deasync.
let result1 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: false });
for (const key in args) {
assert.deepEqual(result1[key], args[key]);
}
let result2 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: false });
result2.then((v) => {
assert.deepEqual(v, args);
});
}
let result3 = pulumi.runtime.invoke("test:index:echo", args, { parent });
let result3 = pulumi.runtime.invoke("test:index:echo", args, { parent, async: true });
result3.then((v) => {
assert.deepEqual(v, args);
});

View file

@ -19,11 +19,12 @@ import * as path from "path";
import { ID, runtime, URN } from "../../../index";
import { asyncTest } from "../../util";
import * as grpc from "@grpc/grpc-js";
const enginerpc = require("../../../proto/engine_grpc_pb.js");
const engineproto = require("../../../proto/engine_pb.js");
const gempty = require("google-protobuf/google/protobuf/empty_pb.js");
const gstruct = require("google-protobuf/google/protobuf/struct_pb.js");
const grpc = require("grpc");
const langrpc = require("../../../proto/language_grpc_pb.js");
const langproto = require("../../../proto/language_pb.js");
const resrpc = require("../../../proto/resource_grpc_pb.js");
@ -1154,7 +1155,7 @@ describe("rpc", () => {
let rootResource: string | undefined;
let regCnt = 0;
let logCnt = 0;
const monitor = createMockEngine(opts,
const monitor = await createMockEngineAsync(opts,
// Invoke callback
(call: any, callback: any) => {
const resp = new providerproto.InvokeResponse();
@ -1331,20 +1332,12 @@ describe("rpc", () => {
}
// Finally, tear down everything so each test case starts anew.
await new Promise<void>((resolve, reject) => {
langHost.proc.kill();
langHost.proc.on("close", () => { resolve(); });
});
await new Promise<void>((resolve, reject) => {
monitor.server.tryShutdown((err: Error) => {
if (err) {
reject(err);
}
else {
resolve();
}
});
});
monitor.server.forceShutdown();
}
}));
}
@ -1435,7 +1428,7 @@ function mockRun(langHostClient: any, monitor: string, opts: RunCase, dryrun: bo
// Despite the name, the "engine" RPC endpoint is only a logging endpoint. createMockEngine fires up a fake
// logging server so tests can assert that certain things get logged.
function createMockEngine(
async function createMockEngineAsync(
opts: RunCase,
invokeCallback: (call: any, request: any) => any,
readResourceCallback: (call: any, request: any) => any,
@ -1444,7 +1437,7 @@ function createMockEngine(
logCallback: (call: any, request: any) => any,
getRootResourceCallback: (call: any, request: any) => any,
setRootResourceCallback: (call: any, request: any) => any,
supportsFeatureCallback: (call: any, request: any) => any): { server: any, addr: string } {
supportsFeatureCallback: (call: any, request: any) => any) {
// The resource monitor is hosted in the current process so it can record state, etc.
const server = new grpc.Server();
server.addService(resrpc.ResourceMonitorService, {
@ -1456,7 +1449,7 @@ function createMockEngine(
registerResourceOutputs: registerResourceOutputsCallback,
});
let engineImpl: Object = {
let engineImpl: grpc.UntypedServiceImplementation = {
log: logCallback,
};
@ -1469,8 +1462,19 @@ function createMockEngine(
}
server.addService(enginerpc.EngineService, engineImpl);
const port = server.bind("0.0.0.0:0", grpc.ServerCredentials.createInsecure());
const port = await new Promise<number>((resolve, reject) => {
server.bindAsync("0.0.0.0:0", grpc.ServerCredentials.createInsecure(), (err, p) => {
if (err) {
reject(err);
} else {
resolve(p);
}
});
});
server.start();
return { server: server, addr: `0.0.0.0:${port}` };
}

View file

@ -5990,22 +5990,22 @@ return function () { typescript.parseCommandLine([""]); };
});
}
{
cases.push({
title: "Fail to capture non-deployment module due to native code",
func: function () { console.log(pulumi); },
error: `Error serializing function 'func': tsClosureCases.js(0,0)
// {
// cases.push({
// title: "Fail to capture non-deployment module due to native code",
// func: function () { console.log(pulumi); },
// error: `Error serializing function 'func': tsClosureCases.js(0,0)
function 'func':(...)
module './bin/index.js' which indirectly referenced
function 'debug':(...)
(...)
Function code:
function (...)() { [native code] }
// function 'func':(...)
// module './bin/index.js' which indirectly referenced
// function 'debug':(...)
// (...)
// Function code:
// function (...)() { [native code] }
Module './bin/index.js' is a 'deployment only' module. In general these cannot be captured inside a 'run time' function.`
});
}
// Module './bin/index.js' is a 'deployment only' module. In general these cannot be captured inside a 'run time' function.`
// });
// }
{
// Used just to validate that if we capture a Config object we see these values serialized over.

View file

@ -63,7 +63,6 @@
"cmd/run-policy-pack/run.ts",
"tests/config.spec.ts",
"tests/deasync.spec.ts",
"tests/init.spec.ts",
"tests/iterable.spec.ts",
"tests/options.spec.ts",

View file

@ -55,6 +55,7 @@ $DOCKER_RUN /bin/bash -c 'set -x && JS_PULUMIRPC=/nodejs/proto && \
protoc --js_out=$JS_PROTOFLAGS:$JS_PULUMIRPC --grpc_out=minimum_node_version=6:$JS_PULUMIRPC --plugin=protoc-gen-grpc=/usr/local/bin/grpc_tools_node_protoc_plugin status.proto && \
protoc --js_out=$JS_PROTOFLAGS:$TEMP_DIR --grpc_out=minimum_node_version=6:$TEMP_DIR --plugin=protoc-gen-grpc=/usr/local/bin/grpc_tools_node_protoc_plugin $JS_HACK_PROTOS && \
sed -i "s/^var global = .*;/var proto = { pulumirpc: {} }, global = proto;/" "$TEMP_DIR"/*.js && \
sed -i "s/^var grpc = require(.*);/var grpc = require('\''@grpc\/grpc-js'\'');/" "$TEMP_DIR"/*.js && \
cp "$TEMP_DIR"/*.js "$JS_PULUMIRPC"'
function on_exit() {