diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 5ff8d7bcf40..6b72f2f2dad 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -253,7 +253,6 @@ const enum ProtocolMessageType { Regular = 1, Control = 2, Ack = 3, - KeepAlive = 4, Disconnect = 5, ReplayRequest = 6 } @@ -264,7 +263,6 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) { case ProtocolMessageType.Regular: return 'Regular'; case ProtocolMessageType.Control: return 'Control'; case ProtocolMessageType.Ack: return 'Ack'; - case ProtocolMessageType.KeepAlive: return 'KeepAlive'; case ProtocolMessageType.Disconnect: return 'Disconnect'; case ProtocolMessageType.ReplayRequest: return 'ReplayRequest'; } @@ -277,17 +275,11 @@ export const enum ProtocolConstants { */ AcknowledgeTime = 2000, // 2 seconds /** - * If there is a message that has been unacknowledged for 10 seconds, consider the connection closed... + * If there is a sent message that has been unacknowledged for 20 seconds, + * and we didn't see any incoming server data in the past 20 seconds, + * then consider the connection has timed out. */ - AcknowledgeTimeoutTime = 20000, // 20 seconds - /** - * Send at least a message every 5s for keep alive reasons. - */ - KeepAliveTime = 5000, // 5 seconds - /** - * If there is no message received for 10 seconds, consider the connection closed... - */ - KeepAliveTimeoutTime = 20000, // 20 seconds + TimeoutTime = 20000, // 20 seconds /** * If there is no reconnection within this time-frame, consider the connection permanently closed... */ @@ -406,6 +398,7 @@ class ProtocolReader extends Disposable { class ProtocolWriter { private _isDisposed: boolean; + private _isPaused: boolean; private readonly _socket: ISocket; private _data: VSBuffer[]; private _totalLength: number; @@ -413,6 +406,7 @@ class ProtocolWriter { constructor(socket: ISocket) { this._isDisposed = false; + this._isPaused = false; this._socket = socket; this._data = []; this._totalLength = 0; @@ -438,6 +432,10 @@ class ProtocolWriter { this._writeNow(); } + public pause() { + this._isPaused = true; + } + public write(msg: ProtocolMessage) { if (this._isDisposed) { // ignore: there could be left-over promises which complete and then @@ -484,6 +482,9 @@ class ProtocolWriter { if (this._totalLength === 0) { return; } + if (this._isPaused) { + return; + } const data = this._bufferTake(); this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength }); this._socket.write(data); @@ -758,9 +759,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { private _incomingMsgLastTime: number; private _incomingAckTimeout: any | null; - private _outgoingKeepAliveTimeout: any | null; - private _incomingKeepAliveTimeout: any | null; - private _lastReplayRequestTime: number; private _socket: ISocket; @@ -802,9 +800,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._incomingMsgLastTime = 0; this._incomingAckTimeout = null; - this._outgoingKeepAliveTimeout = null; - this._incomingKeepAliveTimeout = null; - this._lastReplayRequestTime = 0; this._socketDisposables = []; @@ -818,9 +813,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { if (initialChunk) { this._socketReader.acceptChunk(initialChunk); } - - this._sendKeepAliveCheck(); - this._recvKeepAliveCheck(); } dispose(): void { @@ -832,14 +824,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { clearTimeout(this._incomingAckTimeout); this._incomingAckTimeout = null; } - if (this._outgoingKeepAliveTimeout) { - clearTimeout(this._outgoingKeepAliveTimeout); - this._outgoingKeepAliveTimeout = null; - } - if (this._incomingKeepAliveTimeout) { - clearTimeout(this._incomingKeepAliveTimeout); - this._incomingKeepAliveTimeout = null; - } this._socketDisposables = dispose(this._socketDisposables); } @@ -853,50 +837,8 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketWriter.flush(); } - private _sendKeepAliveCheck(): void { - if (this._outgoingKeepAliveTimeout) { - // there will be a check in the near future - return; - } - - const timeSinceLastOutgoingMsg = Date.now() - this._socketWriter.lastWriteTime; - if (timeSinceLastOutgoingMsg >= ProtocolConstants.KeepAliveTime) { - // sufficient time has passed since last message was written, - // and no message from our side needed to be sent in the meantime, - // so we will send a message containing only a keep alive. - const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, getEmptyBuffer()); - this._socketWriter.write(msg); - this._sendKeepAliveCheck(); - return; - } - - this._outgoingKeepAliveTimeout = setTimeout(() => { - this._outgoingKeepAliveTimeout = null; - this._sendKeepAliveCheck(); - }, ProtocolConstants.KeepAliveTime - timeSinceLastOutgoingMsg + 5); - } - - private _recvKeepAliveCheck(): void { - if (this._incomingKeepAliveTimeout) { - // there will be a check in the near future - return; - } - - const timeSinceLastIncomingMsg = Date.now() - this._socketReader.lastReadTime; - if (timeSinceLastIncomingMsg >= ProtocolConstants.KeepAliveTimeoutTime) { - // It's been a long time since we received a server message - // But this might be caused by the event loop being busy and failing to read messages - if (!this._loadEstimator.hasHighLoad()) { - // Trash the socket - this._onSocketTimeout.fire(undefined); - return; - } - } - - this._incomingKeepAliveTimeout = setTimeout(() => { - this._incomingKeepAliveTimeout = null; - this._recvKeepAliveCheck(); - }, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5); + pauseSocketWriting() { + this._socketWriter.pause(); } public getSocket(): ISocket { @@ -937,9 +879,6 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._socketWriter.write(toSend[i]); } this._recvAckCheck(); - - this._sendKeepAliveCheck(); - this._recvKeepAliveCheck(); } public acceptDisconnect(): void { @@ -1064,8 +1003,15 @@ export class PersistentProtocol implements IMessagePassingProtocol { const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!; const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime; - if (timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.AcknowledgeTimeoutTime) { + const timeSinceLastReceivedSomeData = Date.now() - this._socketReader.lastReadTime; + + if ( + timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.TimeoutTime + && timeSinceLastReceivedSomeData >= ProtocolConstants.TimeoutTime + ) { // It's been a long time since our sent message was acknowledged + // and a long time since we received some data + // But this might be caused by the event loop being busy and failing to read messages if (!this._loadEstimator.hasHighLoad()) { // Trash the socket @@ -1077,7 +1023,7 @@ export class PersistentProtocol implements IMessagePassingProtocol { this._outgoingAckTimeout = setTimeout(() => { this._outgoingAckTimeout = null; this._recvAckCheck(); - }, Math.max(ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg, 0) + 5); + }, Math.max(ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg, 500)); } private _sendAck(): void { diff --git a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts index 042bfcbabfe..b8f3c3d2ebe 100644 --- a/src/vs/base/parts/ipc/test/node/ipc.net.test.ts +++ b/src/vs/base/parts/ipc/test/node/ipc.net.test.ts @@ -342,7 +342,7 @@ suite('PersistentProtocol reconnection', () => { assert.strictEqual(b.unacknowledgedCount, 1); // wait for scheduled _recvAckCheck() to execute - await timeout(2 * ProtocolConstants.AcknowledgeTimeoutTime); + await timeout(2 * ProtocolConstants.TimeoutTime); assert.strictEqual(a.unacknowledgedCount, 1); assert.strictEqual(b.unacknowledgedCount, 1); @@ -351,7 +351,7 @@ suite('PersistentProtocol reconnection', () => { a.endAcceptReconnection(); assert.strictEqual(timeoutListenerCalled, false); - await timeout(2 * ProtocolConstants.AcknowledgeTimeoutTime); + await timeout(2 * ProtocolConstants.TimeoutTime); assert.strictEqual(a.unacknowledgedCount, 0); assert.strictEqual(b.unacknowledgedCount, 0); assert.strictEqual(timeoutListenerCalled, false); diff --git a/src/vs/platform/remote/common/remoteAgentConnection.ts b/src/vs/platform/remote/common/remoteAgentConnection.ts index b555abd3694..7729501248f 100644 --- a/src/vs/platform/remote/common/remoteAgentConnection.ts +++ b/src/vs/platform/remote/common/remoteAgentConnection.ts @@ -512,7 +512,7 @@ export class ReconnectionPermanentFailureEvent { } export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent; -abstract class PersistentConnection extends Disposable { +export abstract class PersistentConnection extends Disposable { public static triggerPermanentFailure(millisSinceLastIncomingData: number, attempt: number, handled: boolean): void { this._permanentFailure = true; @@ -521,6 +521,15 @@ abstract class PersistentConnection extends Disposable { this._permanentFailureHandled = handled; this._instances.forEach(instance => instance._gotoPermanentFailure(this._permanentFailureMillisSinceLastIncomingData, this._permanentFailureAttempt, this._permanentFailureHandled)); } + + public static debugTriggerReconnection() { + this._instances.forEach(instance => instance._beginReconnecting()); + } + + public static debugPauseSocketWriting() { + this._instances.forEach(instance => instance._pauseSocketWriting()); + } + private static _permanentFailure: boolean = false; private static _permanentFailureMillisSinceLastIncomingData: number = 0; private static _permanentFailureAttempt: number = 0; @@ -678,6 +687,10 @@ abstract class PersistentConnection extends Disposable { safeDisposeProtocolAndSocket(this.protocol); } + private _pauseSocketWriting(): void { + this.protocol.pauseSocketWriting(); + } + protected abstract _reconnect(options: ISimpleConnectionOptions, timeoutCancellationToken: CancellationToken): Promise; } diff --git a/src/vs/workbench/contrib/remote/common/remote.contribution.ts b/src/vs/workbench/contrib/remote/common/remote.contribution.ts index a75b4c1198c..ad750d87ffd 100644 --- a/src/vs/workbench/contrib/remote/common/remote.contribution.ts +++ b/src/vs/workbench/contrib/remote/common/remote.contribution.ts @@ -25,6 +25,10 @@ import { IDialogService, IFileDialogService } from 'vs/platform/dialogs/common/d import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService'; import { IWorkspaceContextService } from 'vs/platform/workspace/common/workspace'; import { firstOrDefault } from 'vs/base/common/arrays'; +import { ServicesAccessor } from 'vs/platform/instantiation/common/instantiation'; +import { Action2, registerAction2 } from 'vs/platform/actions/common/actions'; +import { CATEGORIES } from 'vs/workbench/common/actions'; +import { PersistentConnection } from 'vs/platform/remote/common/remoteAgentConnection'; export class LabelContribution implements IWorkbenchContribution { constructor( @@ -161,6 +165,43 @@ workbenchContributionsRegistry.registerWorkbenchContribution(RemoteLogOutputChan workbenchContributionsRegistry.registerWorkbenchContribution(TunnelFactoryContribution, LifecyclePhase.Ready); workbenchContributionsRegistry.registerWorkbenchContribution(ShowCandidateContribution, LifecyclePhase.Ready); +const enableDiagnostics = false; + +if (enableDiagnostics) { + class TriggerReconnectAction extends Action2 { + constructor() { + super({ + id: 'workbench.action.triggerReconnect', + title: { value: localize('triggerReconnect', "Connection: Trigger Reconnect"), original: 'Connection: Trigger Reconnect' }, + category: CATEGORIES.Developer, + f1: true, + }); + } + + async run(accessor: ServicesAccessor): Promise { + PersistentConnection.debugTriggerReconnection(); + } + } + + class PauseSocketWriting extends Action2 { + constructor() { + super({ + id: 'workbench.action.pauseSocketWriting', + title: { value: localize('pauseSocketWriting', "Connection: Pause socket writing"), original: 'Connection: Pause socket writing' }, + category: CATEGORIES.Developer, + f1: true, + }); + } + + async run(accessor: ServicesAccessor): Promise { + PersistentConnection.debugPauseSocketWriting(); + } + } + + registerAction2(TriggerReconnectAction); + registerAction2(PauseSocketWriting); +} + const extensionKindSchema: IJSONSchema = { type: 'string', enum: [