Extract ZlibInflateStream

This commit is contained in:
Alex Dima 2021-11-25 11:47:55 +01:00
parent 20d492a0a0
commit babe6a6a94
No known key found for this signature in database
GPG key ID: 39563C1504FDD0C9
3 changed files with 98 additions and 45 deletions

View file

@ -43,6 +43,14 @@ export class VSBuffer {
}
}
static fromByteArray(source: number[]): VSBuffer {
const result = VSBuffer.alloc(source.length);
for (let i = 0, len = source.length; i < len; i++) {
result.buffer[i] = source[i];
}
return result;
}
static concat(buffers: VSBuffer[], totalLength?: number): VSBuffer {
if (typeof totalLength === 'undefined') {
totalLength = 0;
@ -70,6 +78,12 @@ export class VSBuffer {
this.byteLength = this.buffer.byteLength;
}
clone(): VSBuffer {
const result = VSBuffer.alloc(this.byteLength);
result.set(this);
return result;
}
toString(): string {
if (hasBuffer) {
return this.buffer.toString();

View file

@ -25,7 +25,7 @@ export const enum SocketDiagnosticsEventType {
zlibInflateError = 'zlibInflateError',
zlibInflateData = 'zlibInflateData',
zlibInflateWriteInitial = 'zlibInflateWriteInitial',
zlibInflateInitialWrite = 'zlibInflateInitialWrite',
zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired',
zlibInflateWrite = 'zlibInflateWrite',
zlibInflateFlushFired = 'zlibInflateFlushFired',

View file

@ -174,10 +174,14 @@ const enum ReadState {
Fin = 4
}
interface ISocketTracer {
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void
}
/**
* See https://tools.ietf.org/html/rfc6455#section-5.2
*/
export class WebSocketNodeSocket extends Disposable implements ISocket {
export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketTracer {
public readonly socket: NodeSocket;
public readonly permessageDeflate: boolean;
@ -185,13 +189,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
private _totalIncomingDataBytes: number;
private _totalOutgoingWireBytes: number;
private _totalOutgoingDataBytes: number;
private readonly _zlibInflate: zlib.InflateRaw | null;
private readonly _zlibInflateStream: ZlibInflateStream | null;
private readonly _zlibDeflate: zlib.DeflateRaw | null;
private _zlibDeflateFlushWaitingCount: number;
private readonly _onDidZlibFlush = this._register(new Emitter<void>());
private readonly _recordInflateBytes: boolean;
private readonly _recordedInflateBytes: Buffer[] = [];
private readonly _pendingInflateData: Buffer[] = [];
private readonly _pendingDeflateData: Buffer[] = [];
private readonly _incomingData: ChunkStream;
private readonly _onData = this._register(new Emitter<VSBuffer>());
@ -224,8 +225,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
}
public get recordedInflateBytes(): VSBuffer {
if (this._recordInflateBytes) {
return VSBuffer.wrap(Buffer.concat(this._recordedInflateBytes));
if (this._zlibInflateStream) {
return this._zlibInflateStream.recordedInflateBytes;
}
return VSBuffer.alloc(0);
}
@ -255,16 +256,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._totalOutgoingWireBytes = 0;
this._totalOutgoingDataBytes = 0;
this.permessageDeflate = permessageDeflate;
this._recordInflateBytes = recordInflateBytes;
if (permessageDeflate) {
// See https://tools.ietf.org/html/rfc7692#page-16
// To simplify our logic, we don't negotiate the window size
// and simply dedicate (2^15) / 32kb per web socket
this._zlibInflate = zlib.createInflateRaw({
this._zlibInflateStream = new ZlibInflateStream(this, recordInflateBytes, inflateBytes, {
windowBits: 15
});
this._zlibInflate.on('error', (err) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (<any>err)?.code });
this._register(this._zlibInflateStream.onError((err) => {
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
onUnexpectedError(err);
@ -273,19 +272,11 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
hadError: true,
error: err
});
});
this._zlibInflate.on('data', (data: Buffer) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data);
this._pendingInflateData.push(data);
});
if (inflateBytes) {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWriteInitial, inflateBytes.buffer);
this._zlibInflate.write(inflateBytes.buffer);
this._zlibInflate.flush(() => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired);
this._pendingInflateData.length = 0;
});
}
}));
this._register(this._zlibInflateStream.onData((data) => {
this._totalIncomingDataBytes += data.byteLength;
this._onData.fire(data);
}));
this._zlibDeflate = zlib.createDeflateRaw({
windowBits: 15
@ -306,7 +297,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._pendingDeflateData.push(data);
});
} else {
this._zlibInflate = null;
this._zlibInflateStream = null;
this._zlibDeflate = null;
}
this._zlibDeflateFlushWaitingCount = 0;
@ -505,34 +496,19 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._state.readLen = Constants.MinHeaderByteSize;
this._state.mask = 0;
if (this._zlibInflate && this._state.compressed) {
if (this._zlibInflateStream && this._state.compressed) {
// See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2
// Even if permessageDeflate is negotiated, it is possible
// that the other side might decide to send uncompressed messages
// So only decompress messages that have the RSV 1 bit set
//
// See https://tools.ietf.org/html/rfc7692#section-7.2.2
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(Buffer.from(<Buffer>body.buffer));
}
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, body.buffer);
this._zlibInflate.write(<Buffer>body.buffer);
this._zlibInflateStream.write(body);
if (this._state.fin) {
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff]));
}
const buff = Buffer.from([0x00, 0x00, 0xff, 0xff]);
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buff);
this._zlibInflate.write(buff);
this._zlibInflateStream.write(VSBuffer.fromByteArray([0x00, 0x00, 0xff, 0xff]));
}
this._zlibInflate.flush(() => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired);
const data = Buffer.concat(this._pendingInflateData);
this._pendingInflateData.length = 0;
this._totalIncomingDataBytes += data.length;
this._onData.fire(VSBuffer.wrap(data));
});
this._zlibInflateStream.flush();
} else {
this._totalIncomingDataBytes += body.byteLength;
this._onData.fire(body);
@ -551,6 +527,69 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
}
}
class ZlibInflateStream extends Disposable {
private readonly _onError = this._register(new Emitter<Error>());
public readonly onError = this._onError.event;
private readonly _onData = this._register(new Emitter<VSBuffer>());
public readonly onData = this._onData.event;
private readonly _zlibInflate: zlib.InflateRaw;
private readonly _recordedInflateBytes: VSBuffer[] = [];
private readonly _pendingInflateData: VSBuffer[] = [];
public get recordedInflateBytes(): VSBuffer {
if (this._recordInflateBytes) {
return VSBuffer.concat(this._recordedInflateBytes);
}
return VSBuffer.alloc(0);
}
constructor(
private readonly _tracer: ISocketTracer,
private readonly _recordInflateBytes: boolean,
inflateBytes: VSBuffer | null,
options: zlib.ZlibOptions
) {
super();
this._zlibInflate = zlib.createInflateRaw(options);
this._zlibInflate.on('error', (err) => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (<any>err)?.code });
this._onError.fire(err);
});
this._zlibInflate.on('data', (data: Buffer) => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data);
this._pendingInflateData.push(VSBuffer.wrap(data));
});
if (inflateBytes) {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialWrite, inflateBytes.buffer);
this._zlibInflate.write(inflateBytes.buffer);
this._zlibInflate.flush(() => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired);
this._pendingInflateData.length = 0;
});
}
}
public write(buffer: VSBuffer): void {
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(buffer.clone());
}
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buffer);
this._zlibInflate.write(buffer.buffer);
}
public flush(): void {
this._zlibInflate.flush(() => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired);
const data = VSBuffer.concat(this._pendingInflateData);
this._pendingInflateData.length = 0;
this._onData.fire(data);
});
}
}
function unmask(buffer: VSBuffer, mask: number): void {
if (mask === 0) {
return;