Merge pull request #137863 from microsoft/alex/remote-connection-improvements
Remote connection improvements
This commit is contained in:
commit
4aad18d229
|
@ -43,6 +43,14 @@ export class VSBuffer {
|
|||
}
|
||||
}
|
||||
|
||||
static fromByteArray(source: number[]): VSBuffer {
|
||||
const result = VSBuffer.alloc(source.length);
|
||||
for (let i = 0, len = source.length; i < len; i++) {
|
||||
result.buffer[i] = source[i];
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static concat(buffers: VSBuffer[], totalLength?: number): VSBuffer {
|
||||
if (typeof totalLength === 'undefined') {
|
||||
totalLength = 0;
|
||||
|
@ -70,6 +78,12 @@ export class VSBuffer {
|
|||
this.byteLength = this.buffer.byteLength;
|
||||
}
|
||||
|
||||
clone(): VSBuffer {
|
||||
const result = VSBuffer.alloc(this.byteLength);
|
||||
result.set(this);
|
||||
return result;
|
||||
}
|
||||
|
||||
toString(): string {
|
||||
if (hasBuffer) {
|
||||
return this.buffer.toString();
|
||||
|
@ -90,11 +104,20 @@ export class VSBuffer {
|
|||
|
||||
set(array: VSBuffer, offset?: number): void;
|
||||
set(array: Uint8Array, offset?: number): void;
|
||||
set(array: VSBuffer | Uint8Array, offset?: number): void {
|
||||
set(array: ArrayBuffer, offset?: number): void;
|
||||
set(array: ArrayBufferView, offset?: number): void;
|
||||
set(array: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView, offset?: number): void;
|
||||
set(array: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView, offset?: number): void {
|
||||
if (array instanceof VSBuffer) {
|
||||
this.buffer.set(array.buffer, offset);
|
||||
} else {
|
||||
} else if (array instanceof Uint8Array) {
|
||||
this.buffer.set(array, offset);
|
||||
} else if (array instanceof ArrayBuffer) {
|
||||
this.buffer.set(new Uint8Array(array), offset);
|
||||
} else if (ArrayBuffer.isView(array)) {
|
||||
this.buffer.set(new Uint8Array(array.buffer, array.byteOffset, array.byteLength), offset);
|
||||
} else {
|
||||
throw new Error(`Unkown argument 'array'`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,89 @@ import { Emitter, Event } from 'vs/base/common/event';
|
|||
import { Disposable, dispose, IDisposable } from 'vs/base/common/lifecycle';
|
||||
import { IIPCLogger, IMessagePassingProtocol, IPCClient } from 'vs/base/parts/ipc/common/ipc';
|
||||
|
||||
export const enum SocketDiagnosticsEventType {
|
||||
Created = 'created',
|
||||
Read = 'read',
|
||||
Write = 'write',
|
||||
Open = 'open',
|
||||
Error = 'error',
|
||||
Close = 'close',
|
||||
|
||||
BrowserWebSocketBlobReceived = 'browserWebSocketBlobReceived',
|
||||
|
||||
NodeEndReceived = 'nodeEndReceived',
|
||||
NodeEndSent = 'nodeEndSent',
|
||||
NodeDrainBegin = 'nodeDrainBegin',
|
||||
NodeDrainEnd = 'nodeDrainEnd',
|
||||
|
||||
zlibInflateError = 'zlibInflateError',
|
||||
zlibInflateData = 'zlibInflateData',
|
||||
zlibInflateInitialWrite = 'zlibInflateInitialWrite',
|
||||
zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired',
|
||||
zlibInflateWrite = 'zlibInflateWrite',
|
||||
zlibInflateFlushFired = 'zlibInflateFlushFired',
|
||||
zlibDeflateError = 'zlibDeflateError',
|
||||
zlibDeflateData = 'zlibDeflateData',
|
||||
zlibDeflateWrite = 'zlibDeflateWrite',
|
||||
zlibDeflateFlushFired = 'zlibDeflateFlushFired',
|
||||
|
||||
WebSocketNodeSocketWrite = 'webSocketNodeSocketWrite',
|
||||
WebSocketNodeSocketPeekedHeader = 'webSocketNodeSocketPeekedHeader',
|
||||
WebSocketNodeSocketReadHeader = 'webSocketNodeSocketReadHeader',
|
||||
WebSocketNodeSocketReadData = 'webSocketNodeSocketReadData',
|
||||
WebSocketNodeSocketUnmaskedData = 'webSocketNodeSocketUnmaskedData',
|
||||
WebSocketNodeSocketDrainBegin = 'webSocketNodeSocketDrainBegin',
|
||||
WebSocketNodeSocketDrainEnd = 'webSocketNodeSocketDrainEnd',
|
||||
|
||||
ProtocolHeaderRead = 'protocolHeaderRead',
|
||||
ProtocolMessageRead = 'protocolMessageRead',
|
||||
ProtocolHeaderWrite = 'protocolHeaderWrite',
|
||||
ProtocolMessageWrite = 'protocolMessageWrite',
|
||||
ProtocolWrite = 'protocolWrite',
|
||||
}
|
||||
|
||||
export namespace SocketDiagnostics {
|
||||
|
||||
export const enableDiagnostics = false;
|
||||
|
||||
export interface IRecord {
|
||||
timestamp: number;
|
||||
id: string;
|
||||
label: string;
|
||||
type: SocketDiagnosticsEventType;
|
||||
buff?: VSBuffer;
|
||||
data?: any;
|
||||
}
|
||||
|
||||
export const records: IRecord[] = [];
|
||||
const socketIds = new WeakMap<any, string>();
|
||||
let lastUsedSocketId = 0;
|
||||
|
||||
function getSocketId(nativeObject: any, label: string): string {
|
||||
if (!socketIds.has(nativeObject)) {
|
||||
const id = String(++lastUsedSocketId);
|
||||
socketIds.set(nativeObject, id);
|
||||
}
|
||||
return socketIds.get(nativeObject)!;
|
||||
}
|
||||
|
||||
export function traceSocketEvent(nativeObject: any, socketDebugLabel: string, type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
|
||||
if (!enableDiagnostics) {
|
||||
return;
|
||||
}
|
||||
const id = getSocketId(nativeObject, socketDebugLabel);
|
||||
|
||||
if (data instanceof VSBuffer || data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
|
||||
const copiedData = VSBuffer.alloc(data.byteLength);
|
||||
copiedData.set(data);
|
||||
records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, buff: copiedData });
|
||||
} else {
|
||||
// data is a custom object
|
||||
records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, data: data });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const enum SocketCloseEventType {
|
||||
NodeSocketCloseEvent = 0,
|
||||
WebSocketCloseEvent = 1
|
||||
|
@ -60,6 +143,8 @@ export interface ISocket extends IDisposable {
|
|||
write(buffer: VSBuffer): void;
|
||||
end(): void;
|
||||
drain(): Promise<void>;
|
||||
|
||||
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
|
||||
}
|
||||
|
||||
let emptyBuffer: VSBuffer | null = null;
|
||||
|
@ -168,9 +253,23 @@ const enum ProtocolMessageType {
|
|||
Regular = 1,
|
||||
Control = 2,
|
||||
Ack = 3,
|
||||
KeepAlive = 4,
|
||||
Disconnect = 5,
|
||||
ReplayRequest = 6
|
||||
ReplayRequest = 6,
|
||||
Pause = 7,
|
||||
Resume = 8
|
||||
}
|
||||
|
||||
function protocolMessageTypeToString(messageType: ProtocolMessageType) {
|
||||
switch (messageType) {
|
||||
case ProtocolMessageType.None: return 'None';
|
||||
case ProtocolMessageType.Regular: return 'Regular';
|
||||
case ProtocolMessageType.Control: return 'Control';
|
||||
case ProtocolMessageType.Ack: return 'Ack';
|
||||
case ProtocolMessageType.Disconnect: return 'Disconnect';
|
||||
case ProtocolMessageType.ReplayRequest: return 'ReplayRequest';
|
||||
case ProtocolMessageType.Pause: return 'PauseWriting';
|
||||
case ProtocolMessageType.Resume: return 'ResumeWriting';
|
||||
}
|
||||
}
|
||||
|
||||
export const enum ProtocolConstants {
|
||||
|
@ -180,17 +279,11 @@ export const enum ProtocolConstants {
|
|||
*/
|
||||
AcknowledgeTime = 2000, // 2 seconds
|
||||
/**
|
||||
* If there is a message that has been unacknowledged for 10 seconds, consider the connection closed...
|
||||
* If there is a sent message that has been unacknowledged for 20 seconds,
|
||||
* and we didn't see any incoming server data in the past 20 seconds,
|
||||
* then consider the connection has timed out.
|
||||
*/
|
||||
AcknowledgeTimeoutTime = 20000, // 20 seconds
|
||||
/**
|
||||
* Send at least a message every 5s for keep alive reasons.
|
||||
*/
|
||||
KeepAliveTime = 5000, // 5 seconds
|
||||
/**
|
||||
* If there is no message received for 10 seconds, consider the connection closed...
|
||||
*/
|
||||
KeepAliveTimeoutTime = 20000, // 20 seconds
|
||||
TimeoutTime = 20000, // 20 seconds
|
||||
/**
|
||||
* If there is no reconnection within this time-frame, consider the connection permanently closed...
|
||||
*/
|
||||
|
@ -268,6 +361,9 @@ class ProtocolReader extends Disposable {
|
|||
this._state.messageType = buff.readUInt8(0);
|
||||
this._state.id = buff.readUInt32BE(1);
|
||||
this._state.ack = buff.readUInt32BE(5);
|
||||
|
||||
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderRead, { messageType: protocolMessageTypeToString(this._state.messageType), id: this._state.id, ack: this._state.ack, messageSize: this._state.readLen });
|
||||
|
||||
} else {
|
||||
// buff is the body
|
||||
const messageType = this._state.messageType;
|
||||
|
@ -281,6 +377,8 @@ class ProtocolReader extends Disposable {
|
|||
this._state.id = 0;
|
||||
this._state.ack = 0;
|
||||
|
||||
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageRead, buff);
|
||||
|
||||
this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff));
|
||||
|
||||
if (this._isDisposed) {
|
||||
|
@ -304,6 +402,7 @@ class ProtocolReader extends Disposable {
|
|||
class ProtocolWriter {
|
||||
|
||||
private _isDisposed: boolean;
|
||||
private _isPaused: boolean;
|
||||
private readonly _socket: ISocket;
|
||||
private _data: VSBuffer[];
|
||||
private _totalLength: number;
|
||||
|
@ -311,6 +410,7 @@ class ProtocolWriter {
|
|||
|
||||
constructor(socket: ISocket) {
|
||||
this._isDisposed = false;
|
||||
this._isPaused = false;
|
||||
this._socket = socket;
|
||||
this._data = [];
|
||||
this._totalLength = 0;
|
||||
|
@ -336,6 +436,15 @@ class ProtocolWriter {
|
|||
this._writeNow();
|
||||
}
|
||||
|
||||
public pause(): void {
|
||||
this._isPaused = true;
|
||||
}
|
||||
|
||||
public resume(): void {
|
||||
this._isPaused = false;
|
||||
this._scheduleWriting();
|
||||
}
|
||||
|
||||
public write(msg: ProtocolMessage) {
|
||||
if (this._isDisposed) {
|
||||
// ignore: there could be left-over promises which complete and then
|
||||
|
@ -349,6 +458,10 @@ class ProtocolWriter {
|
|||
header.writeUInt32BE(msg.id, 1);
|
||||
header.writeUInt32BE(msg.ack, 5);
|
||||
header.writeUInt32BE(msg.data.byteLength, 9);
|
||||
|
||||
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderWrite, { messageType: protocolMessageTypeToString(msg.type), id: msg.id, ack: msg.ack, messageSize: msg.data.byteLength });
|
||||
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageWrite, msg.data);
|
||||
|
||||
this._writeSoon(header, msg.data);
|
||||
}
|
||||
|
||||
|
@ -368,17 +481,31 @@ class ProtocolWriter {
|
|||
|
||||
private _writeSoon(header: VSBuffer, data: VSBuffer): void {
|
||||
if (this._bufferAdd(header, data)) {
|
||||
setTimeout(() => {
|
||||
this._scheduleWriting();
|
||||
}
|
||||
}
|
||||
|
||||
private _writeNowTimeout: any = null;
|
||||
private _scheduleWriting(): void {
|
||||
if (this._writeNowTimeout) {
|
||||
return;
|
||||
}
|
||||
this._writeNowTimeout = setTimeout(() => {
|
||||
this._writeNowTimeout = null;
|
||||
this._writeNow();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private _writeNow(): void {
|
||||
if (this._totalLength === 0) {
|
||||
return;
|
||||
}
|
||||
this._socket.write(this._bufferTake());
|
||||
if (this._isPaused) {
|
||||
return;
|
||||
}
|
||||
const data = this._bufferTake();
|
||||
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength });
|
||||
this._socket.write(data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -650,9 +777,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
private _incomingMsgLastTime: number;
|
||||
private _incomingAckTimeout: any | null;
|
||||
|
||||
private _outgoingKeepAliveTimeout: any | null;
|
||||
private _incomingKeepAliveTimeout: any | null;
|
||||
|
||||
private _lastReplayRequestTime: number;
|
||||
|
||||
private _socket: ISocket;
|
||||
|
@ -694,9 +818,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
this._incomingMsgLastTime = 0;
|
||||
this._incomingAckTimeout = null;
|
||||
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
|
||||
this._lastReplayRequestTime = 0;
|
||||
|
||||
this._socketDisposables = [];
|
||||
|
@ -710,9 +831,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
if (initialChunk) {
|
||||
this._socketReader.acceptChunk(initialChunk);
|
||||
}
|
||||
|
||||
this._sendKeepAliveCheck();
|
||||
this._recvKeepAliveCheck();
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
|
@ -724,14 +842,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
clearTimeout(this._incomingAckTimeout);
|
||||
this._incomingAckTimeout = null;
|
||||
}
|
||||
if (this._outgoingKeepAliveTimeout) {
|
||||
clearTimeout(this._outgoingKeepAliveTimeout);
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
}
|
||||
if (this._incomingKeepAliveTimeout) {
|
||||
clearTimeout(this._incomingKeepAliveTimeout);
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
}
|
||||
this._socketDisposables = dispose(this._socketDisposables);
|
||||
}
|
||||
|
||||
|
@ -745,50 +855,18 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
this._socketWriter.flush();
|
||||
}
|
||||
|
||||
private _sendKeepAliveCheck(): void {
|
||||
if (this._outgoingKeepAliveTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastOutgoingMsg = Date.now() - this._socketWriter.lastWriteTime;
|
||||
if (timeSinceLastOutgoingMsg >= ProtocolConstants.KeepAliveTime) {
|
||||
// sufficient time has passed since last message was written,
|
||||
// and no message from our side needed to be sent in the meantime,
|
||||
// so we will send a message containing only a keep alive.
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, getEmptyBuffer());
|
||||
sendPause(): void {
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Pause, 0, 0, getEmptyBuffer());
|
||||
this._socketWriter.write(msg);
|
||||
this._sendKeepAliveCheck();
|
||||
return;
|
||||
}
|
||||
|
||||
this._outgoingKeepAliveTimeout = setTimeout(() => {
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
this._sendKeepAliveCheck();
|
||||
}, ProtocolConstants.KeepAliveTime - timeSinceLastOutgoingMsg + 5);
|
||||
sendResume(): void {
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Resume, 0, 0, getEmptyBuffer());
|
||||
this._socketWriter.write(msg);
|
||||
}
|
||||
|
||||
private _recvKeepAliveCheck(): void {
|
||||
if (this._incomingKeepAliveTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastIncomingMsg = Date.now() - this._socketReader.lastReadTime;
|
||||
if (timeSinceLastIncomingMsg >= ProtocolConstants.KeepAliveTimeoutTime) {
|
||||
// It's been a long time since we received a server message
|
||||
// But this might be caused by the event loop being busy and failing to read messages
|
||||
if (!this._loadEstimator.hasHighLoad()) {
|
||||
// Trash the socket
|
||||
this._onSocketTimeout.fire(undefined);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this._incomingKeepAliveTimeout = setTimeout(() => {
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
this._recvKeepAliveCheck();
|
||||
}, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5);
|
||||
pauseSocketWriting() {
|
||||
this._socketWriter.pause();
|
||||
}
|
||||
|
||||
public getSocket(): ISocket {
|
||||
|
@ -829,9 +907,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
this._socketWriter.write(toSend[i]);
|
||||
}
|
||||
this._recvAckCheck();
|
||||
|
||||
this._sendKeepAliveCheck();
|
||||
this._recvKeepAliveCheck();
|
||||
}
|
||||
|
||||
public acceptDisconnect(): void {
|
||||
|
@ -852,7 +927,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
} while (true);
|
||||
}
|
||||
|
||||
if (msg.type === ProtocolMessageType.Regular) {
|
||||
switch (msg.type) {
|
||||
case ProtocolMessageType.None: {
|
||||
// N/A
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.Regular: {
|
||||
if (msg.id > this._incomingMsgId) {
|
||||
if (msg.id !== this._incomingMsgId + 1) {
|
||||
// in case we missed some messages we ask the other party to resend them
|
||||
|
@ -869,17 +949,37 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
this._onMessage.fire(msg.data);
|
||||
}
|
||||
}
|
||||
} else if (msg.type === ProtocolMessageType.Control) {
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.Control: {
|
||||
this._onControlMessage.fire(msg.data);
|
||||
} else if (msg.type === ProtocolMessageType.Disconnect) {
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.Ack: {
|
||||
// nothing to do
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.Disconnect: {
|
||||
this._onDidDispose.fire();
|
||||
} else if (msg.type === ProtocolMessageType.ReplayRequest) {
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.ReplayRequest: {
|
||||
// Send again all unacknowledged messages
|
||||
const toSend = this._outgoingUnackMsg.toArray();
|
||||
for (let i = 0, len = toSend.length; i < len; i++) {
|
||||
this._socketWriter.write(toSend[i]);
|
||||
}
|
||||
this._recvAckCheck();
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.Pause: {
|
||||
this._socketWriter.pause();
|
||||
break;
|
||||
}
|
||||
case ProtocolMessageType.Resume: {
|
||||
this._socketWriter.resume();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -956,8 +1056,15 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
|
||||
const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
|
||||
const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
|
||||
if (timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.AcknowledgeTimeoutTime) {
|
||||
const timeSinceLastReceivedSomeData = Date.now() - this._socketReader.lastReadTime;
|
||||
|
||||
if (
|
||||
timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.TimeoutTime
|
||||
&& timeSinceLastReceivedSomeData >= ProtocolConstants.TimeoutTime
|
||||
) {
|
||||
// It's been a long time since our sent message was acknowledged
|
||||
// and a long time since we received some data
|
||||
|
||||
// But this might be caused by the event loop being busy and failing to read messages
|
||||
if (!this._loadEstimator.hasHighLoad()) {
|
||||
// Trash the socket
|
||||
|
@ -969,7 +1076,7 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
this._outgoingAckTimeout = setTimeout(() => {
|
||||
this._outgoingAckTimeout = null;
|
||||
this._recvAckCheck();
|
||||
}, Math.max(ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg, 0) + 5);
|
||||
}, Math.max(ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg, 500));
|
||||
}
|
||||
|
||||
private _sendAck(): void {
|
||||
|
@ -983,3 +1090,39 @@ export class PersistentProtocol implements IMessagePassingProtocol {
|
|||
this._socketWriter.write(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// (() => {
|
||||
// if (!SocketDiagnostics.enableDiagnostics) {
|
||||
// return;
|
||||
// }
|
||||
// if (typeof require.__$__nodeRequire !== 'function') {
|
||||
// console.log(`Can only log socket diagnostics on native platforms.`);
|
||||
// return;
|
||||
// }
|
||||
// const type = (
|
||||
// process.argv.includes('--type=renderer')
|
||||
// ? 'renderer'
|
||||
// : (process.argv.includes('--type=extensionHost')
|
||||
// ? 'extensionHost'
|
||||
// : (process.argv.some(item => item.includes('server/main'))
|
||||
// ? 'server'
|
||||
// : 'unknown'
|
||||
// )
|
||||
// )
|
||||
// );
|
||||
// setTimeout(() => {
|
||||
// SocketDiagnostics.records.forEach(r => {
|
||||
// if (r.buff) {
|
||||
// r.data = Buffer.from(r.buff.buffer).toString('base64');
|
||||
// r.buff = undefined;
|
||||
// }
|
||||
// });
|
||||
|
||||
// const fs = <typeof import('fs')>require.__$__nodeRequire('fs');
|
||||
// const path = <typeof import('path')>require.__$__nodeRequire('path');
|
||||
// const logPath = path.join(process.cwd(),`${type}-${process.pid}`);
|
||||
|
||||
// console.log(`dumping socket diagnostics at ${logPath}`);
|
||||
// fs.writeFileSync(logPath, JSON.stringify(SocketDiagnostics.records));
|
||||
// }, 20000);
|
||||
// })();
|
||||
|
|
|
@ -14,17 +14,25 @@ import { join } from 'vs/base/common/path';
|
|||
import { Platform, platform } from 'vs/base/common/platform';
|
||||
import { generateUuid } from 'vs/base/common/uuid';
|
||||
import { ClientConnectionEvent, IPCServer } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import * as zlib from 'zlib';
|
||||
|
||||
export class NodeSocket implements ISocket {
|
||||
|
||||
public readonly debugLabel: string;
|
||||
public readonly socket: Socket;
|
||||
private readonly _errorListener: (err: any) => void;
|
||||
|
||||
constructor(socket: Socket) {
|
||||
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
|
||||
SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data);
|
||||
}
|
||||
|
||||
constructor(socket: Socket, debugLabel: string = '') {
|
||||
this.debugLabel = debugLabel;
|
||||
this.socket = socket;
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'NodeSocket' });
|
||||
this._errorListener = (err: any) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Error, { code: err?.code, message: err?.message });
|
||||
if (err) {
|
||||
if (err.code === 'EPIPE') {
|
||||
// An EPIPE exception at the wrong time can lead to a renderer process crash
|
||||
|
@ -47,7 +55,10 @@ export class NodeSocket implements ISocket {
|
|||
}
|
||||
|
||||
public onData(_listener: (e: VSBuffer) => void): IDisposable {
|
||||
const listener = (buff: Buffer) => _listener(VSBuffer.wrap(buff));
|
||||
const listener = (buff: Buffer) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff);
|
||||
_listener(VSBuffer.wrap(buff));
|
||||
};
|
||||
this.socket.on('data', listener);
|
||||
return {
|
||||
dispose: () => this.socket.off('data', listener)
|
||||
|
@ -56,6 +67,7 @@ export class NodeSocket implements ISocket {
|
|||
|
||||
public onClose(listener: (e: SocketCloseEvent) => void): IDisposable {
|
||||
const adapter = (hadError: boolean) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Close, { hadError });
|
||||
listener({
|
||||
type: SocketCloseEventType.NodeSocketCloseEvent,
|
||||
hadError: hadError,
|
||||
|
@ -69,9 +81,13 @@ export class NodeSocket implements ISocket {
|
|||
}
|
||||
|
||||
public onEnd(listener: () => void): IDisposable {
|
||||
this.socket.on('end', listener);
|
||||
const adapter = () => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndReceived);
|
||||
listener();
|
||||
};
|
||||
this.socket.on('end', adapter);
|
||||
return {
|
||||
dispose: () => this.socket.off('end', listener)
|
||||
dispose: () => this.socket.off('end', adapter)
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -87,7 +103,8 @@ export class NodeSocket implements ISocket {
|
|||
// > However, the false return value is only advisory and the writable stream will unconditionally
|
||||
// > accept and buffer chunk even if it has not been allowed to drain.
|
||||
try {
|
||||
this.socket.write(<Buffer>buffer.buffer, (err: any) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Write, buffer);
|
||||
this.socket.write(buffer.buffer, (err: any) => {
|
||||
if (err) {
|
||||
if (err.code === 'EPIPE') {
|
||||
// An EPIPE exception at the wrong time can lead to a renderer process crash
|
||||
|
@ -116,12 +133,15 @@ export class NodeSocket implements ISocket {
|
|||
}
|
||||
|
||||
public end(): void {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndSent);
|
||||
this.socket.end();
|
||||
}
|
||||
|
||||
public drain(): Promise<void> {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainBegin);
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (this.socket.bufferSize === 0) {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
@ -131,6 +151,7 @@ export class NodeSocket implements ISocket {
|
|||
this.socket.off('error', finished);
|
||||
this.socket.off('timeout', finished);
|
||||
this.socket.off('drain', finished);
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd);
|
||||
resolve();
|
||||
};
|
||||
this.socket.on('close', finished);
|
||||
|
@ -153,25 +174,17 @@ const enum ReadState {
|
|||
Fin = 4
|
||||
}
|
||||
|
||||
interface ISocketTracer {
|
||||
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void
|
||||
}
|
||||
|
||||
/**
|
||||
* See https://tools.ietf.org/html/rfc6455#section-5.2
|
||||
*/
|
||||
export class WebSocketNodeSocket extends Disposable implements ISocket {
|
||||
export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketTracer {
|
||||
|
||||
public readonly socket: NodeSocket;
|
||||
public readonly permessageDeflate: boolean;
|
||||
private _totalIncomingWireBytes: number;
|
||||
private _totalIncomingDataBytes: number;
|
||||
private _totalOutgoingWireBytes: number;
|
||||
private _totalOutgoingDataBytes: number;
|
||||
private readonly _zlibInflate: zlib.InflateRaw | null;
|
||||
private readonly _zlibDeflate: zlib.DeflateRaw | null;
|
||||
private _zlibDeflateFlushWaitingCount: number;
|
||||
private readonly _onDidZlibFlush = this._register(new Emitter<void>());
|
||||
private readonly _recordInflateBytes: boolean;
|
||||
private readonly _recordedInflateBytes: Buffer[] = [];
|
||||
private readonly _pendingInflateData: Buffer[] = [];
|
||||
private readonly _pendingDeflateData: Buffer[] = [];
|
||||
private readonly _flowManager: WebSocketFlowManager;
|
||||
private readonly _incomingData: ChunkStream;
|
||||
private readonly _onData = this._register(new Emitter<VSBuffer>());
|
||||
private readonly _onClose = this._register(new Emitter<SocketCloseEvent>());
|
||||
|
@ -186,27 +199,16 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
mask: 0
|
||||
};
|
||||
|
||||
public get totalIncomingWireBytes(): number {
|
||||
return this._totalIncomingWireBytes;
|
||||
}
|
||||
|
||||
public get totalIncomingDataBytes(): number {
|
||||
return this._totalIncomingDataBytes;
|
||||
}
|
||||
|
||||
public get totalOutgoingWireBytes(): number {
|
||||
return this._totalOutgoingWireBytes;
|
||||
}
|
||||
|
||||
public get totalOutgoingDataBytes(): number {
|
||||
return this._totalOutgoingDataBytes;
|
||||
public get permessageDeflate(): boolean {
|
||||
return this._flowManager.permessageDeflate;
|
||||
}
|
||||
|
||||
public get recordedInflateBytes(): VSBuffer {
|
||||
if (this._recordInflateBytes) {
|
||||
return VSBuffer.wrap(Buffer.concat(this._recordedInflateBytes));
|
||||
return this._flowManager.recordedInflateBytes;
|
||||
}
|
||||
return VSBuffer.alloc(0);
|
||||
|
||||
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
|
||||
this.socket.traceSocketEvent(type, data);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -224,20 +226,16 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
constructor(socket: NodeSocket, permessageDeflate: boolean, inflateBytes: VSBuffer | null, recordInflateBytes: boolean) {
|
||||
super();
|
||||
this.socket = socket;
|
||||
this._totalIncomingWireBytes = 0;
|
||||
this._totalIncomingDataBytes = 0;
|
||||
this._totalOutgoingWireBytes = 0;
|
||||
this._totalOutgoingDataBytes = 0;
|
||||
this.permessageDeflate = permessageDeflate;
|
||||
this._recordInflateBytes = recordInflateBytes;
|
||||
if (permessageDeflate) {
|
||||
// See https://tools.ietf.org/html/rfc7692#page-16
|
||||
// To simplify our logic, we don't negotiate the window size
|
||||
// and simply dedicate (2^15) / 32kb per web socket
|
||||
this._zlibInflate = zlib.createInflateRaw({
|
||||
windowBits: 15
|
||||
});
|
||||
this._zlibInflate.on('error', (err) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes });
|
||||
this._flowManager = this._register(new WebSocketFlowManager(
|
||||
this,
|
||||
permessageDeflate,
|
||||
inflateBytes,
|
||||
recordInflateBytes,
|
||||
this._onData,
|
||||
(data, compressed) => this._write(data, compressed)
|
||||
));
|
||||
this._register(this._flowManager.onError((err) => {
|
||||
// zlib errors are fatal, since we have no idea how to recover
|
||||
console.error(err);
|
||||
onUnexpectedError(err);
|
||||
|
@ -246,47 +244,16 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
hadError: true,
|
||||
error: err
|
||||
});
|
||||
});
|
||||
this._zlibInflate.on('data', (data: Buffer) => {
|
||||
this._pendingInflateData.push(data);
|
||||
});
|
||||
if (inflateBytes) {
|
||||
this._zlibInflate.write(inflateBytes.buffer);
|
||||
this._zlibInflate.flush(() => {
|
||||
this._pendingInflateData.length = 0;
|
||||
});
|
||||
}
|
||||
|
||||
this._zlibDeflate = zlib.createDeflateRaw({
|
||||
windowBits: 15
|
||||
});
|
||||
this._zlibDeflate.on('error', (err) => {
|
||||
// zlib errors are fatal, since we have no idea how to recover
|
||||
console.error(err);
|
||||
onUnexpectedError(err);
|
||||
this._onClose.fire({
|
||||
type: SocketCloseEventType.NodeSocketCloseEvent,
|
||||
hadError: true,
|
||||
error: err
|
||||
});
|
||||
});
|
||||
this._zlibDeflate.on('data', (data: Buffer) => {
|
||||
this._pendingDeflateData.push(data);
|
||||
});
|
||||
} else {
|
||||
this._zlibInflate = null;
|
||||
this._zlibDeflate = null;
|
||||
}
|
||||
this._zlibDeflateFlushWaitingCount = 0;
|
||||
}));
|
||||
this._incomingData = new ChunkStream();
|
||||
this._register(this.socket.onData(data => this._acceptChunk(data)));
|
||||
this._register(this.socket.onClose((e) => this._onClose.fire(e)));
|
||||
}
|
||||
|
||||
public override dispose(): void {
|
||||
if (this._zlibDeflateFlushWaitingCount > 0) {
|
||||
if (this._flowManager.isProcessingWriteQueue()) {
|
||||
// Wait for any outstanding writes to finish before disposing
|
||||
this._register(this._onDidZlibFlush.event(() => {
|
||||
this._register(this._flowManager.onDidFinishProcessingWriteQueue(() => {
|
||||
this.dispose();
|
||||
}));
|
||||
} else {
|
||||
|
@ -308,36 +275,16 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
}
|
||||
|
||||
public write(buffer: VSBuffer): void {
|
||||
this._totalOutgoingDataBytes += buffer.byteLength;
|
||||
|
||||
if (this._zlibDeflate) {
|
||||
this._zlibDeflate.write(<Buffer>buffer.buffer);
|
||||
|
||||
this._zlibDeflateFlushWaitingCount++;
|
||||
// See https://zlib.net/manual.html#Constants
|
||||
this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
|
||||
this._zlibDeflateFlushWaitingCount--;
|
||||
let data = Buffer.concat(this._pendingDeflateData);
|
||||
this._pendingDeflateData.length = 0;
|
||||
|
||||
// See https://tools.ietf.org/html/rfc7692#section-7.2.1
|
||||
data = data.slice(0, data.length - 4);
|
||||
|
||||
if (!this._isEnded) {
|
||||
// Avoid ERR_STREAM_WRITE_AFTER_END
|
||||
this._write(VSBuffer.wrap(data), true);
|
||||
}
|
||||
|
||||
if (this._zlibDeflateFlushWaitingCount === 0) {
|
||||
this._onDidZlibFlush.fire();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
this._write(buffer, false);
|
||||
}
|
||||
this._flowManager.writeMessage(buffer);
|
||||
}
|
||||
|
||||
private _write(buffer: VSBuffer, compressed: boolean): void {
|
||||
if (this._isEnded) {
|
||||
// Avoid ERR_STREAM_WRITE_AFTER_END
|
||||
return;
|
||||
}
|
||||
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketWrite, buffer);
|
||||
let headerLen = Constants.MinHeaderByteSize;
|
||||
if (buffer.byteLength < 126) {
|
||||
headerLen += 0;
|
||||
|
@ -374,7 +321,6 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
header.writeUInt8((buffer.byteLength >>> 0) & 0b11111111, ++offset);
|
||||
}
|
||||
|
||||
this._totalOutgoingWireBytes += header.byteLength + buffer.byteLength;
|
||||
this.socket.write(VSBuffer.concat([header, buffer]));
|
||||
}
|
||||
|
||||
|
@ -387,7 +333,6 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
if (data.byteLength === 0) {
|
||||
return;
|
||||
}
|
||||
this._totalIncomingWireBytes += data.byteLength;
|
||||
|
||||
this._incomingData.acceptChunk(data);
|
||||
|
||||
|
@ -413,6 +358,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
this._state.firstFrameOfMessage = Boolean(finBit);
|
||||
this._state.mask = 0;
|
||||
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin });
|
||||
|
||||
} else if (this._state.state === ReadState.ReadHeader) {
|
||||
// read entire header
|
||||
const header = this._incomingData.read(this._state.readLen);
|
||||
|
@ -453,52 +400,268 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
|
|||
this._state.readLen = len;
|
||||
this._state.mask = mask;
|
||||
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask });
|
||||
|
||||
} else if (this._state.state === ReadState.ReadBody) {
|
||||
// read body
|
||||
|
||||
const body = this._incomingData.read(this._state.readLen);
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketReadData, body);
|
||||
|
||||
unmask(body, this._state.mask);
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketUnmaskedData, body);
|
||||
|
||||
this._state.state = ReadState.PeekHeader;
|
||||
this._state.readLen = Constants.MinHeaderByteSize;
|
||||
this._state.mask = 0;
|
||||
|
||||
if (this._zlibInflate && this._state.compressed) {
|
||||
// See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2
|
||||
// Even if permessageDeflate is negotiated, it is possible
|
||||
// that the other side might decide to send uncompressed messages
|
||||
// So only decompress messages that have the RSV 1 bit set
|
||||
//
|
||||
// See https://tools.ietf.org/html/rfc7692#section-7.2.2
|
||||
if (this._recordInflateBytes) {
|
||||
this._recordedInflateBytes.push(Buffer.from(<Buffer>body.buffer));
|
||||
}
|
||||
this._zlibInflate.write(<Buffer>body.buffer);
|
||||
if (this._state.fin) {
|
||||
if (this._recordInflateBytes) {
|
||||
this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff]));
|
||||
}
|
||||
this._zlibInflate.write(Buffer.from([0x00, 0x00, 0xff, 0xff]));
|
||||
}
|
||||
this._zlibInflate.flush(() => {
|
||||
const data = Buffer.concat(this._pendingInflateData);
|
||||
this._pendingInflateData.length = 0;
|
||||
this._totalIncomingDataBytes += data.length;
|
||||
this._onData.fire(VSBuffer.wrap(data));
|
||||
});
|
||||
} else {
|
||||
this._totalIncomingDataBytes += body.byteLength;
|
||||
this._onData.fire(body);
|
||||
}
|
||||
this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async drain(): Promise<void> {
|
||||
if (this._zlibDeflateFlushWaitingCount > 0) {
|
||||
await Event.toPromise(this._onDidZlibFlush.event);
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainBegin);
|
||||
if (this._flowManager.isProcessingWriteQueue()) {
|
||||
await Event.toPromise(this._flowManager.onDidFinishProcessingWriteQueue);
|
||||
}
|
||||
await this.socket.drain();
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainEnd);
|
||||
}
|
||||
}
|
||||
|
||||
class WebSocketFlowManager extends Disposable {
|
||||
|
||||
private readonly _onError = this._register(new Emitter<Error>());
|
||||
public readonly onError = this._onError.event;
|
||||
|
||||
private readonly _zlibInflateStream: ZlibInflateStream | null;
|
||||
private readonly _zlibDeflateStream: ZlibDeflateStream | null;
|
||||
private readonly _writeQueue: VSBuffer[] = [];
|
||||
private readonly _readQueue: { data: VSBuffer, isCompressed: boolean, isLastFrameOfMessage: boolean }[] = [];
|
||||
|
||||
private readonly _onDidFinishProcessingWriteQueue = this._register(new Emitter<void>());
|
||||
public readonly onDidFinishProcessingWriteQueue = this._onDidFinishProcessingWriteQueue.event;
|
||||
|
||||
public get permessageDeflate(): boolean {
|
||||
return Boolean(this._zlibInflateStream && this._zlibDeflateStream);
|
||||
}
|
||||
|
||||
public get recordedInflateBytes(): VSBuffer {
|
||||
if (this._zlibInflateStream) {
|
||||
return this._zlibInflateStream.recordedInflateBytes;
|
||||
}
|
||||
return VSBuffer.alloc(0);
|
||||
}
|
||||
|
||||
constructor(
|
||||
private readonly _tracer: ISocketTracer,
|
||||
permessageDeflate: boolean,
|
||||
inflateBytes: VSBuffer | null,
|
||||
recordInflateBytes: boolean,
|
||||
private readonly _onData: Emitter<VSBuffer>,
|
||||
private readonly _writeFn: (data: VSBuffer, compressed: boolean) => void
|
||||
) {
|
||||
super();
|
||||
if (permessageDeflate) {
|
||||
// See https://tools.ietf.org/html/rfc7692#page-16
|
||||
// To simplify our logic, we don't negotiate the window size
|
||||
// and simply dedicate (2^15) / 32kb per web socket
|
||||
this._zlibInflateStream = this._register(new ZlibInflateStream(this._tracer, recordInflateBytes, inflateBytes, { windowBits: 15 }));
|
||||
this._zlibDeflateStream = this._register(new ZlibDeflateStream(this._tracer, { windowBits: 15 }));
|
||||
this._register(this._zlibInflateStream.onError((err) => this._onError.fire(err)));
|
||||
this._register(this._zlibDeflateStream.onError((err) => this._onError.fire(err)));
|
||||
} else {
|
||||
this._zlibInflateStream = null;
|
||||
this._zlibDeflateStream = null;
|
||||
}
|
||||
}
|
||||
|
||||
public writeMessage(message: VSBuffer): void {
|
||||
this._writeQueue.push(message);
|
||||
this._processWriteQueue();
|
||||
}
|
||||
|
||||
private _isProcessingWriteQueue = false;
|
||||
private async _processWriteQueue(): Promise<void> {
|
||||
if (this._isProcessingWriteQueue) {
|
||||
return;
|
||||
}
|
||||
this._isProcessingWriteQueue = true;
|
||||
while (this._writeQueue.length > 0) {
|
||||
const message = this._writeQueue.shift()!;
|
||||
if (this._zlibDeflateStream) {
|
||||
const data = await this._deflateMessage(this._zlibDeflateStream, message);
|
||||
this._writeFn(data, true);
|
||||
} else {
|
||||
this._writeFn(message, false);
|
||||
}
|
||||
}
|
||||
this._isProcessingWriteQueue = false;
|
||||
this._onDidFinishProcessingWriteQueue.fire();
|
||||
}
|
||||
|
||||
public isProcessingWriteQueue(): boolean {
|
||||
return (this._isProcessingWriteQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subsequent calls should wait for the previous `_deflateBuffer` call to complete.
|
||||
*/
|
||||
private _deflateMessage(zlibDeflateStream: ZlibDeflateStream, buffer: VSBuffer): Promise<VSBuffer> {
|
||||
return new Promise<VSBuffer>((resolve, reject) => {
|
||||
zlibDeflateStream.write(buffer);
|
||||
zlibDeflateStream.flush(data => resolve(data));
|
||||
});
|
||||
}
|
||||
|
||||
public acceptFrame(data: VSBuffer, isCompressed: boolean, isLastFrameOfMessage: boolean): void {
|
||||
this._readQueue.push({ data, isCompressed, isLastFrameOfMessage });
|
||||
this._processReadQueue();
|
||||
}
|
||||
|
||||
private _isProcessingReadQueue = false;
|
||||
private async _processReadQueue(): Promise<void> {
|
||||
if (this._isProcessingReadQueue) {
|
||||
return;
|
||||
}
|
||||
this._isProcessingReadQueue = true;
|
||||
while (this._readQueue.length > 0) {
|
||||
const frameInfo = this._readQueue.shift()!;
|
||||
if (this._zlibInflateStream && frameInfo.isCompressed) {
|
||||
// See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2
|
||||
// Even if permessageDeflate is negotiated, it is possible
|
||||
// that the other side might decide to send uncompressed messages
|
||||
// So only decompress messages that have the RSV 1 bit set
|
||||
const data = await this._inflateFrame(this._zlibInflateStream, frameInfo.data, frameInfo.isLastFrameOfMessage);
|
||||
this._onData.fire(data);
|
||||
} else {
|
||||
this._onData.fire(frameInfo.data);
|
||||
}
|
||||
}
|
||||
this._isProcessingReadQueue = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subsequent calls should wait for the previous `transformRead` call to complete.
|
||||
*/
|
||||
private _inflateFrame(zlibInflateStream: ZlibInflateStream, buffer: VSBuffer, isLastFrameOfMessage: boolean): Promise<VSBuffer> {
|
||||
return new Promise<VSBuffer>((resolve, reject) => {
|
||||
// See https://tools.ietf.org/html/rfc7692#section-7.2.2
|
||||
zlibInflateStream.write(buffer);
|
||||
if (isLastFrameOfMessage) {
|
||||
zlibInflateStream.write(VSBuffer.fromByteArray([0x00, 0x00, 0xff, 0xff]));
|
||||
}
|
||||
zlibInflateStream.flush(data => resolve(data));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class ZlibInflateStream extends Disposable {
|
||||
|
||||
private readonly _onError = this._register(new Emitter<Error>());
|
||||
public readonly onError = this._onError.event;
|
||||
|
||||
private readonly _zlibInflate: zlib.InflateRaw;
|
||||
private readonly _recordedInflateBytes: VSBuffer[] = [];
|
||||
private readonly _pendingInflateData: VSBuffer[] = [];
|
||||
|
||||
public get recordedInflateBytes(): VSBuffer {
|
||||
if (this._recordInflateBytes) {
|
||||
return VSBuffer.concat(this._recordedInflateBytes);
|
||||
}
|
||||
return VSBuffer.alloc(0);
|
||||
}
|
||||
|
||||
constructor(
|
||||
private readonly _tracer: ISocketTracer,
|
||||
private readonly _recordInflateBytes: boolean,
|
||||
inflateBytes: VSBuffer | null,
|
||||
options: zlib.ZlibOptions
|
||||
) {
|
||||
super();
|
||||
this._zlibInflate = zlib.createInflateRaw(options);
|
||||
this._zlibInflate.on('error', (err) => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (<any>err)?.code });
|
||||
this._onError.fire(err);
|
||||
});
|
||||
this._zlibInflate.on('data', (data: Buffer) => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, data);
|
||||
this._pendingInflateData.push(VSBuffer.wrap(data));
|
||||
});
|
||||
if (inflateBytes) {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialWrite, inflateBytes.buffer);
|
||||
this._zlibInflate.write(inflateBytes.buffer);
|
||||
this._zlibInflate.flush(() => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired);
|
||||
this._pendingInflateData.length = 0;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public write(buffer: VSBuffer): void {
|
||||
if (this._recordInflateBytes) {
|
||||
this._recordedInflateBytes.push(buffer.clone());
|
||||
}
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buffer);
|
||||
this._zlibInflate.write(buffer.buffer);
|
||||
}
|
||||
|
||||
public flush(callback: (data: VSBuffer) => void): void {
|
||||
this._zlibInflate.flush(() => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired);
|
||||
const data = VSBuffer.concat(this._pendingInflateData);
|
||||
this._pendingInflateData.length = 0;
|
||||
callback(data);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class ZlibDeflateStream extends Disposable {
|
||||
|
||||
private readonly _onError = this._register(new Emitter<Error>());
|
||||
public readonly onError = this._onError.event;
|
||||
|
||||
private readonly _zlibDeflate: zlib.DeflateRaw;
|
||||
private readonly _pendingDeflateData: VSBuffer[] = [];
|
||||
|
||||
constructor(
|
||||
private readonly _tracer: ISocketTracer,
|
||||
options: zlib.ZlibOptions
|
||||
) {
|
||||
super();
|
||||
|
||||
this._zlibDeflate = zlib.createDeflateRaw({
|
||||
windowBits: 15
|
||||
});
|
||||
this._zlibDeflate.on('error', (err) => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (<any>err)?.code });
|
||||
this._onError.fire(err);
|
||||
});
|
||||
this._zlibDeflate.on('data', (data: Buffer) => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data);
|
||||
this._pendingDeflateData.push(VSBuffer.wrap(data));
|
||||
});
|
||||
}
|
||||
|
||||
public write(buffer: VSBuffer): void {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer);
|
||||
this._zlibDeflate.write(<Buffer>buffer.buffer);
|
||||
}
|
||||
|
||||
public flush(callback: (data: VSBuffer) => void): void {
|
||||
// See https://zlib.net/manual.html#Constants
|
||||
this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
|
||||
this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired);
|
||||
|
||||
let data = VSBuffer.concat(this._pendingDeflateData);
|
||||
this._pendingDeflateData.length = 0;
|
||||
|
||||
// See https://tools.ietf.org/html/rfc7692#section-7.2.1
|
||||
data = data.slice(0, data.byteLength - 4);
|
||||
|
||||
callback(data);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -597,7 +760,7 @@ export class Server extends IPCServer {
|
|||
const onConnection = Event.fromNodeEventEmitter<Socket>(server, 'connection');
|
||||
|
||||
return Event.map(onConnection, socket => ({
|
||||
protocol: new Protocol(new NodeSocket(socket)),
|
||||
protocol: new Protocol(new NodeSocket(socket, 'ipc-server-connection')),
|
||||
onDidClientDisconnect: Event.once(Event.fromNodeEventEmitter<void>(socket, 'close'))
|
||||
}));
|
||||
}
|
||||
|
@ -639,7 +802,7 @@ export function connect(hook: any, clientId: string): Promise<Client> {
|
|||
return new Promise<Client>((c, e) => {
|
||||
const socket = createConnection(hook, () => {
|
||||
socket.removeListener('error', e);
|
||||
c(Client.fromSocket(new NodeSocket(socket), clientId));
|
||||
c(Client.fromSocket(new NodeSocket(socket, `ipc-client${clientId}`), clientId));
|
||||
});
|
||||
|
||||
socket.once('error', e);
|
||||
|
|
|
@ -11,7 +11,7 @@ import { Barrier, timeout } from 'vs/base/common/async';
|
|||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
import { Emitter } from 'vs/base/common/event';
|
||||
import { Disposable, DisposableStore } from 'vs/base/common/lifecycle';
|
||||
import { ILoadEstimator, PersistentProtocol, Protocol, ProtocolConstants, SocketCloseEvent } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import { ILoadEstimator, PersistentProtocol, Protocol, ProtocolConstants, SocketCloseEvent, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import { createRandomIPCHandle, createStaticIPCHandle, NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
|
||||
import { runWithFakedTimers } from 'vs/base/test/common/timeTravelScheduler';
|
||||
import { ensureNoDisposablesAreLeakedInTestSuite } from 'vs/base/test/common/utils';
|
||||
|
@ -342,7 +342,7 @@ suite('PersistentProtocol reconnection', () => {
|
|||
assert.strictEqual(b.unacknowledgedCount, 1);
|
||||
|
||||
// wait for scheduled _recvAckCheck() to execute
|
||||
await timeout(2 * ProtocolConstants.AcknowledgeTimeoutTime);
|
||||
await timeout(2 * ProtocolConstants.TimeoutTime);
|
||||
|
||||
assert.strictEqual(a.unacknowledgedCount, 1);
|
||||
assert.strictEqual(b.unacknowledgedCount, 1);
|
||||
|
@ -351,7 +351,7 @@ suite('PersistentProtocol reconnection', () => {
|
|||
a.endAcceptReconnection();
|
||||
assert.strictEqual(timeoutListenerCalled, false);
|
||||
|
||||
await timeout(2 * ProtocolConstants.AcknowledgeTimeoutTime);
|
||||
await timeout(2 * ProtocolConstants.TimeoutTime);
|
||||
assert.strictEqual(a.unacknowledgedCount, 0);
|
||||
assert.strictEqual(b.unacknowledgedCount, 0);
|
||||
assert.strictEqual(timeoutListenerCalled, false);
|
||||
|
@ -364,6 +364,59 @@ suite('PersistentProtocol reconnection', () => {
|
|||
}
|
||||
);
|
||||
});
|
||||
|
||||
test('writing can be paused', async () => {
|
||||
await runWithFakedTimers({ useFakeTimers: true, maxTaskCount: 100 }, async () => {
|
||||
const loadEstimator: ILoadEstimator = {
|
||||
hasHighLoad: () => false
|
||||
};
|
||||
const ether = new Ether();
|
||||
const aSocket = new NodeSocket(ether.a);
|
||||
const a = new PersistentProtocol(aSocket, null, loadEstimator);
|
||||
const aMessages = new MessageStream(a);
|
||||
const bSocket = new NodeSocket(ether.b);
|
||||
const b = new PersistentProtocol(bSocket, null, loadEstimator);
|
||||
const bMessages = new MessageStream(b);
|
||||
|
||||
// send one message A -> B
|
||||
a.send(VSBuffer.fromString('a1'));
|
||||
const a1 = await bMessages.waitForOne();
|
||||
assert.strictEqual(a1.toString(), 'a1');
|
||||
|
||||
// ask A to pause writing
|
||||
b.sendPause();
|
||||
|
||||
// send a message B -> A
|
||||
b.send(VSBuffer.fromString('b1'));
|
||||
const b1 = await aMessages.waitForOne();
|
||||
assert.strictEqual(b1.toString(), 'b1');
|
||||
|
||||
// send a message A -> B (this should be blocked at A)
|
||||
a.send(VSBuffer.fromString('a2'));
|
||||
|
||||
// wait a long time and check that not even acks are written
|
||||
await timeout(2 * ProtocolConstants.AcknowledgeTime);
|
||||
assert.strictEqual(a.unacknowledgedCount, 1);
|
||||
assert.strictEqual(b.unacknowledgedCount, 1);
|
||||
|
||||
// ask A to resume writing
|
||||
b.sendResume();
|
||||
|
||||
// check that B receives message
|
||||
const a2 = await bMessages.waitForOne();
|
||||
assert.strictEqual(a2.toString(), 'a2');
|
||||
|
||||
// wait a long time and check that acks are written
|
||||
await timeout(2 * ProtocolConstants.AcknowledgeTime);
|
||||
assert.strictEqual(a.unacknowledgedCount, 0);
|
||||
assert.strictEqual(b.unacknowledgedCount, 0);
|
||||
|
||||
aMessages.dispose();
|
||||
bMessages.dispose();
|
||||
a.dispose();
|
||||
b.dispose();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
suite('IPC, create handle', () => {
|
||||
|
@ -432,6 +485,9 @@ suite('WebSocketNodeSocket', () => {
|
|||
private readonly _onClose = new Emitter<SocketCloseEvent>();
|
||||
public readonly onClose = this._onClose.event;
|
||||
|
||||
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
|
||||
}
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
}
|
||||
|
@ -522,5 +578,14 @@ suite('WebSocketNodeSocket', () => {
|
|||
const actual = await testReading(frames, true);
|
||||
assert.deepStrictEqual(actual, 'Hello');
|
||||
});
|
||||
|
||||
test('A single-frame compressed text message followed by a single-frame non-compressed text message', async () => {
|
||||
const frames = [
|
||||
[0xc1, 0x07, 0xf2, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00], // contains "Hello"
|
||||
[0x81, 0x05, 0x77, 0x6f, 0x72, 0x6c, 0x64] // contains "world"
|
||||
];
|
||||
const actual = await testReading(frames, true);
|
||||
assert.deepStrictEqual(actual, 'Helloworld');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -8,12 +8,12 @@ import { RunOnceScheduler } from 'vs/base/common/async';
|
|||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
import { Emitter, Event } from 'vs/base/common/event';
|
||||
import { Disposable, IDisposable } from 'vs/base/common/lifecycle';
|
||||
import { ISocket, SocketCloseEvent, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import { IConnectCallback, ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection';
|
||||
import { RemoteAuthorityResolverError, RemoteAuthorityResolverErrorCode } from 'vs/platform/remote/common/remoteAuthorityResolver';
|
||||
|
||||
export interface IWebSocketFactory {
|
||||
create(url: string): IWebSocket;
|
||||
create(url: string, debugLabel: string): IWebSocket;
|
||||
}
|
||||
|
||||
export interface IWebSocketCloseEvent {
|
||||
|
@ -41,6 +41,7 @@ export interface IWebSocket {
|
|||
readonly onClose: Event<IWebSocketCloseEvent | void>;
|
||||
readonly onError: Event<any>;
|
||||
|
||||
traceSocketEvent?(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
|
||||
send(data: ArrayBuffer | ArrayBufferView): void;
|
||||
close(): void;
|
||||
}
|
||||
|
@ -50,7 +51,8 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
private readonly _onData = new Emitter<ArrayBuffer>();
|
||||
public readonly onData = this._onData.event;
|
||||
|
||||
public readonly onOpen: Event<void>;
|
||||
private readonly _onOpen = this._register(new Emitter<void>());
|
||||
public readonly onOpen = this._onOpen.event;
|
||||
|
||||
private readonly _onClose = this._register(new Emitter<IWebSocketCloseEvent>());
|
||||
public readonly onClose = this._onClose.event;
|
||||
|
@ -58,6 +60,7 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
private readonly _onError = this._register(new Emitter<any>());
|
||||
public readonly onError = this._onError.event;
|
||||
|
||||
private readonly _debugLabel: string;
|
||||
private readonly _socket: WebSocket;
|
||||
private readonly _fileReader: FileReader;
|
||||
private readonly _queue: Blob[];
|
||||
|
@ -66,9 +69,15 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
|
||||
private readonly _socketMessageListener: (ev: MessageEvent) => void;
|
||||
|
||||
constructor(socket: WebSocket) {
|
||||
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
|
||||
SocketDiagnostics.traceSocketEvent(this._socket, this._debugLabel, type, data);
|
||||
}
|
||||
|
||||
constructor(url: string, debugLabel: string) {
|
||||
super();
|
||||
this._socket = socket;
|
||||
this._debugLabel = debugLabel;
|
||||
this._socket = new WebSocket(url);
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'BrowserWebSocket', url });
|
||||
this._fileReader = new FileReader();
|
||||
this._queue = [];
|
||||
this._isReading = false;
|
||||
|
@ -78,6 +87,7 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
this._isReading = false;
|
||||
const buff = <ArrayBuffer>(<any>event.target).result;
|
||||
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff);
|
||||
this._onData.fire(buff);
|
||||
|
||||
if (this._queue.length > 0) {
|
||||
|
@ -95,11 +105,16 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
};
|
||||
|
||||
this._socketMessageListener = (ev: MessageEvent) => {
|
||||
enqueue(<Blob>ev.data);
|
||||
const blob = (<Blob>ev.data);
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.BrowserWebSocketBlobReceived, { type: blob.type, size: blob.size });
|
||||
enqueue(blob);
|
||||
};
|
||||
this._socket.addEventListener('message', this._socketMessageListener);
|
||||
|
||||
this.onOpen = Event.fromDOMEventEmitter(this._socket, 'open');
|
||||
this._register(dom.addDisposableListener(this._socket, 'open', (e) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Open);
|
||||
this._onOpen.fire();
|
||||
}));
|
||||
|
||||
// WebSockets emit error events that do not contain any real information
|
||||
// Our only chance of getting to the root cause of an error is to
|
||||
|
@ -134,6 +149,8 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
};
|
||||
|
||||
this._register(dom.addDisposableListener(this._socket, 'close', (e: CloseEvent) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Close, { code: e.code, reason: e.reason, wasClean: e.wasClean });
|
||||
|
||||
this._isClosed = true;
|
||||
|
||||
if (pendingErrorEvent) {
|
||||
|
@ -157,7 +174,10 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
this._onClose.fire({ code: e.code, reason: e.reason, wasClean: e.wasClean, event: e });
|
||||
}));
|
||||
|
||||
this._register(dom.addDisposableListener(this._socket, 'error', sendErrorSoon));
|
||||
this._register(dom.addDisposableListener(this._socket, 'error', (err) => {
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Error, { message: err?.message });
|
||||
sendErrorSoon(err);
|
||||
}));
|
||||
}
|
||||
|
||||
send(data: ArrayBuffer | ArrayBufferView): void {
|
||||
|
@ -165,11 +185,13 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
// Refuse to write data to closed WebSocket...
|
||||
return;
|
||||
}
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Write, data);
|
||||
this._socket.send(data);
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this._isClosed = true;
|
||||
this.traceSocketEvent(SocketDiagnosticsEventType.Close);
|
||||
this._socket.close();
|
||||
this._socket.removeEventListener('message', this._socketMessageListener);
|
||||
this.dispose();
|
||||
|
@ -177,16 +199,27 @@ class BrowserWebSocket extends Disposable implements IWebSocket {
|
|||
}
|
||||
|
||||
export const defaultWebSocketFactory = new class implements IWebSocketFactory {
|
||||
create(url: string): IWebSocket {
|
||||
return new BrowserWebSocket(new WebSocket(url));
|
||||
create(url: string, debugLabel: string): IWebSocket {
|
||||
return new BrowserWebSocket(url, debugLabel);
|
||||
}
|
||||
};
|
||||
|
||||
class BrowserSocket implements ISocket {
|
||||
public readonly socket: IWebSocket;
|
||||
|
||||
constructor(socket: IWebSocket) {
|
||||
public readonly socket: IWebSocket;
|
||||
public readonly debugLabel: string;
|
||||
|
||||
public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
|
||||
if (typeof this.socket.traceSocketEvent === 'function') {
|
||||
this.socket.traceSocketEvent(type, data);
|
||||
} else {
|
||||
SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data);
|
||||
}
|
||||
}
|
||||
|
||||
constructor(socket: IWebSocket, debugLabel: string) {
|
||||
this.socket = socket;
|
||||
this.debugLabel = debugLabel;
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
|
@ -239,13 +272,13 @@ export class BrowserSocketFactory implements ISocketFactory {
|
|||
this._webSocketFactory = webSocketFactory || defaultWebSocketFactory;
|
||||
}
|
||||
|
||||
connect(host: string, port: number, query: string, callback: IConnectCallback): void {
|
||||
connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void {
|
||||
const webSocketSchema = (/^https:/.test(window.location.href) ? 'wss' : 'ws');
|
||||
const socket = this._webSocketFactory.create(`${webSocketSchema}://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=false`);
|
||||
const socket = this._webSocketFactory.create(`${webSocketSchema}://${/:/.test(host) ? `[${host}]` : host}:${port}/?${query}&skipWebSocketFrames=false`, debugLabel);
|
||||
const errorListener = socket.onError((err) => callback(err, undefined));
|
||||
socket.onOpen(() => {
|
||||
errorListener.dispose();
|
||||
callback(undefined, new BrowserSocket(socket));
|
||||
callback(undefined, new BrowserSocket(socket, debugLabel));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ export interface IConnectCallback {
|
|||
}
|
||||
|
||||
export interface ISocketFactory {
|
||||
connect(host: string, port: number, query: string, callback: IConnectCallback): void;
|
||||
connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void;
|
||||
}
|
||||
|
||||
function createTimeoutCancellation(millis: number): CancellationToken {
|
||||
|
@ -188,9 +188,9 @@ function readOneControlMessage<T>(protocol: PersistentProtocol, timeoutCancellat
|
|||
return result.promise;
|
||||
}
|
||||
|
||||
function createSocket(logService: ILogService, socketFactory: ISocketFactory, host: string, port: number, query: string, timeoutCancellationToken: CancellationToken): Promise<ISocket> {
|
||||
function createSocket(logService: ILogService, socketFactory: ISocketFactory, host: string, port: number, query: string, debugLabel: string, timeoutCancellationToken: CancellationToken): Promise<ISocket> {
|
||||
const result = new PromiseWithTimeout<ISocket>(timeoutCancellationToken);
|
||||
socketFactory.connect(host, port, query, (err: any, socket: ISocket | undefined) => {
|
||||
socketFactory.connect(host, port, query, debugLabel, (err: any, socket: ISocket | undefined) => {
|
||||
if (result.didTimeout) {
|
||||
if (err) {
|
||||
logService.error(err);
|
||||
|
@ -231,7 +231,7 @@ async function connectToRemoteExtensionHostAgent(options: ISimpleConnectionOptio
|
|||
|
||||
let socket: ISocket;
|
||||
try {
|
||||
socket = await createSocket(options.logService, options.socketFactory, options.host, options.port, `reconnectionToken=${options.reconnectionToken}&reconnection=${options.reconnectionProtocol ? 'true' : 'false'}`, timeoutCancellationToken);
|
||||
socket = await createSocket(options.logService, options.socketFactory, options.host, options.port, `reconnectionToken=${options.reconnectionToken}&reconnection=${options.reconnectionProtocol ? 'true' : 'false'}`, `renderer-${connectionTypeToString(connectionType)}-${options.reconnectionToken}`, timeoutCancellationToken);
|
||||
} catch (error) {
|
||||
options.logService.error(`${logPrefix} socketFactory.connect() failed or timed out. Error:`);
|
||||
options.logService.error(error);
|
||||
|
@ -512,7 +512,7 @@ export class ReconnectionPermanentFailureEvent {
|
|||
}
|
||||
export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent;
|
||||
|
||||
abstract class PersistentConnection extends Disposable {
|
||||
export abstract class PersistentConnection extends Disposable {
|
||||
|
||||
public static triggerPermanentFailure(millisSinceLastIncomingData: number, attempt: number, handled: boolean): void {
|
||||
this._permanentFailure = true;
|
||||
|
@ -521,6 +521,15 @@ abstract class PersistentConnection extends Disposable {
|
|||
this._permanentFailureHandled = handled;
|
||||
this._instances.forEach(instance => instance._gotoPermanentFailure(this._permanentFailureMillisSinceLastIncomingData, this._permanentFailureAttempt, this._permanentFailureHandled));
|
||||
}
|
||||
|
||||
public static debugTriggerReconnection() {
|
||||
this._instances.forEach(instance => instance._beginReconnecting());
|
||||
}
|
||||
|
||||
public static debugPauseSocketWriting() {
|
||||
this._instances.forEach(instance => instance._pauseSocketWriting());
|
||||
}
|
||||
|
||||
private static _permanentFailure: boolean = false;
|
||||
private static _permanentFailureMillisSinceLastIncomingData: number = 0;
|
||||
private static _permanentFailureAttempt: number = 0;
|
||||
|
@ -678,6 +687,10 @@ abstract class PersistentConnection extends Disposable {
|
|||
safeDisposeProtocolAndSocket(this.protocol);
|
||||
}
|
||||
|
||||
private _pauseSocketWriting(): void {
|
||||
this.protocol.pauseSocketWriting();
|
||||
}
|
||||
|
||||
protected abstract _reconnect(options: ISimpleConnectionOptions, timeoutCancellationToken: CancellationToken): Promise<void>;
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import { NodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
|
|||
import { IConnectCallback, ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection';
|
||||
|
||||
export const nodeSocketFactory = new class implements ISocketFactory {
|
||||
connect(host: string, port: number, query: string, callback: IConnectCallback): void {
|
||||
connect(host: string, port: number, query: string, debugLabel: string, callback: IConnectCallback): void {
|
||||
const errorListener = (err: any) => callback(err, undefined);
|
||||
|
||||
const socket = net.createConnection({ host: host, port: port }, () => {
|
||||
|
@ -34,7 +34,7 @@ export const nodeSocketFactory = new class implements ISocketFactory {
|
|||
if (strData.indexOf('\r\n\r\n') >= 0) {
|
||||
// headers received OK
|
||||
socket.off('data', onData);
|
||||
callback(undefined, new NodeSocket(socket));
|
||||
callback(undefined, new NodeSocket(socket, debugLabel));
|
||||
}
|
||||
};
|
||||
socket.on('data', onData);
|
||||
|
|
|
@ -110,7 +110,6 @@ export class ExtensionHostConnection {
|
|||
this._remoteAddress = remoteAddress;
|
||||
this._extensionHostProcess = null;
|
||||
this._connectionData = ExtensionHostConnection._toConnectionData(socket, initialDataChunk);
|
||||
this._connectionData.socket.pause();
|
||||
|
||||
this._log(`New connection established.`);
|
||||
}
|
||||
|
@ -156,7 +155,6 @@ export class ExtensionHostConnection {
|
|||
this._remoteAddress = remoteAddress;
|
||||
this._log(`The client has reconnected.`);
|
||||
const connectionData = ExtensionHostConnection._toConnectionData(_socket, initialDataChunk);
|
||||
connectionData.socket.pause();
|
||||
|
||||
if (!this._extensionHostProcess) {
|
||||
// The extension host didn't even start up yet
|
||||
|
|
|
@ -497,9 +497,9 @@ export class RemoteExtensionHostAgentServer extends Disposable {
|
|||
// Finally!
|
||||
|
||||
if (skipWebSocketFrames) {
|
||||
this._handleWebSocketConnection(new NodeSocket(socket), isReconnection, reconnectionToken);
|
||||
this._handleWebSocketConnection(new NodeSocket(socket, `server-connection-${reconnectionToken}`), isReconnection, reconnectionToken);
|
||||
} else {
|
||||
this._handleWebSocketConnection(new WebSocketNodeSocket(new NodeSocket(socket), permessageDeflate, null, true), isReconnection, reconnectionToken);
|
||||
this._handleWebSocketConnection(new WebSocketNodeSocket(new NodeSocket(socket, `server-connection-${reconnectionToken}`), permessageDeflate, null, true), isReconnection, reconnectionToken);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -754,6 +754,7 @@ export class RemoteExtensionHostAgentServer extends Disposable {
|
|||
}
|
||||
}
|
||||
|
||||
protocol.sendPause();
|
||||
protocol.sendControl(VSBuffer.fromString(JSON.stringify(startParams.port ? { debugPort: startParams.port } : {})));
|
||||
const dataChunk = protocol.readEntireBuffer();
|
||||
protocol.dispose();
|
||||
|
@ -766,6 +767,7 @@ export class RemoteExtensionHostAgentServer extends Disposable {
|
|||
return this._rejectWebSocketConnection(logPrefix, protocol, `Duplicate reconnection token`);
|
||||
}
|
||||
|
||||
protocol.sendPause();
|
||||
protocol.sendControl(VSBuffer.fromString(JSON.stringify(startParams.port ? { debugPort: startParams.port } : {})));
|
||||
const dataChunk = protocol.readEntireBuffer();
|
||||
protocol.dispose();
|
||||
|
|
|
@ -25,6 +25,10 @@ import { IDialogService, IFileDialogService } from 'vs/platform/dialogs/common/d
|
|||
import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService';
|
||||
import { IWorkspaceContextService } from 'vs/platform/workspace/common/workspace';
|
||||
import { firstOrDefault } from 'vs/base/common/arrays';
|
||||
import { ServicesAccessor } from 'vs/platform/instantiation/common/instantiation';
|
||||
import { Action2, registerAction2 } from 'vs/platform/actions/common/actions';
|
||||
import { CATEGORIES } from 'vs/workbench/common/actions';
|
||||
import { PersistentConnection } from 'vs/platform/remote/common/remoteAgentConnection';
|
||||
|
||||
export class LabelContribution implements IWorkbenchContribution {
|
||||
constructor(
|
||||
|
@ -161,6 +165,43 @@ workbenchContributionsRegistry.registerWorkbenchContribution(RemoteLogOutputChan
|
|||
workbenchContributionsRegistry.registerWorkbenchContribution(TunnelFactoryContribution, LifecyclePhase.Ready);
|
||||
workbenchContributionsRegistry.registerWorkbenchContribution(ShowCandidateContribution, LifecyclePhase.Ready);
|
||||
|
||||
const enableDiagnostics = true;
|
||||
|
||||
if (enableDiagnostics) {
|
||||
class TriggerReconnectAction extends Action2 {
|
||||
constructor() {
|
||||
super({
|
||||
id: 'workbench.action.triggerReconnect',
|
||||
title: { value: localize('triggerReconnect', "Connection: Trigger Reconnect"), original: 'Connection: Trigger Reconnect' },
|
||||
category: CATEGORIES.Developer,
|
||||
f1: true,
|
||||
});
|
||||
}
|
||||
|
||||
async run(accessor: ServicesAccessor): Promise<void> {
|
||||
PersistentConnection.debugTriggerReconnection();
|
||||
}
|
||||
}
|
||||
|
||||
class PauseSocketWriting extends Action2 {
|
||||
constructor() {
|
||||
super({
|
||||
id: 'workbench.action.pauseSocketWriting',
|
||||
title: { value: localize('pauseSocketWriting', "Connection: Pause socket writing"), original: 'Connection: Pause socket writing' },
|
||||
category: CATEGORIES.Developer,
|
||||
f1: true,
|
||||
});
|
||||
}
|
||||
|
||||
async run(accessor: ServicesAccessor): Promise<void> {
|
||||
PersistentConnection.debugPauseSocketWriting();
|
||||
}
|
||||
}
|
||||
|
||||
registerAction2(TriggerReconnectAction);
|
||||
registerAction2(PauseSocketWriting);
|
||||
}
|
||||
|
||||
const extensionKindSchema: IJSONSchema = {
|
||||
type: 'string',
|
||||
enum: [
|
||||
|
|
|
@ -450,7 +450,7 @@ export class LocalProcessExtensionHost implements IExtensionHost {
|
|||
// using a buffered message protocol here because between now
|
||||
// and the first time a `then` executes some messages might be lost
|
||||
// unless we immediately register a listener for `onMessage`.
|
||||
resolve(new PersistentProtocol(new NodeSocket(this._extensionHostConnection)));
|
||||
resolve(new PersistentProtocol(new NodeSocket(this._extensionHostConnection, 'renderer-exthost')));
|
||||
});
|
||||
|
||||
// Now that the named pipe listener is installed, start the ext host process
|
||||
|
|
|
@ -123,10 +123,10 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
|
|||
const initialDataChunk = VSBuffer.wrap(Buffer.from(msg.initialDataChunk, 'base64'));
|
||||
let socket: NodeSocket | WebSocketNodeSocket;
|
||||
if (msg.skipWebSocketFrames) {
|
||||
socket = new NodeSocket(handle);
|
||||
socket = new NodeSocket(handle, 'extHost-socket');
|
||||
} else {
|
||||
const inflateBytes = VSBuffer.wrap(Buffer.from(msg.inflateBytes, 'base64'));
|
||||
socket = new WebSocketNodeSocket(new NodeSocket(handle), msg.permessageDeflate, inflateBytes, false);
|
||||
socket = new WebSocketNodeSocket(new NodeSocket(handle, 'extHost-socket'), msg.permessageDeflate, inflateBytes, false);
|
||||
}
|
||||
if (protocol) {
|
||||
// reconnection case
|
||||
|
@ -134,9 +134,11 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
|
|||
disconnectRunner2.cancel();
|
||||
protocol.beginAcceptReconnection(socket, initialDataChunk);
|
||||
protocol.endAcceptReconnection();
|
||||
protocol.sendResume();
|
||||
} else {
|
||||
clearTimeout(timer);
|
||||
protocol = new PersistentProtocol(socket, initialDataChunk);
|
||||
protocol.sendResume();
|
||||
protocol.onDidDispose(() => onTerminate('renderer disconnected'));
|
||||
resolve(protocol);
|
||||
|
||||
|
@ -174,7 +176,7 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
|
|||
|
||||
const socket = net.createConnection(pipeName, () => {
|
||||
socket.removeListener('error', reject);
|
||||
resolve(new PersistentProtocol(new NodeSocket(socket)));
|
||||
resolve(new PersistentProtocol(new NodeSocket(socket, 'extHost-renderer')));
|
||||
});
|
||||
socket.once('error', reject);
|
||||
|
||||
|
|
Loading…
Reference in a new issue