Retrofit the Bulk Uploader types combiner [ch2198] (#22030)
* Retrofit the Bulk Uploader types combiner [ch2198] fix usage collector, add comments to formatForBulk remove unnecessary customizations * override default format for bulk upload for usage type collectors * rename to ignoreForInternalUploader * collectors -> collectorSet * use constant for kibana_stats type * example of data formatting for bulk in function comment
This commit is contained in:
parent
91759dc3d9
commit
8718d1ee4d
|
@ -44,6 +44,7 @@ export function getOpsStatsCollector(server, kbnServer) {
|
||||||
kibana: getKibanaInfoForStats(server, kbnServer),
|
kibana: getKibanaInfoForStats(server, kbnServer),
|
||||||
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
|
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
|
||||||
};
|
};
|
||||||
}
|
},
|
||||||
|
ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there.
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,26 +25,48 @@ export class Collector {
|
||||||
* @param {String} options.type - property name as the key for the data
|
* @param {String} options.type - property name as the key for the data
|
||||||
* @param {Function} options.init (optional) - initialization function
|
* @param {Function} options.init (optional) - initialization function
|
||||||
* @param {Function} options.fetch - function to query data
|
* @param {Function} options.fetch - function to query data
|
||||||
|
* @param {Function} options.formatForBulkUpload - optional
|
||||||
|
* @param {Function} options.rest - optional other properties
|
||||||
*/
|
*/
|
||||||
constructor(server, { type, init, fetch } = {}) {
|
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
|
||||||
if (type === undefined) {
|
if (type === undefined) {
|
||||||
throw new Error('Collector must be instantiated with a options.type string property');
|
throw new Error('Collector must be instantiated with a options.type string property');
|
||||||
}
|
}
|
||||||
|
if (typeof init !== 'undefined' && typeof init !== 'function') {
|
||||||
|
throw new Error('If init property is passed, Collector must be instantiated with a options.init as a function property');
|
||||||
|
}
|
||||||
if (typeof fetch !== 'function') {
|
if (typeof fetch !== 'function') {
|
||||||
throw new Error('Collector must be instantiated with a options.fetch function property');
|
throw new Error('Collector must be instantiated with a options.fetch function property');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.log = getCollectorLogger(server);
|
||||||
|
|
||||||
|
Object.assign(this, options); // spread in other properties and mutate "this"
|
||||||
|
|
||||||
this.type = type;
|
this.type = type;
|
||||||
this.init = init;
|
this.init = init;
|
||||||
this.fetch = fetch;
|
this.fetch = fetch;
|
||||||
|
|
||||||
this.log = getCollectorLogger(server);
|
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
|
||||||
|
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @param {Function} callCluster - callCluster function
|
||||||
|
*/
|
||||||
fetchInternal(callCluster) {
|
fetchInternal(callCluster) {
|
||||||
if (typeof callCluster !== 'function') {
|
if (typeof callCluster !== 'function') {
|
||||||
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
|
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
|
||||||
}
|
}
|
||||||
return this.fetch(callCluster);
|
return this.fetch(callCluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A hook for allowing the fetched data payload to be organized into a typed
|
||||||
|
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
|
||||||
|
* a generic example.
|
||||||
|
*/
|
||||||
|
formatForBulkUpload(result) {
|
||||||
|
return this._formatForBulkUpload(result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,19 +26,17 @@ import { UsageCollector } from './usage_collector';
|
||||||
/*
|
/*
|
||||||
* A collector object has types registered into it with the register(type)
|
* A collector object has types registered into it with the register(type)
|
||||||
* function. Each type that gets registered defines how to fetch its own data
|
* function. Each type that gets registered defines how to fetch its own data
|
||||||
* and combine it into a unified payload for bulk upload.
|
* and optionally, how to combine it into a unified payload for bulk upload.
|
||||||
*/
|
*/
|
||||||
export class CollectorSet {
|
export class CollectorSet {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @param {Object} server - server object
|
* @param {Object} server - server object
|
||||||
* @param {Number} options.interval - in milliseconds
|
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
|
||||||
* @param {Function} options.combineTypes
|
|
||||||
* @param {Function} options.onPayload
|
|
||||||
*/
|
*/
|
||||||
constructor(server) {
|
constructor(server, collectors = []) {
|
||||||
this._log = getCollectorLogger(server);
|
this._log = getCollectorLogger(server);
|
||||||
this._collectors = [];
|
this._collectors = collectors;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Helper Factory methods
|
* Helper Factory methods
|
||||||
|
@ -46,6 +44,7 @@ export class CollectorSet {
|
||||||
*/
|
*/
|
||||||
this.makeStatsCollector = options => new Collector(server, options);
|
this.makeStatsCollector = options => new Collector(server, options);
|
||||||
this.makeUsageCollector = options => new UsageCollector(server, options);
|
this.makeUsageCollector = options => new UsageCollector(server, options);
|
||||||
|
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -71,14 +70,14 @@ export class CollectorSet {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Call a bunch of fetch methods and then do them in bulk
|
* Call a bunch of fetch methods and then do them in bulk
|
||||||
* @param {Array} collectors - an array of collectors, default to all registered collectors
|
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
|
||||||
*/
|
*/
|
||||||
bulkFetch(callCluster, collectors = this._collectors) {
|
bulkFetch(callCluster, collectorSet = this) {
|
||||||
if (!Array.isArray(collectors)) {
|
if (!(collectorSet instanceof CollectorSet)) {
|
||||||
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors);
|
throw new Error(`bulkFetch method given bad collectorSet parameter: ` + typeof collectorSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Promise.map(collectors, collector => {
|
const fetchPromises = collectorSet.map(collector => {
|
||||||
const collectorType = collector.type;
|
const collectorType = collector.type;
|
||||||
this._log.debug(`Fetching data from ${collectorType} collector`);
|
this._log.debug(`Fetching data from ${collectorType} collector`);
|
||||||
return Promise.props({
|
return Promise.props({
|
||||||
|
@ -90,10 +89,19 @@ export class CollectorSet {
|
||||||
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
|
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
return Promise.all(fetchPromises);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @return {new CollectorSet}
|
||||||
|
*/
|
||||||
|
getFilteredCollectorSet(filter) {
|
||||||
|
const filtered = this._collectors.filter(filter);
|
||||||
|
return this._makeCollectorSetFromArray(filtered);
|
||||||
}
|
}
|
||||||
|
|
||||||
async bulkFetchUsage(callCluster) {
|
async bulkFetchUsage(callCluster) {
|
||||||
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
|
const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector);
|
||||||
return this.bulkFetch(callCluster, usageCollectors);
|
return this.bulkFetch(callCluster, usageCollectors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,4 +145,8 @@ export class CollectorSet {
|
||||||
};
|
};
|
||||||
}, {});
|
}, {});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
map(mapFn) {
|
||||||
|
return this._collectors.map(mapFn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,35 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import { KIBANA_STATS_TYPE } from '../../status/constants';
|
||||||
import { Collector } from './collector';
|
import { Collector } from './collector';
|
||||||
|
|
||||||
export class UsageCollector extends Collector {}
|
export class UsageCollector extends Collector {
|
||||||
|
/*
|
||||||
|
* @param {Object} server - server object
|
||||||
|
* @param {String} options.type - property name as the key for the data
|
||||||
|
* @param {Function} options.init (optional) - initialization function
|
||||||
|
* @param {Function} options.fetch - function to query data
|
||||||
|
* @param {Function} options.formatForBulkUpload - optional
|
||||||
|
* @param {Function} options.rest - optional other properties
|
||||||
|
*/
|
||||||
|
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
|
||||||
|
super(server, { type, init, fetch, formatForBulkUpload, ...options });
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Currently, for internal bulk uploading, usage stats are part of
|
||||||
|
* `kibana_stats` type, under the `usage` namespace in the document.
|
||||||
|
*/
|
||||||
|
const defaultUsageFormatterForBulkUpload = result => {
|
||||||
|
return {
|
||||||
|
type: KIBANA_STATS_TYPE,
|
||||||
|
payload: {
|
||||||
|
usage: {
|
||||||
|
[type]: result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
this._formatForBulkUpload = formatForBulkUpload || defaultUsageFormatterForBulkUpload;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ export const MONITORING_SYSTEM_API_VERSION = '6';
|
||||||
* The type name used within the Monitoring index to publish Kibana ops stats.
|
* The type name used within the Monitoring index to publish Kibana ops stats.
|
||||||
* @type {string}
|
* @type {string}
|
||||||
*/
|
*/
|
||||||
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_monitoring'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
|
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
|
||||||
/**
|
/**
|
||||||
* The type name used within the Monitoring index to publish Kibana stats.
|
* The type name used within the Monitoring index to publish Kibana stats.
|
||||||
* @type {string}
|
* @type {string}
|
||||||
|
|
|
@ -14,8 +14,15 @@ const CHECK_DELAY = 500;
|
||||||
|
|
||||||
class MockCollectorSet {
|
class MockCollectorSet {
|
||||||
constructor(_mockServer, mockCollectors) {
|
constructor(_mockServer, mockCollectors) {
|
||||||
|
this.mockServer = _mockServer;
|
||||||
this.mockCollectors = mockCollectors;
|
this.mockCollectors = mockCollectors;
|
||||||
}
|
}
|
||||||
|
getCollectorByType(type) {
|
||||||
|
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
|
||||||
|
}
|
||||||
|
getFilteredCollectorSet(filter) {
|
||||||
|
return new MockCollectorSet(this.mockServer, this.mockCollectors.filter(filter));
|
||||||
|
}
|
||||||
async bulkFetch() {
|
async bulkFetch() {
|
||||||
return this.mockCollectors.map(({ fetch }) => fetch());
|
return this.mockCollectors.map(({ fetch }) => fetch());
|
||||||
}
|
}
|
||||||
|
@ -47,7 +54,8 @@ describe('BulkUploader', () => {
|
||||||
const collectors = new MockCollectorSet(server, [
|
const collectors = new MockCollectorSet(server, [
|
||||||
{
|
{
|
||||||
type: 'type_collector_test',
|
type: 'type_collector_test',
|
||||||
fetch: noop, // empty payloads
|
fetch: noop, // empty payloads,
|
||||||
|
formatForBulkUpload: result => result,
|
||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
@ -82,7 +90,10 @@ describe('BulkUploader', () => {
|
||||||
|
|
||||||
it('should run the bulk upload handler', done => {
|
it('should run the bulk upload handler', done => {
|
||||||
const collectors = new MockCollectorSet(server, [
|
const collectors = new MockCollectorSet(server, [
|
||||||
{ fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }) }
|
{
|
||||||
|
fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }),
|
||||||
|
formatForBulkUpload: result => result
|
||||||
|
}
|
||||||
]);
|
]);
|
||||||
const uploader = new BulkUploader(server, {
|
const uploader = new BulkUploader(server, {
|
||||||
interval: FETCH_INTERVAL
|
interval: FETCH_INTERVAL
|
||||||
|
|
|
@ -1,187 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import { KIBANA_STATS_TYPE_MONITORING, KIBANA_USAGE_TYPE, KIBANA_SETTINGS_TYPE } from '../../common/constants';
|
|
||||||
import { KIBANA_REPORTING_TYPE } from '../../../reporting/common/constants';
|
|
||||||
import { BulkUploader } from './bulk_uploader';
|
|
||||||
|
|
||||||
const getInitial = () => {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
{ 'index': { '_type': KIBANA_STATS_TYPE_MONITORING } },
|
|
||||||
{
|
|
||||||
'host': 'tsullivan.local',
|
|
||||||
'concurrent_connections': 0,
|
|
||||||
'os': {
|
|
||||||
'load': { '1m': 2.28857421875, '5m': 2.45068359375, '15m': 2.29248046875 },
|
|
||||||
'memory': { 'total_in_bytes': 17179869184, 'free_in_bytes': 527749120, 'used_in_bytes': 16652120064 },
|
|
||||||
'uptime_in_millis': 1211027000
|
|
||||||
},
|
|
||||||
'process': {
|
|
||||||
'event_loop_delay': 4.222616970539093,
|
|
||||||
'memory': {
|
|
||||||
'heap': { 'total_in_bytes': 219455488, 'used_in_bytes': 152622064, 'size_limit': 1501560832 },
|
|
||||||
'resident_set_size_in_bytes': 245923840
|
|
||||||
},
|
|
||||||
'uptime_in_millis': 18467
|
|
||||||
},
|
|
||||||
'requests': {
|
|
||||||
'disconnects': 0,
|
|
||||||
'total': 2,
|
|
||||||
},
|
|
||||||
'response_times': { 'average': 47, 'max': 47 },
|
|
||||||
'timestamp': '2017-07-26T00:14:20.771Z',
|
|
||||||
}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{ 'index': { '_type': KIBANA_USAGE_TYPE } },
|
|
||||||
{
|
|
||||||
'dashboard': { 'total': 0 },
|
|
||||||
'visualization': { 'total': 0 },
|
|
||||||
'search': { 'total': 0 },
|
|
||||||
'index_pattern': { 'total': 2 },
|
|
||||||
'index': '.kibana'
|
|
||||||
}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{ 'index': { '_type': KIBANA_REPORTING_TYPE } },
|
|
||||||
{
|
|
||||||
'available': true,
|
|
||||||
'enabled': false,
|
|
||||||
'_all': 55,
|
|
||||||
'csv': {
|
|
||||||
'available': true,
|
|
||||||
'count': 25
|
|
||||||
},
|
|
||||||
'printable_pdf': {
|
|
||||||
'available': true,
|
|
||||||
'count': 30
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{ 'index': { '_type': KIBANA_SETTINGS_TYPE } },
|
|
||||||
{ 'xpack': { 'defaultAdminEmail': 'tim@elastic.co' } }
|
|
||||||
]
|
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO use jest snapshotting
|
|
||||||
const getResult = () => {
|
|
||||||
return [
|
|
||||||
[
|
|
||||||
{ 'index': { '_type': 'kibana_stats' } },
|
|
||||||
{
|
|
||||||
'host': 'tsullivan.local',
|
|
||||||
'concurrent_connections': 0,
|
|
||||||
'os': {
|
|
||||||
'load': { '1m': 2.28857421875, '5m': 2.45068359375, '15m': 2.29248046875 },
|
|
||||||
'memory': { 'total_in_bytes': 17179869184, 'free_in_bytes': 527749120, 'used_in_bytes': 16652120064 },
|
|
||||||
'uptime_in_millis': 1211027000
|
|
||||||
},
|
|
||||||
'process': {
|
|
||||||
'event_loop_delay': 4.222616970539093,
|
|
||||||
'memory': {
|
|
||||||
'heap': { 'total_in_bytes': 219455488, 'used_in_bytes': 152622064, 'size_limit': 1501560832 },
|
|
||||||
'resident_set_size_in_bytes': 245923840
|
|
||||||
},
|
|
||||||
'uptime_in_millis': 18467
|
|
||||||
},
|
|
||||||
'requests': {
|
|
||||||
'disconnects': 0,
|
|
||||||
'total': 2,
|
|
||||||
},
|
|
||||||
'response_times': { 'average': 47, 'max': 47 },
|
|
||||||
'timestamp': '2017-07-26T00:14:20.771Z',
|
|
||||||
'usage': {
|
|
||||||
'dashboard': { 'total': 0 },
|
|
||||||
'visualization': { 'total': 0 },
|
|
||||||
'search': { 'total': 0 },
|
|
||||||
'index_pattern': { 'total': 2 },
|
|
||||||
'index': '.kibana',
|
|
||||||
'xpack': {
|
|
||||||
'reporting': {
|
|
||||||
'_all': 55,
|
|
||||||
'available': true,
|
|
||||||
'csv': {
|
|
||||||
'available': true,
|
|
||||||
'count': 25,
|
|
||||||
},
|
|
||||||
'enabled': false,
|
|
||||||
'printable_pdf': {
|
|
||||||
'available': true,
|
|
||||||
'count': 30,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{ 'index': { '_type': 'kibana_settings' } },
|
|
||||||
{
|
|
||||||
'xpack': { 'defaultAdminEmail': 'tim@elastic.co' },
|
|
||||||
}
|
|
||||||
]
|
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
describe('Collector Types Combiner', () => {
|
|
||||||
describe('with all the data types present', () => {
|
|
||||||
it('provides settings, and combined stats/usage data', () => {
|
|
||||||
// default gives all the data types
|
|
||||||
const initial = getInitial();
|
|
||||||
const result = BulkUploader.combineStatsLegacy(initial);
|
|
||||||
expect(result).toEqual(getResult());
|
|
||||||
});
|
|
||||||
});
|
|
||||||
describe('with settings data missing', () => {
|
|
||||||
it('provides combined stats/usage data', () => {
|
|
||||||
// default gives all the data types
|
|
||||||
const initial = getInitial();
|
|
||||||
const trimmedInitial = [ initial[0], initial[1], initial[2] ]; // just stats, usage and reporting, no settings
|
|
||||||
const result = BulkUploader.combineStatsLegacy(trimmedInitial);
|
|
||||||
const expectedResult = getResult();
|
|
||||||
const trimmedExpectedResult = [ expectedResult[0] ]; // single combined item
|
|
||||||
expect(result).toEqual(trimmedExpectedResult);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
describe('with usage data missing', () => {
|
|
||||||
it('provides settings, and stats data', () => {
|
|
||||||
// default gives all the data types
|
|
||||||
const initial = getInitial();
|
|
||||||
const trimmedInitial = [ initial[0], initial[3] ]; // just stats and settings, no usage or reporting
|
|
||||||
const result = BulkUploader.combineStatsLegacy(trimmedInitial);
|
|
||||||
const expectedResult = getResult();
|
|
||||||
delete expectedResult[0][1].usage; // usage stats should not be present in the result
|
|
||||||
const trimmedExpectedResult = [ expectedResult[0], expectedResult[1] ];
|
|
||||||
expect(result).toEqual(trimmedExpectedResult);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
describe('with stats data missing', () => {
|
|
||||||
it('provides settings data', () => {
|
|
||||||
// default gives all the data types
|
|
||||||
const initial = getInitial();
|
|
||||||
const trimmedInitial = [ initial[3] ]; // just settings
|
|
||||||
const result = BulkUploader.combineStatsLegacy(trimmedInitial);
|
|
||||||
const expectedResult = getResult();
|
|
||||||
const trimmedExpectedResult = [ expectedResult[1] ]; // just settings
|
|
||||||
expect(result).toEqual(trimmedExpectedResult);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('throws an error if duplicate types are registered', () => {
|
|
||||||
const combineWithDuplicate = () => {
|
|
||||||
const initial = getInitial();
|
|
||||||
const withDuplicate = [ initial[0] ].concat(initial);
|
|
||||||
return BulkUploader.combineStatsLegacy(withDuplicate);
|
|
||||||
};
|
|
||||||
expect(combineWithDuplicate).toThrow(
|
|
||||||
'Duplicate collector type identifiers found in payload! ' +
|
|
||||||
'kibana_stats_monitoring,kibana_stats_monitoring,kibana,reporting,kibana_settings'
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
|
@ -4,19 +4,16 @@
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { get, set, isEmpty, flatten, uniq } from 'lodash';
|
import { defaultsDeep, isEmpty, uniq, compact } from 'lodash';
|
||||||
import { callClusterFactory } from '../../../xpack_main';
|
import { callClusterFactory } from '../../../xpack_main';
|
||||||
import {
|
import {
|
||||||
LOGGING_TAG,
|
LOGGING_TAG,
|
||||||
KIBANA_MONITORING_LOGGING_TAG,
|
KIBANA_MONITORING_LOGGING_TAG,
|
||||||
KIBANA_STATS_TYPE_MONITORING,
|
|
||||||
KIBANA_SETTINGS_TYPE,
|
|
||||||
KIBANA_USAGE_TYPE,
|
|
||||||
} from '../../common/constants';
|
} from '../../common/constants';
|
||||||
import { KIBANA_REPORTING_TYPE } from '../../../reporting/common/constants';
|
|
||||||
import {
|
import {
|
||||||
sendBulkPayload,
|
sendBulkPayload,
|
||||||
monitoringBulk,
|
monitoringBulk,
|
||||||
|
getKibanaInfoForStats,
|
||||||
} from './lib';
|
} from './lib';
|
||||||
|
|
||||||
const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
|
const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
|
||||||
|
@ -38,7 +35,7 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
|
||||||
* @param {Object} xpackInfo server.plugins.xpack_main.info object
|
* @param {Object} xpackInfo server.plugins.xpack_main.info object
|
||||||
*/
|
*/
|
||||||
export class BulkUploader {
|
export class BulkUploader {
|
||||||
constructor(server, { interval }) {
|
constructor(server, { kbnServer, interval }) {
|
||||||
if (typeof interval !== 'number') {
|
if (typeof interval !== 'number') {
|
||||||
throw new Error('interval number of milliseconds is required');
|
throw new Error('interval number of milliseconds is required');
|
||||||
}
|
}
|
||||||
|
@ -56,7 +53,7 @@ export class BulkUploader {
|
||||||
});
|
});
|
||||||
|
|
||||||
this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal();
|
this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal();
|
||||||
|
this._getKibanaInfoForStats = () => getKibanaInfoForStats(server, kbnServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -66,9 +63,12 @@ export class BulkUploader {
|
||||||
*/
|
*/
|
||||||
start(collectorSet) {
|
start(collectorSet) {
|
||||||
this._log.info('Starting monitoring stats collection');
|
this._log.info('Starting monitoring stats collection');
|
||||||
this._fetchAndUpload(collectorSet); // initial fetch
|
|
||||||
|
// this is internal bulk upload, so filter out API-only collectors
|
||||||
|
const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true);
|
||||||
|
this._fetchAndUpload(filterThem(collectorSet)); // initial fetch
|
||||||
this._timer = setInterval(() => {
|
this._timer = setInterval(() => {
|
||||||
this._fetchAndUpload(collectorSet);
|
this._fetchAndUpload(filterThem(collectorSet));
|
||||||
}, this._interval);
|
}, this._interval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ export class BulkUploader {
|
||||||
*/
|
*/
|
||||||
async _fetchAndUpload(collectorSet) {
|
async _fetchAndUpload(collectorSet) {
|
||||||
const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser);
|
const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser);
|
||||||
const payload = BulkUploader.toBulkUploadFormat(data);
|
const payload = this.toBulkUploadFormat(compact(data), collectorSet);
|
||||||
|
|
||||||
if (payload) {
|
if (payload) {
|
||||||
try {
|
try {
|
||||||
|
@ -120,15 +120,69 @@ export class BulkUploader {
|
||||||
/*
|
/*
|
||||||
* Bulk stats are transformed into a bulk upload format
|
* Bulk stats are transformed into a bulk upload format
|
||||||
* Non-legacy transformation is done in CollectorSet.toApiStats
|
* Non-legacy transformation is done in CollectorSet.toApiStats
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
* Before:
|
||||||
|
* [
|
||||||
|
* {
|
||||||
|
* "type": "kibana_stats",
|
||||||
|
* "result": {
|
||||||
|
* "process": { ... },
|
||||||
|
* "requests": { ... },
|
||||||
|
* ...
|
||||||
|
* }
|
||||||
|
* },
|
||||||
|
* ]
|
||||||
|
*
|
||||||
|
* After:
|
||||||
|
* [
|
||||||
|
* {
|
||||||
|
* "index": {
|
||||||
|
* "_type": "kibana_stats"
|
||||||
|
* }
|
||||||
|
* },
|
||||||
|
* {
|
||||||
|
* "kibana": {
|
||||||
|
* "host": "localhost",
|
||||||
|
* "uuid": "d619c5d1-4315-4f35-b69d-a3ac805489fb",
|
||||||
|
* "version": "7.0.0-alpha1",
|
||||||
|
* ...
|
||||||
|
* },
|
||||||
|
* "process": { ... },
|
||||||
|
* "requests": { ... },
|
||||||
|
* ...
|
||||||
|
* }
|
||||||
|
* ]
|
||||||
*/
|
*/
|
||||||
static toBulkUploadFormat(uploadData) {
|
toBulkUploadFormat(rawData, collectorSet) {
|
||||||
const payload = uploadData
|
if (rawData.length === 0) {
|
||||||
.filter(d => Boolean(d) && !isEmpty(d.result))
|
return;
|
||||||
.map(({ result, type }) => [{ index: { _type: type } }, result]);
|
|
||||||
if (payload.length > 0) {
|
|
||||||
const combinedData = BulkUploader.combineStatsLegacy(payload); // arrange the usage data into the stats
|
|
||||||
return flatten(combinedData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// convert the raw data to a nested object by taking each payload through
|
||||||
|
// its formatter, organizing it per-type
|
||||||
|
const typesNested = rawData.reduce((accum, { type, result }) => {
|
||||||
|
if (isEmpty(result)) {
|
||||||
|
return accum;
|
||||||
|
}
|
||||||
|
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result);
|
||||||
|
return defaultsDeep(accum, { [uploadType]: uploadData });
|
||||||
|
}, {});
|
||||||
|
|
||||||
|
// convert the nested object into a flat array, with each payload prefixed
|
||||||
|
// with an 'index' instruction, for bulk upload
|
||||||
|
const flat = Object.keys(typesNested).reduce((accum, type) => {
|
||||||
|
return [
|
||||||
|
...accum,
|
||||||
|
{ index: { _type: type } },
|
||||||
|
{
|
||||||
|
kibana: this._getKibanaInfoForStats(),
|
||||||
|
...typesNested[type],
|
||||||
|
}
|
||||||
|
];
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
return flat;
|
||||||
}
|
}
|
||||||
|
|
||||||
static checkPayloadTypesUnique(payload) {
|
static checkPayloadTypesUnique(payload) {
|
||||||
|
@ -138,45 +192,4 @@ export class BulkUploader {
|
||||||
throw new Error('Duplicate collector type identifiers found in payload! ' + ids.join(','));
|
throw new Error('Duplicate collector type identifiers found in payload! ' + ids.join(','));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static combineStatsLegacy(payload) {
|
|
||||||
BulkUploader.checkPayloadTypesUnique(payload);
|
|
||||||
|
|
||||||
// default the item to [] to allow destructuring
|
|
||||||
const findItem = type => payload.find(item => get(item, '[0].index._type') === type) || [];
|
|
||||||
|
|
||||||
// kibana usage and stats
|
|
||||||
let statsResult;
|
|
||||||
const [ statsHeader, statsPayload ] = findItem(KIBANA_STATS_TYPE_MONITORING);
|
|
||||||
const [ reportingHeader, reportingPayload ] = findItem(KIBANA_REPORTING_TYPE);
|
|
||||||
|
|
||||||
if (statsHeader && statsPayload) {
|
|
||||||
statsHeader.index._type = 'kibana_stats'; // HACK to convert kibana_stats_monitoring to just kibana_stats for bwc
|
|
||||||
const [ usageHeader, usagePayload ] = findItem(KIBANA_USAGE_TYPE);
|
|
||||||
const kibanaUsage = (usageHeader && usagePayload) ? usagePayload : null;
|
|
||||||
const reportingUsage = (reportingHeader && reportingPayload) ? reportingPayload : null;
|
|
||||||
statsResult = [ statsHeader, statsPayload ];
|
|
||||||
if (kibanaUsage) {
|
|
||||||
set(statsResult, '[1].usage', kibanaUsage);
|
|
||||||
}
|
|
||||||
if (reportingUsage) {
|
|
||||||
set(statsResult, '[1].usage.xpack.reporting', reportingUsage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// kibana settings
|
|
||||||
let settingsResult;
|
|
||||||
const [ settingsHeader, settingsPayload ] = findItem(KIBANA_SETTINGS_TYPE);
|
|
||||||
if (settingsHeader && settingsPayload) {
|
|
||||||
settingsResult = [ settingsHeader, settingsPayload ];
|
|
||||||
}
|
|
||||||
|
|
||||||
// return new payload with the combined data
|
|
||||||
// adds usage data to stats data
|
|
||||||
// strips usage out as a top-level type
|
|
||||||
const result = [ statsResult, settingsResult ];
|
|
||||||
|
|
||||||
// remove result items that are undefined
|
|
||||||
return result.filter(Boolean);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { get, snakeCase } from 'lodash';
|
import { get, snakeCase } from 'lodash';
|
||||||
import { KIBANA_USAGE_TYPE } from '../../../common/constants';
|
import { KIBANA_USAGE_TYPE, KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants';
|
||||||
|
|
||||||
const TYPES = [
|
const TYPES = [
|
||||||
'dashboard',
|
'dashboard',
|
||||||
|
@ -17,12 +17,13 @@ const TYPES = [
|
||||||
];
|
];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches saved object client counts by querying the saved object index
|
* Fetches saved object counts by querying the .kibana index
|
||||||
*/
|
*/
|
||||||
export function getKibanaUsageCollector(server) {
|
export function getKibanaUsageCollector(server) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_USAGE_TYPE,
|
type: KIBANA_USAGE_TYPE,
|
||||||
|
|
||||||
async fetch(callCluster) {
|
async fetch(callCluster) {
|
||||||
const index = server.config().get('kibana.index');
|
const index = server.config().get('kibana.index');
|
||||||
const savedObjectCountSearchParams = {
|
const savedObjectCountSearchParams = {
|
||||||
|
@ -60,6 +61,20 @@ export function getKibanaUsageCollector(server) {
|
||||||
}
|
}
|
||||||
}), {})
|
}), {})
|
||||||
};
|
};
|
||||||
|
},
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Format the response data into a model for internal upload
|
||||||
|
* 1. Make this data part of the "kibana_stats" type
|
||||||
|
* 2. Organize the payload in the usage namespace of the data payload (usage.index, etc)
|
||||||
|
*/
|
||||||
|
formatForBulkUpload: result => {
|
||||||
|
return {
|
||||||
|
type: KIBANA_STATS_TYPE_MONITORING,
|
||||||
|
payload: {
|
||||||
|
usage: result
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,12 +6,11 @@
|
||||||
|
|
||||||
import { KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants';
|
import { KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants';
|
||||||
import { opsBuffer } from './ops_buffer';
|
import { opsBuffer } from './ops_buffer';
|
||||||
import { getKibanaInfoForStats } from '../lib';
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize a collector for Kibana Ops Stats
|
* Initialize a collector for Kibana Ops Stats
|
||||||
*/
|
*/
|
||||||
export function getOpsStatsCollector(server, kbnServer) {
|
export function getOpsStatsCollector(server) {
|
||||||
let monitor;
|
let monitor;
|
||||||
const buffer = opsBuffer(server);
|
const buffer = opsBuffer(server);
|
||||||
const onOps = event => buffer.push(event);
|
const onOps = event => buffer.push(event);
|
||||||
|
@ -48,10 +47,7 @@ export function getOpsStatsCollector(server, kbnServer) {
|
||||||
type: KIBANA_STATS_TYPE_MONITORING,
|
type: KIBANA_STATS_TYPE_MONITORING,
|
||||||
init: start,
|
init: start,
|
||||||
fetch: () => {
|
fetch: () => {
|
||||||
return {
|
return buffer.flush();
|
||||||
kibana: getKibanaInfoForStats(server, kbnServer),
|
|
||||||
...buffer.flush()
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@
|
||||||
import { get } from 'lodash';
|
import { get } from 'lodash';
|
||||||
import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants';
|
import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants';
|
||||||
import { KIBANA_SETTINGS_TYPE } from '../../../common/constants';
|
import { KIBANA_SETTINGS_TYPE } from '../../../common/constants';
|
||||||
import { getKibanaInfoForStats } from '../lib';
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if Cluster Alert email notifications is enabled in config
|
* Check if Cluster Alert email notifications is enabled in config
|
||||||
|
@ -54,14 +53,14 @@ export async function checkForEmailValue(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getSettingsCollector(server, kbnServer) {
|
export function getSettingsCollector(server) {
|
||||||
const config = server.config();
|
const config = server.config();
|
||||||
|
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
|
|
||||||
return collectorSet.makeStatsCollector({
|
return collectorSet.makeStatsCollector({
|
||||||
type: KIBANA_SETTINGS_TYPE,
|
type: KIBANA_SETTINGS_TYPE,
|
||||||
async fetch(callCluster) {
|
async fetch(callCluster) {
|
||||||
let kibanaSettingsData = null;
|
let kibanaSettingsData;
|
||||||
const defaultAdminEmail = await checkForEmailValue(config, callCluster);
|
const defaultAdminEmail = await checkForEmailValue(config, callCluster);
|
||||||
|
|
||||||
// skip everything if defaultAdminEmail === undefined
|
// skip everything if defaultAdminEmail === undefined
|
||||||
|
@ -79,16 +78,8 @@ export function getSettingsCollector(server, kbnServer) {
|
||||||
// remember the current email so that we can mark it as successful if the bulk does not error out
|
// remember the current email so that we can mark it as successful if the bulk does not error out
|
||||||
shouldUseNull = !!defaultAdminEmail;
|
shouldUseNull = !!defaultAdminEmail;
|
||||||
|
|
||||||
// return nothing when there was no result
|
// returns undefined if there was no result
|
||||||
let settingsDoc;
|
return kibanaSettingsData;
|
||||||
if (kibanaSettingsData !== null) {
|
|
||||||
settingsDoc = {
|
|
||||||
kibana: getKibanaInfoForStats(server, kbnServer),
|
|
||||||
...kibanaSettingsData
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return settingsDoc;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,8 +27,14 @@ export function opsBuffer(server) {
|
||||||
},
|
},
|
||||||
|
|
||||||
flush() {
|
flush() {
|
||||||
|
let cloud; // a property that will be left out of the result if the details are undefined
|
||||||
|
const cloudDetails = cloudDetector.getCloudDetails();
|
||||||
|
if (cloudDetails != null) {
|
||||||
|
cloud = { cloud: cloudDetails };
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
cloud: cloudDetector.getCloudDetails(),
|
...cloud,
|
||||||
...eventRoller.flush()
|
...eventRoller.flush()
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,11 @@ import { BulkUploader } from './bulk_uploader';
|
||||||
* @param {Object} kbnServer manager of Kibana services - see `src/server/kbn_server` in Kibana core
|
* @param {Object} kbnServer manager of Kibana services - see `src/server/kbn_server` in Kibana core
|
||||||
* @param {Object} server HapiJS server instance
|
* @param {Object} server HapiJS server instance
|
||||||
*/
|
*/
|
||||||
export function initBulkUploader(_kbnServer, server) {
|
export function initBulkUploader(kbnServer, server) {
|
||||||
|
|
||||||
const config = server.config();
|
const config = server.config();
|
||||||
const interval = config.get('xpack.monitoring.kibana.collection.interval');
|
const interval = config.get('xpack.monitoring.kibana.collection.interval');
|
||||||
return new BulkUploader(server, {
|
return new BulkUploader(server, {
|
||||||
|
kbnServer,
|
||||||
interval
|
interval
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import { uniq } from 'lodash';
|
||||||
import { getExportTypesHandler } from './get_export_type_handler';
|
import { getExportTypesHandler } from './get_export_type_handler';
|
||||||
import { getReportCountsByParameter } from './get_reporting_type_counts';
|
import { getReportCountsByParameter } from './get_reporting_type_counts';
|
||||||
import { KIBANA_REPORTING_TYPE } from '../../common/constants';
|
import { KIBANA_REPORTING_TYPE } from '../../common/constants';
|
||||||
|
import { KIBANA_STATS_TYPE_MONITORING } from '../../../monitoring/common/constants';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {Object} ReportingUsageStats Almost all of these stats are optional.
|
* @typedef {Object} ReportingUsageStats Almost all of these stats are optional.
|
||||||
|
@ -117,6 +118,7 @@ export function getReportingUsageCollector(server) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_REPORTING_TYPE,
|
type: KIBANA_REPORTING_TYPE,
|
||||||
|
|
||||||
fetch: async callCluster => {
|
fetch: async callCluster => {
|
||||||
const xpackInfo = server.plugins.xpack_main.info;
|
const xpackInfo = server.plugins.xpack_main.info;
|
||||||
const config = server.config();
|
const config = server.config();
|
||||||
|
@ -147,6 +149,24 @@ export function getReportingUsageCollector(server) {
|
||||||
...statsOverLast7Days
|
...statsOverLast7Days
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
},
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Format the response data into a model for internal upload
|
||||||
|
* 1. Make this data part of the "kibana_stats" type
|
||||||
|
* 2. Organize the payload in the usage.xpack.reporting namespace of the data payload
|
||||||
|
*/
|
||||||
|
formatForBulkUpload: result => {
|
||||||
|
return {
|
||||||
|
type: KIBANA_STATS_TYPE_MONITORING,
|
||||||
|
payload: {
|
||||||
|
usage: {
|
||||||
|
xpack: {
|
||||||
|
reporting: result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue