[Task Manager] Reject invalid Timeout values in Task Type Definitions (#88602)
This PR adds the following: 1. We now validate the interval passed to `timeout` when a task type definition is registered. 2. replaces usage of `Joi` with `schema-type`
This commit is contained in:
parent
4878554cc9
commit
e21defa448
|
@ -12,6 +12,19 @@ export enum IntervalCadence {
|
|||
Hour = 'h',
|
||||
Day = 'd',
|
||||
}
|
||||
|
||||
// Once Babel is updated ot support Typescript 4.x templated types, we can use
|
||||
// this more accurate and safer compile-time valdiation
|
||||
// export type Interval = `${number}${IntervalCadence}`;
|
||||
export type Interval = string;
|
||||
|
||||
export function isInterval(interval: Interval | string): interval is Interval {
|
||||
const numericAsStr: string = interval.slice(0, -1);
|
||||
const numeric: number = parseInt(numericAsStr, 10);
|
||||
const cadence: IntervalCadence | string = interval.slice(-1);
|
||||
return !(!isCadence(cadence) || isNaN(numeric) || numeric <= 0 || !isNumeric(numericAsStr));
|
||||
}
|
||||
|
||||
const VALID_CADENCE = new Set(Object.values(IntervalCadence));
|
||||
const CADENCE_IN_MS: Record<IntervalCadence, number> = {
|
||||
[IntervalCadence.Second]: 1000,
|
||||
|
@ -24,7 +37,7 @@ function isCadence(cadence: IntervalCadence | string): cadence is IntervalCadenc
|
|||
return VALID_CADENCE.has(cadence as IntervalCadence);
|
||||
}
|
||||
|
||||
export function asInterval(ms: number): string {
|
||||
export function asInterval(ms: number): Interval {
|
||||
const secondsRemainder = ms % 1000;
|
||||
const minutesRemainder = ms % 60000;
|
||||
return secondsRemainder ? `${ms}ms` : minutesRemainder ? `${ms / 1000}s` : `${ms / 60000}m`;
|
||||
|
@ -34,9 +47,9 @@ export function asInterval(ms: number): string {
|
|||
* Returns a date that is the specified interval from now. Currently,
|
||||
* only minute-intervals and second-intervals are supported.
|
||||
*
|
||||
* @param {string} interval - An interval of the form `Nm` such as `5m`
|
||||
* @param {Interval} interval - An interval of the form `Nm` such as `5m`
|
||||
*/
|
||||
export function intervalFromNow(interval?: string): Date | undefined {
|
||||
export function intervalFromNow(interval?: Interval): Date | undefined {
|
||||
if (interval === undefined) {
|
||||
return;
|
||||
}
|
||||
|
@ -48,9 +61,9 @@ export function intervalFromNow(interval?: string): Date | undefined {
|
|||
* only minute-intervals and second-intervals are supported.
|
||||
*
|
||||
* @param {Date} date - The date to add interval to
|
||||
* @param {string} interval - An interval of the form `Nm` such as `5m`
|
||||
* @param {Interval} interval - An interval of the form `Nm` such as `5m`
|
||||
*/
|
||||
export function intervalFromDate(date: Date, interval?: string): Date | undefined {
|
||||
export function intervalFromDate(date: Date, interval?: Interval): Date | undefined {
|
||||
if (interval === undefined) {
|
||||
return;
|
||||
}
|
||||
|
@ -59,9 +72,11 @@ export function intervalFromDate(date: Date, interval?: string): Date | undefine
|
|||
|
||||
export function maxIntervalFromDate(
|
||||
date: Date,
|
||||
...intervals: Array<string | undefined>
|
||||
...intervals: Array<Interval | undefined>
|
||||
): Date | undefined {
|
||||
const maxSeconds = Math.max(...intervals.filter(isString).map(parseIntervalAsSecond));
|
||||
const maxSeconds = Math.max(
|
||||
...intervals.filter(isString).map((interval) => parseIntervalAsSecond(interval as Interval))
|
||||
);
|
||||
if (!isNaN(maxSeconds)) {
|
||||
return secondsFromDate(date, maxSeconds);
|
||||
}
|
||||
|
@ -91,14 +106,14 @@ export function secondsFromDate(date: Date, secs: number): Date {
|
|||
/**
|
||||
* Verifies that the specified interval matches our expected format.
|
||||
*
|
||||
* @param {string} interval - An interval such as `5m` or `10s`
|
||||
* @param {Interval} interval - An interval such as `5m` or `10s`
|
||||
* @returns {number} The interval as seconds
|
||||
*/
|
||||
export const parseIntervalAsSecond = memoize((interval: string): number => {
|
||||
export const parseIntervalAsSecond = memoize((interval: Interval): number => {
|
||||
return Math.round(parseIntervalAsMillisecond(interval) / 1000);
|
||||
});
|
||||
|
||||
export const parseIntervalAsMillisecond = memoize((interval: string): number => {
|
||||
export const parseIntervalAsMillisecond = memoize((interval: Interval): number => {
|
||||
const numericAsStr: string = interval.slice(0, -1);
|
||||
const numeric: number = parseInt(numericAsStr, 10);
|
||||
const cadence: IntervalCadence | string = interval.slice(-1);
|
||||
|
|
|
@ -39,6 +39,14 @@ export function isErr<T, E>(result: Result<T, E>): result is Err<E> {
|
|||
return !isOk(result);
|
||||
}
|
||||
|
||||
export function tryAsResult<T, E>(fn: () => T): Result<T, E> {
|
||||
try {
|
||||
return asOk(fn());
|
||||
} catch (e) {
|
||||
return asErr(e);
|
||||
}
|
||||
}
|
||||
|
||||
export async function promiseResult<T, E>(future: Promise<T>): Promise<Result<T, E>> {
|
||||
try {
|
||||
return asOk(await future);
|
||||
|
|
|
@ -4,7 +4,9 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import Joi from 'joi';
|
||||
import { schema, TypeOf } from '@kbn/config-schema';
|
||||
import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals';
|
||||
import { isErr, tryAsResult } from './lib/result_type';
|
||||
|
||||
/*
|
||||
* Type definitions and validations for tasks.
|
||||
|
@ -83,17 +85,8 @@ export interface FailedTaskResult {
|
|||
status: TaskStatus.Failed;
|
||||
}
|
||||
|
||||
export const validateRunResult = Joi.object({
|
||||
runAt: Joi.date().optional(),
|
||||
schedule: Joi.object().optional(),
|
||||
error: Joi.object().optional(),
|
||||
state: Joi.object().optional(),
|
||||
}).optional();
|
||||
|
||||
export type RunFunction = () => Promise<RunResult | undefined | void>;
|
||||
|
||||
export type CancelFunction = () => Promise<RunResult | undefined | void>;
|
||||
|
||||
export interface CancellableTask {
|
||||
run: RunFunction;
|
||||
cancel?: CancelFunction;
|
||||
|
@ -101,40 +94,53 @@ export interface CancellableTask {
|
|||
|
||||
export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask;
|
||||
|
||||
export const taskDefinitionSchema = schema.object(
|
||||
{
|
||||
/**
|
||||
* A unique identifier for the type of task being defined.
|
||||
*/
|
||||
type: schema.string(),
|
||||
/**
|
||||
* A brief, human-friendly title for this task.
|
||||
*/
|
||||
title: schema.maybe(schema.string()),
|
||||
/**
|
||||
* An optional more detailed description of what this task does.
|
||||
*/
|
||||
description: schema.maybe(schema.string()),
|
||||
/**
|
||||
* How long, in minutes or seconds, the system should wait for the task to complete
|
||||
* before it is considered to be timed out. (e.g. '5m', the default). If
|
||||
* the task takes longer than this, Kibana will send it a kill command and
|
||||
* the task will be re-attempted.
|
||||
*/
|
||||
timeout: schema.string({
|
||||
defaultValue: '5m',
|
||||
}),
|
||||
/**
|
||||
* Up to how many times the task should retry when it fails to run. This will
|
||||
* default to the global variable.
|
||||
*/
|
||||
maxAttempts: schema.maybe(
|
||||
schema.number({
|
||||
min: 1,
|
||||
})
|
||||
),
|
||||
},
|
||||
{
|
||||
validate({ timeout }) {
|
||||
if (!isInterval(timeout) || isErr(tryAsResult(() => parseIntervalAsMillisecond(timeout)))) {
|
||||
return `Invalid timeout "${timeout}". Timeout must be of the form "{number}{cadance}" where number is an integer. Example: 5m.`;
|
||||
}
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
/**
|
||||
* Defines a task which can be scheduled and run by the Kibana
|
||||
* task manager.
|
||||
*/
|
||||
export interface TaskDefinition {
|
||||
/**
|
||||
* A unique identifier for the type of task being defined.
|
||||
*/
|
||||
type: string;
|
||||
|
||||
/**
|
||||
* A brief, human-friendly title for this task.
|
||||
*/
|
||||
title: string;
|
||||
|
||||
/**
|
||||
* An optional more detailed description of what this task does.
|
||||
*/
|
||||
description?: string;
|
||||
|
||||
/**
|
||||
* How long, in minutes or seconds, the system should wait for the task to complete
|
||||
* before it is considered to be timed out. (e.g. '5m', the default). If
|
||||
* the task takes longer than this, Kibana will send it a kill command and
|
||||
* the task will be re-attempted.
|
||||
*/
|
||||
timeout?: string;
|
||||
|
||||
/**
|
||||
* Up to how many times the task should retry when it fails to run. This will
|
||||
* default to the global variable.
|
||||
*/
|
||||
maxAttempts?: number;
|
||||
|
||||
export type TaskDefinition = TypeOf<typeof taskDefinitionSchema> & {
|
||||
/**
|
||||
* Function that customizes how the task should behave when the task fails. This
|
||||
* function can return `true`, `false` or a Date. True will tell task manager
|
||||
|
@ -149,17 +155,7 @@ export interface TaskDefinition {
|
|||
* and an optional cancel function which cancels the task.
|
||||
*/
|
||||
createTaskRunner: TaskRunCreatorFunction;
|
||||
}
|
||||
|
||||
export const validateTaskDefinition = Joi.object({
|
||||
type: Joi.string().required(),
|
||||
title: Joi.string().optional(),
|
||||
description: Joi.string().optional(),
|
||||
timeout: Joi.string().default('5m'),
|
||||
maxAttempts: Joi.number().min(1).optional(),
|
||||
createTaskRunner: Joi.func().required(),
|
||||
getRetry: Joi.func().optional(),
|
||||
}).default();
|
||||
};
|
||||
|
||||
export enum TaskStatus {
|
||||
Idle = 'idle',
|
||||
|
@ -174,12 +170,11 @@ export enum TaskLifecycleResult {
|
|||
}
|
||||
|
||||
export type TaskLifecycle = TaskStatus | TaskLifecycleResult;
|
||||
|
||||
export interface IntervalSchedule {
|
||||
/**
|
||||
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
|
||||
* */
|
||||
interval: string;
|
||||
interval: Interval;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -10,10 +10,10 @@ import { secondsFromNow } from '../lib/intervals';
|
|||
import { asOk, asErr } from '../lib/result_type';
|
||||
import { TaskManagerRunner, TaskRunResult } from '../task_running';
|
||||
import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent, TaskRun } from '../task_events';
|
||||
import { ConcreteTaskInstance, TaskStatus, TaskDefinition, SuccessfulRunResult } from '../task';
|
||||
import { ConcreteTaskInstance, TaskStatus } from '../task';
|
||||
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
|
||||
import moment from 'moment';
|
||||
import { TaskTypeDictionary } from '../task_type_dictionary';
|
||||
import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_dictionary';
|
||||
import { mockLogger } from '../test_utils';
|
||||
import { throwUnrecoverableError } from './errors';
|
||||
|
||||
|
@ -41,24 +41,6 @@ describe('TaskManagerRunner', () => {
|
|||
expect(runner.toString()).toEqual('bar "foo"');
|
||||
});
|
||||
|
||||
test('warns if the task returns an unexpected result', async () => {
|
||||
await allowsReturnType(undefined);
|
||||
await allowsReturnType({});
|
||||
await allowsReturnType({
|
||||
runAt: new Date(),
|
||||
});
|
||||
await allowsReturnType({
|
||||
error: new Error('Dang it!'),
|
||||
});
|
||||
await allowsReturnType({
|
||||
state: { shazm: true },
|
||||
});
|
||||
await disallowsReturnType('hm....');
|
||||
await disallowsReturnType({
|
||||
whatIsThis: '?!!?',
|
||||
});
|
||||
});
|
||||
|
||||
test('queues a reattempt if the task fails', async () => {
|
||||
const initialAttempts = _.random(0, 2);
|
||||
const id = Date.now().toString();
|
||||
|
@ -1121,7 +1103,7 @@ describe('TaskManagerRunner', () => {
|
|||
|
||||
interface TestOpts {
|
||||
instance?: Partial<ConcreteTaskInstance>;
|
||||
definitions?: Record<string, Omit<TaskDefinition, 'type'>>;
|
||||
definitions?: TaskDefinitionRegistry;
|
||||
onTaskEvent?: (event: TaskEvent<unknown, unknown>) => void;
|
||||
}
|
||||
|
||||
|
@ -1196,34 +1178,4 @@ describe('TaskManagerRunner', () => {
|
|||
instance,
|
||||
};
|
||||
}
|
||||
|
||||
async function testReturn(result: unknown, shouldBeValid: boolean) {
|
||||
const { runner, logger } = testOpts({
|
||||
definitions: {
|
||||
bar: {
|
||||
title: 'Bar!',
|
||||
createTaskRunner: () => ({
|
||||
run: async () => result as SuccessfulRunResult,
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await runner.run();
|
||||
|
||||
if (shouldBeValid) {
|
||||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
} else {
|
||||
expect(logger.warn).toHaveBeenCalledTimes(1);
|
||||
expect(logger.warn.mock.calls[0][0]).toMatch(/invalid task result/i);
|
||||
}
|
||||
}
|
||||
|
||||
function allowsReturnType(result: unknown) {
|
||||
return testReturn(result, true);
|
||||
}
|
||||
|
||||
function disallowsReturnType(result: unknown) {
|
||||
return testReturn(result, false);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
import { Logger } from 'src/core/server';
|
||||
import apm from 'elastic-apm-node';
|
||||
import { performance } from 'perf_hooks';
|
||||
import Joi from 'joi';
|
||||
import { identity, defaults, flow } from 'lodash';
|
||||
|
||||
import { Middleware } from '../lib/middleware';
|
||||
|
@ -36,7 +35,6 @@ import {
|
|||
FailedRunResult,
|
||||
FailedTaskResult,
|
||||
TaskDefinition,
|
||||
validateRunResult,
|
||||
TaskStatus,
|
||||
} from '../task';
|
||||
import { TaskTypeDictionary } from '../task_type_dictionary';
|
||||
|
@ -311,20 +309,9 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
private validateResult(
|
||||
result?: SuccessfulRunResult | FailedRunResult | void
|
||||
): Result<SuccessfulRunResult, FailedRunResult> {
|
||||
const { error } = Joi.validate(result, validateRunResult);
|
||||
|
||||
if (error) {
|
||||
this.logger.warn(`Invalid task result for ${this}: ${error.message}`);
|
||||
return asErr({
|
||||
error: new Error(`Invalid task result for ${this}: ${error.message}`),
|
||||
state: {},
|
||||
});
|
||||
}
|
||||
if (!result) {
|
||||
return asOk(EMPTY_RUN_RESULT);
|
||||
}
|
||||
|
||||
return isFailedRunResult(result) ? asErr({ ...result, error: result.error }) : asOk(result);
|
||||
return isFailedRunResult(result)
|
||||
? asErr({ ...result, error: result.error })
|
||||
: asOk(result || EMPTY_RUN_RESULT);
|
||||
}
|
||||
|
||||
private shouldTryToScheduleRetry(): boolean {
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
import { get } from 'lodash';
|
||||
import { RunContext, TaskDefinition } from './task';
|
||||
import { sanitizeTaskDefinitions } from './task_type_dictionary';
|
||||
import { sanitizeTaskDefinitions, TaskDefinitionRegistry } from './task_type_dictionary';
|
||||
|
||||
interface Opts {
|
||||
numTasks: number;
|
||||
|
@ -73,8 +73,9 @@ describe('taskTypeDictionary', () => {
|
|||
|
||||
it('throws a validation exception for invalid task definition', () => {
|
||||
const runsanitize = () => {
|
||||
const taskDefinitions = {
|
||||
const taskDefinitions: TaskDefinitionRegistry = {
|
||||
some_kind_of_task: {
|
||||
// @ts-ignore
|
||||
fail: 'extremely', // cause a validation failure
|
||||
type: 'breaky_task',
|
||||
title: 'Test XYZ',
|
||||
|
@ -94,6 +95,62 @@ describe('taskTypeDictionary', () => {
|
|||
return sanitizeTaskDefinitions(taskDefinitions);
|
||||
};
|
||||
|
||||
expect(runsanitize).toThrowError();
|
||||
expect(runsanitize).toThrowErrorMatchingInlineSnapshot(
|
||||
`"[fail]: definition for this key is missing"`
|
||||
);
|
||||
});
|
||||
|
||||
it('throws a validation exception for invalid timeout on task definition', () => {
|
||||
const runsanitize = () => {
|
||||
const taskDefinitions: TaskDefinitionRegistry = {
|
||||
some_kind_of_task: {
|
||||
title: 'Test XYZ',
|
||||
timeout: '15 days',
|
||||
description: `Actually this won't work`,
|
||||
createTaskRunner() {
|
||||
return {
|
||||
async run() {
|
||||
return {
|
||||
state: {},
|
||||
};
|
||||
},
|
||||
};
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return sanitizeTaskDefinitions(taskDefinitions);
|
||||
};
|
||||
|
||||
expect(runsanitize).toThrowErrorMatchingInlineSnapshot(
|
||||
`"Invalid timeout \\"15 days\\". Timeout must be of the form \\"{number}{cadance}\\" where number is an integer. Example: 5m."`
|
||||
);
|
||||
});
|
||||
|
||||
it('throws a validation exception for invalid floating point timeout on task definition', () => {
|
||||
const runsanitize = () => {
|
||||
const taskDefinitions: TaskDefinitionRegistry = {
|
||||
some_kind_of_task: {
|
||||
title: 'Test XYZ',
|
||||
timeout: '1.5h',
|
||||
description: `Actually this won't work`,
|
||||
createTaskRunner() {
|
||||
return {
|
||||
async run() {
|
||||
return {
|
||||
state: {},
|
||||
};
|
||||
},
|
||||
};
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return sanitizeTaskDefinitions(taskDefinitions);
|
||||
};
|
||||
|
||||
expect(runsanitize).toThrowErrorMatchingInlineSnapshot(
|
||||
`"Invalid timeout \\"1.5h\\". Timeout must be of the form \\"{number}{cadance}\\" where number is an integer. Example: 5m."`
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -3,23 +3,13 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import Joi from 'joi';
|
||||
import { TaskDefinition, validateTaskDefinition } from './task';
|
||||
import { TaskDefinition, taskDefinitionSchema } from './task';
|
||||
import { Logger } from '../../../../src/core/server';
|
||||
|
||||
/*
|
||||
* The TaskManager is the public interface into the task manager system. This glues together
|
||||
* all of the disparate modules in one integration point. The task manager operates in two different ways:
|
||||
*
|
||||
* - pre-init, it allows middleware registration, but disallows task manipulation
|
||||
* - post-init, it disallows middleware registration, but allows task manipulation
|
||||
*
|
||||
* Due to its complexity, this is mostly tested by integration tests (see readme).
|
||||
*/
|
||||
|
||||
/**
|
||||
* The public interface into the task manager system.
|
||||
*/
|
||||
export type TaskDefinitionRegistry = Record<
|
||||
string,
|
||||
Omit<TaskDefinition, 'type' | 'timeout'> & Pick<Partial<TaskDefinition>, 'timeout'>
|
||||
>;
|
||||
export class TaskTypeDictionary {
|
||||
private definitions = new Map<string, TaskDefinition>();
|
||||
private logger: Logger;
|
||||
|
@ -57,7 +47,7 @@ export class TaskTypeDictionary {
|
|||
* Method for allowing consumers to register task definitions into the system.
|
||||
* @param taskDefinitions - The Kibana task definitions dictionary
|
||||
*/
|
||||
public registerTaskDefinitions(taskDefinitions: Record<string, Omit<TaskDefinition, 'type'>>) {
|
||||
public registerTaskDefinitions(taskDefinitions: TaskDefinitionRegistry) {
|
||||
const duplicate = Object.keys(taskDefinitions).find((type) => this.definitions.has(type));
|
||||
if (duplicate) {
|
||||
throw new Error(`Task ${duplicate} is already defined!`);
|
||||
|
@ -79,10 +69,8 @@ export class TaskTypeDictionary {
|
|||
*
|
||||
* @param taskDefinitions - The Kibana task definitions dictionary
|
||||
*/
|
||||
export function sanitizeTaskDefinitions(
|
||||
taskDefinitions: Record<string, Omit<TaskDefinition, 'type'>>
|
||||
): TaskDefinition[] {
|
||||
return Object.entries(taskDefinitions).map(([type, rawDefinition]) =>
|
||||
Joi.attempt<TaskDefinition>({ type, ...rawDefinition }, validateTaskDefinition)
|
||||
);
|
||||
export function sanitizeTaskDefinitions(taskDefinitions: TaskDefinitionRegistry): TaskDefinition[] {
|
||||
return Object.entries(taskDefinitions).map(([type, rawDefinition]) => {
|
||||
return taskDefinitionSchema.validate({ type, ...rawDefinition }) as TaskDefinition;
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue