streams - introduce and adopt listenStream

cc @alexdima
This commit is contained in:
Benjamin Pasero 2021-02-10 14:34:13 +01:00
parent 699cc6abd3
commit 83f4dfdff1
9 changed files with 281 additions and 218 deletions

View file

@ -16,6 +16,13 @@ export interface ReadableStreamEvents<T> {
/**
* The 'data' event is emitted whenever the stream is
* relinquishing ownership of a chunk of data to a consumer.
*
* NOTE: PLEASE UNDERSTAND THAT ADDING A DATA LISTENER CAN
* TURN THE STREAM INTO FLOWING MODE. IT IS THEREFORE THE
* LAST LISTENER THAT SHOULD BE ADDED AND NOT THE FIRST
*
* Use `listenStream` as a helper method to listen to
* stream events in the right order.
*/
on(event: 'data', callback: (data: T) => void): void;
@ -268,7 +275,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
// end with data or error if provided
if (result instanceof Error) {
this.error(result);
} else if (result) {
} else if (typeof result !== 'undefined') {
this.write(result);
}
@ -489,18 +496,74 @@ export function peekReadable<T>(readable: Readable<T>, reducer: IReducer<T>, max
}
/**
* Helper to fully read a T stream into a T.
* Helper to fully read a T stream into a T, or to consume
* a stream fully, awaiting all the events without caring
* about the data.
*/
export function consumeStream<T>(stream: ReadableStreamEvents<T>, reducer: IReducer<T>): Promise<T> {
export function consumeStream<T>(stream: ReadableStreamEvents<T>, reducer: IReducer<T>): Promise<T>;
export function consumeStream(stream: ReadableStreamEvents<unknown>): Promise<undefined>;
export function consumeStream<T>(stream: ReadableStreamEvents<T>, reducer?: IReducer<T>): Promise<T | undefined> {
return new Promise((resolve, reject) => {
const chunks: T[] = [];
stream.on('data', data => chunks.push(data));
stream.on('error', error => reject(error));
stream.on('end', () => resolve(reducer(chunks)));
listenStream(stream, {
onData: chunk => {
if (reducer) {
chunks.push(chunk);
}
},
onError: error => {
if (reducer) {
reject(error);
} else {
resolve(undefined);
}
},
onEnd: () => {
if (reducer) {
resolve(reducer(chunks));
} else {
resolve(undefined);
}
}
});
});
}
/**
 * Listener contract used by `listenStream` to subscribe to all
 * three stream events (`data`, `error`, `end`) at once, in the
 * order that is safe with respect to flowing mode.
 */
export interface IStreamListener<T> {

	/**
	 * The 'data' event is emitted whenever the stream is
	 * relinquishing ownership of a chunk of data to a consumer.
	 */
	onData(data: T): void;

	/**
	 * Emitted when any error occurs.
	 */
	onError(err: Error): void;

	/**
	 * The 'end' event is emitted when there is no more data
	 * to be consumed from the stream. The 'end' event will
	 * not be emitted unless the data is completely consumed.
	 */
	onEnd(): void;
}
/**
 * Helper to listen to all events of a T stream in proper order.
 *
 * Registers the `error` and `end` listeners before the `data`
 * listener: adding a `data` listener can switch the stream into
 * flowing mode, and events delivered from that point on would be
 * missed by listeners that are not yet attached.
 */
export function listenStream<T>(stream: ReadableStreamEvents<T>, listener: IStreamListener<T>): void {
	stream.on('error', error => listener.onError(error));
	stream.on('end', () => listener.onEnd());

	// Adding the `data` listener will turn the stream
	// into flowing mode. As such it is important to
	// add this listener last (DO NOT CHANGE!)
	stream.on('data', data => listener.onData(data));
}
/**
* Helper to peek up to `maxChunks` into a stream. The return type signals if
* the stream has ended or not. If not, caller needs to add a `data` listener
@ -509,9 +572,9 @@ export function consumeStream<T>(stream: ReadableStreamEvents<T>, reducer: IRedu
export function peekStream<T>(stream: ReadableStream<T>, maxChunks: number): Promise<ReadableBufferedStream<T>> {
return new Promise((resolve, reject) => {
const streamListeners = new DisposableStore();
const buffer: T[] = [];
// Data Listener
const buffer: T[] = [];
const dataListener = (chunk: T) => {
// Add to buffer
@ -529,23 +592,27 @@ export function peekStream<T>(stream: ReadableStream<T>, maxChunks: number): Pro
}
};
streamListeners.add(toDisposable(() => stream.removeListener('data', dataListener)));
stream.on('data', dataListener);
// Error Listener
const errorListener = (error: Error) => {
return reject(error);
};
streamListeners.add(toDisposable(() => stream.removeListener('error', errorListener)));
stream.on('error', errorListener);
// End Listener
const endListener = () => {
return resolve({ stream, buffer, ended: true });
};
streamListeners.add(toDisposable(() => stream.removeListener('error', errorListener)));
stream.on('error', errorListener);
streamListeners.add(toDisposable(() => stream.removeListener('end', endListener)));
stream.on('end', endListener);
// Important: leave the `data` listener last because
// this can turn the stream into flowing mode and we
// want `error` events to be received as well.
streamListeners.add(toDisposable(() => stream.removeListener('data', dataListener)));
stream.on('data', dataListener);
});
}
@ -585,46 +652,11 @@ export function toReadable<T>(t: T): Readable<T> {
export function transform<Original, Transformed>(stream: ReadableStreamEvents<Original>, transformer: ITransformer<Original, Transformed>, reducer: IReducer<Transformed>): ReadableStream<Transformed> {
const target = newWriteableStream<Transformed>(reducer);
stream.on('data', data => target.write(transformer.data(data)));
stream.on('end', () => target.end());
stream.on('error', error => target.error(transformer.error ? transformer.error(error) : error));
listenStream(stream, {
onData: data => target.write(transformer.data(data)),
onError: error => target.error(transformer.error ? transformer.error(error) : error),
onEnd: () => target.end()
});
return target;
}
export interface IReadableStreamObservable {
/**
* A promise to await the `end` or `error` event
* of a stream.
*/
errorOrEnd: () => Promise<void>;
}
/**
* Helper to observe a stream for certain events through
* a promise based API.
*/
export function observe(stream: ReadableStream<unknown>): IReadableStreamObservable {
// A stream is closed when it ended or errored
// We install this listener right from the
// beginning to catch the events early.
const errorOrEnd = Promise.race([
new Promise<void>(resolve => stream.on('end', () => resolve())),
new Promise<void>(resolve => stream.on('error', () => resolve()))
]);
return {
errorOrEnd(): Promise<void> {
// We need to ensure the stream is flowing so that our
// listeners are getting triggered. It is possible that
// the stream is not flowing because no `data` listener
// was attached yet.
stream.resume();
return errorOrEnd;
}
};
}

View file

@ -4,7 +4,7 @@
*--------------------------------------------------------------------------------------------*/
import * as assert from 'assert';
import { isReadableStream, newWriteableStream, Readable, consumeReadable, peekReadable, consumeStream, ReadableStream, toStream, toReadable, transform, peekStream, isReadableBufferedStream, observe } from 'vs/base/common/stream';
import { isReadableStream, newWriteableStream, Readable, consumeReadable, peekReadable, consumeStream, ReadableStream, toStream, toReadable, transform, peekStream, isReadableBufferedStream, listenStream } from 'vs/base/common/stream';
import { timeout } from 'vs/base/common/async';
suite('Stream', () => {
@ -69,6 +69,7 @@ suite('Stream', () => {
stream.end('Final Bit');
assert.strictEqual(chunks.length, 4);
assert.strictEqual(chunks[3], 'Final Bit');
assert.strictEqual(end, true);
stream.destroy();
@ -76,6 +77,15 @@ suite('Stream', () => {
assert.strictEqual(chunks.length, 4);
});
test('WriteableStream - end with empty string works', async () => {
const reducer = (strings: string[]) => strings.length > 0 ? strings.join() : 'error';
const stream = newWriteableStream<string>(reducer);
stream.end('');
const result = await consumeStream(stream, reducer);
assert.strictEqual(result, '');
});
test('WriteableStream - removeListener', () => {
const stream = newWriteableStream<string>(strings => strings.join());
@ -270,6 +280,56 @@ suite('Stream', () => {
assert.strictEqual(consumed, '1,2,3,4,5');
});
test('consumeStream - without reducer', async () => {
const stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5']));
const consumed = await consumeStream(stream);
assert.strictEqual(consumed, undefined);
});
test('consumeStream - without reducer and error', async () => {
const stream = newWriteableStream<string>(strings => strings.join());
stream.error(new Error());
const consumed = await consumeStream(stream);
assert.strictEqual(consumed, undefined);
});
test('listenStream', () => {
const stream = newWriteableStream<string>(strings => strings.join());
let error = false;
let end = false;
let data = '';
listenStream(stream, {
onData: d => {
data = d;
},
onError: e => {
error = true;
},
onEnd: () => {
end = true;
}
});
stream.write('Hello');
assert.strictEqual(data, 'Hello');
stream.write('World');
assert.strictEqual(data, 'World');
assert.strictEqual(error, false);
assert.strictEqual(end, false);
stream.error(new Error());
assert.strictEqual(error, true);
stream.end('Final Bit');
assert.strictEqual(end, true);
});
test('peekStream', async () => {
for (let i = 0; i < 5; i++) {
const stream = readableToStream(arrayToReadable(['1', '2', '3', '4', '5']));
@ -335,30 +395,6 @@ suite('Stream', () => {
assert.strictEqual(consumed, '11,22,33,44,55');
});
test('observer', async () => {
const source1 = newWriteableStream<string>(strings => strings.join());
setTimeout(() => source1.error(new Error()));
await observe(source1).errorOrEnd();
const source2 = newWriteableStream<string>(strings => strings.join());
setTimeout(() => source2.end('Hello Test'));
await observe(source2).errorOrEnd();
const source3 = newWriteableStream<string>(strings => strings.join());
setTimeout(() => {
source3.write('Hello Test');
source3.error(new Error());
});
await observe(source3).errorOrEnd();
const source4 = newWriteableStream<string>(strings => strings.join());
setTimeout(() => {
source4.write('Hello Test');
source4.end();
});
await observe(source4).errorOrEnd();
});
test('events are delivered even if a listener is removed during delivery', () => {
const stream = newWriteableStream<string>(strings => strings.join());

View file

@ -38,6 +38,7 @@ import { IUndoRedoService, ResourceEditStackSnapshot } from 'vs/platform/undoRed
import { TextChange } from 'vs/editor/common/model/textChange';
import { Constants } from 'vs/base/common/uint';
import { PieceTreeTextBuffer } from 'vs/editor/common/model/pieceTreeTextBuffer/pieceTreeTextBuffer';
import { listenStream } from 'vs/base/common/stream';
function createTextBufferBuilder() {
return new PieceTreeTextBufferBuilder();
@ -64,33 +65,33 @@ export function createTextBufferFactoryFromStream(stream: ITextStream | VSBuffer
let done = false;
stream.on('data', (chunk: string | VSBuffer) => {
if (validator) {
const error = validator(chunk);
if (error) {
listenStream<string | VSBuffer>(stream, {
onData: chunk => {
if (validator) {
const error = validator(chunk);
if (error) {
done = true;
reject(error);
}
}
if (filter) {
chunk = filter(chunk);
}
builder.acceptChunk((typeof chunk === 'string') ? chunk : chunk.toString());
},
onError: error => {
if (!done) {
done = true;
reject(error);
}
}
if (filter) {
chunk = filter(chunk);
}
builder.acceptChunk((typeof chunk === 'string') ? chunk : chunk.toString());
});
stream.on('error', (error) => {
if (!done) {
done = true;
reject(error);
}
});
stream.on('end', () => {
if (!done) {
done = true;
resolve(builder.finish());
},
onEnd: () => {
if (!done) {
done = true;
resolve(builder.finish());
}
}
});
});

View file

@ -14,7 +14,7 @@ import { TernarySearchTree } from 'vs/base/common/map';
import { isNonEmptyArray, coalesce } from 'vs/base/common/arrays';
import { ILogService } from 'vs/platform/log/common/log';
import { VSBuffer, VSBufferReadable, readableToBuffer, bufferToReadable, streamToBuffer, VSBufferReadableStream, VSBufferReadableBufferedStream, bufferedStreamToBuffer, newWriteableBufferStream } from 'vs/base/common/buffer';
import { isReadableStream, transform, peekReadable, peekStream, isReadableBufferedStream, newWriteableStream, IReadableStreamObservable, observe } from 'vs/base/common/stream';
import { isReadableStream, transform, peekReadable, peekStream, isReadableBufferedStream, newWriteableStream, listenStream, consumeStream } from 'vs/base/common/stream';
import { Promises, Queue } from 'vs/base/common/async';
import { CancellationTokenSource, CancellationToken } from 'vs/base/common/cancellation';
import { Schemas } from 'vs/base/common/network';
@ -454,8 +454,7 @@ export class FileService extends Disposable implements IFileService {
throw error;
});
let fileStreamObserver: IReadableStreamObservable | undefined = undefined;
let fileStream: VSBufferReadableStream | undefined = undefined;
try {
// if the etag is provided, we await the result of the validation
@ -466,8 +465,6 @@ export class FileService extends Disposable implements IFileService {
await statPromise;
}
let fileStream: VSBufferReadableStream | undefined = undefined;
// read unbuffered (only if either preferred, or the provider has no buffered read capability)
if (!(hasOpenReadWriteCloseCapability(provider) || hasFileReadStreamCapability(provider)) || (hasReadWriteCapability(provider) && options?.preferUnbuffered)) {
fileStream = this.readFileUnbuffered(provider, resource, options);
@ -483,9 +480,6 @@ export class FileService extends Disposable implements IFileService {
fileStream = this.readFileBuffered(provider, resource, cancellableSource.token, options);
}
// observe the stream for the error case below
fileStreamObserver = observe(fileStream);
const fileStat = await statPromise;
return {
@ -497,8 +491,8 @@ export class FileService extends Disposable implements IFileService {
// Await the stream to finish so that we exit this method
// in a consistent state with file handles closed
// (https://github.com/microsoft/vscode/issues/114024)
if (fileStreamObserver) {
await fileStreamObserver.errorOrEnd();
if (fileStream) {
await consumeStream(fileStream);
}
throw new FileOperationError(localize('err.read', "Unable to read file '{0}' ({1})", this.resourceForError(resource), ensureFileSystemProviderError(error).toString()), toFileOperationResult(error), options);
@ -1065,28 +1059,29 @@ export class FileService extends Disposable implements IFileService {
return new Promise(async (resolve, reject) => {
stream.on('data', async chunk => {
listenStream(stream, {
onData: async chunk => {
// pause stream to perform async write operation
stream.pause();
// pause stream to perform async write operation
stream.pause();
try {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
} catch (error) {
return reject(error);
}
try {
await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0);
} catch (error) {
return reject(error);
}
posInFile += chunk.byteLength;
posInFile += chunk.byteLength;
// resume stream now that we have successfully written
// run this on the next tick to prevent increasing the
// execution stack because resume() may call the event
// handler again before finishing.
setTimeout(() => stream.resume());
// resume stream now that we have successfully written
// run this on the next tick to prevent increasing the
// execution stack because resume() may call the event
// handler again before finishing.
setTimeout(() => stream.resume());
},
onError: error => reject(error),
onEnd: () => resolve()
});
stream.on('error', error => reject(error));
stream.on('end', () => resolve());
});
}

View file

@ -1584,6 +1584,20 @@ flakySuite('Disk File Service', function () {
assert.strictEqual(error!.fileOperationResult, FileOperationResult.FILE_TOO_LARGE);
}
(isWindows ? test.skip /* windows: cannot create file symbolic link without elevated context */ : test)('readFile - dangling symbolic link - https://github.com/microsoft/vscode/issues/116049', async () => {
const link = URI.file(join(testDir, 'small.js-link'));
await promises.symlink(join(testDir, 'small.js'), link.fsPath);
let error: FileOperationError | undefined = undefined;
try {
await service.readFile(link);
} catch (err) {
error = err;
}
assert.ok(error);
});
test('createFile', async () => {
return assertCreateFile(contents => VSBuffer.fromString(contents));
});

View file

@ -9,6 +9,7 @@ import { bufferToStream, VSBufferReadableStream } from 'vs/base/common/buffer';
import { CancellationToken } from 'vs/base/common/cancellation';
import { Disposable, toDisposable } from 'vs/base/common/lifecycle';
import { FileAccess, Schemas } from 'vs/base/common/network';
import { listenStream } from 'vs/base/common/stream';
import { URI } from 'vs/base/common/uri';
import { FileOperationError, FileOperationResult, IFileService } from 'vs/platform/files/common/files';
import { ILogService } from 'vs/platform/log/common/log';
@ -75,28 +76,27 @@ export class WebviewProtocolProvider extends Disposable {
if (!this.listening) {
this.listening = true;
// Data
stream.on('data', data => {
try {
if (!this.push(data.buffer)) {
stream.pause(); // pause the stream if we should not push anymore
listenStream(stream, {
onData: data => {
try {
if (!this.push(data.buffer)) {
stream.pause(); // pause the stream if we should not push anymore
}
} catch (error) {
this.emit(error);
}
},
onError: error => {
this.emit('error', error);
},
onEnd: () => {
try {
this.push(null); // signal EOS
} catch (error) {
this.emit(error);
}
} catch (error) {
this.emit(error);
}
});
// End
stream.on('end', () => {
try {
this.push(null); // signal EOS
} catch (error) {
this.emit(error);
}
});
// Error
stream.on('error', error => this.emit('error', error));
}
// ensure the stream is flowing

View file

@ -53,6 +53,7 @@ import { ILogService } from 'vs/platform/log/common/log';
import { IUriIdentityService } from 'vs/workbench/services/uriIdentity/common/uriIdentity';
import { ResourceFileEdit } from 'vs/editor/browser/services/bulkEditService';
import { IExplorerService } from 'vs/workbench/contrib/files/browser/files';
import { listenStream } from 'vs/base/common/stream';
export const NEW_FILE_COMMAND_ID = 'explorer.newFile';
export const NEW_FILE_LABEL = nls.localize('newFile', "New File");
@ -1034,22 +1035,22 @@ const downloadFileHandler = async (accessor: ServicesAccessor) => {
reject();
}));
sourceStream.on('data', data => {
if (!disposed) {
target.write(data.buffer);
reportProgress(contents.name, contents.size, data.byteLength, operation);
listenStream(sourceStream, {
onData: data => {
if (!disposed) {
target.write(data.buffer);
reportProgress(contents.name, contents.size, data.byteLength, operation);
}
},
onError: error => {
disposables.dispose();
reject(error);
},
onEnd: () => {
disposables.dispose();
resolve();
}
});
sourceStream.on('error', error => {
disposables.dispose();
reject(error);
});
sourceStream.on('end', () => {
disposables.dispose();
resolve();
});
});
}

View file

@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { Readable, ReadableStream, newWriteableStream } from 'vs/base/common/stream';
import { Readable, ReadableStream, newWriteableStream, listenStream } from 'vs/base/common/stream';
import { VSBuffer, VSBufferReadable, VSBufferReadableStream } from 'vs/base/common/buffer';
export const UTF8 = 'utf8';
@ -133,50 +133,47 @@ export function toDecodeStream(source: VSBufferReadableStream, options: IDecodeS
}
};
// Stream error: forward to target
source.on('error', error => target.error(error));
listenStream(source, {
onData: async chunk => {
// Stream data
source.on('data', async chunk => {
// if the decoder is ready, we just write directly
if (decoder) {
target.write(decoder.write(chunk.buffer));
}
// otherwise we need to buffer the data until the stream is ready
else {
bufferedChunks.push(chunk);
bytesBuffered += chunk.byteLength;
// buffered enough data for encoding detection, create stream
if (bytesBuffered >= minBytesRequiredForDetection) {
// pause stream here until the decoder is ready
source.pause();
await createDecoder();
// resume stream now that decoder is ready but
// outside of this stack to reduce recursion
setTimeout(() => source.resume());
// if the decoder is ready, we just write directly
if (decoder) {
target.write(decoder.write(chunk.buffer));
}
// otherwise we need to buffer the data until the stream is ready
else {
bufferedChunks.push(chunk);
bytesBuffered += chunk.byteLength;
// buffered enough data for encoding detection, create stream
if (bytesBuffered >= minBytesRequiredForDetection) {
// pause stream here until the decoder is ready
source.pause();
await createDecoder();
// resume stream now that decoder is ready but
// outside of this stack to reduce recursion
setTimeout(() => source.resume());
}
}
},
onError: error => target.error(error), // simply forward to target
onEnd: async () => {
// we were still waiting for data to do the encoding
// detection. thus, wrap up starting the stream even
// without all the data to get things going
if (!decoder) {
await createDecoder();
}
// end the target with the remainders of the decoder
target.end(decoder?.end());
}
});
// Stream end
source.on('end', async () => {
// we were still waiting for data to do the encoding
// detection. thus, wrap up starting the stream even
// without all the data to get things going
if (!decoder) {
await createDecoder();
}
// end the target with the remainders of the decoder
target.end(decoder?.end());
});
});
}

View file

@ -69,7 +69,7 @@ import { Part } from 'vs/workbench/browser/part';
import { IPanelService } from 'vs/workbench/services/panel/common/panelService';
import { IPanel } from 'vs/workbench/common/panel';
import { IBadge } from 'vs/workbench/services/activity/common/activity';
import { VSBuffer, VSBufferReadable } from 'vs/base/common/buffer';
import { bufferToStream, VSBuffer, VSBufferReadable } from 'vs/base/common/buffer';
import { Schemas } from 'vs/base/common/network';
import { IProductService } from 'vs/platform/product/common/productService';
import product from 'vs/platform/product/common/product';
@ -839,20 +839,7 @@ export class TestFileService implements IFileService {
return Promise.resolve({
resource,
value: {
on: (event: string, callback: Function): void => {
if (event === 'data') {
callback(this.content);
}
if (event === 'end') {
callback();
}
},
removeListener: () => { },
resume: () => { },
pause: () => { },
destroy: () => { }
},
value: bufferToStream(VSBuffer.fromString(this.content)),
etag: 'index.txt',
encoding: 'utf8',
mtime: Date.now(),