diff --git a/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js b/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js index ff476793325e..942d0c66dc81 100644 --- a/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js +++ b/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js @@ -31,15 +31,7 @@ export function getWorker() { if (worker) return worker; worker = fork(workerPath, {}); - // 'exit' happens whether we kill the worker or it just dies. - // No need to look for 'error', our worker is intended to be long lived so it isn't running, it's an issue - worker.on('exit', () => { - // Heads up: there is no worker.off - worker = null; - // Restart immediately on exit since node takes a couple seconds to spin up - worker = getWorker(); - }); - + // handle run requests worker.on('message', msg => { const { type, value, id } = msg; if (type === 'run') { @@ -59,6 +51,18 @@ export function getWorker() { if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value)); }); + // handle exit event, fired when we kill the worker or it just dies + worker.on('exit', () => { + // NOTE: No need to look for 'code' or 'signal', if it isn't running, it's an issue + + // clean up any listeners on the worker before replacing it + worker.removeAllListeners(); + + // Restart immediately on exit since node takes a couple seconds to spin up + worker = null; + worker = getWorker(); + }); + return worker; } @@ -66,10 +70,18 @@ export function getWorker() { export const thread = ({ onFunctionNotFound, serialize, deserialize }) => { const getWorkerFunctions = new Promise(resolve => { const worker = getWorker(); + + function functionListHandler(msg) { + // wait for the function list message + if (msg.type === 'functionList') { + // tear dowm the listener once the function list is provided + worker.removeListener('message', functionListHandler); + resolve(msg.value); + } + } + worker.on('message', functionListHandler); + worker.send({ type: 'getFunctions' }); - worker.on('message', msg => { - if (msg.type === 'functionList') resolve(msg.value); - }); }); return getWorkerFunctions.then(functions => { @@ -94,7 +106,7 @@ export const thread = ({ onFunctionNotFound, serialize, deserialize }) => { onFunctionNotFound(ast, deserialize(context)).then(serialize), }; - // + // kill the worker after the timeout is exceeded setTimeout(() => { if (!heap[id]) return; // Looks like this has already been cleared from the heap. if (worker) worker.kill(); diff --git a/src/legacy/core_plugins/interpreter/server/routes/socket.js b/src/legacy/core_plugins/interpreter/server/routes/socket.js index 80733690a7d1..8befd511937e 100644 --- a/src/legacy/core_plugins/interpreter/server/routes/socket.js +++ b/src/legacy/core_plugins/interpreter/server/routes/socket.js @@ -75,7 +75,7 @@ export function socketApi(server) { ); }); - const handler = async ({ ast, context, id }) => { + socket.on('run', async ({ ast, context, id }) => { try { const value = await routeExpression(ast, deserialize(context)); socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) }); @@ -83,11 +83,11 @@ export function socketApi(server) { // TODO: I don't think it is possible to hit this right now? Maybe ever? socket.emit(`resp:${id}`, { type: 'msgError', value: err }); } - }; + }); - socket.on('run', handler); socket.on('disconnect', () => { - socket.removeListener('run', handler); + // remove all listeners on disconnect + socket.removeAllListeners(); }); }); }