From 648e355c05fe2b0cfd38379fea30f08d19114fc0 Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 09:40:16 +0100 Subject: [PATCH 1/9] Add tracing to sockets --- src/vs/base/common/buffer.ts | 13 +- src/vs/base/parts/ipc/common/ipc.net.ts | 146 +++++++++++++++++- src/vs/base/parts/ipc/node/ipc.net.ts | 67 ++++++-- .../remote/browser/browserSocketFactory.ts | 63 ++++++-- .../remote/common/remoteAgentConnection.ts | 8 +- .../platform/remote/node/nodeSocketFactory.ts | 4 +- .../server/remoteExtensionHostAgentServer.ts | 4 +- .../localProcessExtensionHost.ts | 2 +- .../node/extensionHostProcessSetup.ts | 6 +- 9 files changed, 274 insertions(+), 39 deletions(-) diff --git a/src/vs/base/common/buffer.ts b/src/vs/base/common/buffer.ts index f6858281193..0e28b41904b 100644 --- a/src/vs/base/common/buffer.ts +++ b/src/vs/base/common/buffer.ts @@ -90,11 +90,20 @@ export class VSBuffer { set(array: VSBuffer, offset?: number): void; set(array: Uint8Array, offset?: number): void; - set(array: VSBuffer | Uint8Array, offset?: number): void { + set(array: ArrayBuffer, offset?: number): void; + set(array: ArrayBufferView, offset?: number): void; + set(array: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView, offset?: number): void; + set(array: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView, offset?: number): void { if (array instanceof VSBuffer) { this.buffer.set(array.buffer, offset); - } else { + } else if (array instanceof Uint8Array) { this.buffer.set(array, offset); + } else if (array instanceof ArrayBuffer) { + this.buffer.set(new Uint8Array(array), offset); + } else if (ArrayBuffer.isView(array)) { + this.buffer.set(new Uint8Array(array.buffer, array.byteOffset, array.byteLength), offset); + } else { + throw new Error(`Unkown argument 'array'`); } } diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 576c91cdb50..5075a80ec4f 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -8,6 +8,89 @@ import { Emitter, Event } from 'vs/base/common/event'; import { Disposable, dispose, IDisposable } from 'vs/base/common/lifecycle'; import { IIPCLogger, IMessagePassingProtocol, IPCClient } from 'vs/base/parts/ipc/common/ipc'; +export const enum SocketDiagnosticsEventType { + Created = 'created', + Read = 'read', + Write = 'write', + Open = 'open', + Error = 'error', + Close = 'close', + + BrowserWebSocketBlobReceived = 'browserWebSocketBlobReceived', + + NodeEndReceived = 'nodeEndReceived', + NodeEndSent = 'nodeEndSent', + NodeDrainBegin = 'nodeDrainBegin', + NodeDrainEnd = 'nodeDrainEnd', + + zlibInflateError = 'zlibInflateError', + zlibInflateData = 'zlibInflateData', + zlibInflateWriteInitial = 'zlibInflateWriteInitial', + zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired', + zlibInflateWrite = 'zlibInflateWrite', + zlibInflateFlushFired = 'zlibInflateFlushFired', + zlibDeflateError = 'zlibDeflateError', + zlibDeflateData = 'zlibDeflateData', + zlibDeflateWrite = 'zlibDeflateWrite', + zlibDeflateFlushFired = 'zlibDeflateFlushFired', + + WebSocketNodeSocketWrite = 'webSocketNodeSocketWrite', + WebSocketNodeSocketPeekedHeader = 'webSocketNodeSocketPeekedHeader', + WebSocketNodeSocketReadHeader = 'webSocketNodeSocketReadHeader', + WebSocketNodeSocketReadData = 'webSocketNodeSocketReadData', + WebSocketNodeSocketUnmaskedData = 'webSocketNodeSocketUnmaskedData', + WebSocketNodeSocketDrainBegin = 'webSocketNodeSocketDrainBegin', + WebSocketNodeSocketDrainEnd = 'webSocketNodeSocketDrainEnd', + + ProtocolHeaderRead = 'protocolHeaderRead', + ProtocolMessageRead = 'protocolMessageRead', + ProtocolHeaderWrite = 'protocolHeaderWrite', + ProtocolMessageWrite = 'protocolMessageWrite', + ProtocolWrite = 'protocolWrite', +} + +export namespace SocketDiagnostics { + + export const enableDiagnostics = false; + + export interface IRecord { + timestamp: number; + id: string; + label: string; + type: SocketDiagnosticsEventType; + buff?: VSBuffer; + data?: any; + } + + export const records: IRecord[] = []; + const socketIds = new WeakMap(); + let lastUsedSocketId = 0; + + function getSocketId(nativeObject: any, label: string): string { + if (!socketIds.has(nativeObject)) { + const id = String(++lastUsedSocketId); + socketIds.set(nativeObject, id); + } + return socketIds.get(nativeObject)!; + } + + export function traceSocketEvent(nativeObject: any, socketDebugLabel: string, type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { + if (!enableDiagnostics) { + return; + } + const id = getSocketId(nativeObject, socketDebugLabel); + + if (data instanceof VSBuffer || data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + const copiedData = VSBuffer.alloc(data.byteLength); + copiedData.set(data); + records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, buff: copiedData }); + } else { + // data is a custom object + records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, data: data }); + } + } +} + export const enum SocketCloseEventType { NodeSocketCloseEvent = 0, WebSocketCloseEvent = 1 @@ -60,6 +143,8 @@ export interface ISocket extends IDisposable { write(buffer: VSBuffer): void; end(): void; drain(): Promise; + + traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void; } let emptyBuffer: VSBuffer | null = null; @@ -173,6 +258,18 @@ const enum ProtocolMessageType { ReplayRequest = 6 } +function protocolMessageTypeToString(messageType: ProtocolMessageType) { + switch (messageType) { + case ProtocolMessageType.None: return 'None'; + case ProtocolMessageType.Regular: return 'Regular'; + case ProtocolMessageType.Control: return 'Control'; + case ProtocolMessageType.Ack: return 'Ack'; + case ProtocolMessageType.KeepAlive: return 'KeepAlive'; + case ProtocolMessageType.Disconnect: return 'Disconnect'; + case ProtocolMessageType.ReplayRequest: return 'ReplayRequest'; + } +} + export const enum ProtocolConstants { HeaderLength = 13, /** @@ -268,6 +365,9 @@ class ProtocolReader extends Disposable { this._state.messageType = buff.readUInt8(0); this._state.id = buff.readUInt32BE(1); this._state.ack = buff.readUInt32BE(5); + + this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderRead, { messageType: protocolMessageTypeToString(this._state.messageType), id: this._state.id, ack: this._state.ack, messageSize: this._state.readLen }); + } else { // buff is the body const messageType = this._state.messageType; @@ -281,6 +381,8 @@ class ProtocolReader extends Disposable { this._state.id = 0; this._state.ack = 0; + this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageRead, buff); + this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff)); if (this._isDisposed) { @@ -349,6 +451,10 @@ class ProtocolWriter { header.writeUInt32BE(msg.id, 1); header.writeUInt32BE(msg.ack, 5); header.writeUInt32BE(msg.data.byteLength, 9); + + this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderWrite, { messageType: protocolMessageTypeToString(msg.type), id: msg.id, ack: msg.ack, messageSize: msg.data.byteLength }); + this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageWrite, msg.data); + this._writeSoon(header, msg.data); } @@ -378,7 +484,9 @@ class ProtocolWriter { if (this._totalLength === 0) { return; } - this._socket.write(this._bufferTake()); + const data = this._bufferTake(); + this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength }); + this._socket.write(data); } } @@ -983,3 +1091,39 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketWriter.write(msg); } } + +// (() => { +// if (!SocketDiagnostics.enableDiagnostics) { +// return; +// } +// if (typeof require.__$__nodeRequire !== 'function') { +// console.log(`Can only log socket diagnostics on native platforms.`); +// return; +// } +// const type = ( +// process.argv.includes('--type=renderer') +// ? 'renderer' +// : (process.argv.includes('--type=extensionHost') +// ? 'extensionHost' +// : (process.argv.some(item => item.includes('server/main')) +// ? 'server' +// : 'unknown' +// ) +// ) +// ); +// setTimeout(() => { +// SocketDiagnostics.records.forEach(r => { +// if (r.buff) { +// r.data = Buffer.from(r.buff.buffer).toString('base64'); +// r.buff = undefined; +// } +// }); + +// const fs = require.__$__nodeRequire('fs'); +// const path = require.__$__nodeRequire('path'); +// const logPath = path.join(process.cwd(),`${type}-${process.pid}`); + +// console.log(`dumping socket diagnostics at ${logPath}`); +// fs.writeFileSync(logPath, JSON.stringify(SocketDiagnostics.records)); +// }, 20000); +// })(); diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index f4f55e8eea8..32c26428358 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -14,17 +14,25 @@ import { join } from 'vs/base/common/path'; import { Platform, platform } from 'vs/base/common/platform'; import { generateUuid } from 'vs/base/common/uuid'; import { ClientConnectionEvent, IPCServer } from 'vs/base/parts/ipc/common/ipc'; -import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net'; +import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net'; import * as zlib from 'zlib'; export class NodeSocket implements ISocket { + public readonly debugLabel: string; public readonly socket: Socket; private readonly _errorListener: (err: any) => void; - constructor(socket: Socket) { + public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { + SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data); + } + + constructor(socket: Socket, debugLabel: string = '') { + this.debugLabel = debugLabel; this.socket = socket; + this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'NodeSocket' }); this._errorListener = (err: any) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Error, { code: err?.code, message: err?.message }); if (err) { if (err.code === 'EPIPE') { // An EPIPE exception at the wrong time can lead to a renderer process crash @@ -47,7 +55,10 @@ export class NodeSocket implements ISocket { } public onData(_listener: (e: VSBuffer) => void): IDisposable { - const listener = (buff: Buffer) => _listener(VSBuffer.wrap(buff)); + const listener = (buff: Buffer) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff); + _listener(VSBuffer.wrap(buff)); + }; this.socket.on('data', listener); return { dispose: () => this.socket.off('data', listener) @@ -56,6 +67,7 @@ export class NodeSocket implements ISocket { public onClose(listener: (e: SocketCloseEvent) => void): IDisposable { const adapter = (hadError: boolean) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Close, { hadError }); listener({ type: SocketCloseEventType.NodeSocketCloseEvent, hadError: hadError, @@ -69,9 +81,13 @@ export class NodeSocket implements ISocket { } public onEnd(listener: () => void): IDisposable { - this.socket.on('end', listener); + const adapter = () => { + this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndReceived); + listener(); + }; + this.socket.on('end', adapter); return { - dispose: () => this.socket.off('end', listener) + dispose: () => this.socket.off('end', adapter) }; } @@ -87,7 +103,8 @@ export class NodeSocket implements ISocket { // > However, the false return value is only advisory and the writable stream will unconditionally // > accept and buffer chunk even if it has not been allowed to drain. try { - this.socket.write(buffer.buffer, (err: any) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Write, buffer); + this.socket.write(buffer.buffer, (err: any) => { if (err) { if (err.code === 'EPIPE') { // An EPIPE exception at the wrong time can lead to a renderer process crash @@ -116,12 +133,15 @@ export class NodeSocket implements ISocket { } public end(): void { + this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndSent); this.socket.end(); } public drain(): Promise { + this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainBegin); return new Promise((resolve, reject) => { if (this.socket.bufferSize === 0) { + this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd); resolve(); return; } @@ -131,6 +151,7 @@ export class NodeSocket implements ISocket { this.socket.off('error', finished); this.socket.off('timeout', finished); this.socket.off('drain', finished); + this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd); resolve(); }; this.socket.on('close', finished); @@ -209,6 +230,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { return VSBuffer.alloc(0); } + public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { + this.socket.traceSocketEvent(type, data); + } + /** * Create a socket which can communicate using WebSocket frames. * @@ -224,6 +249,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { constructor(socket: NodeSocket, permessageDeflate: boolean, inflateBytes: VSBuffer | null, recordInflateBytes: boolean) { super(); this.socket = socket; + this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes }); this._totalIncomingWireBytes = 0; this._totalIncomingDataBytes = 0; this._totalOutgoingWireBytes = 0; @@ -238,6 +264,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { windowBits: 15 }); this._zlibInflate.on('error', (err) => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (err)?.code }); // zlib errors are fatal, since we have no idea how to recover console.error(err); onUnexpectedError(err); @@ -248,11 +275,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { }); }); this._zlibInflate.on('data', (data: Buffer) => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data); this._pendingInflateData.push(data); }); if (inflateBytes) { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWriteInitial, inflateBytes.buffer); this._zlibInflate.write(inflateBytes.buffer); this._zlibInflate.flush(() => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired); this._pendingInflateData.length = 0; }); } @@ -261,6 +291,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { windowBits: 15 }); this._zlibDeflate.on('error', (err) => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (err)?.code }); // zlib errors are fatal, since we have no idea how to recover console.error(err); onUnexpectedError(err); @@ -271,6 +302,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { }); }); this._zlibDeflate.on('data', (data: Buffer) => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data); this._pendingDeflateData.push(data); }); } else { @@ -311,11 +343,13 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._totalOutgoingDataBytes += buffer.byteLength; if (this._zlibDeflate) { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer); this._zlibDeflate.write(buffer.buffer); this._zlibDeflateFlushWaitingCount++; // See https://zlib.net/manual.html#Constants this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired); this._zlibDeflateFlushWaitingCount--; let data = Buffer.concat(this._pendingDeflateData); this._pendingDeflateData.length = 0; @@ -338,6 +372,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } private _write(buffer: VSBuffer, compressed: boolean): void { + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketWrite, buffer); let headerLen = Constants.MinHeaderByteSize; if (buffer.byteLength < 126) { headerLen += 0; @@ -413,6 +448,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._state.firstFrameOfMessage = Boolean(finBit); this._state.mask = 0; + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin }); + } else if (this._state.state === ReadState.ReadHeader) { // read entire header const header = this._incomingData.read(this._state.readLen); @@ -453,11 +490,16 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._state.readLen = len; this._state.mask = mask; + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask }); + } else if (this._state.state === ReadState.ReadBody) { // read body const body = this._incomingData.read(this._state.readLen); + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketReadData, body); + unmask(body, this._state.mask); + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketUnmaskedData, body); this._state.state = ReadState.PeekHeader; this._state.readLen = Constants.MinHeaderByteSize; @@ -473,14 +515,19 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { if (this._recordInflateBytes) { this._recordedInflateBytes.push(Buffer.from(body.buffer)); } + + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, body.buffer); this._zlibInflate.write(body.buffer); if (this._state.fin) { if (this._recordInflateBytes) { this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff])); } - this._zlibInflate.write(Buffer.from([0x00, 0x00, 0xff, 0xff])); + const buff = Buffer.from([0x00, 0x00, 0xff, 0xff]); + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buff); + this._zlibInflate.write(buff); } this._zlibInflate.flush(() => { + this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired); const data = Buffer.concat(this._pendingInflateData); this._pendingInflateData.length = 0; this._totalIncomingDataBytes += data.length; @@ -495,10 +542,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } public async drain(): Promise { + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainBegin); if (this._zlibDeflateFlushWaitingCount > 0) { await Event.toPromise(this._onDidZlibFlush.event); } await this.socket.drain(); + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainEnd); } } @@ -597,7 +646,7 @@ export class Server extends IPCServer { const onConnection = Event.fromNodeEventEmitter(server, 'connection'); return Event.map(onConnection, socket => ({ - protocol: new Protocol(new NodeSocket(socket)), + protocol: new Protocol(new NodeSocket(socket, 'ipc-server-connection')), onDidClientDisconnect: Event.once(Event.fromNodeEventEmitter(socket, 'close')) })); } @@ -639,7 +688,7 @@ export function connect(hook: any, clientId: string): Promise { return new Promise((c, e) => { const socket = createConnection(hook, () => { socket.removeListener('error', e); - c(Client.fromSocket(new NodeSocket(socket), clientId)); + c(Client.fromSocket(new NodeSocket(socket, `ipc-client${clientId}`), clientId)); }); socket.once('error', e); diff --git a/src/vs/platform/remote/browser/browserSocketFactory.ts b/src/vs/platform/remote/browser/browserSocketFactory.ts index d9a5d9b26de..e81f91b857f 100644 --- a/src/vs/platform/remote/browser/browserSocketFactory.ts +++ b/src/vs/platform/remote/browser/browserSocketFactory.ts @@ -8,12 +8,12 @@ import { RunOnceScheduler } from 'vs/base/common/async'; import { VSBuffer } from 'vs/base/common/buffer'; import { Emitter, Event } from 'vs/base/common/event'; import { Disposable, IDisposable } from 'vs/base/common/lifecycle'; -import { ISocket, SocketCloseEvent, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net'; +import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net'; import { IConnectCallback, ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection'; import { RemoteAuthorityResolverError, RemoteAuthorityResolverErrorCode } from 'vs/platform/remote/common/remoteAuthorityResolver'; export interface IWebSocketFactory { - create(url: string): IWebSocket; + create(url: string, debugLabel: string): IWebSocket; } export interface IWebSocketCloseEvent { @@ -41,6 +41,7 @@ export interface IWebSocket { readonly onClose: Event; readonly onError: Event; + traceSocketEvent?(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void; send(data: ArrayBuffer | ArrayBufferView): void; close(): void; } @@ -50,7 +51,8 @@ class BrowserWebSocket extends Disposable implements IWebSocket { private readonly _onData = new Emitter(); public readonly onData = this._onData.event; - public readonly onOpen: Event; + private readonly _onOpen = this._register(new Emitter()); + public readonly onOpen = this._onOpen.event; private readonly _onClose = this._register(new Emitter()); public readonly onClose = this._onClose.event; @@ -58,6 +60,7 @@ class BrowserWebSocket extends Disposable implements IWebSocket { private readonly _onError = this._register(new Emitter()); public readonly onError = this._onError.event; + private readonly _debugLabel: string; private readonly _socket: WebSocket; private readonly _fileReader: FileReader; private readonly _queue: Blob[]; @@ -66,9 +69,15 @@ class BrowserWebSocket extends Disposable implements IWebSocket { private readonly _socketMessageListener: (ev: MessageEvent) => void; - constructor(socket: WebSocket) { + public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { + SocketDiagnostics.traceSocketEvent(this._socket, this._debugLabel, type, data); + } + + constructor(url: string, debugLabel: string) { super(); - this._socket = socket; + this._debugLabel = debugLabel; + this._socket = new WebSocket(url); + this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'BrowserWebSocket', url }); this._fileReader = new FileReader(); this._queue = []; this._isReading = false; @@ -78,6 +87,7 @@ class BrowserWebSocket extends Disposable implements IWebSocket { this._isReading = false; const buff = (event.target).result; + this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff); this._onData.fire(buff); if (this._queue.length > 0) { @@ -95,11 +105,16 @@ class BrowserWebSocket extends Disposable implements IWebSocket { }; this._socketMessageListener = (ev: MessageEvent) => { - enqueue(ev.data); + const blob = (ev.data); + this.traceSocketEvent(SocketDiagnosticsEventType.BrowserWebSocketBlobReceived, { type: blob.type, size: blob.size }); + enqueue(blob); }; this._socket.addEventListener('message', this._socketMessageListener); - this.onOpen = Event.fromDOMEventEmitter(this._socket, 'open'); + this._register(dom.addDisposableListener(this._socket, 'open', (e) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Open); + this._onOpen.fire(); + })); // WebSockets emit error events that do not contain any real information // Our only chance of getting to the root cause of an error is to @@ -134,6 +149,8 @@ class BrowserWebSocket extends Disposable implements IWebSocket { }; this._register(dom.addDisposableListener(this._socket, 'close', (e: CloseEvent) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Close, { code: e.code, reason: e.reason, wasClean: e.wasClean }); + this._isClosed = true; if (pendingErrorEvent) { @@ -157,7 +174,10 @@ class BrowserWebSocket extends Disposable implements IWebSocket { this._onClose.fire({ code: e.code, reason: e.reason, wasClean: e.wasClean, event: e }); })); - this._register(dom.addDisposableListener(this._socket, 'error', sendErrorSoon)); + this._register(dom.addDisposableListener(this._socket, 'error', (err) => { + this.traceSocketEvent(SocketDiagnosticsEventType.Error, { message: err?.message }); + sendErrorSoon(err); + })); } send(data: ArrayBuffer | ArrayBufferView): void { @@ -165,11 +185,13 @@ class BrowserWebSocket extends Disposable implements IWebSocket { // Refuse to write data to closed WebSocket... return; } + this.traceSocketEvent(SocketDiagnosticsEventType.Write, data); this._socket.send(data); } close(): void { this._isClosed = true; + this.traceSocketEvent(SocketDiagnosticsEventType.Close); this._socket.close(); this._socket.removeEventListener('message', this._socketMessageListener); this.dispose(); @@ -177,16 +199,27 @@ class BrowserWebSocket extends Disposable implements IWebSocket { } export const defaultWebSocketFactory = new class implements IWebSocketFactory { - create(url: string): IWebSocket { - return new BrowserWebSocket(new WebSocket(url)); + create(url: string, debugLabel: string): IWebSocket { + return new BrowserWebSocket(url, debugLabel); } }; class BrowserSocket implements ISocket { - public readonly socket: IWebSocket; - constructor(socket: IWebSocket) { + public readonly socket: IWebSocket; + public readonly debugLabel: string; + + public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { + if (typeof this.socket.traceSocketEvent === 'function') { + this.socket.traceSocketEvent(type, data); + } else { + SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data); + } + } + + constructor(socket: IWebSocket, debugLabel: string) { this.socket = socket; + this.debugLabel = debugLabel; } public dispose(): void { @@ -239,13 +272,13 @@ export class BrowserSocketFactory implements ISocketFactory { this._webSocketFactory = webSocketFactory || defaultWebSocketFactory; } - connect(host: string, port: number, query: string, callback: IConnectCallback): void { + connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void { const webSocketSchema = (/^https:/.test(window.location.href) ? 'wss' : 'ws'); - const socket = this._webSocketFactory.create(`${webSocketSchema}://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=false`); + const socket = this._webSocketFactory.create(`${webSocketSchema}://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=false`, debugLabel); const errorListener = socket.onError((err) => callback(err, undefined)); socket.onOpen(() => { errorListener.dispose(); - callback(undefined, new BrowserSocket(socket)); + callback(undefined, new BrowserSocket(socket, debugLabel)); }); } } diff --git a/src/vs/platform/remote/common/remoteAgentConnection.ts b/src/vs/platform/remote/common/remoteAgentConnection.ts index 3af66e0a9bd..b555abd3694 100644 --- a/src/vs/platform/remote/common/remoteAgentConnection.ts +++ b/src/vs/platform/remote/common/remoteAgentConnection.ts @@ -85,7 +85,7 @@ export interface IConnectCallback { } export interface ISocketFactory { - connect(host: string, port: number, query: string, callback: IConnectCallback): void; + connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void; } function createTimeoutCancellation(millis: number): CancellationToken { @@ -188,9 +188,9 @@ function readOneControlMessage(protocol: PersistentProtocol, timeoutCancellat return result.promise; } -function createSocket(logService: ILogService, socketFactory: ISocketFactory, host: string, port: number, query: string, timeoutCancellationToken: CancellationToken): Promise { +function createSocket(logService: ILogService, socketFactory: ISocketFactory, host: string, port: number, query: string, debugLabel: string, timeoutCancellationToken: CancellationToken): Promise { const result = new PromiseWithTimeout(timeoutCancellationToken); - socketFactory.connect(host, port, query, (err: any, socket: ISocket | undefined) => { + socketFactory.connect(host, port, query, debugLabel, (err: any, socket: ISocket | undefined) => { if (result.didTimeout) { if (err) { logService.error(err); @@ -231,7 +231,7 @@ async function connectToRemoteExtensionHostAgent(options: ISimpleConnectionOptio let socket: ISocket; try { - socket = await createSocket(options.logService, options.socketFactory, options.host, options.port, `reconnectionToken=${options.reconnectionToken}&reconnection=${options.reconnectionProtocol ? 'true' : 'false'}`, timeoutCancellationToken); + socket = await createSocket(options.logService, options.socketFactory, options.host, options.port, `reconnectionToken=${options.reconnectionToken}&reconnection=${options.reconnectionProtocol ? 'true' : 'false'}`, `renderer-${connectionTypeToString(connectionType)}-${options.reconnectionToken}`, timeoutCancellationToken); } catch (error) { options.logService.error(`${logPrefix} socketFactory.connect() failed or timed out. Error:`); options.logService.error(error); diff --git a/src/vs/platform/remote/node/nodeSocketFactory.ts b/src/vs/platform/remote/node/nodeSocketFactory.ts index a0359ed76f6..4f5a2c938a1 100644 --- a/src/vs/platform/remote/node/nodeSocketFactory.ts +++ b/src/vs/platform/remote/node/nodeSocketFactory.ts @@ -8,7 +8,7 @@ import { NodeSocket } from 'vs/base/parts/ipc/node/ipc.net'; import { IConnectCallback, ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection'; export const nodeSocketFactory = new class implements ISocketFactory { - connect(host: string, port: number, query: string, callback: IConnectCallback): void { + connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void { const errorListener = (err: any) => callback(err, undefined); const socket = net.createConnection({ host: host, port: port }, () => { @@ -34,7 +34,7 @@ export const nodeSocketFactory = new class implements ISocketFactory { if (strData.indexOf('\r\n\r\n') >= 0) { // headers received OK socket.off('data', onData); - callback(undefined, new NodeSocket(socket)); + callback(undefined, new NodeSocket(socket, debugLabel)); } }; socket.on('data', onData); diff --git a/src/vs/server/remoteExtensionHostAgentServer.ts b/src/vs/server/remoteExtensionHostAgentServer.ts index 60f925fe191..b3a66234045 100644 --- a/src/vs/server/remoteExtensionHostAgentServer.ts +++ b/src/vs/server/remoteExtensionHostAgentServer.ts @@ -497,9 +497,9 @@ export class RemoteExtensionHostAgentServer extends Disposable { // Finally! if (skipWebSocketFrames) { - this._handleWebSocketConnection(new NodeSocket(socket), isReconnection, reconnectionToken); + this._handleWebSocketConnection(new NodeSocket(socket, `server-connection-${reconnectionToken}`), isReconnection, reconnectionToken); } else { - this._handleWebSocketConnection(new WebSocketNodeSocket(new NodeSocket(socket), permessageDeflate, null, true), isReconnection, reconnectionToken); + this._handleWebSocketConnection(new WebSocketNodeSocket(new NodeSocket(socket, `server-connection-${reconnectionToken}`), permessageDeflate, null, true), isReconnection, reconnectionToken); } } diff --git a/src/vs/workbench/services/extensions/electron-browser/localProcessExtensionHost.ts b/src/vs/workbench/services/extensions/electron-browser/localProcessExtensionHost.ts index 7c141c4a42d..35dae9ea876 100644 --- a/src/vs/workbench/services/extensions/electron-browser/localProcessExtensionHost.ts +++ b/src/vs/workbench/services/extensions/electron-browser/localProcessExtensionHost.ts @@ -450,7 +450,7 @@ export class LocalProcessExtensionHost implements IExtensionHost { // using a buffered message protocol here because between now // and the first time a `then` executes some messages might be lost // unless we immediately register a listener for `onMessage`. - resolve(new PersistentProtocol(new NodeSocket(this._extensionHostConnection))); + resolve(new PersistentProtocol(new NodeSocket(this._extensionHostConnection, 'renderer-exthost'))); }); // Now that the named pipe listener is installed, start the ext host process diff --git a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts index 70b86fcd4d6..36dc1a64166 100644 --- a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts +++ b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts @@ -123,10 +123,10 @@ function _createExtHostProtocol(): Promise { const initialDataChunk = VSBuffer.wrap(Buffer.from(msg.initialDataChunk, 'base64')); let socket: NodeSocket | WebSocketNodeSocket; if (msg.skipWebSocketFrames) { - socket = new NodeSocket(handle); + socket = new NodeSocket(handle, 'extHost-socket'); } else { const inflateBytes = VSBuffer.wrap(Buffer.from(msg.inflateBytes, 'base64')); - socket = new WebSocketNodeSocket(new NodeSocket(handle), msg.permessageDeflate, inflateBytes, false); + socket = new WebSocketNodeSocket(new NodeSocket(handle, 'extHost-socket'), msg.permessageDeflate, inflateBytes, false); } if (protocol) { // reconnection case @@ -174,7 +174,7 @@ function _createExtHostProtocol(): Promise { const socket = net.createConnection(pipeName, () => { socket.removeListener('error', reject); - resolve(new PersistentProtocol(new NodeSocket(socket))); + resolve(new PersistentProtocol(new NodeSocket(socket, 'extHost-renderer'))); }); socket.once('error', reject); From 228ac5b3c1a698e5b46e4b75abb162c6568334a9 Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 10:25:00 +0100 Subject: [PATCH 2/9] Add `traceSocketEvent` for fake socket --- src/vs/base/parts/ipc/test/node/ipc.net.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts index ae27949be68..4345fcccaca 100644 --- a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts +++ b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts @@ -11,7 +11,7 @@ import { Barrier, timeout } from 'vs/base/common/async'; import { VSBuffer } from 'vs/base/common/buffer'; import { Emitter } from 'vs/base/common/event'; import { Disposable, DisposableStore } from 'vs/base/common/lifecycle'; -import { ILoadEstimator, PersistentProtocol, Protocol, ProtocolConstants, SocketCloseEvent } from 'vs/base/parts/ipc/common/ipc.net'; +import { ILoadEstimator, PersistentProtocol, Protocol, ProtocolConstants, SocketCloseEvent, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net'; import { createRandomIPCHandle, createStaticIPCHandle, NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net'; import { runWithFakedTimers } from 'vs/base/test/common/timeTravelScheduler'; import { ensureNoDisposablesAreLeakedInTestSuite } from 'vs/base/test/common/utils'; @@ -432,6 +432,9 @@ suite('WebSocketNodeSocket', () => { private readonly _onClose = new Emitter(); public readonly onClose = this._onClose.event; + public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { + } + constructor() { super(); } From babe6a6a94b81821685403404251fef93d951fb9 Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 11:47:55 +0100 Subject: [PATCH 3/9] Extract `ZlibInflateStream` --- src/vs/base/common/buffer.ts | 14 +++ src/vs/base/parts/ipc/common/ipc.net.ts | 2 +- src/vs/base/parts/ipc/node/ipc.net.ts | 127 ++++++++++++++++-------- 3 files changed, 98 insertions(+), 45 deletions(-) diff --git a/src/vs/base/common/buffer.ts b/src/vs/base/common/buffer.ts index 0e28b41904b..45649713090 100644 --- a/src/vs/base/common/buffer.ts +++ b/src/vs/base/common/buffer.ts @@ -43,6 +43,14 @@ export class VSBuffer { } } + static fromByteArray(source: number[]): VSBuffer { + const result = VSBuffer.alloc(source.length); + for (let i = 0, len = source.length; i < len; i++) { + result.buffer[i] = source[i]; + } + return result; + } + static concat(buffers: VSBuffer[], totalLength?: number): VSBuffer { if (typeof totalLength === 'undefined') { totalLength = 0; @@ -70,6 +78,12 @@ export class VSBuffer { this.byteLength = this.buffer.byteLength; } + clone(): VSBuffer { + const result = VSBuffer.alloc(this.byteLength); + result.set(this); + return result; + } + toString(): string { if (hasBuffer) { return this.buffer.toString(); diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 5075a80ec4f..5ff8d7bcf40 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -25,7 +25,7 @@ export const enum SocketDiagnosticsEventType { zlibInflateError = 'zlibInflateError', zlibInflateData = 'zlibInflateData', - zlibInflateWriteInitial = 'zlibInflateWriteInitial', + zlibInflateInitialWrite = 'zlibInflateInitialWrite', zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired', zlibInflateWrite = 'zlibInflateWrite', zlibInflateFlushFired = 'zlibInflateFlushFired', diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index 32c26428358..d766f3f125c 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -174,10 +174,14 @@ const enum ReadState { Fin = 4 } +interface ISocketTracer { + traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void +} + /** * See https://tools.ietf.org/html/rfc6455#section-5.2 */ -export class WebSocketNodeSocket extends Disposable implements ISocket { +export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketTracer { public readonly socket: NodeSocket; public readonly permessageDeflate: boolean; @@ -185,13 +189,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { private _totalIncomingDataBytes: number; private _totalOutgoingWireBytes: number; private _totalOutgoingDataBytes: number; - private readonly _zlibInflate: zlib.InflateRaw | null; + private readonly _zlibInflateStream: ZlibInflateStream | null; private readonly _zlibDeflate: zlib.DeflateRaw | null; private _zlibDeflateFlushWaitingCount: number; private readonly _onDidZlibFlush = this._register(new Emitter()); - private readonly _recordInflateBytes: boolean; - private readonly _recordedInflateBytes: Buffer[] = []; - private readonly _pendingInflateData: Buffer[] = []; private readonly _pendingDeflateData: Buffer[] = []; private readonly _incomingData: ChunkStream; private readonly _onData = this._register(new Emitter()); @@ -224,8 +225,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } public get recordedInflateBytes(): VSBuffer { - if (this._recordInflateBytes) { - return VSBuffer.wrap(Buffer.concat(this._recordedInflateBytes)); + if (this._zlibInflateStream) { + return this._zlibInflateStream.recordedInflateBytes; } return VSBuffer.alloc(0); } @@ -255,16 +256,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._totalOutgoingWireBytes = 0; this._totalOutgoingDataBytes = 0; this.permessageDeflate = permessageDeflate; - this._recordInflateBytes = recordInflateBytes; if (permessageDeflate) { // See https://tools.ietf.org/html/rfc7692#page-16 // To simplify our logic, we don't negotiate the window size // and simply dedicate (2^15) / 32kb per web socket - this._zlibInflate = zlib.createInflateRaw({ + this._zlibInflateStream = new ZlibInflateStream(this, recordInflateBytes, inflateBytes, { windowBits: 15 }); - this._zlibInflate.on('error', (err) => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (err)?.code }); + this._register(this._zlibInflateStream.onError((err) => { // zlib errors are fatal, since we have no idea how to recover console.error(err); onUnexpectedError(err); @@ -273,19 +272,11 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { hadError: true, error: err }); - }); - this._zlibInflate.on('data', (data: Buffer) => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data); - this._pendingInflateData.push(data); - }); - if (inflateBytes) { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWriteInitial, inflateBytes.buffer); - this._zlibInflate.write(inflateBytes.buffer); - this._zlibInflate.flush(() => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired); - this._pendingInflateData.length = 0; - }); - } + })); + this._register(this._zlibInflateStream.onData((data) => { + this._totalIncomingDataBytes += data.byteLength; + this._onData.fire(data); + })); this._zlibDeflate = zlib.createDeflateRaw({ windowBits: 15 @@ -306,7 +297,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._pendingDeflateData.push(data); }); } else { - this._zlibInflate = null; + this._zlibInflateStream = null; this._zlibDeflate = null; } this._zlibDeflateFlushWaitingCount = 0; @@ -505,34 +496,19 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._state.readLen = Constants.MinHeaderByteSize; this._state.mask = 0; - if (this._zlibInflate && this._state.compressed) { + if (this._zlibInflateStream && this._state.compressed) { // See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2 // Even if permessageDeflate is negotiated, it is possible // that the other side might decide to send uncompressed messages // So only decompress messages that have the RSV 1 bit set // // See https://tools.ietf.org/html/rfc7692#section-7.2.2 - if (this._recordInflateBytes) { - this._recordedInflateBytes.push(Buffer.from(body.buffer)); - } - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, body.buffer); - this._zlibInflate.write(body.buffer); + this._zlibInflateStream.write(body); if (this._state.fin) { - if (this._recordInflateBytes) { - this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff])); - } - const buff = Buffer.from([0x00, 0x00, 0xff, 0xff]); - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buff); - this._zlibInflate.write(buff); + this._zlibInflateStream.write(VSBuffer.fromByteArray([0x00, 0x00, 0xff, 0xff])); } - this._zlibInflate.flush(() => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired); - const data = Buffer.concat(this._pendingInflateData); - this._pendingInflateData.length = 0; - this._totalIncomingDataBytes += data.length; - this._onData.fire(VSBuffer.wrap(data)); - }); + this._zlibInflateStream.flush(); } else { this._totalIncomingDataBytes += body.byteLength; this._onData.fire(body); @@ -551,6 +527,69 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } } +class ZlibInflateStream extends Disposable { + + private readonly _onError = this._register(new Emitter()); + public readonly onError = this._onError.event; + + private readonly _onData = this._register(new Emitter()); + public readonly onData = this._onData.event; + + private readonly _zlibInflate: zlib.InflateRaw; + private readonly _recordedInflateBytes: VSBuffer[] = []; + private readonly _pendingInflateData: VSBuffer[] = []; + + public get recordedInflateBytes(): VSBuffer { + if (this._recordInflateBytes) { + return VSBuffer.concat(this._recordedInflateBytes); + } + return VSBuffer.alloc(0); + } + + constructor( + private readonly _tracer: ISocketTracer, + private readonly _recordInflateBytes: boolean, + inflateBytes: VSBuffer | null, + options: zlib.ZlibOptions + ) { + super(); + this._zlibInflate = zlib.createInflateRaw(options); + this._zlibInflate.on('error', (err) => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (err)?.code }); + this._onError.fire(err); + }); + this._zlibInflate.on('data', (data: Buffer) => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data); + this._pendingInflateData.push(VSBuffer.wrap(data)); + }); + if (inflateBytes) { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialWrite, inflateBytes.buffer); + this._zlibInflate.write(inflateBytes.buffer); + this._zlibInflate.flush(() => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired); + this._pendingInflateData.length = 0; + }); + } + } + + public write(buffer: VSBuffer): void { + if (this._recordInflateBytes) { + this._recordedInflateBytes.push(buffer.clone()); + } + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buffer); + this._zlibInflate.write(buffer.buffer); + } + + public flush(): void { + this._zlibInflate.flush(() => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired); + const data = VSBuffer.concat(this._pendingInflateData); + this._pendingInflateData.length = 0; + this._onData.fire(data); + }); + } +} + function unmask(buffer: VSBuffer, mask: number): void { if (mask === 0) { return; From 2b6fd1df46e2e3e787c916b2e9beef6e388ffc12 Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 15:27:37 +0100 Subject: [PATCH 4/9] Extract `ZlibDeflateStream` --- src/vs/base/parts/ipc/node/ipc.net.ts | 99 ++++++++++++++++++--------- 1 file changed, 65 insertions(+), 34 deletions(-) diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index d766f3f125c..0f4cac43045 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -185,15 +185,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT public readonly socket: NodeSocket; public readonly permessageDeflate: boolean; - private _totalIncomingWireBytes: number; - private _totalIncomingDataBytes: number; - private _totalOutgoingWireBytes: number; - private _totalOutgoingDataBytes: number; + private _totalIncomingWireBytes: number = 0; + private _totalIncomingDataBytes: number = 0; + private _totalOutgoingWireBytes: number = 0; + private _totalOutgoingDataBytes: number = 0; private readonly _zlibInflateStream: ZlibInflateStream | null; - private readonly _zlibDeflate: zlib.DeflateRaw | null; + private readonly _zlibDeflateStream: ZlibDeflateStream | null; private _zlibDeflateFlushWaitingCount: number; private readonly _onDidZlibFlush = this._register(new Emitter()); - private readonly _pendingDeflateData: Buffer[] = []; private readonly _incomingData: ChunkStream; private readonly _onData = this._register(new Emitter()); private readonly _onClose = this._register(new Emitter()); @@ -251,18 +250,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT super(); this.socket = socket; this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes }); - this._totalIncomingWireBytes = 0; - this._totalIncomingDataBytes = 0; - this._totalOutgoingWireBytes = 0; - this._totalOutgoingDataBytes = 0; this.permessageDeflate = permessageDeflate; if (permessageDeflate) { // See https://tools.ietf.org/html/rfc7692#page-16 // To simplify our logic, we don't negotiate the window size // and simply dedicate (2^15) / 32kb per web socket - this._zlibInflateStream = new ZlibInflateStream(this, recordInflateBytes, inflateBytes, { + this._zlibInflateStream = this._register(new ZlibInflateStream(this, recordInflateBytes, inflateBytes, { windowBits: 15 - }); + })); this._register(this._zlibInflateStream.onError((err) => { // zlib errors are fatal, since we have no idea how to recover console.error(err); @@ -278,11 +273,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT this._onData.fire(data); })); - this._zlibDeflate = zlib.createDeflateRaw({ + this._zlibDeflateStream = this._register(new ZlibDeflateStream(this, { windowBits: 15 - }); - this._zlibDeflate.on('error', (err) => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (err)?.code }); + })); + this._register(this._zlibDeflateStream.onError((err) => { // zlib errors are fatal, since we have no idea how to recover console.error(err); onUnexpectedError(err); @@ -291,14 +285,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT hadError: true, error: err }); - }); - this._zlibDeflate.on('data', (data: Buffer) => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data); - this._pendingDeflateData.push(data); - }); + })); } else { this._zlibInflateStream = null; - this._zlibDeflate = null; + this._zlibDeflateStream = null; } this._zlibDeflateFlushWaitingCount = 0; this._incomingData = new ChunkStream(); @@ -333,24 +323,17 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT public write(buffer: VSBuffer): void { this._totalOutgoingDataBytes += buffer.byteLength; - if (this._zlibDeflate) { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer); - this._zlibDeflate.write(buffer.buffer); + if (this._zlibDeflateStream) { + + this._zlibDeflateStream.write(buffer); this._zlibDeflateFlushWaitingCount++; - // See https://zlib.net/manual.html#Constants - this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => { - this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired); + this._zlibDeflateStream.flush((data) => { this._zlibDeflateFlushWaitingCount--; - let data = Buffer.concat(this._pendingDeflateData); - this._pendingDeflateData.length = 0; - - // See https://tools.ietf.org/html/rfc7692#section-7.2.1 - data = data.slice(0, data.length - 4); if (!this._isEnded) { // Avoid ERR_STREAM_WRITE_AFTER_END - this._write(VSBuffer.wrap(data), true); + this._write(data, true); } if (this._zlibDeflateFlushWaitingCount === 0) { @@ -590,6 +573,54 @@ class ZlibInflateStream extends Disposable { } } +class ZlibDeflateStream extends Disposable { + + private readonly _onError = this._register(new Emitter()); + public readonly onError = this._onError.event; + + private readonly _zlibDeflate: zlib.DeflateRaw; + private readonly _pendingDeflateData: VSBuffer[] = []; + + constructor( + private readonly _tracer: ISocketTracer, + options: zlib.ZlibOptions + ) { + super(); + + this._zlibDeflate = zlib.createDeflateRaw({ + windowBits: 15 + }); + this._zlibDeflate.on('error', (err) => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (err)?.code }); + this._onError.fire(err); + }); + this._zlibDeflate.on('data', (data: Buffer) => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data); + this._pendingDeflateData.push(VSBuffer.wrap(data)); + }); + } + + public write(buffer: VSBuffer): void { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer); + this._zlibDeflate.write(buffer.buffer); + } + + public flush(callback: (data: VSBuffer) => void): void { + // See https://zlib.net/manual.html#Constants + this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => { + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired); + + let data = VSBuffer.concat(this._pendingDeflateData); + this._pendingDeflateData.length = 0; + + // See https://tools.ietf.org/html/rfc7692#section-7.2.1 + data = data.slice(0, data.byteLength - 4); + + callback(data); + }); + } +} + function unmask(buffer: VSBuffer, mask: number): void { if (mask === 0) { return; From a3dce400d6ede8d0c09b2893a09784f51251e26a Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 15:34:36 +0100 Subject: [PATCH 5/9] Move draining logic to `ZlibDeflateStream` --- src/vs/base/parts/ipc/node/ipc.net.ts | 42 +++++++++++++++++---------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index 0f4cac43045..4b3ae6bdef9 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -191,8 +191,6 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT private _totalOutgoingDataBytes: number = 0; private readonly _zlibInflateStream: ZlibInflateStream | null; private readonly _zlibDeflateStream: ZlibDeflateStream | null; - private _zlibDeflateFlushWaitingCount: number; - private readonly _onDidZlibFlush = this._register(new Emitter()); private readonly _incomingData: ChunkStream; private readonly _onData = this._register(new Emitter()); private readonly _onClose = this._register(new Emitter()); @@ -290,16 +288,15 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT this._zlibInflateStream = null; this._zlibDeflateStream = null; } - this._zlibDeflateFlushWaitingCount = 0; this._incomingData = new ChunkStream(); this._register(this.socket.onData(data => this._acceptChunk(data))); this._register(this.socket.onClose((e) => this._onClose.fire(e))); } public override dispose(): void { - if (this._zlibDeflateFlushWaitingCount > 0) { + if (this._zlibDeflateStream && this._zlibDeflateStream.needsDraining()) { // Wait for any outstanding writes to finish before disposing - this._register(this._onDidZlibFlush.event(() => { + this._register(this._zlibDeflateStream.onDidDrain(() => { this.dispose(); })); } else { @@ -324,21 +321,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT this._totalOutgoingDataBytes += buffer.byteLength; if (this._zlibDeflateStream) { - this._zlibDeflateStream.write(buffer); - - this._zlibDeflateFlushWaitingCount++; this._zlibDeflateStream.flush((data) => { - this._zlibDeflateFlushWaitingCount--; - if (!this._isEnded) { // Avoid ERR_STREAM_WRITE_AFTER_END this._write(data, true); } - - if (this._zlibDeflateFlushWaitingCount === 0) { - this._onDidZlibFlush.fire(); - } }); } else { this._write(buffer, false); @@ -502,8 +490,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT public async drain(): Promise { this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainBegin); - if (this._zlibDeflateFlushWaitingCount > 0) { - await Event.toPromise(this._onDidZlibFlush.event); + if (this._zlibDeflateStream) { + await this._zlibDeflateStream.drain(); } await this.socket.drain(); this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainEnd); @@ -578,8 +566,12 @@ class ZlibDeflateStream extends Disposable { private readonly _onError = this._register(new Emitter()); public readonly onError = this._onError.event; + private readonly _onDidDrain = this._register(new Emitter()); + public readonly onDidDrain = this._onDidDrain.event; + private readonly _zlibDeflate: zlib.DeflateRaw; private readonly _pendingDeflateData: VSBuffer[] = []; + private _flushWaitingCount: number = 0; constructor( private readonly _tracer: ISocketTracer, @@ -606,8 +598,12 @@ class ZlibDeflateStream extends Disposable { } public flush(callback: (data: VSBuffer) => void): void { + this._flushWaitingCount++; + // See https://zlib.net/manual.html#Constants this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => { + this._flushWaitingCount--; + this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired); let data = VSBuffer.concat(this._pendingDeflateData); @@ -617,8 +613,22 @@ class ZlibDeflateStream extends Disposable { data = data.slice(0, data.byteLength - 4); callback(data); + + if (this._flushWaitingCount === 0) { + this._onDidDrain.fire(); + } }); } + + public needsDraining(): boolean { + return (this._flushWaitingCount > 0); + } + + public async drain(): Promise { + if (this._flushWaitingCount > 0) { + await Event.toPromise(this.onDidDrain); + } + } } function unmask(buffer: VSBuffer, mask: number): void { From 7b3474abff37d9ebbe11e48ed8aa38bd077bdf2e Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 16:44:30 +0100 Subject: [PATCH 6/9] Make sure websocket frames are processed in order --- src/vs/base/parts/ipc/node/ipc.net.ts | 290 ++++++++++-------- .../base/parts/ipc/test/node/ipc.net.test.ts | 9 + 2 files changed, 171 insertions(+), 128 deletions(-) diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index 4b3ae6bdef9..af596a0fed6 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -184,13 +184,7 @@ interface ISocketTracer { export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketTracer { public readonly socket: NodeSocket; - public readonly permessageDeflate: boolean; - private _totalIncomingWireBytes: number = 0; - private _totalIncomingDataBytes: number = 0; - private _totalOutgoingWireBytes: number = 0; - private _totalOutgoingDataBytes: number = 0; - private readonly _zlibInflateStream: ZlibInflateStream | null; - private readonly _zlibDeflateStream: ZlibDeflateStream | null; + private readonly _flowManager: WebSocketFlowManager; private readonly _incomingData: ChunkStream; private readonly _onData = this._register(new Emitter()); private readonly _onClose = this._register(new Emitter()); @@ -205,27 +199,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT mask: 0 }; - public get totalIncomingWireBytes(): number { - return this._totalIncomingWireBytes; - } - - public get totalIncomingDataBytes(): number { - return this._totalIncomingDataBytes; - } - - public get totalOutgoingWireBytes(): number { - return this._totalOutgoingWireBytes; - } - - public get totalOutgoingDataBytes(): number { - return this._totalOutgoingDataBytes; + public get permessageDeflate(): boolean { + return this._flowManager.permessageDeflate; } public get recordedInflateBytes(): VSBuffer { - if (this._zlibInflateStream) { - return this._zlibInflateStream.recordedInflateBytes; - } - return VSBuffer.alloc(0); + return this._flowManager.recordedInflateBytes; } public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void { @@ -248,55 +227,33 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT super(); this.socket = socket; this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes }); - this.permessageDeflate = permessageDeflate; - if (permessageDeflate) { - // See https://tools.ietf.org/html/rfc7692#page-16 - // To simplify our logic, we don't negotiate the window size - // and simply dedicate (2^15) / 32kb per web socket - this._zlibInflateStream = this._register(new ZlibInflateStream(this, recordInflateBytes, inflateBytes, { - windowBits: 15 - })); - this._register(this._zlibInflateStream.onError((err) => { - // zlib errors are fatal, since we have no idea how to recover - console.error(err); - onUnexpectedError(err); - this._onClose.fire({ - type: SocketCloseEventType.NodeSocketCloseEvent, - hadError: true, - error: err - }); - })); - this._register(this._zlibInflateStream.onData((data) => { - this._totalIncomingDataBytes += data.byteLength; - this._onData.fire(data); - })); - - this._zlibDeflateStream = this._register(new ZlibDeflateStream(this, { - windowBits: 15 - })); - this._register(this._zlibDeflateStream.onError((err) => { - // zlib errors are fatal, since we have no idea how to recover - console.error(err); - onUnexpectedError(err); - this._onClose.fire({ - type: SocketCloseEventType.NodeSocketCloseEvent, - hadError: true, - error: err - }); - })); - } else { - this._zlibInflateStream = null; - this._zlibDeflateStream = null; - } + this._flowManager = this._register(new WebSocketFlowManager( + this, + permessageDeflate, + inflateBytes, + recordInflateBytes, + this._onData, + (data, compressed) => this._write(data, compressed) + )); + this._register(this._flowManager.onError((err) => { + // zlib errors are fatal, since we have no idea how to recover + console.error(err); + onUnexpectedError(err); + this._onClose.fire({ + type: SocketCloseEventType.NodeSocketCloseEvent, + hadError: true, + error: err + }); + })); this._incomingData = new ChunkStream(); this._register(this.socket.onData(data => this._acceptChunk(data))); this._register(this.socket.onClose((e) => this._onClose.fire(e))); } public override dispose(): void { - if (this._zlibDeflateStream && this._zlibDeflateStream.needsDraining()) { + if (this._flowManager.isProcessingWriteQueue()) { // Wait for any outstanding writes to finish before disposing - this._register(this._zlibDeflateStream.onDidDrain(() => { + this._register(this._flowManager.onDidFinishProcessingWriteQueue(() => { this.dispose(); })); } else { @@ -318,22 +275,15 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT } public write(buffer: VSBuffer): void { - this._totalOutgoingDataBytes += buffer.byteLength; - - if (this._zlibDeflateStream) { - this._zlibDeflateStream.write(buffer); - this._zlibDeflateStream.flush((data) => { - if (!this._isEnded) { - // Avoid ERR_STREAM_WRITE_AFTER_END - this._write(data, true); - } - }); - } else { - this._write(buffer, false); - } + this._flowManager.writeMessage(buffer); } private _write(buffer: VSBuffer, compressed: boolean): void { + if (this._isEnded) { + // Avoid ERR_STREAM_WRITE_AFTER_END + return; + } + this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketWrite, buffer); let headerLen = Constants.MinHeaderByteSize; if (buffer.byteLength < 126) { @@ -371,7 +321,6 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT header.writeUInt8((buffer.byteLength >>> 0) & 0b11111111, ++offset); } - this._totalOutgoingWireBytes += header.byteLength + buffer.byteLength; this.socket.write(VSBuffer.concat([header, buffer])); } @@ -384,7 +333,6 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT if (data.byteLength === 0) { return; } - this._totalIncomingWireBytes += data.byteLength; this._incomingData.acceptChunk(data); @@ -467,45 +415,153 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT this._state.readLen = Constants.MinHeaderByteSize; this._state.mask = 0; - if (this._zlibInflateStream && this._state.compressed) { - // See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2 - // Even if permessageDeflate is negotiated, it is possible - // that the other side might decide to send uncompressed messages - // So only decompress messages that have the RSV 1 bit set - // - // See https://tools.ietf.org/html/rfc7692#section-7.2.2 - - this._zlibInflateStream.write(body); - if (this._state.fin) { - this._zlibInflateStream.write(VSBuffer.fromByteArray([0x00, 0x00, 0xff, 0xff])); - } - this._zlibInflateStream.flush(); - } else { - this._totalIncomingDataBytes += body.byteLength; - this._onData.fire(body); - } + this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin); } } } public async drain(): Promise { this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainBegin); - if (this._zlibDeflateStream) { - await this._zlibDeflateStream.drain(); + if (this._flowManager.isProcessingWriteQueue()) { + await Event.toPromise(this._flowManager.onDidFinishProcessingWriteQueue); } await this.socket.drain(); this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainEnd); } } +class WebSocketFlowManager extends Disposable { + + private readonly _onError = this._register(new Emitter()); + public readonly onError = this._onError.event; + + private readonly _zlibInflateStream: ZlibInflateStream | null; + private readonly _zlibDeflateStream: ZlibDeflateStream | null; + private readonly _writeQueue: VSBuffer[] = []; + private readonly _readQueue: { data: VSBuffer, isCompressed: boolean, isLastFrameOfMessage: boolean }[] = []; + + private readonly _onDidFinishProcessingWriteQueue = this._register(new Emitter()); + public readonly onDidFinishProcessingWriteQueue = this._onDidFinishProcessingWriteQueue.event; + + public get permessageDeflate(): boolean { + return Boolean(this._zlibInflateStream && this._zlibDeflateStream); + } + + public get recordedInflateBytes(): VSBuffer { + if (this._zlibInflateStream) { + return this._zlibInflateStream.recordedInflateBytes; + } + return VSBuffer.alloc(0); + } + + constructor( + private readonly _tracer: ISocketTracer, + permessageDeflate: boolean, + inflateBytes: VSBuffer | null, + recordInflateBytes: boolean, + private readonly _onData: Emitter, + private readonly _writeFn: (data: VSBuffer, compressed: boolean) => void + ) { + super(); + if (permessageDeflate) { + // See https://tools.ietf.org/html/rfc7692#page-16 + // To simplify our logic, we don't negotiate the window size + // and simply dedicate (2^15) / 32kb per web socket + this._zlibInflateStream = this._register(new ZlibInflateStream(this._tracer, recordInflateBytes, inflateBytes, { windowBits: 15 })); + this._zlibDeflateStream = this._register(new ZlibDeflateStream(this._tracer, { windowBits: 15 })); + this._register(this._zlibInflateStream.onError((err) => this._onError.fire(err))); + this._register(this._zlibDeflateStream.onError((err) => this._onError.fire(err))); + } else { + this._zlibInflateStream = null; + this._zlibDeflateStream = null; + } + } + + public writeMessage(message: VSBuffer): void { + this._writeQueue.push(message); + this._processWriteQueue(); + } + + private _isProcessingWriteQueue = false; + private async _processWriteQueue(): Promise { + if (this._isProcessingWriteQueue) { + return; + } + this._isProcessingWriteQueue = true; + while (this._writeQueue.length > 0) { + const message = this._writeQueue.shift()!; + if (this._zlibDeflateStream) { + const data = await this._deflateMessage(this._zlibDeflateStream, message); + this._writeFn(data, true); + } else { + this._writeFn(message, false); + } + } + this._isProcessingWriteQueue = false; + this._onDidFinishProcessingWriteQueue.fire(); + } + + public isProcessingWriteQueue(): boolean { + return (this._isProcessingWriteQueue); + } + + /** + * Subsequent calls should wait for the previous `_deflateBuffer` call to complete. + */ + private _deflateMessage(zlibDeflateStream: ZlibDeflateStream, buffer: VSBuffer): Promise { + return new Promise((resolve, reject) => { + zlibDeflateStream.write(buffer); + zlibDeflateStream.flush(data => resolve(data)); + }); + } + + public acceptFrame(data: VSBuffer, isCompressed: boolean, isLastFrameOfMessage: boolean): void { + this._readQueue.push({ data, isCompressed, isLastFrameOfMessage }); + this._processReadQueue(); + } + + private _isProcessingReadQueue = false; + private async _processReadQueue(): Promise { + if (this._isProcessingReadQueue) { + return; + } + this._isProcessingReadQueue = true; + while (this._readQueue.length > 0) { + const frameInfo = this._readQueue.shift()!; + if (this._zlibInflateStream && frameInfo.isCompressed) { + // See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2 + // Even if permessageDeflate is negotiated, it is possible + // that the other side might decide to send uncompressed messages + // So only decompress messages that have the RSV 1 bit set + const data = await this._inflateFrame(this._zlibInflateStream, frameInfo.data, frameInfo.isLastFrameOfMessage); + this._onData.fire(data); + } else { + this._onData.fire(frameInfo.data); + } + } + this._isProcessingReadQueue = false; + } + + /** + * Subsequent calls should wait for the previous `transformRead` call to complete. + */ + private _inflateFrame(zlibInflateStream: ZlibInflateStream, buffer: VSBuffer, isLastFrameOfMessage: boolean): Promise { + return new Promise((resolve, reject) => { + // See https://tools.ietf.org/html/rfc7692#section-7.2.2 + zlibInflateStream.write(buffer); + if (isLastFrameOfMessage) { + zlibInflateStream.write(VSBuffer.fromByteArray([0x00, 0x00, 0xff, 0xff])); + } + zlibInflateStream.flush(data => resolve(data)); + }); + } +} + class ZlibInflateStream extends Disposable { private readonly _onError = this._register(new Emitter()); public readonly onError = this._onError.event; - private readonly _onData = this._register(new Emitter()); - public readonly onData = this._onData.event; - private readonly _zlibInflate: zlib.InflateRaw; private readonly _recordedInflateBytes: VSBuffer[] = []; private readonly _pendingInflateData: VSBuffer[] = []; @@ -551,12 +607,12 @@ class ZlibInflateStream extends Disposable { this._zlibInflate.write(buffer.buffer); } - public flush(): void { + public flush(callback: (data: VSBuffer) => void): void { this._zlibInflate.flush(() => { this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired); const data = VSBuffer.concat(this._pendingInflateData); this._pendingInflateData.length = 0; - this._onData.fire(data); + callback(data); }); } } @@ -566,12 +622,8 @@ class ZlibDeflateStream extends Disposable { private readonly _onError = this._register(new Emitter()); public readonly onError = this._onError.event; - private readonly _onDidDrain = this._register(new Emitter()); - public readonly onDidDrain = this._onDidDrain.event; - private readonly _zlibDeflate: zlib.DeflateRaw; private readonly _pendingDeflateData: VSBuffer[] = []; - private _flushWaitingCount: number = 0; constructor( private readonly _tracer: ISocketTracer, @@ -598,12 +650,8 @@ class ZlibDeflateStream extends Disposable { } public flush(callback: (data: VSBuffer) => void): void { - this._flushWaitingCount++; - // See https://zlib.net/manual.html#Constants this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => { - this._flushWaitingCount--; - this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired); let data = VSBuffer.concat(this._pendingDeflateData); @@ -613,22 +661,8 @@ class ZlibDeflateStream extends Disposable { data = data.slice(0, data.byteLength - 4); callback(data); - - if (this._flushWaitingCount === 0) { - this._onDidDrain.fire(); - } }); } - - public needsDraining(): boolean { - return (this._flushWaitingCount > 0); - } - - public async drain(): Promise { - if (this._flushWaitingCount > 0) { - await Event.toPromise(this.onDidDrain); - } - } } function unmask(buffer: VSBuffer, mask: number): void { diff --git a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts index 4345fcccaca..042bfcbabfe 100644 --- a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts +++ b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts @@ -525,5 +525,14 @@ suite('WebSocketNodeSocket', () => { const actual = await testReading(frames, true); assert.deepStrictEqual(actual, 'Hello'); }); + + test('A single-frame compressed text message followed by a single-frame non-compressed text message', async () => { + const frames = [ + [0xc1, 0x07, 0xf2, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00], // contains "Hello" + [0x81, 0x05, 0x77, 0x6f, 0x72, 0x6c, 0x64] // contains "world" + ]; + const actual = await testReading(frames, true); + assert.deepStrictEqual(actual, 'Helloworld'); + }); }); }); From 3fb9624b29ef3878abef01c3a817a9b489b9c1fc Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 21:07:14 +0100 Subject: [PATCH 7/9] Remove `KeepAlive` message and rely solely on unacknowledged messages as a trigger for timeout --- src/vs/base/parts/ipc/common/ipc.net.ts | 102 +++++------------- .../base/parts/ipc/test/node/ipc.net.test.ts | 4 +- .../remote/common/remoteAgentConnection.ts | 15 ++- .../remote/common/remote.contribution.ts | 41 +++++++ 4 files changed, 81 insertions(+), 81 deletions(-) diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 5ff8d7bcf40..6b72f2f2dad 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -253,7 +253,6 @@ const enum ProtocolMessageType { Regular = 1, Control = 2, Ack = 3, - KeepAlive = 4, Disconnect = 5, ReplayRequest = 6 } @@ -264,7 +263,6 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) { case ProtocolMessageType.Regular: return 'Regular'; case ProtocolMessageType.Control: return 'Control'; case ProtocolMessageType.Ack: return 'Ack'; - case ProtocolMessageType.KeepAlive: return 'KeepAlive'; case ProtocolMessageType.Disconnect: return 'Disconnect'; case ProtocolMessageType.ReplayRequest: return 'ReplayRequest'; } @@ -277,17 +275,11 @@ export const enum ProtocolConstants { */ AcknowledgeTime = 2000, // 2 seconds /** - * If there is a message that has been unacknowledged for 10 seconds, consider the connection closed... + * If there is a sent message that has been unacknowledged for 20 seconds, + * and we didn't see any incoming server data in the past 20 seconds, + * then consider the connection has timed out. */ - AcknowledgeTimeoutTime = 20000, // 20 seconds - /** - * Send at least a message every 5s for keep alive reasons. - */ - KeepAliveTime = 5000, // 5 seconds - /** - * If there is no message received for 10 seconds, consider the connection closed... - */ - KeepAliveTimeoutTime = 20000, // 20 seconds + TimeoutTime = 20000, // 20 seconds /** * If there is no reconnection within this time-frame, consider the connection permanently closed... */ @@ -406,6 +398,7 @@ class ProtocolReader extends Disposable { class ProtocolWriter { private _isDisposed: boolean; + private _isPaused: boolean; private readonly _socket: ISocket; private _data: VSBuffer[]; private _totalLength: number; @@ -413,6 +406,7 @@ class ProtocolWriter { constructor(socket: ISocket) { this._isDisposed = false; + this._isPaused = false; this._socket = socket; this._data = []; this._totalLength = 0; @@ -438,6 +432,10 @@ class ProtocolWriter { this._writeNow(); } + public pause() { + this._isPaused = true; + } + public write(msg: ProtocolMessage) { if (this._isDisposed) { // ignore: there could be left-over promises which complete and then @@ -484,6 +482,9 @@ class ProtocolWriter { if (this._totalLength === 0) { return; } + if (this._isPaused) { + return; + } const data = this._bufferTake(); this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength }); this._socket.write(data); @@ -758,9 +759,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { private _incomingMsgLastTime: number; private _incomingAckTimeout: any | null; - private _outgoingKeepAliveTimeout: any | null; - private _incomingKeepAliveTimeout: any | null; - private _lastReplayRequestTime: number; private _socket: ISocket; @@ -802,9 +800,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._incomingMsgLastTime = 0; this._incomingAckTimeout = null; - this._outgoingKeepAliveTimeout = null; - this._incomingKeepAliveTimeout = null; - this._lastReplayRequestTime = 0; this._socketDisposables = []; @@ -818,9 +813,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { if (initialChunk) { this._socketReader.acceptChunk(initialChunk); } - - this._sendKeepAliveCheck(); - this._recvKeepAliveCheck(); } dispose(): void { @@ -832,14 +824,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { clearTimeout(this._incomingAckTimeout); this._incomingAckTimeout = null; } - if (this._outgoingKeepAliveTimeout) { - clearTimeout(this._outgoingKeepAliveTimeout); - this._outgoingKeepAliveTimeout = null; - } - if (this._incomingKeepAliveTimeout) { - clearTimeout(this._incomingKeepAliveTimeout); - this._incomingKeepAliveTimeout = null; - } this._socketDisposables = dispose(this._socketDisposables); } @@ -853,50 +837,8 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketWriter.flush(); } - private _sendKeepAliveCheck(): void { - if (this._outgoingKeepAliveTimeout) { - // there will be a check in the near future - return; - } - - const timeSinceLastOutgoingMsg = Date.now() - this._socketWriter.lastWriteTime; - if (timeSinceLastOutgoingMsg >= ProtocolConstants.KeepAliveTime) { - // sufficient time has passed since last message was written, - // and no message from our side needed to be sent in the meantime, - // so we will send a message containing only a keep alive. - const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, getEmptyBuffer()); - this._socketWriter.write(msg); - this._sendKeepAliveCheck(); - return; - } - - this._outgoingKeepAliveTimeout = setTimeout(() => { - this._outgoingKeepAliveTimeout = null; - this._sendKeepAliveCheck(); - }, ProtocolConstants.KeepAliveTime - timeSinceLastOutgoingMsg + 5); - } - - private _recvKeepAliveCheck(): void { - if (this._incomingKeepAliveTimeout) { - // there will be a check in the near future - return; - } - - const timeSinceLastIncomingMsg = Date.now() - this._socketReader.lastReadTime; - if (timeSinceLastIncomingMsg >= ProtocolConstants.KeepAliveTimeoutTime) { - // It's been a long time since we received a server message - // But this might be caused by the event loop being busy and failing to read messages - if (!this._loadEstimator.hasHighLoad()) { - // Trash the socket - this._onSocketTimeout.fire(undefined); - return; - } - } - - this._incomingKeepAliveTimeout = setTimeout(() => { - this._incomingKeepAliveTimeout = null; - this._recvKeepAliveCheck(); - }, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5); + pauseSocketWriting() { + this._socketWriter.pause(); } public getSocket(): ISocket { @@ -937,9 +879,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketWriter.write(toSend[i]); } this._recvAckCheck(); - - this._sendKeepAliveCheck(); - this._recvKeepAliveCheck(); } public acceptDisconnect(): void { @@ -1064,8 +1003,15 @@ export class PersistentProtocol implements IMessagePassingProtocol { const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!; const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime; - if (timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.AcknowledgeTimeoutTime) { + const timeSinceLastReceivedSomeData = Date.now() - this._socketReader.lastReadTime; + + if ( + timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.TimeoutTime + && timeSinceLastReceivedSomeData >= ProtocolConstants.TimeoutTime + ) { // It's been a long time since our sent message was acknowledged + // and a long time since we received some data + // But this might be caused by the event loop being busy and failing to read messages if (!this._loadEstimator.hasHighLoad()) { // Trash the socket @@ -1077,7 +1023,7 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._outgoingAckTimeout = setTimeout(() => { this._outgoingAckTimeout = null; this._recvAckCheck(); - }, Math.max(ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg, 0) + 5); + }, Math.max(ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg, 500)); } private _sendAck(): void { diff --git a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts index 042bfcbabfe..b8f3c3d2ebe 100644 --- a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts +++ b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts @@ -342,7 +342,7 @@ suite('PersistentProtocol reconnection', () => { assert.strictEqual(b.unacknowledgedCount, 1); // wait for scheduled _recvAckCheck() to execute - await timeout(2 * ProtocolConstants.AcknowledgeTimeoutTime); + await timeout(2 * ProtocolConstants.TimeoutTime); assert.strictEqual(a.unacknowledgedCount, 1); assert.strictEqual(b.unacknowledgedCount, 1); @@ -351,7 +351,7 @@ suite('PersistentProtocol reconnection', () => { a.endAcceptReconnection(); assert.strictEqual(timeoutListenerCalled, false); - await timeout(2 * ProtocolConstants.AcknowledgeTimeoutTime); + await timeout(2 * ProtocolConstants.TimeoutTime); assert.strictEqual(a.unacknowledgedCount, 0); assert.strictEqual(b.unacknowledgedCount, 0); assert.strictEqual(timeoutListenerCalled, false); diff --git a/src/vs/platform/remote/common/remoteAgentConnection.ts b/src/vs/platform/remote/common/remoteAgentConnection.ts index b555abd3694..7729501248f 100644 --- a/src/vs/platform/remote/common/remoteAgentConnection.ts +++ b/src/vs/platform/remote/common/remoteAgentConnection.ts @@ -512,7 +512,7 @@ export class ReconnectionPermanentFailureEvent { } export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent; -abstract class PersistentConnection extends Disposable { +export abstract class PersistentConnection extends Disposable { public static triggerPermanentFailure(millisSinceLastIncomingData: number, attempt: number, handled: boolean): void { this._permanentFailure = true; @@ -521,6 +521,15 @@ abstract class PersistentConnection extends Disposable { this._permanentFailureHandled = handled; this._instances.forEach(instance => instance._gotoPermanentFailure(this._permanentFailureMillisSinceLastIncomingData, this._permanentFailureAttempt, this._permanentFailureHandled)); } + + public static debugTriggerReconnection() { + this._instances.forEach(instance => instance._beginReconnecting()); + } + + public static debugPauseSocketWriting() { + this._instances.forEach(instance => instance._pauseSocketWriting()); + } + private static _permanentFailure: boolean = false; private static _permanentFailureMillisSinceLastIncomingData: number = 0; private static _permanentFailureAttempt: number = 0; @@ -678,6 +687,10 @@ abstract class PersistentConnection extends Disposable { safeDisposeProtocolAndSocket(this.protocol); } + private _pauseSocketWriting(): void { + this.protocol.pauseSocketWriting(); + } + protected abstract _reconnect(options: ISimpleConnectionOptions, timeoutCancellationToken: CancellationToken): Promise; } diff --git a/src/vs/workbench/contrib/remote/common/remote.contribution.ts b/src/vs/workbench/contrib/remote/common/remote.contribution.ts index a75b4c1198c..ad750d87ffd 100644 --- a/src/vs/workbench/contrib/remote/common/remote.contribution.ts +++ b/src/vs/workbench/contrib/remote/common/remote.contribution.ts @@ -25,6 +25,10 @@ import { IDialogService, IFileDialogService } from 'vs/platform/dialogs/common/d import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService'; import { IWorkspaceContextService } from 'vs/platform/workspace/common/workspace'; import { firstOrDefault } from 'vs/base/common/arrays'; +import { ServicesAccessor } from 'vs/platform/instantiation/common/instantiation'; +import { Action2, registerAction2 } from 'vs/platform/actions/common/actions'; +import { CATEGORIES } from 'vs/workbench/common/actions'; +import { PersistentConnection } from 'vs/platform/remote/common/remoteAgentConnection'; export class LabelContribution implements IWorkbenchContribution { constructor( @@ -161,6 +165,43 @@ workbenchContributionsRegistry.registerWorkbenchContribution(RemoteLogOutputChan workbenchContributionsRegistry.registerWorkbenchContribution(TunnelFactoryContribution, LifecyclePhase.Ready); workbenchContributionsRegistry.registerWorkbenchContribution(ShowCandidateContribution, LifecyclePhase.Ready); +const enableDiagnostics = false; + +if (enableDiagnostics) { + class TriggerReconnectAction extends Action2 { + constructor() { + super({ + id: 'workbench.action.triggerReconnect', + title: { value: localize('triggerReconnect', "Connection: Trigger Reconnect"), original: 'Connection: Trigger Reconnect' }, + category: CATEGORIES.Developer, + f1: true, + }); + } + + async run(accessor: ServicesAccessor): Promise { + PersistentConnection.debugTriggerReconnection(); + } + } + + class PauseSocketWriting extends Action2 { + constructor() { + super({ + id: 'workbench.action.pauseSocketWriting', + title: { value: localize('pauseSocketWriting', "Connection: Pause socket writing"), original: 'Connection: Pause socket writing' }, + category: CATEGORIES.Developer, + f1: true, + }); + } + + async run(accessor: ServicesAccessor): Promise { + PersistentConnection.debugPauseSocketWriting(); + } + } + + registerAction2(TriggerReconnectAction); + registerAction2(PauseSocketWriting); +} + const extensionKindSchema: IJSONSchema = { type: 'string', enum: [ From be87ebcd0dfceb7d2f1ceb128e0f16e39fc1bf13 Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 22:41:43 +0100 Subject: [PATCH 8/9] Ask the client to pause writing until the socket is sent to the remote extension host process, which then asks the client to resume (#134429) --- src/vs/base/parts/ipc/common/ipc.net.ts | 111 +++++++++++++----- .../base/parts/ipc/test/node/ipc.net.test.ts | 53 +++++++++ .../server/remoteExtensionHostAgentServer.ts | 2 + .../node/extensionHostProcessSetup.ts | 2 + 4 files changed, 139 insertions(+), 29 deletions(-) diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 6b72f2f2dad..4c8ddd29b43 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -254,7 +254,9 @@ const enum ProtocolMessageType { Control = 2, Ack = 3, Disconnect = 5, - ReplayRequest = 6 + ReplayRequest = 6, + Pause = 7, + Resume = 8 } function protocolMessageTypeToString(messageType: ProtocolMessageType) { @@ -265,6 +267,8 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) { case ProtocolMessageType.Ack: return 'Ack'; case ProtocolMessageType.Disconnect: return 'Disconnect'; case ProtocolMessageType.ReplayRequest: return 'ReplayRequest'; + case ProtocolMessageType.Pause: return 'PauseWriting'; + case ProtocolMessageType.Resume: return 'ResumeWriting'; } } @@ -432,10 +436,15 @@ class ProtocolWriter { this._writeNow(); } - public pause() { + public pause(): void { this._isPaused = true; } + public resume(): void { + this._isPaused = false; + this._scheduleWriting(); + } + public write(msg: ProtocolMessage) { if (this._isDisposed) { // ignore: there could be left-over promises which complete and then @@ -472,12 +481,21 @@ class ProtocolWriter { private _writeSoon(header: VSBuffer, data: VSBuffer): void { if (this._bufferAdd(header, data)) { - setTimeout(() => { - this._writeNow(); - }); + this._scheduleWriting(); } } + private _writeNowTimeout: any = null; + private _scheduleWriting(): void { + if (this._writeNowTimeout) { + return; + } + this._writeNowTimeout = setTimeout(() => { + this._writeNowTimeout = null; + this._writeNow(); + }); + } + private _writeNow(): void { if (this._totalLength === 0) { return; @@ -837,6 +855,16 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketWriter.flush(); } + sendPause(): void { + const msg = new ProtocolMessage(ProtocolMessageType.Pause, 0, 0, getEmptyBuffer()); + this._socketWriter.write(msg); + } + + sendResume(): void { + const msg = new ProtocolMessage(ProtocolMessageType.Resume, 0, 0, getEmptyBuffer()); + this._socketWriter.write(msg); + } + pauseSocketWriting() { this._socketWriter.pause(); } @@ -899,34 +927,59 @@ export class PersistentProtocol implements IMessagePassingProtocol { } while (true); } - if (msg.type === ProtocolMessageType.Regular) { - if (msg.id > this._incomingMsgId) { - if (msg.id !== this._incomingMsgId + 1) { - // in case we missed some messages we ask the other party to resend them - const now = Date.now(); - if (now - this._lastReplayRequestTime > 10000) { - // send a replay request at most once every 10s - this._lastReplayRequestTime = now; - this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.ReplayRequest, 0, 0, getEmptyBuffer())); + switch (msg.type) { + case ProtocolMessageType.None: { + // N/A + break; + } + case ProtocolMessageType.Regular: { + if (msg.id > this._incomingMsgId) { + if (msg.id !== this._incomingMsgId + 1) { + // in case we missed some messages we ask the other party to resend them + const now = Date.now(); + if (now - this._lastReplayRequestTime > 10000) { + // send a replay request at most once every 10s + this._lastReplayRequestTime = now; + this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.ReplayRequest, 0, 0, getEmptyBuffer())); + } + } else { + this._incomingMsgId = msg.id; + this._incomingMsgLastTime = Date.now(); + this._sendAckCheck(); + this._onMessage.fire(msg.data); } - } else { - this._incomingMsgId = msg.id; - this._incomingMsgLastTime = Date.now(); - this._sendAckCheck(); - this._onMessage.fire(msg.data); } + break; } - } else if (msg.type === ProtocolMessageType.Control) { - this._onControlMessage.fire(msg.data); - } else if (msg.type === ProtocolMessageType.Disconnect) { - this._onDidDispose.fire(); - } else if (msg.type === ProtocolMessageType.ReplayRequest) { - // Send again all unacknowledged messages - const toSend = this._outgoingUnackMsg.toArray(); - for (let i = 0, len = toSend.length; i < len; i++) { - this._socketWriter.write(toSend[i]); + case ProtocolMessageType.Control: { + this._onControlMessage.fire(msg.data); + break; + } + case ProtocolMessageType.Ack: { + // nothing to do + break; + } + case ProtocolMessageType.Disconnect: { + this._onDidDispose.fire(); + break; + } + case ProtocolMessageType.ReplayRequest: { + // Send again all unacknowledged messages + const toSend = this._outgoingUnackMsg.toArray(); + for (let i = 0, len = toSend.length; i < len; i++) { + this._socketWriter.write(toSend[i]); + } + this._recvAckCheck(); + break; + } + case ProtocolMessageType.Pause: { + this._socketWriter.pause(); + break; + } + case ProtocolMessageType.Resume: { + this._socketWriter.resume(); + break; } - this._recvAckCheck(); } } diff --git a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts index b8f3c3d2ebe..ec49f0069b6 100644 --- a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts +++ b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts @@ -364,6 +364,59 @@ suite('PersistentProtocol reconnection', () => { } ); }); + + test('writing can be paused', async () => { + await runWithFakedTimers({ useFakeTimers: true, maxTaskCount: 100 }, async () => { + const loadEstimator: ILoadEstimator = { + hasHighLoad: () => false + }; + const ether = new Ether(); + const aSocket = new NodeSocket(ether.a); + const a = new PersistentProtocol(aSocket, null, loadEstimator); + const aMessages = new MessageStream(a); + const bSocket = new NodeSocket(ether.b); + const b = new PersistentProtocol(bSocket, null, loadEstimator); + const bMessages = new MessageStream(b); + + // send one message A -> B + a.send(VSBuffer.fromString('a1')); + const a1 = await bMessages.waitForOne(); + assert.strictEqual(a1.toString(), 'a1'); + + // ask A to pause writing + b.sendPause(); + + // send a message B -> A + b.send(VSBuffer.fromString('b1')); + const b1 = await aMessages.waitForOne(); + assert.strictEqual(b1.toString(), 'b1'); + + // send a message A -> B (this should be blocked at A) + a.send(VSBuffer.fromString('a2')); + + // wait a long time and check that not even acks are written + await timeout(2 * ProtocolConstants.AcknowledgeTime); + assert.strictEqual(a.unacknowledgedCount, 1); + assert.strictEqual(b.unacknowledgedCount, 1); + + // ask A to resume writing + b.sendResume(); + + // check that B receives message + const a2 = await bMessages.waitForOne(); + assert.strictEqual(a2.toString(), 'a2'); + + // wait a long time and check that acks are written + await timeout(2 * ProtocolConstants.AcknowledgeTime); + assert.strictEqual(a.unacknowledgedCount, 0); + assert.strictEqual(b.unacknowledgedCount, 0); + + aMessages.dispose(); + bMessages.dispose(); + a.dispose(); + b.dispose(); + }); + }); }); suite('IPC, create handle', () => { diff --git a/src/vs/server/remoteExtensionHostAgentServer.ts b/src/vs/server/remoteExtensionHostAgentServer.ts index b3a66234045..24007c162b0 100644 --- a/src/vs/server/remoteExtensionHostAgentServer.ts +++ b/src/vs/server/remoteExtensionHostAgentServer.ts @@ -754,6 +754,7 @@ export class RemoteExtensionHostAgentServer extends Disposable { } } + protocol.sendPause(); protocol.sendControl(VSBuffer.fromString(JSON.stringify(startParams.port ? { debugPort: startParams.port } : {}))); const dataChunk = protocol.readEntireBuffer(); protocol.dispose(); @@ -766,6 +767,7 @@ export class RemoteExtensionHostAgentServer extends Disposable { return this._rejectWebSocketConnection(logPrefix, protocol, `Duplicate reconnection token`); } + protocol.sendPause(); protocol.sendControl(VSBuffer.fromString(JSON.stringify(startParams.port ? { debugPort: startParams.port } : {}))); const dataChunk = protocol.readEntireBuffer(); protocol.dispose(); diff --git a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts index 36dc1a64166..e9a12654d26 100644 --- a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts +++ b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts @@ -134,9 +134,11 @@ function _createExtHostProtocol(): Promise { disconnectRunner2.cancel(); protocol.beginAcceptReconnection(socket, initialDataChunk); protocol.endAcceptReconnection(); + protocol.sendResume(); } else { clearTimeout(timer); protocol = new PersistentProtocol(socket, initialDataChunk); + protocol.sendResume(); protocol.onDidDispose(() => onTerminate('renderer disconnected')); resolve(protocol); From 8dbd9d0ee65440f09230916ce48f4f40559dcafc Mon Sep 17 00:00:00 2001 From: Alex Dima Date: Thu, 25 Nov 2021 23:40:30 +0100 Subject: [PATCH 9/9] Remove unnecessary `socket.pause()` calls --- src/vs/server/extensionHostConnection.ts | 2 -- src/vs/workbench/contrib/remote/common/remote.contribution.ts | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/vs/server/extensionHostConnection.ts b/src/vs/server/extensionHostConnection.ts index 3f8b75c3649..261aa659097 100644 --- a/src/vs/server/extensionHostConnection.ts +++ b/src/vs/server/extensionHostConnection.ts @@ -110,7 +110,6 @@ export class ExtensionHostConnection { this._remoteAddress = remoteAddress; this._extensionHostProcess = null; this._connectionData = ExtensionHostConnection._toConnectionData(socket, initialDataChunk); - this._connectionData.socket.pause(); this._log(`New connection established.`); } @@ -156,7 +155,6 @@ export class ExtensionHostConnection { this._remoteAddress = remoteAddress; this._log(`The client has reconnected.`); const connectionData = ExtensionHostConnection._toConnectionData(_socket, initialDataChunk); - connectionData.socket.pause(); if (!this._extensionHostProcess) { // The extension host didn't even start up yet diff --git a/src/vs/workbench/contrib/remote/common/remote.contribution.ts b/src/vs/workbench/contrib/remote/common/remote.contribution.ts index ad750d87ffd..83c158dd41b 100644 --- a/src/vs/workbench/contrib/remote/common/remote.contribution.ts +++ b/src/vs/workbench/contrib/remote/common/remote.contribution.ts @@ -165,7 +165,7 @@ workbenchContributionsRegistry.registerWorkbenchContribution(RemoteLogOutputChan workbenchContributionsRegistry.registerWorkbenchContribution(TunnelFactoryContribution, LifecyclePhase.Ready); workbenchContributionsRegistry.registerWorkbenchContribution(ShowCandidateContribution, LifecyclePhase.Ready); -const enableDiagnostics = false; +const enableDiagnostics = true; if (enableDiagnostics) { class TriggerReconnectAction extends Action2 {