Add tracing to sockets

This commit is contained in:
Alex Dima 2021-11-25 09:40:16 +01:00
parent eace6a0af1
commit 648e355c05
No known key found for this signature in database
GPG key ID: 39563C1504FDD0C9
9 changed files with 274 additions and 39 deletions

View file

@ -90,11 +90,20 @@ export class VSBuffer {
set(array: VSBuffer, offset?: number): void;
set(array: Uint8Array, offset?: number): void;
set(array: VSBuffer | Uint8Array, offset?: number): void {
set(array: ArrayBuffer, offset?: number): void;
set(array: ArrayBufferView, offset?: number): void;
set(array: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView, offset?: number): void;
set(array: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView, offset?: number): void {
if (array instanceof VSBuffer) {
this.buffer.set(array.buffer, offset);
} else {
} else if (array instanceof Uint8Array) {
this.buffer.set(array, offset);
} else if (array instanceof ArrayBuffer) {
this.buffer.set(new Uint8Array(array), offset);
} else if (ArrayBuffer.isView(array)) {
this.buffer.set(new Uint8Array(array.buffer, array.byteOffset, array.byteLength), offset);
} else {
throw new Error(`Unkown argument 'array'`);
}
}

View file

@ -8,6 +8,89 @@ import { Emitter, Event } from 'vs/base/common/event';
import { Disposable, dispose, IDisposable } from 'vs/base/common/lifecycle';
import { IIPCLogger, IMessagePassingProtocol, IPCClient } from 'vs/base/parts/ipc/common/ipc';
export const enum SocketDiagnosticsEventType {
Created = 'created',
Read = 'read',
Write = 'write',
Open = 'open',
Error = 'error',
Close = 'close',
BrowserWebSocketBlobReceived = 'browserWebSocketBlobReceived',
NodeEndReceived = 'nodeEndReceived',
NodeEndSent = 'nodeEndSent',
NodeDrainBegin = 'nodeDrainBegin',
NodeDrainEnd = 'nodeDrainEnd',
zlibInflateError = 'zlibInflateError',
zlibInflateData = 'zlibInflateData',
zlibInflateWriteInitial = 'zlibInflateWriteInitial',
zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired',
zlibInflateWrite = 'zlibInflateWrite',
zlibInflateFlushFired = 'zlibInflateFlushFired',
zlibDeflateError = 'zlibDeflateError',
zlibDeflateData = 'zlibDeflateData',
zlibDeflateWrite = 'zlibDeflateWrite',
zlibDeflateFlushFired = 'zlibDeflateFlushFired',
WebSocketNodeSocketWrite = 'webSocketNodeSocketWrite',
WebSocketNodeSocketPeekedHeader = 'webSocketNodeSocketPeekedHeader',
WebSocketNodeSocketReadHeader = 'webSocketNodeSocketReadHeader',
WebSocketNodeSocketReadData = 'webSocketNodeSocketReadData',
WebSocketNodeSocketUnmaskedData = 'webSocketNodeSocketUnmaskedData',
WebSocketNodeSocketDrainBegin = 'webSocketNodeSocketDrainBegin',
WebSocketNodeSocketDrainEnd = 'webSocketNodeSocketDrainEnd',
ProtocolHeaderRead = 'protocolHeaderRead',
ProtocolMessageRead = 'protocolMessageRead',
ProtocolHeaderWrite = 'protocolHeaderWrite',
ProtocolMessageWrite = 'protocolMessageWrite',
ProtocolWrite = 'protocolWrite',
}
export namespace SocketDiagnostics {
export const enableDiagnostics = false;
export interface IRecord {
timestamp: number;
id: string;
label: string;
type: SocketDiagnosticsEventType;
buff?: VSBuffer;
data?: any;
}
export const records: IRecord[] = [];
const socketIds = new WeakMap<any, string>();
let lastUsedSocketId = 0;
function getSocketId(nativeObject: any, label: string): string {
if (!socketIds.has(nativeObject)) {
const id = String(++lastUsedSocketId);
socketIds.set(nativeObject, id);
}
return socketIds.get(nativeObject)!;
}
export function traceSocketEvent(nativeObject: any, socketDebugLabel: string, type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
if (!enableDiagnostics) {
return;
}
const id = getSocketId(nativeObject, socketDebugLabel);
if (data instanceof VSBuffer || data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
const copiedData = VSBuffer.alloc(data.byteLength);
copiedData.set(data);
records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, buff: copiedData });
} else {
// data is a custom object
records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, data: data });
}
}
}
export const enum SocketCloseEventType {
NodeSocketCloseEvent = 0,
WebSocketCloseEvent = 1
@ -60,6 +143,8 @@ export interface ISocket extends IDisposable {
write(buffer: VSBuffer): void;
end(): void;
drain(): Promise<void>;
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
}
let emptyBuffer: VSBuffer | null = null;
@ -173,6 +258,18 @@ const enum ProtocolMessageType {
ReplayRequest = 6
}
function protocolMessageTypeToString(messageType: ProtocolMessageType) {
switch (messageType) {
case ProtocolMessageType.None: return 'None';
case ProtocolMessageType.Regular: return 'Regular';
case ProtocolMessageType.Control: return 'Control';
case ProtocolMessageType.Ack: return 'Ack';
case ProtocolMessageType.KeepAlive: return 'KeepAlive';
case ProtocolMessageType.Disconnect: return 'Disconnect';
case ProtocolMessageType.ReplayRequest: return 'ReplayRequest';
}
}
export const enum ProtocolConstants {
HeaderLength = 13,
/**
@ -268,6 +365,9 @@ class ProtocolReader extends Disposable {
this._state.messageType = buff.readUInt8(0);
this._state.id = buff.readUInt32BE(1);
this._state.ack = buff.readUInt32BE(5);
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderRead, { messageType: protocolMessageTypeToString(this._state.messageType), id: this._state.id, ack: this._state.ack, messageSize: this._state.readLen });
} else {
// buff is the body
const messageType = this._state.messageType;
@ -281,6 +381,8 @@ class ProtocolReader extends Disposable {
this._state.id = 0;
this._state.ack = 0;
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageRead, buff);
this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff));
if (this._isDisposed) {
@ -349,6 +451,10 @@ class ProtocolWriter {
header.writeUInt32BE(msg.id, 1);
header.writeUInt32BE(msg.ack, 5);
header.writeUInt32BE(msg.data.byteLength, 9);
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderWrite, { messageType: protocolMessageTypeToString(msg.type), id: msg.id, ack: msg.ack, messageSize: msg.data.byteLength });
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageWrite, msg.data);
this._writeSoon(header, msg.data);
}
@ -378,7 +484,9 @@ class ProtocolWriter {
if (this._totalLength === 0) {
return;
}
this._socket.write(this._bufferTake());
const data = this._bufferTake();
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength });
this._socket.write(data);
}
}
@ -983,3 +1091,39 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._socketWriter.write(msg);
}
}
// (() => {
// if (!SocketDiagnostics.enableDiagnostics) {
// return;
// }
// if (typeof require.__$__nodeRequire !== 'function') {
// console.log(`Can only log socket diagnostics on native platforms.`);
// return;
// }
// const type = (
// process.argv.includes('--type=renderer')
// ? 'renderer'
// : (process.argv.includes('--type=extensionHost')
// ? 'extensionHost'
// : (process.argv.some(item => item.includes('server/main'))
// ? 'server'
// : 'unknown'
// )
// )
// );
// setTimeout(() => {
// SocketDiagnostics.records.forEach(r => {
// if (r.buff) {
// r.data = Buffer.from(r.buff.buffer).toString('base64');
// r.buff = undefined;
// }
// });
// const fs = <typeof import('fs')>require.__$__nodeRequire('fs');
// const path = <typeof import('path')>require.__$__nodeRequire('path');
// const logPath = path.join(process.cwd(),`${type}-${process.pid}`);
// console.log(`dumping socket diagnostics at ${logPath}`);
// fs.writeFileSync(logPath, JSON.stringify(SocketDiagnostics.records));
// }, 20000);
// })();

View file

@ -14,17 +14,25 @@ import { join } from 'vs/base/common/path';
import { Platform, platform } from 'vs/base/common/platform';
import { generateUuid } from 'vs/base/common/uuid';
import { ClientConnectionEvent, IPCServer } from 'vs/base/parts/ipc/common/ipc';
import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
import * as zlib from 'zlib';
export class NodeSocket implements ISocket {
public readonly debugLabel: string;
public readonly socket: Socket;
private readonly _errorListener: (err: any) => void;
constructor(socket: Socket) {
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data);
}
constructor(socket: Socket, debugLabel: string = '') {
this.debugLabel = debugLabel;
this.socket = socket;
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'NodeSocket' });
this._errorListener = (err: any) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Error, { code: err?.code, message: err?.message });
if (err) {
if (err.code === 'EPIPE') {
// An EPIPE exception at the wrong time can lead to a renderer process crash
@ -47,7 +55,10 @@ export class NodeSocket implements ISocket {
}
public onData(_listener: (e: VSBuffer) => void): IDisposable {
const listener = (buff: Buffer) => _listener(VSBuffer.wrap(buff));
const listener = (buff: Buffer) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff);
_listener(VSBuffer.wrap(buff));
};
this.socket.on('data', listener);
return {
dispose: () => this.socket.off('data', listener)
@ -56,6 +67,7 @@ export class NodeSocket implements ISocket {
public onClose(listener: (e: SocketCloseEvent) => void): IDisposable {
const adapter = (hadError: boolean) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Close, { hadError });
listener({
type: SocketCloseEventType.NodeSocketCloseEvent,
hadError: hadError,
@ -69,9 +81,13 @@ export class NodeSocket implements ISocket {
}
public onEnd(listener: () => void): IDisposable {
this.socket.on('end', listener);
const adapter = () => {
this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndReceived);
listener();
};
this.socket.on('end', adapter);
return {
dispose: () => this.socket.off('end', listener)
dispose: () => this.socket.off('end', adapter)
};
}
@ -87,7 +103,8 @@ export class NodeSocket implements ISocket {
// > However, the false return value is only advisory and the writable stream will unconditionally
// > accept and buffer chunk even if it has not been allowed to drain.
try {
this.socket.write(<Buffer>buffer.buffer, (err: any) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Write, buffer);
this.socket.write(buffer.buffer, (err: any) => {
if (err) {
if (err.code === 'EPIPE') {
// An EPIPE exception at the wrong time can lead to a renderer process crash
@ -116,12 +133,15 @@ export class NodeSocket implements ISocket {
}
public end(): void {
this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndSent);
this.socket.end();
}
public drain(): Promise<void> {
this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainBegin);
return new Promise<void>((resolve, reject) => {
if (this.socket.bufferSize === 0) {
this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd);
resolve();
return;
}
@ -131,6 +151,7 @@ export class NodeSocket implements ISocket {
this.socket.off('error', finished);
this.socket.off('timeout', finished);
this.socket.off('drain', finished);
this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd);
resolve();
};
this.socket.on('close', finished);
@ -209,6 +230,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
return VSBuffer.alloc(0);
}
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
this.socket.traceSocketEvent(type, data);
}
/**
* Create a socket which can communicate using WebSocket frames.
*
@ -224,6 +249,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
constructor(socket: NodeSocket, permessageDeflate: boolean, inflateBytes: VSBuffer | null, recordInflateBytes: boolean) {
super();
this.socket = socket;
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes });
this._totalIncomingWireBytes = 0;
this._totalIncomingDataBytes = 0;
this._totalOutgoingWireBytes = 0;
@ -238,6 +264,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
windowBits: 15
});
this._zlibInflate.on('error', (err) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (<any>err)?.code });
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
onUnexpectedError(err);
@ -248,11 +275,14 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
});
});
this._zlibInflate.on('data', (data: Buffer) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data);
this._pendingInflateData.push(data);
});
if (inflateBytes) {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWriteInitial, inflateBytes.buffer);
this._zlibInflate.write(inflateBytes.buffer);
this._zlibInflate.flush(() => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired);
this._pendingInflateData.length = 0;
});
}
@ -261,6 +291,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
windowBits: 15
});
this._zlibDeflate.on('error', (err) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (<any>err)?.code });
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
onUnexpectedError(err);
@ -271,6 +302,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
});
});
this._zlibDeflate.on('data', (data: Buffer) => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data);
this._pendingDeflateData.push(data);
});
} else {
@ -311,11 +343,13 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._totalOutgoingDataBytes += buffer.byteLength;
if (this._zlibDeflate) {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer);
this._zlibDeflate.write(<Buffer>buffer.buffer);
this._zlibDeflateFlushWaitingCount++;
// See https://zlib.net/manual.html#Constants
this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired);
this._zlibDeflateFlushWaitingCount--;
let data = Buffer.concat(this._pendingDeflateData);
this._pendingDeflateData.length = 0;
@ -338,6 +372,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
}
private _write(buffer: VSBuffer, compressed: boolean): void {
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketWrite, buffer);
let headerLen = Constants.MinHeaderByteSize;
if (buffer.byteLength < 126) {
headerLen += 0;
@ -413,6 +448,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._state.firstFrameOfMessage = Boolean(finBit);
this._state.mask = 0;
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin });
} else if (this._state.state === ReadState.ReadHeader) {
// read entire header
const header = this._incomingData.read(this._state.readLen);
@ -453,11 +490,16 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._state.readLen = len;
this._state.mask = mask;
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask });
} else if (this._state.state === ReadState.ReadBody) {
// read body
const body = this._incomingData.read(this._state.readLen);
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketReadData, body);
unmask(body, this._state.mask);
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketUnmaskedData, body);
this._state.state = ReadState.PeekHeader;
this._state.readLen = Constants.MinHeaderByteSize;
@ -473,14 +515,19 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(Buffer.from(<Buffer>body.buffer));
}
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, body.buffer);
this._zlibInflate.write(<Buffer>body.buffer);
if (this._state.fin) {
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff]));
}
this._zlibInflate.write(Buffer.from([0x00, 0x00, 0xff, 0xff]));
const buff = Buffer.from([0x00, 0x00, 0xff, 0xff]);
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buff);
this._zlibInflate.write(buff);
}
this._zlibInflate.flush(() => {
this.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired);
const data = Buffer.concat(this._pendingInflateData);
this._pendingInflateData.length = 0;
this._totalIncomingDataBytes += data.length;
@ -495,10 +542,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
}
public async drain(): Promise<void> {
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainBegin);
if (this._zlibDeflateFlushWaitingCount > 0) {
await Event.toPromise(this._onDidZlibFlush.event);
}
await this.socket.drain();
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainEnd);
}
}
@ -597,7 +646,7 @@ export class Server extends IPCServer {
const onConnection = Event.fromNodeEventEmitter<Socket>(server, 'connection');
return Event.map(onConnection, socket => ({
protocol: new Protocol(new NodeSocket(socket)),
protocol: new Protocol(new NodeSocket(socket, 'ipc-server-connection')),
onDidClientDisconnect: Event.once(Event.fromNodeEventEmitter<void>(socket, 'close'))
}));
}
@ -639,7 +688,7 @@ export function connect(hook: any, clientId: string): Promise<Client> {
return new Promise<Client>((c, e) => {
const socket = createConnection(hook, () => {
socket.removeListener('error', e);
c(Client.fromSocket(new NodeSocket(socket), clientId));
c(Client.fromSocket(new NodeSocket(socket, `ipc-client${clientId}`), clientId));
});
socket.once('error', e);

View file

@ -8,12 +8,12 @@ import { RunOnceScheduler } from 'vs/base/common/async';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter, Event } from 'vs/base/common/event';
import { Disposable, IDisposable } from 'vs/base/common/lifecycle';
import { ISocket, SocketCloseEvent, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { IConnectCallback, ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection';
import { RemoteAuthorityResolverError, RemoteAuthorityResolverErrorCode } from 'vs/platform/remote/common/remoteAuthorityResolver';
export interface IWebSocketFactory {
create(url: string): IWebSocket;
create(url: string, debugLabel: string): IWebSocket;
}
export interface IWebSocketCloseEvent {
@ -41,6 +41,7 @@ export interface IWebSocket {
readonly onClose: Event<IWebSocketCloseEvent | void>;
readonly onError: Event<any>;
traceSocketEvent?(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
send(data: ArrayBuffer | ArrayBufferView): void;
close(): void;
}
@ -50,7 +51,8 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
private readonly _onData = new Emitter<ArrayBuffer>();
public readonly onData = this._onData.event;
public readonly onOpen: Event<void>;
private readonly _onOpen = this._register(new Emitter<void>());
public readonly onOpen = this._onOpen.event;
private readonly _onClose = this._register(new Emitter<IWebSocketCloseEvent>());
public readonly onClose = this._onClose.event;
@ -58,6 +60,7 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
private readonly _onError = this._register(new Emitter<any>());
public readonly onError = this._onError.event;
private readonly _debugLabel: string;
private readonly _socket: WebSocket;
private readonly _fileReader: FileReader;
private readonly _queue: Blob[];
@ -66,9 +69,15 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
private readonly _socketMessageListener: (ev: MessageEvent) => void;
constructor(socket: WebSocket) {
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
SocketDiagnostics.traceSocketEvent(this._socket, this._debugLabel, type, data);
}
constructor(url: string, debugLabel: string) {
super();
this._socket = socket;
this._debugLabel = debugLabel;
this._socket = new WebSocket(url);
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'BrowserWebSocket', url });
this._fileReader = new FileReader();
this._queue = [];
this._isReading = false;
@ -78,6 +87,7 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
this._isReading = false;
const buff = <ArrayBuffer>(<any>event.target).result;
this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff);
this._onData.fire(buff);
if (this._queue.length > 0) {
@ -95,11 +105,16 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
};
this._socketMessageListener = (ev: MessageEvent) => {
enqueue(<Blob>ev.data);
const blob = (<Blob>ev.data);
this.traceSocketEvent(SocketDiagnosticsEventType.BrowserWebSocketBlobReceived, { type: blob.type, size: blob.size });
enqueue(blob);
};
this._socket.addEventListener('message', this._socketMessageListener);
this.onOpen = Event.fromDOMEventEmitter(this._socket, 'open');
this._register(dom.addDisposableListener(this._socket, 'open', (e) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Open);
this._onOpen.fire();
}));
// WebSockets emit error events that do not contain any real information
// Our only chance of getting to the root cause of an error is to
@ -134,6 +149,8 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
};
this._register(dom.addDisposableListener(this._socket, 'close', (e: CloseEvent) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Close, { code: e.code, reason: e.reason, wasClean: e.wasClean });
this._isClosed = true;
if (pendingErrorEvent) {
@ -157,7 +174,10 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
this._onClose.fire({ code: e.code, reason: e.reason, wasClean: e.wasClean, event: e });
}));
this._register(dom.addDisposableListener(this._socket, 'error', sendErrorSoon));
this._register(dom.addDisposableListener(this._socket, 'error', (err) => {
this.traceSocketEvent(SocketDiagnosticsEventType.Error, { message: err?.message });
sendErrorSoon(err);
}));
}
send(data: ArrayBuffer | ArrayBufferView): void {
@ -165,11 +185,13 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
// Refuse to write data to closed WebSocket...
return;
}
this.traceSocketEvent(SocketDiagnosticsEventType.Write, data);
this._socket.send(data);
}
close(): void {
this._isClosed = true;
this.traceSocketEvent(SocketDiagnosticsEventType.Close);
this._socket.close();
this._socket.removeEventListener('message', this._socketMessageListener);
this.dispose();
@ -177,16 +199,27 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
}
export const defaultWebSocketFactory = new class implements IWebSocketFactory {
create(url: string): IWebSocket {
return new BrowserWebSocket(new WebSocket(url));
create(url: string, debugLabel: string): IWebSocket {
return new BrowserWebSocket(url, debugLabel);
}
};
class BrowserSocket implements ISocket {
public readonly socket: IWebSocket;
constructor(socket: IWebSocket) {
public readonly socket: IWebSocket;
public readonly debugLabel: string;
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
if (typeof this.socket.traceSocketEvent === 'function') {
this.socket.traceSocketEvent(type, data);
} else {
SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data);
}
}
constructor(socket: IWebSocket, debugLabel: string) {
this.socket = socket;
this.debugLabel = debugLabel;
}
public dispose(): void {
@ -239,13 +272,13 @@ export class BrowserSocketFactory implements ISocketFactory {
this._webSocketFactory = webSocketFactory || defaultWebSocketFactory;
}
connect(host: string, port: number, query: string, callback: IConnectCallback): void {
connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void {
const webSocketSchema = (/^https:/.test(window.location.href) ? 'wss' : 'ws');
const socket = this._webSocketFactory.create(`${webSocketSchema}://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=false`);
const socket = this._webSocketFactory.create(`${webSocketSchema}://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=false`, debugLabel);
const errorListener = socket.onError((err) => callback(err, undefined));
socket.onOpen(() => {
errorListener.dispose();
callback(undefined, new BrowserSocket(socket));
callback(undefined, new BrowserSocket(socket, debugLabel));
});
}
}

View file

@ -85,7 +85,7 @@ export interface IConnectCallback {
}
export interface ISocketFactory {
connect(host: string, port: number, query: string, callback: IConnectCallback): void;
connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void;
}
function createTimeoutCancellation(millis: number): CancellationToken {
@ -188,9 +188,9 @@ function readOneControlMessage<T>(protocol: PersistentProtocol, timeoutCancellat
return result.promise;
}
function createSocket(logService: ILogService, socketFactory: ISocketFactory, host: string, port: number, query: string, timeoutCancellationToken: CancellationToken): Promise<ISocket> {
function createSocket(logService: ILogService, socketFactory: ISocketFactory, host: string, port: number, query: string, debugLabel: string, timeoutCancellationToken: CancellationToken): Promise<ISocket> {
const result = new PromiseWithTimeout<ISocket>(timeoutCancellationToken);
socketFactory.connect(host, port, query, (err: any, socket: ISocket | undefined) => {
socketFactory.connect(host, port, query, debugLabel, (err: any, socket: ISocket | undefined) => {
if (result.didTimeout) {
if (err) {
logService.error(err);
@ -231,7 +231,7 @@ async function connectToRemoteExtensionHostAgent(options: ISimpleConnectionOptio
let socket: ISocket;
try {
socket = await createSocket(options.logService, options.socketFactory, options.host, options.port, `reconnectionToken=${options.reconnectionToken}&reconnection=${options.reconnectionProtocol ? 'true' : 'false'}`, timeoutCancellationToken);
socket = await createSocket(options.logService, options.socketFactory, options.host, options.port, `reconnectionToken=${options.reconnectionToken}&reconnection=${options.reconnectionProtocol ? 'true' : 'false'}`, `renderer-${connectionTypeToString(connectionType)}-${options.reconnectionToken}`, timeoutCancellationToken);
} catch (error) {
options.logService.error(`${logPrefix} socketFactory.connect() failed or timed out. Error:`);
options.logService.error(error);

View file

@ -8,7 +8,7 @@ import { NodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { IConnectCallback, ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection';
export const nodeSocketFactory = new class implements ISocketFactory {
connect(host: string, port: number, query: string, callback: IConnectCallback): void {
connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void {
const errorListener = (err: any) => callback(err, undefined);
const socket = net.createConnection({ host: host, port: port }, () => {
@ -34,7 +34,7 @@ export const nodeSocketFactory = new class implements ISocketFactory {
if (strData.indexOf('\r\n\r\n') >= 0) {
// headers received OK
socket.off('data', onData);
callback(undefined, new NodeSocket(socket));
callback(undefined, new NodeSocket(socket, debugLabel));
}
};
socket.on('data', onData);

View file

@ -497,9 +497,9 @@ export class RemoteExtensionHostAgentServer extends Disposable {
// Finally!
if (skipWebSocketFrames) {
this._handleWebSocketConnection(new NodeSocket(socket), isReconnection, reconnectionToken);
this._handleWebSocketConnection(new NodeSocket(socket, `server-connection-${reconnectionToken}`), isReconnection, reconnectionToken);
} else {
this._handleWebSocketConnection(new WebSocketNodeSocket(new NodeSocket(socket), permessageDeflate, null, true), isReconnection, reconnectionToken);
this._handleWebSocketConnection(new WebSocketNodeSocket(new NodeSocket(socket, `server-connection-${reconnectionToken}`), permessageDeflate, null, true), isReconnection, reconnectionToken);
}
}

View file

@ -450,7 +450,7 @@ export class LocalProcessExtensionHost implements IExtensionHost {
// using a buffered message protocol here because between now
// and the first time a `then` executes some messages might be lost
// unless we immediately register a listener for `onMessage`.
resolve(new PersistentProtocol(new NodeSocket(this._extensionHostConnection)));
resolve(new PersistentProtocol(new NodeSocket(this._extensionHostConnection, 'renderer-exthost')));
});
// Now that the named pipe listener is installed, start the ext host process

View file

@ -123,10 +123,10 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
const initialDataChunk = VSBuffer.wrap(Buffer.from(msg.initialDataChunk, 'base64'));
let socket: NodeSocket | WebSocketNodeSocket;
if (msg.skipWebSocketFrames) {
socket = new NodeSocket(handle);
socket = new NodeSocket(handle, 'extHost-socket');
} else {
const inflateBytes = VSBuffer.wrap(Buffer.from(msg.inflateBytes, 'base64'));
socket = new WebSocketNodeSocket(new NodeSocket(handle), msg.permessageDeflate, inflateBytes, false);
socket = new WebSocketNodeSocket(new NodeSocket(handle, 'extHost-socket'), msg.permessageDeflate, inflateBytes, false);
}
if (protocol) {
// reconnection case
@ -174,7 +174,7 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
const socket = net.createConnection(pipeName, () => {
socket.removeListener('error', reject);
resolve(new PersistentProtocol(new NodeSocket(socket)));
resolve(new PersistentProtocol(new NodeSocket(socket, 'extHost-renderer')));
});
socket.once('error', reject);