From 0507ac5da02aef0317f17ce3b859c5f741913fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Tue, 20 Apr 2021 19:11:25 -0400 Subject: [PATCH] Create task to cleanup action execution failures (#96971) * Initial commit * Add tests and support for concurrency * Ability to disable functionality, use bulk APIs * Fix type check * Fix jest tests * Cleanup * Cleanup pt2 * Add unit tests * Fix type check * Fixes * Update test failures * Split schedule between cleanup and idle * Add functional tests * Add one more test * Cleanup repeated code * Remove duplicate actions plugin requirement Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- .../actions/server/actions_client.test.ts | 6 + .../actions/server/actions_config.test.ts | 7 + .../cleanup_tasks.test.ts | 126 +++++++++++++ .../cleanup_tasks.ts | 109 ++++++++++++ .../cleanup_failed_executions/constants.ts | 9 + .../ensure_scheduled.test.ts | 55 ++++++ .../ensure_scheduled.ts | 34 ++++ .../find_and_cleanup_tasks.test.ts | 167 ++++++++++++++++++ .../find_and_cleanup_tasks.ts | 80 +++++++++ .../server/cleanup_failed_executions/index.ts | 9 + .../lib/bulk_delete.ts | 25 +++ .../extract_bulk_response_delete_failures.ts | 29 +++ ...get_raw_action_task_params_id_from_task.ts | 27 +++ .../cleanup_failed_executions/lib/index.ts | 10 ++ .../register_task_definition.test.ts | 71 ++++++++ .../register_task_definition.ts | 22 +++ .../task_runner.test.ts | 108 +++++++++++ .../cleanup_failed_executions/task_runner.ts | 45 +++++ x-pack/plugins/actions/server/config.test.ts | 12 ++ x-pack/plugins/actions/server/config.ts | 6 + x-pack/plugins/actions/server/lib/index.ts | 1 + .../server/lib/space_id_to_namespace.ts | 12 ++ x-pack/plugins/actions/server/plugin.test.ts | 20 ++- x-pack/plugins/actions/server/plugin.ts | 44 ++++- x-pack/plugins/task_manager/server/index.ts | 1 + x-pack/plugins/task_manager/server/mocks.ts | 1 + x-pack/plugins/task_manager/server/plugin.ts | 12 +- .../fixtures/plugins/alerts/server/plugin.ts | 2 + .../fixtures/plugins/alerts/server/routes.ts | 67 +++++-- .../tests/alerting/rbac_legacy.ts | 12 +- .../spaces_only/tests/actions/enqueue.ts | 142 +++++++++++++++ .../spaces_only/tests/actions/index.ts | 1 + 32 files changed, 1247 insertions(+), 25 deletions(-) create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.test.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/constants.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.test.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.test.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/index.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/lib/bulk_delete.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/lib/extract_bulk_response_delete_failures.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/lib/get_raw_action_task_params_id_from_task.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/lib/index.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.test.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.test.ts create mode 100644 x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.ts create mode 100644 x-pack/plugins/actions/server/lib/space_id_to_namespace.ts create mode 100644 x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts diff --git a/x-pack/plugins/actions/server/actions_client.test.ts b/x-pack/plugins/actions/server/actions_client.test.ts index ae7faca1465c..9b22e31c05e8 100644 --- a/x-pack/plugins/actions/server/actions_client.test.ts +++ b/x-pack/plugins/actions/server/actions_client.test.ts @@ -413,6 +413,12 @@ describe('create()', () => { proxyOnlyHosts: undefined, maxResponseContentLength: new ByteSizeValue(1000000), responseTimeout: moment.duration('60s'), + cleanupFailedExecutionsTask: { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }, }); const localActionTypeRegistryParams = { diff --git a/x-pack/plugins/actions/server/actions_config.test.ts b/x-pack/plugins/actions/server/actions_config.test.ts index 1b9de0162f34..70c8b0e8185d 100644 --- a/x-pack/plugins/actions/server/actions_config.test.ts +++ b/x-pack/plugins/actions/server/actions_config.test.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { schema } from '@kbn/config-schema'; import { ByteSizeValue } from '@kbn/config-schema'; import { ActionsConfig } from './config'; import { @@ -24,6 +25,12 @@ const defaultActionsConfig: ActionsConfig = { rejectUnauthorized: true, maxResponseContentLength: new ByteSizeValue(1000000), responseTimeout: moment.duration(60000), + cleanupFailedExecutionsTask: { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }, }; describe('ensureUriAllowed', () => { diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.test.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.test.ts new file mode 100644 index 000000000000..07c09a2dfef7 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.test.ts @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObjectsFindResult, SavedObjectsSerializer } from 'kibana/server'; +import { loggingSystemMock, elasticsearchServiceMock } from '../../../../../src/core/server/mocks'; +import { spacesMock } from '../../../spaces/server/mocks'; +import { CleanupTasksOpts, cleanupTasks } from './cleanup_tasks'; +import { TaskInstance } from '../../../task_manager/server'; +import { ApiResponse, estypes } from '@elastic/elasticsearch'; + +describe('cleanupTasks', () => { + const logger = loggingSystemMock.create().get(); + const esClient = elasticsearchServiceMock.createElasticsearchClient(); + const spaces = spacesMock.createStart(); + const savedObjectsSerializer = ({ + generateRawId: jest + .fn() + .mockImplementation((namespace: string | undefined, type: string, id: string) => { + const namespacePrefix = namespace ? `${namespace}:` : ''; + return `${namespacePrefix}${type}:${id}`; + }), + } as unknown) as SavedObjectsSerializer; + + const cleanupTasksOpts: CleanupTasksOpts = { + logger, + esClient, + spaces, + savedObjectsSerializer, + kibanaIndex: '.kibana', + taskManagerIndex: '.kibana_task_manager', + tasks: [], + }; + + const taskSO: SavedObjectsFindResult = { + id: '123', + type: 'task', + references: [], + score: 0, + attributes: { + id: '123', + taskType: 'foo', + scheduledAt: new Date(), + state: {}, + runAt: new Date(), + startedAt: new Date(), + retryAt: new Date(), + ownerId: '234', + params: { spaceId: undefined, actionTaskParamsId: '123' }, + schedule: { interval: '5m' }, + }, + }; + + beforeEach(() => { + esClient.bulk.mockReset(); + }); + + it('should skip cleanup when there are no tasks to cleanup', async () => { + const result = await cleanupTasks(cleanupTasksOpts); + expect(result).toEqual({ + success: true, + successCount: 0, + failureCount: 0, + }); + expect(esClient.bulk).not.toHaveBeenCalled(); + }); + + it('should delete action_task_params and task objects', async () => { + esClient.bulk.mockResolvedValue(({ + body: { items: [], errors: false, took: 1 }, + } as unknown) as ApiResponse); + const result = await cleanupTasks({ + ...cleanupTasksOpts, + tasks: [taskSO], + }); + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }], + }); + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }], + }); + expect(result).toEqual({ + success: true, + successCount: 1, + failureCount: 0, + }); + }); + + it('should not delete the task if the action_task_params failed to delete', async () => { + esClient.bulk.mockResolvedValue(({ + body: { + items: [ + { + delete: { + _index: cleanupTasksOpts.kibanaIndex, + _id: 'action_task_params:123', + status: 500, + result: 'Failure', + error: true, + }, + }, + ], + errors: true, + took: 1, + }, + } as unknown) as ApiResponse); + const result = await cleanupTasks({ + ...cleanupTasksOpts, + tasks: [taskSO], + }); + expect(esClient.bulk).toHaveBeenCalledWith({ + body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }], + }); + expect(esClient.bulk).not.toHaveBeenCalledWith({ + body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }], + }); + expect(result).toEqual({ + success: false, + successCount: 0, + failureCount: 1, + }); + }); +}); diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.ts new file mode 100644 index 000000000000..3009bfe1a277 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/cleanup_tasks.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + Logger, + ElasticsearchClient, + SavedObjectsFindResult, + SavedObjectsSerializer, +} from 'kibana/server'; +import { TaskInstance } from '../../../task_manager/server'; +import { SpacesPluginStart } from '../../../spaces/server'; +import { + bulkDelete, + extractBulkResponseDeleteFailures, + getRawActionTaskParamsIdFromTask, +} from './lib'; + +export interface CleanupTasksOpts { + logger: Logger; + esClient: ElasticsearchClient; + tasks: Array>; + spaces?: SpacesPluginStart; + savedObjectsSerializer: SavedObjectsSerializer; + kibanaIndex: string; + taskManagerIndex: string; +} + +export interface CleanupTasksResult { + success: boolean; + successCount: number; + failureCount: number; +} + +/** + * Cleanup tasks + * + * This function receives action execution tasks that are in a failed state, removes + * the linked "action_task_params" object first and then if successful, the task manager's task. + */ +export async function cleanupTasks({ + logger, + esClient, + tasks, + spaces, + savedObjectsSerializer, + kibanaIndex, + taskManagerIndex, +}: CleanupTasksOpts): Promise { + const deserializedTasks = tasks.map((task) => ({ + ...task, + attributes: { + ...task.attributes, + params: + typeof task.attributes.params === 'string' + ? JSON.parse(task.attributes.params) + : task.attributes.params || {}, + }, + })); + + // Remove accumulated action task params + const actionTaskParamIdsToDelete = deserializedTasks.map((task) => + getRawActionTaskParamsIdFromTask({ task, spaces, savedObjectsSerializer }) + ); + const actionTaskParamBulkDeleteResult = await bulkDelete( + esClient, + kibanaIndex, + actionTaskParamIdsToDelete + ); + const failedActionTaskParams = actionTaskParamBulkDeleteResult + ? extractBulkResponseDeleteFailures(actionTaskParamBulkDeleteResult) + : []; + if (failedActionTaskParams?.length) { + logger.debug( + `Failed to delete the following action_task_params [${JSON.stringify( + failedActionTaskParams + )}]` + ); + } + + // Remove accumulated tasks + const taskIdsToDelete = deserializedTasks + .map((task) => { + const rawId = getRawActionTaskParamsIdFromTask({ task, spaces, savedObjectsSerializer }); + // Avoid removing tasks that failed to remove linked objects + if (failedActionTaskParams?.find((item) => item._id === rawId)) { + return null; + } + const rawTaskId = savedObjectsSerializer.generateRawId(undefined, 'task', task.id); + return rawTaskId; + }) + .filter((id) => !!id) as string[]; + const taskBulkDeleteResult = await bulkDelete(esClient, taskManagerIndex, taskIdsToDelete); + const failedTasks = taskBulkDeleteResult + ? extractBulkResponseDeleteFailures(taskBulkDeleteResult) + : []; + if (failedTasks?.length) { + logger.debug(`Failed to delete the following tasks [${JSON.stringify(failedTasks)}]`); + } + + return { + success: failedActionTaskParams?.length === 0 && failedTasks.length === 0, + successCount: tasks.length - failedActionTaskParams.length - failedTasks.length, + failureCount: failedActionTaskParams.length + failedTasks.length, + }; +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/constants.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/constants.ts new file mode 100644 index 000000000000..c8c1d6105586 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/constants.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const TASK_TYPE = 'cleanup_failed_action_executions'; +export const TASK_ID = `Actions-${TASK_TYPE}`; diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.test.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.test.ts new file mode 100644 index 000000000000..3c27a38e818e --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.test.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema } from '@kbn/config-schema'; +import { ActionsConfig } from '../config'; +import { ensureScheduled } from './ensure_scheduled'; +import { taskManagerMock } from '../../../task_manager/server/mocks'; +import { loggingSystemMock } from '../../../../../src/core/server/mocks'; + +describe('ensureScheduled', () => { + const logger = loggingSystemMock.create().get(); + const taskManager = taskManagerMock.createStart(); + + const config: ActionsConfig['cleanupFailedExecutionsTask'] = { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }; + + beforeEach(() => jest.resetAllMocks()); + + it(`should call task manager's ensureScheduled function with proper params`, async () => { + await ensureScheduled(taskManager, logger, config); + expect(taskManager.ensureScheduled).toHaveBeenCalledTimes(1); + expect(taskManager.ensureScheduled.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + Object { + "id": "Actions-cleanup_failed_action_executions", + "params": Object {}, + "schedule": Object { + "interval": "5m", + }, + "state": Object { + "runs": 0, + "total_cleaned_up": 0, + }, + "taskType": "cleanup_failed_action_executions", + }, + ] + `); + }); + + it('should log an error and not throw when ensureScheduled function throws', async () => { + taskManager.ensureScheduled.mockRejectedValue(new Error('Fail')); + await ensureScheduled(taskManager, logger, config); + expect(logger.error).toHaveBeenCalledWith( + 'Error scheduling Actions-cleanup_failed_action_executions, received Fail' + ); + }); +}); diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.ts new file mode 100644 index 000000000000..6dc1ce44982c --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/ensure_scheduled.ts @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger } from 'kibana/server'; +import { TASK_ID, TASK_TYPE } from './constants'; +import { ActionsConfig } from '../config'; +import { TaskManagerStartContract, asInterval } from '../../../task_manager/server'; + +export async function ensureScheduled( + taskManager: TaskManagerStartContract, + logger: Logger, + { cleanupInterval }: ActionsConfig['cleanupFailedExecutionsTask'] +) { + try { + await taskManager.ensureScheduled({ + id: TASK_ID, + taskType: TASK_TYPE, + schedule: { + interval: asInterval(cleanupInterval.asMilliseconds()), + }, + state: { + runs: 0, + total_cleaned_up: 0, + }, + params: {}, + }); + } catch (e) { + logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`); + } +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.test.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.test.ts new file mode 100644 index 000000000000..81c2a348bc09 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.test.ts @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart } from 'kibana/server'; +import { schema } from '@kbn/config-schema'; +import { ActionsConfig } from '../config'; +import { ActionsPluginsStart } from '../plugin'; +import { spacesMock } from '../../../spaces/server/mocks'; +import { esKuery } from '../../../../../src/plugins/data/server'; +import { + loggingSystemMock, + savedObjectsRepositoryMock, + savedObjectsServiceMock, + elasticsearchServiceMock, +} from '../../../../../src/core/server/mocks'; +import { actionTypeRegistryMock } from '../action_type_registry.mock'; +import { FindAndCleanupTasksOpts, findAndCleanupTasks } from './find_and_cleanup_tasks'; + +jest.mock('./cleanup_tasks', () => ({ + cleanupTasks: jest.fn(), +})); + +describe('findAndCleanupTasks', () => { + const logger = loggingSystemMock.create().get(); + const actionTypeRegistry = actionTypeRegistryMock.create(); + const savedObjectsRepository = savedObjectsRepositoryMock.create(); + const esStart = elasticsearchServiceMock.createStart(); + const spaces = spacesMock.createStart(); + const soService = savedObjectsServiceMock.createStartContract(); + const coreStartServices = (Promise.resolve([ + { + savedObjects: { + ...soService, + createInternalRepository: () => savedObjectsRepository, + }, + elasticsearch: esStart, + }, + { + spaces, + }, + {}, + ]) as unknown) as Promise<[CoreStart, ActionsPluginsStart, unknown]>; + + const config: ActionsConfig['cleanupFailedExecutionsTask'] = { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }; + + const findAndCleanupTasksOpts: FindAndCleanupTasksOpts = { + logger, + actionTypeRegistry, + coreStartServices, + config, + kibanaIndex: '.kibana', + taskManagerIndex: '.kibana_task_manager', + }; + + beforeEach(() => { + actionTypeRegistry.list.mockReturnValue([ + { + id: 'my-action-type', + name: 'My action type', + enabled: true, + enabledInConfig: true, + enabledInLicense: true, + minimumLicenseRequired: 'basic', + }, + ]); + jest.requireMock('./cleanup_tasks').cleanupTasks.mockResolvedValue({ + success: true, + successCount: 0, + failureCount: 0, + }); + savedObjectsRepository.find.mockResolvedValue({ + total: 0, + page: 1, + per_page: 10, + saved_objects: [], + }); + }); + + it('should call the find function with proper parameters', async () => { + await findAndCleanupTasks(findAndCleanupTasksOpts); + expect(savedObjectsRepository.find).toHaveBeenCalledWith({ + type: 'task', + filter: expect.any(Object), + page: 1, + perPage: config.pageSize, + sortField: 'runAt', + sortOrder: 'asc', + }); + expect(esKuery.toElasticsearchQuery(savedObjectsRepository.find.mock.calls[0][0].filter)) + .toMatchInlineSnapshot(` + Object { + "bool": Object { + "filter": Array [ + Object { + "bool": Object { + "minimum_should_match": 1, + "should": Array [ + Object { + "match": Object { + "task.attributes.status": "failed", + }, + }, + ], + }, + }, + Object { + "bool": Object { + "minimum_should_match": 1, + "should": Array [ + Object { + "match": Object { + "task.attributes.taskType": "actions:my-action-type", + }, + }, + ], + }, + }, + ], + }, + } + `); + }); + + it('should call the cleanupTasks function with proper parameters', async () => { + await findAndCleanupTasks(findAndCleanupTasksOpts); + expect(jest.requireMock('./cleanup_tasks').cleanupTasks).toHaveBeenCalledWith({ + logger: findAndCleanupTasksOpts.logger, + esClient: esStart.client.asInternalUser, + spaces, + kibanaIndex: findAndCleanupTasksOpts.kibanaIndex, + taskManagerIndex: findAndCleanupTasksOpts.taskManagerIndex, + savedObjectsSerializer: soService.createSerializer(), + tasks: [], + }); + }); + + it('should return the cleanup result', async () => { + const result = await findAndCleanupTasks(findAndCleanupTasksOpts); + expect(result).toEqual({ + success: true, + successCount: 0, + failureCount: 0, + remaining: 0, + }); + }); + + it('should log a message before cleaning up tasks', async () => { + await findAndCleanupTasks(findAndCleanupTasksOpts); + expect(logger.debug).toHaveBeenCalledWith('Removing 0 of 0 failed execution task(s)'); + }); + + it('should log a message after cleaning up tasks', async () => { + await findAndCleanupTasks(findAndCleanupTasksOpts); + expect(logger.debug).toHaveBeenCalledWith( + 'Finished cleanup of failed executions. [success=0, failures=0]' + ); + }); +}); diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.ts new file mode 100644 index 000000000000..0afb82a515b7 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/find_and_cleanup_tasks.ts @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger, CoreStart } from 'kibana/server'; +import { ActionsConfig } from '../config'; +import { ActionsPluginsStart } from '../plugin'; +import { ActionTypeRegistryContract } from '../types'; +import { cleanupTasks, CleanupTasksResult } from './cleanup_tasks'; +import { TaskInstance } from '../../../task_manager/server'; +import { nodeBuilder } from '../../../../../src/plugins/data/common'; + +export interface FindAndCleanupTasksOpts { + logger: Logger; + actionTypeRegistry: ActionTypeRegistryContract; + coreStartServices: Promise<[CoreStart, ActionsPluginsStart, unknown]>; + config: ActionsConfig['cleanupFailedExecutionsTask']; + kibanaIndex: string; + taskManagerIndex: string; +} + +export interface FindAndCleanupTasksResult extends CleanupTasksResult { + remaining: number; +} + +export async function findAndCleanupTasks({ + logger, + actionTypeRegistry, + coreStartServices, + config, + kibanaIndex, + taskManagerIndex, +}: FindAndCleanupTasksOpts): Promise { + logger.debug('Starting cleanup of failed executions'); + const [{ savedObjects, elasticsearch }, { spaces }] = await coreStartServices; + const esClient = elasticsearch.client.asInternalUser; + const savedObjectsClient = savedObjects.createInternalRepository(['task']); + const savedObjectsSerializer = savedObjects.createSerializer(); + + const result = await savedObjectsClient.find({ + type: 'task', + filter: nodeBuilder.and([ + nodeBuilder.is('task.attributes.status', 'failed'), + nodeBuilder.or( + actionTypeRegistry + .list() + .map((actionType) => + nodeBuilder.is('task.attributes.taskType', `actions:${actionType.id}`) + ) + ), + ]), + page: 1, + perPage: config.pageSize, + sortField: 'runAt', + sortOrder: 'asc', + }); + + logger.debug( + `Removing ${result.saved_objects.length} of ${result.total} failed execution task(s)` + ); + const cleanupResult = await cleanupTasks({ + logger, + esClient, + spaces, + kibanaIndex, + taskManagerIndex, + savedObjectsSerializer, + tasks: result.saved_objects, + }); + logger.debug( + `Finished cleanup of failed executions. [success=${cleanupResult.successCount}, failures=${cleanupResult.failureCount}]` + ); + return { + ...cleanupResult, + remaining: result.total - cleanupResult.successCount, + }; +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/index.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/index.ts new file mode 100644 index 000000000000..e8e93caed4f8 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { ensureScheduled as ensureCleanupFailedExecutionsTaskScheduled } from './ensure_scheduled'; +export { registerTaskDefinition as registerCleanupFailedExecutionsTaskDefinition } from './register_task_definition'; diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/lib/bulk_delete.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/bulk_delete.ts new file mode 100644 index 000000000000..2e0037d01943 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/bulk_delete.ts @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from 'kibana/server'; +import { ApiResponse, estypes } from '@elastic/elasticsearch'; + +export async function bulkDelete( + esClient: ElasticsearchClient, + index: string, + ids: string[] +): Promise | undefined> { + if (ids.length === 0) { + return; + } + + return await esClient.bulk({ + body: ids.map((id) => ({ + delete: { _index: index, _id: id }, + })), + }); +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/lib/extract_bulk_response_delete_failures.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/extract_bulk_response_delete_failures.ts new file mode 100644 index 000000000000..90418c9763a4 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/extract_bulk_response_delete_failures.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ApiResponse, estypes } from '@elastic/elasticsearch'; + +type ResponseFailures = Array>; + +export function extractBulkResponseDeleteFailures( + response: ApiResponse +): ResponseFailures { + const result: ResponseFailures = []; + for (const item of response.body.items) { + if (!item.delete || !item.delete.error) { + continue; + } + + result.push({ + _id: item.delete._id, + status: item.delete.status, + result: item.delete.result, + }); + } + + return result; +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/lib/get_raw_action_task_params_id_from_task.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/get_raw_action_task_params_id_from_task.ts new file mode 100644 index 000000000000..7a9b664387ff --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/get_raw_action_task_params_id_from_task.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SavedObjectsFindResult, SavedObjectsSerializer } from 'kibana/server'; +import { spaceIdToNamespace } from '../../lib'; +import { TaskInstance } from '../../../../task_manager/server'; +import { SpacesPluginStart } from '../../../../spaces/server'; + +interface GetRawActionTaskParamsIdFromTaskOpts { + task: SavedObjectsFindResult; + spaces?: SpacesPluginStart; + savedObjectsSerializer: SavedObjectsSerializer; +} + +export function getRawActionTaskParamsIdFromTask({ + task, + spaces, + savedObjectsSerializer, +}: GetRawActionTaskParamsIdFromTaskOpts) { + const { spaceId, actionTaskParamsId } = task.attributes.params; + const namespace = spaceIdToNamespace(spaces, spaceId); + return savedObjectsSerializer.generateRawId(namespace, 'action_task_params', actionTaskParamsId); +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/lib/index.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/index.ts new file mode 100644 index 000000000000..d332c2e1ef06 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/lib/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { extractBulkResponseDeleteFailures } from './extract_bulk_response_delete_failures'; +export { bulkDelete } from './bulk_delete'; +export { getRawActionTaskParamsIdFromTask } from './get_raw_action_task_params_id_from_task'; diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.test.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.test.ts new file mode 100644 index 000000000000..a12ab16facdc --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.test.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart } from 'kibana/server'; +import { schema } from '@kbn/config-schema'; +import { ActionsConfig } from '../config'; +import { ActionsPluginsStart } from '../plugin'; +import { registerTaskDefinition } from './register_task_definition'; +import { taskManagerMock } from '../../../task_manager/server/mocks'; +import { loggingSystemMock, coreMock } from '../../../../../src/core/server/mocks'; +import { actionTypeRegistryMock } from '../action_type_registry.mock'; +import { TaskRunnerOpts } from './task_runner'; + +jest.mock('./task_runner', () => ({ taskRunner: jest.fn() })); + +describe('registerTaskDefinition', () => { + const logger = loggingSystemMock.create().get(); + const taskManager = taskManagerMock.createSetup(); + const actionTypeRegistry = actionTypeRegistryMock.create(); + const coreStartServices = coreMock.createSetup().getStartServices() as Promise< + [CoreStart, ActionsPluginsStart, unknown] + >; + + const config: ActionsConfig['cleanupFailedExecutionsTask'] = { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }; + + const taskRunnerOpts: TaskRunnerOpts = { + logger, + coreStartServices, + actionTypeRegistry, + config, + kibanaIndex: '.kibana', + taskManagerIndex: '.kibana_task_manager', + }; + + beforeEach(() => { + jest.resetAllMocks(); + jest.requireMock('./task_runner').taskRunner.mockReturnValue(jest.fn()); + }); + + it('should call registerTaskDefinitions with proper parameters', () => { + registerTaskDefinition(taskManager, taskRunnerOpts); + expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1); + expect(taskManager.registerTaskDefinitions.mock.calls).toMatchInlineSnapshot(` + Array [ + Array [ + Object { + "cleanup_failed_action_executions": Object { + "createTaskRunner": [MockFunction], + "title": "Cleanup failed action executions", + }, + }, + ], + ] + `); + }); + + it('should call taskRunner with proper parameters', () => { + registerTaskDefinition(taskManager, taskRunnerOpts); + const { taskRunner } = jest.requireMock('./task_runner'); + expect(taskRunner).toHaveBeenCalledWith(taskRunnerOpts); + }); +}); diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.ts new file mode 100644 index 000000000000..c9a6b486a646 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/register_task_definition.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TASK_TYPE } from './constants'; +import { taskRunner, TaskRunnerOpts } from './task_runner'; +import { TaskManagerSetupContract } from '../../../task_manager/server'; + +export function registerTaskDefinition( + taskManager: TaskManagerSetupContract, + taskRunnerOpts: TaskRunnerOpts +) { + taskManager.registerTaskDefinitions({ + [TASK_TYPE]: { + title: 'Cleanup failed action executions', + createTaskRunner: taskRunner(taskRunnerOpts), + }, + }); +} diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.test.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.test.ts new file mode 100644 index 000000000000..d465e532b028 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.test.ts @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart } from 'kibana/server'; +import { schema } from '@kbn/config-schema'; +import { ActionsConfig } from '../config'; +import { ActionsPluginsStart } from '../plugin'; +import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server'; +import { loggingSystemMock, coreMock } from '../../../../../src/core/server/mocks'; +import { actionTypeRegistryMock } from '../action_type_registry.mock'; +import { taskRunner, TaskRunnerOpts } from './task_runner'; + +jest.mock('./find_and_cleanup_tasks', () => ({ + findAndCleanupTasks: jest.fn(), +})); + +describe('taskRunner', () => { + const logger = loggingSystemMock.create().get(); + const actionTypeRegistry = actionTypeRegistryMock.create(); + const coreStartServices = coreMock.createSetup().getStartServices() as Promise< + [CoreStart, ActionsPluginsStart, unknown] + >; + + const config: ActionsConfig['cleanupFailedExecutionsTask'] = { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }; + + const taskRunnerOpts: TaskRunnerOpts = { + logger, + coreStartServices, + actionTypeRegistry, + config, + kibanaIndex: '.kibana', + taskManagerIndex: '.kibana_task_manager', + }; + + const taskInstance: ConcreteTaskInstance = { + id: '123', + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Running, + state: { runs: 0, total_cleaned_up: 0 }, + runAt: new Date(), + startedAt: new Date(), + retryAt: new Date(), + ownerId: '234', + taskType: 'foo', + params: {}, + }; + + beforeEach(() => { + jest.resetAllMocks(); + jest.requireMock('./find_and_cleanup_tasks').findAndCleanupTasks.mockResolvedValue({ + success: true, + successCount: 1, + failureCount: 1, + remaining: 0, + }); + }); + + describe('run', () => { + it('should call findAndCleanupTasks with proper parameters', async () => { + const runner = taskRunner(taskRunnerOpts)({ taskInstance }); + await runner.run(); + expect(jest.requireMock('./find_and_cleanup_tasks').findAndCleanupTasks).toHaveBeenCalledWith( + taskRunnerOpts + ); + }); + + it('should update state to reflect cleanup result', async () => { + const runner = taskRunner(taskRunnerOpts)({ taskInstance }); + const { state } = await runner.run(); + expect(state).toEqual({ + runs: 1, + total_cleaned_up: 1, + }); + }); + + it('should return idle schedule when no remaining tasks to cleanup', async () => { + const runner = taskRunner(taskRunnerOpts)({ taskInstance }); + const { schedule } = await runner.run(); + expect(schedule).toEqual({ + interval: '60m', + }); + }); + + it('should return cleanup schedule when there are some remaining tasks to cleanup', async () => { + jest.requireMock('./find_and_cleanup_tasks').findAndCleanupTasks.mockResolvedValue({ + success: true, + successCount: 1, + failureCount: 1, + remaining: 1, + }); + const runner = taskRunner(taskRunnerOpts)({ taskInstance }); + const { schedule } = await runner.run(); + expect(schedule).toEqual({ + interval: '5m', + }); + }); + }); +}); diff --git a/x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.ts b/x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.ts new file mode 100644 index 000000000000..38eb672238c7 --- /dev/null +++ b/x-pack/plugins/actions/server/cleanup_failed_executions/task_runner.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger, CoreStart } from 'kibana/server'; +import { ActionsConfig } from '../config'; +import { RunContext, asInterval } from '../../../task_manager/server'; +import { ActionsPluginsStart } from '../plugin'; +import { ActionTypeRegistryContract } from '../types'; +import { findAndCleanupTasks } from './find_and_cleanup_tasks'; + +export interface TaskRunnerOpts { + logger: Logger; + actionTypeRegistry: ActionTypeRegistryContract; + coreStartServices: Promise<[CoreStart, ActionsPluginsStart, unknown]>; + config: ActionsConfig['cleanupFailedExecutionsTask']; + kibanaIndex: string; + taskManagerIndex: string; +} + +export function taskRunner(opts: TaskRunnerOpts) { + return ({ taskInstance }: RunContext) => { + const { state } = taskInstance; + return { + async run() { + const cleanupResult = await findAndCleanupTasks(opts); + return { + state: { + runs: state.runs + 1, + total_cleaned_up: state.total_cleaned_up + cleanupResult.successCount, + }, + schedule: { + interval: + cleanupResult.remaining > 0 + ? asInterval(opts.config.cleanupInterval.asMilliseconds()) + : asInterval(opts.config.idleInterval.asMilliseconds()), + }, + }; + }, + }; + }; +} diff --git a/x-pack/plugins/actions/server/config.test.ts b/x-pack/plugins/actions/server/config.test.ts index ad598bffe04b..092b5d2cce58 100644 --- a/x-pack/plugins/actions/server/config.test.ts +++ b/x-pack/plugins/actions/server/config.test.ts @@ -23,6 +23,12 @@ describe('config validation', () => { "allowedHosts": Array [ "*", ], + "cleanupFailedExecutionsTask": Object { + "cleanupInterval": "PT5M", + "enabled": true, + "idleInterval": "PT1H", + "pageSize": 100, + }, "enabled": true, "enabledActionTypes": Array [ "*", @@ -58,6 +64,12 @@ describe('config validation', () => { "allowedHosts": Array [ "*", ], + "cleanupFailedExecutionsTask": Object { + "cleanupInterval": "PT5M", + "enabled": true, + "idleInterval": "PT1H", + "pageSize": 100, + }, "enabled": true, "enabledActionTypes": Array [ "*", diff --git a/x-pack/plugins/actions/server/config.ts b/x-pack/plugins/actions/server/config.ts index 36948478816c..7225c54d5759 100644 --- a/x-pack/plugins/actions/server/config.ts +++ b/x-pack/plugins/actions/server/config.ts @@ -50,6 +50,12 @@ export const configSchema = schema.object({ rejectUnauthorized: schema.boolean({ defaultValue: true }), maxResponseContentLength: schema.byteSize({ defaultValue: '1mb' }), responseTimeout: schema.duration({ defaultValue: '60s' }), + cleanupFailedExecutionsTask: schema.object({ + enabled: schema.boolean({ defaultValue: true }), + cleanupInterval: schema.duration({ defaultValue: '5m' }), + idleInterval: schema.duration({ defaultValue: '1h' }), + pageSize: schema.number({ defaultValue: 100 }), + }), }); export type ActionsConfig = TypeOf; diff --git a/x-pack/plugins/actions/server/lib/index.ts b/x-pack/plugins/actions/server/lib/index.ts index e900b81bb65a..fba47f9a0f99 100644 --- a/x-pack/plugins/actions/server/lib/index.ts +++ b/x-pack/plugins/actions/server/lib/index.ts @@ -12,6 +12,7 @@ export { ActionExecutor, ActionExecutorContract } from './action_executor'; export { ILicenseState, LicenseState } from './license_state'; export { verifyApiAccess } from './verify_api_access'; export { getActionTypeFeatureUsageName } from './get_action_type_feature_usage_name'; +export { spaceIdToNamespace } from './space_id_to_namespace'; export { ActionTypeDisabledError, ActionTypeDisabledReason, diff --git a/x-pack/plugins/actions/server/lib/space_id_to_namespace.ts b/x-pack/plugins/actions/server/lib/space_id_to_namespace.ts new file mode 100644 index 000000000000..826c4e44b2b8 --- /dev/null +++ b/x-pack/plugins/actions/server/lib/space_id_to_namespace.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SpacesPluginStart } from '../../../spaces/server'; + +export function spaceIdToNamespace(spaces?: SpacesPluginStart, spaceId?: string) { + return spaces && spaceId ? spaces.spacesService.spaceIdToNamespace(spaceId) : undefined; +} diff --git a/x-pack/plugins/actions/server/plugin.test.ts b/x-pack/plugins/actions/server/plugin.test.ts index 3485891a0126..9464421d5f0f 100644 --- a/x-pack/plugins/actions/server/plugin.test.ts +++ b/x-pack/plugins/actions/server/plugin.test.ts @@ -6,7 +6,7 @@ */ import moment from 'moment'; -import { ByteSizeValue } from '@kbn/config-schema'; +import { schema, ByteSizeValue } from '@kbn/config-schema'; import { PluginInitializerContext, RequestHandlerContext } from '../../../../src/core/server'; import { coreMock, httpServerMock } from '../../../../src/core/server/mocks'; import { usageCollectionPluginMock } from '../../../../src/plugins/usage_collection/server/mocks'; @@ -43,6 +43,12 @@ describe('Actions Plugin', () => { rejectUnauthorized: true, maxResponseContentLength: new ByteSizeValue(1000000), responseTimeout: moment.duration(60000), + cleanupFailedExecutionsTask: { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }, }); plugin = new ActionsPlugin(context); coreSetup = coreMock.createSetup(); @@ -207,6 +213,12 @@ describe('Actions Plugin', () => { rejectUnauthorized: true, maxResponseContentLength: new ByteSizeValue(1000000), responseTimeout: moment.duration(60000), + cleanupFailedExecutionsTask: { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }, }); plugin = new ActionsPlugin(context); coreSetup = coreMock.createSetup(); @@ -274,6 +286,12 @@ describe('Actions Plugin', () => { rejectUnauthorized: true, maxResponseContentLength: new ByteSizeValue(1000000), responseTimeout: moment.duration('60s'), + cleanupFailedExecutionsTask: { + enabled: true, + cleanupInterval: schema.duration().validate('5m'), + idleInterval: schema.duration().validate('1h'), + pageSize: 100, + }, ...overrides, }; } diff --git a/x-pack/plugins/actions/server/plugin.ts b/x-pack/plugins/actions/server/plugin.ts index 1d941617789b..106e41259e69 100644 --- a/x-pack/plugins/actions/server/plugin.ts +++ b/x-pack/plugins/actions/server/plugin.ts @@ -26,17 +26,27 @@ import { } from '../../encrypted_saved_objects/server'; import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server'; import { LicensingPluginSetup, LicensingPluginStart } from '../../licensing/server'; -import { SpacesPluginStart } from '../../spaces/server'; +import { SpacesPluginStart, SpacesPluginSetup } from '../../spaces/server'; import { PluginSetupContract as FeaturesPluginSetup } from '../../features/server'; import { SecurityPluginSetup } from '../../security/server'; +import { + ensureCleanupFailedExecutionsTaskScheduled, + registerCleanupFailedExecutionsTaskDefinition, +} from './cleanup_failed_executions'; import { ActionsConfig, getValidatedConfig } from './config'; -import { ActionExecutor, TaskRunnerFactory, LicenseState, ILicenseState } from './lib'; import { ActionsClient } from './actions_client'; import { ActionTypeRegistry } from './action_type_registry'; import { createExecutionEnqueuerFunction } from './create_execute_function'; import { registerBuiltInActionTypes } from './builtin_action_types'; import { registerActionsUsageCollector } from './usage'; +import { + ActionExecutor, + TaskRunnerFactory, + LicenseState, + ILicenseState, + spaceIdToNamespace, +} from './lib'; import { Services, ActionType, @@ -115,6 +125,7 @@ export interface ActionsPluginsSetup { usageCollection?: UsageCollectionSetup; security?: SecurityPluginSetup; features: FeaturesPluginSetup; + spaces?: SpacesPluginSetup; } export interface ActionsPluginsStart { encryptedSavedObjects: EncryptedSavedObjectsPluginStart; @@ -245,6 +256,18 @@ export class ActionsPlugin implements Plugin(), this.licenseState); + // Cleanup failed execution task definition + if (this.actionsConfig.cleanupFailedExecutionsTask.enabled) { + registerCleanupFailedExecutionsTaskDefinition(plugins.taskManager, { + actionTypeRegistry, + logger: this.logger, + coreStartServices: core.getStartServices(), + config: this.actionsConfig.cleanupFailedExecutionsTask, + kibanaIndex: this.kibanaIndexConfig.kibana.index, + taskManagerIndex: plugins.taskManager.index, + }); + } + return { registerType: < Config extends ActionTypeConfig = ActionTypeConfig, @@ -352,18 +375,12 @@ export class ActionsPlugin implements Plugin { - return plugins.spaces && spaceId - ? plugins.spaces.spacesService.spaceIdToNamespace(spaceId) - : undefined; - }; - taskRunnerFactory!.initialize({ logger, actionTypeRegistry: actionTypeRegistry!, encryptedSavedObjectsClient, basePathService: core.http.basePath, - spaceIdToNamespace, + spaceIdToNamespace: (spaceId?: string) => spaceIdToNamespace(plugins.spaces, spaceId), getUnsecuredSavedObjectsClient: (request: KibanaRequest) => this.getUnsecuredSavedObjectsClient(core.savedObjects, request), }); @@ -377,6 +394,15 @@ export class ActionsPlugin implements Plugin { return this.actionTypeRegistry!.isActionTypeEnabled(id, options); diff --git a/x-pack/plugins/task_manager/server/index.ts b/x-pack/plugins/task_manager/server/index.ts index a34f5a87fddb..9d2f8f4189ae 100644 --- a/x-pack/plugins/task_manager/server/index.ts +++ b/x-pack/plugins/task_manager/server/index.ts @@ -20,6 +20,7 @@ export { RunContext, } from './task'; +export { asInterval } from './lib/intervals'; export { isUnrecoverableError, throwUnrecoverableError } from './task_running'; export { diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 3a45cefd9bda..c713e1e98a1e 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -9,6 +9,7 @@ import { TaskManagerSetupContract, TaskManagerStartContract } from './plugin'; const createSetupMock = () => { const mock: jest.Mocked = { + index: '.kibana_task_manager', addMiddleware: jest.fn(), registerTaskDefinitions: jest.fn(), }; diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 507a021214a9..51199da26ee7 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -28,10 +28,13 @@ import { TaskScheduling } from './task_scheduling'; import { healthRoute } from './routes'; import { createMonitoringStats, MonitoringStats } from './monitoring'; -export type TaskManagerSetupContract = { addMiddleware: (middleware: Middleware) => void } & Pick< - TaskTypeDictionary, - 'registerTaskDefinitions' ->; +export type TaskManagerSetupContract = { + /** + * @deprecated + */ + index: string; + addMiddleware: (middleware: Middleware) => void; +} & Pick; export type TaskManagerStartContract = Pick< TaskScheduling, @@ -95,6 +98,7 @@ export class TaskManagerPlugin }); return { + index: this.config.index, addMiddleware: (middleware: Middleware) => { this.assertStillInSetup('add Middleware'); this.middleware = addMiddlewareToChain(this.middleware, middleware); diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/plugin.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/plugin.ts index bf5d05ee4624..9a7cd8d333b4 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/plugin.ts @@ -15,6 +15,7 @@ import { defineActionTypes } from './action_types'; import { defineRoutes } from './routes'; import { SpacesPluginStart } from '../../../../../../../plugins/spaces/server'; import { SecurityPluginStart } from '../../../../../../../plugins/security/server'; +import { PluginStartContract as ActionsPluginStart } from '../../../../../../../plugins/actions/server'; export interface FixtureSetupDeps { features: FeaturesPluginSetup; @@ -26,6 +27,7 @@ export interface FixtureStartDeps { encryptedSavedObjects: EncryptedSavedObjectsPluginStart; security?: SecurityPluginStart; spaces?: SpacesPluginStart; + actions: ActionsPluginStart; } export class FixturePlugin implements Plugin { diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/routes.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/routes.ts index 5dc607bdbb69..091034bd1df7 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/routes.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/routes.ts @@ -5,6 +5,7 @@ * 2.0. */ +import uuid from 'uuid'; import { CoreSetup, RequestHandlerContext, @@ -174,10 +175,10 @@ export function defineRoutes(core: CoreSetup, { logger }: { lo router.put( { - path: '/api/alerts_fixture/{id}/reschedule_task', + path: '/api/alerts_fixture/{taskId}/reschedule_task', validate: { params: schema.object({ - id: schema.string(), + taskId: schema.string(), }), body: schema.object({ runAt: schema.string(), @@ -189,23 +190,20 @@ export function defineRoutes(core: CoreSetup, { logger }: { lo req: KibanaRequest, res: KibanaResponseFactory ): Promise> => { - const { id } = req.params; + const { taskId } = req.params; const { runAt } = req.body; const [{ savedObjects }] = await core.getStartServices(); const savedObjectsWithTasksAndAlerts = await savedObjects.getScopedClient(req, { includedHiddenTypes: ['task', 'alert'], }); - const alert = await savedObjectsWithTasksAndAlerts.get('alert', id); const result = await retryIfConflicts( logger, - `/api/alerts_fixture/${id}/reschedule_task`, + `/api/alerts_fixture/${taskId}/reschedule_task`, async () => { - return await savedObjectsWithTasksAndAlerts.update( - 'task', - alert.attributes.scheduledTaskId!, - { runAt } - ); + return await savedObjectsWithTasksAndAlerts.update('task', taskId, { + runAt, + }); } ); return res.ok({ body: result }); @@ -278,4 +276,53 @@ export function defineRoutes(core: CoreSetup, { logger }: { lo } } ); + + router.post( + { + path: '/api/alerts_fixture/{id}/enqueue_action', + validate: { + params: schema.object({ + id: schema.string(), + }), + body: schema.object({ + params: schema.recordOf(schema.string(), schema.any()), + }), + }, + }, + async ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> => { + try { + const [, { actions, security, spaces }] = await core.getStartServices(); + const actionsClient = await actions.getActionsClientWithRequest(req); + + const createAPIKeyResult = + security && + (await security.authc.apiKeys.grantAsInternalUser(req, { + name: `alerts_fixture:enqueue_action:${uuid.v4()}`, + role_descriptors: {}, + })); + + await actionsClient.enqueueExecution({ + id: req.params.id, + spaceId: spaces ? spaces.spacesService.getSpaceId(req) : 'default', + apiKey: createAPIKeyResult + ? Buffer.from(`${createAPIKeyResult.id}:${createAPIKeyResult.api_key}`).toString( + 'base64' + ) + : null, + params: req.body.params, + source: { + type: 'HTTP_REQUEST' as any, + source: req, + }, + }); + return res.noContent(); + } catch (err) { + return res.badRequest({ body: err }); + } + } + ); } diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/rbac_legacy.ts b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/rbac_legacy.ts index fb32be12500c..53ea2b845af1 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/rbac_legacy.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/rbac_legacy.ts @@ -177,12 +177,22 @@ export default function alertTests({ getService }: FtrProviderContext) { 'pre-7.10.0' ); + // Get scheduled task id + const getResponse = await supertestWithoutAuth + .get(`${getUrlPrefix(space.id)}/api/alerting/rule/${alertId}`) + .auth(user.username, user.password) + .expect(200); + // loading the archive likely caused the task to fail so ensure it's rescheduled to run in 2 seconds, // otherwise this test will stall for 5 minutes // no other attributes are touched, only runAt, so unless it would have ran when runAt expired, it // won't run now await supertest - .put(`${getUrlPrefix(space.id)}/api/alerts_fixture/${alertId}/reschedule_task`) + .put( + `${getUrlPrefix(space.id)}/api/alerts_fixture/${ + getResponse.body.scheduled_task_id + }/reschedule_task` + ) .set('kbn-xsrf', 'foo') .send({ runAt: getRunAt(2000), diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts new file mode 100644 index 000000000000..b6e47df31527 --- /dev/null +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { Spaces } from '../../scenarios'; +import { + ESTestIndexTool, + ES_TEST_INDEX_NAME, + getUrlPrefix, + ObjectRemover, +} from '../../../common/lib'; +import { FtrProviderContext } from '../../../common/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const es = getService('legacyEs'); + const retry = getService('retry'); + const esTestIndexTool = new ESTestIndexTool(es, retry); + + describe('enqueue', () => { + const objectRemover = new ObjectRemover(supertest); + + before(async () => { + await esTestIndexTool.destroy(); + await esTestIndexTool.setup(); + }); + after(async () => { + await esTestIndexTool.destroy(); + await objectRemover.removeAll(); + }); + + it('should handle enqueue request appropriately', async () => { + const { body: createdAction } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/actions/connector`) + .set('kbn-xsrf', 'foo') + .send({ + name: 'My action', + connector_type_id: 'test.index-record', + config: { + unencrypted: `This value shouldn't get encrypted`, + }, + secrets: { + encrypted: 'This value should be encrypted', + }, + }) + .expect(200); + objectRemover.add(Spaces.space1.id, createdAction.id, 'action', 'actions'); + + const reference = `actions-enqueue-1:${Spaces.space1.id}:${createdAction.id}`; + const response = await supertest + .post( + `${getUrlPrefix(Spaces.space1.id)}/api/alerts_fixture/${createdAction.id}/enqueue_action` + ) + .set('kbn-xsrf', 'foo') + .send({ + params: { + reference, + index: ES_TEST_INDEX_NAME, + message: 'Testing 123', + }, + }); + + expect(response.status).to.eql(204); + await esTestIndexTool.waitForDocs('action:test.index-record', reference, 1); + }); + + it('should cleanup task after a failure', async () => { + const testStart = new Date(); + const { body: createdAction } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/actions/connector`) + .set('kbn-xsrf', 'foo') + .send({ + name: 'My action', + connector_type_id: 'test.failing', + config: {}, + secrets: {}, + }) + .expect(200); + objectRemover.add(Spaces.space1.id, createdAction.id, 'action', 'actions'); + + const reference = `actions-enqueue-2:${Spaces.space1.id}:${createdAction.id}`; + await supertest + .post( + `${getUrlPrefix(Spaces.space1.id)}/api/alerts_fixture/${createdAction.id}/enqueue_action` + ) + .set('kbn-xsrf', 'foo') + .send({ + params: { + reference, + index: ES_TEST_INDEX_NAME, + }, + }) + .expect(204); + + await esTestIndexTool.waitForDocs('action:test.failing', reference, 1); + + await supertest + .put( + `${getUrlPrefix( + Spaces.space1.id + )}/api/alerts_fixture/Actions-cleanup_failed_action_executions/reschedule_task` + ) + .set('kbn-xsrf', 'foo') + .send({ + runAt: new Date().toISOString(), + }) + .expect(200); + + await retry.try(async () => { + const searchResult = await es.search({ + index: '.kibana_task_manager', + body: { + query: { + bool: { + must: [ + { + term: { + 'task.taskType': 'actions:test.failing', + }, + }, + { + range: { + 'task.scheduledAt': { + gte: testStart, + }, + }, + }, + ], + }, + }, + }, + }); + expect(searchResult.hits.total.value).to.eql(0); + }); + }); + }); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/actions/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/actions/index.ts index 43f442c13162..fc0b23290a86 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/actions/index.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/actions/index.ts @@ -21,6 +21,7 @@ export default function actionsTests({ loadTestFile, getService }: FtrProviderCo loadTestFile(require.resolve('./connector_types')); loadTestFile(require.resolve('./update')); loadTestFile(require.resolve('./execute')); + loadTestFile(require.resolve('./enqueue')); loadTestFile(require.resolve('./builtin_action_types/es_index')); loadTestFile(require.resolve('./builtin_action_types/webhook')); loadTestFile(require.resolve('./builtin_action_types/preconfigured_alert_history_connector'));