diff --git a/src/es_archiver/lib/indices/kibana_index.js b/src/es_archiver/lib/indices/kibana_index.js index 7a732adcc69e..379a4078e32b 100644 --- a/src/es_archiver/lib/indices/kibana_index.js +++ b/src/es_archiver/lib/indices/kibana_index.js @@ -48,8 +48,7 @@ const buildUiExports = _.once(async () => { * Deletes all indices that start with `.kibana` */ export async function deleteKibanaIndices({ client, stats, log }) { - const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); - const indexNames = kibanaIndices.map(x => x.index); + const indexNames = await fetchKibanaIndices(client); if (!indexNames.length) { return; } @@ -152,6 +151,20 @@ export async function createDefaultSpace({ index, client }) { }); } +/** + * Migrations mean that the Kibana index will look something like: + * .kibana, .kibana_1, .kibana_323, etc. This finds all indices starting + * with .kibana, then filters out any that aren't actually Kibana's core + * index (e.g. we don't want to remove .kibana_task_manager or the like). + * + * @param {string} index + */ +async function fetchKibanaIndices(client) { + const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); + const isKibanaIndex = (index) => (/^\.kibana(:?_\d*)?$/).test(index); + return kibanaIndices.map(x => x.index).filter(isKibanaIndex); +} + export async function cleanKibanaIndices({ client, stats, log, kibanaUrl }) { if (!await isSpacesEnabled({ kibanaUrl })) { return await deleteKibanaIndices({ diff --git a/src/ui/ui_exports/ui_export_types/index.js b/src/ui/ui_exports/ui_export_types/index.js index c12c445540ef..2a9d8d0c023e 100644 --- a/src/ui/ui_exports/ui_export_types/index.js +++ b/src/ui/ui_exports/ui_export_types/index.js @@ -29,6 +29,10 @@ export { validations, } from './saved_object'; +export { + taskDefinitions +} from './task_definitions'; + export { app, apps, diff --git a/src/ui/ui_exports/ui_export_types/task_definitions.js b/src/ui/ui_exports/ui_export_types/task_definitions.js new file mode 100644 index 000000000000..279583e51679 --- /dev/null +++ b/src/ui/ui_exports/ui_export_types/task_definitions.js @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { mergeAtType } from './reduce'; +import { alias, wrap, uniqueKeys } from './modify_reduce'; + +// How plugins define tasks that the task manager can run. +export const taskDefinitions = wrap( + alias('taskDefinitions'), + uniqueKeys(), + mergeAtType, +); diff --git a/x-pack/index.js b/x-pack/index.js index a2f952117ed3..9dbbc340f76a 100644 --- a/x-pack/index.js +++ b/x-pack/index.js @@ -29,6 +29,7 @@ import { notifications } from './plugins/notifications'; import { kueryAutocomplete } from './plugins/kuery_autocomplete'; import { canvas } from './plugins/canvas'; import { infra } from './plugins/infra'; +import { taskManager } from './plugins/task_manager'; import { rollup } from './plugins/rollup'; import { remoteClusters } from './plugins/remote_clusters'; import { crossClusterReplication } from './plugins/cross_cluster_replication'; @@ -62,6 +63,7 @@ module.exports = function (kibana) { indexLifecycleManagement(kibana), kueryAutocomplete(kibana), infra(kibana), + taskManager(kibana), rollup(kibana), remoteClusters(kibana), crossClusterReplication(kibana), diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md new file mode 100644 index 000000000000..33f9dd87613a --- /dev/null +++ b/x-pack/plugins/task_manager/README.md @@ -0,0 +1,306 @@ +# Kibana task manager + +The task manager is a generic system for running background tasks. It supports: + +- Single-run and recurring tasks +- Scheduling tasks to run after a specified datetime +- Basic retry logic +- Recovery of stalled tasks / timeouts +- Tracking task state across multiple runs +- Configuring the run-parameters for specific tasks +- Basic coordination to prevent the same task instance from running on more than one Kibana system at a time + +## Implementation details + +At a high-level, the task manager works like this: + +- Every `{poll_interval}` milliseconds, check the `{index}` for any tasks that need to be run: + - `runAt` is past + - `attempts` is less than the configured threshold +- Attempt to claim the task by using optimistic concurrency to set: + - status to `running` + - `runAt` to now + the timeout specified by the task +- Execute the task, if the previous claim succeeded +- If the task fails, increment the `attempts` count and reschedule it +- If the task succeeds: + - If it is recurring, store the result of the run, and reschedule + - If it is not recurring, remove it from the index + +## Pooling + +Each task manager instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the system from running too many tasks at once in resource constrained environments. In addition to this, each individual task type definition can have `numWorkers` specified, which tells the system how many workers are consumed by a single running instance of a task. This effectively limits how many tasks of a given type can be run at once. + +For example, we may have a system with a `max_workers` of 10, but a super expensive task (such as `reporting`) which specifies a `numWorkers` of 10. In this case, `reporting` tasks will run one at a time. + +If a task specifies a higher `numWorkers` than the system supports, the system's `max_workers` setting will be substituted for it. + +## Config options + +The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`): + +- `max_attempts` - How many times a failing task instance will be retried before it is never run again +- `poll_interval` - How often the background worker should check the task_manager index for more work +- `index` - The name of the index that the task_manager +- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) +- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. +- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks + - For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting` + - This allows sysadmins to tweak the operational performance of Kibana, allowing more or fewer tasks of a specific type to run simultaneously + +## Task definitions + +Plugins define tasks by calling the `registerTaskDefinitions` method on the `server.taskManager` object. + +A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder. + +```js +const { taskManager } = server; +taskManager.registerTaskDefinitions({ + // clusterMonitoring is the task type, and must be unique across the entire system + clusterMonitoring: { + // Human friendly name, used to represent this task in logs, UI, etc + title: 'Human friendly name', + + // Optional, human-friendly, more detailed description + description: 'Amazing!!', + + // Optional, how long, in minutes, the system should wait before + // a running instance of this task is considered to be timed out. + // This defaults to 5 minutes. + timeout: '5m', + + // The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots, + // 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is + // overridden by the `override_num_workers` config value, if specified. + numWorkers: 2, + + // The createTaskRunner function / method returns an object that is responsible for + // performing the work of the task. context: { taskInstance, kbnServer }, is documented below. + createTaskRunner(context) { + return { + // Perform the work of the task. The return value should fit the TaskResult interface, documented + // below. Invalid return values will result in a logged warning. + async run() { + // Do some work + // Conditionally send some alerts + // Return some result or other... + }, + + // Optional, will be called if a running instance of this task times out, allowing the task + // to attempt to clean itself up. + async cancel() { + // Do whatever is required to cancel this task, such as killing any spawned processes + }, + }; + }, + }, +}); +``` + +When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this: + +```js +{ + // An instance of the Kibana server object. + kbnServer, + + // The data associated with this instance of the task, with two properties being most notable: + // + // params: + // An object, specific to this task instance, used by the + // task to determine exactly what work should be performed. + // e.g. a cluster-monitoring task might have a `clusterName` + // property in here, but a movie-monitoring task might have + // a `directorName` property. + // + // state: + // The state returned from the previous run of this task instance. + // If this task instance has never succesfully run, this will + // be an empty object: {} + taskInstance, +} +``` + +## Task result + +The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following: +```js +{ + // Optional, if specified, this is used as the tasks' nextRun, overriding + // the default system scheduler. + runAt: "2020-07-24T17:34:35.272Z", + + // Optional, an error object, logged out as a warning. The pressence of this + // property indicates that the task did not succeed. + error: { message: 'Hrumph!' }, + + // Optional, this will be passed into the next run of the task, if + // this is a recurring task. + state: { + anything: 'goes here', + }, +} +``` + +Other return values will result in a warning, but the system should continue to work. + +## Task instances + +The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc. + +The data stored for a task instance looks something like this: + +```js +{ + // The type of task that will run this instance. + taskType: 'clusterMonitoring', + + // The next time this task instance should run. It is not guaranteed + // to run at this time, but it is guaranteed not to run earlier than + // this. + runAt: "2020-07-24T17:34:35.272Z", + + // Indicates that this is a recurring task. We currently only support + // 1 minute granularity. + interval: '5m', + + // How many times this task has been unsuccesfully attempted, + // this will be reset to 0 if the task ever succesfully completes. + // This is incremented if a task fails or times out. + attempts: 0, + + // Currently, this is either idle | running. It is used to + // coordinate which Kibana instance owns / is running a specific + // task instance. + status: 'idle', + + // The params specific to this task instance, which will be + // passed to the task when it runs, and will be used by the + // task to determine exactly what work should be performed. + // This is a JSON blob, and will be different per task type. + // e.g. a cluster-monitoring task might have a `clusterName` + // property in here, but a movie-monitoring task might have + // a `directorName` property. + params: '{ "task": "specific stuff here" }', + + // The result of the previous run of this task instance. This + // will be passed to the next run of the task, along with the + // params, and could be used by a task to do special logic If + // the task state changes (e.g. from green to red, or foo to bar) + // If there was no previous run (e.g. the instance has never succesfully + // completed, this will be an empty object.). This is a JSON blob, + // and will be different per task type. + state: '{ "status": "green" }', + + // An extension point for 3rd parties to build in security features on + // top of the task manager. For example, this might be the token of the user + // who scheduled this task. + userContext: 'the token of the user who scheduled this task', + + // An extension point for 3rd parties to build in security features on + // top of the task manager, and is expected to be the id of the user, if any, + // that scheduled this task. + user: '23lk3l42', + + // An application-specific designation, allowing different Kibana + // plugins / apps to query for only those tasks they care about. + scope: ['alerting'], +} +``` + +## Programmatic access + +The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected. + +```js +const { taskManager } = server; +// Schedules a task. All properties are as documented in the previous +// storage section, except that here, params is an object, not a JSON +// string. +const task = await taskManager.schedule({ + taskType, + runAt, + interval, + params, + scope: ['my-fanci-app'], +}); + +// Removes the specified task +await manager.remove(task.id); + +// Fetches tasks, supports pagination, via the search-after API: +// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html +// If scope is not specified, all tasks are returned, otherwise only tasks +// with the given scope are returned. +const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] }); + +// results look something like this: +{ + searchAfter: ['233322'], + // Tasks is an array of task instances + tasks: [{ + id: '3242342', + taskType: 'reporting', + // etc + }] +} +``` + +More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. + +## Middleware + +The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run. + +For example: + +```js +// In your plugin's init +server.taskManager.addMiddleware({ + async beforeSave({ taskInstance, ...opts }) { + console.log(`About to save a task of type ${taskInstance.taskType}`); + + return { + ...opts, + taskInstance: { + ...taskInstance, + params: { + ...taskInstance.params, + example: 'Added to params!', + }, + }, + }; + }, + + async beforeRun({ taskInstance, ...opts }) { + console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`); + const { example, ...taskWithoutExampleProp } = taskInstance; + + return { + ...opts, + taskInstance: taskWithoutExampleProp, + }; + }, +}); +``` + +## Limitations in v1.0 + +In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value. + +There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'. + +There is no task history. Each run overwrites the previous run's state. One-time tasks are removed from the index upon completion regardless of success / failure. + +The task manager's public API is create / delete / list. Updates aren't directly supported, and listing should be scoped so that users only see their own tasks. + +## Testing + +- `node scripts/jest --testPathPattern=task_manager --watch` + +Integration tests can be run like so: + +``` +node scripts/functional_tests_server.js --config test/plugin_functional/config.js +node scripts/functional_test_runner --config test/plugin_functional/config.js --grep task_manager +``` diff --git a/x-pack/plugins/task_manager/index.js b/x-pack/plugins/task_manager/index.js new file mode 100644 index 000000000000..e0116820a3e0 --- /dev/null +++ b/x-pack/plugins/task_manager/index.js @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskManager } from './task_manager'; + +export function taskManager(kibana) { + return new kibana.Plugin({ + id: 'task_manager', + require: ['kibana', 'elasticsearch', 'xpack_main'], + configPrefix: 'xpack.task_manager', + config(Joi) { + return Joi.object({ + enabled: Joi.boolean().default(true), + max_attempts: Joi.number() + .description('The maximum number of times a task will be attempted before being abandoned as failed') + .min(0) // no retries + .default(3), + poll_interval: Joi.number() + .description('How often, in milliseconds, the task manager will look for more work.') + .min(1000) + .default(3000), + index: Joi.string() + .description('The name of the index used to store task information.') + .default('.kibana_task_manager'), + max_workers: Joi.number() + .description('The maximum number of tasks that this Kibana instance will run simultaneously.') + .min(1) // disable the task manager rather than trying to specify it with 0 workers + .default(10), + override_num_workers: Joi.object() + .pattern(/.*/, Joi.number().greater(0)) + .description('Customize the number of workers occupied by specific tasks (e.g. override_num_workers.reporting: 2)') + .default({}) + }).default(); + }, + init(server) { + const config = server.config(); + const taskManager = new TaskManager(this.kbnServer, server, config); + server.decorate('server', 'taskManager', taskManager); + }, + }); +} diff --git a/x-pack/plugins/task_manager/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/lib/fill_pool.test.ts new file mode 100644 index 000000000000..a0361103f0cc --- /dev/null +++ b/x-pack/plugins/task_manager/lib/fill_pool.test.ts @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { fillPool } from './fill_pool'; + +describe('fillPool', () => { + test('stops filling when there are no more tasks in the store', async () => { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => true); + const converter = _.identity; + + await fillPool(run, fetchAvailableTasks, converter); + + expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]); + }); + + test('stops filling when the pool has no more capacity', async () => { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => false); + const converter = _.identity; + + await fillPool(run, fetchAvailableTasks, converter); + + expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]); + }); + + test('calls the converter on the records prior to running', async () => { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => false); + const converter = (x: number) => x.toString(); + + await fillPool(run, fetchAvailableTasks, converter); + + expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']); + }); + + describe('error handling', () => { + test('throws exception from fetchAvailableTasks', async () => { + const run = sinon.spy(() => false); + const converter = (x: number) => x.toString(); + + try { + const fetchAvailableTasks = async () => Promise.reject('fetch is not working'); + + await fillPool(run, fetchAvailableTasks, converter); + } catch (err) { + expect(err.toString()).toBe('fetch is not working'); + expect(run.called).toBe(false); + } + }); + + test('throws exception from run', async () => { + const run = sinon.spy(() => Promise.reject('run is not working')); + const converter = (x: number) => x.toString(); + + try { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + + await fillPool(run, fetchAvailableTasks, converter); + } catch (err) { + expect(err.toString()).toBe('run is not working'); + } + }); + + test('throws exception from converter', async () => { + try { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => false); + const converter = (x: number) => { + throw new Error(`can not convert ${x}`); + }; + + await fillPool(run, fetchAvailableTasks, converter); + } catch (err) { + expect(err.toString()).toBe('Error: can not convert 1'); + } + }); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/fill_pool.ts b/x-pack/plugins/task_manager/lib/fill_pool.ts new file mode 100644 index 000000000000..a5970574abaa --- /dev/null +++ b/x-pack/plugins/task_manager/lib/fill_pool.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +type BatchRun = (tasks: T[]) => Promise; +type Fetcher = () => Promise; +type Converter = (t: T1) => T2; + +/** + * Given a function that runs a batch of tasks (e.g. taskPool.run), a function + * that fetches task records (e.g. store.fetchAvailableTasks), and a function + * that converts task records to the appropriate task runner, this function + * fills the pool with work. + * + * This is annoyingly general in order to simplify testing. + * + * @param run - a function that runs a batch of tasks (e.g. taskPool.run) + * @param fetchAvailableTasks - a function that fetches task records (e.g. store.fetchAvailableTasks) + * @param converter - a function that converts task records to the appropriate task runner + */ +export async function fillPool( + run: BatchRun, + fetchAvailableTasks: Fetcher, + converter: Converter +): Promise { + while (true) { + const instances = await fetchAvailableTasks(); + + if (!instances.length) { + return; + } + + const tasks = instances.map(converter); + + if (!(await run(tasks))) { + return; + } + } +} diff --git a/x-pack/plugins/task_manager/lib/intervals.test.ts b/x-pack/plugins/task_manager/lib/intervals.test.ts new file mode 100644 index 000000000000..bba5cdf8591f --- /dev/null +++ b/x-pack/plugins/task_manager/lib/intervals.test.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import { assertValidInterval, intervalFromNow, minutesFromNow } from './intervals'; + +describe('taskIntervals', () => { + describe('assertValidInterval', () => { + test('it accepts intervals in the form `Nm`', () => { + expect(() => assertValidInterval(`${_.random(1000)}m`)).not.toThrow(); + }); + + test('it rejects intervals are not of the form `Nm`', () => { + expect(() => assertValidInterval(`5m 2s`)).toThrow( + /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => assertValidInterval(`hello`)).toThrow( + /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + }); + + describe('intervalFromNow', () => { + test('it returns the current date plus n minutes', () => { + const mins = _.random(1, 100); + const expected = Date.now() + mins * 60 * 1000; + const nextRun = intervalFromNow(`${mins}m`)!.getTime(); + expect(Math.abs(nextRun - expected)).toBeLessThan(100); + }); + + test('it rejects intervals are not of the form `Nm`', () => { + expect(() => intervalFromNow(`5m 2s`)).toThrow( + /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => intervalFromNow(`hello`)).toThrow( + /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + }); + + describe('minutesFromNow', () => { + test('it returns the current date plus a number of minutes', () => { + const mins = _.random(1, 100); + const expected = Date.now() + mins * 60 * 1000; + const nextRun = minutesFromNow(mins).getTime(); + expect(Math.abs(nextRun - expected)).toBeLessThan(100); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/intervals.ts b/x-pack/plugins/task_manager/lib/intervals.ts new file mode 100644 index 000000000000..f095c336098f --- /dev/null +++ b/x-pack/plugins/task_manager/lib/intervals.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/** + * Returns a date that is the specified interval from now. Currently, + * only minute-intervals are supported. + * + * @param {string} interval - An interval of the form `Nm` such as `5m` + */ +export function intervalFromNow(interval?: string): Date | undefined { + if (interval === undefined) { + return; + } + + assertValidInterval(interval); + + return minutesFromNow(parseInterval(interval)); +} + +/** + * Returns a date that is mins minutes from now. + * + * @param mins The number of mintues from now + */ +export function minutesFromNow(mins: number): Date { + const now = new Date(); + + now.setMinutes(now.getMinutes() + mins); + + return now; +} + +/** + * Verifies that the specified interval matches our expected format. + * + * @param {string} interval - An interval such as `5m` + */ +export function assertValidInterval(interval: string) { + if (/^[0-9]+m$/.test(interval)) { + return interval; + } + + throw new Error( + `Invalid interval "${interval}". Intervals must be of the form {number}m. Example: 5m.` + ); +} + +function parseInterval(interval: string) { + return parseInt(interval, 10); +} diff --git a/x-pack/plugins/task_manager/lib/logger.ts b/x-pack/plugins/task_manager/lib/logger.ts new file mode 100644 index 000000000000..932acaa15de2 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/logger.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export type LogFn = (prefix: string[], msg: string) => void; + +type SimpleLogFn = (msg: string) => void; + +export interface Logger { + error: SimpleLogFn; + warning: SimpleLogFn; + debug: SimpleLogFn; + info: SimpleLogFn; +} + +export class TaskManagerLogger implements Logger { + private write: LogFn; + + constructor(log: LogFn) { + this.write = log; + } + + public error(msg: string) { + this.log('error', msg); + } + + public warning(msg: string) { + this.log('warning', msg); + } + + public debug(msg: string) { + this.log('debug', msg); + } + + public info(msg: string) { + this.log('info', msg); + } + + private log(type: string, msg: string) { + this.write([type, 'task_manager'], msg); + } +} diff --git a/x-pack/plugins/task_manager/lib/middleware.test.ts b/x-pack/plugins/task_manager/lib/middleware.test.ts new file mode 100644 index 000000000000..c249f86d1592 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/middleware.test.ts @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import moment from 'moment'; +import { ConcreteTaskInstance, RunContext, TaskInstance, TaskStatus } from '../task'; +import { addMiddlewareToChain } from './middleware'; + +interface BeforeSaveOpts { + taskInstance: TaskInstance; +} + +const getMockTaskInstance = () => ({ + taskType: 'nice_task', + state: {}, + params: { abc: 'def' }, +}); +const getMockConcreteTaskInstance = () => { + const concrete: { + id: string; + version: number; + attempts: number; + status: TaskStatus; + runAt: Date; + state: any; + taskType: string; + params: any; + } = { + id: 'hy8o99o83', + version: 1, + attempts: 0, + status: 'idle', + runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), + state: {}, + taskType: 'nice_task', + params: { abc: 'def' }, + }; + return concrete; +}; +const getMockRunContext = (runTask: ConcreteTaskInstance) => ({ + taskInstance: runTask, + kbnServer: {}, +}); + +const defaultBeforeSave = async (opts: BeforeSaveOpts) => { + return opts; +}; + +const defaultBeforeRun = async (opts: RunContext) => { + return opts; +}; + +describe('addMiddlewareToChain', () => { + it('chains the beforeSave functions', () => { + const m1 = { + beforeSave: async (opts: BeforeSaveOpts) => { + Object.assign(opts.taskInstance.params, { m1: true }); + return opts; + }, + beforeRun: defaultBeforeRun, + }; + const m2 = { + beforeSave: async (opts: BeforeSaveOpts) => { + Object.assign(opts.taskInstance.params, { m2: true }); + return opts; + }, + beforeRun: defaultBeforeRun, + }; + const m3 = { + beforeSave: async (opts: BeforeSaveOpts) => { + Object.assign(opts.taskInstance.params, { m3: true }); + return opts; + }, + beforeRun: defaultBeforeRun, + }; + + let middlewareChain; + middlewareChain = addMiddlewareToChain(m1, m2); + middlewareChain = addMiddlewareToChain(middlewareChain, m3); + + middlewareChain.beforeSave({ taskInstance: getMockTaskInstance() }).then((saveOpts: any) => { + expect(saveOpts).toMatchInlineSnapshot(` +Object { + "taskInstance": Object { + "params": Object { + "abc": "def", + "m1": true, + "m2": true, + "m3": true, + }, + "state": Object {}, + "taskType": "nice_task", + }, +} +`); + }); + }); + + it('chains the beforeRun functions', () => { + const m1 = { + beforeSave: defaultBeforeSave, + beforeRun: async (opts: RunContext) => { + return { + ...opts, + m1: true, + }; + }, + }; + const m2 = { + beforeSave: defaultBeforeSave, + beforeRun: async (opts: RunContext) => { + return { + ...opts, + m2: true, + }; + }, + }; + const m3 = { + beforeSave: defaultBeforeSave, + beforeRun: async (opts: RunContext) => { + return { + ...opts, + m3: true, + }; + }, + }; + + let middlewareChain; + middlewareChain = addMiddlewareToChain(m1, m2); + middlewareChain = addMiddlewareToChain(middlewareChain, m3); + + middlewareChain + .beforeRun(getMockRunContext(getMockConcreteTaskInstance())) + .then(contextOpts => { + expect(contextOpts).toMatchInlineSnapshot(` +Object { + "kbnServer": Object {}, + "m1": true, + "m2": true, + "m3": true, + "taskInstance": Object { + "attempts": 0, + "id": "hy8o99o83", + "params": Object { + "abc": "def", + }, + "runAt": 2018-09-18T05:33:09.588Z, + "state": Object {}, + "status": "idle", + "taskType": "nice_task", + "version": 1, + }, +} +`); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/middleware.ts b/x-pack/plugins/task_manager/lib/middleware.ts new file mode 100644 index 000000000000..d81b76fda7e5 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/middleware.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { RunContext, TaskInstance } from '../task'; + +/* + * BeforeSaveMiddlewareParams is nearly identical to RunContext, but + * taskInstance is before save (no _id property) + * + * taskInstance property is guaranteed to exist. The params can optionally + * include fields from an "options" object passed as the 2nd parameter to + * taskManager.schedule() + */ +export interface BeforeSaveMiddlewareParams { + taskInstance: TaskInstance; +} + +export type BeforeSaveFunction = ( + params: BeforeSaveMiddlewareParams +) => Promise; + +export type BeforeRunFunction = (params: RunContext) => Promise; + +export interface Middleware { + beforeSave: BeforeSaveFunction; + beforeRun: BeforeRunFunction; +} + +export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) { + const beforeSave = middleware.beforeSave + ? (params: BeforeSaveMiddlewareParams) => + middleware.beforeSave(params).then(prevMiddleware.beforeSave) + : prevMiddleware.beforeSave; + + const beforeRun = middleware.beforeRun + ? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun) + : prevMiddleware.beforeRun; + + return { + beforeSave, + beforeRun, + }; +} diff --git a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts new file mode 100644 index 000000000000..af6ee536498b --- /dev/null +++ b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { get } from 'lodash'; +import { RunContext } from '../task'; +import { sanitizeTaskDefinitions } from './sanitize_task_definitions'; + +interface Opts { + numTasks: number; + numWorkers?: number; +} + +const getMockTaskDefinitions = (opts: Opts) => { + const { numTasks, numWorkers } = opts; + const tasks: any = {}; + + for (let i = 0; i < numTasks; i++) { + const type = `test_task_type_${i}`; + tasks[type] = { + type, + title: 'Test', + description: 'one super cool task', + numWorkers: numWorkers ? numWorkers : 1, + createTaskRunner(context: RunContext) { + const incre = get(context, 'taskInstance.state.incre', -1); + return { + run: () => ({ + state: { + incre: incre + 1, + }, + runAt: Date.now(), + }), + }; + }, + }; + } + return tasks; +}; + +describe('sanitizeTaskDefinitions', () => { + it('provides tasks with defaults if there are no overrides', () => { + const maxWorkers = 10; + const overrideNumWorkers = {}; + const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); + const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + + expect(result).toMatchInlineSnapshot(` +Object { + "test_task_type_0": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_0", + }, + "test_task_type_1": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_1", + }, + "test_task_type_2": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_2", + }, +} +`); + }); + + it('scales down task definitions workers if larger than max workers', () => { + const maxWorkers = 2; + const overrideNumWorkers = {}; + const taskDefinitions = getMockTaskDefinitions({ numTasks: 2, numWorkers: 5 }); + const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + + expect(result).toMatchInlineSnapshot(` +Object { + "test_task_type_0": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 2, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_0", + }, + "test_task_type_1": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 2, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_1", + }, +} +`); + }); + + it('incorporates overrideNumWorkers to give certain type an override of number of workers', () => { + const overrideNumWorkers = { + test_task_type_0: 5, + test_task_type_1: 2, + }; + const maxWorkers = 5; + const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); + const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + + expect(result).toMatchInlineSnapshot(` +Object { + "test_task_type_0": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 5, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_0", + }, + "test_task_type_1": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 2, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_1", + }, + "test_task_type_2": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_2", + }, +} +`); + }); + + it('throws a validation exception for invalid task definition', () => { + const runsanitize = () => { + const maxWorkers = 10; + const overrideNumWorkers = {}; + const taskDefinitions = { + some_kind_of_task: { + fail: 'extremely', // cause a validation failure + type: 'breaky_task', + title: 'Test XYZ', + description: `Actually this won't work`, + createTaskRunner() { + return { + async run() { + return { + state: {}, + }; + }, + }; + }, + }, + }; + + return sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + }; + + expect(runsanitize).toThrowError(); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts new file mode 100644 index 000000000000..07d7cdd0df40 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; +import { + SanitizedTaskDefinition, + TaskDefinition, + TaskDictionary, + validateTaskDefinition, +} from '../task'; + +/** + * Sanitizes the system's task definitions. Task definitions have optional properties, and + * this ensures they all are given a reasonable default. This also overrides certain task + * definition properties with kibana.yml overrides (such as the `override_num_workers` config + * value). + * + * @param maxWorkers - The maxiumum numer of workers allowed to run at once + * @param taskDefinitions - The Kibana task definitions dictionary + * @param overrideNumWorkers - The kibana.yml overrides numWorkers per task type. + */ +export function sanitizeTaskDefinitions( + taskDefinitions: TaskDictionary = {}, + maxWorkers: number, + overrideNumWorkers: { [taskType: string]: number } +): TaskDictionary { + return Object.keys(taskDefinitions).reduce( + (acc, type) => { + const rawDefinition = taskDefinitions[type]; + rawDefinition.type = type; + const definition = Joi.attempt(rawDefinition, validateTaskDefinition) as TaskDefinition; + const numWorkers = Math.min( + maxWorkers, + overrideNumWorkers[definition.type] || definition.numWorkers || 1 + ); + + acc[type] = { + ...definition, + numWorkers, + }; + + return acc; + }, + {} as TaskDictionary + ); +} diff --git a/x-pack/plugins/task_manager/task.ts b/x-pack/plugins/task_manager/task.ts new file mode 100644 index 000000000000..71f61ea0576b --- /dev/null +++ b/x-pack/plugins/task_manager/task.ts @@ -0,0 +1,236 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; + +/* + * Type definitions and validations for tasks. + */ + +/** + * A loosely typed definition of the elasticjs wrapper. It's beyond the scope + * of this work to try to make a comprehensive type definition of this. + */ +export type ElasticJs = (action: string, args: any) => Promise; + +/** + * The run context is passed into a task's run function as its sole argument. + */ +export interface RunContext { + /** + * The Kibana server object. This gives tasks full-access to the server object, + * including the various ES options client functions + */ + kbnServer: object; + + /** + * The document describing the task instance, its params, state, id, etc. + */ + taskInstance: ConcreteTaskInstance; +} + +/** + * The return value of a task's run function should be a promise of RunResult. + */ +export interface RunResult { + /** + * Specifies the next run date / time for this task. If unspecified, this is + * treated as a single-run task, and will not be rescheduled after + * completion. + */ + runAt?: Date; + + /** + * If specified, indicates that the task failed to accomplish its work. This is + * logged out as a warning, and the task will be reattempted after a delay. + */ + error?: object; + + /** + * The state which will be passed to the next run of this task (if this is a + * recurring task). See the RunContext type definition for more details. + */ + state: object; +} + +export const validateRunResult = Joi.object({ + runAt: Joi.date().optional(), + error: Joi.object().optional(), + state: Joi.object().optional(), +}).optional(); + +export type RunFunction = () => Promise; + +export type CancelFunction = () => Promise; + +export interface CancellableTask { + run: RunFunction; + cancel?: CancelFunction; +} + +export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask; + +/** + * Defines a task which can be scheduled and run by the Kibana + * task manager. + */ +export interface TaskDefinition { + /** + * A unique identifier for the type of task being defined. + */ + type: string; + + /** + * A brief, human-friendly title for this task. + */ + title: string; + + /** + * An optional more detailed description of what this task does. + */ + description?: string; + + /** + * How long, in minutes, the system should wait for the task to complete + * before it is considered to be timed out. (e.g. '5m', the default). If + * the task takes longer than this, Kibana will send it a kill command and + * the task will be re-attempted. + */ + timeout?: string; + + /** + * The numer of workers / slots a running instance of this task occupies. + * This defaults to 1. + */ + numWorkers?: number; + + /** + * Creates an object that has a run function which performs the task's work, + * and an optional cancel function which cancels the task. + */ + createTaskRunner: TaskRunCreatorFunction; +} + +/** + * A task definition with all of its properties set to a valid value. + */ +export interface SanitizedTaskDefinition extends TaskDefinition { + numWorkers: number; +} + +export const validateTaskDefinition = Joi.object({ + type: Joi.string().required(), + title: Joi.string().optional(), + description: Joi.string().optional(), + timeout: Joi.string().default('5m'), + numWorkers: Joi.number() + .min(1) + .default(1), + createTaskRunner: Joi.func().required(), +}).default(); + +/** + * A dictionary mapping task types to their definitions. + */ +export interface TaskDictionary { + [taskType: string]: T; +} + +export type TaskStatus = 'idle' | 'running' | 'failed'; + +/* + * A task instance represents all of the data required to store, fetch, + * and execute a task. + */ +export interface TaskInstance { + /** + * Optional ID that can be passed by the caller. When ID is undefined, ES + * will auto-generate a unique id. Otherwise, ID will be used to either + * create a new document, or update existing document + */ + id?: string; + + /** + * The task definition type whose run function will execute this instance. + */ + taskType: string; + + /** + * The date and time that this task is scheduled to be run. It is not + * guaranteed to run at this time, but it is guaranteed not to run earlier + * than this. Defaults to immediately. + */ + runAt?: Date; + + /** + * An interval in minutes (e.g. '5m'). If specified, this is a recurring task. + */ + interval?: string; + + /** + * A task-specific set of parameters, used by the task's run function to tailor + * its work. This is generally user-input, such as { sms: '333-444-2222' }. + */ + params: object; + + /** + * The state passed into the task's run function, and returned by the previous + * run. If there was no previous run, or if the previous run did not return + * any state, this will be the empy object: {} + */ + state: object; + + /** + * The id of the user who scheduled this task. + */ + user?: string; + + /** + * Used to group tasks for querying. So, reporting might schedule tasks with a scope of 'reporting', + * and then query such tasks to provide a glimpse at only reporting tasks, rather than at all tasks. + */ + scope?: string[]; +} + +/** + * A task instance that has an id and is ready for storage. + */ +export interface ConcreteTaskInstance extends TaskInstance { + /** + * The id of the Elastic document that stores this instance's data. This can + * be passed by the caller when scheduling the task. + */ + id: string; + + /** + * The version of the Elaticsearch document. + */ + version: number; + + /** + * The number of unsuccessful attempts since the last successful run. This + * will be zeroed out after a successful run. + */ + attempts: number; + + /** + * Indicates whether or not the task is currently running. + */ + status: TaskStatus; + + /** + * The date and time that this task is scheduled to be run. It is not guaranteed + * to run at this time, but it is guaranteed not to run earlier than this. + */ + runAt: Date; + + /** + * The state passed into the task's run function, and returned by the previous + * run. If there was no previous run, or if the previous run did not return + * any state, this will be the empy object: {} + */ + state: object; +} diff --git a/x-pack/plugins/task_manager/task_manager.test.ts b/x-pack/plugins/task_manager/task_manager.test.ts new file mode 100644 index 000000000000..72cdd890f56b --- /dev/null +++ b/x-pack/plugins/task_manager/task_manager.test.ts @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { TaskManager } from './task_manager'; + +describe('TaskManager', () => { + let clock: sinon.SinonFakeTimers; + const defaultConfig = { + task_manager: { + max_workers: 10, + override_num_workers: {}, + index: 'foo', + max_attempts: 9, + poll_interval: 6000000, + }, + }; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('disallows schedule before init', async () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const task = { + taskType: 'foo', + params: {}, + state: {}, + }; + await expect(client.schedule(task)).rejects.toThrow(/^NotInitialized: .*/i); + }); + + test('disallows fetch before init', async () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i); + }); + + test('disallows remove before init', async () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i); + }); + + test('allows middleware registration before init', () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const middleware = { + beforeSave: async (saveOpts: any) => saveOpts, + beforeRun: async (runOpts: any) => runOpts, + }; + expect(() => client.addMiddleware(middleware)).not.toThrow(); + }); + + test('disallows middleware registration after init', async () => { + const { $test, opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const middleware = { + beforeSave: async (saveOpts: any) => saveOpts, + beforeRun: async (runOpts: any) => runOpts, + }; + + $test.afterPluginsInit(); + + expect(() => client.addMiddleware(middleware)).toThrow( + /Cannot add middleware after the task manager is initialized/i + ); + }); + + function testOpts() { + const $test = { + events: {} as any, + afterPluginsInit: _.noop, + }; + + const opts = { + config: { + get: (path: string) => _.get(defaultConfig, path), + }, + kbnServer: { + uiExports: { + taskDefinitions: {}, + }, + afterPluginsInit(callback: any) { + $test.afterPluginsInit = callback; + }, + }, + server: { + log: sinon.spy(), + decorate(...args: any[]) { + _.set(opts, args.slice(0, -1), _.last(args)); + }, + plugins: { + elasticsearch: { + getCluster() { + return { callWithInternalUser: _.noop }; + }, + status: { + on(eventName: string, callback: () => any) { + $test.events[eventName] = callback; + }, + }, + }, + }, + }, + }; + + return { + $test, + opts, + }; + } +}); diff --git a/x-pack/plugins/task_manager/task_manager.ts b/x-pack/plugins/task_manager/task_manager.ts new file mode 100644 index 000000000000..370ef6d4a2a5 --- /dev/null +++ b/x-pack/plugins/task_manager/task_manager.ts @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { fillPool } from './lib/fill_pool'; +import { Logger, TaskManagerLogger } from './lib/logger'; +import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; +import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions'; +import { ConcreteTaskInstance, RunContext, TaskInstance } from './task'; +import { SanitizedTaskDefinition, TaskDefinition, TaskDictionary } from './task'; +import { TaskPoller } from './task_poller'; +import { TaskPool } from './task_pool'; +import { TaskManagerRunner } from './task_runner'; +import { FetchOpts, FetchResult, RemoveResult, TaskStore } from './task_store'; + +/* + * The TaskManager is the public interface into the task manager system. This glues together + * all of the disparate modules in one integration point. The task manager operates in two different ways: + * + * - pre-init, it allows middleware registration, but disallows task manipulation + * - post-init, it disallows middleware registration, but allows task manipulation + * + * Due to its complexity, this is mostly tested by integration tests (see readme). + */ + +/** + * The public interface into the task manager system. + */ +export class TaskManager { + private isInitialized = false; + private maxWorkers: number; + private overrideNumWorkers: { [taskType: string]: number }; + private definitions: TaskDictionary; + private store: TaskStore; + private poller: TaskPoller; + private logger: Logger; + private middleware = { + beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts, + beforeRun: async (runOpts: RunContext) => runOpts, + }; + + /** + * Initializes the task manager, preventing any further addition of middleware, + * enabling the task manipulation methods, and beginning the background polling + * mechanism. + */ + public constructor(kbnServer: any, server: any, config: any) { + this.maxWorkers = config.get('xpack.task_manager.max_workers'); + this.overrideNumWorkers = config.get('xpack.task_manager.override_num_workers'); + this.definitions = {}; + + const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); + + const store = new TaskStore({ + callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, + index: config.get('xpack.task_manager.index'), + maxAttempts: config.get('xpack.task_manager.max_attempts'), + supportedTypes: Object.keys(this.definitions), + }); + const pool = new TaskPool({ + logger, + maxWorkers: this.maxWorkers, + }); + const createRunner = (instance: ConcreteTaskInstance) => + new TaskManagerRunner({ + logger, + kbnServer, + instance, + store, + definitions: this.definitions, + beforeRun: this.middleware.beforeRun, + }); + const poller = new TaskPoller({ + logger, + pollInterval: config.get('xpack.task_manager.poll_interval'), + work(): Promise { + return fillPool(pool.run, store.fetchAvailableTasks, createRunner); + }, + }); + + this.logger = logger; + this.store = store; + this.poller = poller; + + kbnServer.afterPluginsInit(async () => { + this.isInitialized = true; + store.addSupportedTypes(Object.keys(this.definitions)); + await store.init(); + await poller.start(); + }); + } + + /** + * Method for allowing consumers to register task definitions into the system. + * @param taskDefinitions - The Kibana task definitions dictionary + */ + public registerTaskDefinitions(taskDefinitions: TaskDictionary) { + this.assertUninitialized('register task definitions'); + const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]); + if (duplicate) { + throw new Error(`Task ${duplicate} is already defined!`); + } + + try { + const sanitized = sanitizeTaskDefinitions( + taskDefinitions, + this.maxWorkers, + this.overrideNumWorkers + ); + + Object.assign(this.definitions, sanitized); + } catch (e) { + this.logger.error('Could not sanitize task definitions'); + } + } + + /** + * Adds middleware to the task manager, such as adding security layers, loggers, etc. + * + * @param {Middleware} middleware - The middlware being added. + */ + public addMiddleware(middleware: Middleware) { + this.assertUninitialized('add middleware'); + const prevMiddleWare = this.middleware; + this.middleware = addMiddlewareToChain(prevMiddleWare, middleware); + } + + /** + * Schedules a task. + * + * @param task - The task being scheduled. + * @returns {Promise} + */ + public async schedule(taskInstance: TaskInstance, options?: any): Promise { + this.assertInitialized('Tasks cannot be scheduled until after task manager is initialized!'); + const { taskInstance: modifiedTask } = await this.middleware.beforeSave({ + ...options, + taskInstance, + }); + const result = await this.store.schedule(modifiedTask); + this.poller.attemptWork(); + return result; + } + + /** + * Fetches a paginatable list of scheduled tasks. + * + * @param opts - The query options used to filter tasks + * @returns {Promise} + */ + public async fetch(opts: FetchOpts): Promise { + this.assertInitialized('Tasks cannot be fetched before task manager is initialized!'); + return this.store.fetch(opts); + } + + /** + * Removes the specified task from the index. + * + * @param {string} id + * @returns {Promise} + */ + public async remove(id: string): Promise { + this.assertInitialized('Tasks cannot be removed before task manager is initialized!'); + return this.store.remove(id); + } + + /** + * Ensures task manager IS NOT already initialized + * + * @param {string} message shown if task manager is already initialized + * @returns void + */ + private assertUninitialized(message: string) { + if (this.isInitialized) { + throw new Error(`Cannot ${message} after the task manager is initialized!`); + } + } + + /** + * Ensures task manager IS already initialized + * + * @param {string} message shown if task manager is not initialized + * @returns void + */ + private assertInitialized(message: string) { + if (!this.isInitialized) { + throw new Error(`NotInitialized: ${message}`); + } + } +} diff --git a/x-pack/plugins/task_manager/task_poller.test.ts b/x-pack/plugins/task_manager/task_poller.test.ts new file mode 100644 index 000000000000..08bb21b5cdd5 --- /dev/null +++ b/x-pack/plugins/task_manager/task_poller.test.ts @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { TaskPoller } from './task_poller'; +import { mockLogger, resolvable, sleep } from './test_utils'; + +describe('TaskPoller', () => { + describe('interval tests', () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('runs the work function on an interval', async () => { + const pollInterval = _.random(10, 20); + const done = resolvable(); + const work = sinon.spy(() => { + done.resolve(); + return Promise.resolve(); + }); + const poller = new TaskPoller({ + pollInterval, + work, + logger: mockLogger(), + }); + + poller.start(); + + sinon.assert.calledOnce(work); + await done; + + clock.tick(pollInterval - 1); + sinon.assert.calledOnce(work); + clock.tick(1); + sinon.assert.calledTwice(work); + }); + }); + + test('logs, but does not crash if the work function fails', async () => { + let count = 0; + const logger = mockLogger(); + const doneWorking = resolvable(); + const poller = new TaskPoller({ + logger, + pollInterval: 1, + work: async () => { + ++count; + if (count === 1) { + throw new Error('Dang it!'); + } + if (count > 1) { + poller.stop(); + doneWorking.resolve(); + } + }, + }); + + poller.start(); + + await doneWorking; + + expect(count).toEqual(2); + sinon.assert.calledWithMatch(logger.error, /Dang it/i); + }); + + test('is stoppable', async () => { + const doneWorking = resolvable(); + const work = sinon.spy(async () => { + poller.stop(); + doneWorking.resolve(); + }); + + const poller = new TaskPoller({ + logger: mockLogger(), + pollInterval: 1, + work, + }); + + poller.start(); + await doneWorking; + await sleep(10); + + sinon.assert.calledOnce(work); + }); + + test('disregards duplicate calls to "start"', async () => { + const doneWorking = resolvable(); + const work = sinon.spy(async () => { + await doneWorking; + }); + const poller = new TaskPoller({ + pollInterval: 1, + logger: mockLogger(), + work, + }); + + poller.start(); + poller.start(); + poller.start(); + poller.start(); + + poller.stop(); + + doneWorking.resolve(); + + sinon.assert.calledOnce(work); + }); + + test('waits for work before polling', async () => { + const doneWorking = resolvable(); + const work = sinon.spy(async () => { + await sleep(10); + poller.stop(); + doneWorking.resolve(); + }); + const poller = new TaskPoller({ + pollInterval: 1, + logger: mockLogger(), + work, + }); + + poller.start(); + await doneWorking; + + sinon.assert.calledOnce(work); + }); +}); diff --git a/x-pack/plugins/task_manager/task_poller.ts b/x-pack/plugins/task_manager/task_poller.ts new file mode 100644 index 000000000000..9aefdeb98fc4 --- /dev/null +++ b/x-pack/plugins/task_manager/task_poller.ts @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the logic for polling the task manager index for new work. + */ + +import { Logger } from './lib/logger'; + +type WorkFn = () => Promise; + +interface Opts { + pollInterval: number; + logger: Logger; + work: WorkFn; +} + +/** + * Performs work on a scheduled interval, logging any errors. This waits for work to complete + * (or error) prior to attempting another run. + */ +export class TaskPoller { + private isStarted = false; + private isWorking = false; + private timeout: any; + private pollInterval: number; + private logger: Logger; + private work: WorkFn; + + /** + * Constructs a new TaskPoller. + * + * @param opts + * @prop {number} pollInterval - How often, in milliseconds, we will run the work function + * @prop {Logger} logger - The task manager logger + * @prop {WorkFn} work - An empty, asynchronous function that performs the desired work + */ + constructor(opts: Opts) { + this.pollInterval = opts.pollInterval; + this.logger = opts.logger; + this.work = opts.work; + } + + /** + * Starts the poller. If the poller is already running, this has no effect. + */ + public async start() { + if (this.isStarted) { + return; + } + this.isStarted = true; + + const poll = async () => { + await this.attemptWork(); + + if (this.isStarted) { + this.timeout = setTimeout(poll, this.pollInterval); + } + }; + + poll(); + } + + /** + * Stops the poller. + */ + public stop() { + this.isStarted = false; + clearTimeout(this.timeout); + this.timeout = undefined; + } + + /** + * Runs the work function. If the work function is currently running, + * this has no effect. + */ + public async attemptWork() { + if (!this.isStarted || this.isWorking) { + return; + } + + this.isWorking = true; + + try { + await this.work(); + } catch (err) { + this.logger.error(`Failed to poll for work: ${err}`); + } finally { + this.isWorking = false; + } + } +} diff --git a/x-pack/plugins/task_manager/task_pool.test.ts b/x-pack/plugins/task_manager/task_pool.test.ts new file mode 100644 index 000000000000..b883831f2c6f --- /dev/null +++ b/x-pack/plugins/task_manager/task_pool.test.ts @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import sinon from 'sinon'; +import { TaskPool } from './task_pool'; +import { mockLogger, resolvable, sleep } from './test_utils'; + +describe('TaskPool', () => { + test('occupiedWorkers are a sum of worker costs', async () => { + const pool = new TaskPool({ + maxWorkers: 200, + logger: mockLogger(), + }); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 10 }, + { ...mockTask(), numWorkers: 20 }, + { ...mockTask(), numWorkers: 30 }, + ]); + + expect(result).toBeTruthy(); + expect(pool.occupiedWorkers).toEqual(60); + }); + + test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => { + const pool = new TaskPool({ + maxWorkers: 100, + logger: mockLogger(), + }); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 20 }, + { ...mockTask(), numWorkers: 30 }, + { ...mockTask(), numWorkers: 40 }, + ]); + + expect(result).toBeTruthy(); + expect(pool.availableWorkers).toEqual(10); + }); + + test('does not run tasks that are beyond its available capacity', async () => { + const pool = new TaskPool({ + maxWorkers: 10, + logger: mockLogger(), + }); + + const shouldRun = mockRun(); + const shouldNotRun = mockRun(); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 9, run: shouldRun }, + { ...mockTask(), numWorkers: 9, run: shouldNotRun }, + ]); + + expect(result).toBeFalsy(); + expect(pool.availableWorkers).toEqual(1); + sinon.assert.calledOnce(shouldRun); + sinon.assert.notCalled(shouldNotRun); + }); + + test('clears up capacity when a task completes', async () => { + const pool = new TaskPool({ + maxWorkers: 10, + logger: mockLogger(), + }); + + const firstWork = resolvable(); + const firstRun = sinon.spy(async () => { + await sleep(0); + firstWork.resolve(); + }); + const secondWork = resolvable(); + const secondRun = sinon.spy(async () => { + await sleep(0); + secondWork.resolve(); + }); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 9, run: firstRun }, + { ...mockTask(), numWorkers: 2, run: secondRun }, + ]); + + expect(result).toBeFalsy(); + expect(pool.occupiedWorkers).toEqual(9); + expect(pool.availableWorkers).toEqual(1); + + await firstWork; + sinon.assert.calledOnce(firstRun); + sinon.assert.notCalled(secondRun); + + await pool.run([{ ...mockTask(), numWorkers: 2, run: secondRun }]); + + expect(pool.occupiedWorkers).toEqual(2); + expect(pool.availableWorkers).toEqual(8); + + await secondWork; + + expect(pool.occupiedWorkers).toEqual(0); + expect(pool.availableWorkers).toEqual(10); + sinon.assert.calledOnce(secondRun); + }); + + test('run cancels expired tasks prior to running new tasks', async () => { + const pool = new TaskPool({ + maxWorkers: 10, + logger: mockLogger(), + }); + + const expired = resolvable(); + const shouldRun = sinon.spy(() => Promise.resolve()); + const shouldNotRun = sinon.spy(() => Promise.resolve()); + const result = await pool.run([ + { + ...mockTask(), + numWorkers: 9, + async run() { + this.isExpired = true; + expired.resolve(); + await sleep(10); + return { + state: {}, + }; + }, + cancel: shouldRun, + }, + { + ...mockTask(), + numWorkers: 1, + async run() { + await sleep(10); + return { + state: {}, + }; + }, + cancel: shouldNotRun, + }, + ]); + + expect(result).toBeTruthy(); + expect(pool.occupiedWorkers).toEqual(10); + expect(pool.availableWorkers).toEqual(0); + + await expired; + + expect(await pool.run([{ ...mockTask(), numWorkers: 7 }])).toBeTruthy(); + sinon.assert.calledOnce(shouldRun); + sinon.assert.notCalled(shouldNotRun); + + expect(pool.occupiedWorkers).toEqual(8); + expect(pool.availableWorkers).toEqual(2); + }); + + test('logs if cancellation errors', async () => { + const logger = mockLogger(); + const pool = new TaskPool({ + logger, + maxWorkers: 20, + }); + + const cancelled = resolvable(); + const result = await pool.run([ + { + ...mockTask(), + numWorkers: 7, + async run() { + this.isExpired = true; + await sleep(10); + return { + state: {}, + }; + }, + async cancel() { + cancelled.resolve(); + throw new Error('Dern!'); + }, + toString: () => '"shooooo!"', + }, + ]); + + expect(result).toBeTruthy(); + await pool.run([]); + + expect(pool.occupiedWorkers).toEqual(0); + + // Allow the task to cancel... + await cancelled; + + sinon.assert.calledWithMatch(logger.error, /Failed to cancel task "shooooo!"/); + }); + + function mockRun() { + return sinon.spy(async () => sleep(0)); + } + + function mockTask() { + return { + numWorkers: 1, + isExpired: false, + cancel: async () => undefined, + claimOwnership: async () => true, + run: mockRun(), + }; + } +}); diff --git a/x-pack/plugins/task_manager/task_pool.ts b/x-pack/plugins/task_manager/task_pool.ts new file mode 100644 index 000000000000..839f022dae75 --- /dev/null +++ b/x-pack/plugins/task_manager/task_pool.ts @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the logic that ensures we don't run too many + * tasks at once in a given Kibana instance. + */ + +import { Logger } from './lib/logger'; +import { TaskRunner } from './task_runner'; + +interface Opts { + maxWorkers: number; + logger: Logger; +} + +/** + * Runs tasks in batches, taking costs into account. + */ +export class TaskPool { + private maxWorkers: number; + private running = new Set(); + private logger: Logger; + + /** + * Creates an instance of TaskPool. + * + * @param {Opts} opts + * @prop {number} maxWorkers - The total number of workers / work slots available + * (e.g. maxWorkers is 4, then 2 tasks of cost 2 can run at a time, or 4 tasks of cost 1) + * @prop {Logger} logger - The task manager logger. + */ + constructor(opts: Opts) { + this.maxWorkers = opts.maxWorkers; + this.logger = opts.logger; + } + + /** + * Gets how many workers are currently in use. + */ + get occupiedWorkers() { + const running = Array.from(this.running); // get array from a Set + return running.reduce((total, { numWorkers }) => (total += numWorkers), 0); + } + + /** + * Gets how many workers are currently available. + */ + get availableWorkers() { + return this.maxWorkers - this.occupiedWorkers; + } + + /** + * Attempts to run the specified list of tasks. Returns true if it was able + * to start every task in the list, false if there was not enough capacity + * to run every task. + * + * @param {TaskRunner[]} tasks + * @returns {Promise} + */ + public run = (tasks: TaskRunner[]) => { + this.cancelExpiredTasks(); + return this.attemptToRun(tasks); + }; + + private async attemptToRun(tasks: TaskRunner[]) { + for (const task of tasks) { + if (this.availableWorkers < task.numWorkers) { + return false; + } + + if (await task.claimOwnership()) { + this.running.add(task); + task + .run() + .catch(err => { + this.logger.warning(`Task ${task} failed in attempt to run: ${err}`); + }) + .then(() => this.running.delete(task)); + } + } + + return true; + } + + private cancelExpiredTasks() { + for (const task of this.running) { + if (task.isExpired) { + this.cancelTask(task); + } + } + } + + private async cancelTask(task: TaskRunner) { + try { + this.logger.debug(`Cancelling expired task ${task}.`); + this.running.delete(task); + await task.cancel(); + } catch (err) { + this.logger.error(`Failed to cancel task ${task}: ${err}`); + } + } +} diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts new file mode 100644 index 000000000000..a3866c65f9d1 --- /dev/null +++ b/x-pack/plugins/task_manager/task_runner.test.ts @@ -0,0 +1,289 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { minutesFromNow } from './lib/intervals'; +import { ConcreteTaskInstance } from './task'; +import { TaskManagerRunner } from './task_runner'; + +describe('TaskManagerRunner', () => { + test('provides details about the task that is running', () => { + const { runner } = testOpts({ + instance: { + id: 'foo', + taskType: 'bar', + }, + }); + + expect(runner.id).toEqual('foo'); + expect(runner.taskType).toEqual('bar'); + expect(runner.toString()).toEqual('bar "foo"'); + }); + + test('warns if the task returns an unexpected result', async () => { + await allowsReturnType(undefined); + await allowsReturnType({}); + await allowsReturnType({ + runAt: new Date(), + }); + await allowsReturnType({ + error: new Error('Dang it!'), + }); + await allowsReturnType({ + state: { shazm: true }, + }); + await disallowsReturnType('hm....'); + await disallowsReturnType({ + whatIsThis: '?!!?', + }); + }); + + test('queues a reattempt if the task fails', async () => { + const initialAttempts = _.random(0, 2); + const id = Date.now().toString(); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + params: { a: 'b' }, + state: { hey: 'there' }, + }, + definitions: { + testtype: { + createTaskRunner: () => ({ + async run() { + throw new Error('Dangit!'); + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.id).toEqual(id); + expect(instance.attempts).toEqual(initialAttempts + 1); + expect(instance.runAt.getTime()).toBeGreaterThan(Date.now()); + expect(instance.params).toEqual({ a: 'b' }); + expect(instance.state).toEqual({ hey: 'there' }); + }); + + test('reschedules tasks that have an interval', async () => { + const { runner, store } = testOpts({ + instance: { + interval: '10m', + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime()); + expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime()); + }); + + test('reschedules tasks that return a runAt', async () => { + const runAt = minutesFromNow(_.random(1, 10)); + const { runner, store } = testOpts({ + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return { runAt }; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWithMatch(store.update, { runAt }); + }); + + test('tasks that return runAt override interval', async () => { + const runAt = minutesFromNow(_.random(5)); + const { runner, store } = testOpts({ + instance: { + interval: '20m', + }, + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return { runAt }; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWithMatch(store.update, { runAt }); + }); + + test('removes non-recurring tasks after they complete', async () => { + const id = _.random(1, 20).toString(); + const { runner, store } = testOpts({ + instance: { + id, + interval: undefined, + }, + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return undefined; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.remove); + sinon.assert.calledWith(store.remove, id); + }); + + test('cancel cancels the task runner, if it is cancellable', async () => { + let wasCancelled = false; + const { runner, logger } = testOpts({ + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + await new Promise(r => setTimeout(r, 1000)); + }, + async cancel() { + wasCancelled = true; + }, + }), + }, + }, + }); + + const promise = runner.run(); + await new Promise(r => setInterval(r, 1)); + await runner.cancel(); + await promise; + + expect(wasCancelled).toBeTruthy(); + sinon.assert.neverCalledWithMatch(logger.warning, /not cancellable/); + }); + + test('warns if cancel is called on a non-cancellable task', async () => { + const { runner, logger } = testOpts({ + definitions: { + testType: { + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + const promise = runner.run(); + await runner.cancel(); + await promise; + + sinon.assert.calledWithMatch(logger.warning, /not cancellable/); + }); + + interface TestOpts { + instance?: Partial; + definitions?: any; + } + + function testOpts(opts: TestOpts) { + const callCluster = sinon.stub(); + const createTaskRunner = sinon.stub(); + const logger = { + error: sinon.stub(), + debug: sinon.stub(), + info: sinon.stub(), + warning: sinon.stub(), + }; + const store = { + update: sinon.stub(), + remove: sinon.stub(), + maxAttempts: 5, + }; + const runner = new TaskManagerRunner({ + kbnServer: sinon.stub(), + beforeRun: context => Promise.resolve(context), + logger, + store, + instance: Object.assign( + { + id: 'foo', + taskType: 'bar', + version: 32, + runAt: new Date(), + attempts: 0, + params: {}, + scope: ['reporting'], + state: {}, + status: 'idle', + user: 'example', + }, + opts.instance || {} + ), + definitions: Object.assign(opts.definitions || {}, { + testbar: { + type: 'bar', + title: 'Bar!', + createTaskRunner, + }, + }), + }); + + return { + callCluster, + createTaskRunner, + runner, + logger, + store, + }; + } + + async function testReturn(result: any, shouldBeValid: boolean) { + const { runner, logger } = testOpts({ + definitions: { + bar: { + createTaskRunner: () => ({ + run: async () => result, + }), + }, + }, + }); + + await runner.run(); + + if (shouldBeValid) { + sinon.assert.notCalled(logger.warning); + } else { + sinon.assert.calledWith(logger.warning, sinon.match(/invalid task result/i)); + } + } + + function allowsReturnType(result: any) { + return testReturn(result, true); + } + + function disallowsReturnType(result: any) { + return testReturn(result, false); + } +}); diff --git a/x-pack/plugins/task_manager/task_runner.ts b/x-pack/plugins/task_manager/task_runner.ts new file mode 100644 index 000000000000..d5e0196d8086 --- /dev/null +++ b/x-pack/plugins/task_manager/task_runner.ts @@ -0,0 +1,273 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the core logic for running an individual task. + * It handles the full lifecycle of a task run, including error handling, + * rescheduling, middleware application, etc. + */ + +import Joi from 'joi'; +import { intervalFromNow, minutesFromNow } from './lib/intervals'; +import { Logger } from './lib/logger'; +import { BeforeRunFunction } from './lib/middleware'; +import { + CancelFunction, + CancellableTask, + ConcreteTaskInstance, + RunResult, + SanitizedTaskDefinition, + TaskDictionary, + validateRunResult, +} from './task'; +import { RemoveResult } from './task_store'; + +export interface TaskRunner { + numWorkers: number; + isExpired: boolean; + cancel: CancelFunction; + claimOwnership: () => Promise; + run: () => Promise; + toString?: () => string; +} + +interface Updatable { + readonly maxAttempts: number; + update(doc: ConcreteTaskInstance): Promise; + remove(id: string): Promise; +} + +interface Opts { + logger: Logger; + definitions: TaskDictionary; + instance: ConcreteTaskInstance; + store: Updatable; + kbnServer: any; + beforeRun: BeforeRunFunction; +} + +/** + * Runs a background task, ensures that errors are properly handled, + * allows for cancellation. + * + * @export + * @class TaskManagerRunner + * @implements {TaskRunner} + */ +export class TaskManagerRunner implements TaskRunner { + private task?: CancellableTask; + private instance: ConcreteTaskInstance; + private definitions: TaskDictionary; + private logger: Logger; + private store: Updatable; + private kbnServer: any; + private beforeRun: BeforeRunFunction; + + /** + * Creates an instance of TaskManagerRunner. + * @param {Opts} opts + * @prop {Logger} logger - The task manager logger + * @prop {TaskDefinition} definition - The definition of the task being run + * @prop {ConcreteTaskInstance} instance - The record describing this particular task instance + * @prop {Updatable} store - The store used to read / write tasks instance info + * @prop {kbnServer} kbnServer - An async function that provides the task's run context + * @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task + * @memberof TaskManagerRunner + */ + constructor(opts: Opts) { + this.instance = sanitizeInstance(opts.instance); + this.definitions = opts.definitions; + this.logger = opts.logger; + this.store = opts.store; + this.kbnServer = opts.kbnServer; + this.beforeRun = opts.beforeRun; + } + + /** + * Gets how many workers are occupied by this task instance. + * Per Joi validation logic, this will return a number >= 1 + */ + public get numWorkers() { + return this.definition.numWorkers; + } + + /** + * Gets the id of this task instance. + */ + public get id() { + return this.instance.id; + } + + /** + * Gets the task type of this task instance. + */ + public get taskType() { + return this.instance.taskType; + } + + /** + * Gets the task defintion from the dictionary. + */ + public get definition() { + return this.definitions[this.taskType]; + } + + /** + * Gets whether or not this task has run longer than its expiration setting allows. + */ + public get isExpired() { + return this.instance.runAt < new Date(); + } + + /** + * Returns a log-friendly representation of this task. + */ + public toString() { + return `${this.taskType} "${this.id}"`; + } + + /** + * Runs the task, handling the task result, errors, etc, rescheduling if need + * be. NOTE: the time of applying the middleware's beforeRun is incorporated + * into the total timeout time the task in configured with. We may decide to + * start the timer after beforeRun resolves + * + * @returns {Promise} + */ + public async run(): Promise { + this.logger.debug(`Running task ${this}`); + const modifiedContext = await this.beforeRun({ + kbnServer: this.kbnServer, + taskInstance: this.instance, + }); + + try { + this.task = this.definition.createTaskRunner(modifiedContext); + const result = await this.task.run(); + const validatedResult = this.validateResult(result); + return this.processResult(validatedResult); + } catch (err) { + this.logger.error(`Task ${this} failed: ${err}`); + + // in error scenario, we can not get the RunResult + // re-use modifiedContext's state, which is correct as of beforeRun + return this.processResult({ error: err, state: modifiedContext.taskInstance.state }); + } + } + + /** + * Attempts to claim exclusive rights to run the task. If the attempt fails + * with a 409 (http conflict), we assume another Kibana instance beat us to the punch. + * + * @returns {Promise} + */ + public async claimOwnership(): Promise { + const VERSION_CONFLICT_STATUS = 409; + + try { + this.instance = await this.store.update({ + ...this.instance, + status: 'running', + runAt: intervalFromNow(this.definition.timeout)!, + }); + + return true; + } catch (error) { + if (error.statusCode !== VERSION_CONFLICT_STATUS) { + throw error; + } + } + + return false; + } + + /** + * Attempts to cancel the task. + * + * @returns {Promise} + */ + public async cancel() { + const { task } = this; + if (task && task.cancel) { + this.task = undefined; + return task.cancel(); + } + + this.logger.warning(`The task ${this} is not cancellable.`); + } + + private validateResult(result?: RunResult | void): RunResult { + const { error } = Joi.validate(result, validateRunResult); + + if (error) { + this.logger.warning(`Invalid task result for ${this}: ${error.message}`); + } + + return result || { state: {} }; + } + + private async processResultForRecurringTask(result: RunResult): Promise { + // recurring task: update the task instance + const state = result.state || this.instance.state || {}; + const status = this.instance.attempts < this.store.maxAttempts ? 'idle' : 'failed'; + + let runAt; + if (status === 'failed') { + // task run errored, keep the same runAt + runAt = this.instance.runAt; + } else { + runAt = + result.runAt || + intervalFromNow(this.instance.interval) || + // when result.error is truthy, then we're retrying because it failed + minutesFromNow((this.instance.attempts + 1) * 5); // incrementally backs off an extra 5m per failure + } + + await this.store.update({ + ...this.instance, + runAt, + state, + status, + attempts: result.error ? this.instance.attempts + 1 : 0, + }); + + return result; + } + + private async processResultWhenDone(result: RunResult): Promise { + // not a recurring task: clean up by removing the task instance from store + try { + await this.store.remove(this.instance.id); + } catch (err) { + if (err.statusCode === 404) { + this.logger.warning( + `Task cleanup of ${this} failed in processing. Was remove called twice?` + ); + } else { + throw err; + } + } + + return result; + } + + private async processResult(result: RunResult): Promise { + if (result.runAt || this.instance.interval || result.error) { + await this.processResultForRecurringTask(result); + } else { + await this.processResultWhenDone(result); + } + return result; + } +} + +function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance { + return { + ...instance, + params: instance.params || {}, + state: instance.state || {}, + }; +} diff --git a/x-pack/plugins/task_manager/task_store.test.ts b/x-pack/plugins/task_manager/task_store.test.ts new file mode 100644 index 000000000000..a76ba03de0fa --- /dev/null +++ b/x-pack/plugins/task_manager/task_store.test.ts @@ -0,0 +1,515 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { TaskInstance, TaskStatus } from './task'; +import { FetchOpts, TaskStore } from './task_store'; + +describe('TaskStore', () => { + describe('init', () => { + test('creates the task manager index', async () => { + const callCluster = sinon.spy(); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + await store.init(); + + sinon.assert.calledOnce(callCluster); + + sinon.assert.calledWithMatch(callCluster, 'indices.putTemplate', { + body: { + index_patterns: ['tasky'], + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }, + }, + name: 'tasky', + }); + }); + }); + + describe('schedule', () => { + async function testSchedule(task: TaskInstance) { + const callCluster = sinon.spy(() => + Promise.resolve({ + _id: 'testid', + _version: 3344, + }) + ); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['report', 'dernstraight', 'yawn'], + }); + const result = await store.schedule(task); + + sinon.assert.calledTwice(callCluster); + + return { result, callCluster, arg: callCluster.args[1][1] }; + } + + test('serializes the params and state', async () => { + const task = { + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + const { callCluster, arg } = await testSchedule(task); + + sinon.assert.calledWith(callCluster, 'index'); + + expect(arg).toMatchObject({ + index: 'tasky', + type: '_doc', + body: { + task: { + params: JSON.stringify(task.params), + state: JSON.stringify(task.state), + }, + }, + }); + }); + + test('retiurns a concrete task instance', async () => { + const task = { + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + const { result } = await testSchedule(task); + + expect(result).toMatchObject({ + ...task, + version: 3344, + id: 'testid', + }); + }); + + test('sets runAt to now if not specified', async () => { + const now = Date.now(); + const { arg } = await testSchedule({ taskType: 'dernstraight', params: {}, state: {} }); + expect(arg.body.task.runAt.getTime()).toBeGreaterThanOrEqual(now); + }); + + test('ensures params and state are not null', async () => { + const { arg } = await testSchedule({ taskType: 'yawn' } as any); + expect(arg.body.task.params).toEqual('{}'); + expect(arg.body.task.state).toEqual('{}'); + }); + + test('errors if the task type is unknown', async () => { + await expect(testSchedule({ taskType: 'nope', params: {}, state: {} })).rejects.toThrow( + /Unsupported task type "nope"/i + ); + }); + }); + + describe('fetch', () => { + async function testFetch(opts?: FetchOpts, hits: any[] = []) { + const callCluster = sinon.spy(async () => ({ hits: { hits } })); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + const result = await store.fetch(opts); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'search'); + + return { + result, + args: callCluster.args[0][1], + }; + } + + test('empty call filters by type, sorts by runAt and id', async () => { + const { args } = await testFetch(); + expect(args).toMatchObject({ + type: '_doc', + index: 'tasky', + body: { + sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }], + query: { term: { type: 'task' } }, + }, + }); + }); + + test('allows custom queries', async () => { + const { args } = await testFetch({ + query: { + term: { 'task.taskType': 'bar' }, + }, + }); + + expect(args).toMatchObject({ + body: { + query: { + bool: { + must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'bar' } }], + }, + }, + }, + }); + }); + + test('sorts by id if custom sort does not have an id sort in it', async () => { + const { args } = await testFetch({ + sort: [{ 'task.taskType': 'desc' }], + }); + + expect(args).toMatchObject({ + body: { + sort: [{ 'task.taskType': 'desc' }, { _id: 'desc' }], + }, + }); + }); + + test('allows custom sort by id', async () => { + const { args } = await testFetch({ + sort: [{ _id: 'asc' }], + }); + + expect(args).toMatchObject({ + body: { + sort: [{ _id: 'asc' }], + }, + }); + }); + + test('allows specifying pagination', async () => { + const now = new Date(); + const searchAfter = [now, '143243sdafa32']; + const { args } = await testFetch({ + searchAfter, + }); + + expect(args).toMatchObject({ + body: { + search_after: searchAfter, + }, + }); + }); + + test('returns paginated tasks', async () => { + const runAt = new Date(); + const { result } = await testFetch(undefined, [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + interval: undefined, + attempts: 0, + status: 'idle', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + }, + }, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + interval: '5m', + attempts: 2, + status: 'running', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + }, + }, + sort: ['b', 2], + }, + ]); + + expect(result).toEqual({ + docs: [ + { + attempts: 0, + id: 'aaa', + interval: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'idle', + taskType: 'foo', + user: 'jimbo', + version: undefined, + }, + { + attempts: 2, + id: 'bbb', + interval: '5m', + params: { shazm: 1 }, + runAt, + scope: ['reporting', 'ceo'], + state: { henry: 'The 8th' }, + status: 'running', + taskType: 'bar', + user: 'dabo', + version: undefined, + }, + ], + searchAfter: ['b', 2], + }); + }); + }); + + describe('fetchAvailableTasks', () => { + async function testFetchAvailableTasks({ opts = {}, hits = [] }: any = {}) { + const callCluster = sinon.spy(async () => ({ hits: { hits } })); + const store = new TaskStore({ + callCluster, + supportedTypes: ['a', 'b', 'c'], + index: 'tasky', + maxAttempts: 2, + ...opts, + }); + + const result = await store.fetchAvailableTasks(); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'search'); + + return { + result, + args: callCluster.args[0][1], + }; + } + + test('it returns normally with no tasks when the index does not exist.', async () => { + const callCluster = sinon.spy(async () => ({ hits: { hits: [] } })); + const store = new TaskStore({ + callCluster, + supportedTypes: ['a', 'b', 'c'], + index: 'tasky', + maxAttempts: 2, + }); + + const result = await store.fetchAvailableTasks(); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWithMatch(callCluster, 'search', { ignoreUnavailable: true }); + + expect(result.length).toBe(0); + }); + + test('it filters tasks by supported types, maxAttempts, and runAt', async () => { + const maxAttempts = _.random(2, 43); + const index = `index_${_.random(1, 234)}`; + const { args } = await testFetchAvailableTasks({ + opts: { + index, + maxAttempts, + supportedTypes: ['foo', 'bar'], + }, + }); + expect(args).toMatchObject({ + body: { + query: { + bool: { + must: [ + { term: { type: 'task' } }, + { + bool: { + must: [ + { terms: { 'task.taskType': ['foo', 'bar'] } }, + { range: { 'task.attempts': { lte: maxAttempts } } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + ], + }, + }, + size: 10, + sort: { 'task.runAt': { order: 'asc' } }, + version: true, + }, + index, + type: '_doc', + }); + }); + + test('it returns task objects', async () => { + const runAt = new Date(); + const { result } = await testFetchAvailableTasks({ + hits: [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + interval: undefined, + attempts: 0, + status: 'idle', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + }, + }, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + interval: '5m', + attempts: 2, + status: 'running', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + }, + }, + sort: ['b', 2], + }, + ], + }); + expect(result).toMatchObject([ + { + attempts: 0, + id: 'aaa', + interval: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'idle', + taskType: 'foo', + user: 'jimbo', + version: undefined, + }, + { + attempts: 2, + id: 'bbb', + interval: '5m', + params: { shazm: 1 }, + runAt, + scope: ['reporting', 'ceo'], + state: { henry: 'The 8th' }, + status: 'running', + taskType: 'bar', + user: 'dabo', + version: undefined, + }, + ]); + }); + }); + + describe('update', () => { + test('refreshes the index, handles versioning', async () => { + const runAt = new Date(); + const task = { + runAt, + id: 'task:324242', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + version: 2, + attempts: 3, + status: 'idle' as TaskStatus, + }; + + const callCluster = sinon.spy(async () => ({ _version: task.version + 1 })); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + const result = await store.update(task); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'update'); + + expect(callCluster.args[0][1]).toMatchObject({ + id: task.id, + index: 'tasky', + type: '_doc', + version: 2, + refresh: true, + body: { + doc: { + task: { + ...['id', 'version'].reduce((acc, prop) => _.omit(acc, prop), task), + params: JSON.stringify(task.params), + state: JSON.stringify(task.state), + }, + }, + }, + }); + + expect(result).toEqual({ ...task, version: 3 }); + }); + }); + + describe('remove', () => { + test('removes the task with the specified id', async () => { + const id = `id-${_.random(1, 20)}`; + const callCluster = sinon.spy(() => + Promise.resolve({ + _index: 'myindex', + _id: id, + _version: 32, + result: 'deleted', + }) + ); + const store = new TaskStore({ + callCluster, + index: 'myindex', + maxAttempts: 2, + supportedTypes: ['a'], + }); + const result = await store.remove(id); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'delete'); + + expect(result).toEqual({ + id, + index: 'myindex', + version: 32, + result: 'deleted', + }); + + expect(callCluster.args[0][1]).toMatchObject({ + id, + index: 'myindex', + type: '_doc', + refresh: true, + }); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts new file mode 100644 index 000000000000..84ee7ba8bb11 --- /dev/null +++ b/x-pack/plugins/task_manager/task_store.ts @@ -0,0 +1,375 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains helpers for managing the task manager storage layer. + */ + +import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task'; + +const DOC_TYPE = '_doc'; + +export interface StoreOpts { + callCluster: ElasticJs; + index: string; + maxAttempts: number; + supportedTypes: string[]; +} + +export interface FetchOpts { + searchAfter?: any[]; + sort?: object[]; + query?: object; +} + +export interface FetchResult { + searchAfter: any[]; + docs: ConcreteTaskInstance[]; +} + +export interface RemoveResult { + index: string; + id: string; + version: string; + result: string; +} + +// Internal, the raw document, as stored in the Kibana index. +export interface RawTaskDoc { + _id: string; + _index: string; + _type: string; + _version: number; + _source: { + type: string; + task: { + taskType: string; + runAt: Date; + interval?: string; + attempts: number; + status: TaskStatus; + params: string; + state: string; + user?: string; + scope?: string[]; + }; + }; +} + +/** + * Wraps an elasticsearch connection and provides a task manager-specific + * interface into the index. + */ +export class TaskStore { + public readonly maxAttempts: number; + private callCluster: ElasticJs; + private index: string; + private supportedTypes: string[]; + private wasInitialized = false; + + /** + * Constructs a new TaskStore. + * @param {StoreOpts} opts + * @prop {CallCluster} callCluster - The elastic search connection + * @prop {string} index - The name of the task manager index + * @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned + * @prop {string[]} supportedTypes - The task types supported by this store + */ + constructor(opts: StoreOpts) { + this.callCluster = opts.callCluster; + this.index = opts.index; + this.maxAttempts = opts.maxAttempts; + this.supportedTypes = opts.supportedTypes; + + this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this); + } + + public addSupportedTypes(types: string[]) { + if (!this.wasInitialized) { + this.supportedTypes = this.supportedTypes.concat(types); + } else { + throw new Error('Cannot add task types after initialization'); + } + } + + /** + * Initializes the store, ensuring the task manager index is created and up to date. + */ + public async init() { + if (this.wasInitialized) { + throw new Error('TaskStore has already been initialized!'); + } + + const properties = { + type: { type: 'keyword' }, + task: { + properties: { + taskType: { type: 'keyword' }, + runAt: { type: 'date' }, + interval: { type: 'text' }, + attempts: { type: 'integer' }, + status: { type: 'keyword' }, + params: { type: 'text' }, + state: { type: 'text' }, + user: { type: 'keyword' }, + scope: { type: 'keyword' }, + }, + }, + }; + + try { + const templateResult = await this.callCluster('indices.putTemplate', { + name: this.index, + body: { + index_patterns: [this.index], + mappings: { + _doc: { + dynamic: 'strict', + properties, + }, + }, + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }, + }, + }); + this.wasInitialized = true; + return templateResult; + } catch (err) { + throw err; + } + + return; + } + + /** + * Schedules a task. + * + * @param task - The task being scheduled. + */ + public async schedule(taskInstance: TaskInstance): Promise { + if (!this.wasInitialized) { + await this.init(); + } + + if (!this.supportedTypes.includes(taskInstance.taskType)) { + throw new Error( + `Unsupported task type "${ + taskInstance.taskType + }". Supported types are ${this.supportedTypes.join(', ')}` + ); + } + + const { id, ...body } = rawSource(taskInstance); + const result = await this.callCluster('index', { + id, + body, + index: this.index, + type: DOC_TYPE, + refresh: true, + }); + + const { task } = body; + return { + ...taskInstance, + id: result._id, + version: result._version, + attempts: 0, + status: task.status, + runAt: task.runAt, + state: taskInstance.state || {}, + }; + } + + /** + * Fetches a paginatable list of scheduled tasks. + * + * @param opts - The query options used to filter tasks + */ + public async fetch(opts: FetchOpts = {}): Promise { + const sort = paginatableSort(opts.sort); + return this.search({ + sort, + search_after: opts.searchAfter, + query: opts.query, + }); + } + + /** + * Fetches tasks from the index, which are ready to be run. + * - runAt is now or past + * - id is not currently running in this instance of Kibana + * - has a type that is in our task definitions + * + * @param {TaskQuery} query + * @prop {string[]} types - Task types to be queried + * @prop {number} size - The number of task instances to retrieve + * @returns {Promise} + */ + public async fetchAvailableTasks(): Promise { + const { docs } = await this.search({ + query: { + bool: { + must: [ + { terms: { 'task.taskType': this.supportedTypes } }, + { range: { 'task.attempts': { lte: this.maxAttempts } } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + size: 10, + sort: { 'task.runAt': { order: 'asc' } }, + version: true, + }); + + return docs; + } + + /** + * Updates the specified doc in the index, returning the doc + * with its version up to date. + * + * @param {TaskDoc} doc + * @returns {Promise} + */ + public async update(doc: ConcreteTaskInstance): Promise { + const rawDoc = taskDocToRaw(doc, this.index); + + const { _version } = await this.callCluster('update', { + body: { + doc: rawDoc._source, + }, + id: doc.id, + index: this.index, + type: DOC_TYPE, + version: doc.version, + // The refresh is important so that if we immediately look for work, + // we don't pick up this task. + refresh: true, + }); + + return { + ...doc, + version: _version, + }; + } + + /** + * Removes the specified task from the index. + * + * @param {string} id + * @returns {Promise} + */ + public async remove(id: string): Promise { + const result = await this.callCluster('delete', { + id, + index: this.index, + type: DOC_TYPE, + // The refresh is important so that if we immediately look for work, + // we don't pick up this task. + refresh: true, + }); + + return { + index: result._index, + id: result._id, + version: result._version, + result: result.result, + }; + } + + private async search(opts: any = {}): Promise { + const originalQuery = opts.query; + const queryOnlyTasks = { term: { type: 'task' } }; + const query = originalQuery + ? { bool: { must: [queryOnlyTasks, originalQuery] } } + : queryOnlyTasks; + + const result = await this.callCluster('search', { + type: DOC_TYPE, + index: this.index, + ignoreUnavailable: true, + body: { + ...opts, + query, + }, + }); + + const rawDocs = result.hits.hits; + + return { + docs: (rawDocs as RawTaskDoc[]).map(rawToTaskDoc), + searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [], + }; + } +} + +function paginatableSort(sort: any[] = []) { + const sortById = { _id: 'desc' }; + + if (!sort.length) { + return [{ 'task.runAt': 'asc' }, sortById]; + } + + if (sort.find(({ _id }) => !!_id)) { + return sort; + } + + return [...sort, sortById]; +} + +function rawSource(doc: TaskInstance) { + const { id, ...taskFields } = doc; + const source = { + ...taskFields, + params: JSON.stringify(doc.params || {}), + state: JSON.stringify(doc.state || {}), + attempts: (doc as ConcreteTaskInstance).attempts || 0, + runAt: doc.runAt || new Date(), + status: (doc as ConcreteTaskInstance).status || 'idle', + }; + + delete (source as any).id; + delete (source as any).version; + delete (source as any).type; + + return { + id, + type: 'task', + task: source, + }; +} + +function taskDocToRaw(doc: ConcreteTaskInstance, index: string): RawTaskDoc { + const { type, task } = rawSource(doc); + + return { + _id: doc.id, + _index: index, + _source: { type, task }, + _type: DOC_TYPE, + _version: doc.version, + }; +} + +function rawToTaskDoc(doc: RawTaskDoc): ConcreteTaskInstance { + return { + ...doc._source.task, + id: doc._id, + version: doc._version, + params: parseJSONField(doc._source.task.params, 'params', doc), + state: parseJSONField(doc._source.task.state, 'state', doc), + }; +} + +function parseJSONField(json: string, fieldName: string, doc: RawTaskDoc) { + try { + return json ? JSON.parse(json) : {}; + } catch (error) { + throw new Error(`Task "${doc._id}"'s ${fieldName} field has invalid JSON: ${json}`); + } +} diff --git a/x-pack/plugins/task_manager/test_utils/index.ts b/x-pack/plugins/task_manager/test_utils/index.ts new file mode 100644 index 000000000000..6a427de41b31 --- /dev/null +++ b/x-pack/plugins/task_manager/test_utils/index.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * A handful of helper functions for testing the task manager. + */ + +import sinon from 'sinon'; + +// Caching this here to avoid setTimeout mocking affecting our tests. +const nativeTimeout = setTimeout; + +/** + * Creates a mock task manager Logger. + */ +export function mockLogger() { + return { + info: sinon.stub(), + debug: sinon.stub(), + warning: sinon.stub(), + error: sinon.stub(), + }; +} + +interface Resolvable { + resolve: () => void; +} + +/** + * Creates a promise which can be resolved externally, useful for + * coordinating async tests. + */ +export function resolvable(): PromiseLike & Resolvable { + let resolve: () => void; + const result = new Promise(r => (resolve = r)) as any; + + result.resolve = () => nativeTimeout(resolve, 0); + + return result; +} + +/** + * A simple helper for waiting a specified number of milliseconds. + * + * @param {number} ms + */ +export async function sleep(ms: number) { + return new Promise(r => nativeTimeout(r, ms)); +} diff --git a/x-pack/scripts/functional_tests.js b/x-pack/scripts/functional_tests.js index a97748f87c9d..67d13381b3a7 100644 --- a/x-pack/scripts/functional_tests.js +++ b/x-pack/scripts/functional_tests.js @@ -10,6 +10,7 @@ require('@kbn/test').runTestsCli([ require.resolve('../test/reporting/configs/chromium_functional.js'), require.resolve('../test/functional/config.js'), require.resolve('../test/api_integration/config.js'), + require.resolve('../test/plugin_api_integration/config.js'), require.resolve('../test/saml_api_integration/config.js'), require.resolve('../test/token_api_integration/config.js'), require.resolve('../test/spaces_api_integration/spaces_only/config'), diff --git a/x-pack/test/plugin_api_integration/config.js b/x-pack/test/plugin_api_integration/config.js new file mode 100644 index 000000000000..e4f7743c4444 --- /dev/null +++ b/x-pack/test/plugin_api_integration/config.js @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import path from 'path'; +import fs from 'fs'; + +export default async function ({ readConfigFile }) { + const integrationConfig = await readConfigFile(require.resolve('../api_integration/config')); + const kibanaFunctionalConfig = await readConfigFile(require.resolve('../../../test/functional/config.js')); + + // Find all folders in ./plugins since we treat all them as plugin folder + const allFiles = fs.readdirSync(path.resolve(__dirname, 'plugins')); + const plugins = allFiles.filter(file => fs.statSync(path.resolve(__dirname, 'plugins', file)).isDirectory()); + + return { + testFiles: [ + require.resolve('./test_suites/task_manager'), + ], + services: { + retry: kibanaFunctionalConfig.get('services.retry'), + ...integrationConfig.get('services'), + }, + pageObjects: integrationConfig.get('pageObjects'), + servers: integrationConfig.get('servers'), + esTestCluster: integrationConfig.get('esTestCluster'), + apps: integrationConfig.get('apps'), + esArchiver: { + directory: path.resolve(__dirname, '../es_archives') + }, + screenshots: integrationConfig.get('screenshots'), + junit: { + reportName: 'Plugin Functional Tests', + }, + kbnTestServer: { + ...integrationConfig.get('kbnTestServer'), + serverArgs: [ + ...integrationConfig.get('kbnTestServer.serverArgs'), + ...plugins.map(pluginDir => `--plugin-path=${path.resolve(__dirname, 'plugins', pluginDir)}`), + ], + }, + }; +} + diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/index.js b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js new file mode 100644 index 000000000000..8d9a949f63f9 --- /dev/null +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { initRoutes } from './init_routes'; + +export default function (kibana) { + return new kibana.Plugin({ + name: 'sampleTask', + require: ['elasticsearch', 'task_manager'], + + config(Joi) { + return Joi.object({ + enabled: Joi.boolean().default(true), + }).default(); + }, + + init(server) { + const { taskManager } = server; + + taskManager.registerTaskDefinitions({ + sampleTask: { + title: 'Sample Task', + description: 'A sample task for testing the task_manager.', + timeout: '1m', + numWorkers: 2, + + // This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc) + // taskInstance.params has the following optional fields: + // nextRunMilliseconds: number - If specified, the run method will return a runAt that is now + nextRunMilliseconds + // failWith: string - If specified, the task will throw an error with the specified message + createTaskRunner: ({ kbnServer, taskInstance }) => ({ + async run() { + const { params, state } = taskInstance; + const prevState = state || { count: 0 }; + + if (params.failWith) { + throw new Error(params.failWith); + } + + const callCluster = kbnServer.server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; + await callCluster('index', { + index: '.task_manager_test_result', + type: '_doc', + body: { + type: 'task', + taskId: taskInstance.id, + params: JSON.stringify(params), + state: JSON.stringify(state), + ranAt: new Date(), + }, + refresh: true, + }); + + return { + state: { count: (prevState.count || 0) + 1 }, + runAt: millisecondsFromNow(params.nextRunMilliseconds), + }; + }, + }), + }, + }); + + taskManager.addMiddleware({ + async beforeSave({ taskInstance, ...opts }) { + const modifiedInstance = { + ...taskInstance, + params: { + originalParams: taskInstance.params, + superFly: 'My middleware param!', + }, + }; + + return { + ...opts, + taskInstance: modifiedInstance, + }; + }, + + async beforeRun({ taskInstance, ...opts }) { + return { + ...opts, + taskInstance: { + ...taskInstance, + params: taskInstance.params.originalParams, + }, + }; + }, + }); + + initRoutes(server); + }, + }); +} + +function millisecondsFromNow(ms) { + if (!ms) { + return; + } + + const dt = new Date(); + dt.setTime(dt.getTime() + ms); + return dt; +} diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js new file mode 100644 index 000000000000..c266d756377d --- /dev/null +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; + +export function initRoutes(server) { + const { taskManager } = server; + + server.route({ + path: '/api/sample_tasks', + method: 'POST', + config: { + validate: { + payload: Joi.object({ + taskType: Joi.string().required(), + interval: Joi.string().optional(), + params: Joi.object().required(), + state: Joi.object().optional(), + id: Joi.string().optional(), + }), + }, + }, + async handler(request) { + try { + const task = await taskManager.schedule(request.payload, { request }); + return task; + } catch (err) { + return err; + } + }, + }); + + server.route({ + path: '/api/sample_tasks', + method: 'GET', + async handler() { + try { + return taskManager.fetch(); + } catch (err) { + return err; + } + } + }); + + server.route({ + path: '/api/sample_tasks', + method: 'DELETE', + async handler() { + try { + const { docs: tasks } = await taskManager.fetch(); + return Promise.all(tasks.map((task) => taskManager.remove(task.id))); + } catch (err) { + return err; + } + }, + }); +} diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/package.json b/x-pack/test/plugin_api_integration/plugins/task_manager/package.json new file mode 100644 index 000000000000..ede03a08a272 --- /dev/null +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/package.json @@ -0,0 +1,4 @@ +{ + "name": "sample_task_plugin", + "version": "kibana" +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js new file mode 100644 index 000000000000..c3efe56c80e8 --- /dev/null +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export default function ({ loadTestFile }) { + describe('task_manager', function taskManagerSuite() { + this.tags('ciGroup4'); + loadTestFile(require.resolve('./task_manager_integration')); + }); +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js new file mode 100644 index 000000000000..c6c531f83cc6 --- /dev/null +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import expect from 'expect.js'; +import url from 'url'; +import supertestAsPromised from 'supertest-as-promised'; + +export default function ({ getService }) { + const es = getService('es'); + const retry = getService('retry'); + const config = getService('config'); + const testHistoryIndex = '.task_manager_test_result'; + const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); + + describe('scheduling and running tasks', () => { + beforeEach(() => supertest.delete('/api/sample_tasks') + .set('kbn-xsrf', 'xxx') + .expect(200)); + + beforeEach(async () => + (await es.indices.exists({ index: testHistoryIndex })) && es.deleteByQuery({ + index: testHistoryIndex, + q: 'type:task', + refresh: true, + })); + + function currentTasks() { + return supertest.get('/api/sample_tasks') + .expect(200) + .then((response) => response.body); + } + + function historyDocs() { + return es.search({ + index: testHistoryIndex, + type: '_doc', + q: 'type:task', + }).then(result => result.hits.hits); + } + + function scheduleTask(task) { + return supertest.post('/api/sample_tasks') + .set('kbn-xsrf', 'xxx') + .send(task) + .expect(200) + .then((response) => response.body); + } + + it('should support middleware', async () => { + const historyItem = _.random(1, 100); + + await scheduleTask({ + taskType: 'sampleTask', + interval: '30m', + params: { historyItem }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [task] = (await currentTasks()).docs; + + expect(task.attempts).to.eql(0); + expect(task.state.count).to.eql(1); + + expect(task.params).to.eql({ + superFly: 'My middleware param!', + originalParams: { historyItem }, + }); + }); + }); + + it('should remove non-recurring tasks after they complete', async () => { + await scheduleTask({ + taskType: 'sampleTask', + params: { }, + }); + + await retry.try(async () => { + const history = await historyDocs(); + expect(history.length).to.eql(1); + expect((await currentTasks()).docs).to.eql([]); + }); + }); + + it('should use a given ID as the task document ID', async () => { + const result = await scheduleTask({ + id: 'test-task-for-sample-task-plugin-to-test-task-manager', + taskType: 'sampleTask', + params: { }, + }); + + expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager'); + }); + + it('should reschedule if task errors', async () => { + const task = await scheduleTask({ + taskType: 'sampleTask', + params: { failWith: 'Dangit!!!!!' }, + }); + + await retry.try(async () => { + const [scheduledTask] = (await currentTasks()).docs; + expect(scheduledTask.id).to.eql(task.id); + expect(scheduledTask.attempts).to.be.greaterThan(0); + expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt)); + }); + }); + + it('should reschedule if task returns runAt', async () => { + const nextRunMilliseconds = _.random(60000, 200000); + const count = _.random(1, 20); + + const originalTask = await scheduleTask({ + taskType: 'sampleTask', + params: { nextRunMilliseconds }, + state: { count }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [task] = (await currentTasks()).docs; + expect(task.attempts).to.eql(0); + expect(task.state.count).to.eql(count + 1); + + expectReschedule(originalTask, task, nextRunMilliseconds); + }); + }); + + it('should reschedule if task has an interval', async () => { + const interval = _.random(5, 200); + const intervalMilliseconds = interval * 60000; + + const originalTask = await scheduleTask({ + taskType: 'sampleTask', + interval: `${interval}m`, + params: { }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [task] = (await currentTasks()).docs; + expect(task.attempts).to.eql(0); + expect(task.state.count).to.eql(1); + + expectReschedule(originalTask, task, intervalMilliseconds); + }); + }); + + async function expectReschedule(originalTask, currentTask, expectedDiff) { + const originalRunAt = Date.parse(originalTask.runAt); + const buffer = 10000; + expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer); + expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer); + } + }); +}