[PipelineViewer] [Monitoring] Flatten Pipeline Statement Trees (#19084)

* Add flatten function and list class.

* Rename functions to be more descriptive.

* WIP moving flatten logic to classes.

* Finish moving flatten logic into classes, add tests.

* Simplify flattening, remove non-native dependency. Add more tests.

* Add defaults to simplify function call.

* Refactor two blocks into a function.

* Fixed broken test.

* Update classes based on PR feedback.

* Update module exports, remove obsolete file.
This commit is contained in:
Justin Kambic 2018-05-22 15:48:39 -04:00 committed by GitHub
parent c6bf945f2f
commit b1527d85dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 548 additions and 1 deletions

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export class Element {
constructor(id, statement, depth, parentId) {
this.id = id;
this.statement = statement;
this.depth = depth;
this.parentId = parentId;
}
}

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Element } from './element';
export class ElseElement extends Element {
constructor(statement, depth, parentId) {
const { id } = statement;
super(`${id}_else`, statement, depth, parentId);
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ElseElement } from './else_element';
describe('ElseElement', () => {
let statement;
let depth;
let parentId;
beforeEach(() => {
statement = {
id: 'statement_id'
};
depth = 2;
parentId = 'parent_id';
});
it('has expected props', () => {
const element = new ElseElement(statement, depth, parentId);
expect(element.id).toBe('statement_id_else');
expect(element.statement).toBe(statement);
expect(element.depth).toBe(depth);
expect(element.parentId).toBe(parentId);
});
});

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export function flattenPipelineSection(pipelineSection, depth = 0, parentId = null) {
const list = [];
pipelineSection.forEach(statement => {
list.push(...statement.toList(depth, parentId));
});
return list;
}

View file

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flattenPipelineSection } from './flatten_pipeline_section';
import { PluginStatement } from '../pipeline/plugin_statement';
import { IfStatement } from '../pipeline/if_statement';
describe('flattenPipelineSection', () => {
let pipelineSection;
beforeEach(() => {
pipelineSection = [];
});
it('creates list for only-plugin pipeline section', () => {
pipelineSection = [
PluginStatement.fromPipelineGraphVertex({ id: 'first' }),
PluginStatement.fromPipelineGraphVertex({ id: 'second' })
];
const result = flattenPipelineSection(pipelineSection, 0, null);
expect(result).toHaveLength(2);
expect(result[0].parentId).toBe(null);
expect(result[0].depth).toBe(0);
expect(result[0].id).toBe('first');
expect(result[1].parentId).toBe(null);
expect(result[1].depth).toBe(0);
expect(result[1].id).toBe('second');
});
it('flattens pipeline with if statement', () => {
pipelineSection = [
PluginStatement.fromPipelineGraphVertex({ id: 'first' }),
new IfStatement(
{ id: 'if_parent' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'if_child1' })],
[]
)
];
const result = flattenPipelineSection(pipelineSection, 0, null);
expect(result).toHaveLength(3);
expect(result[0].id).toBe('first');
expect(result[1].id).toBe('if_parent');
expect(result[1].parentId).toBe(null);
expect(result[1].depth).toBe(0);
expect(result[2].id).toBe('if_child1');
expect(result[2].parentId).toBe('if_parent');
expect(result[2].depth).toBe(1);
});
it('flattens pipeline with else statement', () => {
pipelineSection = [
new IfStatement(
{ id: 'if_parent' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'if_child1' })],
[ PluginStatement.fromPipelineGraphVertex({ id: 'else_child1' })]
)
];
const result = flattenPipelineSection(pipelineSection, 0, null);
expect(result).toHaveLength(4);
expect(result[0].id).toBe('if_parent');
expect(result[1].id).toBe('if_child1');
expect(result[1].parentId).toBe('if_parent');
expect(result[1].depth).toBe(1);
expect(result[2].constructor.name).toBe('ElseElement');
expect(result[2].depth).toBe(0);
expect(result[2].parentId).toBe(null);
expect(result[3].id).toBe('else_child1');
expect(result[3].parentId).toBe('if_parent_else');
expect(result[3].depth).toBe(1);
});
it('flattens pipeline section with if/else and nested if', () => {
pipelineSection = [
new IfStatement(
{ id: 'if_parent' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'if_child1' })],
[
new IfStatement(
{ id: 'if_parent2' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'if_child2' })],
[]
),
PluginStatement.fromPipelineGraphVertex({ id: 'else_child1' })]
)
];
const result = flattenPipelineSection(pipelineSection, 0, null);
expect(result).toHaveLength(6);
expect(result[1].id).toBe('if_child1');
expect(result[1].depth).toBe(1);
expect(result[1].parentId).toBe('if_parent');
expect(result[2].id).toBe('if_parent_else');
expect(result[2].depth).toBe(0);
expect(result[2].parentId).toBe(null);
expect(result[3].id).toBe('if_parent2');
expect(result[3].depth).toBe(1);
expect(result[3].parentId).toBe('if_parent_else');
expect(result[4].id).toBe('if_child2');
expect(result[4].depth).toBe(2);
expect(result[4].parentId).toBe('if_parent2');
expect(result[5].id).toBe('else_child1');
expect(result[5].depth).toBe(1);
expect(result[5].parentId).toBe('if_parent_else');
});
});

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Element } from './element';
export class IfElement extends Element {
constructor(statement, depth, parentId) {
const { id } = statement;
super(id, statement, depth, parentId);
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { IfElement } from './if_element';
describe('IfElement', () => {
let statement;
let depth;
let parentId;
beforeEach(() => {
statement = {
id: 'statement_id'
};
depth = 1;
parentId = 'parent_id';
});
it('has expected props', () => {
const element = new IfElement(statement, depth, parentId);
expect(element.id).toBe('statement_id');
expect(element.statement).toBe(statement);
expect(element.depth).toBe(1);
expect(element.parentId).toBe('parent_id');
});
});

View file

@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { List } from './list';

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flattenPipelineSection } from './flatten_pipeline_section';
export class List {
constructor(inputs, filters, outputs, queue) {
this.inputs = inputs;
this.filters = filters;
this.outputs = outputs;
this.queue = queue;
}
static fromPipeline(pipeline) {
const {
inputStatements,
filterStatements,
outputStatements,
queue
} = pipeline;
return new List(
flattenPipelineSection(inputStatements),
flattenPipelineSection(filterStatements),
flattenPipelineSection(outputStatements),
queue,
);
}
}

View file

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { List } from './list';
import { PluginStatement } from '../pipeline/plugin_statement';
import { IfStatement } from '../pipeline/if_statement';
import { PluginElement } from './plugin_element';
import { IfElement } from './if_element';
import { ElseElement } from './else_element';
describe('pipelineToList', () => {
let pipeline;
beforeEach(() => {
pipeline = {
inputStatements: [
PluginStatement.fromPipelineGraphVertex({ id: 'first' }),
],
filterStatements: [
new IfStatement(
{ id: 'if' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'if_child' })],
[ PluginStatement.fromPipelineGraphVertex({ id: 'else_child' })]
)
],
outputStatements: [
PluginStatement.fromPipelineGraphVertex({ id: 'output' })
],
queue: { }
};
});
it('creates list with element for each statement', () => {
const result = List.fromPipeline(pipeline);
const {
inputs,
filters,
outputs
} = result;
expect(inputs).toHaveLength(1);
expect(filters).toHaveLength(4);
expect(outputs).toHaveLength(1);
});
it('creates list with if in outputs', () => {
pipeline.outputStatements.push(
new IfStatement(
{ id: 'output_if' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'output_true' })],
[ PluginStatement.fromPipelineGraphVertex({ id: 'output_else' })]
)
);
const result = List.fromPipeline(pipeline);
const {
inputs,
filters,
outputs
} = result;
expect(inputs).toHaveLength(1);
expect(filters).toHaveLength(4);
expect(outputs).toHaveLength(5);
expect(outputs[0]).toBeInstanceOf(PluginElement);
expect(outputs[1]).toBeInstanceOf(IfElement);
expect(outputs[2]).toBeInstanceOf(PluginElement);
expect(outputs[3]).toBeInstanceOf(ElseElement);
expect(outputs[4]).toBeInstanceOf(PluginElement);
});
it('creates list for multi-nested if/else statements in filter section', () => {
pipeline.filterStatements = [
new IfStatement(
{ id: 'filter_if_1' },
[
new IfStatement(
{ id: 'filter_if_2' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'plugin_1' })],
[ ]
)
],
[
new IfStatement(
{ id: 'filter_if_3' },
[ PluginStatement.fromPipelineGraphVertex({ id: 'plugin_2' })],
[ PluginStatement.fromPipelineGraphVertex({ id: 'plugin_3' })]
)
]
)
];
const result = List.fromPipeline(pipeline);
const {
inputs,
filters,
outputs
} = result;
expect(inputs).toHaveLength(1);
expect(outputs).toHaveLength(1);
expect(filters).toHaveLength(8);
expect(filters[0].id).toBe('filter_if_1');
expect(filters[0].depth).toBe(0);
expect(filters[0].parentId).toBe(null);
expect(filters[0]).toBeInstanceOf(IfElement);
expect(filters[1].id).toBe('filter_if_2');
expect(filters[1].depth).toBe(1);
expect(filters[1].parentId).toBe('filter_if_1');
expect(filters[1]).toBeInstanceOf(IfElement);
expect(filters[2].id).toBe('plugin_1');
expect(filters[2].depth).toBe(2);
expect(filters[2].parentId).toBe('filter_if_2');
expect(filters[2]).toBeInstanceOf(PluginElement);
expect(filters[3].id).toBe('filter_if_1_else');
expect(filters[3].depth).toBe(0);
expect(filters[3].parentId).toBe(null);
expect(filters[3]).toBeInstanceOf(ElseElement);
expect(filters[4].id).toBe('filter_if_3');
expect(filters[4].depth).toBe(1);
expect(filters[4].parentId).toBe('filter_if_1_else');
expect(filters[4]).toBeInstanceOf(IfElement);
expect(filters[5].id).toBe('plugin_2');
expect(filters[5].depth).toBe(2);
expect(filters[5].parentId).toBe('filter_if_3');
expect(filters[5]).toBeInstanceOf(PluginElement);
expect(filters[6].id).toBe('filter_if_3_else');
expect(filters[6].depth).toBe(1);
expect(filters[6].parentId).toBe('filter_if_1_else');
expect(filters[6]).toBeInstanceOf(ElseElement);
expect(filters[7].id).toBe('plugin_3');
expect(filters[7].depth).toBe(2);
expect(filters[7].parentId).toBe('filter_if_3_else');
expect(filters[7]).toBeInstanceOf(PluginElement);
});
});

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Element } from './element';
export class PluginElement extends Element {
constructor(statement, depth, parentId) {
const { id } = statement;
super(id, statement, depth, parentId);
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PluginElement } from './plugin_element';
describe('PluginElement', () => {
let statement;
let depth;
let parentId;
beforeEach(() => {
statement = {
id: 'statement_id'
};
depth = 1;
parentId = 'parent_id';
});
it('has expected props', () => {
const element = new PluginElement(statement, depth, parentId);
expect(element.id).toBe('statement_id');
expect(element.statement).toBe(statement);
expect(element.depth).toBe(1);
expect(element.parentId).toBe('parent_id');
});
});

View file

@ -7,6 +7,8 @@
import expect from 'expect.js';
import { IfStatement } from '../if_statement';
import { PluginVertex } from '../../graph/plugin_vertex';
import { IfElement } from '../../list/if_element';
import { PluginElement } from '../../list/plugin_element';
describe('IfStatement class', () => {
let ifVertex;
@ -65,6 +67,7 @@ describe('IfStatement class', () => {
expect(ifStatement.trueStatements.length).to.be(1);
expect(ifStatement.elseStatements).to.be.an(Array);
expect(ifStatement.elseStatements.length).to.be(0);
expect(ifStatement.vertex).to.eql(ifVertex);
});
});
@ -107,6 +110,7 @@ describe('IfStatement class', () => {
expect(ifStatement.trueStatements.length).to.be(1);
expect(ifStatement.elseStatements).to.be.an(Array);
expect(ifStatement.elseStatements.length).to.be(1);
expect(ifStatement.vertex).to.eql(ifVertex);
});
});
@ -147,6 +151,7 @@ describe('IfStatement class', () => {
expect(ifStatement.trueStatements.length).to.be(2);
expect(ifStatement.elseStatements).to.be.an(Array);
expect(ifStatement.elseStatements.length).to.be(0);
expect(ifStatement.vertex).to.eql(ifVertex);
});
});
@ -195,6 +200,32 @@ describe('IfStatement class', () => {
expect(ifStatement.trueStatements.length).to.be(1);
expect(ifStatement.elseStatements).to.be.an(Array);
expect(ifStatement.elseStatements.length).to.be(2);
expect(ifStatement.vertex).to.eql(ifVertex);
});
});
describe('toList', () => {
beforeEach(() => {
const esVertex = new PluginVertex({ edgesByFrom: {} }, { id: 'es_output' });
esVertex.pipelineStage = 'output';
ifVertex.trueOutgoingVertices = [esVertex];
ifVertex.falseOutgoingVertices = [];
});
it('creates list and sub-list for nested statements', () => {
const ifStatement = IfStatement.fromPipelineGraphVertex(ifVertex, pipelineStage);
const result = ifStatement.toList(0, 'output');
expect(result).to.be.an(Array);
expect(result.length).to.be(2);
expect(result[0]).to.be.an(IfElement);
expect(result[0].id).to.be('0aef421');
expect(result[1]).to.be.an(PluginElement);
const plugin = result[1];
expect(plugin).to.be.an(PluginElement);
expect(plugin.id).to.be('es_output');
});
});
});

