[Telemetry] [Monitoring] Only retry fetching usage once monitoring.bulk is successful (#54309)

* fix interval and add tests

* Update x-pack/legacy/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js

Co-Authored-By: Christiane (Tina) Heiligers <christiane.heiligers@elastic.co>

Co-authored-by: Christiane (Tina) Heiligers <christiane.heiligers@elastic.co>
This commit is contained in:
Ahmad Bamieh 2020-01-09 02:55:17 +02:00 committed by GitHub
parent 0e46b240bb
commit a27c4c4a4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 27 deletions

View file

@@ -209,7 +209,7 @@ describe('BulkUploader', () => {
}, CHECK_DELAY);
});
it('refetches UsageCollectors if uploading to local cluster was not successful', done => {
it('stops refetching UsageCollectors if uploading to local cluster was not successful', async () => {
const usageCollectorFetch = sinon
.stub()
.returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });
@@ -227,12 +227,52 @@ describe('BulkUploader', () => {
uploader._onPayload = async () => ({ took: 0, ignored: true, errors: false });
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(usageCollectorFetch.callCount).to.be.greaterThan(1);
done();
}, CHECK_DELAY);
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
expect(uploader._holdSendingUsage).to.eql(true);
expect(usageCollectorFetch.callCount).to.eql(1);
});
it('fetches UsageCollectors once uploading to local cluster is successful again', async () => {
const usageCollectorFetch = sinon
.stub()
.returns({ type: 'type_usage_collector_test', result: { usageData: 12345 } });
const statsCollectorFetch = sinon
.stub()
.returns({ type: 'type_stats_collector_test', result: { statsData: 12345 } });
const collectors = new MockCollectorSet(server, [
{
fetch: statsCollectorFetch,
isReady: () => true,
formatForBulkUpload: result => result,
isUsageCollector: false,
},
{
fetch: usageCollectorFetch,
isReady: () => true,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
]);
const uploader = new BulkUploader({ ...server, interval: FETCH_INTERVAL });
let bulkIgnored = true;
uploader._onPayload = async () => ({ took: 0, ignored: bulkIgnored, errors: false });
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
expect(uploader._holdSendingUsage).to.eql(true);
bulkIgnored = false;
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
await uploader._fetchAndUpload(uploader.filterCollectorSet(collectors));
expect(uploader._holdSendingUsage).to.eql(false);
expect(usageCollectorFetch.callCount).to.eql(2);
expect(statsCollectorFetch.callCount).to.eql(3);
});
it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {

View file

@@ -40,8 +40,14 @@ export class BulkUploader {
}
this._timer = null;
// Hold sending and fetching usage until monitoring.bulk is successful. This means that we
// send usage data on the second tick at the earliest, but it saves a lot of bandwidth by not
// fetching usage on every tick while ES is failing or monitoring is disabled.
this._holdSendingUsage = false;
this._interval = interval;
this._lastFetchUsageTime = null;
// Limit sending and fetching usage to once per day once usage is successfully stored
// into the monitoring indices.
this._usageInterval = TELEMETRY_COLLECTION_INTERVAL;
this._log = {
@@ -65,6 +71,29 @@ export class BulkUploader {
});
}
filterCollectorSet(usageCollection) {
const successfulUploadInLastDay =
this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
return usageCollection.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (usageCollection.isUsageCollector(c)) {
if (this._holdSendingUsage) {
return false;
}
if (successfulUploadInLastDay) {
return false;
}
}
return true;
});
}
/*
* Start the interval timer
* @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
@@ -72,31 +101,15 @@
*/
start(usageCollection) {
this._log.info('Starting monitoring stats collection');
const filterCollectorSet = _usageCollection => {
const successfulUploadInLastDay =
this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
return _usageCollection.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (successfulUploadInLastDay && _usageCollection.isUsageCollector(c)) {
return false;
}
return true;
});
};
if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(filterCollectorSet(usageCollection)); // initial fetch
this._fetchAndUpload(this.filterCollectorSet(usageCollection)); // initial fetch
}
this._timer = setInterval(() => {
this._fetchAndUpload(filterCollectorSet(usageCollection));
this._fetchAndUpload(this.filterCollectorSet(usageCollection));
}, this._interval);
}
@@ -146,12 +159,17 @@ export class BulkUploader {
const sendSuccessful = !result.ignored && !result.errors;
if (!sendSuccessful && hasUsageCollectors) {
this._lastFetchUsageTime = null;
this._holdSendingUsage = true;
this._log.debug(
'Resetting lastFetchWithUsage because uploading to the cluster was not successful.'
);
}
if (sendSuccessful && hasUsageCollectors) {
this._lastFetchUsageTime = Date.now();
if (sendSuccessful) {
this._holdSendingUsage = false;
if (hasUsageCollectors) {
this._lastFetchUsageTime = Date.now();
}
}
this._log.debug(`Uploaded bulk stats payload to the local cluster`);
} catch (err) {