Update logstash files to ts where we read from source (#86787)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Chris Roberson 2021-01-12 11:47:49 -05:00 committed by GitHub
parent 04d4d8523d
commit fc994dfe97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 146 additions and 67 deletions

View file

@ -24,9 +24,40 @@ export interface ElasticsearchSourceKibanaStats {
}; };
} }
export interface ElasticsearchSourceLogstashPipelineVertex {
id: string;
plugin_type: string;
stats?: {
[key: string]: {
data?: any[];
};
};
}
export interface ElasticsearchSource { export interface ElasticsearchSource {
timestamp: string; timestamp: string;
kibana_stats?: ElasticsearchSourceKibanaStats; kibana_stats?: ElasticsearchSourceKibanaStats;
logstash_state?: {
pipeline?: {
representation?: {
graph?: {
vertices?: ElasticsearchSourceLogstashPipelineVertex[];
};
};
};
};
logstash_stats?: {
timestamp?: string;
logstash?: {};
events?: {};
reloads?: {};
queue?: {
type?: string;
};
jvm?: {
uptime_in_millis?: number;
};
};
beats_stats?: { beats_stats?: {
timestamp?: string; timestamp?: string;
beat?: { beat?: {

View file

@ -4,24 +4,31 @@
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
import { get, merge } from 'lodash'; import { merge } from 'lodash';
// @ts-ignore
import { checkParam } from '../error_missing_required'; import { checkParam } from '../error_missing_required';
import { calculateAvailability } from './../calculate_availability'; // @ts-ignore
import { calculateAvailability } from '../calculate_availability';
import { LegacyRequest, ElasticsearchResponse } from '../../types';
export function handleResponse(resp) { export function handleResponse(resp: ElasticsearchResponse) {
const source = get(resp, 'hits.hits[0]._source.logstash_stats'); const source = resp.hits?.hits[0]?._source?.logstash_stats;
const logstash = get(source, 'logstash'); const logstash = source?.logstash;
const info = merge(logstash, { const info = merge(logstash, {
availability: calculateAvailability(get(source, 'timestamp')), availability: calculateAvailability(source?.timestamp),
events: get(source, 'events'), events: source?.events,
reloads: get(source, 'reloads'), reloads: source?.reloads,
queue_type: get(source, 'queue.type'), queue_type: source?.queue?.type,
uptime: get(source, 'jvm.uptime_in_millis'), uptime: source?.jvm?.uptime_in_millis,
}); });
return info; return info;
} }
export function getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }) { export function getNodeInfo(
req: LegacyRequest,
lsIndexPattern: string,
{ clusterUuid, logstashUuid }: { clusterUuid: string; logstashUuid: string }
) {
checkParam(lsIndexPattern, 'lsIndexPattern in getNodeInfo'); checkParam(lsIndexPattern, 'lsIndexPattern in getNodeInfo');
const params = { const params = {

View file

@ -6,16 +6,24 @@
import boom from '@hapi/boom'; import boom from '@hapi/boom';
import { get } from 'lodash'; import { get } from 'lodash';
// @ts-ignore
import { checkParam } from '../error_missing_required'; import { checkParam } from '../error_missing_required';
import { getPipelineStateDocument } from './get_pipeline_state_document'; import { getPipelineStateDocument } from './get_pipeline_state_document';
// @ts-ignore
import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation'; import { getPipelineStatsAggregation } from './get_pipeline_stats_aggregation';
// @ts-ignore
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
import { LegacyRequest } from '../../types';
import {
ElasticsearchSource,
ElasticsearchSourceLogstashPipelineVertex,
} from '../../../common/types/es';
export function _vertexStats( export function _vertexStats(
vertex, vertex: ElasticsearchSourceLogstashPipelineVertex,
vertexStatsBucket, vertexStatsBucket: any,
totalProcessorsDurationInMillis, totalProcessorsDurationInMillis: number,
timeseriesIntervalInSeconds timeseriesIntervalInSeconds: number
) { ) {
const isInput = vertex.plugin_type === 'input'; const isInput = vertex.plugin_type === 'input';
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output'; const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
@ -27,8 +35,11 @@ export function _vertexStats(
const durationInMillis = vertexStatsBucket.duration_in_millis_total.value; const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;
const processorStats = {}; const processorStats: any = {};
const eventsProcessedStats = { const eventsProcessedStats: {
events_out_per_millisecond: number;
events_in_per_millisecond?: number;
} = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis, events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis,
}; };
@ -63,14 +74,14 @@ export function _vertexStats(
* @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
*/ */
export function _enrichStateWithStatsAggregation( export function _enrichStateWithStatsAggregation(
stateDocument, stateDocument: ElasticsearchSource,
statsAggregation, statsAggregation: any,
timeseriesIntervalInSeconds timeseriesIntervalInSeconds: number
) { ) {
const logstashState = stateDocument.logstash_state; const logstashState = stateDocument.logstash_state;
const vertices = logstashState.pipeline.representation.graph.vertices; const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];
const verticesById = {}; const verticesById: any = {};
vertices.forEach((vertex) => { vertices.forEach((vertex) => {
verticesById[vertex.id] = vertex; verticesById[vertex.id] = vertex;
vertex.stats = {}; vertex.stats = {};
@ -82,7 +93,7 @@ export function _enrichStateWithStatsAggregation(
const verticesWithStatsBuckets = const verticesWithStatsBuckets =
statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets; statsAggregation.aggregations.pipelines.scoped.vertices.vertex_id.buckets;
verticesWithStatsBuckets.forEach((vertexStatsBucket) => { verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => {
// Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval // Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
const vertexId = vertexStatsBucket.key; const vertexId = vertexStatsBucket.key;
const vertex = verticesById[vertexId]; const vertex = verticesById[vertexId];
@ -98,13 +109,20 @@ export function _enrichStateWithStatsAggregation(
} }
}); });
return stateDocument.logstash_state.pipeline; return stateDocument.logstash_state?.pipeline;
} }
export async function getPipeline(req, config, lsIndexPattern, clusterUuid, pipelineId, version) { export async function getPipeline(
req: LegacyRequest,
config: { get: (key: string) => string | undefined },
lsIndexPattern: string,
clusterUuid: string,
pipelineId: string,
version: { firstSeen: string; lastSeen: string; hash: string }
) {
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline'); checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
const options = { const options: any = {
clusterUuid, clusterUuid,
pipelineId, pipelineId,
version, version,

View file

@ -4,16 +4,22 @@
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
// @ts-ignore
import { createQuery } from '../create_query'; import { createQuery } from '../create_query';
// @ts-ignore
import { LogstashMetric } from '../metrics'; import { LogstashMetric } from '../metrics';
import { get } from 'lodash'; import { LegacyRequest, ElasticsearchResponse } from '../../types';
export async function getPipelineStateDocument( export async function getPipelineStateDocument(
req, req: LegacyRequest,
logstashIndexPattern, logstashIndexPattern: string,
{ clusterUuid, pipelineId, version } {
clusterUuid,
pipelineId,
version,
}: { clusterUuid: string; pipelineId: string; version: { hash: string } }
) { ) {
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); const { callWithRequest } = req.server.plugins?.elasticsearch.getCluster('monitoring');
const filters = [ const filters = [
{ term: { 'logstash_state.pipeline.id': pipelineId } }, { term: { 'logstash_state.pipeline.id': pipelineId } },
{ term: { 'logstash_state.pipeline.hash': version.hash } }, { term: { 'logstash_state.pipeline.hash': version.hash } },
@ -43,8 +49,8 @@ export async function getPipelineStateDocument(
}, },
}; };
const resp = await callWithRequest(req, 'search', params); const resp = (await callWithRequest(req, 'search', params)) as ElasticsearchResponse;
// Return null if doc not found // Return null if doc not found
return get(resp, 'hits.hits[0]._source', null); return resp.hits?.hits[0]?._source ?? null;
} }

View file

@ -6,16 +6,24 @@
import boom from '@hapi/boom'; import boom from '@hapi/boom';
import { get } from 'lodash'; import { get } from 'lodash';
// @ts-ignore
import { checkParam } from '../error_missing_required'; import { checkParam } from '../error_missing_required';
import { getPipelineStateDocument } from './get_pipeline_state_document'; import { getPipelineStateDocument } from './get_pipeline_state_document';
// @ts-ignore
import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation'; import { getPipelineVertexStatsAggregation } from './get_pipeline_vertex_stats_aggregation';
// @ts-ignore
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';
import { LegacyRequest } from '../../types';
import {
ElasticsearchSource,
ElasticsearchSourceLogstashPipelineVertex,
} from '../../../common/types/es';
export function _vertexStats( export function _vertexStats(
vertex, vertex: ElasticsearchSourceLogstashPipelineVertex,
vertexStatsBucket, vertexStatsBucket: any,
totalProcessorsDurationInMillis, totalProcessorsDurationInMillis: number,
timeseriesIntervalInSeconds timeseriesIntervalInSeconds: number
) { ) {
const isInput = vertex.plugin_type === 'input'; const isInput = vertex.plugin_type === 'input';
const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output'; const isProcessor = vertex.plugin_type === 'filter' || vertex.plugin_type === 'output';
@ -27,9 +35,12 @@ export function _vertexStats(
const durationInMillis = vertexStatsBucket.duration_in_millis_total.value; const durationInMillis = vertexStatsBucket.duration_in_millis_total.value;
const inputStats = {}; const inputStats: any = {};
const processorStats = {}; const processorStats: any = {};
const eventsProcessedStats = { const eventsProcessedStats: {
events_out_per_millisecond: number;
events_in_per_millisecond?: number;
} = {
events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis, events_out_per_millisecond: eventsOutTotal / timeseriesIntervalInMillis,
}; };
@ -72,21 +83,23 @@ export function _vertexStats(
* @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds * @param {Integer} timeseriesIntervalInSeconds The size of each timeseries bucket, in seconds
*/ */
export function _enrichVertexStateWithStatsAggregation( export function _enrichVertexStateWithStatsAggregation(
stateDocument, stateDocument: ElasticsearchSource,
vertexStatsAggregation, vertexStatsAggregation: any,
vertexId, vertexId: string,
timeseriesIntervalInSeconds timeseriesIntervalInSeconds: number
) { ) {
const logstashState = stateDocument.logstash_state; const logstashState = stateDocument.logstash_state;
const vertices = logstashState.pipeline.representation.graph.vertices; const vertices = logstashState?.pipeline?.representation?.graph?.vertices;
// First, filter out the vertex we care about // First, filter out the vertex we care about
const vertex = vertices.find((v) => v.id === vertexId); const vertex = vertices?.find((v) => v.id === vertexId);
vertex.stats = {}; if (vertex) {
vertex.stats = {};
}
// Next, iterate over timeseries metrics and attach them to vertex // Next, iterate over timeseries metrics and attach them to vertex
const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets; const timeSeriesBuckets = vertexStatsAggregation.aggregations.timeseries.buckets;
timeSeriesBuckets.forEach((timeSeriesBucket) => { timeSeriesBuckets.forEach((timeSeriesBucket: any) => {
// each bucket calculates stats for total pipeline CPU time for the associated timeseries // each bucket calculates stats for total pipeline CPU time for the associated timeseries
const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats; const totalDurationStats = timeSeriesBucket.pipelines.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min; const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
@ -94,31 +107,35 @@ export function _enrichVertexStateWithStatsAggregation(
const timestamp = timeSeriesBucket.key; const timestamp = timeSeriesBucket.key;
const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id; const vertexStatsBucket = timeSeriesBucket.pipelines.scoped.vertices.vertex_id;
const vertexStats = _vertexStats( if (vertex) {
vertex, const vertexStats = _vertexStats(
vertexStatsBucket, vertex,
totalProcessorsDurationInMillis, vertexStatsBucket,
timeseriesIntervalInSeconds totalProcessorsDurationInMillis,
); timeseriesIntervalInSeconds
Object.keys(vertexStats).forEach((stat) => { );
if (!vertex.stats.hasOwnProperty(stat)) { Object.keys(vertexStats).forEach((stat) => {
vertex.stats[stat] = { data: [] }; if (vertex?.stats) {
} if (!vertex.stats.hasOwnProperty(stat)) {
vertex.stats[stat].data.push([timestamp, vertexStats[stat]]); vertex.stats[stat] = { data: [] };
}); }
vertex.stats[stat].data?.push([timestamp, vertexStats[stat]]);
}
});
}
}); });
return vertex; return vertex;
} }
export async function getPipelineVertex( export async function getPipelineVertex(
req, req: LegacyRequest,
config, config: { get: (key: string) => string | undefined },
lsIndexPattern, lsIndexPattern: string,
clusterUuid, clusterUuid: string,
pipelineId, pipelineId: string,
version, version: { hash: string; firstSeen: string; lastSeen: string },
vertexId vertexId: string
) { ) {
checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline'); checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');