Revert "[kbn/optimizer] poll parent process to avoid zombie processes (#67059)"

This reverts commit e824ac3ba9.
This commit is contained in:
spalger 2020-05-21 20:52:55 -07:00
parent 556ae523c8
commit dcfa485009
7 changed files with 28 additions and 361 deletions

View file

@ -21,7 +21,6 @@ export * from './bundle';
export * from './bundle_cache';
export * from './worker_config';
export * from './worker_messages';
export * from './parent_messages';
export * from './compiler_messages';
export * from './ts_helpers';
export * from './rxjs_helpers';

View file

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export interface ParentPongMsg {
type: 'pong';
}
export const isParentPong = (value: any): value is ParentPongMsg =>
typeof value === 'object' && value && value.type === 'pong';
export class ParentMsgs {
pong(): ParentPongMsg {
return {
type: 'pong',
};
}
}

View file

@ -24,17 +24,13 @@ import {
CompilerErrorMsg,
} from './compiler_messages';
export type InternalWorkerMsg =
| WorkerPingMsg
export type WorkerMsg =
| CompilerRunningMsg
| CompilerIssueMsg
| CompilerSuccessMsg
| CompilerErrorMsg
| WorkerErrorMsg;
// ping messages are internal, they don't apper in public message streams
export type WorkerMsg = Exclude<InternalWorkerMsg, WorkerPingMsg>;
/**
* Message sent when the worker encounters an error that it can't
* recover from, no more messages will be sent and the worker
@ -46,10 +42,6 @@ export interface WorkerErrorMsg {
errorStack?: string;
}
export interface WorkerPingMsg {
type: 'ping';
}
const WORKER_STATE_TYPES: ReadonlyArray<WorkerMsg['type']> = [
'running',
'compiler issue',
@ -58,19 +50,10 @@ const WORKER_STATE_TYPES: ReadonlyArray<WorkerMsg['type']> = [
'worker error',
];
export const isWorkerPing = (value: any): value is WorkerPingMsg =>
typeof value === 'object' && value && value.type === 'ping';
export const isWorkerMsg = (value: any): value is WorkerMsg =>
typeof value === 'object' && value && WORKER_STATE_TYPES.includes(value.type);
export class WorkerMsgs {
ping(): WorkerPingMsg {
return {
type: 'ping',
};
}
error(error: Error): WorkerErrorMsg {
return {
type: 'worker error',

View file

@ -22,14 +22,12 @@ import { Readable } from 'stream';
import { inspect } from 'util';
import * as Rx from 'rxjs';
import { map, filter, takeUntil } from 'rxjs/operators';
import { map, takeUntil } from 'rxjs/operators';
import { isWorkerMsg, isWorkerPing, WorkerConfig, WorkerMsg, Bundle, ParentMsgs } from '../common';
import { isWorkerMsg, WorkerConfig, WorkerMsg, Bundle } from '../common';
import { OptimizerConfig } from './optimizer_config';
const parentMsgs = new ParentMsgs();
export interface WorkerStdio {
type: 'worker stdio';
stream: 'stdout' | 'stderr';
@ -148,16 +146,6 @@ export function observeWorker(
observeStdio$(proc.stderr, 'stderr'),
Rx.fromEvent<[unknown]>(proc, 'message')
.pipe(
// filter out ping messages so they don't end up in the general message stream
filter(([msg]) => {
if (!isWorkerPing(msg)) {
return true;
}
proc.send(parentMsgs.pong());
return false;
}),
// validate the messages from the process
map(([msg]) => {
if (!isWorkerMsg(msg)) {

View file

@ -1,178 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EventEmitter } from 'events';
import { inspect } from 'util';
import * as Rx from 'rxjs';
import { tap, takeUntil } from 'rxjs/operators';
import { observeParentOffline, Process } from './observe_parent_offline';
import { WorkerMsgs, ParentMsgs, isWorkerPing } from '../common';
jest.useFakeTimers();
beforeEach(() => {
jest.clearAllTimers();
});
const workerMsgs = new WorkerMsgs();
const parentMsgs = new ParentMsgs();
class MockProcess extends EventEmitter implements Process {
connected?: boolean;
send?: jest.Mock;
constructor(options: { connected?: boolean; send?: jest.Mock | false } = {}) {
super();
this.connected = options.connected ?? true;
this.send = options.send === false ? undefined : options.send ?? jest.fn();
}
}
async function record(observable: Rx.Observable<any>): Promise<string[]> {
const notes: string[] = [];
await observable
.pipe(
tap({
next(value) {
notes.push(`next: ${inspect(value)}`);
},
error(error) {
notes.push(`error: ${inspect(error)}`);
},
complete() {
notes.push(`complete`);
},
})
)
.toPromise();
return notes;
}
async function waitForTick() {
await new Promise(resolve => {
process.nextTick(resolve);
});
}
describe('emits and completes when parent exists because:', () => {
test('"disconnect" event', async () => {
const mockProc = new MockProcess();
const promise = record(observeParentOffline(mockProc, workerMsgs));
mockProc.emit('disconnect');
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnect event)'",
"complete",
]
`);
});
test('process.connected is false', async () => {
const mockProc = new MockProcess({
connected: false,
});
const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersToNextTimer();
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnected)'",
"complete",
]
`);
});
test('process.send is falsey', async () => {
const mockProc = new MockProcess({
send: false,
});
const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersToNextTimer();
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnected)'",
"complete",
]
`);
});
test('process.send throws "ERR_IPC_CHANNEL_CLOSED"', async () => {
const mockProc = new MockProcess({
send: jest.fn(() => {
const error = new Error();
(error as any).code = 'ERR_IPC_CHANNEL_CLOSED';
throw error;
}),
});
const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersToNextTimer();
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (ipc channel exception)'",
"complete",
]
`);
});
test('ping timeout', async () => {
const mockProc = new MockProcess({});
const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersByTime(10000);
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (ping timeout)'",
"complete",
]
`);
});
});
test('it emits nothing if parent responds with pongs', async () => {
const send = jest.fn((msg: any) => {
if (isWorkerPing(msg)) {
process.nextTick(() => {
mockProc.emit('message', parentMsgs.pong(), undefined);
});
}
});
const mockProc = new MockProcess({ send });
const unsub$ = new Rx.Subject();
const promise = record(observeParentOffline(mockProc, workerMsgs).pipe(takeUntil(unsub$)));
jest.advanceTimersByTime(5000);
await waitForTick();
jest.advanceTimersByTime(5000);
await waitForTick();
unsub$.next();
expect(await promise).toMatchInlineSnapshot(`
Array [
"complete",
]
`);
expect(send).toHaveBeenCalledTimes(2);
});

View file

@ -1,97 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EventEmitter } from 'events';
import * as Rx from 'rxjs';
import { mergeMap, take, first, map, catchError } from 'rxjs/operators';
import { isParentPong, WorkerMsgs } from '../common';
const sleep = (ms: number) => Rx.timer(ms).pipe(take(1));
export interface Process extends EventEmitter {
connected?: boolean;
send?: (msg: any) => void;
}
/**
* Returns an observable that will emit a value when the parent
* process goes offline. It accomplishes this by merging several
* signals:
*
* 1. process "disconnect" event
* 2. process.connected or process.send are falsy
* 3. a ping was sent to the parent process but it didn't respond
* with a pong within 5 seconds
* 4. a ping was sent to the parent process but the process.send
* call errored with an 'ERR_IPC_CHANNEL_CLOSED' exception
*/
export function observeParentOffline(process: Process, workerMsgs: WorkerMsgs) {
return Rx.race(
Rx.fromEvent(process, 'disconnect').pipe(
take(1),
map(() => 'parent offline (disconnect event)')
),
sleep(5000).pipe(
mergeMap(() => {
if (!process.connected || !process.send) {
return Rx.of('parent offline (disconnected)');
}
process.send(workerMsgs.ping());
const pong$ = Rx.fromEvent<[any]>(process, 'message').pipe(
first(([msg]) => isParentPong(msg)),
map(() => {
throw new Error('parent still online');
})
);
// give the parent some time to respond, if the ping
// wins the race the parent is considered online
const timeout$ = sleep(5000).pipe(map(() => 'parent offline (ping timeout)'));
return Rx.race(pong$, timeout$);
})
)
).pipe(
/**
* resubscribe to the source observable (triggering the timer,
* ping, wait for response) if the source observable does not
* observe the parent being offline yet.
*
* Scheduling the interval this way prevents the ping timeout
* from overlaping with the interval by only scheduling the
* next ping once the previous ping has completed
*/
catchError((error, resubscribe) => {
if (error.code === 'ERR_IPC_CHANNEL_CLOSED') {
return Rx.of('parent offline (ipc channel exception)');
}
if (error.message === 'parent still online') {
return resubscribe;
}
throw error;
})
);
}

