Add RequestStore helper, adopt in ptyhost and add timeout support (#128793)

* Create helper class for event with reply pattern in terminal

Part of #128786

* Adopt RequestStore in pty host

* Support timeout in RequestStore

Fixes #128786

* Docs

* Move request store to common

* Add request store test

* Undo bad changes to unrelated files
This commit is contained in:
Daniel Imms 2021-07-16 08:41:05 -07:00 committed by GitHub
parent b27a33cc43
commit 51bbf51f0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 146 additions and 48 deletions

View file

@ -0,0 +1,68 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { timeout } from 'vs/base/common/async';
import { CancellationTokenSource } from 'vs/base/common/cancellation';
import { Emitter } from 'vs/base/common/event';
import { Disposable, dispose, toDisposable, IDisposable } from 'vs/base/common/lifecycle';
import { ILogService } from 'vs/platform/log/common/log';
/**
* A helper class to track requests that have replies. Using this it's easy to implement an event
* that accepts a reply.
*/
export class RequestStore<T, RequestArgs> extends Disposable {
private _lastRequestId = 0;
private readonly _timeout: number;
private _pendingRequests: Map<number, (resolved: T) => void> = new Map();
private _pendingRequestDisposables: Map<number, IDisposable[]> = new Map();
private readonly _onCreateRequest = this._register(new Emitter<RequestArgs & { requestId: number }>());
readonly onCreateRequest = this._onCreateRequest.event;
/**
* @param timeout How long in ms to allow requests to go unanswered for, undefined will use the
* default (15 seconds).
*/
constructor(
timeout: number | undefined,
@ILogService private readonly _logService: ILogService
) {
super();
this._timeout = timeout === undefined ? 15000 : timeout;
}
/**
* Creates a request.
* @param args The arguments to pass to the onCreateRequest event.
*/
createRequest(args: RequestArgs): Promise<T> {
return new Promise<T>((resolve, reject) => {
const requestId = ++this._lastRequestId;
this._pendingRequests.set(requestId, resolve);
this._onCreateRequest.fire({ requestId, ...args });
const tokenSource = new CancellationTokenSource();
timeout(this._timeout, tokenSource.token).then(() => reject(`Request ${requestId} timed out (${this._timeout}ms)`));
this._pendingRequestDisposables.set(requestId, [toDisposable(() => tokenSource.cancel())]);
});
}
/**
* Accept a reply to a request.
* @param requestId The request ID originating from the onCreateRequest event.
* @param data The reply data.
*/
acceptReply(requestId: number, data: T) {
const resolveRequest = this._pendingRequests.get(requestId);
if (resolveRequest) {
this._pendingRequests.delete(requestId);
dispose(this._pendingRequestDisposables.get(requestId) || []);
this._pendingRequestDisposables.delete(requestId);
resolveRequest(data);
} else {
this._logService.warn(`RequestStore#acceptReply was called without receiving a matching request ${requestId}`);
}
}
}

View file

@ -191,7 +191,7 @@ export interface IPtyService {
restartPtyHost?(): Promise<void>;
shutdownAll?(): Promise<void>;
acceptPtyHostResolvedVariables?(id: number, resolved: string[]): Promise<void>;
acceptPtyHostResolvedVariables?(requestId: number, resolved: string[]): Promise<void>;
createProcess(
shellLaunchConfig: IShellLaunchConfig,
@ -234,11 +234,11 @@ export interface IPtyService {
getTerminalLayoutInfo(args: IGetTerminalLayoutInfoArgs): Promise<ITerminalsLayoutInfo | undefined>;
reduceConnectionGraceTime(): Promise<void>;
requestDetachInstance(workspaceId: string, instanceId: number): Promise<IProcessDetails | undefined>;
acceptDetachedInstance(requestId: number, process: number): Promise<IProcessDetails | undefined>;
acceptDetachedInstance(requestId: number, process: number): Promise<void>;
}
export interface IRequestResolveVariablesEvent {
id: number;
requestId: number;
workspaceId: string;
originalText: string[];
}

View file

@ -17,6 +17,7 @@ import { ITelemetryService } from 'vs/platform/telemetry/common/telemetry';
import { detectAvailableProfiles } from 'vs/platform/terminal/node/terminalProfiles';
import { IConfigurationService } from 'vs/platform/configuration/common/configuration';
import { registerTerminalPlatformConfiguration } from 'vs/platform/terminal/common/terminalPlatformConfiguration';
import { RequestStore } from 'vs/platform/terminal/common/requestStore';
enum Constants {
MaxRestarts = 5
@ -28,8 +29,6 @@ enum Constants {
*/
let lastPtyId = 0;
let lastResolveVariablesRequestId = 0;
/**
* This service implements IPtyService by launching a pty host process, forwarding messages to and
* from the pty host process and manages the connection.
@ -41,6 +40,7 @@ export class PtyHostService extends Disposable implements IPtyService {
// ProxyChannel is not used here because events get lost when forwarding across multiple proxies
private _proxy: IPtyService;
private readonly _resolveVariablesRequestStore: RequestStore<string[], { workspaceId: string, originalText: string[] }>;
private _restartCount = 0;
private _isResponsive = true;
private _isDisposed = false;
@ -96,6 +96,9 @@ export class PtyHostService extends Disposable implements IPtyService {
this._register(toDisposable(() => this._disposePtyHost()));
this._resolveVariablesRequestStore = this._register(new RequestStore(undefined, this._logService));
this._resolveVariablesRequestStore.onCreateRequest(this._onPtyHostRequestResolveVariables.fire, this._onPtyHostRequestResolveVariables);
[this._client, this._proxy] = this._startPtyHost();
}
@ -249,7 +252,7 @@ export class PtyHostService extends Disposable implements IPtyService {
return this._proxy.requestDetachInstance(workspaceId, instanceId);
}
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined> {
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void> {
return this._proxy.acceptDetachedInstance(requestId, persistentProcessId);
}
@ -323,21 +326,10 @@ export class PtyHostService extends Disposable implements IPtyService {
}
}
private _pendingResolveVariablesRequests: Map<number, (resolved: string[]) => void> = new Map();
private _resolveVariables(workspaceId: string, text: string[]): Promise<string[]> {
return new Promise<string[]>(resolve => {
const id = ++lastResolveVariablesRequestId;
this._pendingResolveVariablesRequests.set(id, resolve);
this._onPtyHostRequestResolveVariables.fire({ id, workspaceId, originalText: text });
});
return this._resolveVariablesRequestStore.createRequest({ workspaceId, originalText: text });
}
async acceptPtyHostResolvedVariables(id: number, resolved: string[]) {
const request = this._pendingResolveVariablesRequests.get(id);
if (request) {
request(resolved);
this._pendingResolveVariablesRequests.delete(id);
} else {
this._logService.warn(`Resolved variables received without matching request ${id}`);
}
async acceptPtyHostResolvedVariables(requestId: number, resolved: string[]) {
this._resolveVariablesRequestStore.acceptReply(requestId, resolved);
}
}

View file

@ -18,15 +18,16 @@ import { getWindowsBuildNumber } from 'vs/platform/terminal/node/terminalEnviron
import { execFile } from 'child_process';
import { escapeNonWindowsPath } from 'vs/platform/terminal/common/terminalEnvironment';
import { URI } from 'vs/base/common/uri';
import { RequestStore } from 'vs/platform/terminal/common/requestStore';
type WorkspaceId = string;
let lastResolvedInstanceRequestId = 0;
export class PtyService extends Disposable implements IPtyService {
declare readonly _serviceBrand: undefined;
private readonly _ptys: Map<number, PersistentTerminalProcess> = new Map();
private readonly _workspaceLayoutInfos = new Map<WorkspaceId, ISetTerminalLayoutInfoArgs>();
private readonly _detachInstanceRequestStore: RequestStore<IProcessDetails | undefined, { workspaceId: string, instanceId: number }>;
private readonly _onHeartbeat = this._register(new Emitter<void>());
readonly onHeartbeat = this._onHeartbeat.event;
@ -67,29 +68,22 @@ export class PtyService extends Disposable implements IPtyService {
}
this._ptys.clear();
}));
this._detachInstanceRequestStore = this._register(new RequestStore(undefined, this._logService));
this._detachInstanceRequestStore.onCreateRequest(this._onDidRequestDetach.fire, this._onDidRequestDetach);
}
private _pendingDetachInstanceRequests: Map<number, (resolved: IProcessDetails | undefined) => void> = new Map();
async requestDetachInstance(workspaceId: string, instanceId: number): Promise<IProcessDetails | undefined> {
return new Promise<IProcessDetails | undefined>(resolve => {
const requestId = ++lastResolvedInstanceRequestId;
this._pendingDetachInstanceRequests.set(requestId, resolve);
this._onDidRequestDetach.fire({ requestId, workspaceId, instanceId });
});
return this._detachInstanceRequestStore.createRequest({ workspaceId, instanceId });
}
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined> {
const request = this._pendingDetachInstanceRequests.get(requestId);
if (request) {
this._pendingDetachInstanceRequests.delete(requestId);
const pty = this._throwIfNoPty(persistentProcessId);
const process = await this._buildProcessDetails(persistentProcessId, pty);
request(process);
return process;
} else {
this._logService.warn(`Accept detached instance was called without receiving a matching request ${requestId} for process with ID: ${persistentProcessId}`);
return undefined;
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void> {
let processDetails: IProcessDetails | undefined = undefined;
const pty = this._ptys.get(persistentProcessId);
if (pty) {
processDetails = await this._buildProcessDetails(persistentProcessId, pty);
}
this._detachInstanceRequestStore.acceptReply(requestId, processDetails);
}
async shutdownAll(): Promise<void> {

View file

@ -0,0 +1,44 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { fail, strictEqual } from 'assert';
import { TestInstantiationService } from 'vs/platform/instantiation/test/common/instantiationServiceMock';
import { ConsoleLogger, ILogService, LogService } from 'vs/platform/log/common/log';
import { RequestStore } from 'vs/platform/terminal/common/requestStore';
suite('RequestStore', () => {
let instantiationService: TestInstantiationService;
setup(() => {
instantiationService = new TestInstantiationService();
instantiationService.stub(ILogService, new LogService(new ConsoleLogger()));
});
test('should resolve requests', async () => {
const store: RequestStore<{ data: string }, { arg: string }> = instantiationService.createInstance(RequestStore, undefined);
let eventArgs: { requestId: number, arg: string } | undefined;
store.onCreateRequest(e => eventArgs = e);
const request = store.createRequest({ arg: 'foo' });
strictEqual(typeof eventArgs?.requestId, 'number');
strictEqual(eventArgs?.arg, 'foo');
store.acceptReply(eventArgs!.requestId, { data: 'bar' });
const result = await request;
strictEqual(result.data, 'bar');
});
test('should reject the promise when the request times out', async () => {
const store: RequestStore<{ data: string }, { arg: string }> = instantiationService.createInstance(RequestStore, 1);
const request = store.createRequest({ arg: 'foo' });
let threw = false;
try {
await request;
} catch (e) {
threw = true;
}
if (!threw) {
fail();
}
});
});

View file

@ -148,7 +148,7 @@ export class RemoteTerminalService extends Disposable implements IRemoteTerminal
return configurationResolverService.resolveAsync(lastActiveWorkspaceRoot, t);
});
const result = await Promise.all(resolveCalls);
channel.acceptPtyHostResolvedVariables(e.id, result);
channel.acceptPtyHostResolvedVariables(e.requestId, result);
}));
} else {
this._remoteTerminalChannel = null;
@ -162,7 +162,7 @@ export class RemoteTerminalService extends Disposable implements IRemoteTerminal
return this._remoteTerminalChannel.requestDetachInstance(workspaceId, instanceId);
}
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined> {
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void> {
if (!this._remoteTerminalChannel) {
throw new Error(`Cannot accept detached instance when there is no remote!`);
}

View file

@ -204,7 +204,7 @@ export class RemoteTerminalChannelClient {
requestDetachInstance(workspaceId: string, instanceId: number): Promise<IProcessDetails | undefined> {
return this._channel.call('$requestDetachInstance', [workspaceId, instanceId]);
}
acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined> {
acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void> {
return this._channel.call('$acceptDetachedInstance', [requestId, persistentProcessId]);
}
attachToProcess(id: number): Promise<void> {
@ -256,8 +256,8 @@ export class RemoteTerminalChannelClient {
getProfiles(profiles: unknown, defaultProfile: unknown, includeDetectedProfiles?: boolean): Promise<ITerminalProfile[]> {
return this._channel.call('$getProfiles', [this._workspaceContextService.getWorkspace().id, profiles, defaultProfile, includeDetectedProfiles]);
}
acceptPtyHostResolvedVariables(id: number, resolved: string[]) {
return this._channel.call('$acceptPtyHostResolvedVariables', [id, resolved]);
acceptPtyHostResolvedVariables(requestId: number, resolved: string[]) {
return this._channel.call('$acceptPtyHostResolvedVariables', [requestId, resolved]);
}
getEnvironment(): Promise<IProcessEnvironment> {

View file

@ -96,7 +96,7 @@ export interface IOffProcessTerminalService {
getTerminalLayoutInfo(): Promise<ITerminalsLayoutInfo | undefined>;
reduceConnectionGraceTime(): Promise<void>;
requestDetachInstance(workspaceId: string, instanceId: number): Promise<IProcessDetails | undefined>;
acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined>;
acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void>;
}
export const ILocalTerminalService = createDecorator<ILocalTerminalService>('localTerminalService');

View file

@ -121,7 +121,7 @@ export class LocalTerminalService extends Disposable implements ILocalTerminalSe
return configurationResolverService.resolveAsync(lastActiveWorkspaceRoot, t);
});
const result = await Promise.all(resolveCalls);
this._localPtyService.acceptPtyHostResolvedVariables?.(e.id, result);
this._localPtyService.acceptPtyHostResolvedVariables?.(e.requestId, result);
}));
}
}
@ -130,8 +130,8 @@ export class LocalTerminalService extends Disposable implements ILocalTerminalSe
return this._localPtyService.requestDetachInstance(workspaceId, instanceId);
}
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined> {
return this._localPtyService.acceptDetachedInstance(requestId, persistentProcessId);
async acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void> {
await this._localPtyService.acceptDetachedInstance(requestId, persistentProcessId);
}
async updateTitle(id: number, title: string, titleSource: TitleEventSource): Promise<void> {

View file

@ -1684,7 +1684,7 @@ export class TestLocalTerminalService implements ILocalTerminalService {
updateTitle(id: number, title: string): Promise<void> { throw new Error('Method not implemented.'); }
updateIcon(id: number, icon: URI | { light: URI; dark: URI } | { id: string, color?: { id: string } }, color?: string): Promise<void> { throw new Error('Method not implemented.'); }
requestDetachInstance(workspaceId: string, instanceId: number): Promise<IProcessDetails | undefined> { throw new Error('Method not implemented.'); }
acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<IProcessDetails | undefined> { throw new Error('Method not implemented.'); }
acceptDetachedInstance(requestId: number, persistentProcessId: number): Promise<void> { throw new Error('Method not implemented.'); }
}
class TestTerminalChildProcess implements ITerminalChildProcess {