From ff7d887d1de4c777b63fe53f9b80c044ab8e440a Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Mon, 17 Dec 2018 13:06:50 -0700 Subject: [PATCH] Fix: socket and thread memory leaks (#26984) ## Summary Closes https://github.com/elastic/kibana/issues/25656 Fixes one memory leak, and another potential memory leak. ### Worker thread leak Occasionally, users would see warnings on the server about a listener having more than 11 detached functions. I tracked it down to the server-side worker thread, specifically the handling of the `getWorkerFunctions` event. Every user or refresh would create another listener, which was never cleaned up. This PR pulls the handler function out in `functionListHandler` and removes the listener once the matching message is received. We also weren't cleaning up event listeners on the worker when it exited, and while it's not clear if that's actually required, it seemed safer to just remove all of the listeners before new worker instance is created. ### Websocket server leak This one I don't think is actually a leak. It doesn't look like the teardown on the client instance is required on disconnect, but I couldn't verify that in the code, so better safe than sorry. When a client disconnects, we were removing the `run` listener, but not any of the others. In this pr, `removeAllListeners` is used so that everything is cleaned up on disconnect. --- .../lib/route_expression/thread/index.js | 38 ++++++++++++------- .../interpreter/server/routes/socket.js | 8 ++-- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js b/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js index ff476793325e..942d0c66dc81 100644 --- a/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js +++ b/src/legacy/core_plugins/interpreter/server/lib/route_expression/thread/index.js @@ -31,15 +31,7 @@ export function getWorker() { if (worker) return worker; worker = fork(workerPath, {}); - // 'exit' happens whether we kill the worker or it just dies. - // No need to look for 'error', our worker is intended to be long lived so it isn't running, it's an issue - worker.on('exit', () => { - // Heads up: there is no worker.off - worker = null; - // Restart immediately on exit since node takes a couple seconds to spin up - worker = getWorker(); - }); - + // handle run requests worker.on('message', msg => { const { type, value, id } = msg; if (type === 'run') { @@ -59,6 +51,18 @@ export function getWorker() { if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value)); }); + // handle exit event, fired when we kill the worker or it just dies + worker.on('exit', () => { + // NOTE: No need to look for 'code' or 'signal', if it isn't running, it's an issue + + // clean up any listeners on the worker before replacing it + worker.removeAllListeners(); + + // Restart immediately on exit since node takes a couple seconds to spin up + worker = null; + worker = getWorker(); + }); + return worker; } @@ -66,10 +70,18 @@ export function getWorker() { export const thread = ({ onFunctionNotFound, serialize, deserialize }) => { const getWorkerFunctions = new Promise(resolve => { const worker = getWorker(); + + function functionListHandler(msg) { + // wait for the function list message + if (msg.type === 'functionList') { + // tear dowm the listener once the function list is provided + worker.removeListener('message', functionListHandler); + resolve(msg.value); + } + } + worker.on('message', functionListHandler); + worker.send({ type: 'getFunctions' }); - worker.on('message', msg => { - if (msg.type === 'functionList') resolve(msg.value); - }); }); return getWorkerFunctions.then(functions => { @@ -94,7 +106,7 @@ export const thread = ({ onFunctionNotFound, serialize, deserialize }) => { onFunctionNotFound(ast, deserialize(context)).then(serialize), }; - // + // kill the worker after the timeout is exceeded setTimeout(() => { if (!heap[id]) return; // Looks like this has already been cleared from the heap. if (worker) worker.kill(); diff --git a/src/legacy/core_plugins/interpreter/server/routes/socket.js b/src/legacy/core_plugins/interpreter/server/routes/socket.js index 80733690a7d1..8befd511937e 100644 --- a/src/legacy/core_plugins/interpreter/server/routes/socket.js +++ b/src/legacy/core_plugins/interpreter/server/routes/socket.js @@ -75,7 +75,7 @@ export function socketApi(server) { ); }); - const handler = async ({ ast, context, id }) => { + socket.on('run', async ({ ast, context, id }) => { try { const value = await routeExpression(ast, deserialize(context)); socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) }); @@ -83,11 +83,11 @@ export function socketApi(server) { // TODO: I don't think it is possible to hit this right now? Maybe ever? socket.emit(`resp:${id}`, { type: 'msgError', value: err }); } - }; + }); - socket.on('run', handler); socket.on('disconnect', () => { - socket.removeListener('run', handler); + // remove all listeners on disconnect + socket.removeAllListeners(); }); }); }