Extract ZlibDeflateStream

This commit is contained in:
Alex Dima 2021-11-25 15:27:37 +01:00
parent babe6a6a94
commit 2b6fd1df46
No known key found for this signature in database
GPG key ID: 39563C1504FDD0C9

View file

@ -185,15 +185,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
public readonly socket: NodeSocket;
public readonly permessageDeflate: boolean;
private _totalIncomingWireBytes: number;
private _totalIncomingDataBytes: number;
private _totalOutgoingWireBytes: number;
private _totalOutgoingDataBytes: number;
private _totalIncomingWireBytes: number = 0;
private _totalIncomingDataBytes: number = 0;
private _totalOutgoingWireBytes: number = 0;
private _totalOutgoingDataBytes: number = 0;
private readonly _zlibInflateStream: ZlibInflateStream | null;
private readonly _zlibDeflate: zlib.DeflateRaw | null;
private readonly _zlibDeflateStream: ZlibDeflateStream | null;
private _zlibDeflateFlushWaitingCount: number;
private readonly _onDidZlibFlush = this._register(new Emitter<void>());
private readonly _pendingDeflateData: Buffer[] = [];
private readonly _incomingData: ChunkStream;
private readonly _onData = this._register(new Emitter<VSBuffer>());
private readonly _onClose = this._register(new Emitter<SocketCloseEvent>());
@ -251,18 +250,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
super();
this.socket = socket;
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes });
this._totalIncomingWireBytes = 0;
this._totalIncomingDataBytes = 0;
this._totalOutgoingWireBytes = 0;
this._totalOutgoingDataBytes = 0;
this.permessageDeflate = permessageDeflate;
if (permessageDeflate) {
// See https://tools.ietf.org/html/rfc7692#page-16
// To simplify our logic, we don't negotiate the window size
// and simply dedicate (2^15) / 32kb per web socket
this._zlibInflateStream = new ZlibInflateStream(this, recordInflateBytes, inflateBytes, {
this._zlibInflateStream = this._register(new ZlibInflateStream(this, recordInflateBytes, inflateBytes, {
windowBits: 15
});
}));
this._register(this._zlibInflateStream.onError((err) => {
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
@ -278,11 +273,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
this._onData.fire(data);
}));
this._zlibDeflate = zlib.createDeflateRaw({
this._zlibDeflateStream = this._register(new ZlibDeflateStream(this, {
windowBits: 15
});
this._zlibDeflate.on('error', (err) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (<any>err)?.code });
}));
this._register(this._zlibDeflateStream.onError((err) => {
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
onUnexpectedError(err);
@ -291,14 +285,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
hadError: true,
error: err
});
});
this._zlibDeflate.on('data', (data: Buffer) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data);
this._pendingDeflateData.push(data);
});
}));
} else {
this._zlibInflateStream = null;
this._zlibDeflate = null;
this._zlibDeflateStream = null;
}
this._zlibDeflateFlushWaitingCount = 0;
this._incomingData = new ChunkStream();
@ -333,24 +323,17 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
public write(buffer: VSBuffer): void {
this._totalOutgoingDataBytes += buffer.byteLength;
if (this._zlibDeflate) {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer);
this._zlibDeflate.write(<Buffer>buffer.buffer);
if (this._zlibDeflateStream) {
this._zlibDeflateStream.write(buffer);
this._zlibDeflateFlushWaitingCount++;
// See https://zlib.net/manual.html#Constants
this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired);
this._zlibDeflateStream.flush((data) => {
this._zlibDeflateFlushWaitingCount--;
let data = Buffer.concat(this._pendingDeflateData);
this._pendingDeflateData.length = 0;
// See https://tools.ietf.org/html/rfc7692#section-7.2.1
data = data.slice(0, data.length - 4);
if (!this._isEnded) {
// Avoid ERR_STREAM_WRITE_AFTER_END
this._write(VSBuffer.wrap(data), true);
this._write(data, true);
}
if (this._zlibDeflateFlushWaitingCount === 0) {
@ -590,6 +573,54 @@ class ZlibInflateStream extends Disposable {
}
}
class ZlibDeflateStream extends Disposable {
private readonly _onError = this._register(new Emitter<Error>());
public readonly onError = this._onError.event;
private readonly _zlibDeflate: zlib.DeflateRaw;
private readonly _pendingDeflateData: VSBuffer[] = [];
constructor(
private readonly _tracer: ISocketTracer,
options: zlib.ZlibOptions
) {
super();
this._zlibDeflate = zlib.createDeflateRaw({
windowBits: 15
});
this._zlibDeflate.on('error', (err) => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (<any>err)?.code });
this._onError.fire(err);
});
this._zlibDeflate.on('data', (data: Buffer) => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data);
this._pendingDeflateData.push(VSBuffer.wrap(data));
});
}
public write(buffer: VSBuffer): void {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer);
this._zlibDeflate.write(<Buffer>buffer.buffer);
}
public flush(callback: (data: VSBuffer) => void): void {
// See https://zlib.net/manual.html#Constants
this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired);
let data = VSBuffer.concat(this._pendingDeflateData);
this._pendingDeflateData.length = 0;
// See https://tools.ietf.org/html/rfc7692#section-7.2.1
data = data.slice(0, data.byteLength - 4);
callback(data);
});
}
}
function unmask(buffer: VSBuffer, mask: number): void {
if (mask === 0) {
return;