[Search] Search Sessions Monitoring Task (#85253)

* Monitor ids

* import fix

* solve circular dep

* eslint

* mock circular dep

* max retries test

* mock circular dep

* test

* jest <(-:C

* jestttttt

* [data.search] Move search method inside session service and add tests

* merge

* Move background session service to data_enhanced plugin

* Better logs
Save IDs only in monitoring loop

* Fix types

* Space aware session service

* ts

* initial

* initial

* Fix session service saving

* merge fix

* stable stringify

* INMEM_MAX_SESSIONS

* INMEM_MAX_SESSIONS

* use the status API

* Move task scheduling behind a feature flag

* Update x-pack/plugins/data_enhanced/server/search/session/session_service.ts

Co-authored-by: Anton Dosov <dosantappdev@gmail.com>

* Add unit tests

* Update x-pack/plugins/data_enhanced/server/search/session/session_service.ts

Co-authored-by: Anton Dosov <dosantappdev@gmail.com>

* Use setTimeout to schedule monitoring steps

* Update request_utils.ts

* settimeout

* tiny cleanup

* Core review + use client.asyncSearch.status

* update ts

* fix unit test

* code review fixes

* Save individual search errors on SO

* Don't re-fetch completed or errored searches

* Rename Background Sessions to Search Sessions (with a send to background action)

* doc

* doc

* jest fun

* rename rfc

* translations

* merge fix

* merge fix

* code review

* update so name in features

* Move deleteTaskIfItExists to task manager

* task_manager to ts project

* Move deleteTaskIfItExists to public contract

* mock

* use task store

* ts

* code review

* code review + jest

* Alerting code review

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Anton Dosov <dosantappdev@gmail.com>
Co-authored-by: restrry <restrry@gmail.com>
This commit is contained in:
Liza Katz 2021-01-11 16:36:38 +02:00 committed by GitHub
parent dd853998da
commit 3eeec0f571
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 741 additions and 60 deletions

View file

@ -43,7 +43,6 @@ import {
import { EncryptedSavedObjectsClient } from '../../../encrypted_saved_objects/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { taskInstanceToAlertTaskInstance } from '../task_runner/alert_task_instance';
import { deleteTaskIfItExists } from '../lib/delete_task_if_it_exists';
import { RegistryAlertType, UntypedNormalizedAlertType } from '../alert_type_registry';
import { AlertsAuthorization, WriteOperations, ReadOperations } from '../authorization';
import { IEventLogClient } from '../../../../plugins/event_log/server';
@ -602,7 +601,7 @@ export class AlertsClient {
const removeResult = await this.unsecuredSavedObjectsClient.delete('alert', id);
await Promise.all([
taskIdToRemove ? deleteTaskIfItExists(this.taskManager, taskIdToRemove) : null,
taskIdToRemove ? this.taskManager.removeIfExists(taskIdToRemove) : null,
apiKeyToInvalidate
? markApiKeyForInvalidation(
{ apiKey: apiKeyToInvalidate },
@ -1060,7 +1059,7 @@ export class AlertsClient {
await Promise.all([
attributes.scheduledTaskId
? deleteTaskIfItExists(this.taskManager, attributes.scheduledTaskId)
? this.taskManager.removeIfExists(attributes.scheduledTaskId)
: null,
apiKeyToInvalidate
? await markApiKeyForInvalidation(

View file

@ -110,7 +110,7 @@ describe('delete()', () => {
const result = await alertsClient.delete({ id: '1' });
expect(result).toEqual({ success: true });
expect(unsecuredSavedObjectsClient.delete).toHaveBeenCalledWith('alert', '1');
expect(taskManager.remove).toHaveBeenCalledWith('task-123');
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(unsecuredSavedObjectsClient.create.mock.calls[0][0]).toBe(
'api_key_pending_invalidation'
);
@ -135,7 +135,7 @@ describe('delete()', () => {
const result = await alertsClient.delete({ id: '1' });
expect(result).toEqual({ success: true });
expect(unsecuredSavedObjectsClient.delete).toHaveBeenCalledWith('alert', '1');
expect(taskManager.remove).toHaveBeenCalledWith('task-123');
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.get).toHaveBeenCalledWith('alert', '1');
expect(alertsClientParams.logger.error).toHaveBeenCalledWith(
@ -153,7 +153,7 @@ describe('delete()', () => {
});
await alertsClient.delete({ id: '1' });
expect(taskManager.remove).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalled();
});
test(`doesn't invalidate API key when apiKey is null`, async () => {
@ -217,8 +217,8 @@ describe('delete()', () => {
);
});
test('throws error when taskManager.remove throws an error', async () => {
taskManager.remove.mockRejectedValue(new Error('TM Fail'));
test('throws error when taskManager.removeIfExists throws an error', async () => {
taskManager.removeIfExists.mockRejectedValue(new Error('TM Fail'));
await expect(alertsClient.delete({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"TM Fail"`

View file

@ -199,7 +199,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.remove).toHaveBeenCalledWith('task-123');
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(
(unsecuredSavedObjectsClient.create.mock.calls[0][1] as InvalidatePendingApiKey).apiKeyId
).toBe('123');
@ -254,7 +254,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.remove).toHaveBeenCalledWith('task-123');
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled();
});
@ -280,7 +280,7 @@ describe('disable()', () => {
await alertsClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled();
expect(taskManager.remove).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled();
});
@ -314,7 +314,7 @@ describe('disable()', () => {
await alertsClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
expect(taskManager.remove).toHaveBeenCalled();
expect(taskManager.removeIfExists).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled();
expect(alertsClientParams.logger.error).toHaveBeenCalledWith(
'disable(): Failed to load API key to invalidate on alert 1: Fail'
@ -338,7 +338,7 @@ describe('disable()', () => {
});
test('throws when failing to remove task from task manager', async () => {
taskManager.remove.mockRejectedValueOnce(new Error('Failed to remove task'));
taskManager.removeIfExists.mockRejectedValueOnce(new Error('Failed to remove task'));
await expect(alertsClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to remove task"`

View file

@ -13,7 +13,6 @@ export {
IAsyncSearchOptions,
pollSearch,
SearchSessionSavedObjectAttributes,
SearchSessionFindOptions,
SearchSessionStatus,
SearchSessionRequestInfo,
} from './search';

View file

@ -13,18 +13,55 @@ export interface SearchSessionSavedObjectAttributes {
* App that created the session. e.g 'discover'
*/
appId: string;
/**
* Creation time of the session
*/
created: string;
/**
* Expiration time of the session. Expiration itself is managed by Elasticsearch.
*/
expires: string;
/**
* status
*/
status: string;
/**
* urlGeneratorId
*/
urlGeneratorId: string;
/**
* The application state that was used to create the session.
* Should be used, for example, to re-load an expired search session.
*/
initialState: Record<string, unknown>;
/**
* Application state that should be used to restore the session.
* For example, relative dates are conveted to absolute ones.
*/
restoreState: Record<string, unknown>;
/**
* Mapping of search request hashes to their corresponsing info (async search id, etc.)
*/
idMapping: Record<string, SearchSessionRequestInfo>;
}
export interface SearchSessionRequestInfo {
id: string; // ID of the async search request
strategy: string; // Search strategy used to submit the search request
/**
* ID of the async search request
*/
id: string;
/**
* Search strategy used to submit the search request
*/
strategy: string;
/**
* status
*/
status: string;
/**
* An optional error. Set if status is set to error.
*/
error?: string;
}
export interface SearchSessionFindOptions {

View file

@ -8,7 +8,8 @@
"requiredPlugins": [
"bfetch",
"data",
"features"
"features",
"taskManager"
],
"optionalPlugins": ["kibanaUtils", "usageCollection"],
"server": true,

View file

@ -5,6 +5,7 @@
*/
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import {
PluginSetup as DataPluginSetup,
PluginStart as DataPluginStart,
@ -24,9 +25,15 @@ import { getUiSettings } from './ui_settings';
interface SetupDependencies {
data: DataPluginSetup;
usageCollection?: UsageCollectionSetup;
taskManager: TaskManagerSetupContract;
}
export interface StartDependencies {
data: DataPluginStart;
taskManager: TaskManagerStartContract;
}
export class EnhancedDataServerPlugin implements Plugin<void, void, SetupDependencies> {
export class EnhancedDataServerPlugin
implements Plugin<void, void, SetupDependencies, StartDependencies> {
private readonly logger: Logger;
private sessionService!: SearchSessionService;
@ -65,10 +72,17 @@ export class EnhancedDataServerPlugin implements Plugin<void, void, SetupDepende
const router = core.http.createRouter();
registerSessionRoutes(router);
this.sessionService.setup(core, {
taskManager: deps.taskManager,
});
}
public start(core: CoreStart) {
this.sessionService.start(core, this.initializerContext.config.create());
public start(core: CoreStart, { taskManager }: StartDependencies) {
this.sessionService.start(core, {
taskManager,
config$: this.initializerContext.config.create(),
});
}
public stop() {

View file

@ -0,0 +1,191 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { checkRunningSessions } from './check_running_sessions';
import { SearchSessionStatus, SearchSessionSavedObjectAttributes } from '../../../common';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import type { SavedObjectsClientContract } from 'kibana/server';
import { SearchStatus } from './types';
describe('getSearchStatus', () => {
let mockClient: any;
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
const mockLogger: any = {
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
beforeEach(() => {
savedObjectsClient = savedObjectsClientMock.create();
mockClient = {
asyncSearch: {
status: jest.fn(),
},
};
});
test('does nothing if there are no open sessions', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
savedObjectsClient.find.mockResolvedValue({
saved_objects: [],
total: 0,
} as any);
await checkRunningSessions(savedObjectsClient, mockClient, mockLogger);
expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
});
test('does nothing if there are no searchIds in the saved object', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
{
attributes: {
idMapping: {},
},
},
],
total: 1,
} as any);
await checkRunningSessions(savedObjectsClient, mockClient, mockLogger);
expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
});
test('does nothing if the search is still running', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [so],
total: 1,
} as any);
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: true,
is_running: true,
},
});
await checkRunningSessions(savedObjectsClient, mockClient, mockLogger);
expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
});
test("doesn't re-check completed or errored searches", async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.COMPLETE,
},
'another-search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.ERROR,
},
},
},
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [so],
total: 1,
} as any);
await checkRunningSessions(savedObjectsClient, mockClient, mockLogger);
expect(mockClient.asyncSearch.status).not.toBeCalled();
});
test('updates to complete if the search is done', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [so],
total: 1,
} as any);
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 200,
},
});
await checkRunningSessions(savedObjectsClient, mockClient, mockLogger);
expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' });
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];
const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes;
expect(updatedAttributes.status).toBe(SearchSessionStatus.COMPLETE);
expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.COMPLETE);
expect(updatedAttributes.idMapping['search-hash'].error).toBeUndefined();
});
test('updates to error if the search is errored', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [so],
total: 1,
} as any);
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 500,
},
});
await checkRunningSessions(savedObjectsClient, mockClient, mockLogger);
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];
const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes;
expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR);
expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR);
expect(updatedAttributes.idMapping['search-hash'].error).toBe(
'Search completed with a 500 status'
);
});
});