View file

@ -18,12 +18,10 @@
*/
import * as Rx from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { parseBundles, parseWorkerConfig, WorkerMsg, isWorkerMsg, WorkerMsgs } from '../common';
import { runCompilers } from './run_compilers';
import { observeParentOffline } from './observe_parent_offline';
/**
**
@ -66,6 +64,15 @@ const exit = (code: number) => {
}, 5000).unref();
};
// check for connected parent on an unref'd timer rather than listening
// to "disconnect" since that listner prevents the process from exiting
setInterval(() => {
if (!process.connected) {
// parent is gone
process.exit(0);
}
}, 1000).unref();
Rx.defer(() => {
const workerConfig = parseWorkerConfig(process.argv[2]);
const bundles = parseBundles(process.argv[3]);
@ -74,22 +81,20 @@ Rx.defer(() => {
process.env.BROWSERSLIST_ENV = workerConfig.browserslistEnv;
return runCompilers(workerConfig, bundles);
})
.pipe(takeUntil(observeParentOffline(process, workerMsgs)))
.subscribe(
msg => {
send(msg);
},
error => {
if (isWorkerMsg(error)) {
send(error);
} else {
send(workerMsgs.error(error));
}
exit(1);
},
() => {
exit(0);
}).subscribe(
msg => {
send(msg);
},
error => {
if (isWorkerMsg(error)) {
send(error);
} else {
send(workerMsgs.error(error));
}
);
exit(1);
},
() => {
exit(0);
}
);