Fix: socket and thread memory leaks (#26984) (#27338)

## Summary

Closes https://github.com/elastic/kibana/issues/25656

Fixes one memory leak, and another potential memory leak.

### Worker thread leak

Occasionally, users would see warnings on the server about a listener having more than 11 detached functions. I tracked it down to the server-side worker thread, specifically the handling of the `getWorkerFunctions` event. Every user or refresh would create another listener, which was never cleaned up. This PR pulls the handler function out in `functionListHandler` and removes the listener once the matching message is received.

We also weren't cleaning up event listeners on the worker when it exited, and while it's not clear if that's actually required, it seemed safer to just remove all of the listeners before new worker instance is created.

### Websocket server leak

This one I don't think is actually a leak. It doesn't look like the teardown on the client instance is required on disconnect, but I couldn't verify that in the code, so better safe than sorry. When a client disconnects, we were removing the `run` listener, but not any of the others. In this pr, `removeAllListeners` is used so that everything is cleaned up on disconnect.
This commit is contained in:
Joe Fleming 2018-12-17 13:48:28 -07:00 committed by GitHub
parent 8645ed03e5
commit 08c73e6a49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 17 deletions

View file

@ -31,15 +31,7 @@ export function getWorker() {
if (worker) return worker;
worker = fork(workerPath, {});
// 'exit' happens whether we kill the worker or it just dies.
// No need to look for 'error', our worker is intended to be long lived so it isn't running, it's an issue
worker.on('exit', () => {
// Heads up: there is no worker.off
worker = null;
// Restart immediately on exit since node takes a couple seconds to spin up
worker = getWorker();
});
// handle run requests
worker.on('message', msg => {
const { type, value, id } = msg;
if (type === 'run') {
@ -59,6 +51,18 @@ export function getWorker() {
if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value));
});
// handle exit event, fired when we kill the worker or it just dies
worker.on('exit', () => {
// NOTE: No need to look for 'code' or 'signal', if it isn't running, it's an issue
// clean up any listeners on the worker before replacing it
worker.removeAllListeners();
// Restart immediately on exit since node takes a couple seconds to spin up
worker = null;
worker = getWorker();
});
return worker;
}
@ -66,10 +70,18 @@ export function getWorker() {
export const thread = ({ onFunctionNotFound, serialize, deserialize }) => {
const getWorkerFunctions = new Promise(resolve => {
const worker = getWorker();
function functionListHandler(msg) {
// wait for the function list message
if (msg.type === 'functionList') {
// tear dowm the listener once the function list is provided
worker.removeListener('message', functionListHandler);
resolve(msg.value);
}
}
worker.on('message', functionListHandler);
worker.send({ type: 'getFunctions' });
worker.on('message', msg => {
if (msg.type === 'functionList') resolve(msg.value);
});
});
return getWorkerFunctions.then(functions => {
@ -94,7 +106,7 @@ export const thread = ({ onFunctionNotFound, serialize, deserialize }) => {
onFunctionNotFound(ast, deserialize(context)).then(serialize),
};
//
// kill the worker after the timeout is exceeded
setTimeout(() => {
if (!heap[id]) return; // Looks like this has already been cleared from the heap.
if (worker) worker.kill();

View file

@ -75,7 +75,7 @@ export function socketApi(server) {
);
});
const handler = async ({ ast, context, id }) => {
socket.on('run', async ({ ast, context, id }) => {
try {
const value = await routeExpression(ast, deserialize(context));
socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) });
@ -83,11 +83,11 @@ export function socketApi(server) {
// TODO: I don't think it is possible to hit this right now? Maybe ever?
socket.emit(`resp:${id}`, { type: 'msgError', value: err });
}
};
});
socket.on('run', handler);
socket.on('disconnect', () => {
socket.removeListener('run', handler);
// remove all listeners on disconnect
socket.removeAllListeners();
});
});
}