[Monitoring] Revert direct shipping code (#72505)

* Backout these changes

* Fix test
This commit is contained in:
Chris Roberson 2020-07-22 09:24:14 -04:00 committed by GitHub
parent 82dd173b2a
commit 670520a253
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 5 additions and 202 deletions

View file

@ -27,33 +27,6 @@ describe('config schema', () => {
}, },
"enabled": true, "enabled": true,
}, },
"elasticsearch": Object {
"apiVersion": "master",
"customHeaders": Object {},
"healthCheck": Object {
"delay": "PT2.5S",
},
"ignoreVersionMismatch": false,
"logFetchCount": 10,
"logQueries": false,
"pingTimeout": "PT30S",
"preserveHost": true,
"requestHeadersWhitelist": Array [
"authorization",
],
"requestTimeout": "PT30S",
"shardTimeout": "PT30S",
"sniffInterval": false,
"sniffOnConnectionFault": false,
"sniffOnStart": false,
"ssl": Object {
"alwaysPresentCertificate": false,
"keystore": Object {},
"truststore": Object {},
"verificationMode": "full",
},
"startupTimeout": "PT5S",
},
"enabled": true, "enabled": true,
"kibana": Object { "kibana": Object {
"collection": Object { "collection": Object {
@ -125,9 +98,6 @@ describe('createConfig()', () => {
it('should wrap in Elasticsearch config', async () => { it('should wrap in Elasticsearch config', async () => {
const config = createConfig( const config = createConfig(
configSchema.validate({ configSchema.validate({
elasticsearch: {
hosts: 'http://localhost:9200',
},
ui: { ui: {
elasticsearch: { elasticsearch: {
hosts: 'http://localhost:9200', hosts: 'http://localhost:9200',
@ -135,7 +105,6 @@ describe('createConfig()', () => {
}, },
}) })
); );
expect(config.elasticsearch.hosts).toEqual(['http://localhost:9200']);
expect(config.ui.elasticsearch.hosts).toEqual(['http://localhost:9200']); expect(config.ui.elasticsearch.hosts).toEqual(['http://localhost:9200']);
}); });
@ -147,9 +116,6 @@ describe('createConfig()', () => {
}; };
const config = createConfig( const config = createConfig(
configSchema.validate({ configSchema.validate({
elasticsearch: {
ssl,
},
ui: { ui: {
elasticsearch: { elasticsearch: {
ssl, ssl,
@ -162,7 +128,6 @@ describe('createConfig()', () => {
key: 'contents-of-packages/kbn-dev-utils/certs/elasticsearch.key', key: 'contents-of-packages/kbn-dev-utils/certs/elasticsearch.key',
certificateAuthorities: ['contents-of-packages/kbn-dev-utils/certs/ca.crt'], certificateAuthorities: ['contents-of-packages/kbn-dev-utils/certs/ca.crt'],
}); });
expect(config.elasticsearch.ssl).toEqual(expected);
expect(config.ui.elasticsearch.ssl).toEqual(expected); expect(config.ui.elasticsearch.ssl).toEqual(expected);
}); });
}); });

View file

@ -21,7 +21,6 @@ export const monitoringElasticsearchConfigSchema = elasticsearchConfigSchema.ext
export const configSchema = schema.object({ export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }), enabled: schema.boolean({ defaultValue: true }),
elasticsearch: monitoringElasticsearchConfigSchema,
ui: schema.object({ ui: schema.object({
enabled: schema.boolean({ defaultValue: true }), enabled: schema.boolean({ defaultValue: true }),
ccs: schema.object({ ccs: schema.object({
@ -86,7 +85,6 @@ export type MonitoringConfig = ReturnType<typeof createConfig>;
export function createConfig(config: TypeOf<typeof configSchema>) { export function createConfig(config: TypeOf<typeof configSchema>) {
return { return {
...config, ...config,
elasticsearch: new ElasticsearchConfig(config.elasticsearch as ElasticsearchConfigType),
ui: { ui: {
...config.ui, ...config.ui,
elasticsearch: new MonitoringElasticsearchConfig(config.ui.elasticsearch), elasticsearch: new MonitoringElasticsearchConfig(config.ui.elasticsearch),

View file

@ -6,10 +6,8 @@
import { noop } from 'lodash'; import { noop } from 'lodash';
import sinon from 'sinon'; import sinon from 'sinon';
import moment from 'moment';
import expect from '@kbn/expect'; import expect from '@kbn/expect';
import { BulkUploader } from '../bulk_uploader'; import { BulkUploader } from '../bulk_uploader';
import { MONITORING_SYSTEM_API_VERSION } from '../../../common/constants';
const FETCH_INTERVAL = 300; const FETCH_INTERVAL = 300;
const CHECK_DELAY = 500; const CHECK_DELAY = 500;
@ -314,92 +312,5 @@ describe('BulkUploader', () => {
done(); done();
}, CHECK_DELAY); }, CHECK_DELAY);
}); });
it('uses a direct connection to the monitoring cluster, when configured', (done) => {
const dateInIndex = '2020.02.10';
const oldNow = moment.now;
moment.now = () => 1581310800000;
const prodClusterUuid = '1sdfd5';
const prodCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('monitoring.bulk')
.callsFake((arg) => {
let resolution = null;
if (arg === 'info') {
resolution = { cluster_uuid: prodClusterUuid };
}
return new Promise((resolve) => resolve(resolution));
}),
};
const monitoringCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('bulk')
.callsFake(() => {
return new Promise((resolve) => setTimeout(resolve, CHECK_DELAY + 1));
}),
};
const collectorFetch = sinon.stub().returns({
type: 'kibana_stats',
result: { type: 'kibana_stats', payload: { testData: 12345 } },
});
const collectors = new MockCollectorSet(server, [
{
fetch: collectorFetch,
isReady: () => true,
formatForBulkUpload: (result) => result,
isUsageCollector: false,
},
]);
const customServer = {
...server,
elasticsearchPlugin: {
createCluster: () => monitoringCluster,
getCluster: (name) => {
if (name === 'admin' || name === 'data') {
return prodCluster;
}
return monitoringCluster;
},
},
config: {
get: (key) => {
if (key === 'monitoring.elasticsearch') {
return {
hosts: ['http://localhost:9200'],
username: 'tester',
password: 'testing',
ssl: {},
};
}
return null;
},
},
};
const kbnServerStatus = { toJSON: () => ({ overall: { state: 'green' } }) };
const kbnServerVersion = 'master';
const uploader = new BulkUploader({
...customServer,
interval: FETCH_INTERVAL,
kbnServerStatus,
kbnServerVersion,
});
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args;
expect(firstCallArgs[0]).to.be('bulk');
expect(firstCallArgs[1].body[0].index._index).to.be(
`.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateInIndex}`
);
expect(firstCallArgs[1].body[1].type).to.be('kibana_stats');
expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid);
moment.now = oldNow;
done();
}, CHECK_DELAY);
});
}); });
}); });

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
import { defaultsDeep, uniq, compact, get } from 'lodash'; import { defaultsDeep, uniq, compact } from 'lodash';
import { import {
TELEMETRY_COLLECTION_INTERVAL, TELEMETRY_COLLECTION_INTERVAL,
@ -12,7 +12,6 @@ import {
} from '../../common/constants'; } from '../../common/constants';
import { sendBulkPayload, monitoringBulk } from './lib'; import { sendBulkPayload, monitoringBulk } from './lib';
import { hasMonitoringCluster } from '../es_client/instantiate_client';
/* /*
* Handles internal Kibana stats collection and uploading data to Monitoring * Handles internal Kibana stats collection and uploading data to Monitoring
@ -31,13 +30,11 @@ import { hasMonitoringCluster } from '../es_client/instantiate_client';
* @param {Object} xpackInfo server.plugins.xpack_main.info object * @param {Object} xpackInfo server.plugins.xpack_main.info object
*/ */
export class BulkUploader { export class BulkUploader {
constructor({ config, log, interval, elasticsearch, kibanaStats }) { constructor({ log, interval, elasticsearch, kibanaStats }) {
if (typeof interval !== 'number') { if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required'); throw new Error('interval number of milliseconds is required');
} }
this._hasDirectConnectionToMonitoringCluster = false;
this._productionClusterUuid = null;
this._timer = null; this._timer = null;
// Hold sending and fetching usage until monitoring.bulk is successful. This means that we // Hold sending and fetching usage until monitoring.bulk is successful. This means that we
// send usage data on the second tick. But would save a lot of bandwidth fetching usage on // send usage data on the second tick. But would save a lot of bandwidth fetching usage on
@ -54,15 +51,6 @@ export class BulkUploader {
plugins: [monitoringBulk], plugins: [monitoringBulk],
}); });
if (hasMonitoringCluster(config.elasticsearch)) {
this._log.info(`Detected direct connection to monitoring cluster`);
this._hasDirectConnectionToMonitoringCluster = true;
this._cluster = elasticsearch.legacy.createClient('monitoring-direct', config.elasticsearch);
elasticsearch.legacy.client.callAsInternalUser('info').then((data) => {
this._productionClusterUuid = get(data, 'cluster_uuid');
});
}
this.kibanaStats = kibanaStats; this.kibanaStats = kibanaStats;
this.kibanaStatusGetter = null; this.kibanaStatusGetter = null;
} }
@ -181,14 +169,7 @@ export class BulkUploader {
} }
async _onPayload(payload) { async _onPayload(payload) {
return await sendBulkPayload( return await sendBulkPayload(this._cluster, this._interval, payload, this._log);
this._cluster,
this._interval,
payload,
this._log,
this._hasDirectConnectionToMonitoringCluster,
this._productionClusterUuid
);
} }
getKibanaStats(type) { getKibanaStats(type) {

View file

@ -3,64 +3,12 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
import moment from 'moment'; import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';
import { chunk, get } from 'lodash';
import {
MONITORING_SYSTEM_API_VERSION,
KIBANA_SYSTEM_ID,
KIBANA_STATS_TYPE_MONITORING,
KIBANA_SETTINGS_TYPE,
} from '../../../common/constants';
const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE];
export function formatForNormalBulkEndpoint(payload, productionClusterUuid) {
const dateSuffix = moment.utc().format('YYYY.MM.DD');
return chunk(payload, 2).reduce((accum, chunk) => {
const type = get(chunk[0], 'index._type');
if (!type || !SUPPORTED_TYPES.includes(type)) {
return accum;
}
const { timestamp } = chunk[1];
accum.push({
index: {
_index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`,
},
});
accum.push({
[type]: chunk[1],
type,
timestamp,
cluster_uuid: productionClusterUuid,
});
return accum;
}, []);
}
/* /*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint * Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/ */
export async function sendBulkPayload( export async function sendBulkPayload(cluster, interval, payload) {
cluster,
interval,
payload,
log,
hasDirectConnectionToMonitoringCluster = false,
productionClusterUuid = null
) {
if (hasDirectConnectionToMonitoringCluster) {
if (productionClusterUuid === null) {
log.warn(
`Unable to determine production cluster uuid to use for shipping monitoring data. Kibana monitoring data will appear in a standalone cluster in the Stack Monitoring UI.`
);
}
const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid);
return await cluster.callAsInternalUser('bulk', {
body: formattedPayload,
});
}
return cluster.callAsInternalUser('monitoring.bulk', { return cluster.callAsInternalUser('monitoring.bulk', {
system_id: KIBANA_SYSTEM_ID, system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION, system_api_version: MONITORING_SYSTEM_API_VERSION,