From dcfa4850098972b6b00fe869048e61602aea4683 Mon Sep 17 00:00:00 2001 From: spalger Date: Thu, 21 May 2020 20:52:55 -0700 Subject: [PATCH] Revert "[kbn/optimizer] poll parent process to avoid zombie processes (#67059)" This reverts commit e824ac3ba952ee2099900e3ce6091d85b78ab00b. --- packages/kbn-optimizer/src/common/index.ts | 1 - .../src/common/parent_messages.ts | 33 ---- .../src/common/worker_messages.ts | 19 +- .../src/optimizer/observe_worker.ts | 16 +- .../src/worker/observe_parent_offline.test.ts | 178 ------------------ .../src/worker/observe_parent_offline.ts | 97 ---------- .../kbn-optimizer/src/worker/run_worker.ts | 45 +++-- 7 files changed, 28 insertions(+), 361 deletions(-) delete mode 100644 packages/kbn-optimizer/src/common/parent_messages.ts delete mode 100644 packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts delete mode 100644 packages/kbn-optimizer/src/worker/observe_parent_offline.ts diff --git a/packages/kbn-optimizer/src/common/index.ts b/packages/kbn-optimizer/src/common/index.ts index 376b9ed35090..c51905be0456 100644 --- a/packages/kbn-optimizer/src/common/index.ts +++ b/packages/kbn-optimizer/src/common/index.ts @@ -21,7 +21,6 @@ export * from './bundle'; export * from './bundle_cache'; export * from './worker_config'; export * from './worker_messages'; -export * from './parent_messages'; export * from './compiler_messages'; export * from './ts_helpers'; export * from './rxjs_helpers'; diff --git a/packages/kbn-optimizer/src/common/parent_messages.ts b/packages/kbn-optimizer/src/common/parent_messages.ts deleted file mode 100644 index c27bcb96f4a8..000000000000 --- a/packages/kbn-optimizer/src/common/parent_messages.ts +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -export interface ParentPongMsg { - type: 'pong'; -} - -export const isParentPong = (value: any): value is ParentPongMsg => - typeof value === 'object' && value && value.type === 'pong'; - -export class ParentMsgs { - pong(): ParentPongMsg { - return { - type: 'pong', - }; - } -} diff --git a/packages/kbn-optimizer/src/common/worker_messages.ts b/packages/kbn-optimizer/src/common/worker_messages.ts index 0435f5b4c401..d3c03f483d7e 100644 --- a/packages/kbn-optimizer/src/common/worker_messages.ts +++ b/packages/kbn-optimizer/src/common/worker_messages.ts @@ -24,17 +24,13 @@ import { CompilerErrorMsg, } from './compiler_messages'; -export type InternalWorkerMsg = - | WorkerPingMsg +export type WorkerMsg = | CompilerRunningMsg | CompilerIssueMsg | CompilerSuccessMsg | CompilerErrorMsg | WorkerErrorMsg; -// ping messages are internal, they don't apper in public message streams -export type WorkerMsg = Exclude; - /** * Message sent when the worker encounters an error that it can't * recover from, no more messages will be sent and the worker @@ -46,10 +42,6 @@ export interface WorkerErrorMsg { errorStack?: string; } -export interface WorkerPingMsg { - type: 'ping'; -} - const WORKER_STATE_TYPES: ReadonlyArray = [ 'running', 'compiler issue', @@ -58,19 +50,10 @@ const WORKER_STATE_TYPES: ReadonlyArray = [ 'worker error', ]; -export const isWorkerPing = (value: any): value is WorkerPingMsg => - typeof value === 'object' && value && value.type === 'ping'; - export const isWorkerMsg = (value: any): value is WorkerMsg => typeof value === 'object' && value && WORKER_STATE_TYPES.includes(value.type); export class WorkerMsgs { - ping(): WorkerPingMsg { - return { - type: 'ping', - }; - } - error(error: Error): WorkerErrorMsg { return { type: 'worker error', diff --git a/packages/kbn-optimizer/src/optimizer/observe_worker.ts b/packages/kbn-optimizer/src/optimizer/observe_worker.ts index 90c53f1ef9e8..bfc853e5a6b7 100644 --- a/packages/kbn-optimizer/src/optimizer/observe_worker.ts +++ b/packages/kbn-optimizer/src/optimizer/observe_worker.ts @@ -22,14 +22,12 @@ import { Readable } from 'stream'; import { inspect } from 'util'; import * as Rx from 'rxjs'; -import { map, filter, takeUntil } from 'rxjs/operators'; +import { map, takeUntil } from 'rxjs/operators'; -import { isWorkerMsg, isWorkerPing, WorkerConfig, WorkerMsg, Bundle, ParentMsgs } from '../common'; +import { isWorkerMsg, WorkerConfig, WorkerMsg, Bundle } from '../common'; import { OptimizerConfig } from './optimizer_config'; -const parentMsgs = new ParentMsgs(); - export interface WorkerStdio { type: 'worker stdio'; stream: 'stdout' | 'stderr'; @@ -148,16 +146,6 @@ export function observeWorker( observeStdio$(proc.stderr, 'stderr'), Rx.fromEvent<[unknown]>(proc, 'message') .pipe( - // filter out ping messages so they don't end up in the general message stream - filter(([msg]) => { - if (!isWorkerPing(msg)) { - return true; - } - - proc.send(parentMsgs.pong()); - return false; - }), - // validate the messages from the process map(([msg]) => { if (!isWorkerMsg(msg)) { diff --git a/packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts b/packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts deleted file mode 100644 index 353f570e2cac..000000000000 --- a/packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { EventEmitter } from 'events'; -import { inspect } from 'util'; - -import * as Rx from 'rxjs'; -import { tap, takeUntil } from 'rxjs/operators'; - -import { observeParentOffline, Process } from './observe_parent_offline'; -import { WorkerMsgs, ParentMsgs, isWorkerPing } from '../common'; - -jest.useFakeTimers(); - -beforeEach(() => { - jest.clearAllTimers(); -}); - -const workerMsgs = new WorkerMsgs(); -const parentMsgs = new ParentMsgs(); -class MockProcess extends EventEmitter implements Process { - connected?: boolean; - send?: jest.Mock; - - constructor(options: { connected?: boolean; send?: jest.Mock | false } = {}) { - super(); - - this.connected = options.connected ?? true; - this.send = options.send === false ? undefined : options.send ?? jest.fn(); - } -} - -async function record(observable: Rx.Observable): Promise { - const notes: string[] = []; - - await observable - .pipe( - tap({ - next(value) { - notes.push(`next: ${inspect(value)}`); - }, - error(error) { - notes.push(`error: ${inspect(error)}`); - }, - complete() { - notes.push(`complete`); - }, - }) - ) - .toPromise(); - - return notes; -} - -async function waitForTick() { - await new Promise(resolve => { - process.nextTick(resolve); - }); -} - -describe('emits and completes when parent exists because:', () => { - test('"disconnect" event', async () => { - const mockProc = new MockProcess(); - const promise = record(observeParentOffline(mockProc, workerMsgs)); - mockProc.emit('disconnect'); - expect(await promise).toMatchInlineSnapshot(` - Array [ - "next: 'parent offline (disconnect event)'", - "complete", - ] - `); - }); - - test('process.connected is false', async () => { - const mockProc = new MockProcess({ - connected: false, - }); - - const promise = record(observeParentOffline(mockProc, workerMsgs)); - jest.advanceTimersToNextTimer(); - expect(await promise).toMatchInlineSnapshot(` - Array [ - "next: 'parent offline (disconnected)'", - "complete", - ] - `); - }); - - test('process.send is falsey', async () => { - const mockProc = new MockProcess({ - send: false, - }); - - const promise = record(observeParentOffline(mockProc, workerMsgs)); - jest.advanceTimersToNextTimer(); - expect(await promise).toMatchInlineSnapshot(` - Array [ - "next: 'parent offline (disconnected)'", - "complete", - ] - `); - }); - - test('process.send throws "ERR_IPC_CHANNEL_CLOSED"', async () => { - const mockProc = new MockProcess({ - send: jest.fn(() => { - const error = new Error(); - (error as any).code = 'ERR_IPC_CHANNEL_CLOSED'; - throw error; - }), - }); - - const promise = record(observeParentOffline(mockProc, workerMsgs)); - jest.advanceTimersToNextTimer(); - expect(await promise).toMatchInlineSnapshot(` - Array [ - "next: 'parent offline (ipc channel exception)'", - "complete", - ] - `); - }); - - test('ping timeout', async () => { - const mockProc = new MockProcess({}); - - const promise = record(observeParentOffline(mockProc, workerMsgs)); - jest.advanceTimersByTime(10000); - expect(await promise).toMatchInlineSnapshot(` - Array [ - "next: 'parent offline (ping timeout)'", - "complete", - ] - `); - }); -}); - -test('it emits nothing if parent responds with pongs', async () => { - const send = jest.fn((msg: any) => { - if (isWorkerPing(msg)) { - process.nextTick(() => { - mockProc.emit('message', parentMsgs.pong(), undefined); - }); - } - }); - - const mockProc = new MockProcess({ send }); - const unsub$ = new Rx.Subject(); - const promise = record(observeParentOffline(mockProc, workerMsgs).pipe(takeUntil(unsub$))); - - jest.advanceTimersByTime(5000); - await waitForTick(); - jest.advanceTimersByTime(5000); - await waitForTick(); - unsub$.next(); - - expect(await promise).toMatchInlineSnapshot(` - Array [ - "complete", - ] - `); - expect(send).toHaveBeenCalledTimes(2); -}); diff --git a/packages/kbn-optimizer/src/worker/observe_parent_offline.ts b/packages/kbn-optimizer/src/worker/observe_parent_offline.ts deleted file mode 100644 index 94ec34b409dd..000000000000 --- a/packages/kbn-optimizer/src/worker/observe_parent_offline.ts +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { EventEmitter } from 'events'; - -import * as Rx from 'rxjs'; -import { mergeMap, take, first, map, catchError } from 'rxjs/operators'; - -import { isParentPong, WorkerMsgs } from '../common'; - -const sleep = (ms: number) => Rx.timer(ms).pipe(take(1)); - -export interface Process extends EventEmitter { - connected?: boolean; - send?: (msg: any) => void; -} - -/** - * Returns an observable that will emit a value when the parent - * process goes offline. It accomplishes this by merging several - * signals: - * - * 1. process "disconnect" event - * 2. process.connected or process.send are falsy - * 3. a ping was sent to the parent process but it didn't respond - * with a pong within 5 seconds - * 4. a ping was sent to the parent process but the process.send - * call errored with an 'ERR_IPC_CHANNEL_CLOSED' exception - */ -export function observeParentOffline(process: Process, workerMsgs: WorkerMsgs) { - return Rx.race( - Rx.fromEvent(process, 'disconnect').pipe( - take(1), - map(() => 'parent offline (disconnect event)') - ), - - sleep(5000).pipe( - mergeMap(() => { - if (!process.connected || !process.send) { - return Rx.of('parent offline (disconnected)'); - } - - process.send(workerMsgs.ping()); - - const pong$ = Rx.fromEvent<[any]>(process, 'message').pipe( - first(([msg]) => isParentPong(msg)), - map(() => { - throw new Error('parent still online'); - }) - ); - - // give the parent some time to respond, if the ping - // wins the race the parent is considered online - const timeout$ = sleep(5000).pipe(map(() => 'parent offline (ping timeout)')); - - return Rx.race(pong$, timeout$); - }) - ) - ).pipe( - /** - * resubscribe to the source observable (triggering the timer, - * ping, wait for response) if the source observable does not - * observe the parent being offline yet. - * - * Scheduling the interval this way prevents the ping timeout - * from overlaping with the interval by only scheduling the - * next ping once the previous ping has completed - */ - catchError((error, resubscribe) => { - if (error.code === 'ERR_IPC_CHANNEL_CLOSED') { - return Rx.of('parent offline (ipc channel exception)'); - } - - if (error.message === 'parent still online') { - return resubscribe; - } - - throw error; - }) - ); -} diff --git a/packages/kbn-optimizer/src/worker/run_worker.ts b/packages/kbn-optimizer/src/worker/run_worker.ts index 0a9adc2a3db5..cbec4c3f44c7 100644 --- a/packages/kbn-optimizer/src/worker/run_worker.ts +++ b/packages/kbn-optimizer/src/worker/run_worker.ts @@ -18,12 +18,10 @@ */ import * as Rx from 'rxjs'; -import { takeUntil } from 'rxjs/operators'; import { parseBundles, parseWorkerConfig, WorkerMsg, isWorkerMsg, WorkerMsgs } from '../common'; import { runCompilers } from './run_compilers'; -import { observeParentOffline } from './observe_parent_offline'; /** ** @@ -66,6 +64,15 @@ const exit = (code: number) => { }, 5000).unref(); }; +// check for connected parent on an unref'd timer rather than listening +// to "disconnect" since that listner prevents the process from exiting +setInterval(() => { + if (!process.connected) { + // parent is gone + process.exit(0); + } +}, 1000).unref(); + Rx.defer(() => { const workerConfig = parseWorkerConfig(process.argv[2]); const bundles = parseBundles(process.argv[3]); @@ -74,22 +81,20 @@ Rx.defer(() => { process.env.BROWSERSLIST_ENV = workerConfig.browserslistEnv; return runCompilers(workerConfig, bundles); -}) - .pipe(takeUntil(observeParentOffline(process, workerMsgs))) - .subscribe( - msg => { - send(msg); - }, - error => { - if (isWorkerMsg(error)) { - send(error); - } else { - send(workerMsgs.error(error)); - } - - exit(1); - }, - () => { - exit(0); +}).subscribe( + msg => { + send(msg); + }, + error => { + if (isWorkerMsg(error)) { + send(error); + } else { + send(workerMsgs.error(error)); } - ); + + exit(1); + }, + () => { + exit(0); + } +);