Monitoring telemetry collection interval (#34609)

* Only collect usage data once a day from kibana monitoring

* isUsageCollector

* bulk uploader tests

* linting

* condition filter

* checkout yarn.lock from master

* update mappings for functional tests

* debugging for refactoring

* support legacy mappings

* self code review

* isUsageCollector

* fix mock

* live coding session with pickypg

* self code review

* Update x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js

Co-Authored-By: Bamieh <ahmadbamieh@gmail.com>

* update mappings
This commit is contained in:
Ahmad Bamieh 2019-04-27 03:38:05 +03:00 committed by GitHub
parent a20e6b9f86
commit 27de17abbb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 149 additions and 20 deletions

View file

@ -22,6 +22,7 @@ import sinon from 'sinon';
import expect from '@kbn/expect';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
import { UsageCollector } from '../usage_collector';
describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
@ -140,6 +141,26 @@ describe('CollectorSet', () => {
});
});
});
describe('isUsageCollector', () => {
const server = { };
const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {} };
it('returns true only for UsageCollector instances', () => {
const collectors = new CollectorSet(server);
const usageCollector = new UsageCollector(server, collectorOptions);
const collector = new Collector(server, collectorOptions);
const randomClass = new (class Random {});
expect(collectors.isUsageCollector(usageCollector)).to.be(true);
expect(collectors.isUsageCollector(collector)).to.be(false);
expect(collectors.isUsageCollector(randomClass)).to.be(false);
expect(collectors.isUsageCollector({})).to.be(false);
expect(collectors.isUsageCollector(null)).to.be(false);
expect(collectors.isUsageCollector('')).to.be(false);
expect(collectors.isUsageCollector()).to.be(false);
});
});
});

View file

