Change createPromiseFromStreams util to use stream.pipeline (#27246)

* Convert createPromiseFromStreams util to use stream pipeline

* Add back 50ms timeout to duplex stream test

* Add friendly error message when a single non readable stream is given

* Throw error instead of returning a rejected promise
This commit is contained in:
Mike Côté 2018-12-15 00:05:08 -05:00 committed by GitHub
parent 6995ea9513
commit e157ea550c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 54 deletions

View file

@@ -17,7 +17,7 @@
* under the License.
*/
import { Writable, Duplex } from 'stream';
import { Readable, Writable, Duplex, Transform } from 'stream';
import expect from 'expect.js';
@@ -83,20 +83,25 @@ describe('promiseFromStreams', () => {
describe('last stream is duplex', () => {
it('waits for writing and resolves to final value', async () => {
let written = '';
const duplexReadQueue = [];
const duplexItemsToPush = ['foo', 'bar', null];
const result = await createPromiseFromStreams([
createListStream(['a', 'b', 'c']),
new Duplex({
read() {
this.push('foo');
this.push('bar');
this.push(null);
async read() {
const result = await duplexReadQueue.shift();
this.push(result);
},
write(chunk, enc, cb) {
setTimeout(() => {
written += chunk;
cb();
}, 50);
duplexReadQueue.push(new Promise((resolve) => {
setTimeout(() => {
written += chunk;
cb();
resolve(duplexItemsToPush.shift());
}, 50);
}));
}
}).setEncoding('utf8')
]);
@@ -105,4 +110,36 @@ describe('promiseFromStreams', () => {
expect(result).to.be('bar');
});
});
// Regression tests for error propagation through stream.pipeline: when an
// intermediate stream fails, the source stream must be destroyed and the
// error surfaced to the caller.
describe('error handling', () => {
it('read stream gets destroyed when transform stream fails', async () => {
// Records whether the readable's destroy() hook was invoked.
let destroyCalled = false;
const readStream = new Readable({
read() {
this.push('a');
this.push('b');
this.push('c');
this.push(null);
},
destroy() {
destroyCalled = true;
}
});
// Transform that fails on the very first chunk it receives.
const transformStream = new Transform({
transform(chunk, enc, done) {
done(new Error('Test error'));
}
});
try {
await createPromiseFromStreams([
readStream,
transformStream,
]);
// The pipeline must reject; reaching this line means the test failed.
throw new Error('Should fail');
} catch (e) {
expect(e.message).to.be('Test error');
// stream.pipeline is expected to destroy the source on downstream failure.
expect(destroyCalled).to.be(true);
}
});
});
});

View file

@@ -33,54 +33,32 @@
* @param {Array<Stream>} streams
* @return {Promise<any>}
*/
import { pipeline, Writable } from 'stream';

/**
 * Pipe the given streams together with stream.pipeline and resolve with the
 * last chunk read from the final stream, or with undefined when the final
 * stream is not readable.
 *
 * When any stream in the chain errors, pipeline destroys the other streams
 * and the returned promise rejects with that error.
 *
 * @param {Array<Stream>} streams at least one stream; at least two when the
 *   last stream is not readable (there is nothing to pipe into it otherwise)
 * @return {Promise<any>} the final chunk emitted by the last stream
 * @throws {Error} when a single non-readable stream is given
 */
export async function createPromiseFromStreams(streams) {
  let finalChunk;
  const last = streams[streams.length - 1];
  if (typeof last.read !== 'function' && streams.length === 1) {
    // For a nicer error than what stream.pipeline throws
    throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
  }
  // Copy so we never mutate the caller's array when appending the capture sink.
  const pipelineStreams = [...streams];
  if (typeof last.read === 'function') {
    // We are pushing a writable stream to capture the last chunk
    pipelineStreams.push(new Writable({
      // Use object mode even when "last" stream isn't. This allows to
      // capture the last chunk as-is.
      objectMode: true,
      write(chunk, enc, done) {
        finalChunk = chunk;
        done();
      }
    }));
  }
  return new Promise((resolve, reject) => {
    pipeline(...pipelineStreams, (err) => {
      if (err) return reject(err);
      resolve(finalChunk);
    });
  });
}