Backport 42876 (#46986)

This commit is contained in:
Chris Roberson 2019-09-30 18:11:07 -04:00 committed by GitHub
parent 22c977e35e
commit bde180ddb1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 17 deletions

View file

@ -71,6 +71,24 @@ describe('CollectorSet', () => {
result: { passTest: 1000 }
}]);
});
it('should gracefully handle a collector fetch method throwing an error', async () => {
const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
const collectors = new CollectorSet(server);
collectors.register(new Collector(server, {
type: 'MY_TEST_COLLECTOR',
fetch: () => new Promise((_resolve, reject) => reject())
}));
let result;
try {
result = await collectors.bulkFetch(mockCallCluster);
} catch (err) {
// Do nothing
}
// This must return an empty object instead of null/undefined
expect(result).to.eql([]);
});
});
describe('toApiFieldNames', () => {
@ -162,5 +180,3 @@ describe('CollectorSet', () => {
});
});
});

View file

@ -18,7 +18,6 @@
*/
import { snakeCase } from 'lodash';
import Promise from 'bluebird';
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';
@ -31,7 +30,6 @@ let _waitingForAllCollectorsTimestamp = null;
* and optionally, how to combine it into a unified payload for bulk upload.
*/
export class CollectorSet {
/*
* @param {Object} server - server object
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
@ -46,7 +44,7 @@ export class CollectorSet {
*/
this.makeStatsCollector = options => new Collector(server, options);
this.makeUsageCollector = options => new UsageCollector(server, options);
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray, config);
this._maximumWaitTimeForAllCollectorsInS = config ? config.get('stats.maximumWaitTimeForAllCollectorsInS') : 60;
}
@ -115,24 +113,26 @@ export class CollectorSet {
* Call a bunch of fetch methods and then do them in bulk
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
*/
bulkFetch(callCluster, collectorSet = this) {
async bulkFetch(callCluster, collectorSet = this) {
if (!(collectorSet instanceof CollectorSet)) {
throw new Error(`bulkFetch method given bad collectorSet parameter: ` + typeof collectorSet);
}
const fetchPromises = collectorSet.map(collector => {
const collectorType = collector.type;
this._log.debug(`Fetching data from ${collectorType} collector`);
return Promise.props({
type: collectorType,
result: collector.fetchInternal(callCluster) // use the wrapper for fetch, kicks in error checking
})
.catch(err => {
this._log.warn(err);
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
const responses = [];
await collectorSet.asyncEach(async collector => {
this._log.debug(`Fetching data from ${collector.type} collector`);
try {
responses.push({
type: collector.type,
result: await collector.fetchInternal(callCluster)
});
}
catch (err) {
this._log.warn(err);
this._log.warn(`Unable to fetch data from ${collector.type} collector`);
}
});
return Promise.all(fetchPromises);
return responses;
}
/*
@ -150,6 +150,7 @@ export class CollectorSet {
// convert an array of fetched stats results into key/object
toObject(statsData) {
if (!statsData) return {};
return statsData.reduce((accumulatedStats, { type, result }) => {
return {
...accumulatedStats,