Make streamInvoke gracefully-cancellable from SDKs

The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.

This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.

Use it like this:

    // `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
    // updates from the stream, then `cancel` giving the Kubernetes provider to
    // clean up and close gracefully.
    const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
        group: "apps", version: "v1", kind: "Deployment",
        break;
    });
    deployments.cancel();
This commit is contained in:
Alex Clemmer 2019-11-04 11:36:11 -08:00
parent f195cc0d4d
commit 038f920dc3
2 changed files with 66 additions and 43 deletions

View file

@ -28,6 +28,9 @@ CHANGELOG
- Support for lists and maps in config.
[#3342](https://github.com/pulumi/pulumi/pull/3342)
- `ResourceProvider#StreamInvoke` implemented, will be the basis for streaming
APIs in `pulumi query`. [#3424](https://github.com/pulumi/pulumi/pull/3424)
## 1.4.0 (2019-10-24)
- `FileAsset` in the Python SDK now accepts anything implementing `os.PathLike` in addition to `str`.
@ -72,7 +75,7 @@ CHANGELOG
- Support renaming stack projects via `pulumi stack rename`.
[#3292](https://github.com/pulumi/pulumi/pull/3292)
- Add `helm` to `pulumi/pulumi` Dockerhub container
[#3294](https://github.com/pulumi/pulumi/pull/3294)

View file

@ -81,16 +81,59 @@ export function invoke(tok: string, props: Inputs, opts: InvokeOptions = {}): Pr
return invokeSync(tok, props, opts, syncInvokes);
}
export function streamInvoke(
export async function streamInvoke(
tok: string,
props: Inputs,
opts: InvokeOptions = {},
): AsyncIterable<any> {
if (opts.async) {
throw Error("streamInvoke does not support async mode");
}
): Promise<StreamInvokeResponse<any>> {
const label = `StreamInvoking function: tok=${tok} asynchronously`;
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
return streamInvokeAsync(tok, props, opts);
// Wait for all values to be available, and then perform the RPC.
const done = rpcKeepAlive();
try {
const serialized = await serializeProperties(`streamInvoke:${tok}`, props);
log.debug(
`StreamInvoke RPC prepared: tok=${tok}` + excessiveDebugOutput
? `, obj=${JSON.stringify(serialized)}`
: ``,
);
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
const provider = await ProviderResource.register(getProvider(tok, opts));
const req = createInvokeRequest(tok, serialized, provider, opts);
// Call `streamInvoke`.
const call = monitor.streamInvoke(req, {});
// Transform the callback-oriented `streamInvoke` result into a plain-old (potentially
// infinite) `AsyncIterable`.
const listenForWatchEvents = (callback: (obj: any) => void) => {
return new Promise(resolve => {
call.on("data", function(thing: any) {
const live = deserializeResponse(tok, thing);
callback(live);
});
call.on("error", (err: any) => {
if (err.code === 1 && err.details === "Cancelled") {
return;
}
throw err;
});
// Infinite stream, never call `resolve`.
});
};
const stream: AsyncIterable<any> = asyncify.default(listenForWatchEvents);
// Return a cancellable handle to the stream.
return new StreamInvokeResponse(
stream,
() => call.cancel());
} finally {
done();
}
}
export function invokeFallbackToAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
@ -193,44 +236,21 @@ For more details see: https://www.pulumi.com/docs/troubleshooting/#synchronous-c
}
}
async function* streamInvokeAsync(
tok: string,
props: Inputs,
opts: InvokeOptions,
): AsyncIterable<any> {
const label = `StreamInvoking function: tok=${tok} asynchronously`;
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
// StreamInvokeResponse represents a (potentially infinite) streaming response to `streamInvoke`,
// with facilities to gracefully cancel and clean up the stream.
export class StreamInvokeResponse<T> implements AsyncIterable<T> {
constructor(
private source: AsyncIterable<T>,
private cancelSource: () => void,
) {}
// Wait for all values to be available, and then perform the RPC.
const done = rpcKeepAlive();
try {
const serialized = await serializeProperties(`streamInvoke:${tok}`, props);
log.debug(
`StreamInvoke RPC prepared: tok=${tok}` + excessiveDebugOutput
? `, obj=${JSON.stringify(serialized)}`
: ``,
);
// cancel signals the `streamInvoke` should be cancelled and cleaned up gracefully.
public cancel() {
this.cancelSource();
}
// Fetch the monitor and make an RPC request.
const monitor: any = getMonitor();
const provider = await ProviderResource.register(getProvider(tok, opts));
const req = createInvokeRequest(tok, serialized, provider, opts);
const call = monitor.streamInvoke(req, {});
const listenForWatchEvents = (callback: (obj: any) => void) => {
return new Promise(resolve => {
call.on("data", function(thing: any) {
const live = deserializeResponse(tok, thing);
callback(live);
});
// Infinite stream, never call `resolve`.
});
};
yield* asyncify.default(listenForWatchEvents);
} finally {
done();
[Symbol.asyncIterator]() {
return this.source[Symbol.asyncIterator]();
}
}