View file

@ -39,6 +39,17 @@ describe('PluginStatement class', () => {
expect(pluginStatement.meta).to.be(meta);
expect(pluginStatement.pluginType).to.be('output');
expect(pluginStatement.name).to.be('elasticsearch');
expect(pluginStatement.vertex).to.eql(pluginVertex);
});
});
describe('toList', () => {
it('creates a list with plugin statement in it', () => {
const pluginStatement = PluginStatement.fromPipelineGraphVertex(pluginVertex);
const result = pluginStatement.toList();
expect(result.length).to.be(1);
expect(result[0].id).to.be('es_output');
});
});
});

View file

@ -37,6 +37,7 @@ describe('Queue class', () => {
expect(queue.stats).to.eql({});
expect(queue.meta).to.be(meta);
expect(queue).to.be.a(Queue);
expect(queue.vertex).to.eql(queueVertex);
});
});
});
});

View file

@ -7,6 +7,8 @@
import { Statement } from './statement';
import { makeStatement } from './make_statement';
import { isVertexPipelineStage } from './utils';
import { IfElement } from '../list/if_element';
import { ElseElement } from '../list/else_element';
function makeStatementsForOutgoingVertices(outgoingVertices, statements, next, pipelineStage) {
outgoingVertices.forEach(vertex => {
@ -18,6 +20,12 @@ function makeStatementsForOutgoingVertices(outgoingVertices, statements, next, p
});
}
function addStatementsToList(list, statements, depth, id) {
statements.forEach(statement => {
list.push(...statement.toList(depth, id));
});
}
export class IfStatement extends Statement {
constructor(vertex, trueStatements, elseStatements) {
super(vertex);
@ -29,6 +37,25 @@ export class IfStatement extends Statement {
this.elseStatements = elseStatements;
}
toList(depth, parentId) {
const list = [];
const ifElement = new IfElement(this, depth, parentId);
list.push(ifElement);
const nestedElementDepth = depth + 1;
addStatementsToList(list, this.trueStatements, nestedElementDepth, ifElement.id);
if (this.elseStatements.length) {
const elseElement = new ElseElement(this, depth, parentId);
list.push(elseElement);
addStatementsToList(list, this.elseStatements, nestedElementDepth, elseElement.id);
}
return list;
}
static fromPipelineGraphVertex(ifVertex, pipelineStage) {
const trueStatements = [];
const elseStatements = [];

View file

@ -5,6 +5,7 @@
*/
import { Statement } from './statement';
import { PluginElement } from '../list/plugin_element';
export class PluginStatement extends Statement {
constructor(vertex) {
@ -19,6 +20,10 @@ export class PluginStatement extends Statement {
this.name = name; // twitter, grok, elasticsearch, etc.
}
toList(depth, parentId) {
return [ new PluginElement(this, depth, parentId) ];
}
static fromPipelineGraphVertex(pluginVertex) {
return new PluginStatement(
pluginVertex