[ML] Start datafeed from the module setup endpoint (#19254)

* [ML] Start datafeeds from the module setup endpoint

* changes based on review

* correcting use of filter
This commit is contained in:
James Gowdy 2018-05-22 10:37:21 +01:00 committed by GitHub
parent 14022c3602
commit 0263d886e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 10 deletions

View file

@ -191,7 +191,17 @@ export class DataRecognizer {
// takes a module config id, an optional jobPrefix and the request object
// creates all of the jobs, datafeeds and savedObjects listed in the module config.
// if any of the savedObjects already exist, they will not be overwritten.
async setupModuleItems(moduleId, jobPrefix, groups, indexPatternName, query, request) {
async setupModuleItems(
moduleId,
jobPrefix,
groups,
indexPatternName,
query,
startDatafeed,
start,
end,
request
) {
this.savedObjectsClient = request.getSavedObjectsClient();
this.indexPatterns = await this.loadIndexPatterns();
@ -235,6 +245,24 @@ export class DataRecognizer {
});
}
saveResults.datafeeds = await this.saveDatafeeds(moduleConfig.datafeeds);
if (startDatafeed) {
const savedDatafeeds = moduleConfig.datafeeds.filter((df) => {
const datafeedResult = saveResults.datafeeds.find(d => d.id === df.id);
return (datafeedResult !== undefined && datafeedResult.success === true);
});
const startResults = await this.startDatafeeds(savedDatafeeds, start, end);
saveResults.datafeeds.forEach((df) => {
const startedDatafeed = startResults[df.id];
if (startedDatafeed !== undefined) {
df.started = startedDatafeed.started;
if (startedDatafeed.error !== undefined) {
df.error = startedDatafeed.error;
}
}
});
}
}
// create the savedObjects
@ -367,14 +395,11 @@ export class DataRecognizer {
// as success: false
async saveDatafeeds(datafeeds) {
return await Promise.all(datafeeds.map(async (datafeed) => {
const datafeedId = datafeed.id;
try {
datafeed.id = datafeedId;
await this.saveDatafeed(datafeed);
return { id: datafeedId, success: true };
return { id: datafeed.id, success: true, started: false };
} catch (error) {
return { id: datafeedId, success: false, error };
return { id: datafeed.id, success: false, started: false, error };
}
}));
}
@ -384,6 +409,51 @@ export class DataRecognizer {
return this.callWithRequest('ml.addDatafeed', { datafeedId, body });
}
async startDatafeeds(datafeeds, start, end) {
const results = {};
for (const datafeed of datafeeds) {
results[datafeed.id] = await this.startDatafeed(datafeed, start, end);
}
return results;
}
async startDatafeed(datafeed, start, end) {
const result = { started: false };
let opened = false;
try {
const openResult = await this.callWithRequest('ml.openJob', { jobId: datafeed.config.job_id });
opened = openResult.opened;
} catch (error) {
// if the job is already open, a 409 will be returned.
if (error.statusCode === 409) {
opened = true;
} else {
opened = false;
result.started = false;
result.error = error;
}
}
if (opened) {
try {
const duration = { start: 0 };
if (start !== undefined) {
duration.start = start;
}
if (end !== undefined) {
duration.end = end;
}
await this.callWithRequest('ml.startDatafeed', { datafeedId: datafeed.id, ...duration });
result.started = true;
} catch (error) {
result.started = false;
result.error = error;
}
}
return result;
}
// merge all of the save results into one result object
// which is returned from the endpoint
async updateResults(results, saveResults) {
@ -404,6 +474,7 @@ export class DataRecognizer {
saveResults.datafeeds.forEach((d2) => {
if (d.id === d2.id) {
d.success = d2.success;
d.started = d2.started;
if (d2.error !== undefined) {
d.error = d2.error;
}

View file

@ -21,9 +21,29 @@ function getModule(callWithRequest, moduleId) {
return dr.getModule(moduleId);
}
function saveModuleItems(callWithRequest, moduleId, prefix, groups, indexPatternName, query, request) {
function saveModuleItems(
callWithRequest,
moduleId,
prefix,
groups,
indexPatternName,
query,
startDatafeed,
start,
end,
request
) {
const dr = new DataRecognizer(callWithRequest);
return dr.setupModuleItems(moduleId, prefix, groups, indexPatternName, query, request);
return dr.setupModuleItems(
moduleId,
prefix,
groups,
indexPatternName,
query,
startDatafeed,
start,
end,
request);
}
export function dataRecognizer(server, commonRouteConfig) {
@ -69,10 +89,24 @@ export function dataRecognizer(server, commonRouteConfig) {
prefix,
groups,
indexPatternName,
query
query,
startDatafeed,
start,
end
} = request.payload;
return saveModuleItems(callWithRequest, moduleId, prefix, groups, indexPatternName, query, request)
return saveModuleItems(
callWithRequest,
moduleId,
prefix,
groups,
indexPatternName,
query,
startDatafeed,
start,
end,
request
)
.then(resp => reply(resp))
.catch(resp => reply(wrapError(resp)));
},