From f960e89ef6a9f17c9da0bb73e369e179a673c453 Mon Sep 17 00:00:00 2001 From: Ahmad Bamieh Date: Mon, 5 Oct 2020 16:16:07 +0300 Subject: [PATCH] [Telemetry] server fetcher check all collectors ready before sending (#79398) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alejandro Fernández Haro --- src/plugins/telemetry/server/fetcher.test.ts | 84 +++++++++++++++++-- src/plugins/telemetry/server/fetcher.ts | 30 +++++-- .../server/index.ts | 1 + .../server/plugin.ts | 6 ++ .../server/types.ts | 2 + .../server/collector/collector_set.ts | 29 ++++--- 6 files changed, 130 insertions(+), 22 deletions(-) diff --git a/src/plugins/telemetry/server/fetcher.test.ts b/src/plugins/telemetry/server/fetcher.test.ts index 245adf59799c..45712df772e1 100644 --- a/src/plugins/telemetry/server/fetcher.test.ts +++ b/src/plugins/telemetry/server/fetcher.test.ts @@ -23,19 +23,93 @@ import { coreMock } from '../../../core/server/mocks'; describe('FetcherTask', () => { describe('sendIfDue', () => { - it('returns undefined and warns when it fails to get telemetry configs', async () => { + it('stops when it fails to get telemetry configs', async () => { const initializerContext = coreMock.createPluginInitializerContext({}); const fetcherTask = new FetcherTask(initializerContext); const mockError = new Error('Some message.'); - fetcherTask['getCurrentConfigs'] = async () => { - throw mockError; - }; + const getCurrentConfigs = jest.fn().mockRejectedValue(mockError); + const fetchTelemetry = jest.fn(); + const sendTelemetry = jest.fn(); + Object.assign(fetcherTask, { + getCurrentConfigs, + fetchTelemetry, + sendTelemetry, + }); const result = await fetcherTask['sendIfDue'](); expect(result).toBe(undefined); + expect(getCurrentConfigs).toBeCalledTimes(1); + expect(fetchTelemetry).toBeCalledTimes(0); + expect(sendTelemetry).toBeCalledTimes(0); expect(fetcherTask['logger'].warn).toBeCalledTimes(1); expect(fetcherTask['logger'].warn).toHaveBeenCalledWith( - `Error fetching telemetry configs: ${mockError}` + `Error getting telemetry configs. (${mockError})` ); }); + + it('stops when all collectors are not ready', async () => { + const initializerContext = coreMock.createPluginInitializerContext({}); + const fetcherTask = new FetcherTask(initializerContext); + const getCurrentConfigs = jest.fn().mockResolvedValue({}); + const areAllCollectorsReady = jest.fn().mockResolvedValue(false); + const shouldSendReport = jest.fn().mockReturnValue(true); + const fetchTelemetry = jest.fn(); + const sendTelemetry = jest.fn(); + const updateReportFailure = jest.fn(); + + Object.assign(fetcherTask, { + getCurrentConfigs, + areAllCollectorsReady, + shouldSendReport, + fetchTelemetry, + updateReportFailure, + sendTelemetry, + }); + + await fetcherTask['sendIfDue'](); + + expect(fetchTelemetry).toBeCalledTimes(0); + expect(sendTelemetry).toBeCalledTimes(0); + + expect(areAllCollectorsReady).toBeCalledTimes(1); + expect(updateReportFailure).toBeCalledTimes(0); + expect(fetcherTask['logger'].warn).toBeCalledTimes(1); + expect(fetcherTask['logger'].warn).toHaveBeenCalledWith( + `Error fetching usage. (Error: Not all collectors are ready.)` + ); + }); + + it('fetches usage and send telemetry', async () => { + const initializerContext = coreMock.createPluginInitializerContext({}); + const fetcherTask = new FetcherTask(initializerContext); + const mockTelemetryUrl = 'mock_telemetry_url'; + const mockClusters = ['cluster_1', 'cluster_2']; + const getCurrentConfigs = jest.fn().mockResolvedValue({ + telemetryUrl: mockTelemetryUrl, + }); + const areAllCollectorsReady = jest.fn().mockResolvedValue(true); + const shouldSendReport = jest.fn().mockReturnValue(true); + + const fetchTelemetry = jest.fn().mockResolvedValue(mockClusters); + const sendTelemetry = jest.fn(); + const updateReportFailure = jest.fn(); + + Object.assign(fetcherTask, { + getCurrentConfigs, + areAllCollectorsReady, + shouldSendReport, + fetchTelemetry, + updateReportFailure, + sendTelemetry, + }); + + await fetcherTask['sendIfDue'](); + + expect(areAllCollectorsReady).toBeCalledTimes(1); + expect(fetchTelemetry).toBeCalledTimes(1); + expect(sendTelemetry).toBeCalledTimes(2); + expect(sendTelemetry).toHaveBeenNthCalledWith(1, mockTelemetryUrl, mockClusters[0]); + expect(sendTelemetry).toHaveBeenNthCalledWith(2, mockTelemetryUrl, mockClusters[1]); + expect(updateReportFailure).toBeCalledTimes(0); + }); }); }); diff --git a/src/plugins/telemetry/server/fetcher.ts b/src/plugins/telemetry/server/fetcher.ts index 75cfac721bcd..e6d909965f5f 100644 --- a/src/plugins/telemetry/server/fetcher.ts +++ b/src/plugins/telemetry/server/fetcher.ts @@ -22,7 +22,10 @@ import { Observable } from 'rxjs'; import { take } from 'rxjs/operators'; // @ts-ignore import fetch from 'node-fetch'; -import { TelemetryCollectionManagerPluginStart } from 'src/plugins/telemetry_collection_manager/server'; +import { + TelemetryCollectionManagerPluginStart, + UsageStatsPayload, +} from 'src/plugins/telemetry_collection_manager/server'; import { PluginInitializerContext, Logger, @@ -94,6 +97,10 @@ export class FetcherTask { } } + private async areAllCollectorsReady() { + return (await this.telemetryCollectionManager?.areAllCollectorsReady()) ?? false; + } + private async sendIfDue() { if (this.isSending) { return; @@ -103,7 +110,7 @@ export class FetcherTask { try { telemetryConfig = await this.getCurrentConfigs(); } catch (err) { - this.logger.warn(`Error fetching telemetry configs: ${err}`); + this.logger.warn(`Error getting telemetry configs. (${err})`); return; } @@ -111,9 +118,22 @@ export class FetcherTask { return; } + let clusters: Array = []; + this.isSending = true; + + try { + const allCollectorsReady = await this.areAllCollectorsReady(); + if (!allCollectorsReady) { + throw new Error('Not all collectors are ready.'); + } + clusters = await this.fetchTelemetry(); + } catch (err) { + this.logger.warn(`Error fetching usage. (${err})`); + this.isSending = false; + return; + } + try { - this.isSending = true; - const clusters = await this.fetchTelemetry(); const { telemetryUrl } = telemetryConfig; for (const cluster of clusters) { await this.sendTelemetry(telemetryUrl, cluster); @@ -123,7 +143,7 @@ export class FetcherTask { } catch (err) { await this.updateReportFailure(telemetryConfig); - this.logger.warn(`Error sending telemetry usage data: ${err}`); + this.logger.warn(`Error sending telemetry usage data. (${err})`); } this.isSending = false; } diff --git a/src/plugins/telemetry_collection_manager/server/index.ts b/src/plugins/telemetry_collection_manager/server/index.ts index 8761c28e1409..36ab64731fe5 100644 --- a/src/plugins/telemetry_collection_manager/server/index.ts +++ b/src/plugins/telemetry_collection_manager/server/index.ts @@ -38,4 +38,5 @@ export { ClusterDetails, ClusterDetailsGetter, LicenseGetter, + UsageStatsPayload, } from './types'; diff --git a/src/plugins/telemetry_collection_manager/server/plugin.ts b/src/plugins/telemetry_collection_manager/server/plugin.ts index e54e7451a670..ff63262004cf 100644 --- a/src/plugins/telemetry_collection_manager/server/plugin.ts +++ b/src/plugins/telemetry_collection_manager/server/plugin.ts @@ -67,6 +67,7 @@ export class TelemetryCollectionManagerPlugin setCollection: this.setCollection.bind(this), getOptInStats: this.getOptInStats.bind(this), getStats: this.getStats.bind(this), + areAllCollectorsReady: this.areAllCollectorsReady.bind(this), }; } @@ -75,6 +76,7 @@ export class TelemetryCollectionManagerPlugin setCollection: this.setCollection.bind(this), getOptInStats: this.getOptInStats.bind(this), getStats: this.getStats.bind(this), + areAllCollectorsReady: this.areAllCollectorsReady.bind(this), }; } @@ -185,6 +187,10 @@ export class TelemetryCollectionManagerPlugin return []; } + private areAllCollectorsReady = async () => { + return await this.usageCollection?.areAllCollectorsReady(); + }; + private getOptInStatsForCollection = async ( collection: Collection, optInStatus: boolean, diff --git a/src/plugins/telemetry_collection_manager/server/types.ts b/src/plugins/telemetry_collection_manager/server/types.ts index 44970df30fd1..3b0936fb73a6 100644 --- a/src/plugins/telemetry_collection_manager/server/types.ts +++ b/src/plugins/telemetry_collection_manager/server/types.ts @@ -34,6 +34,7 @@ export interface TelemetryCollectionManagerPluginSetup { ) => void; getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats']; getStats: TelemetryCollectionManagerPlugin['getStats']; + areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady']; } export interface TelemetryCollectionManagerPluginStart { @@ -42,6 +43,7 @@ export interface TelemetryCollectionManagerPluginStart { ) => void; getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats']; getStats: TelemetryCollectionManagerPlugin['getStats']; + areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady']; } export interface TelemetryOptInStats { diff --git a/src/plugins/usage_collection/server/collector/collector_set.ts b/src/plugins/usage_collection/server/collector/collector_set.ts index 6861be7f4f76..7bf4e19c72cc 100644 --- a/src/plugins/usage_collection/server/collector/collector_set.ts +++ b/src/plugins/usage_collection/server/collector/collector_set.ts @@ -76,23 +76,27 @@ export class CollectorSet { }; public areAllCollectorsReady = async (collectorSet: CollectorSet = this) => { - // Kept this for runtime validation in JS code. if (!(collectorSet instanceof CollectorSet)) { throw new Error( `areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet ); } - const collectorTypesNotReady = ( - await Promise.all( - [...collectorSet.collectors.values()].map(async (collector) => { - if (!(await collector.isReady())) { - return collector.type; - } - }) - ) - ).filter((collectorType): collectorType is string => !!collectorType); - const allReady = collectorTypesNotReady.length === 0; + const collectors = [...collectorSet.collectors.values()]; + const collectorsWithStatus = await Promise.all( + collectors.map(async (collector) => { + return { + isReady: await collector.isReady(), + collector, + }; + }) + ); + + const collectorsTypesNotReady = collectorsWithStatus + .filter((collectorWithStatus) => collectorWithStatus.isReady === false) + .map((collectorWithStatus) => collectorWithStatus.collector.type); + + const allReady = collectorsTypesNotReady.length === 0; if (!allReady && this.maximumWaitTimeForAllCollectorsInS >= 0) { const nowTimestamp = +new Date(); @@ -102,10 +106,11 @@ export class CollectorSet { const timeLeftInMS = this.maximumWaitTimeForAllCollectorsInS * 1000 - timeWaitedInMS; if (timeLeftInMS <= 0) { this.logger.debug( - `All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) ` + + `All collectors are not ready (waiting for ${collectorsTypesNotReady.join(',')}) ` + `but we have waited the required ` + `${this.maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.` ); + return true; } else { this.logger.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`);