Provide events in & out per millisecond for Logstash pipelines (#19446)

* Port changes to dedicated branch to isolate PR.

* Rename variable in test to match function name.
This commit is contained in:
Justin Kambic 2018-05-25 15:26:02 -04:00 committed by GitHub
parent b57aed821a
commit aec1904685
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 14 deletions

View file

@ -40,11 +40,17 @@ describe('PluginVertex', () => {
[ 1516667386000, 0.3 ] [ 1516667386000, 0.3 ]
] ]
}, },
events_per_millisecond: { events_in_per_millisecond: {
data: [ data: [
[ 1516667383000, 0.01 ], [ 1516667383000, 0.01 ],
[ 1516667386000, 0.02 ] [ 1516667386000, 0.02 ]
] ]
},
events_out_per_millisecond: {
data: [
[ 1516667383000, 0.01 ],
[ 1516667386000, 0.03 ]
]
} }
} }
}; };

View file

@ -57,7 +57,9 @@ export class PluginVertex extends Vertex {
} }
get eventsPerSecond() { get eventsPerSecond() {
const eventsPerMillisecond = this.stats.events_per_millisecond; const eventsPerMillisecond = this.isInput
? this.stats.events_out_per_millisecond
: this.stats.events_in_per_millisecond;
return { return {
...omit(eventsPerMillisecond, 'data'), ...omit(eventsPerMillisecond, 'data'),
data: get(eventsPerMillisecond, 'data', []).map(([x, y]) => [x, y * 1000]) data: get(eventsPerMillisecond, 'data', []).map(([x, y]) => [x, y * 1000])

View file

@ -15,7 +15,7 @@ describe('get_pipeline', () => {
let vertex; let vertex;
let vertexStatsBucket; let vertexStatsBucket;
let totalProcessorsDurationInMillis; let totalProcessorsDurationInMillis;
let timeboundsInMillis; let timeseriesIntervalInSeconds;
beforeEach(() => { beforeEach(() => {
vertex = { vertex = {
@ -31,16 +31,16 @@ describe('get_pipeline', () => {
}; };
totalProcessorsDurationInMillis = 24000; totalProcessorsDurationInMillis = 24000;
timeboundsInMillis = 15 * 60 * 1000; timeseriesIntervalInSeconds = 15 * 60;
}); });
it('returns correct stats', () => { it('returns correct stats', () => {
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeboundsInMillis); const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
expect(result).to.eql({ expect(result).to.eql({
events_in: 10000, events_in: 10000,
events_out: 9000, events_out: 9000,
duration_in_millis: 18000, duration_in_millis: 18000,
events_per_millisecond: 0.00001, events_out_per_millisecond: 0.01,
millis_per_event: 2, millis_per_event: 2,
queue_push_duration_in_millis: 100000, queue_push_duration_in_millis: 100000,
queue_push_duration_in_millis_per_event: 11.11111111111111 queue_push_duration_in_millis_per_event: 11.11111111111111
@ -55,12 +55,13 @@ describe('get_pipeline', () => {
}); });
it('returns correct stats', () => { it('returns correct stats', () => {
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeboundsInMillis); const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
expect(result).to.eql({ expect(result).to.eql({
events_in: 10000, events_in: 10000,
events_out: 9000, events_out: 9000,
duration_in_millis: 18000, duration_in_millis: 18000,
events_per_millisecond: 0.000011111111111111112, events_in_per_millisecond: 0.011111111111111112,
events_out_per_millisecond: 0.01,
millis_per_event: 1.8, millis_per_event: 1.8,
percent_of_total_processor_duration: 0.75 percent_of_total_processor_duration: 0.75
}); });
@ -75,12 +76,13 @@ describe('get_pipeline', () => {
}); });
it('returns correct stats', () => { it('returns correct stats', () => {
const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeboundsInMillis); const result = _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationInMillis, timeseriesIntervalInSeconds);
expect(result).to.eql({ expect(result).to.eql({
events_in: 10000, events_in: 10000,
events_out: 9000, events_out: 9000,
duration_in_millis: 18000, duration_in_millis: 18000,
events_per_millisecond: 0.000011111111111111112, events_in_per_millisecond: 0.011111111111111112,
events_out_per_millisecond: 0.01,
millis_per_event: 1.8, millis_per_event: 1.8,
percent_of_total_processor_duration: 0.75 percent_of_total_processor_duration: 0.75
}); });
@ -289,7 +291,7 @@ describe('get_pipeline', () => {
max: 1516135440463 max: 1516135440463
} }
}, },
events_per_millisecond: { events_out_per_millisecond: {
data: [ data: [
[ 1516131120000, 0.03333333333333333 ], [ 1516131120000, 0.03333333333333333 ],
[ 1516131180000, 0.06666666666666667 ] [ 1516131180000, 0.06666666666666667 ]
@ -367,7 +369,17 @@ describe('get_pipeline', () => {
max: 1516135440463 max: 1516135440463
} }
}, },
events_per_millisecond: { events_in_per_millisecond: {
data: [
[1516131120000, 0.03333333333333333],
[1516131180000, 0.06666666666666667]
],
timeRange: {
min: 1516131138639,
max: 1516135440463
}
},
events_out_per_millisecond: {
data: [ data: [
[ 1516131120000, 0.03333333333333333 ], [ 1516131120000, 0.03333333333333333 ],
[ 1516131180000, 0.06666666666666667 ] [ 1516131180000, 0.06666666666666667 ]

View file

@ -17,6 +17,8 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
const isInput = vertex.plugin_type === 'input'; const isInput = vertex.plugin_type === 'input';
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output'; const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
const timeseriesIntervalInMillis = timeseriesIntervalInSeconds * 1000;
const eventsInTotal = vertexStatsBucket.events_in_total.value; const eventsInTotal = vertexStatsBucket.events_in_total.value;
const eventsOutTotal = get(vertexStatsBucket, 'events_out_total.value', null); const eventsOutTotal = get(vertexStatsBucket, 'events_out_total.value', null);
@ -24,6 +26,9 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
const inputStats = {}; const inputStats = {};
const processorStats = {}; const processorStats = {};
const eventsProcessedStats = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis
};
let eventsTotal; let eventsTotal;
@ -36,16 +41,17 @@ export function _vertexStats(vertex, vertexStatsBucket, totalProcessorsDurationI
if (isProcessor) { if (isProcessor) {
eventsTotal = eventsInTotal; eventsTotal = eventsInTotal;
processorStats.percent_of_total_processor_duration = durationInMillis / totalProcessorsDurationInMillis; processorStats.percent_of_total_processor_duration = durationInMillis / totalProcessorsDurationInMillis;
eventsProcessedStats.events_in_per_millisecond = eventsInTotal / timeseriesIntervalInMillis;
} }
return { return {
events_in: eventsInTotal, events_in: eventsInTotal,
events_out: eventsOutTotal, events_out: eventsOutTotal,
duration_in_millis: durationInMillis, duration_in_millis: durationInMillis,
events_per_millisecond: eventsTotal / (timeseriesIntervalInSeconds * 1000),
millis_per_event: durationInMillis / eventsTotal, millis_per_event: durationInMillis / eventsTotal,
...inputStats, ...inputStats,
...processorStats ...processorStats,
...eventsProcessedStats
}; };
} }