[Monitoring] Fix inaccuracies in logstash pipeline listing metrics (#55868)

* Change how we fetch pipeline listing metrics to match what other charts show

* Fix tests

* Fix tests

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Chris Roberson 2020-01-28 11:29:16 -05:00 committed by GitHub
parent 9685eca401
commit 30dbdf7350
9 changed files with 149 additions and 309 deletions
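
Note on the overall shape of the change: before this commit, one bulk query returned, for each timestamp, a map keyed by pipeline id, which handleGetPipelinesResponse then fanned out per pipeline; after it, metrics are fetched once per pipeline (scoped to the node UUIDs running it), so each series already carries plain numeric values. A minimal sketch of the two shapes, reusing sample values from the unit test deleted below (the variable names here are only for illustration):

// Before: one series per metric; each datum's y-value is a map keyed by pipeline id.
const bulkSeries = {
  logstash_pipeline_throughput: [
    { data: [[1513123151000, { apache_logs: 231, logstash_tweets: 34 }]] },
  ],
};

// After: one fetch per pipeline; each datum's y-value is a plain number.
const perPipeline = [
  { id: 'apache_logs', metrics: { logstash_pipeline_throughput: { data: [[1513123151000, 231]] } } },
  { id: 'logstash_tweets', metrics: { logstash_pipeline_throughput: { data: [[1513123151000, 34]] } } },
];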


@@ -34,6 +34,7 @@ import { checkCcrEnabled } from '../elasticsearch/ccr';
 import { getStandaloneClusterDefinition, hasStandaloneClusters } from '../standalone_clusters';
 import { getLogTypes } from '../logs';
 import { isInCodePath } from './is_in_code_path';
+import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';
 
 /**
  * Get all clusters or the cluster associated with {@code clusterUuid} when it is defined.
@@ -53,6 +54,8 @@ export async function getClustersFromRequest(
     filebeatIndexPattern,
   } = indexPatterns;
 
+  const config = req.server.config();
+  const size = config.get('xpack.monitoring.max_bucket_size');
   const isStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID;
 
   let clusters = [];
@@ -158,25 +161,27 @@ export async function getClustersFromRequest(
   });
 
   // add logstash data
-  const logstashes = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
-    ? await getLogstashForClusters(req, lsIndexPattern, clusters)
-    : [];
-
-  const clusterPipelineNodesCount = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
-    ? await getPipelines(req, lsIndexPattern, null, ['logstash_cluster_pipeline_nodes_count'])
-    : [];
-
-  // add the logstash data to each cluster
-  logstashes.forEach(logstash => {
-    const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });
-
-    // withhold LS overview stats until pipeline metrics have at least one full bucket
-    if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0) {
-      logstash.stats = {};
-    }
-
-    set(clusters[clusterIndex], 'logstash', logstash.stats);
-  });
+  if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) {
+    const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);
+    const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size);
+    const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, pipelines, [
+      'logstash_cluster_pipeline_nodes_count',
+    ]);
+
+    // add the logstash data to each cluster
+    logstashes.forEach(logstash => {
+      const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });
+
+      // withhold LS overview stats until pipeline metrics have at least one full bucket
+      if (
+        logstash.clusterUuid === req.params.clusterUuid &&
+        clusterPipelineNodesCount.length === 0
+      ) {
+        logstash.stats = {};
+      }
+
+      set(clusters[clusterIndex], 'logstash', logstash.stats);
+    });
+  }
 
   // add beats data
   const beatsByCluster = isInCodePath(codePaths, [CODE_PATH_BEATS])
@@ -199,7 +204,6 @@ export async function getClustersFromRequest(
   // check ccr configuration
   const isCcrEnabled = await checkCcrEnabled(req, esIndexPattern);
 
-  const config = req.server.config();
   const kibanaUuid = config.get('server.uuid');
 
   return getClustersSummary(req.server, clusters, kibanaUuid, isCcrEnabled);
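
Note: the withholding rule itself is unchanged by this diff; only the source of clusterPipelineNodesCount moves to the per-pipeline fetch. Expressed as a standalone predicate, purely as a sketch with hypothetical names:

// Hypothetical helper, equivalent to the inline check above: blank out the
// Logstash overview stats when we are on a specific cluster and no pipeline
// has produced a full metrics bucket yet.
const shouldWithholdLogstashStats = (logstash, requestedClusterUuid, clusterPipelineNodesCount) =>
  logstash.clusterUuid === requestedClusterUuid && clusterPipelineNodesCount.length === 0;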


@@ -5,7 +5,7 @@
  */
 
 import expect from '@kbn/expect';
-import { handleGetPipelinesResponse, processPipelinesAPIResponse } from '../get_pipelines';
+import { processPipelinesAPIResponse } from '../get_pipelines';
 
 describe('processPipelinesAPIResponse', () => {
   let response;
@@ -13,6 +13,7 @@ describe('processPipelinesAPIResponse', () => {
     response = {
       pipelines: [
         {
+          id: 1,
          metrics: {
             throughput_for_cluster: {
               data: [
@@ -22,8 +23,8 @@ describe('processPipelinesAPIResponse', () => {
           },
           nodes_count_for_cluster: {
             data: [
-              [1513721903, 3],
-              [1513722162, 2],
+              [1513721903, { 1: 5 }],
+              [1513722162, { 1: 10 }],
             ],
           },
         },
@@ -32,96 +33,27 @@ describe('processPipelinesAPIResponse', () => {
     };
   });
 
-  it('normalizes the metric keys', () => {
-    processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
-      processedResponse => {
-        expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
-          response.pipelines[0].metrics.throughput_for_cluster
-        );
-        expect(processedResponse.pipelines[0].metrics.nodesCount).to.eql(
-          response.pipelines[0].metrics.nodes_count_for_cluster
-        );
-      }
-    );
+  it('normalizes the metric keys', async () => {
+    const processedResponse = await processPipelinesAPIResponse(
+      response,
+      'throughput_for_cluster',
+      'nodes_count_for_cluster'
+    );
+    expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
+      response.pipelines[0].metrics.throughput_for_cluster
+    );
+    expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][0]).to.eql(1513721903);
+    expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][1]).to.eql(5);
+    expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][0]).to.eql(1513722162);
+    expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][1]).to.eql(10);
   });
 
   it('computes the latest metrics', () => {
     processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
       processedResponse => {
         expect(processedResponse.pipelines[0].latestThroughput).to.eql(23);
-        expect(processedResponse.pipelines[0].latestNodesCount).to.eql(2);
+        expect(processedResponse.pipelines[0].latestNodesCount).to.eql(10);
       }
     );
   });
 });
-
-describe('get_pipelines', () => {
-  let fetchPipelinesWithMetricsResult;
-
-  describe('fetchPipelinesWithMetrics result contains no pipelines', () => {
-    beforeEach(() => {
-      fetchPipelinesWithMetricsResult = {
-        logstash_pipeline_throughput: [
-          {
-            data: [],
-          },
-        ],
-        logstash_pipeline_nodes_count: [
-          {
-            data: [],
-          },
-        ],
-      };
-    });
-
-    it('returns an empty array', () => {
-      const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
-      expect(result).to.eql([]);
-    });
-  });
-
-  describe('fetchPipelinesWithMetrics result contains pipelines', () => {
-    beforeEach(() => {
-      fetchPipelinesWithMetricsResult = {
-        logstash_pipeline_throughput: [
-          {
-            data: [[1513123151000, { apache_logs: 231, logstash_tweets: 34 }]],
-          },
-        ],
-        logstash_pipeline_nodes_count: [
-          {
-            data: [[1513123151000, { apache_logs: 3, logstash_tweets: 1 }]],
-          },
-        ],
-      };
-    });
-
-    it('returns the correct structure for a non-empty response', () => {
-      const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
-      expect(result).to.eql([
-        {
-          id: 'apache_logs',
-          metrics: {
-            logstash_pipeline_throughput: {
-              data: [[1513123151000, 231]],
-            },
-            logstash_pipeline_nodes_count: {
-              data: [[1513123151000, 3]],
-            },
-          },
-        },
-        {
-          id: 'logstash_tweets',
-          metrics: {
-            logstash_pipeline_throughput: {
-              data: [[1513123151000, 34]],
-            },
-            logstash_pipeline_nodes_count: {
-              data: [[1513123151000, 1]],
-            },
-          },
-        },
-      ]);
-    });
-  });
-});
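
The reworked fixture encodes the new wire shape for node counts: each datum is [timestamp, { <pipelineId>: <count> }], and processPipelinesAPIResponse plucks out the entry for the pipeline's own id (see the data.map in get_pipelines further down). A quick worked example with the fixture values:

// Fixture data for the pipeline with id 1:
const raw = [
  [1513721903, { 1: 5 }],
  [1513722162, { 1: 10 }],
];
// processPipelinesAPIResponse maps each [x, y] pair to [x, y[pipeline.id]]:
const normalized = raw.map(([x, y]) => [x, y[1]]);
// => [[1513721903, 5], [1513722162, 10]], hence latestNodesCount === 10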


@@ -7,7 +7,6 @@
 import { get } from 'lodash';
 import { filter } from '../pagination/filter';
 import { getLogstashPipelineIds } from './get_pipeline_ids';
-import { handleGetPipelinesResponse } from './get_pipelines';
 import { sortPipelines } from './sort_pipelines';
 import { paginate } from '../pagination/paginate';
 import { getMetrics } from '../details/get_metrics';
@@ -51,19 +50,33 @@ export async function getPaginatedPipelines(
   // the necessary sort - we only need the last bucket of data so we
   // fetch the last two buckets of data (to ensure we have a single full bucket),
   // then return the value from that last bucket
-  const metricSeriesData = await getMetrics(
-    req,
-    lsIndexPattern,
-    metricSet,
-    [],
-    { pageOfPipelines: pipelines },
-    2
-  );
-  const pipelineAggregationsData = handleGetPipelinesResponse(
-    metricSeriesData,
-    pipelines.map(p => p.id)
-  );
-  for (const pipelineAggregationData of pipelineAggregationsData) {
+  const metricSeriesData = Object.values(
+    await Promise.all(
+      pipelines.map(pipeline => {
+        return new Promise(async resolve => {
+          const data = await getMetrics(
+            req,
+            lsIndexPattern,
+            metricSet,
+            [],
+            {
+              pipeline,
+            },
+            2
+          );
+          resolve({
+            id: pipeline.id,
+            metrics: Object.keys(data).reduce((accum, metricName) => {
+              accum[metricName] = data[metricName][0];
+              return accum;
+            }, {}),
+          });
+        });
+      })
+    )
+  );
+  for (const pipelineAggregationData of metricSeriesData) {
     for (const pipeline of pipelines) {
       if (pipelineAggregationData.id === pipeline.id) {
         for (const metric of metricSet) {
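
One side note on the fan-out added here: wrapping an async arrow in new Promise(async resolve => ...) works, but it is redundant and silently swallows rejections, and Object.values(...) over what is already an array is a no-op copy. An equivalent, slightly tighter sketch using the same identifiers as the diff:

// Fetch the last two buckets for each pipeline in parallel and keep only the
// first series returned for each metric name.
const metricSeriesData = await Promise.all(
  pipelines.map(async pipeline => {
    const data = await getMetrics(req, lsIndexPattern, metricSet, [], { pipeline }, 2);
    return {
      id: pipeline.id,
      metrics: Object.keys(data).reduce((accum, metricName) => {
        accum[metricName] = data[metricName][0];
        return accum;
      }, {}),
    };
  })
);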


@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
 import moment from 'moment';
-import { get, uniq } from 'lodash';
+import { get } from 'lodash';
 import { createQuery } from '../create_query';
 import { LogstashMetric } from '../metrics';
@@ -26,7 +26,7 @@ export async function getLogstashPipelineIds(
     index: logstashIndexPattern,
     size: 0,
     ignoreUnavailable: true,
-    filterPath: ['aggregations.nested_context.composite_data.buckets'],
+    filterPath: ['aggregations.nest.id.buckets'],
     body: {
       query: createQuery({
         start,
@@ -36,37 +36,28 @@ export async function getLogstashPipelineIds(
         filters,
       }),
       aggs: {
-        nested_context: {
+        nest: {
           nested: {
             path: 'logstash_stats.pipelines',
           },
           aggs: {
-            composite_data: {
-              composite: {
+            id: {
+              terms: {
+                field: 'logstash_stats.pipelines.id',
                 size,
-                sources: [
-                  {
-                    id: {
-                      terms: {
-                        field: 'logstash_stats.pipelines.id',
-                      },
-                    },
-                  },
-                  {
-                    hash: {
-                      terms: {
-                        field: 'logstash_stats.pipelines.hash',
-                      },
-                    },
-                  },
-                  {
-                    ephemeral_id: {
-                      terms: {
-                        field: 'logstash_stats.pipelines.ephemeral_id',
-                      },
-                    },
-                  },
-                ],
+              },
+              aggs: {
+                unnest: {
+                  reverse_nested: {},
+                  aggs: {
+                    nodes: {
+                      terms: {
+                        field: 'logstash_stats.logstash.uuid',
+                        size,
+                      },
+                    },
+                  },
+                },
               },
             },
           },
@@ -77,8 +68,8 @@ export async function getLogstashPipelineIds(
   const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
   const response = await callWithRequest(req, 'search', params);
-  const data = get(response, 'aggregations.nested_context.composite_data.buckets', []).map(
-    bucket => bucket.key
-  );
-  return uniq(data, item => item.id);
+  return get(response, 'aggregations.nest.id.buckets', []).map(bucket => ({
+    id: bucket.key,
+    nodeIds: get(bucket, 'unnest.nodes.buckets', []).map(item => item.key),
+  }));
 }
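
For reference, the reworked aggregation buckets pipeline ids inside the nested logstash_stats.pipelines documents, then uses reverse_nested to climb back to the root document and collect the node UUIDs running each pipeline. A hypothetical trimmed search response and the mapping applied to it (bucket values invented for illustration):

const response = {
  aggregations: {
    nest: {
      id: {
        buckets: [
          {
            key: 'apache_logs', // pipeline id
            unnest: {
              nodes: { buckets: [{ key: 'node-uuid-1' }, { key: 'node-uuid-2' }] },
            },
          },
        ],
      },
    },
  },
};
// getLogstashPipelineIds would return:
// [{ id: 'apache_logs', nodeIds: ['node-uuid-1', 'node-uuid-2'] }]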


@@ -3,66 +3,10 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { cloneDeep, last, omit } from 'lodash';
+import { cloneDeep, last } from 'lodash';
 import { checkParam } from '../error_missing_required';
 import { getMetrics } from '../details/get_metrics';
 
-export function handleGetPipelinesResponse(response, exclusivePipelineIds) {
-  const pipelinesById = {};
-
-  const metrics = Object.keys(response);
-  metrics.forEach(metric => {
-    response[metric][0].data.forEach(([x, y]) => {
-      const pipelineIds = Object.keys(y);
-      pipelineIds.forEach(pipelineId => {
-        if (exclusivePipelineIds && !exclusivePipelineIds.includes(pipelineId)) {
-          return;
-        }
-        // Create new pipeline object if necessary
-        if (!pipelinesById.hasOwnProperty(pipelineId)) {
-          pipelinesById[pipelineId] = {
-            metrics: {},
-          };
-        }
-        const pipeline = pipelinesById[pipelineId];
-
-        // Create new metric object in pipeline object if necessary
-        if (!pipeline.metrics.hasOwnProperty(metric)) {
-          // Clone the metric object from the response so we don't accidentally overwrite it
-          // in the code further below. Also, reset data to empty array because we only want
-          // to keep data "y" values specific to this pipeline
-          pipeline.metrics[metric] = {
-            ...omit(response[metric][0], 'data'),
-            data: [],
-          };
-        }
-
-        pipeline.metrics[metric].data.push([x, y[pipelineId]]);
-      });
-    });
-  });
-
-  // Convert pipelinesById map to array and preserve sorting
-  const pipelines = [];
-  if (exclusivePipelineIds) {
-    for (const exclusivePipelineId of exclusivePipelineIds) {
-      pipelines.push({
-        id: exclusivePipelineId,
-        ...pipelinesById[exclusivePipelineId],
-      });
-    }
-  } else {
-    Object.keys(pipelinesById).forEach(pipelineId => {
-      pipelines.push({
-        id: pipelineId,
-        ...pipelinesById[pipelineId],
-      });
-    });
-  }
-  return pipelines;
-}
-
 export async function processPipelinesAPIResponse(
   response,
   throughputMetricKey,
@@ -76,7 +20,13 @@ export async function processPipelinesAPIResponse(
   processedResponse.pipelines.forEach(pipeline => {
     pipeline.metrics = {
       throughput: pipeline.metrics[throughputMetricKey],
-      nodesCount: pipeline.metrics[nodesCountMetricKey],
+      nodesCount: {
+        ...pipeline.metrics[nodesCountMetricKey],
+        data: pipeline.metrics[nodesCountMetricKey].data.map(item => [
+          item[0],
+          item[1][pipeline.id],
+        ]),
+      },
     };
 
     pipeline.latestThroughput = last(pipeline.metrics.throughput.data)[1];
@@ -86,24 +36,29 @@ export async function processPipelinesAPIResponse(
   return processedResponse;
 }
 
-export async function getPipelines(
-  req,
-  logstashIndexPattern,
-  pipelineIds,
-  metricSet,
-  metricOptions = {}
-) {
+export async function getPipelines(req, logstashIndexPattern, pipelines, metricSet) {
   checkParam(logstashIndexPattern, 'logstashIndexPattern in logstash/getPipelines');
   checkParam(metricSet, 'metricSet in logstash/getPipelines');
 
   const filters = [];
 
-  const metricsResponse = await getMetrics(
-    req,
-    logstashIndexPattern,
-    metricSet,
-    filters,
-    metricOptions
-  );
-  return handleGetPipelinesResponse(metricsResponse, pipelineIds);
+  const metricsResponse = await Promise.all(
+    pipelines.map(pipeline => {
+      return new Promise(async resolve => {
+        const data = await getMetrics(req, logstashIndexPattern, metricSet, filters, {
+          pipeline,
+        });
+        resolve({
+          id: pipeline.id,
+          metrics: Object.keys(data).reduce((accum, metricName) => {
+            accum[metricName] = data[metricName][0];
+            return accum;
+          }, {}),
+        });
+      });
+    })
+  );
+  return Object.values(metricsResponse);
 }
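
Taken together, getPipelines now resolves directly to an array of { id, metrics } entries (the old cross-pipeline fan-out in handleGetPipelinesResponse is gone), and processPipelinesAPIResponse unwraps the per-pipeline node counts. A sketch of one processed entry, reusing values from the updated unit test above:

const processed = {
  id: 1,
  metrics: {
    throughput: { data: [ /* [timestamp, events-per-second] pairs */ ] },
    nodesCount: { data: [[1513721903, 5], [1513722162, 10]] },
  },
  latestThroughput: 23, // y-value of the last throughput datum
  latestNodesCount: 10, // y-value of the last nodesCount datum
};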


@@ -3020,8 +3020,7 @@ Object {
   },
   "logstash_cluster_pipeline_throughput": LogstashPipelineThroughputMetric {
     "app": "logstash",
-    "calculation": [Function],
-    "derivative": false,
+    "derivative": true,
     "description": "Number of events emitted per second by the Logstash pipeline at the outputs stage.",
     "field": "logstash_stats.pipelines.events.out",
     "format": "0,0.[00]",
@@ -3296,8 +3295,7 @@ Object {
   },
   "logstash_node_pipeline_throughput": LogstashPipelineThroughputMetric {
     "app": "logstash",
-    "calculation": [Function],
-    "derivative": false,
+    "derivative": true,
     "description": "Number of events emitted per second by the Logstash pipeline at the outputs stage.",
     "field": "logstash_stats.pipelines.events.out",
     "format": "0,0.[00]",


@@ -250,59 +250,45 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric {
   constructor(opts) {
     super({
       ...opts,
-      derivative: false,
+      derivative: true,
     });
 
-    this.getDateHistogramSubAggs = ({ pageOfPipelines }) => ({
-      pipelines_nested: {
-        nested: {
-          path: 'logstash_stats.pipelines',
-        },
-        aggs: {
-          by_pipeline_id: {
-            terms: {
-              field: 'logstash_stats.pipelines.id',
-              size: 1000,
-              include: pageOfPipelines.map(pipeline => pipeline.id),
-            },
-            aggs: {
-              throughput: {
-                sum_bucket: {
-                  buckets_path: 'by_pipeline_hash>throughput',
-                },
-              },
-              by_pipeline_hash: {
-                terms: {
-                  field: 'logstash_stats.pipelines.hash',
-                  size: 1000,
-                  include: pageOfPipelines.map(pipeline => pipeline.hash),
-                },
-                aggs: {
-                  throughput: {
-                    sum_bucket: {
-                      buckets_path: 'by_ephemeral_id>throughput',
-                    },
-                  },
-                  by_ephemeral_id: {
-                    terms: {
-                      field: 'logstash_stats.pipelines.ephemeral_id',
-                      size: 1000,
-                      include: pageOfPipelines.map(pipeline => pipeline.ephemeral_id),
-                    },
-                    aggs: {
-                      events_stats: {
-                        stats: {
-                          field: this.field,
-                        },
-                      },
-                      throughput: {
-                        bucket_script: {
-                          script: 'params.max - params.min',
-                          buckets_path: {
-                            min: 'events_stats.min',
-                            max: 'events_stats.max',
-                          },
-                        },
-                      },
+    this.getDateHistogramSubAggs = ({ pipeline }) => {
+      return {
+        metric_deriv: {
+          derivative: {
+            buckets_path: 'sum',
+            gap_policy: 'skip',
+            unit: NORMALIZED_DERIVATIVE_UNIT,
+          },
+        },
+        sum: {
+          sum_bucket: {
+            buckets_path: 'by_node_id>nest>pipeline>events_stats',
+          },
+        },
+        by_node_id: {
+          terms: {
+            field: 'logstash_stats.logstash.uuid',
+            size: 1000,
+            include: pipeline.uuids,
+          },
+          aggs: {
+            nest: {
+              nested: {
+                path: 'logstash_stats.pipelines',
+              },
+              aggs: {
+                pipeline: {
+                  filter: {
+                    term: {
+                      'logstash_stats.pipelines.id': pipeline.id,
+                    },
+                  },
+                  aggs: {
+                    events_stats: {
+                      max: {
+                        field: this.field,
+                      },
                     },
                   },
                 },
@@ -311,19 +297,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric {
             },
           },
         },
       },
-    });
-
-    this.calculation = (bucket, _key, _metric, bucketSizeInSeconds) => {
-      const pipelineThroughputs = {};
-      const pipelineBuckets = _.get(bucket, 'pipelines_nested.by_pipeline_id.buckets', []);
-      pipelineBuckets.forEach(pipelineBucket => {
-        pipelineThroughputs[pipelineBucket.key] = bucketSizeInSeconds
-          ? _.get(pipelineBucket, 'throughput.value') / bucketSizeInSeconds
-          : undefined;
-      });
-
-      return pipelineThroughputs;
-    };
+      };
+    };
   }
 }
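
The throughput metric now rides on the standard derivative machinery instead of a custom calculation callback: inside each date-histogram bucket, events_stats takes the max of the monotonically increasing events.out counter per node, sum adds those maxima across nodes, and metric_deriv differentiates the sums between buckets, normalized to events per second. A back-of-the-envelope check, assuming two nodes and 10-second buckets (all numbers invented):

const bucketSeconds = 10;
const sumAtT0 = 1000 + 500; // per-node max of events.out in bucket t0
const sumAtT1 = 1600 + 900; // per-node max of events.out in bucket t1
const eventsPerSecond = (sumAtT1 - sumAtT0) / bucketSeconds; // => 100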


@@ -79,21 +79,8 @@ export function logstashClusterPipelinesRoute(server) {
         queryText
       );
 
-      // Just the IDs for the rest
-      const pipelineIds = pageOfPipelines.map(pipeline => pipeline.id);
-
-      const metricOptions = {
-        pageOfPipelines,
-      };
-
       try {
-        const pipelineData = await getPipelines(
-          req,
-          lsIndexPattern,
-          pipelineIds,
-          metricSet,
-          metricOptions
-        );
+        const pipelineData = await getPipelines(req, lsIndexPattern, pageOfPipelines, metricSet);
         const response = await processPipelinesAPIResponse(
           {
             pipelines: pipelineData,

@@ -78,22 +78,8 @@ export function logstashNodePipelinesRoute(server) {
         sort,
         queryText
       );
 
-      // Just the IDs for the rest
-      const pipelineIds = pageOfPipelines.map(pipeline => pipeline.id);
-
-      const metricOptions = {
-        pageOfPipelines,
-      };
-
       try {
-        const pipelineData = await getPipelines(
-          req,
-          lsIndexPattern,
-          pipelineIds,
-          metricSet,
-          metricOptions
-        );
+        const pipelineData = await getPipelines(req, lsIndexPattern, pageOfPipelines, metricSet);
         const response = await processPipelinesAPIResponse(
           {
             pipelines: pipelineData,