diff --git a/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts index e21fee4ce3d6..a47af44d330c 100644 --- a/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts @@ -43,7 +43,6 @@ import { import { EncryptedSavedObjectsClient } from '../../../encrypted_saved_objects/server'; import { TaskManagerStartContract } from '../../../task_manager/server'; import { taskInstanceToAlertTaskInstance } from '../task_runner/alert_task_instance'; -import { deleteTaskIfItExists } from '../lib/delete_task_if_it_exists'; import { RegistryAlertType, UntypedNormalizedAlertType } from '../alert_type_registry'; import { AlertsAuthorization, WriteOperations, ReadOperations } from '../authorization'; import { IEventLogClient } from '../../../../plugins/event_log/server'; @@ -602,7 +601,7 @@ export class AlertsClient { const removeResult = await this.unsecuredSavedObjectsClient.delete('alert', id); await Promise.all([ - taskIdToRemove ? deleteTaskIfItExists(this.taskManager, taskIdToRemove) : null, + taskIdToRemove ? this.taskManager.removeIfExists(taskIdToRemove) : null, apiKeyToInvalidate ? markApiKeyForInvalidation( { apiKey: apiKeyToInvalidate }, @@ -1060,7 +1059,7 @@ export class AlertsClient { await Promise.all([ attributes.scheduledTaskId - ? deleteTaskIfItExists(this.taskManager, attributes.scheduledTaskId) + ? this.taskManager.removeIfExists(attributes.scheduledTaskId) : null, apiKeyToInvalidate ? await markApiKeyForInvalidation( diff --git a/x-pack/plugins/alerts/server/alerts_client/tests/delete.test.ts b/x-pack/plugins/alerts/server/alerts_client/tests/delete.test.ts index a7ef008eaa2e..8022bc26742a 100644 --- a/x-pack/plugins/alerts/server/alerts_client/tests/delete.test.ts +++ b/x-pack/plugins/alerts/server/alerts_client/tests/delete.test.ts @@ -110,7 +110,7 @@ describe('delete()', () => { const result = await alertsClient.delete({ id: '1' }); expect(result).toEqual({ success: true }); expect(unsecuredSavedObjectsClient.delete).toHaveBeenCalledWith('alert', '1'); - expect(taskManager.remove).toHaveBeenCalledWith('task-123'); + expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); expect(unsecuredSavedObjectsClient.create.mock.calls[0][0]).toBe( 'api_key_pending_invalidation' ); @@ -135,7 +135,7 @@ describe('delete()', () => { const result = await alertsClient.delete({ id: '1' }); expect(result).toEqual({ success: true }); expect(unsecuredSavedObjectsClient.delete).toHaveBeenCalledWith('alert', '1'); - expect(taskManager.remove).toHaveBeenCalledWith('task-123'); + expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled(); expect(unsecuredSavedObjectsClient.get).toHaveBeenCalledWith('alert', '1'); expect(alertsClientParams.logger.error).toHaveBeenCalledWith( @@ -153,7 +153,7 @@ describe('delete()', () => { }); await alertsClient.delete({ id: '1' }); - expect(taskManager.remove).not.toHaveBeenCalled(); + expect(taskManager.removeIfExists).not.toHaveBeenCalled(); }); test(`doesn't invalidate API key when apiKey is null`, async () => { @@ -217,8 +217,8 @@ describe('delete()', () => { ); }); - test('throws error when taskManager.remove throws an error', async () => { - taskManager.remove.mockRejectedValue(new Error('TM Fail')); + test('throws error when taskManager.removeIfExists throws an error', async () => { + taskManager.removeIfExists.mockRejectedValue(new Error('TM Fail')); await expect(alertsClient.delete({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( `"TM Fail"` diff --git a/x-pack/plugins/alerts/server/alerts_client/tests/disable.test.ts b/x-pack/plugins/alerts/server/alerts_client/tests/disable.test.ts index ce0688a5ab2f..448546941185 100644 --- a/x-pack/plugins/alerts/server/alerts_client/tests/disable.test.ts +++ b/x-pack/plugins/alerts/server/alerts_client/tests/disable.test.ts @@ -199,7 +199,7 @@ describe('disable()', () => { version: '123', } ); - expect(taskManager.remove).toHaveBeenCalledWith('task-123'); + expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); expect( (unsecuredSavedObjectsClient.create.mock.calls[0][1] as InvalidatePendingApiKey).apiKeyId ).toBe('123'); @@ -254,7 +254,7 @@ describe('disable()', () => { version: '123', } ); - expect(taskManager.remove).toHaveBeenCalledWith('task-123'); + expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled(); }); @@ -280,7 +280,7 @@ describe('disable()', () => { await alertsClient.disable({ id: '1' }); expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled(); - expect(taskManager.remove).not.toHaveBeenCalled(); + expect(taskManager.removeIfExists).not.toHaveBeenCalled(); expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled(); }); @@ -314,7 +314,7 @@ describe('disable()', () => { await alertsClient.disable({ id: '1' }); expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled(); - expect(taskManager.remove).toHaveBeenCalled(); + expect(taskManager.removeIfExists).toHaveBeenCalled(); expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled(); expect(alertsClientParams.logger.error).toHaveBeenCalledWith( 'disable(): Failed to load API key to invalidate on alert 1: Fail' @@ -338,7 +338,7 @@ describe('disable()', () => { }); test('throws when failing to remove task from task manager', async () => { - taskManager.remove.mockRejectedValueOnce(new Error('Failed to remove task')); + taskManager.removeIfExists.mockRejectedValueOnce(new Error('Failed to remove task')); await expect(alertsClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( `"Failed to remove task"` diff --git a/x-pack/plugins/data_enhanced/common/index.ts b/x-pack/plugins/data_enhanced/common/index.ts index e3e91ccf967c..669c33230a34 100644 --- a/x-pack/plugins/data_enhanced/common/index.ts +++ b/x-pack/plugins/data_enhanced/common/index.ts @@ -13,7 +13,6 @@ export { IAsyncSearchOptions, pollSearch, SearchSessionSavedObjectAttributes, - SearchSessionFindOptions, SearchSessionStatus, SearchSessionRequestInfo, } from './search'; diff --git a/x-pack/plugins/data_enhanced/common/search/session/types.ts b/x-pack/plugins/data_enhanced/common/search/session/types.ts index 6f75e6085636..ada7988c31f3 100644 --- a/x-pack/plugins/data_enhanced/common/search/session/types.ts +++ b/x-pack/plugins/data_enhanced/common/search/session/types.ts @@ -13,18 +13,55 @@ export interface SearchSessionSavedObjectAttributes { * App that created the session. e.g 'discover' */ appId: string; + /** + * Creation time of the session + */ created: string; + /** + * Expiration time of the session. Expiration itself is managed by Elasticsearch. + */ expires: string; + /** + * status + */ status: string; + /** + * urlGeneratorId + */ urlGeneratorId: string; + /** + * The application state that was used to create the session. + * Should be used, for example, to re-load an expired search session. + */ initialState: Record; + /** + * Application state that should be used to restore the session. + * For example, relative dates are conveted to absolute ones. + */ restoreState: Record; + /** + * Mapping of search request hashes to their corresponsing info (async search id, etc.) + */ idMapping: Record; } export interface SearchSessionRequestInfo { - id: string; // ID of the async search request - strategy: string; // Search strategy used to submit the search request + /** + * ID of the async search request + */ + id: string; + /** + * Search strategy used to submit the search request + */ + strategy: string; + /** + * status + */ + status: string; + /** + * An optional error. Set if status is set to error. + */ + error?: string; } export interface SearchSessionFindOptions { diff --git a/x-pack/plugins/data_enhanced/kibana.json b/x-pack/plugins/data_enhanced/kibana.json index eea0101ec4ed..3951468f6e56 100644 --- a/x-pack/plugins/data_enhanced/kibana.json +++ b/x-pack/plugins/data_enhanced/kibana.json @@ -8,7 +8,8 @@ "requiredPlugins": [ "bfetch", "data", - "features" + "features", + "taskManager" ], "optionalPlugins": ["kibanaUtils", "usageCollection"], "server": true, diff --git a/x-pack/plugins/data_enhanced/server/plugin.ts b/x-pack/plugins/data_enhanced/server/plugin.ts index 69a92f1d60ac..592a5df1eee2 100644 --- a/x-pack/plugins/data_enhanced/server/plugin.ts +++ b/x-pack/plugins/data_enhanced/server/plugin.ts @@ -5,6 +5,7 @@ */ import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server'; +import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server'; import { PluginSetup as DataPluginSetup, PluginStart as DataPluginStart, @@ -24,9 +25,15 @@ import { getUiSettings } from './ui_settings'; interface SetupDependencies { data: DataPluginSetup; usageCollection?: UsageCollectionSetup; + taskManager: TaskManagerSetupContract; +} +export interface StartDependencies { + data: DataPluginStart; + taskManager: TaskManagerStartContract; } -export class EnhancedDataServerPlugin implements Plugin { +export class EnhancedDataServerPlugin + implements Plugin { private readonly logger: Logger; private sessionService!: SearchSessionService; @@ -65,10 +72,17 @@ export class EnhancedDataServerPlugin implements Plugin { + let mockClient: any; + let savedObjectsClient: jest.Mocked; + const mockLogger: any = { + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }; + + beforeEach(() => { + savedObjectsClient = savedObjectsClientMock.create(); + mockClient = { + asyncSearch: { + status: jest.fn(), + }, + }; + }); + + test('does nothing if there are no open sessions', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [], + total: 0, + } as any); + + await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + }); + + test('does nothing if there are no searchIds in the saved object', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + attributes: { + idMapping: {}, + }, + }, + ], + total: 1, + } as any); + + await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + }); + + test('does nothing if the search is still running', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + attributes: { + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: true, + is_running: true, + }, + }); + + await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + + expect(savedObjectsClient.bulkUpdate).not.toBeCalled(); + }); + + test("doesn't re-check completed or errored searches", async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + attributes: { + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.COMPLETE, + }, + 'another-search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.ERROR, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + + expect(mockClient.asyncSearch.status).not.toBeCalled(); + }); + + test('updates to complete if the search is done', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + attributes: { + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 200, + }, + }); + + await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + + expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' }); + const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; + const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; + expect(updatedAttributes.status).toBe(SearchSessionStatus.COMPLETE); + expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.COMPLETE); + expect(updatedAttributes.idMapping['search-hash'].error).toBeUndefined(); + }); + + test('updates to error if the search is errored', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + attributes: { + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 500, + }, + }); + + await checkRunningSessions(savedObjectsClient, mockClient, mockLogger); + const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; + + const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; + expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR); + expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR); + expect(updatedAttributes.idMapping['search-hash'].error).toBe( + 'Search completed with a 500 status' + ); + }); +}); diff --git a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts new file mode 100644 index 000000000000..71274e15e284 --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + Logger, + ElasticsearchClient, + SavedObjectsFindResult, + SavedObjectsClientContract, +} from 'kibana/server'; +import { + SearchSessionStatus, + SearchSessionSavedObjectAttributes, + SearchSessionRequestInfo, +} from '../../../common'; +import { SEARCH_SESSION_TYPE } from '../../saved_objects'; +import { getSearchStatus } from './get_search_status'; +import { getSessionStatus } from './get_session_status'; +import { SearchStatus } from './types'; + +export async function checkRunningSessions( + savedObjectsClient: SavedObjectsClientContract, + client: ElasticsearchClient, + logger: Logger +): Promise { + try { + const runningSearchSessionsResponse = await savedObjectsClient.find( + { + type: SEARCH_SESSION_TYPE, + search: SearchSessionStatus.IN_PROGRESS.toString(), + searchFields: ['status'], + namespaces: ['*'], + } + ); + + if (!runningSearchSessionsResponse.total) return; + + logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`); + + const updatedSessions = new Array>(); + + let sessionUpdated = false; + + await Promise.all( + runningSearchSessionsResponse.saved_objects.map(async (session) => { + // Check statuses of all running searches + await Promise.all( + Object.keys(session.attributes.idMapping).map(async (searchKey: string) => { + const updateSearchRequest = ( + currentStatus: Pick + ) => { + sessionUpdated = true; + session.attributes.idMapping[searchKey] = { + ...session.attributes.idMapping[searchKey], + ...currentStatus, + }; + }; + + const searchInfo = session.attributes.idMapping[searchKey]; + if (searchInfo.status === SearchStatus.IN_PROGRESS) { + try { + const currentStatus = await getSearchStatus(client, searchInfo.id); + + if (currentStatus.status !== SearchStatus.IN_PROGRESS) { + updateSearchRequest(currentStatus); + } + } catch (e) { + logger.error(e); + updateSearchRequest({ + status: SearchStatus.ERROR, + error: e.message || e.meta.error?.caused_by?.reason, + }); + } + } + }) + ); + + // And only then derive the session's status + const sessionStatus = getSessionStatus(session.attributes); + if (sessionStatus !== SearchSessionStatus.IN_PROGRESS) { + session.attributes.status = sessionStatus; + sessionUpdated = true; + } + + if (sessionUpdated) { + updatedSessions.push(session); + } + }) + ); + + if (updatedSessions.length) { + // If there's an error, we'll try again in the next iteration, so there's no need to check the output. + const updatedResponse = await savedObjectsClient.bulkUpdate( + updatedSessions + ); + logger.debug(`Updated ${updatedResponse.saved_objects.length} background sessions`); + } + } catch (err) { + logger.error(err); + } +} diff --git a/x-pack/plugins/data_enhanced/server/search/session/constants.ts b/x-pack/plugins/data_enhanced/server/search/session/constants.ts new file mode 100644 index 000000000000..4ac32938c484 --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/constants.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export const INMEM_MAX_SESSIONS = 10000; +export const DEFAULT_EXPIRATION = 7 * 24 * 60 * 60 * 1000; +export const INMEM_TRACKING_INTERVAL = 10 * 1000; +export const INMEM_TRACKING_TIMEOUT_SEC = 60; +export const MAX_UPDATE_RETRIES = 3; diff --git a/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts new file mode 100644 index 000000000000..e66ce613b71d --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.test.ts @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SearchStatus } from './types'; +import { getSearchStatus } from './get_search_status'; + +describe('getSearchStatus', () => { + let mockClient: any; + beforeEach(() => { + mockClient = { + asyncSearch: { + status: jest.fn(), + }, + }; + }); + + test('returns an error status if search is partial and not running', () => { + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: true, + is_running: false, + completion_status: 200, + }, + }); + expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + }); + + test('returns an error status if completion_status is an error', () => { + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 500, + }, + }); + expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + }); + + test('returns an error status if gets an ES error', () => { + mockClient.asyncSearch.status.mockResolvedValue({ + error: { + root_cause: { + reason: 'not found', + }, + }, + }); + expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + }); + + test('returns an error status throws', () => { + mockClient.asyncSearch.status.mockRejectedValue(new Error('O_o')); + expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR); + }); + + test('returns a complete status', () => { + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 200, + }, + }); + expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.COMPLETE); + }); + + test('returns a running status otherwise', () => { + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: true, + completion_status: undefined, + }, + }); + expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.IN_PROGRESS); + }); +}); diff --git a/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts new file mode 100644 index 000000000000..e2b5fc0157b3 --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/get_search_status.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { i18n } from '@kbn/i18n'; +import { ApiResponse } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from 'src/core/server'; +import { SearchStatus } from './types'; +import { AsyncSearchStatusResponse } from '../types'; +import { SearchSessionRequestInfo } from '../../../common'; + +export async function getSearchStatus( + client: ElasticsearchClient, + asyncId: string +): Promise> { + // TODO: Handle strategies other than the default one + const apiResponse: ApiResponse = await client.asyncSearch.status({ + id: asyncId, + }); + const response = apiResponse.body; + if ((response.is_partial && !response.is_running) || response.completion_status >= 400) { + return { + status: SearchStatus.ERROR, + error: i18n.translate('xpack.data.search.statusError', { + defaultMessage: `Search completed with a {errorCode} status`, + values: { errorCode: response.completion_status }, + }), + }; + } else if (!response.is_partial && !response.is_running) { + return { + status: SearchStatus.COMPLETE, + error: undefined, + }; + } else { + return { + status: SearchStatus.IN_PROGRESS, + error: undefined, + }; + } +} diff --git a/x-pack/plugins/data_enhanced/server/search/session/get_session_status.test.ts b/x-pack/plugins/data_enhanced/server/search/session/get_session_status.test.ts new file mode 100644 index 000000000000..35bfdeee691e --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/get_session_status.test.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SearchStatus } from './types'; +import { getSessionStatus } from './get_session_status'; +import { SearchSessionStatus } from '../../../common'; + +describe('getSessionStatus', () => { + test("returns an in_progress status if there's nothing inside the session", () => { + const session: any = { + idMapping: {}, + }; + expect(getSessionStatus(session)).toBe(SearchSessionStatus.IN_PROGRESS); + }); + + test("returns an error status if there's at least one error", () => { + const session: any = { + idMapping: { + a: { status: SearchStatus.IN_PROGRESS }, + b: { status: SearchStatus.ERROR, error: 'Nope' }, + c: { status: SearchStatus.COMPLETE }, + }, + }; + expect(getSessionStatus(session)).toBe(SearchSessionStatus.ERROR); + }); + + test('returns a complete status if all are complete', () => { + const session: any = { + idMapping: { + a: { status: SearchStatus.COMPLETE }, + b: { status: SearchStatus.COMPLETE }, + c: { status: SearchStatus.COMPLETE }, + }, + }; + expect(getSessionStatus(session)).toBe(SearchSessionStatus.COMPLETE); + }); + + test('returns a running status if some are still running', () => { + const session: any = { + idMapping: { + a: { status: SearchStatus.IN_PROGRESS }, + b: { status: SearchStatus.COMPLETE }, + c: { status: SearchStatus.IN_PROGRESS }, + }, + }; + expect(getSessionStatus(session)).toBe(SearchSessionStatus.IN_PROGRESS); + }); +}); diff --git a/x-pack/plugins/data_enhanced/server/search/session/get_session_status.ts b/x-pack/plugins/data_enhanced/server/search/session/get_session_status.ts new file mode 100644 index 000000000000..296f4e489932 --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/get_session_status.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SearchSessionSavedObjectAttributes, SearchSessionStatus } from '../../../common'; +import { SearchStatus } from './types'; + +export function getSessionStatus(session: SearchSessionSavedObjectAttributes): SearchSessionStatus { + const searchStatuses = Object.values(session.idMapping); + if (searchStatuses.some((item) => item.status === SearchStatus.ERROR)) { + return SearchSessionStatus.ERROR; + } else if ( + searchStatuses.length > 0 && + searchStatuses.every((item) => item.status === SearchStatus.COMPLETE) + ) { + return SearchSessionStatus.COMPLETE; + } else { + return SearchSessionStatus.IN_PROGRESS; + } +} diff --git a/x-pack/plugins/data_enhanced/server/search/session/index.ts b/x-pack/plugins/data_enhanced/server/search/session/index.ts index 5b75885fb31d..8d5e21f3d827 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/index.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/index.ts @@ -5,3 +5,4 @@ */ export * from './session_service'; +export { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task'; diff --git a/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts b/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts new file mode 100644 index 000000000000..a7d57c94fa15 --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/monitoring_task.ts @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + TaskManagerSetupContract, + TaskManagerStartContract, + RunContext, +} from '../../../../task_manager/server'; +import { checkRunningSessions } from './check_running_sessions'; +import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server'; +import { SEARCH_SESSION_TYPE } from '../../saved_objects'; + +export const SEARCH_SESSIONS_TASK_TYPE = 'bg_monitor'; +export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`; +export const MONITOR_INTERVAL = 15; // in seconds + +function searchSessionRunner(core: CoreSetup, logger: Logger) { + return ({ taskInstance }: RunContext) => { + return { + async run() { + const [coreStart] = await core.getStartServices(); + const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]); + const internalSavedObjectsClient = new SavedObjectsClient(internalRepo); + await checkRunningSessions( + internalSavedObjectsClient, + coreStart.elasticsearch.client.asInternalUser, + logger + ); + + return { + runAt: new Date(Date.now() + MONITOR_INTERVAL * 1000), + state: {}, + }; + }, + }; + }; +} + +export function registerSearchSessionsTask( + core: CoreSetup, + taskManager: TaskManagerSetupContract, + logger: Logger +) { + taskManager.registerTaskDefinitions({ + [SEARCH_SESSIONS_TASK_TYPE]: { + title: 'Search Sessions Monitor', + createTaskRunner: searchSessionRunner(core, logger), + }, + }); +} + +export async function scheduleSearchSessionsTasks( + taskManager: TaskManagerStartContract, + logger: Logger +) { + await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID); + + try { + await taskManager.ensureScheduled({ + id: SEARCH_SESSIONS_TASK_ID, + taskType: SEARCH_SESSIONS_TASK_TYPE, + schedule: { + interval: `${MONITOR_INTERVAL}s`, + }, + state: {}, + params: {}, + }); + + logger.debug(`Background search task, scheduled to run`); + } catch (e) { + logger.debug(`Error scheduling task, received ${e.message}`); + } +} diff --git a/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts b/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts index 1d03ee5cc6aa..3114e746d045 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/session_service.test.ts @@ -10,17 +10,15 @@ import type { SearchStrategyDependencies } from '../../../../../../src/plugins/d import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks'; import { SearchSessionStatus } from '../../../common'; import { SEARCH_SESSION_TYPE } from '../../saved_objects'; -import { - SearchSessionDependencies, - SearchSessionService, - INMEM_TRACKING_INTERVAL, - MAX_UPDATE_RETRIES, - SessionInfo, -} from './session_service'; +import { SearchSessionDependencies, SearchSessionService, SessionInfo } from './session_service'; import { createRequestHash } from './utils'; import moment from 'moment'; import { coreMock } from 'src/core/server/mocks'; import { ConfigSchema } from '../../../config'; +// @ts-ignore +import { taskManagerMock } from '../../../../task_manager/server/mocks'; +import { INMEM_TRACKING_INTERVAL, MAX_UPDATE_RETRIES } from './constants'; +import { SearchStatus } from './types'; const flushPromises = () => new Promise((resolve) => setImmediate(resolve)); @@ -340,6 +338,7 @@ describe('SearchSessionService', () => { [requestHash]: { id: searchId, strategy: MOCK_STRATEGY, + status: SearchStatus.IN_PROGRESS, }, }, }); @@ -421,7 +420,11 @@ describe('SearchSessionService', () => { }, }, }); - await service.start(coreMock.createStart(), config$); + const mockTaskManager = taskManagerMock.createStart(); + await service.start(coreMock.createStart(), { + config$, + taskManager: mockTaskManager, + }); await flushPromises(); }); diff --git a/x-pack/plugins/data_enhanced/server/search/session/session_service.ts b/x-pack/plugins/data_enhanced/server/search/session/session_service.ts index 8f590e163952..8c9e0dad4957 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/session_service.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/session_service.ts @@ -14,7 +14,9 @@ import { SavedObjectsClientContract, Logger, SavedObject, + CoreSetup, SavedObjectsBulkUpdateObject, + SavedObjectsFindOptions, } from '../../../../../../src/core/server'; import { IKibanaSearchRequest, @@ -29,21 +31,27 @@ import { ISessionService, SearchStrategyDependencies, } from '../../../../../../src/plugins/data/server'; +import { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '../../../../task_manager/server'; import { SearchSessionSavedObjectAttributes, - SearchSessionFindOptions, SearchSessionRequestInfo, SearchSessionStatus, } from '../../../common'; import { SEARCH_SESSION_TYPE } from '../../saved_objects'; import { createRequestHash } from './utils'; import { ConfigSchema } from '../../../config'; - -const INMEM_MAX_SESSIONS = 10000; -const DEFAULT_EXPIRATION = 7 * 24 * 60 * 60 * 1000; -export const INMEM_TRACKING_INTERVAL = 10 * 1000; -export const INMEM_TRACKING_TIMEOUT_SEC = 60; -export const MAX_UPDATE_RETRIES = 3; +import { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task'; +import { + DEFAULT_EXPIRATION, + INMEM_MAX_SESSIONS, + INMEM_TRACKING_INTERVAL, + INMEM_TRACKING_TIMEOUT_SEC, + MAX_UPDATE_RETRIES, +} from './constants'; +import { SearchStatus } from './types'; export interface SearchSessionDependencies { savedObjectsClient: SavedObjectsClientContract; @@ -55,6 +63,14 @@ export interface SessionInfo { ids: Map; } +interface SetupDependencies { + taskManager: TaskManagerSetupContract; +} + +interface StartDependencies { + taskManager: TaskManagerStartContract; + config$: Observable; +} export class SearchSessionService implements ISessionService { /** * Map of sessionId to { [requestHash]: searchId } @@ -66,8 +82,12 @@ export class SearchSessionService implements ISessionService { constructor(private readonly logger: Logger) {} - public async start(core: CoreStart, config$: Observable) { - return this.setupMonitoring(core, config$); + public setup(core: CoreSetup, deps: SetupDependencies) { + registerSearchSessionsTask(core, deps.taskManager, this.logger); + } + + public async start(core: CoreStart, deps: StartDependencies) { + return this.setupMonitoring(core, deps); } public stop() { @@ -75,9 +95,10 @@ export class SearchSessionService implements ISessionService { clearTimeout(this.monitorTimer); } - private setupMonitoring = async (core: CoreStart, config$: Observable) => { - const config = await config$.pipe(first()).toPromise(); + private setupMonitoring = async (core: CoreStart, deps: StartDependencies) => { + const config = await deps.config$.pipe(first()).toPromise(); if (config.search.sendToBackground.enabled) { + scheduleSearchSessionsTasks(deps.taskManager, this.logger); this.logger.debug(`setupMonitoring | Enabling monitoring`); const internalRepo = core.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]); this.internalSavedObjectsClient = new SavedObjectsClient(internalRepo); @@ -281,7 +302,7 @@ export class SearchSessionService implements ISessionService { // TODO: Throw an error if this session doesn't belong to this user public find = ( - options: SearchSessionFindOptions, + options: Omit, { savedObjectsClient }: SearchSessionDependencies ) => { return savedObjectsClient.find({ @@ -326,6 +347,7 @@ export class SearchSessionService implements ISessionService { const searchInfo = { id: searchId, strategy: strategy!, + status: SearchStatus.IN_PROGRESS, }; // If there is already a saved object for this session, update it to include this request/ID. @@ -387,7 +409,7 @@ export class SearchSessionService implements ISessionService { save: (sessionId: string, attributes: Partial) => this.save(sessionId, attributes, deps), get: (sessionId: string) => this.get(sessionId, deps), - find: (options: SearchSessionFindOptions) => this.find(options, deps), + find: (options: SavedObjectsFindOptions) => this.find(options, deps), update: (sessionId: string, attributes: Partial) => this.update(sessionId, attributes, deps), delete: (sessionId: string) => this.delete(sessionId, deps), diff --git a/x-pack/plugins/data_enhanced/server/search/session/types.ts b/x-pack/plugins/data_enhanced/server/search/session/types.ts new file mode 100644 index 000000000000..c30e03f70d2d --- /dev/null +++ b/x-pack/plugins/data_enhanced/server/search/session/types.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export enum SearchStatus { + IN_PROGRESS = 'in_progress', + ERROR = 'error', + COMPLETE = 'complete', +} diff --git a/x-pack/plugins/data_enhanced/server/search/types.ts b/x-pack/plugins/data_enhanced/server/search/types.ts index f01ac51a1516..4401b7211fb6 100644 --- a/x-pack/plugins/data_enhanced/server/search/types.ts +++ b/x-pack/plugins/data_enhanced/server/search/types.ts @@ -4,14 +4,20 @@ * you may not use this file except in compliance with the Elastic License. */ -import { SearchResponse } from 'elasticsearch'; +import { SearchResponse, ShardsResponse } from 'elasticsearch'; export interface AsyncSearchResponse { id?: string; response: SearchResponse; + start_time_in_millis: number; + expiration_time_in_millis: number; is_partial: boolean; is_running: boolean; } +export interface AsyncSearchStatusResponse extends Omit { + completion_status: number; + _shards: ShardsResponse; +} export interface EqlSearchResponse extends SearchResponse { id?: string; diff --git a/x-pack/plugins/data_enhanced/tsconfig.json b/x-pack/plugins/data_enhanced/tsconfig.json index 28969652f23d..ec5c656ac50b 100644 --- a/x-pack/plugins/data_enhanced/tsconfig.json +++ b/x-pack/plugins/data_enhanced/tsconfig.json @@ -22,6 +22,7 @@ { "path": "../../../src/plugins/kibana_react/tsconfig.json" }, { "path": "../../../src/plugins/kibana_utils/tsconfig.json" }, { "path": "../../../src/plugins/usage_collection/tsconfig.json" }, + { "path": "../task_manager/tsconfig.json" }, { "path": "../features/tsconfig.json" }, ] diff --git a/x-pack/plugins/alerts/server/lib/delete_task_if_it_exists.test.ts b/x-pack/plugins/task_manager/server/lib/remove_if_exists.test.ts similarity index 50% rename from x-pack/plugins/alerts/server/lib/delete_task_if_it_exists.test.ts rename to x-pack/plugins/task_manager/server/lib/remove_if_exists.test.ts index 84a1743387c9..17ccb97c322f 100644 --- a/x-pack/plugins/alerts/server/lib/delete_task_if_it_exists.test.ts +++ b/x-pack/plugins/task_manager/server/lib/remove_if_exists.test.ts @@ -5,40 +5,40 @@ */ import uuid from 'uuid'; -import { taskManagerMock } from '../../../task_manager/server/mocks'; import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; -import { deleteTaskIfItExists } from './delete_task_if_it_exists'; +import { removeIfExists } from './remove_if_exists'; +import { taskStoreMock } from '../task_store.mock'; -describe('deleteTaskIfItExists', () => { +describe('removeIfExists', () => { test('removes the task by its ID', async () => { - const tm = taskManagerMock.createStart(); + const ts = taskStoreMock.create({}); const id = uuid.v4(); - expect(await deleteTaskIfItExists(tm, id)).toBe(undefined); + expect(await removeIfExists(ts, id)).toBe(undefined); - expect(tm.remove).toHaveBeenCalledWith(id); + expect(ts.remove).toHaveBeenCalledWith(id); }); test('handles 404 errors caused by the task not existing', async () => { - const tm = taskManagerMock.createStart(); + const ts = taskStoreMock.create({}); const id = uuid.v4(); - tm.remove.mockRejectedValue(SavedObjectsErrorHelpers.createGenericNotFoundError('task', id)); + ts.remove.mockRejectedValue(SavedObjectsErrorHelpers.createGenericNotFoundError('task', id)); - expect(await deleteTaskIfItExists(tm, id)).toBe(undefined); + expect(await removeIfExists(ts, id)).toBe(undefined); - expect(tm.remove).toHaveBeenCalledWith(id); + expect(ts.remove).toHaveBeenCalledWith(id); }); test('throws if any other errro is caused by task removal', async () => { - const tm = taskManagerMock.createStart(); + const ts = taskStoreMock.create({}); const id = uuid.v4(); const error = SavedObjectsErrorHelpers.createInvalidVersionError(uuid.v4()); - tm.remove.mockRejectedValue(error); + ts.remove.mockRejectedValue(error); - expect(deleteTaskIfItExists(tm, id)).rejects.toBe(error); + expect(removeIfExists(ts, id)).rejects.toBe(error); - expect(tm.remove).toHaveBeenCalledWith(id); + expect(ts.remove).toHaveBeenCalledWith(id); }); }); diff --git a/x-pack/plugins/alerts/server/lib/delete_task_if_it_exists.ts b/x-pack/plugins/task_manager/server/lib/remove_if_exists.ts similarity index 59% rename from x-pack/plugins/alerts/server/lib/delete_task_if_it_exists.ts rename to x-pack/plugins/task_manager/server/lib/remove_if_exists.ts index 53bb1b5cb5d5..77ab20683c3c 100644 --- a/x-pack/plugins/alerts/server/lib/delete_task_if_it_exists.ts +++ b/x-pack/plugins/task_manager/server/lib/remove_if_exists.ts @@ -3,12 +3,19 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { TaskManagerStartContract } from '../../../task_manager/server'; import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; +import { TaskStore } from '../task_store'; -export async function deleteTaskIfItExists(taskManager: TaskManagerStartContract, taskId: string) { +/** + * Removes a task from the store, ignoring a not found error + * Other errors are re-thrown + * + * @param taskStore + * @param taskId + */ +export async function removeIfExists(taskStore: TaskStore, taskId: string) { try { - await taskManager.remove(taskId); + await taskStore.remove(taskId); } catch (err) { if (!SavedObjectsErrorHelpers.isNotFoundError(err)) { throw err; diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 4a78a0b49001..45c077e64fff 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -22,6 +22,7 @@ const createStartMock = () => { schedule: jest.fn(), runNow: jest.fn(), ensureScheduled: jest.fn(), + removeIfExists: jest.fn(), }; return mock; }; diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 70688cd169d7..260d12565d4b 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -18,6 +18,7 @@ import { TaskDefinition } from './task'; import { TaskPollingLifecycle } from './polling_lifecycle'; import { TaskManagerConfig } from './config'; import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware'; +import { removeIfExists } from './lib/remove_if_exists'; import { setupSavedObjects } from './saved_objects'; import { TaskTypeDictionary } from './task_type_dictionary'; import { FetchResult, SearchOpts, TaskStore } from './task_store'; @@ -35,7 +36,9 @@ export type TaskManagerStartContract = Pick< TaskScheduling, 'schedule' | 'runNow' | 'ensureScheduled' > & - Pick; + Pick & { + removeIfExists: TaskStore['remove']; + }; export class TaskManagerPlugin implements Plugin { @@ -156,6 +159,7 @@ export class TaskManagerPlugin fetch: (opts: SearchOpts): Promise => taskStore.fetch(opts), get: (id: string) => taskStore.get(id), remove: (id: string) => taskStore.remove(id), + removeIfExists: (id: string) => removeIfExists(taskStore, id), schedule: (...args) => taskScheduling.schedule(...args), ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args), runNow: (...args) => taskScheduling.runNow(...args),