[Monitoring/Telemetry] Force collectors to indicate when they are ready (#36153)
* Initial code to force collectors to indicate when they are ready * Add and fix tests * Remove debug * Add ready check in api call * Fix prettier complaints * Return 503 if not all collectors are ready * PR feedback * Add retry logic for usage collection in the reporting tests * Fix incorrect boomify usage * Fix more issues with the tests * Just add debug I guess * More debug * Try and handle this exception * Try and make the tests more defensive and remove console logs * Retry logic here too * Debug for the reporting tests failure * I don't like this, but lets see if it works * Move the retry logic into the collector set directly * Add support for this new collector * Localize this * This shouldn't be static on the class, but rather static for the entire runtime
This commit is contained in:
parent
54f53d1348
commit
c87e8811cb
|
@ -25,6 +25,7 @@ export function makeKQLUsageCollector(server) {
|
||||||
const kqlUsageCollector = server.usage.collectorSet.makeUsageCollector({
|
const kqlUsageCollector = server.usage.collectorSet.makeUsageCollector({
|
||||||
type: 'kql',
|
type: 'kql',
|
||||||
fetch,
|
fetch,
|
||||||
|
isReady: () => true,
|
||||||
});
|
});
|
||||||
|
|
||||||
server.usage.collectorSet.register(kqlUsageCollector);
|
server.usage.collectorSet.register(kqlUsageCollector);
|
||||||
|
|
|
@ -52,6 +52,7 @@ export function registerUiMetricUsageCollector(server: any) {
|
||||||
|
|
||||||
return uiMetricsByAppName;
|
return uiMetricsByAppName;
|
||||||
},
|
},
|
||||||
|
isReady: () => true,
|
||||||
});
|
});
|
||||||
|
|
||||||
server.usage.collectorSet.register(collector);
|
server.usage.collectorSet.register(collector);
|
||||||
|
|
|
@ -175,6 +175,10 @@ export default () => Joi.object({
|
||||||
pollInterval: Joi.number().default(1500),
|
pollInterval: Joi.number().default(1500),
|
||||||
}).default(),
|
}).default(),
|
||||||
|
|
||||||
|
stats: Joi.object({
|
||||||
|
maximumWaitTimeForAllCollectorsInS: Joi.number().default(60)
|
||||||
|
}).default(),
|
||||||
|
|
||||||
optimize: Joi.object({
|
optimize: Joi.object({
|
||||||
enabled: Joi.boolean().default(true),
|
enabled: Joi.boolean().default(true),
|
||||||
bundleFilter: Joi.string().default('!tests'),
|
bundleFilter: Joi.string().default('!tests'),
|
||||||
|
|
|
@ -36,6 +36,7 @@ export function makeSampleDataUsageCollector(server: KbnServer) {
|
||||||
server.usage.collectorSet.makeUsageCollector({
|
server.usage.collectorSet.makeUsageCollector({
|
||||||
type: 'sample-data',
|
type: 'sample-data',
|
||||||
fetch: fetchProvider(index),
|
fetch: fetchProvider(index),
|
||||||
|
isReady: () => true,
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ export function getOpsStatsCollector(server, kbnServer) {
|
||||||
...kbnServer.metrics // latest metrics captured from the ops event listener in src/legacy/server/status/index
|
...kbnServer.metrics // latest metrics captured from the ops event listener in src/legacy/server/status/index
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
|
isReady: () => true,
|
||||||
ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there.
|
ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there.
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,10 +18,15 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import Joi from 'joi';
|
import Joi from 'joi';
|
||||||
import { boomify } from 'boom';
|
import boom from 'boom';
|
||||||
|
import { i18n } from '@kbn/i18n';
|
||||||
import { wrapAuthConfig } from '../../wrap_auth_config';
|
import { wrapAuthConfig } from '../../wrap_auth_config';
|
||||||
import { KIBANA_STATS_TYPE } from '../../constants';
|
import { KIBANA_STATS_TYPE } from '../../constants';
|
||||||
|
|
||||||
|
const STATS_NOT_READY_MESSAGE = i18n.translate('server.stats.notReadyMessage', {
|
||||||
|
defaultMessage: 'Stats are not ready yet. Please try again later.',
|
||||||
|
});
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* API for Kibana meta info and accumulated operations stats
|
* API for Kibana meta info and accumulated operations stats
|
||||||
* Including ?extended in the query string fetches Elasticsearch cluster_uuid and server.usage.collectorSet data
|
* Including ?extended in the query string fetches Elasticsearch cluster_uuid and server.usage.collectorSet data
|
||||||
|
@ -69,6 +74,11 @@ export function registerStatsApi(kbnServer, server, config) {
|
||||||
if (isExtended) {
|
if (isExtended) {
|
||||||
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('admin');
|
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('admin');
|
||||||
const callCluster = (...args) => callWithRequest(req, ...args);
|
const callCluster = (...args) => callWithRequest(req, ...args);
|
||||||
|
const collectorsReady = await collectorSet.areAllCollectorsReady();
|
||||||
|
|
||||||
|
if (shouldGetUsage && !collectorsReady) {
|
||||||
|
return boom.serverUnavailable(STATS_NOT_READY_MESSAGE);
|
||||||
|
}
|
||||||
|
|
||||||
const usagePromise = shouldGetUsage ? getUsage(callCluster) : Promise.resolve();
|
const usagePromise = shouldGetUsage ? getUsage(callCluster) : Promise.resolve();
|
||||||
try {
|
try {
|
||||||
|
@ -77,7 +87,6 @@ export function registerStatsApi(kbnServer, server, config) {
|
||||||
getClusterUuid(callCluster),
|
getClusterUuid(callCluster),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
|
||||||
let modifiedUsage = usage;
|
let modifiedUsage = usage;
|
||||||
if (isLegacy) {
|
if (isLegacy) {
|
||||||
// In an effort to make telemetry more easily augmented, we need to ensure
|
// In an effort to make telemetry more easily augmented, we need to ensure
|
||||||
|
@ -123,7 +132,7 @@ export function registerStatsApi(kbnServer, server, config) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw boomify(e);
|
throw boom.boomify(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,6 +140,9 @@ export function registerStatsApi(kbnServer, server, config) {
|
||||||
* for health-checking Kibana and fetch does not rely on fetching data
|
* for health-checking Kibana and fetch does not rely on fetching data
|
||||||
* from ES */
|
* from ES */
|
||||||
const kibanaStatsCollector = collectorSet.getCollectorByType(KIBANA_STATS_TYPE);
|
const kibanaStatsCollector = collectorSet.getCollectorByType(KIBANA_STATS_TYPE);
|
||||||
|
if (!await kibanaStatsCollector.isReady()) {
|
||||||
|
return boom.serverUnavailable(STATS_NOT_READY_MESSAGE);
|
||||||
|
}
|
||||||
let kibanaStats = await kibanaStatsCollector.fetch();
|
let kibanaStats = await kibanaStatsCollector.fetch();
|
||||||
kibanaStats = collectorSet.toApiFieldNames(kibanaStats);
|
kibanaStats = collectorSet.toApiFieldNames(kibanaStats);
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ export class Collector {
|
||||||
* @param {Function} options.formatForBulkUpload - optional
|
* @param {Function} options.formatForBulkUpload - optional
|
||||||
* @param {Function} options.rest - optional other properties
|
* @param {Function} options.rest - optional other properties
|
||||||
*/
|
*/
|
||||||
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
|
constructor(server, { type, init, fetch, formatForBulkUpload = null, isReady = null, ...options } = {}) {
|
||||||
if (type === undefined) {
|
if (type === undefined) {
|
||||||
throw new Error('Collector must be instantiated with a options.type string property');
|
throw new Error('Collector must be instantiated with a options.type string property');
|
||||||
}
|
}
|
||||||
|
@ -49,6 +49,9 @@ export class Collector {
|
||||||
|
|
||||||
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
|
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
|
||||||
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
|
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
|
||||||
|
if (typeof isReady === 'function') {
|
||||||
|
this.isReady = isReady;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -69,4 +72,8 @@ export class Collector {
|
||||||
formatForBulkUpload(result) {
|
formatForBulkUpload(result) {
|
||||||
return this._formatForBulkUpload(result);
|
return this._formatForBulkUpload(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isReady() {
|
||||||
|
throw `isReady() must be implemented in ${this.type} collector`;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,18 +23,19 @@ import { getCollectorLogger } from '../lib';
|
||||||
import { Collector } from './collector';
|
import { Collector } from './collector';
|
||||||
import { UsageCollector } from './usage_collector';
|
import { UsageCollector } from './usage_collector';
|
||||||
|
|
||||||
|
let _waitingForAllCollectorsTimestamp = null;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A collector object has types registered into it with the register(type)
|
* A collector object has types registered into it with the register(type)
|
||||||
* function. Each type that gets registered defines how to fetch its own data
|
* function. Each type that gets registered defines how to fetch its own data
|
||||||
* and optionally, how to combine it into a unified payload for bulk upload.
|
* and optionally, how to combine it into a unified payload for bulk upload.
|
||||||
*/
|
*/
|
||||||
export class CollectorSet {
|
export class CollectorSet {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @param {Object} server - server object
|
* @param {Object} server - server object
|
||||||
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
|
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
|
||||||
*/
|
*/
|
||||||
constructor(server, collectors = []) {
|
constructor(server, collectors = [], config = null) {
|
||||||
this._log = getCollectorLogger(server);
|
this._log = getCollectorLogger(server);
|
||||||
this._collectors = collectors;
|
this._collectors = collectors;
|
||||||
|
|
||||||
|
@ -44,7 +45,9 @@ export class CollectorSet {
|
||||||
*/
|
*/
|
||||||
this.makeStatsCollector = options => new Collector(server, options);
|
this.makeStatsCollector = options => new Collector(server, options);
|
||||||
this.makeUsageCollector = options => new UsageCollector(server, options);
|
this.makeUsageCollector = options => new UsageCollector(server, options);
|
||||||
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
|
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray, config);
|
||||||
|
|
||||||
|
this._maximumWaitTimeForAllCollectorsInS = config ? config.get('stats.maximumWaitTimeForAllCollectorsInS') : 60;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -73,6 +76,40 @@ export class CollectorSet {
|
||||||
return x instanceof UsageCollector;
|
return x instanceof UsageCollector;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async areAllCollectorsReady(collectorSet = this) {
|
||||||
|
if (!(collectorSet instanceof CollectorSet)) {
|
||||||
|
throw new Error(`areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
const collectorTypesNotReady = [];
|
||||||
|
let allReady = true;
|
||||||
|
await collectorSet.asyncEach(async collector => {
|
||||||
|
if (!await collector.isReady()) {
|
||||||
|
allReady = false;
|
||||||
|
collectorTypesNotReady.push(collector.type);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!allReady && this._maximumWaitTimeForAllCollectorsInS >= 0) {
|
||||||
|
const nowTimestamp = +new Date();
|
||||||
|
_waitingForAllCollectorsTimestamp = _waitingForAllCollectorsTimestamp || nowTimestamp;
|
||||||
|
const timeWaitedInMS = nowTimestamp - _waitingForAllCollectorsTimestamp;
|
||||||
|
const timeLeftInMS = (this._maximumWaitTimeForAllCollectorsInS * 1000) - timeWaitedInMS;
|
||||||
|
if (timeLeftInMS <= 0) {
|
||||||
|
this._log.debug(`All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) `
|
||||||
|
+ `but we have waited the required `
|
||||||
|
+ `${this._maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.`);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
this._log.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
_waitingForAllCollectorsTimestamp = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return allReady;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Call a bunch of fetch methods and then do them in bulk
|
* Call a bunch of fetch methods and then do them in bulk
|
||||||
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
|
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
|
||||||
|
@ -155,4 +192,14 @@ export class CollectorSet {
|
||||||
map(mapFn) {
|
map(mapFn) {
|
||||||
return this._collectors.map(mapFn);
|
return this._collectors.map(mapFn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
some(someFn) {
|
||||||
|
return this._collectors.some(someFn);
|
||||||
|
}
|
||||||
|
|
||||||
|
async asyncEach(eachFn) {
|
||||||
|
for (const collector of this._collectors) {
|
||||||
|
await eachFn(collector);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,8 @@
|
||||||
|
|
||||||
import { CollectorSet } from './classes';
|
import { CollectorSet } from './classes';
|
||||||
|
|
||||||
export function usageMixin(kbnServer, server) {
|
export function usageMixin(kbnServer, server, config) {
|
||||||
const collectorSet = new CollectorSet(server);
|
const collectorSet = new CollectorSet(server, undefined, config);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* expose the collector set object on the server
|
* expose the collector set object on the server
|
||||||
|
|
|
@ -37,7 +37,8 @@ export function makeApmUsageCollector(core: CoreSetupWithUsageCollector) {
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return createApmTelementry();
|
return createApmTelementry();
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
isReady: () => true
|
||||||
});
|
});
|
||||||
server.usage.collectorSet.register(apmUsageCollector);
|
server.usage.collectorSet.register(apmUsageCollector);
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,6 +136,7 @@ export function registerCanvasUsageCollector(server) {
|
||||||
const index = server.config().get('kibana.index');
|
const index = server.config().get('kibana.index');
|
||||||
const collector = server.usage.collectorSet.makeUsageCollector({
|
const collector = server.usage.collectorSet.makeUsageCollector({
|
||||||
type: CANVAS_USAGE_TYPE,
|
type: CANVAS_USAGE_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: async callCluster => {
|
fetch: async callCluster => {
|
||||||
const searchParams = {
|
const searchParams = {
|
||||||
size: 10000, // elasticsearch index.max_result_window default value
|
size: 10000, // elasticsearch index.max_result_window default value
|
||||||
|
|
|
@ -37,6 +37,7 @@ export function getCloudUsageCollector(server: KibanaHapiServer) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_CLOUD_STATS_TYPE,
|
type: KIBANA_CLOUD_STATS_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: createCollectorFetch(server),
|
fetch: createCollectorFetch(server),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ export class UsageCollector {
|
||||||
|
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_REPORTING_TYPE,
|
type: KIBANA_REPORTING_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: async () => {
|
fetch: async () => {
|
||||||
return this.getReport();
|
return this.getReport();
|
||||||
},
|
},
|
||||||
|
|
|
@ -13,37 +13,63 @@ export function initTelemetryCollection(server) {
|
||||||
registerMapsUsageCollector(server);
|
registerMapsUsageCollector(server);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function buildCollectorObj(server) {
|
async function isTaskManagerReady(server) {
|
||||||
return {
|
const result = await fetch(server);
|
||||||
type: 'maps',
|
return result !== null;
|
||||||
fetch: async () => {
|
}
|
||||||
let docs;
|
|
||||||
try {
|
async function fetch(server) {
|
||||||
({ docs } = await server.taskManager.fetch({
|
let docs;
|
||||||
query: {
|
try {
|
||||||
bool: {
|
({ docs } = await server.taskManager.fetch({
|
||||||
filter: {
|
query: {
|
||||||
term: {
|
bool: {
|
||||||
_id: TASK_ID
|
filter: {
|
||||||
}
|
term: {
|
||||||
}
|
_id: TASK_ID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}));
|
|
||||||
} catch (err) {
|
|
||||||
const errMessage = err && err.message ? err.message : err.toString();
|
|
||||||
/*
|
|
||||||
* The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager
|
|
||||||
* has to wait for all plugins to initialize first.
|
|
||||||
* It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
|
|
||||||
*/
|
|
||||||
if (errMessage.indexOf('NotInitialized') >= 0) {
|
|
||||||
docs = {};
|
|
||||||
} else {
|
|
||||||
throw err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}));
|
||||||
|
} catch (err) {
|
||||||
|
const errMessage = err && err.message ? err.message : err.toString();
|
||||||
|
/*
|
||||||
|
* The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager
|
||||||
|
* has to wait for all plugins to initialize first.
|
||||||
|
* It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
|
||||||
|
*/
|
||||||
|
if (errMessage.indexOf('NotInitialized') >= 0) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return docs;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildCollectorObj(server) {
|
||||||
|
let isCollectorReady = false;
|
||||||
|
async function determineIfTaskManagerIsReady() {
|
||||||
|
let isReady = false;
|
||||||
|
try {
|
||||||
|
isReady = await isTaskManagerReady(server);
|
||||||
|
} catch (err) {} // eslint-disable-line
|
||||||
|
|
||||||
|
if (isReady) {
|
||||||
|
isCollectorReady = true;
|
||||||
|
} else {
|
||||||
|
setTimeout(determineIfTaskManagerIsReady, 500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
determineIfTaskManagerIsReady();
|
||||||
|
|
||||||
|
return {
|
||||||
|
type: 'maps',
|
||||||
|
isReady: () => isCollectorReady,
|
||||||
|
fetch: async () => {
|
||||||
|
const docs = await fetch(server);
|
||||||
return _.get(docs, '[0].state.stats');
|
return _.get(docs, '[0].state.stats');
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
|
@ -27,6 +27,7 @@ interface KibanaHapiServer extends Server {
|
||||||
export function makeMlUsageCollector(server: KibanaHapiServer): void {
|
export function makeMlUsageCollector(server: KibanaHapiServer): void {
|
||||||
const mlUsageCollector = server.usage.collectorSet.makeUsageCollector({
|
const mlUsageCollector = server.usage.collectorSet.makeUsageCollector({
|
||||||
type: 'ml',
|
type: 'ml',
|
||||||
|
isReady: () => true,
|
||||||
fetch: async (): Promise<MlTelemetry> => {
|
fetch: async (): Promise<MlTelemetry> => {
|
||||||
try {
|
try {
|
||||||
const savedObjectsClient = getSavedObjectsClient(server);
|
const savedObjectsClient = getSavedObjectsClient(server);
|
||||||
|
|
|
@ -20,6 +20,9 @@ class MockCollectorSet {
|
||||||
isUsageCollector(x) {
|
isUsageCollector(x) {
|
||||||
return !!x.isUsageCollector;
|
return !!x.isUsageCollector;
|
||||||
}
|
}
|
||||||
|
areAllCollectorsReady() {
|
||||||
|
return this.mockCollectors.every(collector => collector.isReady());
|
||||||
|
}
|
||||||
getCollectorByType(type) {
|
getCollectorByType(type) {
|
||||||
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
|
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
|
||||||
}
|
}
|
||||||
|
@ -29,6 +32,9 @@ class MockCollectorSet {
|
||||||
async bulkFetch() {
|
async bulkFetch() {
|
||||||
return this.mockCollectors.map(({ fetch }) => fetch());
|
return this.mockCollectors.map(({ fetch }) => fetch());
|
||||||
}
|
}
|
||||||
|
some(someFn) {
|
||||||
|
return this.mockCollectors.some(someFn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
describe('BulkUploader', () => {
|
describe('BulkUploader', () => {
|
||||||
|
@ -61,6 +67,7 @@ describe('BulkUploader', () => {
|
||||||
{
|
{
|
||||||
type: 'type_collector_test',
|
type: 'type_collector_test',
|
||||||
fetch: noop, // empty payloads,
|
fetch: noop, // empty payloads,
|
||||||
|
isReady: () => true,
|
||||||
formatForBulkUpload: result => result,
|
formatForBulkUpload: result => result,
|
||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
@ -94,10 +101,56 @@ describe('BulkUploader', () => {
|
||||||
}, CHECK_DELAY);
|
}, CHECK_DELAY);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should not upload if some collectors are not ready', done => {
|
||||||
|
const collectors = new MockCollectorSet(server, [
|
||||||
|
{
|
||||||
|
type: 'type_collector_test',
|
||||||
|
fetch: noop, // empty payloads,
|
||||||
|
isReady: () => false,
|
||||||
|
formatForBulkUpload: result => result,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
type: 'type_collector_test2',
|
||||||
|
fetch: noop, // empty payloads,
|
||||||
|
isReady: () => true,
|
||||||
|
formatForBulkUpload: result => result,
|
||||||
|
}
|
||||||
|
]);
|
||||||
|
|
||||||
|
const uploader = new BulkUploader(server, {
|
||||||
|
interval: FETCH_INTERVAL
|
||||||
|
});
|
||||||
|
|
||||||
|
uploader.start(collectors);
|
||||||
|
|
||||||
|
// allow interval to tick a few times
|
||||||
|
setTimeout(() => {
|
||||||
|
uploader.stop();
|
||||||
|
|
||||||
|
const loggingCalls = server.log.getCalls();
|
||||||
|
expect(loggingCalls.length).to.be.greaterThan(2); // should be 3-5: start, fetch, skip, fetch, skip
|
||||||
|
expect(loggingCalls[0].args).to.eql([
|
||||||
|
['info', 'monitoring', 'kibana-monitoring'],
|
||||||
|
'Starting monitoring stats collection',
|
||||||
|
]);
|
||||||
|
expect(loggingCalls[1].args).to.eql([
|
||||||
|
['debug', 'monitoring', 'kibana-monitoring'],
|
||||||
|
'Skipping bulk uploading because not all collectors are ready',
|
||||||
|
]);
|
||||||
|
expect(loggingCalls[loggingCalls.length - 1].args).to.eql([
|
||||||
|
['info', 'monitoring', 'kibana-monitoring'],
|
||||||
|
'Monitoring stats collection is stopped',
|
||||||
|
]);
|
||||||
|
|
||||||
|
done();
|
||||||
|
}, CHECK_DELAY);
|
||||||
|
});
|
||||||
|
|
||||||
it('should run the bulk upload handler', done => {
|
it('should run the bulk upload handler', done => {
|
||||||
const collectors = new MockCollectorSet(server, [
|
const collectors = new MockCollectorSet(server, [
|
||||||
{
|
{
|
||||||
fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }),
|
fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }),
|
||||||
|
isReady: () => true,
|
||||||
formatForBulkUpload: result => result
|
formatForBulkUpload: result => result
|
||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
@ -135,11 +188,13 @@ describe('BulkUploader', () => {
|
||||||
const collectors = new MockCollectorSet(server, [
|
const collectors = new MockCollectorSet(server, [
|
||||||
{
|
{
|
||||||
fetch: usageCollectorFetch,
|
fetch: usageCollectorFetch,
|
||||||
|
isReady: () => true,
|
||||||
formatForBulkUpload: result => result,
|
formatForBulkUpload: result => result,
|
||||||
isUsageCollector: true,
|
isUsageCollector: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
fetch: collectorFetch,
|
fetch: collectorFetch,
|
||||||
|
isReady: () => true,
|
||||||
formatForBulkUpload: result => result,
|
formatForBulkUpload: result => result,
|
||||||
isUsageCollector: false,
|
isUsageCollector: false,
|
||||||
}
|
}
|
||||||
|
@ -166,11 +221,13 @@ describe('BulkUploader', () => {
|
||||||
const collectors = new MockCollectorSet(server, [
|
const collectors = new MockCollectorSet(server, [
|
||||||
{
|
{
|
||||||
fetch: usageCollectorFetch,
|
fetch: usageCollectorFetch,
|
||||||
|
isReady: () => true,
|
||||||
formatForBulkUpload: result => result,
|
formatForBulkUpload: result => result,
|
||||||
isUsageCollector: true,
|
isUsageCollector: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
fetch: collectorFetch,
|
fetch: collectorFetch,
|
||||||
|
isReady: () => true,
|
||||||
formatForBulkUpload: result => result,
|
formatForBulkUpload: result => result,
|
||||||
isUsageCollector: false,
|
isUsageCollector: false,
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,6 @@ export class BulkUploader {
|
||||||
this._log.info('Starting monitoring stats collection');
|
this._log.info('Starting monitoring stats collection');
|
||||||
const filterCollectorSet = _collectorSet => {
|
const filterCollectorSet = _collectorSet => {
|
||||||
const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
|
const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
|
||||||
this._lastFetchWithUsage = !filterUsage;
|
|
||||||
if (!filterUsage) {
|
if (!filterUsage) {
|
||||||
this._lastFetchUsageTime = Date.now();
|
this._lastFetchUsageTime = Date.now();
|
||||||
}
|
}
|
||||||
|
@ -123,6 +122,16 @@ export class BulkUploader {
|
||||||
* @return {Promise} - resolves to undefined
|
* @return {Promise} - resolves to undefined
|
||||||
*/
|
*/
|
||||||
async _fetchAndUpload(collectorSet) {
|
async _fetchAndUpload(collectorSet) {
|
||||||
|
const collectorsReady = await collectorSet.areAllCollectorsReady();
|
||||||
|
if (!collectorsReady) {
|
||||||
|
this._log.debug('Skipping bulk uploading because not all collectors are ready');
|
||||||
|
if (collectorSet.some(collectorSet.isUsageCollector)) {
|
||||||
|
this._lastFetchUsageTime = null;
|
||||||
|
this._log.debug('Resetting lastFetchWithUsage because not all collectors are ready');
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser);
|
const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser);
|
||||||
const payload = this.toBulkUploadFormat(compact(data), collectorSet);
|
const payload = this.toBulkUploadFormat(compact(data), collectorSet);
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ export function getKibanaUsageCollector(server) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_USAGE_TYPE,
|
type: KIBANA_USAGE_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
async fetch(callCluster) {
|
async fetch(callCluster) {
|
||||||
const index = server.config().get('kibana.index');
|
const index = server.config().get('kibana.index');
|
||||||
const savedObjectCountSearchParams = {
|
const savedObjectCountSearchParams = {
|
||||||
|
|
|
@ -80,6 +80,7 @@ export function getOpsStatsCollector(server, kbnServer) {
|
||||||
return collectorSet.makeStatsCollector({
|
return collectorSet.makeStatsCollector({
|
||||||
type: KIBANA_STATS_TYPE_MONITORING,
|
type: KIBANA_STATS_TYPE_MONITORING,
|
||||||
init: opsMonitor.start,
|
init: opsMonitor.start,
|
||||||
|
isReady: () => buffer.hasEvents(),
|
||||||
fetch: async () => {
|
fetch: async () => {
|
||||||
return await buffer.flush();
|
return await buffer.flush();
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,6 +86,7 @@ export function getSettingsCollector(server) {
|
||||||
|
|
||||||
return collectorSet.makeStatsCollector({
|
return collectorSet.makeStatsCollector({
|
||||||
type: KIBANA_SETTINGS_TYPE,
|
type: KIBANA_SETTINGS_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
async fetch(callCluster) {
|
async fetch(callCluster) {
|
||||||
let kibanaSettingsData;
|
let kibanaSettingsData;
|
||||||
const defaultAdminEmail = await checkForEmailValue(config, callCluster, this.log);
|
const defaultAdminEmail = await checkForEmailValue(config, callCluster, this.log);
|
||||||
|
|
|
@ -22,6 +22,10 @@ export class EventRoller {
|
||||||
return get(this.rollup, path);
|
return get(this.rollup, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hasEvents() {
|
||||||
|
return this.rollup !== null;
|
||||||
|
}
|
||||||
|
|
||||||
rollupEvent(event) {
|
rollupEvent(event) {
|
||||||
const heapStats = v8.getHeapStatistics();
|
const heapStats = v8.getHeapStatistics();
|
||||||
const requests = mapRequests(event.requests);
|
const requests = mapRequests(event.requests);
|
||||||
|
|
|
@ -29,6 +29,10 @@ export function opsBuffer(server) {
|
||||||
server.log(['debug', LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG], 'Received Kibana Ops event data');
|
server.log(['debug', LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG], 'Received Kibana Ops event data');
|
||||||
},
|
},
|
||||||
|
|
||||||
|
hasEvents() {
|
||||||
|
return eventRoller.hasEvents();
|
||||||
|
},
|
||||||
|
|
||||||
async flush() {
|
async flush() {
|
||||||
let cloud; // a property that will be left out of the result if the details are undefined
|
let cloud; // a property that will be left out of the result if the details are undefined
|
||||||
const cloudDetails = cloudDetector.getCloudDetails();
|
const cloudDetails = cloudDetector.getCloudDetails();
|
||||||
|
|
|
@ -8,30 +8,55 @@ import { get } from 'lodash';
|
||||||
import { HapiServer } from '../../../../';
|
import { HapiServer } from '../../../../';
|
||||||
import { PLUGIN_ID, VIS_TELEMETRY_TASK, VIS_USAGE_TYPE } from '../../../../constants';
|
import { PLUGIN_ID, VIS_TELEMETRY_TASK, VIS_USAGE_TYPE } from '../../../../constants';
|
||||||
|
|
||||||
export function getUsageCollector(server: HapiServer) {
|
async function isTaskManagerReady(server: HapiServer) {
|
||||||
|
const result = await fetch(server);
|
||||||
|
return result !== null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function fetch(server: HapiServer) {
|
||||||
const { taskManager } = server;
|
const { taskManager } = server;
|
||||||
|
|
||||||
|
let docs;
|
||||||
|
try {
|
||||||
|
({ docs } = await taskManager.fetch({
|
||||||
|
query: { bool: { filter: { term: { _id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}` } } } },
|
||||||
|
}));
|
||||||
|
} catch (err) {
|
||||||
|
const errMessage = err && err.message ? err.message : err.toString();
|
||||||
|
/*
|
||||||
|
The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager has to wait for all plugins to initialize first. It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
|
||||||
|
*/
|
||||||
|
if (errMessage.includes('NotInitialized')) {
|
||||||
|
docs = null;
|
||||||
|
} else {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return docs;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getUsageCollector(server: HapiServer) {
|
||||||
|
let isCollectorReady = false;
|
||||||
|
async function determineIfTaskManagerIsReady() {
|
||||||
|
let isReady = false;
|
||||||
|
try {
|
||||||
|
isReady = await isTaskManagerReady(server);
|
||||||
|
} catch (err) {} // eslint-disable-line
|
||||||
|
|
||||||
|
if (isReady) {
|
||||||
|
isCollectorReady = true;
|
||||||
|
} else {
|
||||||
|
setTimeout(determineIfTaskManagerIsReady, 500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
determineIfTaskManagerIsReady();
|
||||||
|
|
||||||
return {
|
return {
|
||||||
type: VIS_USAGE_TYPE,
|
type: VIS_USAGE_TYPE,
|
||||||
|
isReady: () => isCollectorReady,
|
||||||
fetch: async () => {
|
fetch: async () => {
|
||||||
let docs;
|
const docs = await fetch(server);
|
||||||
try {
|
|
||||||
({ docs } = await taskManager.fetch({
|
|
||||||
query: { bool: { filter: { term: { _id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}` } } } },
|
|
||||||
}));
|
|
||||||
} catch (err) {
|
|
||||||
const errMessage = err && err.message ? err.message : err.toString();
|
|
||||||
/*
|
|
||||||
* The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager
|
|
||||||
* has to wait for all plugins to initialize first.
|
|
||||||
* It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
|
|
||||||
*/
|
|
||||||
if (errMessage.includes('NotInitialized')) {
|
|
||||||
docs = {};
|
|
||||||
} else {
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the accumulated state from the recurring task
|
// get the accumulated state from the recurring task
|
||||||
return get(docs, '[0].state.stats');
|
return get(docs, '[0].state.stats');
|
||||||
},
|
},
|
||||||
|
|
|
@ -151,6 +151,11 @@ export const reporting = (kibana) => {
|
||||||
},
|
},
|
||||||
|
|
||||||
init: async function (server) {
|
init: async function (server) {
|
||||||
|
let isCollectorReady = false;
|
||||||
|
const isReady = () => isCollectorReady;
|
||||||
|
// Register a function with server to manage the collection of usage stats
|
||||||
|
server.usage.collectorSet.register(getReportingUsageCollector(server, isReady));
|
||||||
|
|
||||||
const exportTypesRegistry = await exportTypesRegistryFactory(server);
|
const exportTypesRegistry = await exportTypesRegistryFactory(server);
|
||||||
const browserFactory = await createBrowserDriverFactory(server);
|
const browserFactory = await createBrowserDriverFactory(server);
|
||||||
server.expose('exportTypesRegistry', exportTypesRegistry);
|
server.expose('exportTypesRegistry', exportTypesRegistry);
|
||||||
|
@ -172,8 +177,8 @@ export const reporting = (kibana) => {
|
||||||
xpackMainPlugin.info.feature(this.id).registerLicenseCheckResultsGenerator(checkLicense);
|
xpackMainPlugin.info.feature(this.id).registerLicenseCheckResultsGenerator(checkLicense);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Register a function with server to manage the collection of usage stats
|
// Post initialization of the above code, the collector is now ready to fetch its data
|
||||||
server.usage.collectorSet.register(getReportingUsageCollector(server));
|
isCollectorReady = true;
|
||||||
|
|
||||||
server.expose('browserDriverFactory', browserFactory);
|
server.expose('browserDriverFactory', browserFactory);
|
||||||
server.expose('queue', createQueueFactory(server));
|
server.expose('queue', createQueueFactory(server));
|
||||||
|
|
|
@ -114,11 +114,11 @@ async function getReportingUsageWithinRange(callCluster, server, reportingAvaila
|
||||||
* @param {Object} server
|
* @param {Object} server
|
||||||
* @return {Object} kibana usage stats type collection object
|
* @return {Object} kibana usage stats type collection object
|
||||||
*/
|
*/
|
||||||
export function getReportingUsageCollector(server) {
|
export function getReportingUsageCollector(server, isReady) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_REPORTING_TYPE,
|
type: KIBANA_REPORTING_TYPE,
|
||||||
|
isReady,
|
||||||
fetch: async callCluster => {
|
fetch: async callCluster => {
|
||||||
const xpackInfo = server.plugins.xpack_main.info;
|
const xpackInfo = server.plugins.xpack_main.info;
|
||||||
const config = server.config();
|
const config = server.config();
|
||||||
|
|
|
@ -168,6 +168,7 @@ export function registerRollupUsageCollector(server) {
|
||||||
|
|
||||||
const collector = server.usage.collectorSet.makeUsageCollector({
|
const collector = server.usage.collectorSet.makeUsageCollector({
|
||||||
type: ROLLUP_USAGE_TYPE,
|
type: ROLLUP_USAGE_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: async callCluster => {
|
fetch: async callCluster => {
|
||||||
const rollupIndexPatterns = await fetchRollupIndexPatterns(kibanaIndex, callCluster);
|
const rollupIndexPatterns = await fetchRollupIndexPatterns(kibanaIndex, callCluster);
|
||||||
const rollupIndexPatternToFlagMap = createIdToFlagMap(rollupIndexPatterns);
|
const rollupIndexPatternToFlagMap = createIdToFlagMap(rollupIndexPatterns);
|
||||||
|
|
|
@ -98,6 +98,7 @@ export function getSpacesUsageCollector(server: any) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_SPACES_STATS_TYPE,
|
type: KIBANA_SPACES_STATS_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: async (callCluster: any) => {
|
fetch: async (callCluster: any) => {
|
||||||
const xpackInfo = server.plugins.xpack_main.info;
|
const xpackInfo = server.plugins.xpack_main.info;
|
||||||
const config = server.config();
|
const config = server.config();
|
||||||
|
|
|
@ -100,6 +100,7 @@ export function makeUpgradeAssistantUsageCollector(server: UpgradeAssistantTelem
|
||||||
const kbnServer = server as UpgradeAssistantTelemetryServer;
|
const kbnServer = server as UpgradeAssistantTelemetryServer;
|
||||||
const upgradeAssistantUsageCollector = kbnServer.usage.collectorSet.makeUsageCollector({
|
const upgradeAssistantUsageCollector = kbnServer.usage.collectorSet.makeUsageCollector({
|
||||||
type: UPGRADE_ASSISTANT_TYPE,
|
type: UPGRADE_ASSISTANT_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: async (callCluster: any) => fetchUpgradeAssistantMetrics(callCluster, server),
|
fetch: async (callCluster: any) => fetchUpgradeAssistantMetrics(callCluster, server),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,7 @@ export function getLocalizationUsageCollector(server: any) {
|
||||||
const { collectorSet } = server.usage;
|
const { collectorSet } = server.usage;
|
||||||
return collectorSet.makeUsageCollector({
|
return collectorSet.makeUsageCollector({
|
||||||
type: KIBANA_LOCALIZATION_STATS_TYPE,
|
type: KIBANA_LOCALIZATION_STATS_TYPE,
|
||||||
|
isReady: () => true,
|
||||||
fetch: createCollectorFetch(server),
|
fetch: createCollectorFetch(server),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,7 @@ export async function readTelemetryFile(path: string): Promise<object | undefine
|
||||||
export function createTelemetryUsageCollector(server: KibanaHapiServer) {
|
export function createTelemetryUsageCollector(server: KibanaHapiServer) {
|
||||||
return server.usage.collectorSet.makeUsageCollector({
|
return server.usage.collectorSet.makeUsageCollector({
|
||||||
type: 'static_telemetry',
|
type: 'static_telemetry',
|
||||||
|
isReady: () => true,
|
||||||
fetch: async () => {
|
fetch: async () => {
|
||||||
const configPath: string = server.config().get('xpack.xpack_main.telemetry.config');
|
const configPath: string = server.config().get('xpack.xpack_main.telemetry.config');
|
||||||
const telemetryPath = join(dirname(configPath), 'telemetry.yml');
|
const telemetryPath = join(dirname(configPath), 'telemetry.yml');
|
||||||
|
|
|
@ -192,6 +192,7 @@ export default async function ({ readConfigFile }) {
|
||||||
'--server.uuid=5b2de169-2785-441b-ae8c-186a1936b17d',
|
'--server.uuid=5b2de169-2785-441b-ae8c-186a1936b17d',
|
||||||
'--xpack.xpack_main.telemetry.enabled=false',
|
'--xpack.xpack_main.telemetry.enabled=false',
|
||||||
'--xpack.maps.showMapsInspectorAdapter=true',
|
'--xpack.maps.showMapsInspectorAdapter=true',
|
||||||
|
'--stats.maximumWaitTimeForAllCollectorsInS=0',
|
||||||
'--xpack.security.encryptionKey="wuGNaIhoMpk5sO4UBxgr3NyW1sFcLgIf"', // server restarts should not invalidate active sessions
|
'--xpack.security.encryptionKey="wuGNaIhoMpk5sO4UBxgr3NyW1sFcLgIf"', // server restarts should not invalidate active sessions
|
||||||
'--xpack.code.security.enableGitCertCheck=false', // Disable git certificate check
|
'--xpack.code.security.enableGitCertCheck=false', // Disable git certificate check
|
||||||
'--timelion.ui.enabled=true',
|
'--timelion.ui.enabled=true',
|
||||||
|
|
Loading…
Reference in a new issue