[Monitoring] Bulk Uploader uses new ES client (#94908)

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alejandro Fernández Haro 2021-03-23 13:31:02 +01:00 committed by GitHub
parent 71466b2dd0
commit 1e6a024c4d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 38 deletions

View file

@ -5,29 +5,28 @@
* 2.0.
*/
import { Observable, Subscription } from 'rxjs';
import type { Observable, Subscription } from 'rxjs';
import { take } from 'rxjs/operators';
import moment from 'moment';
import {
ElasticsearchServiceSetup,
ILegacyCustomClusterClient,
import type {
ElasticsearchClient,
Logger,
OpsMetrics,
ServiceStatus,
ServiceStatusLevel,
ServiceStatusLevels,
} from '../../../../../src/core/server';
} from 'src/core/server';
import { ServiceStatusLevels } from '../../../../../src/core/server';
import { KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE } from '../../common/constants';
import { sendBulkPayload, monitoringBulk } from './lib';
import { sendBulkPayload } from './lib';
import { getKibanaSettings } from './collectors';
import { MonitoringConfig } from '../config';
import type { MonitoringConfig } from '../config';
import type { IBulkUploader } from '../types';
export interface BulkUploaderOptions {
log: Logger;
config: MonitoringConfig;
interval: number;
elasticsearch: ElasticsearchServiceSetup;
statusGetter$: Observable<ServiceStatus>;
opsMetrics$: Observable<OpsMetrics>;
kibanaStats: KibanaStats;
@ -61,11 +60,11 @@ export interface KibanaStats {
* @param {Object} server HapiJS server instance
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
export class BulkUploader implements IBulkUploader {
private readonly _log: Logger;
private readonly _cluster: ILegacyCustomClusterClient;
private readonly kibanaStats: KibanaStats;
private readonly kibanaStatusGetter$: Subscription;
private readonly kibanaStatusGetter$: Observable<ServiceStatus>;
private kibanaStatusSubscription?: Subscription;
private readonly opsMetrics$: Observable<OpsMetrics>;
private kibanaStatus: ServiceStatusLevel | null;
private _timer: NodeJS.Timer | null;
@ -75,7 +74,6 @@ export class BulkUploader {
log,
config,
interval,
elasticsearch,
statusGetter$,
opsMetrics$,
kibanaStats,
@ -91,16 +89,10 @@ export class BulkUploader {
this._interval = interval;
this._log = log;
this._cluster = elasticsearch.legacy.createClient('admin', {
plugins: [monitoringBulk],
});
this.kibanaStats = kibanaStats;
this.kibanaStatus = null;
this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});
this.kibanaStatusGetter$ = statusGetter$;
}
/*
@ -108,17 +100,21 @@ export class BulkUploader {
* @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
* @return undefined
*/
public start() {
public start(esClient: ElasticsearchClient) {
this._log.info('Starting monitoring stats collection');
this.kibanaStatusSubscription = this.kibanaStatusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});
if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(); // initial fetch
this._fetchAndUpload(esClient); // initial fetch
}
this._timer = setInterval(() => {
this._fetchAndUpload();
this._fetchAndUpload(esClient);
}, this._interval);
}
@ -131,8 +127,7 @@ export class BulkUploader {
if (this._timer) clearInterval(this._timer);
this._timer = null;
this.kibanaStatusGetter$.unsubscribe();
this._cluster.close();
this.kibanaStatusSubscription?.unsubscribe();
const prefix = logPrefix ? logPrefix + ':' : '';
this._log.info(prefix + 'Monitoring stats collection is stopped');
@ -168,7 +163,7 @@ export class BulkUploader {
};
}
private async _fetchAndUpload() {
private async _fetchAndUpload(esClient: ElasticsearchClient) {
const data = await Promise.all([
{ type: KIBANA_STATS_TYPE_MONITORING, result: await this.getOpsMetrics() },
{ type: KIBANA_SETTINGS_TYPE, result: await getKibanaSettings(this._log, this.config) },
@ -178,7 +173,7 @@ export class BulkUploader {
if (payload && payload.length > 0) {
try {
this._log.debug(`Uploading bulk stats payload to the local cluster`);
await this._onPayload(payload);
await this._onPayload(esClient, payload);
this._log.debug(`Uploaded bulk stats payload to the local cluster`);
} catch (err) {
this._log.warn(err.stack);
@ -189,8 +184,8 @@ export class BulkUploader {
}
}
private async _onPayload(payload: object[]) {
return await sendBulkPayload(this._cluster, this._interval, payload);
private async _onPayload(esClient: ElasticsearchClient, payload: object[]) {
return await sendBulkPayload(esClient, this._interval, payload);
}
private getConvertedKibanaStatus() {

View file

@ -5,21 +5,22 @@
* 2.0.
*/
import { ILegacyClusterClient } from 'src/core/server';
import type { ElasticsearchClient } from 'src/core/server';
import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';
/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export async function sendBulkPayload(
cluster: ILegacyClusterClient,
esClient: ElasticsearchClient,
interval: number,
payload: object[]
) {
return cluster.callAsInternalUser('monitoring.bulk', {
const { body } = await esClient.monitoring.bulk({
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
interval: interval + 'ms',
body: payload,
});
return body;
}

View file

@ -32,7 +32,6 @@ jest.mock('./config', () => ({
describe('Monitoring plugin', () => {
const coreSetup = coreMock.createSetup();
coreSetup.http.getServerInfo.mockReturnValue({ port: 5601 } as any);
coreSetup.status.overall$.subscribe = jest.fn();
const setupPlugins = {
usageCollection: {
@ -60,13 +59,13 @@ describe('Monitoring plugin', () => {
afterEach(() => {
(setupPlugins.alerting.registerType as jest.Mock).mockReset();
(coreSetup.status.overall$.subscribe as jest.Mock).mockReset();
});
it('always create the bulk uploader', async () => {
const plugin = new MonitoringPlugin(initializerContext as any);
await plugin.setup(coreSetup, setupPlugins as any);
expect(coreSetup.status.overall$.subscribe).toHaveBeenCalled();
// eslint-disable-next-line dot-notation
expect(plugin['bulkUploader']).not.toBeUndefined();
});
it('should register all alerts', async () => {

View file

@ -138,7 +138,6 @@ export class MonitoringPlugin
// Always create the bulk uploader
const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG);
const bulkUploader = (this.bulkUploader = initBulkUploader({
elasticsearch: core.elasticsearch,
config,
log: kibanaMonitoringLog,
opsMetrics$: core.metrics.getOpsMetrics$(),
@ -210,7 +209,7 @@ export class MonitoringPlugin
const monitoringBulkEnabled =
mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled;
if (monitoringBulkEnabled) {
this.bulkUploader?.start();
this.bulkUploader?.start(core.elasticsearch.client.asInternalUser);
} else {
this.bulkUploader?.handleNotEnabled();
}

View file

@ -12,6 +12,7 @@ import type {
Logger,
ILegacyCustomClusterClient,
RequestHandlerContext,
ElasticsearchClient,
} from 'kibana/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { LicenseFeature, ILicense } from '../../licensing/server';
@ -92,7 +93,7 @@ export interface LegacyShimDependencies {
export interface IBulkUploader {
getKibanaStats: () => any;
stop: () => void;
start: () => void;
start: (esClient: ElasticsearchClient) => void;
handleNotEnabled: () => void;
}