diff --git a/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.js b/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.js index 61340f40a9f9..0c3d9988d76b 100644 --- a/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.js +++ b/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.js @@ -191,7 +191,17 @@ export class DataRecognizer { // takes a module config id, an optional jobPrefix and the request object // creates all of the jobs, datafeeds and savedObjects listed in the module config. // if any of the savedObjects already exist, they will not be overwritten. - async setupModuleItems(moduleId, jobPrefix, groups, indexPatternName, query, request) { + async setupModuleItems( + moduleId, + jobPrefix, + groups, + indexPatternName, + query, + startDatafeed, + start, + end, + request + ) { this.savedObjectsClient = request.getSavedObjectsClient(); this.indexPatterns = await this.loadIndexPatterns(); @@ -235,6 +245,24 @@ export class DataRecognizer { }); } saveResults.datafeeds = await this.saveDatafeeds(moduleConfig.datafeeds); + + if (startDatafeed) { + const savedDatafeeds = moduleConfig.datafeeds.filter((df) => { + const datafeedResult = saveResults.datafeeds.find(d => d.id === df.id); + return (datafeedResult !== undefined && datafeedResult.success === true); + }); + + const startResults = await this.startDatafeeds(savedDatafeeds, start, end); + saveResults.datafeeds.forEach((df) => { + const startedDatafeed = startResults[df.id]; + if (startedDatafeed !== undefined) { + df.started = startedDatafeed.started; + if (startedDatafeed.error !== undefined) { + df.error = startedDatafeed.error; + } + } + }); + } } // create the savedObjects @@ -367,14 +395,11 @@ export class DataRecognizer { // as success: false async saveDatafeeds(datafeeds) { return await Promise.all(datafeeds.map(async (datafeed) => { - const datafeedId = datafeed.id; - try { - datafeed.id = datafeedId; await this.saveDatafeed(datafeed); - return { id: datafeedId, success: true }; + return { id: datafeed.id, success: true, started: false }; } catch (error) { - return { id: datafeedId, success: false, error }; + return { id: datafeed.id, success: false, started: false, error }; } })); } @@ -384,6 +409,51 @@ export class DataRecognizer { return this.callWithRequest('ml.addDatafeed', { datafeedId, body }); } + async startDatafeeds(datafeeds, start, end) { + const results = {}; + for (const datafeed of datafeeds) { + results[datafeed.id] = await this.startDatafeed(datafeed, start, end); + } + return results; + } + + async startDatafeed(datafeed, start, end) { + const result = { started: false }; + let opened = false; + try { + const openResult = await this.callWithRequest('ml.openJob', { jobId: datafeed.config.job_id }); + opened = openResult.opened; + } catch (error) { + // if the job is already open, a 409 will be returned. + if (error.statusCode === 409) { + opened = true; + } else { + opened = false; + result.started = false; + result.error = error; + } + } + if (opened) { + try { + const duration = { start: 0 }; + if (start !== undefined) { + duration.start = start; + } + if (end !== undefined) { + duration.end = end; + } + + await this.callWithRequest('ml.startDatafeed', { datafeedId: datafeed.id, ...duration }); + result.started = true; + } catch (error) { + result.started = false; + result.error = error; + } + } + return result; + } + + // merge all of the save results into one result object // which is returned from the endpoint async updateResults(results, saveResults) { @@ -404,6 +474,7 @@ export class DataRecognizer { saveResults.datafeeds.forEach((d2) => { if (d.id === d2.id) { d.success = d2.success; + d.started = d2.started; if (d2.error !== undefined) { d.error = d2.error; } diff --git a/x-pack/plugins/ml/server/routes/modules.js b/x-pack/plugins/ml/server/routes/modules.js index 3eb883a8dd9f..bae0e4b6fcbc 100644 --- a/x-pack/plugins/ml/server/routes/modules.js +++ b/x-pack/plugins/ml/server/routes/modules.js @@ -21,9 +21,29 @@ function getModule(callWithRequest, moduleId) { return dr.getModule(moduleId); } -function saveModuleItems(callWithRequest, moduleId, prefix, groups, indexPatternName, query, request) { +function saveModuleItems( + callWithRequest, + moduleId, + prefix, + groups, + indexPatternName, + query, + startDatafeed, + start, + end, + request +) { const dr = new DataRecognizer(callWithRequest); - return dr.setupModuleItems(moduleId, prefix, groups, indexPatternName, query, request); + return dr.setupModuleItems( + moduleId, + prefix, + groups, + indexPatternName, + query, + startDatafeed, + start, + end, + request); } export function dataRecognizer(server, commonRouteConfig) { @@ -69,10 +89,24 @@ export function dataRecognizer(server, commonRouteConfig) { prefix, groups, indexPatternName, - query + query, + startDatafeed, + start, + end } = request.payload; - return saveModuleItems(callWithRequest, moduleId, prefix, groups, indexPatternName, query, request) + return saveModuleItems( + callWithRequest, + moduleId, + prefix, + groups, + indexPatternName, + query, + startDatafeed, + start, + end, + request + ) .then(resp => reply(resp)) .catch(resp => reply(wrapError(resp))); },