From d1b667512cb649cfaf3487b705df959e747c01f0 Mon Sep 17 00:00:00 2001 From: James Gowdy Date: Thu, 19 Jul 2018 16:39:54 +0100 Subject: [PATCH] [ML] Better handling of job request timeouts (#20985) --- .../messagebar/messagebar_service.js | 2 + .../components/jobs_list/styles/main.less | 2 +- .../ml/server/models/job_service/datafeeds.js | 28 +++++--- .../server/models/job_service/error_utils.js | 59 ++++++++++++++++ .../ml/server/models/job_service/groups.js | 66 ++++++++++++++++++ .../ml/server/models/job_service/index.js | 2 + .../ml/server/models/job_service/jobs.js | 67 +++++-------------- 7 files changed, 164 insertions(+), 62 deletions(-) create mode 100644 x-pack/plugins/ml/server/models/job_service/error_utils.js create mode 100644 x-pack/plugins/ml/server/models/job_service/groups.js diff --git a/x-pack/plugins/ml/public/components/messagebar/messagebar_service.js b/x-pack/plugins/ml/public/components/messagebar/messagebar_service.js index 4fc89a2aca03..3bbf7e79c683 100644 --- a/x-pack/plugins/ml/public/components/messagebar/messagebar_service.js +++ b/x-pack/plugins/ml/public/components/messagebar/messagebar_service.js @@ -63,6 +63,8 @@ function errorNotify(text, resp) { let err = null; if (typeof text === 'object' && text.response !== undefined) { resp = text.response; + } else if (typeof text === 'object' && text.message !== undefined) { + err = new Error(text.message); } else { err = new Error(text); } diff --git a/x-pack/plugins/ml/public/jobs/jobs_list_new/components/jobs_list/styles/main.less b/x-pack/plugins/ml/public/jobs/jobs_list_new/components/jobs_list/styles/main.less index 23daec1f0710..f32e922e32a2 100644 --- a/x-pack/plugins/ml/public/jobs/jobs_list_new/components/jobs_list/styles/main.less +++ b/x-pack/plugins/ml/public/jobs/jobs_list_new/components/jobs_list/styles/main.less @@ -20,7 +20,7 @@ width: 90px; } th:nth-child(7) { - width: 70px; + width: 85px; } th:nth-child(8) { width: 80px; diff --git 
a/x-pack/plugins/ml/server/models/job_service/datafeeds.js b/x-pack/plugins/ml/server/models/job_service/datafeeds.js index 48aadfb688e7..f496f832548b 100644 --- a/x-pack/plugins/ml/server/models/job_service/datafeeds.js +++ b/x-pack/plugins/ml/server/models/job_service/datafeeds.js @@ -5,6 +5,9 @@ */ +import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states'; +import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils'; + export function datafeedsProvider(callWithRequest) { async function forceStartDatafeeds(datafeedIds, start, end) { @@ -15,7 +18,6 @@ export function datafeedsProvider(callWithRequest) { }, {}); const results = {}; - const START_TIMEOUT = 10000; // 10s async function doStart(datafeedId) { if (doStartsCalled[datafeedId] === false) { @@ -33,19 +35,19 @@ export function datafeedsProvider(callWithRequest) { for (const datafeedId of datafeedIds) { const jobId = jobIds[datafeedId]; if (jobId !== undefined) { - setTimeout(async () => { - // in 10 seconds start the datafeed. - // this should give the openJob enough time. - // if not, the start request will be queued - // behind the open request on the server. - results[datafeedId] = await doStart(datafeedId); - }, START_TIMEOUT); - try { if (await openJob(jobId)) { results[datafeedId] = await doStart(datafeedId); } } catch (error) { + if (isRequestTimeout(error)) { + // if the open request times out, start the datafeed anyway + // then break out of the loop so no more requests are fired. 
+            // use fillResultsWithTimeouts to add a timeout error to each
+            // remaining job
+            results[datafeedId] = await doStart(datafeedId);
+            return fillResultsWithTimeouts(results, datafeedId, datafeedIds, JOB_STATE.OPENED);
+          }
           results[datafeedId] = { started: false, error };
         }
       } else {
@@ -79,7 +81,15 @@ export function datafeedsProvider(callWithRequest) {
     const results = {};
 
     for (const datafeedId of datafeedIds) {
-      results[datafeedId] = await callWithRequest('ml.stopDatafeed', { datafeedId });
+      try {
+        results[datafeedId] = await callWithRequest('ml.stopDatafeed', { datafeedId });
+      } catch (error) {
+        if (isRequestTimeout(error)) {
+          return fillResultsWithTimeouts(results, datafeedId, datafeedIds, DATAFEED_STATE.STOPPED);
+        }
+        // any other failure is reported against this datafeed rather than silently dropped
+        results[datafeedId] = { stopped: false, error };
+      }
     }
 
     return results;
diff --git a/x-pack/plugins/ml/server/models/job_service/error_utils.js b/x-pack/plugins/ml/server/models/job_service/error_utils.js
new file mode 100644
index 000000000000..f9396bffd794
--- /dev/null
+++ b/x-pack/plugins/ml/server/models/job_service/error_utils.js
@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+
+import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';
+
+const REQUEST_TIMEOUT = 'RequestTimeout';
+
+export function isRequestTimeout(error) {
+  return (error.displayName === REQUEST_TIMEOUT);
+}
+
+// populate a results object with timeout errors
+// for the ids which haven't already been set
+export function fillResultsWithTimeouts(results, id, ids, status) {
+  const action = getAction(status);
+  const extra = ((ids.length - Object.keys(results).length) > 1) ? ' All other requests cancelled.' : '';
+
+  const error = {
+    response: {
+      error: {
+        root_cause: [{
+          reason: `Request to ${action} '${id}' timed out.${extra}`
+        }]
+      }
+    }
+  };
+
+  return ids.reduce((p, c) => {
+    if (results[c] === undefined) {
+      p[c] = {
+        [status]: false,
+        error
+      };
+    } else {
+      p[c] = results[c];
+    }
+    return p;
+  }, {});
+}
+
+function getAction(status) {
+  let action = '';
+  if (status === DATAFEED_STATE.STARTED) {
+    action = 'start';
+  } else if (status === DATAFEED_STATE.STOPPED) {
+    action = 'stop';
+  } else if (status === DATAFEED_STATE.DELETED) {
+    action = 'delete';
+  } else if (status === JOB_STATE.OPENED) {
+    action = 'open';
+  } else if (status === JOB_STATE.CLOSED) {
+    action = 'close';
+  }
+  return action;
+}
diff --git a/x-pack/plugins/ml/server/models/job_service/groups.js b/x-pack/plugins/ml/server/models/job_service/groups.js
new file mode 100644
index 000000000000..430fa258bd18
--- /dev/null
+++ b/x-pack/plugins/ml/server/models/job_service/groups.js
@@ -0,0 +1,66 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+
+import { CalendarManager } from '../calendar';
+
+export function groupsProvider(callWithRequest) {
+  const calMngr = new CalendarManager(callWithRequest);
+
+  async function getAllGroups() {
+    const groups = {};
+    const jobIds = {};
+    const [ JOBS, CALENDARS ] = [0, 1];
+    const results = await Promise.all([
+      callWithRequest('ml.jobs'),
+      calMngr.getAllCalendars(),
+    ]);
+
+    if (results[JOBS] && results[JOBS].jobs) {
+      results[JOBS].jobs.forEach((job) => {
+        jobIds[job.job_id] = null;
+        if (job.groups !== undefined) {
+          job.groups.forEach((g) => {
+            if (groups[g] === undefined) {
+              groups[g] = {
+                id: g,
+                jobIds: [job.job_id],
+                calendarIds: []
+              };
+            } else {
+              groups[g].jobIds.push(job.job_id);
+            }
+
+          });
+
+        }
+      });
+    }
+    if (results[CALENDARS]) {
+      results[CALENDARS].forEach((cal) => {
+        cal.job_ids.forEach((jId) => {
+          if (jobIds[jId] === undefined) {
+            if (groups[jId] === undefined) {
+              groups[jId] = {
+                id: jId,
+                jobIds: [],
+                calendarIds: [cal.calendar_id]
+              };
+            } else {
+              groups[jId].calendarIds.push(cal.calendar_id);
+            }
+          }
+        });
+      });
+    }
+
+    return Object.keys(groups).map(g => groups[g]);
+  }
+
+  return {
+    getAllGroups
+  };
+}
diff --git a/x-pack/plugins/ml/server/models/job_service/index.js b/x-pack/plugins/ml/server/models/job_service/index.js
index e3562cf9e476..ca896fe302e2 100644
--- a/x-pack/plugins/ml/server/models/job_service/index.js
+++ b/x-pack/plugins/ml/server/models/job_service/index.js
@@ -7,10 +7,12 @@
 
 import { datafeedsProvider } from './datafeeds';
 import { jobsProvider } from './jobs';
+import { groupsProvider } from './groups';
 
 export function jobServiceProvider(callWithRequest) {
   return {
     ...datafeedsProvider(callWithRequest),
     ...jobsProvider(callWithRequest),
+    ...groupsProvider(callWithRequest),
   };
 }
diff --git a/x-pack/plugins/ml/server/models/job_service/jobs.js b/x-pack/plugins/ml/server/models/job_service/jobs.js
index b236cf3af06f..6a9da9ff5b75 100644
--- a/x-pack/plugins/ml/server/models/job_service/jobs.js
+++ b/x-pack/plugins/ml/server/models/job_service/jobs.js @@ -4,9 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ + +import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states'; import { datafeedsProvider } from './datafeeds'; import { jobAuditMessagesProvider } from '../job_audit_messages'; import { CalendarManager } from '../calendar'; +import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils'; import moment from 'moment'; import { uniq } from 'lodash'; @@ -37,10 +40,16 @@ export function jobsProvider(callWithRequest) { await forceDeleteJob(jobId); results[jobId] = { deleted: true }; } catch (error) { + if (isRequestTimeout(error)) { + return fillResultsWithTimeouts(results, jobId, jobIds, DATAFEED_STATE.DELETED); + } results[jobId] = { deleted: false, error }; } } } catch (error) { + if (isRequestTimeout(error)) { + return fillResultsWithTimeouts(results, datafeedIds[jobId], jobIds, DATAFEED_STATE.DELETED); + } results[jobId] = { deleted: false, error }; } } @@ -54,6 +63,10 @@ export function jobsProvider(callWithRequest) { await callWithRequest('ml.closeJob', { jobId }); results[jobId] = { closed: true }; } catch (error) { + if (isRequestTimeout(error)) { + return fillResultsWithTimeouts(results, jobId, jobIds, JOB_STATE.CLOSED); + } + if (error.statusCode === 409 && (error.response && error.response.includes('datafeed') === false)) { // the close job request may fail (409) if the job has failed or if the datafeed hasn't been stopped. // if the job has failed we want to attempt a force close. 
@@ -62,6 +75,9 @@ export function jobsProvider(callWithRequest) {
             await callWithRequest('ml.closeJob', { jobId, force: true });
             results[jobId] = { closed: true };
           } catch (error2) {
+            if (isRequestTimeout(error2)) {
+              return fillResultsWithTimeouts(results, jobId, jobIds, JOB_STATE.CLOSED);
+            }
             results[jobId] = { closed: false, error: error2 };
           }
         } else {
@@ -248,62 +264,11 @@ export function jobsProvider(callWithRequest) {
     return obj;
   }
 
-  async function getAllGroups() {
-    const groups = {};
-    const jobIds = {};
-    const [ JOBS, CALENDARS ] = [0, 1];
-    const results = await Promise.all([
-      callWithRequest('ml.jobs'),
-      calMngr.getAllCalendars(),
-    ]);
-
-    if (results[JOBS] && results[JOBS].jobs) {
-      results[JOBS].jobs.forEach((job) => {
-        jobIds[job.job_id] = null;
-        if (job.groups !== undefined) {
-          job.groups.forEach((g) => {
-            if (groups[g] === undefined) {
-              groups[g] = {
-                id: g,
-                jobIds: [job.job_id],
-                calendarIds: []
-              };
-            } else {
-              groups[g].jobIds.push(job.job_id);
-            }
-
-          });
-
-        }
-      });
-    }
-    if (results[CALENDARS]) {
-      results[CALENDARS].forEach((cal) => {
-        cal.job_ids.forEach((jId) => {
-          if (jobIds[jId] === undefined) {
-            if (groups[jId] === undefined) {
-              groups[jId] = {
-                id: jId,
-                jobIds: [],
-                calendarIds: [cal.calendar_id]
-              };
-            } else {
-              groups[jId].calendarIds.push(cal.calendar_id);
-            }
-          }
-        });
-      });
-    }
-
-    return Object.keys(groups).map(g => groups[g]);
-  }
-
   return {
     forceDeleteJob,
     deleteJobs,
     closeJobs,
     jobsSummary,
     createFullJobsList,
-    getAllGroups,
   };
 }