@ -68,6 +68,11 @@ export class CollectorSet {
return this._collectors.find(c => c.type === type);
}
// isUsageCollector(x: UsageCollector | any): x is UsageCollector {
isUsageCollector(x) {
return x instanceof UsageCollector;
}
/*
* Call a bunch of fetch methods and then do them in bulk
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors

View file

@ -17,6 +17,9 @@ class MockCollectorSet {
this.mockServer = _mockServer;
this.mockCollectors = mockCollectors;
}
isUsageCollector(x) {
return !!x.isUsageCollector;
}
getCollectorByType(type) {
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
}
@ -41,6 +44,9 @@ describe('BulkUploader', () => {
server = {
log: sinon.spy(),
plugins: {
xpack_main: {
telemetryCollectionInterval: 3000,
},
elasticsearch: {
createCluster: () => cluster,
getCluster: () => cluster,
@ -121,5 +127,67 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});
it('does not call UsageCollectors if last reported is within the usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });
const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);
const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now();
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.eql(0);
done();
}, CHECK_DELAY);
});
it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });
const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);
const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now() - uploader._usageInterval;
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.be.greaterThan(0);
done();
}, CHECK_DELAY);
});
});
});

View file

@ -6,6 +6,7 @@
import { defaultsDeep, uniq, compact } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';
import {
LOGGING_TAG,
KIBANA_MONITORING_LOGGING_TAG,
@ -42,6 +43,9 @@ export class BulkUploader {
this._timer = null;
this._interval = interval;
this._lastFetchUsageTime = null;
this._usageInterval = server.plugins.xpack_main.telemetryCollectionInterval;
this._log = {
debug: message => server.log(['debug', ...LOGGING_TAGS], message),
info: message => server.log(['info', ...LOGGING_TAGS], message),
@ -63,18 +67,34 @@ export class BulkUploader {
*/
start(collectorSet) {
this._log.info('Starting monitoring stats collection');
const filterCollectorSet = _collectorSet => {
const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
this._lastFetchWithUsage = !filterUsage;
if (!filterUsage) {
this._lastFetchUsageTime = Date.now();
}
// this is internal bulk upload, so filter out API-only collectors
const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true);
return _collectorSet.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (filterUsage && _collectorSet.isUsageCollector(c)) {
return false;
}
return true;
});
};
if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(filterThem(collectorSet)); // initial fetch
this._fetchAndUpload(filterCollectorSet(collectorSet)); // initial fetch
}
this._timer = setInterval(() => {
this._fetchAndUpload(filterThem(collectorSet));
this._fetchAndUpload(filterCollectorSet(collectorSet));
}, this._interval);
}
@ -172,7 +192,6 @@ export class BulkUploader {
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result);
return defaultsDeep(accum, { [uploadType]: uploadData });
}, {});
// convert the nested object into a flat array, with each payload prefixed
// with an 'index' instruction, for bulk upload
const flat = Object.keys(typesNested).reduce((accum, type) => {

View file

@ -6,6 +6,7 @@
import { injectXPackInfoSignature } from './inject_xpack_info_signature';
import { XPackInfo } from './xpack_info';
import { REPORT_INTERVAL_MS } from '../../common/constants';
import { FeatureRegistry } from './feature_registry';
/**
@ -22,6 +23,7 @@ export function setupXPackMain(server) {
});
server.expose('info', info);
server.expose('telemetryCollectionInterval', REPORT_INTERVAL_MS);
server.expose('createXPackInfo', (options) => new XPackInfo(server, options));
server.ext('onPreResponse', (request, h) => injectXPackInfoSignature(info, request, h));

View file

@ -175,20 +175,34 @@ export function getHighLevelStats(server, callCluster, clusterUuids, start, end,
.then(response => handleHighLevelStatsResponse(response, product));
}
/**
* Fetch the high level stats to report for the {@code product}.
*
* @param {Object} server The server instance
* @param {function} callCluster The callWithRequest or callWithInternalUser handler
* @param {Array} indices The indices to use for the request
* @param {Array} clusterUuids Cluster UUIDs to limit the request against
* @param {Date} start Start time to limit the stats
* @param {Date} end End time to limit the stats
* @param {String} product The product to limit too ('kibana', 'logstash', 'beats')
* @return {Promise} Response for the instances to fetch detailed for the product.
*/
export function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) {
export async function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) {
const config = server.config();
const isKibanaIndex = product === KIBANA_SYSTEM_ID;
const filters = [
{ terms: { cluster_uuid: clusterUuids } },
];
// we should supply this from a parameter in the future so that this remains generic
if (isKibanaIndex) {
const kibanaFilter = {
bool: {
should: [
{ exists: { field: 'kibana_stats.usage.index' } },
{
bool: {
should: [
{ range: { 'kibana_stats.kibana.version': { lt: '6.7.3' } } },
{ term: { 'kibana_stats.kibana.version': '7.0.0' } },
]
}
}
],
}
};
filters.push(kibanaFilter);
}
const params = {
index: getIndexPatternForStackProduct(product),
size: config.get('xpack.monitoring.max_bucket_size'),
@ -210,7 +224,7 @@ export function fetchHighLevelStats(server, callCluster, clusterUuids, start, en
start,
end,
type: `${product}_stats`,
filters: [ { terms: { cluster_uuid: clusterUuids } } ]
filters,
}),
collapse: {
// a more ideal field would be the concatenation of the uuid + transport address for duped UUIDs (copied installations)

View file

@ -42,7 +42,7 @@
"aliases": {
},
"index": ".monitoring-kibana-6-2017.08.15",
"mappings":{"_doc":{"dynamic":false,"properties":{"cluster_uuid":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"},"interval_ms":{"type":"long"},"type":{"type":"keyword"},"source_node":{"properties":{"uuid":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"ip":{"type":"keyword"},"name":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"}}},"kibana_stats":{"properties":{"kibana":{"properties":{"uuid":{"type":"keyword"},"name":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"version":{"type":"keyword"},"snapshot":{"type":"boolean"},"status":{"type":"keyword"},"statuses":{"properties":{"name":{"type":"keyword"},"state":{"type":"keyword"}}}}},"cloud":{"properties":{"name":{"type":"keyword"},"id":{"type":"keyword"},"vm_type":{"type":"keyword"},"region":{"type":"keyword"},"zone":{"type":"keyword"},"metadata":{"type":"object"}}},"os":{"properties":{"load":{"properties":{"1m":{"type":"half_float"},"5m":{"type":"half_float"},"15m":{"type":"half_float"}}},"memory":{"properties":{"total_in_bytes":{"type":"float"},"free_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"}}},"uptime_in_millis":{"type":"long"}}},"process":{"properties":{"memory":{"properties":{"heap":{"properties":{"total_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"},"size_limit":{"type":"float"}}},"resident_set_size_in_bytes":{"type":"float"}}},"event_loop_delay":{"type":"float"},"uptime_in_millis":{"type":"long"}}},"sockets":{"properties":{"http":{"properties":{"total":{"type":"long"}}},"https":{"properties":{"total":{"type":"long"}}}}},"timestamp":{"type":"date"},"requests":{"properties":{"disconnects":{"type":"long"},"total":{"type":"long"},"status_codes":{"type":"object"}}},"response_times":{"properties":{"average":{"type":"float"},"max":{"type":"float"}}},"concurrent_connections":{"type":"long"}}}}}},
"mappings":{"_doc":{"dynamic":false,"properties":{"cluster_uuid":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"},"interval_ms":{"type":"long"},"type":{"type":"keyword"},"source_node":{"properties":{"uuid":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"ip":{"type":"keyword"},"name":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"}}},"kibana_stats":{"properties":{"usage":{"properties":{"index":{"type":"keyword"}}},"kibana":{"properties":{"uuid":{"type":"keyword"},"name":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"version":{"type":"keyword"},"snapshot":{"type":"boolean"},"status":{"type":"keyword"},"statuses":{"properties":{"name":{"type":"keyword"},"state":{"type":"keyword"}}}}},"cloud":{"properties":{"name":{"type":"keyword"},"id":{"type":"keyword"},"vm_type":{"type":"keyword"},"region":{"type":"keyword"},"zone":{"type":"keyword"},"metadata":{"type":"object"}}},"os":{"properties":{"load":{"properties":{"1m":{"type":"half_float"},"5m":{"type":"half_float"},"15m":{"type":"half_float"}}},"memory":{"properties":{"total_in_bytes":{"type":"float"},"free_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"}}},"uptime_in_millis":{"type":"long"}}},"process":{"properties":{"memory":{"properties":{"heap":{"properties":{"total_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"},"size_limit":{"type":"float"}}},"resident_set_size_in_bytes":{"type":"float"}}},"event_loop_delay":{"type":"float"},"uptime_in_millis":{"type":"long"}}},"sockets":{"properties":{"http":{"properties":{"total":{"type":"long"}}},"https":{"properties":{"total":{"type":"long"}}}}},"timestamp":{"type":"date"},"requests":{"properties":{"disconnects":{"type":"long"},"total":{"type":"long"},"status_codes":{"type":"object"}}},"response_times":{"properties":{"average":{"type":"float"},"max":{"type":"float"}}},"concurrent_connections":{"type":"long"}}}}}},
"settings": {
"index": {
"auto_expand_replicas": "0-1",