pulumi/sdk/nodejs/runtime/asyncIterableUtil.ts
Alex Clemmer a40008db41 StreamInvoke should return AsyncIterable that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.

Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.

`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:

    call.on("data", (thing: any) => {... do thing ...});

Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.

Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.

Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.

But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.

This commit removes this dependency for a better monster: the one we
know.

It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-12 13:51:19 -08:00

84 lines
3 KiB
TypeScript

// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { AsyncIterable } from "@pulumi/query/interfaces";
type CloseValue = "7473659d-924c-414d-84e5-b1640b2a6296";
const closeValue: CloseValue = "7473659d-924c-414d-84e5-b1640b2a6296";
// PushableAsyncIterable is an `AsyncIterable` that data can be pushed to. It is useful for turning
// push-based callback APIs into pull-based `AsyncIterable` APIs. For example, a user can write:
//
// const queue = new PushableAsyncIterable();
// call.on("data", (thing: any) => queue.push(live));
//
// And then later consume `queue` as any other `AsyncIterable`:
//
// for await (const l of list) {
// console.log(l.metadata.name);
// }
//
// Note that this class implements `AsyncIterable<T | undefined>`. This is for a fundamental reason:
// the user can call `complete` at any time. `AsyncIteratable` would normally know when an element
// is the last, but in this case it can't. Or, another way to look at it is, the last element is
// guaranteed to be `undefined`.
export class PushableAsyncIterable<T> implements AsyncIterable<T | undefined> {
private bufferedData: T[] = [];
private nextQueue: ((payload: T | CloseValue) => void)[] = [];
private completed = false;
push(payload: T) {
if (this.nextQueue.length === 0) {
this.bufferedData.push(payload);
} else {
const resolve = this.nextQueue.shift()!;
resolve(payload);
}
}
complete() {
this.completed = true;
if (this.nextQueue.length > 0) {
const resolve = this.nextQueue.shift()!;
resolve(closeValue);
}
}
private shift(): Promise<T | CloseValue> {
return new Promise(resolve => {
if (this.bufferedData.length === 0) {
if (this.completed === true) {
resolve(closeValue);
}
this.nextQueue.push(resolve);
} else {
resolve(this.bufferedData.shift());
}
});
}
[Symbol.asyncIterator]() {
const t = this;
return {
async next(): Promise<{ done: boolean; value: T | undefined; }> {
const value = await t.shift();
if (value === closeValue) {
return { value: undefined, done: true };
}
return { value, done: false };
},
};
}
}