Remove WebSockets from Canvas expressions interpreter (#29792)

This modifies the interpreter to use REST instead of WebSockets.
This commit is contained in:
Chris Davies 2019-02-05 11:51:45 -05:00 committed by GitHub
parent 52e12c7cda
commit ebd3a82643
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 179 additions and 730 deletions

View file

@ -215,7 +215,6 @@
"rxjs": "^6.2.1",
"script-loader": "0.7.2",
"semver": "^5.5.0",
"socket.io": "^2.1.1",
"stream-stream": "^1.2.6",
"style-loader": "0.23.1",
"tar": "2.2.0",

View file

@ -12,7 +12,6 @@
"@kbn/i18n": "1.0.0",
"lodash": "npm:@elastic/lodash@3.10.1-kibana1",
"lodash.clone": "^4.5.0",
"socket.io-client": "^2.1.1",
"uuid": "3.0.1"
},
"devDependencies": {

View file

@ -20,7 +20,7 @@
export { FunctionsRegistry } from './lib/functions_registry';
export { TypesRegistry } from './lib/types_registry';
export { createError } from './interpreter/create_error';
export { interpretProvider } from './interpreter/interpret';
export { interpreterProvider } from './interpreter/interpret';
export { serializeProvider } from './lib/serialize';
export { fromExpression, toExpression, safeElementFromExpression } from './lib/ast';
export { Fn } from './lib/fn';

View file

@ -25,8 +25,8 @@ import { getByAlias } from '../lib/get_by_alias';
import { castProvider } from './cast';
import { createError } from './create_error';
export function interpretProvider(config) {
const { functions, onFunctionNotFound, types } = config;
export function interpreterProvider(config) {
const { functions, types } = config;
const handlers = { ...config.handlers, types };
const cast = castProvider(types);
@ -54,15 +54,8 @@ export function interpretProvider(config) {
const { function: fnName, arguments: fnArgs } = link;
const fnDef = getByAlias(functions, fnName);
// if the function is not found, pass the expression chain to the not found handler
// in this case, it will try to execute the function in another context
if (!fnDef) {
chain.unshift(link);
try {
return await onFunctionNotFound({ type: 'expression', chain: chain }, context);
} catch (e) {
return createError(e);
}
return createError({ message: `Function ${fnName} could not be found.` });
}
try {

View file

@ -1,81 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import uuid from 'uuid/v4';
import { getByAlias } from '../lib/get_by_alias';
import { serializeProvider } from '../lib/serialize';
import { interpretProvider } from './interpret';
/*
Returns an interpet function that can shuttle partial ASTs and context between instances of itself over a socket
This is the interpreter that gets called during interactive sessions in the browser and communicates with the
same instance on the backend
types: a registry of types
functions: registry of known functions
referableFunctions: An array, or a promise for an array, with a list of functions that are available to be defered to
socket: the socket to communicate over
*/
export function socketInterpreterProvider({
types,
functions,
handlers,
referableFunctions,
socket,
}) {
// Return the interpet() function
return interpretProvider({
types,
functions,
handlers,
onFunctionNotFound: (ast, context) => {
// Get the name of the function that wasn't found
const functionName = ast.chain[0].function;
// Get the list of functions that are known elsewhere
return Promise.resolve(referableFunctions).then(referableFunctionMap => {
// Check if the not-found function is in the list of alternatives, if not, throw
if (!getByAlias(referableFunctionMap, functionName)) {
throw new Error(`Function not found: ${functionName}`);
}
// set a unique message ID so the code knows what response to process
const id = uuid();
return new Promise(resolve => {
const { serialize, deserialize } = serializeProvider(types);
// This will receive {type: [msgSuccess || msgError] value: foo}
// However it doesn't currently do anything with it. Which means `value`, regardless
// of failure or success, needs to be something the interpreters would logically return
// er, a primative or a {type: foo} object
const listener = resp => resolve(deserialize(resp.value));
socket.once(`resp:${id}`, listener);
// Go run the remaining AST and context somewhere else, meaning either the browser or the server, depending on
// where this file was loaded
socket.emit('run', { ast, context: serialize(context), id });
});
});
},
});
}

View file

@ -17,7 +17,7 @@
* under the License.
*/
export function createHandlers(/*socket*/) {
export function createHandlers() {
return {
environment: 'client',
};

View file

@ -18,6 +18,5 @@
*/
export { loadBrowserRegistries } from './browser_registries';
export { createSocket } from './socket';
export { initializeInterpreter } from './interpreter';
export { RenderFunctionsRegistry } from './render_functions_registry';

View file

@ -17,55 +17,47 @@
* under the License.
*/
import { socketInterpreterProvider } from '../common/interpreter/socket_interpret';
import { interpreterProvider } from '../common/interpreter/interpret';
import { serializeProvider } from '../common/lib/serialize';
import { createHandlers } from './create_handlers';
export async function initializeInterpreter(socket, typesRegistry, functionsRegistry) {
let resolve;
const functionList = new Promise(_resolve => (resolve = _resolve));
export const FUNCTIONS_URL = '/api/canvas/fns';
const getInitializedFunctions = async () => {
return functionList;
};
export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) {
const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL });
// For every sever-side function, register a client-side
// function that matches its definition, but which simply
// calls the server-side function endpoint.
Object.keys(serverFunctionList).forEach(functionName => {
functionsRegistry.register(() => ({
...serverFunctionList[functionName],
async fn(context, args) {
const types = typesRegistry.toJS();
const { serialize } = serializeProvider(types);
const result = await kfetch({
pathname: `${FUNCTIONS_URL}/${functionName}`,
method: 'POST',
body: JSON.stringify({
args,
context: serialize(context),
}),
});
return result;
},
}));
});
const interpretAst = async (ast, context, handlers) => {
// Load plugins before attempting to get functions, otherwise this gets racey
const serverFunctionList = await functionList;
const interpretFn = await socketInterpreterProvider({
const interpretFn = await interpreterProvider({
types: typesRegistry.toJS(),
handlers: { ...handlers, ...createHandlers(socket) },
handlers: { ...handlers, ...createHandlers() },
functions: functionsRegistry.toJS(),
referableFunctions: serverFunctionList,
socket: socket,
});
return interpretFn(ast, context);
};
// Listen for interpreter runs
socket.on('run', ({ ast, context, id }) => {
const types = typesRegistry.toJS();
const { serialize, deserialize } = serializeProvider(types);
interpretAst(ast, deserialize(context)).then(value => {
socket.emit(`resp:${id}`, { value: serialize(value) });
});
});
// Create the function list
let gotFunctionList = false;
socket.once('functionList', (fl) => {
gotFunctionList = true;
resolve(fl);
});
const interval = setInterval(() => {
if (gotFunctionList) {
clearInterval(interval);
return;
}
socket.emit('getFunctionList');
}, 1000);
return { getInitializedFunctions, interpretAst };
return { interpretAst };
}

View file

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { initializeInterpreter, FUNCTIONS_URL } from './interpreter';
jest.mock('../common/interpreter/interpret', () => ({
interpreterProvider: () => () => ({}),
}));
jest.mock('../common/lib/serialize', () => ({
serializeProvider: () => ({ serialize: () => ({}) }),
}));
jest.mock('./create_handlers', () => ({
createHandlers: () => ({}),
}));
describe('kbn-interpreter/interpreter', () => {
it('loads server-side functions', async () => {
const kfetch = jest.fn(async () => ({}));
await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register: () => {} }));
expect(kfetch).toHaveBeenCalledTimes(1);
expect(kfetch).toHaveBeenCalledWith({ pathname: FUNCTIONS_URL });
});
it('registers client-side functions that pass through to the server', async () => {
const kfetch = jest.fn(async () => ({
hello: { name: 'hello' },
world: { name: 'world' },
}));
const register = jest.fn();
await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register }));
expect(register).toHaveBeenCalledTimes(2);
const [ hello, world ] = register.mock.calls.map(([fn]) => fn());
expect(hello.name).toEqual('hello');
expect(typeof hello.fn).toEqual('function');
expect(world.name).toEqual('world');
expect(typeof world.fn).toEqual('function');
const context = {};
const args = { quote: 'All we have to decide is what to do with the time that is given us.' };
await hello.fn(context, args);
expect(kfetch).toHaveBeenCalledWith({
pathname: `${FUNCTIONS_URL}/hello`,
method: 'POST',
body: JSON.stringify({ args, context }),
});
});
});

View file

@ -1,63 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io from 'socket.io-client';
const SOCKET_CONNECTION_TIMEOUT = 5000; // timeout in ms
export async function createSocket(basePath, functionsRegistry) {
return new Promise((resolve, reject) => {
const socket = io({
path: `${basePath}/socket.io`,
transports: ['polling'],
transportOptions: {
polling: {
extraHeaders: {
'kbn-xsrf': 'professionally-crafted-string-of-text',
},
},
},
timeout: SOCKET_CONNECTION_TIMEOUT,
// ensure socket.io always tries polling first, otherwise auth will fail
rememberUpgrade: false,
});
socket.on('getFunctionList', () => {
socket.emit('functionList', functionsRegistry.toJS());
});
socket.on('connect', () => {
resolve(socket);
socket.off('connectionFailed', errorHandler);
socket.off('connect_error', errorHandler);
socket.off('connect_timeout', errorHandler);
});
function errorHandler(err) {
// 'connectionFailed' returns an object with a reason prop
// other error cases provide their own error
reject(err.reason ? new Error(err.reason) : err);
}
socket.on('connectionFailed', errorHandler);
socket.on('connect_error', errorHandler);
socket.on('connect_timeout', errorHandler);
});
}

View file

@ -18,7 +18,8 @@
*/
import { initializeInterpreter, loadBrowserRegistries, createSocket } from '@kbn/interpreter/public';
import { initializeInterpreter, loadBrowserRegistries } from '@kbn/interpreter/public';
import { kfetch } from 'ui/kfetch';
import chrome from 'ui/chrome';
import { functions } from './functions';
import { functionsRegistry } from './functions_registry';
@ -46,9 +47,8 @@ let _interpreterPromise;
const initialize = async () => {
await loadBrowserRegistries(types, basePath);
const socket = await createSocket(basePath, functionsRegistry);
initializeInterpreter(socket, typesRegistry, functionsRegistry).then(interpreter => {
_resolve({ interpreter, socket });
initializeInterpreter(kfetch, typesRegistry, functionsRegistry).then(interpreter => {
_resolve({ interpreter });
});
};
@ -64,8 +64,3 @@ export const interpretAst = async (...params) => {
const { interpreter } = await getInterpreter();
return await interpreter.interpretAst(...params);
};
export const updateInterpreterFunctions = async () => {
const { socket } = await getInterpreter();
socket.emit('updateFunctionList');
};

View file

@ -1,78 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { getInterpreter } from './interpreter';
import { createSocket } from '@kbn/interpreter/public';
import { functionsRegistry } from './functions_registry';
jest.mock('@kbn/interpreter/public', () => ({
createSocket: jest.fn(),
initializeInterpreter: jest.fn(() => Promise.resolve()),
loadBrowserRegistries: jest.fn(() => Promise.resolve())
}));
jest.mock('ui/chrome', () => ({
getBasePath: jest.fn(() => '/abc/s/123'),
getInjected: jest.fn(config => { // eslint-disable-line no-unused-vars
return config === 'serverBasePath' ? '/abc' : '/123';
}),
}));
jest.mock('./functions', () => ({
functions: [jest.fn()]
}));
jest.mock('./render_functions_registry', () => ({
renderFunctionsRegistry: {
register: jest.fn()
}
}));
jest.mock('./renderers/visualization', () => ({
visualization: jest.fn()
}));
jest.mock('./functions_registry', () => ({
functionsRegistry: {
register: jest.fn()
}
}));
jest.mock('./types_registry', () => ({
typesRegistry: jest.fn()
}));
describe('Core Interpreter', () => {
beforeEach(() => {
jest.resetModules();
});
describe('getInterpreter', () => {
it('calls createSocket with the correct arguments', async () => {
await getInterpreter();
expect(createSocket).toHaveBeenCalledTimes(1);
expect(createSocket).toHaveBeenCalledWith('/abc', functionsRegistry);
});
});
});

View file

@ -1,44 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import boom from 'boom';
import { API_ROUTE } from '../../common/constants';
export function getRequest(server, { headers }) {
const url = `${API_ROUTE}/ping`;
return server
.inject({
method: 'POST',
url,
headers,
})
.then(res => {
if (res.statusCode !== 200) {
if (process.env.NODE_ENV !== 'production') {
console.error(
new Error(`Auth request failed: [${res.statusCode}] ${res.result.message}`)
);
}
throw boom.unauthorized('Failed to authenticate socket connection');
}
return res.request;
});
}

View file

@ -1,67 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import uuid from 'uuid/v4';
export const browser = ({ socket, serialize, deserialize }) => {
// Note that we need to be careful about how many times routeExpressionProvider is called, because of the socket.once below.
// It's too bad we can't get a list of browser plugins on the server
let getFunctionsPromise;
socket.on('updateFunctionList', () => {
getFunctionsPromise = undefined;
});
const getFunctions = async () => {
if (!getFunctionsPromise) {
getFunctionsPromise = new Promise(resolve => {
socket.once('functionList', resolve);
socket.emit('getFunctionList');
});
}
return Object.keys(await getFunctionsPromise);
};
return {
interpret: (ast, context) => {
return new Promise(async (resolve, reject) => {
await getFunctions();
const id = uuid();
const listener = resp => {
if (resp.type === 'msgError') {
const { value } = resp;
// cast error strings back into error instances
const err = value instanceof Error ? value : new Error(value);
if (value.stack) err.stack = value.stack;
// Reject's with a legit error. Check! Environments should always reject with an error when something bad happens
reject(err);
} else {
resolve(deserialize(resp.value));
}
};
// {type: msgSuccess or msgError, value: foo}. Doesn't matter if it's success or error, we do the same thing for now
socket.once(`resp:${id}`, listener);
socket.emit('run', { ast, context: serialize(context), id });
});
}, getFunctions
};
};

View file

@ -17,19 +17,18 @@
* under the License.
*/
import { interpretProvider } from '@kbn/interpreter/common';
import { interpreterProvider } from '@kbn/interpreter/common';
import { createHandlers } from '../create_handlers';
export const server = async ({ onFunctionNotFound, server, request }) => {
export const server = async ({ server, request }) => {
const { serverFunctions, types } = server.plugins.interpreter;
return {
interpret: (ast, context) => {
const interpret = interpretProvider({
const interpret = interpreterProvider({
types: types.toJS(),
functions: serverFunctions.toJS(),
handlers: createHandlers(request, server),
onFunctionNotFound,
});
return interpret(ast, context);

View file

@ -1,21 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
require('../../../../../../../setup_node_env');
require('./worker');

View file

@ -1,125 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { fork } from 'child_process';
import { resolve } from 'path';
import uuid from 'uuid/v4';
// If the worker doesn't response in 10s, kill it.
const WORKER_TIMEOUT = 20000;
const workerPath = resolve(__dirname, 'babeled.js');
const heap = {};
let worker = null;
export function getWorker() {
if (worker) return worker;
worker = fork(workerPath, {});
// handle run requests
worker.on('message', msg => {
const { type, value, id } = msg;
if (type === 'run') {
const { threadId } = msg;
const { ast, context } = value;
if (heap[threadId]) {
heap[threadId]
.onFunctionNotFound(ast, context)
.then(value => {
worker.send({ type: 'msgSuccess', id, value: value });
})
.catch(e => heap[threadId].reject(e));
}
}
if (type === 'msgSuccess' && heap[id]) heap[id].resolve(value);
// TODO: I don't think it is even possible to hit this
if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value));
});
// handle exit event, fired when we kill the worker or it just dies
worker.on('exit', () => {
// NOTE: No need to look for 'code' or 'signal', if it isn't running, it's an issue
// clean up any listeners on the worker before replacing it
worker.removeAllListeners();
// Restart immediately on exit since node takes a couple seconds to spin up
worker = null;
worker = getWorker();
});
return worker;
}
// All serialize/deserialize must occur in here. We should not return serialized stuff to the expressionRouter
export const thread = ({ onFunctionNotFound, serialize, deserialize }) => {
const getWorkerFunctions = new Promise(resolve => {
const worker = getWorker();
function functionListHandler(msg) {
// wait for the function list message
if (msg.type === 'functionList') {
// tear dowm the listener once the function list is provided
worker.removeListener('message', functionListHandler);
resolve(msg.value);
}
}
worker.on('message', functionListHandler);
worker.send({ type: 'getFunctions' });
});
return getWorkerFunctions.then(functions => {
return {
interpret: (ast, context) => {
const worker = getWorker();
const id = uuid();
worker.send({ type: 'run', id, value: { ast, context: serialize(context) } });
return new Promise((resolve, reject) => {
heap[id] = {
time: new Date().getTime(),
resolve: value => {
delete heap[id];
resolve(deserialize(value));
},
reject: e => {
delete heap[id];
reject(e);
},
onFunctionNotFound: (ast, context) =>
onFunctionNotFound(ast, deserialize(context)).then(serialize),
};
// kill the worker after the timeout is exceeded
setTimeout(() => {
if (!heap[id]) return; // Looks like this has already been cleared from the heap.
if (worker) worker.kill();
// The heap will be cleared because the reject on heap will delete its own id
heap[id].reject(new Error('Request timed out'));
}, WORKER_TIMEOUT);
});
},
getFunctions: () => functions,
};
});
};

View file

@ -1,86 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import uuid from 'uuid/v4';
import { populateServerRegistries } from '@kbn/interpreter/server';
import { interpretProvider, serializeProvider, FunctionsRegistry, TypesRegistry } from '@kbn/interpreter/common';
// We actually DO need populateServerRegistries here since this is a different node process
const registries = {
commonFunctions: new FunctionsRegistry(),
types: new TypesRegistry(),
};
const pluginsReady = populateServerRegistries(registries);
const heap = {};
process.on('message', msg => {
const { type, id, value } = msg;
const threadId = id;
pluginsReady.then(({ commonFunctions, types }) => {
types = types.toJS();
const { serialize, deserialize } = serializeProvider(types);
const interpret = interpretProvider({
types,
functions: commonFunctions.toJS(),
handlers: { environment: 'serverThreaded' },
onFunctionNotFound: (ast, context) => {
const id = uuid();
// This needs to send a message to the main thread, and receive a response. Uhg.
process.send({
type: 'run',
threadId,
id,
value: {
ast,
context: serialize(context),
},
});
// Note that there is no facility to reject here. That's because this would only occur as the result of something that happens in the main thread, and we reject there
return new Promise(resolve => {
heap[id] = { resolve };
});
},
});
if (type === 'getFunctions') {
process.send({ type: 'functionList', value: Object.keys(commonFunctions.toJS()) });
}
if (type === 'msgSuccess') {
heap[id].resolve(deserialize(value));
delete heap[id];
}
if (type === 'run') {
const { ast, context } = msg.value;
interpret(ast, deserialize(context))
.then(value => {
process.send({ type: 'msgSuccess', value: serialize(value), id });
})
// TODO: I don't think it is even possible to hit this
.catch(value => {
process.send({ type: 'msgError', value, id });
});
}
});
});

View file

@ -17,12 +17,12 @@
* under the License.
*/
import { socketApi } from './socket';
import { translate } from './translate';
import { plugins } from './plugins';
import { registerServerFunctions } from './server_functions';
export function routes(server) {
plugins(server);
socketApi(server);
translate(server);
registerServerFunctions(server);
}

View file

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import Boom from 'boom';
import { serializeProvider } from '@kbn/interpreter/common';
import { API_ROUTE } from '../../common/constants';
import { createHandlers } from '../lib/create_handlers';
export function registerServerFunctions(server) {
// Execute functions, kind of RPC like.
server.route({
method: 'POST',
path: `${API_ROUTE}/fns/{functionName}`,
async handler(req) {
const types = server.plugins.interpreter.types.toJS();
const { deserialize } = serializeProvider(types);
const { functionName } = req.params;
const { args, context } = req.payload;
const fnDef = server.plugins.interpreter.serverFunctions.toJS()[functionName];
if (!fnDef) {
throw Boom.notFound(`Function "${functionName}" could not be found.`);
}
const handlers = await createHandlers(req, server);
const result = await fnDef.fn(deserialize(context), args, handlers);
return result;
},
});
// Give the client the list of server-functions.
server.route({
method: 'GET',
path: `${API_ROUTE}/fns`,
handler() {
return server.plugins.interpreter.serverFunctions.toJS();
},
});
}

View file

@ -1,93 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import socket from 'socket.io';
import { serializeProvider } from '@kbn/interpreter/common';
import { routeExpressionProvider } from '../lib/route_expression/index';
import { browser } from '../lib/route_expression/browser';
import { thread } from '../lib/route_expression/thread/index';
import { server as serverEnv } from '../lib/route_expression/server';
import { getRequest } from '../lib/get_request';
import { API_ROUTE } from '../../common/constants';
async function getModifiedRequest(server, socket) {
try {
return await getRequest(server, socket.handshake);
} catch (err) {
// on errors, notify the client and close the connection
socket.emit('connectionFailed', { reason: err.message || 'Socket connection failed' });
socket.disconnect(true);
return false;
}
}
export function socketApi(server) {
// add a POST ping route for `getRequest` to use
// TODO: remove this once we have upstream socket support
server.route({
method: 'POST',
path: `${API_ROUTE}/ping`,
handler: () => 'pong',
});
const io = socket(server.listener, {
path: '/socket.io',
transports: ['polling'],
});
io.on('connection', async socket => {
// 'request' is the modified hapi request object
const request = await getModifiedRequest(server, socket);
if (!request) return; // do nothing without the request object
const types = server.plugins.interpreter.types.toJS();
const { serialize, deserialize } = serializeProvider(types);
// I'd love to find a way to generalize all of these, but they each need a different set of things
// Note that ORDER MATTERS here. The environments will be tried in this order. Do not reorder this array.
const routeExpression = routeExpressionProvider([
thread({ onFunctionNotFound, serialize, deserialize }),
serverEnv({ onFunctionNotFound, request, server }),
browser({ onFunctionNotFound, socket, serialize, deserialize }),
]);
function onFunctionNotFound(ast, context) {
return routeExpression(ast, context);
}
socket.on('getFunctionList', () => {
socket.emit('functionList', server.plugins.interpreter.serverFunctions.toJS());
});
socket.on('run', async ({ ast, context, id }) => {
try {
const value = await routeExpression(ast, deserialize(context));
socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) });
} catch (err) {
// TODO: I don't think it is possible to hit this right now? Maybe ever?
socket.emit(`resp:${id}`, { type: 'msgError', value: err });
}
});
socket.on('disconnect', () => {
// remove all listeners on disconnect
socket.removeAllListeners();
});
});
}

View file

@ -5,7 +5,7 @@
*/
import { functionsRegistry } from 'plugins/interpreter/functions_registry';
import { getInterpreter, updateInterpreterFunctions } from 'plugins/interpreter/interpreter';
import { getInterpreter } from 'plugins/interpreter/interpreter';
import { loadBrowserRegistries } from '@kbn/interpreter/public';
import { connect } from 'react-redux';
import { compose, withProps } from 'recompose';
@ -55,7 +55,7 @@ const mapDispatchToProps = dispatch => ({
await getInterpreter();
// initialize the socket and interpreter
loadPrivateBrowserFunctions(functionsRegistry);
await updateInterpreterFunctions();
await loadBrowserRegistries(types, basePath);
// set app state to ready

View file

@ -19702,7 +19702,7 @@ socket.io-adapter@~1.1.0:
resolved "https://registry.yarnpkg.com/socket.io-adapter/-/socket.io-adapter-1.1.1.tgz#2a805e8a14d6372124dd9159ad4502f8cb07f06b"
integrity sha1-KoBeihTWNyEk3ZFZrUUC+MsH8Gs=
socket.io-client@2.1.1, socket.io-client@^2.1.1:
socket.io-client@2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/socket.io-client/-/socket.io-client-2.1.1.tgz#dcb38103436ab4578ddb026638ae2f21b623671f"
integrity sha512-jxnFyhAuFxYfjqIgduQlhzqTcOEQSn+OHKVfAxWaNWa7ecP7xSNk2Dx/3UEsDcY7NcFafxvNvKPmmO7HTwTxGQ==
@ -19731,7 +19731,7 @@ socket.io-parser@~3.2.0:
debug "~3.1.0"
isarray "2.0.1"
socket.io@2.1.1, socket.io@^2.1.1:
socket.io@2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/socket.io/-/socket.io-2.1.1.tgz#a069c5feabee3e6b214a75b40ce0652e1cfb9980"
integrity sha512-rORqq9c+7W0DAK3cleWNSyfv/qKXV99hV4tZe+gGLfBECw3XEhBy7x85F3wypA9688LKjtwO9pX9L33/xQI8yA==