View file

@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
Logger,
ElasticsearchClient,
SavedObjectsFindResult,
SavedObjectsClientContract,
} from 'kibana/server';
import {
SearchSessionStatus,
SearchSessionSavedObjectAttributes,
SearchSessionRequestInfo,
} from '../../../common';
import { SEARCH_SESSION_TYPE } from '../../saved_objects';
import { getSearchStatus } from './get_search_status';
import { getSessionStatus } from './get_session_status';
import { SearchStatus } from './types';
export async function checkRunningSessions(
savedObjectsClient: SavedObjectsClientContract,
client: ElasticsearchClient,
logger: Logger
): Promise<void> {
try {
const runningSearchSessionsResponse = await savedObjectsClient.find<SearchSessionSavedObjectAttributes>(
{
type: SEARCH_SESSION_TYPE,
search: SearchSessionStatus.IN_PROGRESS.toString(),
searchFields: ['status'],
namespaces: ['*'],
}
);
if (!runningSearchSessionsResponse.total) return;
logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`);
const updatedSessions = new Array<SavedObjectsFindResult<SearchSessionSavedObjectAttributes>>();
let sessionUpdated = false;
await Promise.all(
runningSearchSessionsResponse.saved_objects.map(async (session) => {
// Check statuses of all running searches
await Promise.all(
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
const updateSearchRequest = (
currentStatus: Pick<SearchSessionRequestInfo, 'status' | 'error'>
) => {
sessionUpdated = true;
session.attributes.idMapping[searchKey] = {
...session.attributes.idMapping[searchKey],
...currentStatus,
};
};
const searchInfo = session.attributes.idMapping[searchKey];
if (searchInfo.status === SearchStatus.IN_PROGRESS) {
try {
const currentStatus = await getSearchStatus(client, searchInfo.id);
if (currentStatus.status !== SearchStatus.IN_PROGRESS) {
updateSearchRequest(currentStatus);
}
} catch (e) {
logger.error(e);
updateSearchRequest({
status: SearchStatus.ERROR,
error: e.message || e.meta.error?.caused_by?.reason,
});
}
}
})
);
// And only then derive the session's status
const sessionStatus = getSessionStatus(session.attributes);
if (sessionStatus !== SearchSessionStatus.IN_PROGRESS) {
session.attributes.status = sessionStatus;
sessionUpdated = true;
}
if (sessionUpdated) {
updatedSessions.push(session);
}
})
);
if (updatedSessions.length) {
// If there's an error, we'll try again in the next iteration, so there's no need to check the output.
const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
updatedSessions
);
logger.debug(`Updated ${updatedResponse.saved_objects.length} background sessions`);
}
} catch (err) {
logger.error(err);
}
}

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export const INMEM_MAX_SESSIONS = 10000;
export const DEFAULT_EXPIRATION = 7 * 24 * 60 * 60 * 1000;
export const INMEM_TRACKING_INTERVAL = 10 * 1000;
export const INMEM_TRACKING_TIMEOUT_SEC = 60;
export const MAX_UPDATE_RETRIES = 3;

View file

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchStatus } from './types';
import { getSearchStatus } from './get_search_status';
describe('getSearchStatus', () => {
let mockClient: any;
beforeEach(() => {
mockClient = {
asyncSearch: {
status: jest.fn(),
},
};
});
test('returns an error status if search is partial and not running', () => {
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: true,
is_running: false,
completion_status: 200,
},
});
expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR);
});
test('returns an error status if completion_status is an error', () => {
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 500,
},
});
expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR);
});
test('returns an error status if gets an ES error', () => {
mockClient.asyncSearch.status.mockResolvedValue({
error: {
root_cause: {
reason: 'not found',
},
},
});
expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR);
});
test('returns an error status throws', () => {
mockClient.asyncSearch.status.mockRejectedValue(new Error('O_o'));
expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.ERROR);
});
test('returns a complete status', () => {
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 200,
},
});
expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.COMPLETE);
});
test('returns a running status otherwise', () => {
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: true,
completion_status: undefined,
},
});
expect(getSearchStatus(mockClient, '123')).resolves.toBe(SearchStatus.IN_PROGRESS);
});
});

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { ApiResponse } from '@elastic/elasticsearch';
import { ElasticsearchClient } from 'src/core/server';
import { SearchStatus } from './types';
import { AsyncSearchStatusResponse } from '../types';
import { SearchSessionRequestInfo } from '../../../common';
export async function getSearchStatus(
client: ElasticsearchClient,
asyncId: string
): Promise<Pick<SearchSessionRequestInfo, 'status' | 'error'>> {
// TODO: Handle strategies other than the default one
const apiResponse: ApiResponse<AsyncSearchStatusResponse> = await client.asyncSearch.status({
id: asyncId,
});
const response = apiResponse.body;
if ((response.is_partial && !response.is_running) || response.completion_status >= 400) {
return {
status: SearchStatus.ERROR,
error: i18n.translate('xpack.data.search.statusError', {
defaultMessage: `Search completed with a {errorCode} status`,
values: { errorCode: response.completion_status },
}),
};
} else if (!response.is_partial && !response.is_running) {
return {
status: SearchStatus.COMPLETE,
error: undefined,
};
} else {
return {
status: SearchStatus.IN_PROGRESS,
error: undefined,
};
}
}

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchStatus } from './types';
import { getSessionStatus } from './get_session_status';
import { SearchSessionStatus } from '../../../common';
describe('getSessionStatus', () => {
test("returns an in_progress status if there's nothing inside the session", () => {
const session: any = {
idMapping: {},
};
expect(getSessionStatus(session)).toBe(SearchSessionStatus.IN_PROGRESS);
});
test("returns an error status if there's at least one error", () => {
const session: any = {
idMapping: {
a: { status: SearchStatus.IN_PROGRESS },
b: { status: SearchStatus.ERROR, error: 'Nope' },
c: { status: SearchStatus.COMPLETE },
},
};
expect(getSessionStatus(session)).toBe(SearchSessionStatus.ERROR);
});
test('returns a complete status if all are complete', () => {
const session: any = {
idMapping: {
a: { status: SearchStatus.COMPLETE },
b: { status: SearchStatus.COMPLETE },
c: { status: SearchStatus.COMPLETE },
},
};
expect(getSessionStatus(session)).toBe(SearchSessionStatus.COMPLETE);
});
test('returns a running status if some are still running', () => {
const session: any = {
idMapping: {
a: { status: SearchStatus.IN_PROGRESS },
b: { status: SearchStatus.COMPLETE },
c: { status: SearchStatus.IN_PROGRESS },
},
};
expect(getSessionStatus(session)).toBe(SearchSessionStatus.IN_PROGRESS);
});
});

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchSessionSavedObjectAttributes, SearchSessionStatus } from '../../../common';
import { SearchStatus } from './types';
export function getSessionStatus(session: SearchSessionSavedObjectAttributes): SearchSessionStatus {
const searchStatuses = Object.values(session.idMapping);
if (searchStatuses.some((item) => item.status === SearchStatus.ERROR)) {
return SearchSessionStatus.ERROR;
} else if (
searchStatuses.length > 0 &&
searchStatuses.every((item) => item.status === SearchStatus.COMPLETE)
) {
return SearchSessionStatus.COMPLETE;
} else {
return SearchSessionStatus.IN_PROGRESS;
}
}

View file

@ -5,3 +5,4 @@
*/
export * from './session_service';
export { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task';

View file

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
TaskManagerSetupContract,
TaskManagerStartContract,
RunContext,
} from '../../../../task_manager/server';
import { checkRunningSessions } from './check_running_sessions';
import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server';
import { SEARCH_SESSION_TYPE } from '../../saved_objects';
export const SEARCH_SESSIONS_TASK_TYPE = 'bg_monitor';
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
export const MONITOR_INTERVAL = 15; // in seconds
function searchSessionRunner(core: CoreSetup, logger: Logger) {
return ({ taskInstance }: RunContext) => {
return {
async run() {
const [coreStart] = await core.getStartServices();
const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
await checkRunningSessions(
internalSavedObjectsClient,
coreStart.elasticsearch.client.asInternalUser,
logger
);
return {
runAt: new Date(Date.now() + MONITOR_INTERVAL * 1000),
state: {},
};
},
};
};
}
export function registerSearchSessionsTask(
core: CoreSetup,
taskManager: TaskManagerSetupContract,
logger: Logger
) {
taskManager.registerTaskDefinitions({
[SEARCH_SESSIONS_TASK_TYPE]: {
title: 'Search Sessions Monitor',
createTaskRunner: searchSessionRunner(core, logger),
},
});
}
export async function scheduleSearchSessionsTasks(
taskManager: TaskManagerStartContract,
logger: Logger
) {
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
try {
await taskManager.ensureScheduled({
id: SEARCH_SESSIONS_TASK_ID,
taskType: SEARCH_SESSIONS_TASK_TYPE,
schedule: {
interval: `${MONITOR_INTERVAL}s`,
},
state: {},
params: {},
});
logger.debug(`Background search task, scheduled to run`);
} catch (e) {
logger.debug(`Error scheduling task, received ${e.message}`);
}
}

View file

@ -10,17 +10,15 @@ import type { SearchStrategyDependencies } from '../../../../../../src/plugins/d
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import { SearchSessionStatus } from '../../../common';
import { SEARCH_SESSION_TYPE } from '../../saved_objects';
import {
SearchSessionDependencies,
SearchSessionService,
INMEM_TRACKING_INTERVAL,
MAX_UPDATE_RETRIES,
SessionInfo,
} from './session_service';
import { SearchSessionDependencies, SearchSessionService, SessionInfo } from './session_service';
import { createRequestHash } from './utils';
import moment from 'moment';
import { coreMock } from 'src/core/server/mocks';
import { ConfigSchema } from '../../../config';
// @ts-ignore
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { INMEM_TRACKING_INTERVAL, MAX_UPDATE_RETRIES } from './constants';
import { SearchStatus } from './types';
const flushPromises = () => new Promise((resolve) => setImmediate(resolve));
@ -340,6 +338,7 @@ describe('SearchSessionService', () => {
[requestHash]: {
id: searchId,
strategy: MOCK_STRATEGY,
status: SearchStatus.IN_PROGRESS,
},
},
});
@ -421,7 +420,11 @@ describe('SearchSessionService', () => {
},
},
});
await service.start(coreMock.createStart(), config$);
const mockTaskManager = taskManagerMock.createStart();
await service.start(coreMock.createStart(), {
config$,
taskManager: mockTaskManager,
});
await flushPromises();
});

View file

@ -14,7 +14,9 @@ import {
SavedObjectsClientContract,
Logger,
SavedObject,
CoreSetup,
SavedObjectsBulkUpdateObject,
SavedObjectsFindOptions,
} from '../../../../../../src/core/server';
import {
IKibanaSearchRequest,
@ -29,21 +31,27 @@ import {
ISessionService,
SearchStrategyDependencies,
} from '../../../../../../src/plugins/data/server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import {
SearchSessionSavedObjectAttributes,
SearchSessionFindOptions,
SearchSessionRequestInfo,
SearchSessionStatus,
} from '../../../common';
import { SEARCH_SESSION_TYPE } from '../../saved_objects';
import { createRequestHash } from './utils';
import { ConfigSchema } from '../../../config';
const INMEM_MAX_SESSIONS = 10000;
const DEFAULT_EXPIRATION = 7 * 24 * 60 * 60 * 1000;
export const INMEM_TRACKING_INTERVAL = 10 * 1000;
export const INMEM_TRACKING_TIMEOUT_SEC = 60;
export const MAX_UPDATE_RETRIES = 3;
import { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task';
import {
DEFAULT_EXPIRATION,
INMEM_MAX_SESSIONS,
INMEM_TRACKING_INTERVAL,
INMEM_TRACKING_TIMEOUT_SEC,
MAX_UPDATE_RETRIES,
} from './constants';
import { SearchStatus } from './types';
export interface SearchSessionDependencies {
savedObjectsClient: SavedObjectsClientContract;
@ -55,6 +63,14 @@ export interface SessionInfo {
ids: Map<string, SearchSessionRequestInfo>;
}
interface SetupDependencies {
taskManager: TaskManagerSetupContract;
}
interface StartDependencies {
taskManager: TaskManagerStartContract;
config$: Observable<ConfigSchema>;
}
export class SearchSessionService implements ISessionService {
/**
* Map of sessionId to { [requestHash]: searchId }
@ -66,8 +82,12 @@ export class SearchSessionService implements ISessionService {
constructor(private readonly logger: Logger) {}
public async start(core: CoreStart, config$: Observable<ConfigSchema>) {
return this.setupMonitoring(core, config$);
public setup(core: CoreSetup, deps: SetupDependencies) {
registerSearchSessionsTask(core, deps.taskManager, this.logger);
}
public async start(core: CoreStart, deps: StartDependencies) {
return this.setupMonitoring(core, deps);
}
public stop() {
@ -75,9 +95,10 @@ export class SearchSessionService implements ISessionService {
clearTimeout(this.monitorTimer);
}
private setupMonitoring = async (core: CoreStart, config$: Observable<ConfigSchema>) => {
const config = await config$.pipe(first()).toPromise();
private setupMonitoring = async (core: CoreStart, deps: StartDependencies) => {
const config = await deps.config$.pipe(first()).toPromise();
if (config.search.sendToBackground.enabled) {
scheduleSearchSessionsTasks(deps.taskManager, this.logger);
this.logger.debug(`setupMonitoring | Enabling monitoring`);
const internalRepo = core.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
this.internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
@ -281,7 +302,7 @@ export class SearchSessionService implements ISessionService {
// TODO: Throw an error if this session doesn't belong to this user
public find = (
options: SearchSessionFindOptions,
options: Omit<SavedObjectsFindOptions, 'type'>,
{ savedObjectsClient }: SearchSessionDependencies
) => {
return savedObjectsClient.find<SearchSessionSavedObjectAttributes>({
@ -326,6 +347,7 @@ export class SearchSessionService implements ISessionService {
const searchInfo = {
id: searchId,
strategy: strategy!,
status: SearchStatus.IN_PROGRESS,
};
// If there is already a saved object for this session, update it to include this request/ID.
@ -387,7 +409,7 @@ export class SearchSessionService implements ISessionService {
save: (sessionId: string, attributes: Partial<SearchSessionSavedObjectAttributes>) =>
this.save(sessionId, attributes, deps),
get: (sessionId: string) => this.get(sessionId, deps),
find: (options: SearchSessionFindOptions) => this.find(options, deps),
find: (options: SavedObjectsFindOptions) => this.find(options, deps),
update: (sessionId: string, attributes: Partial<SearchSessionSavedObjectAttributes>) =>
this.update(sessionId, attributes, deps),
delete: (sessionId: string) => this.delete(sessionId, deps),

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export enum SearchStatus {
IN_PROGRESS = 'in_progress',
ERROR = 'error',
COMPLETE = 'complete',
}

View file

@ -4,14 +4,20 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchResponse } from 'elasticsearch';
import { SearchResponse, ShardsResponse } from 'elasticsearch';
export interface AsyncSearchResponse<T = unknown> {
id?: string;
response: SearchResponse<T>;
start_time_in_millis: number;
expiration_time_in_millis: number;
is_partial: boolean;
is_running: boolean;
}
export interface AsyncSearchStatusResponse extends Omit<AsyncSearchResponse, 'response'> {
completion_status: number;
_shards: ShardsResponse;
}
export interface EqlSearchResponse<T = unknown> extends SearchResponse<T> {
id?: string;

View file

@ -22,6 +22,7 @@
{ "path": "../../../src/plugins/kibana_react/tsconfig.json" },
{ "path": "../../../src/plugins/kibana_utils/tsconfig.json" },
{ "path": "../../../src/plugins/usage_collection/tsconfig.json" },
{ "path": "../task_manager/tsconfig.json" },
{ "path": "../features/tsconfig.json" },
]

View file

@ -5,40 +5,40 @@
*/
import uuid from 'uuid';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { deleteTaskIfItExists } from './delete_task_if_it_exists';
import { removeIfExists } from './remove_if_exists';
import { taskStoreMock } from '../task_store.mock';
describe('deleteTaskIfItExists', () => {
describe('removeIfExists', () => {
test('removes the task by its ID', async () => {
const tm = taskManagerMock.createStart();
const ts = taskStoreMock.create({});
const id = uuid.v4();
expect(await deleteTaskIfItExists(tm, id)).toBe(undefined);
expect(await removeIfExists(ts, id)).toBe(undefined);
expect(tm.remove).toHaveBeenCalledWith(id);
expect(ts.remove).toHaveBeenCalledWith(id);
});
test('handles 404 errors caused by the task not existing', async () => {
const tm = taskManagerMock.createStart();
const ts = taskStoreMock.create({});
const id = uuid.v4();
tm.remove.mockRejectedValue(SavedObjectsErrorHelpers.createGenericNotFoundError('task', id));
ts.remove.mockRejectedValue(SavedObjectsErrorHelpers.createGenericNotFoundError('task', id));
expect(await deleteTaskIfItExists(tm, id)).toBe(undefined);
expect(await removeIfExists(ts, id)).toBe(undefined);
expect(tm.remove).toHaveBeenCalledWith(id);
expect(ts.remove).toHaveBeenCalledWith(id);
});
test('throws if any other errro is caused by task removal', async () => {
const tm = taskManagerMock.createStart();
const ts = taskStoreMock.create({});
const id = uuid.v4();
const error = SavedObjectsErrorHelpers.createInvalidVersionError(uuid.v4());
tm.remove.mockRejectedValue(error);
ts.remove.mockRejectedValue(error);
expect(deleteTaskIfItExists(tm, id)).rejects.toBe(error);
expect(removeIfExists(ts, id)).rejects.toBe(error);
expect(tm.remove).toHaveBeenCalledWith(id);
expect(ts.remove).toHaveBeenCalledWith(id);
});
});

View file

@ -3,12 +3,19 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TaskManagerStartContract } from '../../../task_manager/server';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { TaskStore } from '../task_store';
export async function deleteTaskIfItExists(taskManager: TaskManagerStartContract, taskId: string) {
/**
* Removes a task from the store, ignoring a not found error
* Other errors are re-thrown
*
* @param taskStore
* @param taskId
*/
export async function removeIfExists(taskStore: TaskStore, taskId: string) {
try {
await taskManager.remove(taskId);
await taskStore.remove(taskId);
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
throw err;

View file

@ -22,6 +22,7 @@ const createStartMock = () => {
schedule: jest.fn(),
runNow: jest.fn(),
ensureScheduled: jest.fn(),
removeIfExists: jest.fn(),
};
return mock;
};

View file

@ -18,6 +18,7 @@ import { TaskDefinition } from './task';
import { TaskPollingLifecycle } from './polling_lifecycle';
import { TaskManagerConfig } from './config';
import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware';
import { removeIfExists } from './lib/remove_if_exists';
import { setupSavedObjects } from './saved_objects';
import { TaskTypeDictionary } from './task_type_dictionary';
import { FetchResult, SearchOpts, TaskStore } from './task_store';
@ -35,7 +36,9 @@ export type TaskManagerStartContract = Pick<
TaskScheduling,
'schedule' | 'runNow' | 'ensureScheduled'
> &
Pick<TaskStore, 'fetch' | 'get' | 'remove'>;
Pick<TaskStore, 'fetch' | 'get' | 'remove'> & {
removeIfExists: TaskStore['remove'];
};
export class TaskManagerPlugin
implements Plugin<TaskManagerSetupContract, TaskManagerStartContract> {
@ -156,6 +159,7 @@ export class TaskManagerPlugin
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
get: (id: string) => taskStore.get(id),
remove: (id: string) => taskStore.remove(id),
removeIfExists: (id: string) => removeIfExists(taskStore, id),
schedule: (...args) => taskScheduling.schedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runNow: (...args) => taskScheduling.runNow(...args),