[Monitoring/Beats] Telemetry Data from Beats (#18833)

* [Monitoring/Beats] Telemetry Data from Beats

* filter apm-server

* ignore results payload if hitsLength === 0

* process each payload as stats are saved to clusters object
This commit is contained in:
Tim Sullivan 2018-05-08 09:19:15 -07:00 committed by GitHub
parent 930e80efba
commit 698dfa455c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 11376 additions and 3 deletions

View file

@ -23,6 +23,12 @@ export const CONFIG_TELEMETRY_DESC = (
*/
export const KIBANA_SYSTEM_ID = 'kibana';
/**
* The name of the Beats System ID used to publish and look up Beats stats through the Monitoring system.
* @type {string}
*/
export const BEATS_SYSTEM_ID = 'beats';
/**
* The name of the Kibana System ID used to look up Logstash stats through the Monitoring system.
* @type {string}

View file

@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { fetchBeatsStats, processResults } from '../get_beats_stats';
import sinon from 'sinon';
import expect from 'expect.js';
import beatsStatsResultSet from './fixtures/beats_stats_results';
describe('Get Beats Stats', () => {
describe('fetchBeatsStats', () => {
const clusterUuids = ['aCluster', 'bCluster', 'cCluster'];
const start = 100;
const end = 200;
let server;
let callCluster;
beforeEach(() => {
const getStub = { get: sinon.stub() };
getStub.get
.withArgs('xpack.monitoring.beats.index_pattern')
.returns('beats-indices-*');
server = { config: () => getStub };
callCluster = sinon.stub();
});
it('should set `from: 0, to: 10000` in the query', () => {
fetchBeatsStats(server, callCluster, clusterUuids, start, end);
const { args } = callCluster.firstCall;
const [api, { body }] = args;
expect(api).to.be('search');
expect(body.from).to.be(0);
expect(body.size).to.be(10000);
});
it('should set `from: 10000, from: 10000` in the query', () => {
fetchBeatsStats(server, callCluster, clusterUuids, start, end, { page: 1 });
const { args } = callCluster.firstCall;
const [api, { body }] = args;
expect(api).to.be('search');
expect(body.from).to.be(10000);
expect(body.size).to.be(10000);
});
it('should set `from: 20000, from: 10000` in the query', () => {
fetchBeatsStats(server, callCluster, clusterUuids, start, end, { page: 2 });
const { args } = callCluster.firstCall;
const [api, { body }] = args;
expect(api).to.be('search');
expect(body.from).to.be(20000);
expect(body.size).to.be(10000);
});
});
describe('processResults', () => {
it('should summarize empty results', () => {
const resultsEmpty = undefined;
const clusters = {};
const clusterHostMaps = {};
processResults(resultsEmpty, clusters, clusterHostMaps);
expect(clusters).to.eql({});
});
it('should summarize single result with some missing fields', () => {
const results = {
hits: {
hits: [
{
_source: {
cluster_uuid: 'FlV4ckTxQ0a78hmBkzzc9A',
beats_stats: {
metrics: { libbeat: { output: { type: 'elasticsearch' } } }, // missing events published
beat: { type: 'cowbeat' }, // missing version and output
},
},
},
],
},
};
const clusters = {};
const clusterHostMaps = {};
processResults(results, clusters, clusterHostMaps);
expect(clusters).to.eql({
FlV4ckTxQ0a78hmBkzzc9A: {
count: 1,
versions: {},
types: { cowbeat: 1 },
outputs: { elasticsearch: 1 },
eventsPublished: 0,
hosts: 0,
},
});
});
it('should summarize stats from hits across multiple result objects', () => {
const clusters = {};
const clusterHostMaps = {};
// beatsStatsResultSet is an array of many small query results
beatsStatsResultSet.forEach(results => {
processResults(results, clusters, clusterHostMaps);
});
expect(clusters).to.eql({
W7hppdX7R229Oy3KQbZrTw: {
count: 5,
versions: { '7.0.0-alpha1': 5 },
types: { metricbeat: 1, filebeat: 4 },
outputs: { elasticsearch: 5 },
eventsPublished: 80875,
hosts: 1,
},
FlV4ckTxQ0a78hmBkzzc9A: {
count: 405,
versions: { '7.0.0-alpha1': 405 },
types: {
filebeat: 200,
metricbeat: 100,
heartbeat: 100,
winlogbeat: 1,
duckbeat: 1,
'apm-server': 1,
sheepbeat: 1,
cowbeat: 1,
},
outputs: { elasticsearch: 405 },
eventsPublished: 723985,
hosts: 1,
},
});
});
});
});

View file

@ -9,11 +9,13 @@ import {
KIBANA_SYSTEM_ID,
LOGSTASH_SYSTEM_ID,
REPORTING_SYSTEM_ID,
BEATS_SYSTEM_ID,
} from '../../../../common/constants';
import { getClusterUuids } from './get_cluster_uuids';
import { getElasticsearchStats } from './get_es_stats';
import { getKibanaStats } from './get_kibana_stats';
import { getReportingStats } from './get_reporting_stats';
import { getBeatsStats } from './get_beats_stats';
import { getHighLevelStats } from './get_high_level_stats';
/**
@ -68,8 +70,9 @@ function getAllStatsWithCaller(server, callCluster, start, end) {
getKibanaStats(server, callCluster, clusterUuids, start, end), // stack_stats.kibana
getHighLevelStats(server, callCluster, clusterUuids, start, end, LOGSTASH_SYSTEM_ID), // stack_stats.logstash
getReportingStats(server, callCluster, clusterUuids, start, end), // stack_stats.xpack.reporting
getBeatsStats(server, callCluster, clusterUuids, start, end), // stack_stats.beats
])
.then(([esClusters, kibana, logstash, reporting]) => handleAllStats(esClusters, { kibana, logstash, reporting }));
.then(([esClusters, kibana, logstash, reporting, beats]) => handleAllStats(esClusters, { kibana, logstash, reporting, beats }));
});
}
@ -82,12 +85,12 @@ function getAllStatsWithCaller(server, callCluster, start, end) {
* @param {Object} logstash The Logstash nodes keyed by Cluster UUID
* @return {Array} The clusters joined with the Kibana and Logstash instances under each cluster's {@code stack_stats}.
*/
export function handleAllStats(clusters, { kibana, logstash, reporting }) {
export function handleAllStats(clusters, { kibana, logstash, reporting, beats }) {
return clusters.map(cluster => {
// if they are using Kibana or Logstash, then add it to the cluster details under cluster.stack_stats
addStackStats(cluster, kibana, KIBANA_SYSTEM_ID);
addStackStats(cluster, logstash, LOGSTASH_SYSTEM_ID);
addStackStats(cluster, beats, BEATS_SYSTEM_ID);
addXPackStats(cluster, reporting, REPORTING_SYSTEM_ID);
mergeXPackStats(cluster, kibana, 'graph_workspace', 'graph'); // copy graph_workspace info out of kibana, merge it into stack_stats.xpack.graph

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get } from 'lodash';
import { createQuery } from './create_query';
const HITS_SIZE = 10000; // maximum hits to receive from ES with each search
/*
* Create a set of result objects where each is the result of searching hits from Elasticsearch with a size of HITS_SIZE each time.
* @param {Object} server - The server instance
* @param {function} callCluster - The callWithRequest or callWithInternalUser handler
* @param {Array} clusterUuids - The string Cluster UUIDs to fetch details for
* @param {Date} start - Start time to limit the stats
* @param {Date} end - End time to limit the stats
* @param {Number} page - selection of hits to fetch from ES
* @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
* @param {Object} clusterHostMaps - the object keyed by cluster UUIDs to count the unique hosts
* @return {Promise}
*/
export async function fetchBeatsStats(
server, callCluster, clusterUuids, start, end,
{ page = 0, clusters, clusterHostMaps } = {}
) {
const config = server.config();
const params = {
index: config.get('xpack.monitoring.beats.index_pattern'),
ignoreUnavailable: true,
filterPath: [
'hits.hits._source.cluster_uuid',
'hits.hits._source.beats_stats.beat.version',
'hits.hits._source.beats_stats.beat.type',
'hits.hits._source.beats_stats.beat.host',
'hits.hits._source.beats_stats.metrics.libbeat.pipeline.events.published',
'hits.hits._source.beats_stats.metrics.libbeat.output.type',
],
body: {
query: createQuery({
start,
end,
type: 'beats_stats',
filters: [
{ terms: { cluster_uuid: clusterUuids } },
{ bool: { must_not: { term: { 'beats_stats.beat.type': 'apm-server' } } } },
],
}),
collapse: { field: 'beats_stats.beat.uuid' },
sort: [{ 'beats_stats.timestamp': 'desc' }],
from: page * HITS_SIZE,
size: HITS_SIZE,
},
};
const results = await callCluster('search', params);
const hitsLength = get(results, 'hits.hits.length', 0);
if (hitsLength > 0) {
// further augment the clusters object with more stats
processResults(results, clusters, clusterHostMaps);
if (hitsLength === HITS_SIZE) {
// call recursively
const nextOptions = {
clusters,
clusterHostMaps,
page: page + 1,
};
// returns a promise and keeps the caller blocked from returning until the entire clusters object is built
return fetchBeatsStats(server, callCluster, clusterUuids, start, end, nextOptions);
}
}
return Promise.resolve();
}
const getBaseStats = () => ({
count: 0,
versions: {},
types: {},
outputs: {},
eventsPublished: 0,
hosts: 0,
});
/*
* Update a clusters object with processed beat stats
* @param {Array} results - array of Beats docs from ES
* @param {Object} clusters - Beats stats in an object keyed by the cluster UUIDs
* @param {Object} clusterHostMaps - the object keyed by cluster UUIDs to count the unique hosts
*/
export function processResults(results = [], clusters, clusterHostMaps) {
const currHits = get(results, 'hits.hits', []);
currHits.forEach(hit => {
const clusterUuid = get(hit, '_source.cluster_uuid');
if (clusters[clusterUuid] === undefined) {
clusters[clusterUuid] = getBaseStats();
clusterHostMaps[clusterUuid] = new Map();
}
clusters[clusterUuid].count += 1;
const { versions, types, outputs } = clusters[clusterUuid];
const thisVersion = get(hit, '_source.beats_stats.beat.version');
if (thisVersion !== undefined) {
const thisVersionAccum = versions[thisVersion] || 0;
versions[thisVersion] = thisVersionAccum + 1;
}
const thisType = get(hit, '_source.beats_stats.beat.type');
if (thisType !== undefined) {
const thisTypeAccum = types[thisType] || 0;
types[thisType] = thisTypeAccum + 1;
}
const thisOutput = get(hit, '_source.beats_stats.metrics.libbeat.output.type');
if (thisOutput !== undefined) {
const thisOutputAccum = outputs[thisOutput] || 0;
outputs[thisOutput] = thisOutputAccum + 1;
}
const thisEvents = get(hit, '_source.beats_stats.metrics.libbeat.pipeline.events.published');
if (thisEvents !== undefined) {
clusters[clusterUuid].eventsPublished += thisEvents;
}
const thisHost = get(hit, '_source.beats_stats.beat.host');
if (thisHost !== undefined) {
const hostsMap = clusterHostMaps[clusterUuid];
hostsMap.set(thisHost, thisHost); // values don't matter, as this data structure is not part of the output
clusters[clusterUuid].hosts = hostsMap.size;
}
});
}
/*
* Call the function for fetching and summarizing beats stats
* @return {Object} - Beats stats in an object keyed by the cluster UUIDs
*/
export async function getBeatsStats(server, callCluster, clusterUuids, start, end) {
const clusters = {};
const clusterHostMaps = {};
await fetchBeatsStats(server, callCluster, clusterUuids, start, end, { clusters, clusterHostMaps });
return clusters;
}