Ask the client to pause writing until the socket is sent to the remote extension host process, which then asks the client to resume (#134429)

This commit is contained in:
Alex Dima 2021-11-25 22:41:43 +01:00
parent 3fb9624b29
commit be87ebcd0d
No known key found for this signature in database
GPG key ID: 39563C1504FDD0C9
4 changed files with 139 additions and 29 deletions

View file

@ -254,7 +254,9 @@ const enum ProtocolMessageType {
Control = 2,
Ack = 3,
Disconnect = 5,
ReplayRequest = 6
ReplayRequest = 6,
Pause = 7,
Resume = 8
}
function protocolMessageTypeToString(messageType: ProtocolMessageType) {
@ -265,6 +267,8 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) {
case ProtocolMessageType.Ack: return 'Ack';
case ProtocolMessageType.Disconnect: return 'Disconnect';
case ProtocolMessageType.ReplayRequest: return 'ReplayRequest';
case ProtocolMessageType.Pause: return 'PauseWriting';
case ProtocolMessageType.Resume: return 'ResumeWriting';
}
}
@ -432,10 +436,15 @@ class ProtocolWriter {
this._writeNow();
}
public pause() {
public pause(): void {
this._isPaused = true;
}
public resume(): void {
this._isPaused = false;
this._scheduleWriting();
}
public write(msg: ProtocolMessage) {
if (this._isDisposed) {
// ignore: there could be left-over promises which complete and then
@ -472,12 +481,21 @@ class ProtocolWriter {
private _writeSoon(header: VSBuffer, data: VSBuffer): void {
if (this._bufferAdd(header, data)) {
setTimeout(() => {
this._writeNow();
});
this._scheduleWriting();
}
}
private _writeNowTimeout: any = null;
private _scheduleWriting(): void {
if (this._writeNowTimeout) {
return;
}
this._writeNowTimeout = setTimeout(() => {
this._writeNowTimeout = null;
this._writeNow();
});
}
private _writeNow(): void {
if (this._totalLength === 0) {
return;
@ -837,6 +855,16 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._socketWriter.flush();
}
sendPause(): void {
const msg = new ProtocolMessage(ProtocolMessageType.Pause, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
}
sendResume(): void {
const msg = new ProtocolMessage(ProtocolMessageType.Resume, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
}
pauseSocketWriting() {
this._socketWriter.pause();
}
@ -899,34 +927,59 @@ export class PersistentProtocol implements IMessagePassingProtocol {
} while (true);
}
if (msg.type === ProtocolMessageType.Regular) {
if (msg.id > this._incomingMsgId) {
if (msg.id !== this._incomingMsgId + 1) {
// in case we missed some messages we ask the other party to resend them
const now = Date.now();
if (now - this._lastReplayRequestTime > 10000) {
// send a replay request at most once every 10s
this._lastReplayRequestTime = now;
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.ReplayRequest, 0, 0, getEmptyBuffer()));
switch (msg.type) {
case ProtocolMessageType.None: {
// N/A
break;
}
case ProtocolMessageType.Regular: {
if (msg.id > this._incomingMsgId) {
if (msg.id !== this._incomingMsgId + 1) {
// in case we missed some messages we ask the other party to resend them
const now = Date.now();
if (now - this._lastReplayRequestTime > 10000) {
// send a replay request at most once every 10s
this._lastReplayRequestTime = now;
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.ReplayRequest, 0, 0, getEmptyBuffer()));
}
} else {
this._incomingMsgId = msg.id;
this._incomingMsgLastTime = Date.now();
this._sendAckCheck();
this._onMessage.fire(msg.data);
}
} else {
this._incomingMsgId = msg.id;
this._incomingMsgLastTime = Date.now();
this._sendAckCheck();
this._onMessage.fire(msg.data);
}
break;
}
} else if (msg.type === ProtocolMessageType.Control) {
this._onControlMessage.fire(msg.data);
} else if (msg.type === ProtocolMessageType.Disconnect) {
this._onDidDispose.fire();
} else if (msg.type === ProtocolMessageType.ReplayRequest) {
// Send again all unacknowledged messages
const toSend = this._outgoingUnackMsg.toArray();
for (let i = 0, len = toSend.length; i < len; i++) {
this._socketWriter.write(toSend[i]);
case ProtocolMessageType.Control: {
this._onControlMessage.fire(msg.data);
break;
}
case ProtocolMessageType.Ack: {
// nothing to do
break;
}
case ProtocolMessageType.Disconnect: {
this._onDidDispose.fire();
break;
}
case ProtocolMessageType.ReplayRequest: {
// Send again all unacknowledged messages
const toSend = this._outgoingUnackMsg.toArray();
for (let i = 0, len = toSend.length; i < len; i++) {
this._socketWriter.write(toSend[i]);
}
this._recvAckCheck();
break;
}
case ProtocolMessageType.Pause: {
this._socketWriter.pause();
break;
}
case ProtocolMessageType.Resume: {
this._socketWriter.resume();
break;
}
this._recvAckCheck();
}
}

View file

@ -364,6 +364,59 @@ suite('PersistentProtocol reconnection', () => {
}
);
});
test('writing can be paused', async () => {
await runWithFakedTimers({ useFakeTimers: true, maxTaskCount: 100 }, async () => {
const loadEstimator: ILoadEstimator = {
hasHighLoad: () => false
};
const ether = new Ether();
const aSocket = new NodeSocket(ether.a);
const a = new PersistentProtocol(aSocket, null, loadEstimator);
const aMessages = new MessageStream(a);
const bSocket = new NodeSocket(ether.b);
const b = new PersistentProtocol(bSocket, null, loadEstimator);
const bMessages = new MessageStream(b);
// send one message A -> B
a.send(VSBuffer.fromString('a1'));
const a1 = await bMessages.waitForOne();
assert.strictEqual(a1.toString(), 'a1');
// ask A to pause writing
b.sendPause();
// send a message B -> A
b.send(VSBuffer.fromString('b1'));
const b1 = await aMessages.waitForOne();
assert.strictEqual(b1.toString(), 'b1');
// send a message A -> B (this should be blocked at A)
a.send(VSBuffer.fromString('a2'));
// wait a long time and check that not even acks are written
await timeout(2 * ProtocolConstants.AcknowledgeTime);
assert.strictEqual(a.unacknowledgedCount, 1);
assert.strictEqual(b.unacknowledgedCount, 1);
// ask A to resume writing
b.sendResume();
// check that B receives message
const a2 = await bMessages.waitForOne();
assert.strictEqual(a2.toString(), 'a2');
// wait a long time and check that acks are written
await timeout(2 * ProtocolConstants.AcknowledgeTime);
assert.strictEqual(a.unacknowledgedCount, 0);
assert.strictEqual(b.unacknowledgedCount, 0);
aMessages.dispose();
bMessages.dispose();
a.dispose();
b.dispose();
});
});
});
suite('IPC, create handle', () => {

View file

@ -754,6 +754,7 @@ export class RemoteExtensionHostAgentServer extends Disposable {
}
}
protocol.sendPause();
protocol.sendControl(VSBuffer.fromString(JSON.stringify(startParams.port ? { debugPort: startParams.port } : {})));
const dataChunk = protocol.readEntireBuffer();
protocol.dispose();
@ -766,6 +767,7 @@ export class RemoteExtensionHostAgentServer extends Disposable {
return this._rejectWebSocketConnection(logPrefix, protocol, `Duplicate reconnection token`);
}
protocol.sendPause();
protocol.sendControl(VSBuffer.fromString(JSON.stringify(startParams.port ? { debugPort: startParams.port } : {})));
const dataChunk = protocol.readEntireBuffer();
protocol.dispose();

View file

@ -134,9 +134,11 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
disconnectRunner2.cancel();
protocol.beginAcceptReconnection(socket, initialDataChunk);
protocol.endAcceptReconnection();
protocol.sendResume();
} else {
clearTimeout(timer);
protocol = new PersistentProtocol(socket, initialDataChunk);
protocol.sendResume();
protocol.onDidDispose(() => onTerminate('renderer disconnected'));
resolve(protocol);