[ML] Better handling of job request timeouts (#20985)

James Gowdy 2018-07-19 16:39:54 +01:00 committed by GitHub
parent 3d06046295
commit d1b667512c
7 changed files with 164 additions and 62 deletions


@@ -63,6 +63,8 @@ function errorNotify(text, resp) {
   let err = null;
   if (typeof text === 'object' && text.response !== undefined) {
     resp = text.response;
+  } else if (typeof text === 'object' && text.message !== undefined) {
+    err = new Error(text.message);
   } else {
     err = new Error(text);
   }
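For illustration, a minimal sketch of the three input shapes the function now distinguishes (the calls below are hypothetical; errorNotify is internal to this file):

// 1) an object carrying a server response body → reported via text.response
errorNotify({ response: '{"error":"index_not_found_exception"}' });
// 2) an Error-like object with a message (the branch added here, e.g. a client
//    request-timeout error) → wrapped as new Error(text.message)
errorNotify(new Error('Request Timeout after 30000ms'));
// 3) a plain string → wrapped as new Error(text)
errorNotify('Something went wrong');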


@@ -20,7 +20,7 @@
     width: 90px;
   }
   th:nth-child(7) {
-    width: 70px;
+    width: 85px;
   }
   th:nth-child(8) {
     width: 80px;


@@ -5,6 +5,9 @@
  */
+import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';
+import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils';
 export function datafeedsProvider(callWithRequest) {
   async function forceStartDatafeeds(datafeedIds, start, end) {
@@ -15,7 +18,6 @@ export function datafeedsProvider(callWithRequest) {
     }, {});
     const results = {};
-    const START_TIMEOUT = 10000; // 10s
     async function doStart(datafeedId) {
       if (doStartsCalled[datafeedId] === false) {
@@ -33,19 +35,19 @@ export function datafeedsProvider(callWithRequest) {
     for (const datafeedId of datafeedIds) {
       const jobId = jobIds[datafeedId];
       if (jobId !== undefined) {
-        setTimeout(async () => {
-          // in 10 seconds start the datafeed.
-          // this should give the openJob enough time.
-          // if not, the start request will be queued
-          // behind the open request on the server.
-          results[datafeedId] = await doStart(datafeedId);
-        }, START_TIMEOUT);
+        try {
+          if (await openJob(jobId)) {
+            results[datafeedId] = await doStart(datafeedId);
+          }
+        } catch (error) {
+          if (isRequestTimeout(error)) {
+            // if the open request times out, start the datafeed anyway
+            // then break out of the loop so no more requests are fired.
+            // use fillResultsWithTimeouts to add a timeout error to each
+            // remaining job
+            results[datafeedId] = await doStart(datafeedId);
+            return fillResultsWithTimeouts(results, datafeedId, datafeedIds, JOB_STATE.OPENED);
+          }
+          results[datafeedId] = { started: false, error };
+        }
       } else {
@@ -79,7 +81,13 @@ export function datafeedsProvider(callWithRequest) {
     const results = {};
     for (const datafeedId of datafeedIds) {
-      results[datafeedId] = await callWithRequest('ml.stopDatafeed', { datafeedId });
+      try {
+        results[datafeedId] = await callWithRequest('ml.stopDatafeed', { datafeedId });
+      } catch (error) {
+        if (isRequestTimeout(error)) {
+          return fillResultsWithTimeouts(results, datafeedId, datafeedIds, DATAFEED_STATE.STOPPED);
+        }
+      }
     }
     return results;
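To make the intended behaviour concrete, here is a hypothetical result object from stopDatafeeds (ids and values invented for illustration, and assuming DATAFEED_STATE.STOPPED is the string 'stopped'): once one stop request times out, the remaining datafeeds are not contacted and are filled with the same synthetic timeout error.

// Sketch: stopDatafeeds(['df-1', 'df-2', 'df-3']) where the request for 'df-2'
// times out after 'df-1' stopped normally.
const exampleStopResults = {
  'df-1': { stopped: true },   // real response from ml.stopDatafeed
  'df-2': {                    // filled in by fillResultsWithTimeouts
    stopped: false,
    error: { response: { error: { root_cause: [{
      reason: "Request to stop 'df-2' timed out. All other requests cancelled."
    }] } } }
  },
  'df-3': { stopped: false, error: { /* same timeout error */ } }  // never requested
};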


@@ -0,0 +1,59 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';

const REQUEST_TIMEOUT = 'RequestTimeout';

export function isRequestTimeout(error) {
  return (error.displayName === REQUEST_TIMEOUT);
}

// populate a results object with timeout errors
// for the ids which haven't already been set
export function fillResultsWithTimeouts(results, id, ids, status) {
  const action = getAction(status);
  const extra = ((ids.length - Object.keys(results).length) > 1) ? ' All other requests cancelled.' : '';
  const error = {
    response: {
      error: {
        root_cause: [{
          reason: `Request to ${action} '${id}' timed out.${extra}`
        }]
      }
    }
  };

  return ids.reduce((p, c) => {
    if (results[c] === undefined) {
      p[c] = {
        [status]: false,
        error
      };
    } else {
      p[c] = results[c];
    }
    return p;
  }, {});
}

function getAction(status) {
  let action = '';
  if (status === DATAFEED_STATE.STARTED) {
    action = 'start';
  } else if (status === DATAFEED_STATE.STOPPED) {
    action = 'stop';
  } else if (status === DATAFEED_STATE.DELETED) {
    action = 'delete';
  } else if (status === JOB_STATE.OPENED) {
    action = 'open';
  } else if (status === JOB_STATE.CLOSED) {
    action = 'close';
  }
  return action;
}
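For context, a small usage sketch of the helpers above. The timeout error is hand-built here, on the assumption the check implies: that the Elasticsearch JS client reports a timed-out request as an error whose displayName is 'RequestTimeout'.

// Hypothetical timeout error, shaped the way isRequestTimeout expects.
const timeoutError = new Error('Request Timeout after 30000ms');
timeoutError.displayName = 'RequestTimeout';

isRequestTimeout(timeoutError);              // true
isRequestTimeout(new Error('bad request'));  // false

// Fill in 'df-2' (the id that timed out) and the untouched 'df-3' with a
// timeout error, keeping the existing result for 'df-1'.
const filled = fillResultsWithTimeouts(
  { 'df-1': { stopped: true } },   // results so far
  'df-2',                          // the id whose request timed out
  ['df-1', 'df-2', 'df-3'],        // all ids in the batch
  DATAFEED_STATE.STOPPED           // status key used for the filled entries
);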


@@ -0,0 +1,66 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

import { CalendarManager } from '../calendar';

export function groupsProvider(callWithRequest) {
  const calMngr = new CalendarManager(callWithRequest);

  async function getAllGroups() {
    const groups = {};
    const jobIds = {};
    const [ JOBS, CALENDARS ] = [0, 1];
    const results = await Promise.all([
      callWithRequest('ml.jobs'),
      calMngr.getAllCalendars(),
    ]);

    if (results[JOBS] && results[JOBS].jobs) {
      results[JOBS].jobs.forEach((job) => {
        jobIds[job.job_id] = null;
        if (job.groups !== undefined) {
          job.groups.forEach((g) => {
            if (groups[g] === undefined) {
              groups[g] = {
                id: g,
                jobIds: [job.job_id],
                calendarIds: []
              };
            } else {
              groups[g].jobIds.push(job.job_id);
            }
          });
        }
      });
    }

    if (results[CALENDARS]) {
      results[CALENDARS].forEach((cal) => {
        cal.job_ids.forEach((jId) => {
          if (jobIds[jId] === undefined) {
            if (groups[jId] === undefined) {
              groups[jId] = {
                id: jId,
                jobIds: [],
                calendarIds: [cal.calendar_id]
              };
            } else {
              groups[jId].calendarIds.push(cal.calendar_id);
            }
          }
        });
      });
    }

    return Object.keys(groups).map(g => groups[g]);
  }

  return {
    getAllGroups
  };
}
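As an illustration of the resolved shape, assume (hypothetically) one job 'farequote' that belongs to the group 'transport', and a calendar 'holidays' whose job_ids list contains the group name 'transport'; getAllGroups would then resolve to:

[
  {
    id: 'transport',
    jobIds: ['farequote'],     // collected from jobs that list 'transport' in job.groups
    calendarIds: ['holidays']  // collected from calendars whose job_ids reference the group
  }
]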


@@ -7,10 +7,12 @@
 import { datafeedsProvider } from './datafeeds';
 import { jobsProvider } from './jobs';
+import { groupsProvider } from './groups';
 export function jobServiceProvider(callWithRequest) {
   return {
     ...datafeedsProvider(callWithRequest),
     ...jobsProvider(callWithRequest),
+    ...groupsProvider(callWithRequest),
   };
 }
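A minimal consumption sketch, assuming the caller already holds a request-bound callWithRequest function: because each provider returns a plain object of functions, the spreads compose them into one flat job service.

// Hypothetical route handler usage.
const { forceStartDatafeeds, stopDatafeeds, getAllGroups } = jobServiceProvider(callWithRequest);
const groups = await getAllGroups();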


@@ -4,9 +4,12 @@
  * you may not use this file except in compliance with the Elastic License.
  */
+import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';
 import { datafeedsProvider } from './datafeeds';
 import { jobAuditMessagesProvider } from '../job_audit_messages';
 import { CalendarManager } from '../calendar';
+import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils';
 import moment from 'moment';
 import { uniq } from 'lodash';
@@ -37,10 +40,16 @@ export function jobsProvider(callWithRequest) {
           await forceDeleteJob(jobId);
           results[jobId] = { deleted: true };
         } catch (error) {
+          if (isRequestTimeout(error)) {
+            return fillResultsWithTimeouts(results, jobId, jobIds, DATAFEED_STATE.DELETED);
+          }
           results[jobId] = { deleted: false, error };
         }
       }
     } catch (error) {
+      if (isRequestTimeout(error)) {
+        return fillResultsWithTimeouts(results, datafeedIds[jobId], jobIds, DATAFEED_STATE.DELETED);
+      }
       results[jobId] = { deleted: false, error };
     }
   }
@@ -54,6 +63,10 @@ export function jobsProvider(callWithRequest) {
         await callWithRequest('ml.closeJob', { jobId });
         results[jobId] = { closed: true };
       } catch (error) {
+        if (isRequestTimeout(error)) {
+          return fillResultsWithTimeouts(results, jobId, jobIds, JOB_STATE.CLOSED);
+        }
         if (error.statusCode === 409 && (error.response && error.response.includes('datafeed') === false)) {
           // the close job request may fail (409) if the job has failed or if the datafeed hasn't been stopped.
           // if the job has failed we want to attempt a force close.
@@ -62,6 +75,9 @@ export function jobsProvider(callWithRequest) {
           await callWithRequest('ml.closeJob', { jobId, force: true });
           results[jobId] = { closed: true };
         } catch (error2) {
+          if (isRequestTimeout(error2)) {
+            return fillResultsWithTimeouts(results, jobId, jobIds, JOB_STATE.CLOSED);
+          }
           results[jobId] = { closed: false, error: error2 };
         }
       } else {
@@ -248,62 +264,11 @@ export function jobsProvider(callWithRequest) {
     return obj;
   }
-  async function getAllGroups() {
-    const groups = {};
-    const jobIds = {};
-    const [ JOBS, CALENDARS ] = [0, 1];
-    const results = await Promise.all([
-      callWithRequest('ml.jobs'),
-      calMngr.getAllCalendars(),
-    ]);
-    if (results[JOBS] && results[JOBS].jobs) {
-      results[JOBS].jobs.forEach((job) => {
-        jobIds[job.job_id] = null;
-        if (job.groups !== undefined) {
-          job.groups.forEach((g) => {
-            if (groups[g] === undefined) {
-              groups[g] = {
-                id: g,
-                jobIds: [job.job_id],
-                calendarIds: []
-              };
-            } else {
-              groups[g].jobIds.push(job.job_id);
-            }
-          });
-        }
-      });
-    }
-    if (results[CALENDARS]) {
-      results[CALENDARS].forEach((cal) => {
-        cal.job_ids.forEach((jId) => {
-          if (jobIds[jId] === undefined) {
-            if (groups[jId] === undefined) {
-              groups[jId] = {
-                id: jId,
-                jobIds: [],
-                calendarIds: [cal.calendar_id]
-              };
-            } else {
-              groups[jId].calendarIds.push(cal.calendar_id);
-            }
-          }
-        });
-      });
-    }
-    return Object.keys(groups).map(g => groups[g]);
-  }
   return {
     forceDeleteJob,
     deleteJobs,
     closeJobs,
     jobsSummary,
     createFullJobsList,
-    getAllGroups,
   };
 }
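Finally, a hypothetical deleteJobs outcome under the same timeout convention (assuming DATAFEED_STATE.DELETED is the string 'deleted'): a timeout while stopping the datafeed of 'job-2' ends the loop, and both 'job-2' and the untouched 'job-3' receive the synthetic timeout error.

// Sketch: deleteJobs(['job-1', 'job-2', 'job-3']) where 'job-1' was deleted
// and the stopDatafeed call for 'job-2' timed out.
const exampleDeleteResults = {
  'job-1': { deleted: true },
  'job-2': { deleted: false, error: { /* "Request to delete '<datafeed id>' timed out. ..." */ } },
  'job-3': { deleted: false, error: { /* same timeout error */ } }
};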