[Search Sessions] Split tasks (#99967) (#103212)

* cancel the previous session

* split to 3 tasks

* fixes

* cancellation

* updated tests

* split out and improve jest tests

* cleanup previous session properly

* don't fail delete and cancel if item was already cleaned up

* test

* test

* ignore resource_not_found_exception when deleting an already cleared \ expired async search

* jest

* update jest

* api int

* fix jest

* testssss

* Code review @dosant

* types

* remove any

* Fix merge

* type

* test

* jest

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

Co-authored-by: Liza Katz <lizka.k@gmail.com>
Co-authored-by: Liza K <liza.katz@elastic.co>
This commit is contained in:
Kibana Machine 2021-06-28 16:26:15 -04:00 committed by GitHub
parent f8d131f035
commit f6bd243406
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1446 additions and 651 deletions

View file

@ -44,10 +44,20 @@ export const searchSessionsConfigSchema = schema.object({
*/
pageSize: schema.number({ defaultValue: 100 }),
/**
* trackingInterval controls how often we track search session objects progress
* trackingInterval controls how often we track persisted search session objects progress
*/
trackingInterval: schema.duration({ defaultValue: '10s' }),
/**
* cleanupInterval controls how often we track non-persisted search session objects for cleanup
*/
cleanupInterval: schema.duration({ defaultValue: '60s' }),
/**
* expireInterval controls how often we track persisted search session objects for expiration
*/
expireInterval: schema.duration({ defaultValue: '60m' }),
/**
* monitoringTaskTimeout controls for how long task manager waits for search session monitoring task to complete before considering it timed out,
* If tasks timeouts it receives cancel signal and next task starts in "trackingInterval" time

View file

@ -98,6 +98,14 @@ describe('Session service', () => {
expect(nowProvider.reset).toHaveBeenCalled();
});
it("Can clear other apps' session", async () => {
sessionService.start();
expect(sessionService.getSessionId()).not.toBeUndefined();
currentAppId$.next('change');
sessionService.clear();
expect(sessionService.getSessionId()).toBeUndefined();
});
it("Can start a new session in case there is other apps' stale session", async () => {
const s1 = sessionService.start();
expect(sessionService.getSessionId()).not.toBeUndefined();

View file

@ -5,10 +5,7 @@
* 2.0.
*/
import {
checkRunningSessions as checkRunningSessions$,
CheckRunningSessionsDeps,
} from './check_running_sessions';
import { checkNonPersistedSessions as checkNonPersistedSessions$ } from './check_non_persiseted_sessions';
import {
SearchSessionStatus,
SearchSessionSavedObjectAttributes,
@ -16,22 +13,20 @@ import {
EQL_SEARCH_STRATEGY,
} from '../../../../../../src/plugins/data/common';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import { SearchSessionsConfig, SearchStatus } from './types';
import { SearchSessionsConfig, CheckSearchSessionsDeps, SearchStatus } from './types';
import moment from 'moment';
import {
SavedObjectsBulkUpdateObject,
SavedObjectsDeleteOptions,
SavedObjectsClientContract,
} from '../../../../../../src/core/server';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
jest.useFakeTimers();
const checkRunningSessions = (deps: CheckRunningSessionsDeps, config: SearchSessionsConfig) =>
checkRunningSessions$(deps, config).toPromise();
const checkNonPersistedSessions = (deps: CheckSearchSessionsDeps, config: SearchSessionsConfig) =>
checkNonPersistedSessions$(deps, config).toPromise();
describe('getSearchStatus', () => {
describe('checkNonPersistedSessions', () => {
let mockClient: any;
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
const config: SearchSessionsConfig = {
@ -42,7 +37,9 @@ describe('getSearchStatus', () => {
maxUpdateRetries: 3,
defaultExpiration: moment.duration(7, 'd'),
trackingInterval: moment.duration(10, 's'),
expireInterval: moment.duration(10, 'm'),
monitoringTaskTimeout: moment.duration(5, 'm'),
cleanupInterval: moment.duration(10, 's'),
management: {} as any,
};
const mockLogger: any = {
@ -51,16 +48,6 @@ describe('getSearchStatus', () => {
error: jest.fn(),
};
const emptySO = {
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
idMapping: {},
},
};
beforeEach(() => {
savedObjectsClient = savedObjectsClientMock.create();
mockClient = {
@ -81,7 +68,7 @@ describe('getSearchStatus', () => {
total: 0,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -94,240 +81,7 @@ describe('getSearchStatus', () => {
expect(savedObjectsClient.delete).not.toBeCalled();
});
describe('pagination', () => {
test('fetches one page if not objects exist', async () => {
savedObjectsClient.find.mockResolvedValueOnce({
saved_objects: [],
total: 0,
} as any);
await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
expect(savedObjectsClient.find).toHaveBeenCalledTimes(1);
});
test('fetches one page if less than page size object are returned', async () => {
savedObjectsClient.find.mockResolvedValueOnce({
saved_objects: [emptySO, emptySO],
total: 5,
} as any);
await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
expect(savedObjectsClient.find).toHaveBeenCalledTimes(1);
});
test('fetches two pages if exactly page size objects are returned', async () => {
let i = 0;
savedObjectsClient.find.mockImplementation(() => {
return new Promise((resolve) => {
resolve({
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
total: 5,
page: i,
} as any);
});
});
await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
// validate that page number increases
const { page: page1 } = savedObjectsClient.find.mock.calls[0][0];
const { page: page2 } = savedObjectsClient.find.mock.calls[1][0];
expect(page1).toBe(1);
expect(page2).toBe(2);
});
test('fetches two pages if page size +1 objects are returned', async () => {
let i = 0;
savedObjectsClient.find.mockImplementation(() => {
return new Promise((resolve) => {
resolve({
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [emptySO],
total: 5,
page: i,
} as any);
});
});
await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
});
test('fetching is abortable', async () => {
let i = 0;
const abort$ = new Subject();
savedObjectsClient.find.mockImplementation(() => {
return new Promise((resolve) => {
if (++i === 2) {
abort$.next();
}
resolve({
saved_objects: i <= 5 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
total: 25,
page: i,
} as any);
});
});
await checkRunningSessions$(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
)
.pipe(takeUntil(abort$))
.toPromise();
jest.runAllTimers();
// if not for `abort$` then this would be called 6 times!
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
});
test('sorting is by "touched"', async () => {
savedObjectsClient.find.mockResolvedValueOnce({
saved_objects: [],
total: 0,
} as any);
await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
expect(savedObjectsClient.find).toHaveBeenCalledWith(
expect.objectContaining({ sortField: 'touched', sortOrder: 'asc' })
);
});
test('sessions fetched in the beginning are processed even if sessions in the end fail', async () => {
let i = 0;
savedObjectsClient.find.mockImplementation(() => {
return new Promise((resolve, reject) => {
if (++i === 2) {
reject(new Error('Fake find error...'));
}
resolve({
saved_objects:
i <= 5
? [
i === 1
? {
id: '123',
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
idMapping: {
'map-key': {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
id: 'async-id',
},
},
},
}
: emptySO,
emptySO,
emptySO,
emptySO,
emptySO,
]
: [],
total: 25,
page: i,
} as any);
});
});
await checkRunningSessions$(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
).toPromise();
jest.runAllTimers();
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
// by checking that delete was called we validate that sessions from session that were successfully fetched were processed
expect(mockClient.asyncSearch.delete).toBeCalled();
const { id } = mockClient.asyncSearch.delete.mock.calls[0][0];
expect(id).toBe('async-id');
});
});
describe('delete', () => {
test('doesnt delete a persisted session', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
{
id: '123',
attributes: {
persisted: true,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(30, 'm')),
touched: moment().subtract(moment.duration(10, 'm')),
idMapping: {},
},
},
],
total: 1,
} as any);
await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
expect(savedObjectsClient.delete).not.toBeCalled();
});
test('doesnt delete a non persisted, recently touched session', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
@ -336,6 +90,7 @@ describe('getSearchStatus', () => {
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
expires: moment().add(moment.duration(3, 'm')),
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
idMapping: {},
@ -344,7 +99,7 @@ describe('getSearchStatus', () => {
],
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -367,6 +122,7 @@ describe('getSearchStatus', () => {
status: SearchSessionStatus.COMPLETE,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(1, 'm')),
expires: moment().add(moment.duration(3, 'm')),
idMapping: {
'search-hash': {
id: 'search-id',
@ -379,7 +135,7 @@ describe('getSearchStatus', () => {
],
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -401,6 +157,7 @@ describe('getSearchStatus', () => {
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
expires: moment().add(moment.duration(3, 'm')),
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
idMapping: {
@ -415,7 +172,7 @@ describe('getSearchStatus', () => {
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -441,6 +198,7 @@ describe('getSearchStatus', () => {
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
expires: moment().add(moment.duration(3, 'm')),
idMapping: {
'map-key': {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
@ -453,7 +211,7 @@ describe('getSearchStatus', () => {
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -481,6 +239,7 @@ describe('getSearchStatus', () => {
attributes: {
persisted: false,
status: SearchSessionStatus.COMPLETE,
expires: moment().add(moment.duration(3, 'm')),
created: moment().subtract(moment.duration(30, 'm')),
touched: moment().subtract(moment.duration(6, 'm')),
idMapping: {
@ -501,7 +260,7 @@ describe('getSearchStatus', () => {
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -530,6 +289,7 @@ describe('getSearchStatus', () => {
attributes: {
persisted: false,
status: SearchSessionStatus.COMPLETE,
expires: moment().add(moment.duration(3, 'm')),
created: moment().subtract(moment.duration(30, 'm')),
touched: moment().subtract(moment.duration(6, 'm')),
idMapping: {
@ -545,7 +305,7 @@ describe('getSearchStatus', () => {
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -573,6 +333,7 @@ describe('getSearchStatus', () => {
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
expires: moment().add(moment.duration(3, 'm')),
idMapping: {
'search-hash': {
id: 'search-id',
@ -594,7 +355,7 @@ describe('getSearchStatus', () => {
},
});
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -614,6 +375,7 @@ describe('getSearchStatus', () => {
id: '123',
attributes: {
status: SearchSessionStatus.ERROR,
expires: moment().add(moment.duration(3, 'm')),
idMapping: {
'search-hash': {
id: 'search-id',
@ -633,7 +395,7 @@ describe('getSearchStatus', () => {
total: 1,
} as any);
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -653,6 +415,7 @@ describe('getSearchStatus', () => {
namespaces: ['awesome'],
attributes: {
status: SearchSessionStatus.IN_PROGRESS,
expires: moment().add(moment.duration(3, 'm')),
touched: '123',
idMapping: {
'search-hash': {
@ -676,7 +439,7 @@ describe('getSearchStatus', () => {
},
});
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -696,6 +459,7 @@ describe('getSearchStatus', () => {
const so = {
attributes: {
status: SearchSessionStatus.IN_PROGRESS,
expires: moment().add(moment.duration(3, 'm')),
touched: '123',
idMapping: {
'search-hash': {
@ -719,7 +483,7 @@ describe('getSearchStatus', () => {
},
});
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,
@ -744,6 +508,7 @@ describe('getSearchStatus', () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
expires: moment().add(moment.duration(3, 'm')),
idMapping: {
'search-hash': {
id: 'search-id',
@ -766,7 +531,7 @@ describe('getSearchStatus', () => {
},
});
await checkRunningSessions(
await checkNonPersistedSessions(
{
savedObjectsClient,
client: mockClient,

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsFindResult } from 'kibana/server';
import moment from 'moment';
import { EMPTY } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';
import {
nodeBuilder,
ENHANCED_ES_SEARCH_STRATEGY,
SEARCH_SESSION_TYPE,
SearchSessionSavedObjectAttributes,
SearchSessionStatus,
KueryNode,
} from '../../../../../../src/plugins/data/common';
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
import { SearchSessionsConfig, CheckSearchSessionsDeps } from './types';
import { bulkUpdateSessions, getAllSessionsStatusUpdates } from './update_session_status';
export const SEARCH_SESSIONS_CLEANUP_TASK_TYPE = 'search_sessions_cleanup';
export const SEARCH_SESSIONS_CLEANUP_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_CLEANUP_TASK_TYPE}`;
function isSessionStale(
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>,
config: SearchSessionsConfig
) {
const curTime = moment();
// Delete cancelled sessions immediately
if (session.attributes.status === SearchSessionStatus.CANCELLED) return true;
// Delete if a running session wasn't polled for in the last notTouchedInProgressTimeout OR
// if a completed \ errored \ canceled session wasn't saved for within notTouchedTimeout
return (
(session.attributes.status === SearchSessionStatus.IN_PROGRESS &&
curTime.diff(moment(session.attributes.touched), 'ms') >
config.notTouchedInProgressTimeout.asMilliseconds()) ||
(session.attributes.status !== SearchSessionStatus.IN_PROGRESS &&
curTime.diff(moment(session.attributes.touched), 'ms') >
config.notTouchedTimeout.asMilliseconds())
);
}
function checkNonPersistedSessionsPage(
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig,
filter: KueryNode,
page: number
) {
const { logger, client, savedObjectsClient } = deps;
logger.debug(`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Fetching sessions from page ${page}`);
return getSearchSessionsPage$(deps, filter, config.pageSize, page).pipe(
concatMap(async (nonPersistedSearchSessions) => {
if (!nonPersistedSearchSessions.total) return nonPersistedSearchSessions;
logger.debug(
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Found ${nonPersistedSearchSessions.total} sessions, processing ${nonPersistedSearchSessions.saved_objects.length}`
);
const updatedSessions = await getAllSessionsStatusUpdates(deps, nonPersistedSearchSessions);
const deletedSessionIds: string[] = [];
await Promise.all(
nonPersistedSearchSessions.saved_objects.map(async (session) => {
if (isSessionStale(session, config)) {
// delete saved object to free up memory
// TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session!
// Maybe we want to change state to deleted and cleanup later?
logger.debug(`Deleting stale session | ${session.id}`);
try {
deletedSessionIds.push(session.id);
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, {
namespace: session.namespaces?.[0],
});
} catch (e) {
logger.error(
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Error while deleting session ${session.id}: ${e.message}`
);
}
// Send a delete request for each async search to ES
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
const searchInfo = session.attributes.idMapping[searchKey];
if (searchInfo.strategy === ENHANCED_ES_SEARCH_STRATEGY) {
try {
await client.asyncSearch.delete({ id: searchInfo.id });
} catch (e) {
if (e.message !== 'resource_not_found_exception') {
logger.error(
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Error while deleting async_search ${searchInfo.id}: ${e.message}`
);
}
}
}
});
}
})
);
const nonDeletedSessions = updatedSessions.filter((updateSession) => {
return deletedSessionIds.indexOf(updateSession.id) === -1;
});
await bulkUpdateSessions(deps, nonDeletedSessions);
return nonPersistedSearchSessions;
})
);
}
export function checkNonPersistedSessions(
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig
) {
const { logger } = deps;
const filters = nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'false');
return checkSearchSessionsByPage(checkNonPersistedSessionsPage, deps, config, filters).pipe(
catchError((e) => {
logger.error(
`${SEARCH_SESSIONS_CLEANUP_TASK_TYPE} Error while processing sessions: ${e?.message}`
);
return EMPTY;
})
);
}

View file

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { checkPersistedSessionsProgress } from './check_persisted_sessions';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import { SearchSessionsConfig } from './types';
import moment from 'moment';
import { SavedObjectsClientContract } from '../../../../../../src/core/server';
describe('checkPersistedSessionsProgress', () => {
let mockClient: any;
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
const config: SearchSessionsConfig = {
enabled: true,
pageSize: 5,
notTouchedInProgressTimeout: moment.duration(1, 'm'),
notTouchedTimeout: moment.duration(5, 'm'),
maxUpdateRetries: 3,
defaultExpiration: moment.duration(7, 'd'),
trackingInterval: moment.duration(10, 's'),
cleanupInterval: moment.duration(10, 's'),
expireInterval: moment.duration(10, 'm'),
monitoringTaskTimeout: moment.duration(5, 'm'),
management: {} as any,
};
const mockLogger: any = {
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
beforeEach(() => {
savedObjectsClient = savedObjectsClientMock.create();
mockClient = {
asyncSearch: {
status: jest.fn(),
delete: jest.fn(),
},
eql: {
status: jest.fn(),
delete: jest.fn(),
},
};
});
test('fetches only running persisted sessions', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [],
total: 0,
} as any);
await checkPersistedSessionsProgress(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);
const [findInput] = savedObjectsClient.find.mock.calls[0];
expect(findInput.filter.arguments[0].arguments[0].value).toBe(
'search-session.attributes.persisted'
);
expect(findInput.filter.arguments[0].arguments[1].value).toBe('true');
expect(findInput.filter.arguments[1].arguments[0].value).toBe(
'search-session.attributes.status'
);
expect(findInput.filter.arguments[1].arguments[1].value).toBe('in_progress');
});
});

View file

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EMPTY, Observable } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';
import {
nodeBuilder,
SEARCH_SESSION_TYPE,
SearchSessionStatus,
KueryNode,
} from '../../../../../../src/plugins/data/common';
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
import { SearchSessionsConfig, CheckSearchSessionsDeps, SearchSessionsResponse } from './types';
import { bulkUpdateSessions, getAllSessionsStatusUpdates } from './update_session_status';
export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
function checkPersistedSessionsPage(
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig,
filter: KueryNode,
page: number
): Observable<SearchSessionsResponse> {
const { logger } = deps;
logger.debug(`${SEARCH_SESSIONS_TASK_TYPE} Fetching sessions from page ${page}`);
return getSearchSessionsPage$(deps, filter, config.pageSize, page).pipe(
concatMap(async (persistedSearchSessions) => {
if (!persistedSearchSessions.total) return persistedSearchSessions;
logger.debug(
`${SEARCH_SESSIONS_TASK_TYPE} Found ${persistedSearchSessions.total} sessions, processing ${persistedSearchSessions.saved_objects.length}`
);
const updatedSessions = await getAllSessionsStatusUpdates(deps, persistedSearchSessions);
await bulkUpdateSessions(deps, updatedSessions);
return persistedSearchSessions;
})
);
}
export function checkPersistedSessionsProgress(
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig
) {
const { logger } = deps;
const persistedSessionsFilter = nodeBuilder.and([
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'),
nodeBuilder.is(
`${SEARCH_SESSION_TYPE}.attributes.status`,
SearchSessionStatus.IN_PROGRESS.toString()
),
]);
return checkSearchSessionsByPage(
checkPersistedSessionsPage,
deps,
config,
persistedSessionsFilter
).pipe(
catchError((e) => {
logger.error(`${SEARCH_SESSIONS_TASK_TYPE} Error while processing sessions: ${e?.message}`);
return EMPTY;
})
);
}

View file

@ -1,257 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
ElasticsearchClient,
Logger,
SavedObjectsClientContract,
SavedObjectsFindResult,
SavedObjectsUpdateResponse,
} from 'kibana/server';
import moment from 'moment';
import { EMPTY, from, Observable } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';
import {
nodeBuilder,
ENHANCED_ES_SEARCH_STRATEGY,
SEARCH_SESSION_TYPE,
SearchSessionRequestInfo,
SearchSessionSavedObjectAttributes,
SearchSessionStatus,
} from '../../../../../../src/plugins/data/common';
import { getSearchStatus } from './get_search_status';
import { getSessionStatus } from './get_session_status';
import { SearchSessionsConfig, SearchStatus } from './types';
export interface CheckRunningSessionsDeps {
savedObjectsClient: SavedObjectsClientContract;
client: ElasticsearchClient;
logger: Logger;
}
function isSessionStale(
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>,
config: SearchSessionsConfig,
logger: Logger
) {
const curTime = moment();
// Delete if a running session wasn't polled for in the last notTouchedInProgressTimeout OR
// if a completed \ errored \ canceled session wasn't saved for within notTouchedTimeout
return (
(session.attributes.status === SearchSessionStatus.IN_PROGRESS &&
curTime.diff(moment(session.attributes.touched), 'ms') >
config.notTouchedInProgressTimeout.asMilliseconds()) ||
(session.attributes.status !== SearchSessionStatus.IN_PROGRESS &&
curTime.diff(moment(session.attributes.touched), 'ms') >
config.notTouchedTimeout.asMilliseconds())
);
}
async function updateSessionStatus(
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>,
client: ElasticsearchClient,
logger: Logger
) {
let sessionUpdated = false;
// Check statuses of all running searches
await Promise.all(
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
const updateSearchRequest = (
currentStatus: Pick<SearchSessionRequestInfo, 'status' | 'error'>
) => {
sessionUpdated = true;
session.attributes.idMapping[searchKey] = {
...session.attributes.idMapping[searchKey],
...currentStatus,
};
};
const searchInfo = session.attributes.idMapping[searchKey];
if (searchInfo.status === SearchStatus.IN_PROGRESS) {
try {
const currentStatus = await getSearchStatus(client, searchInfo.id);
if (currentStatus.status !== searchInfo.status) {
logger.debug(`search ${searchInfo.id} | status changed to ${currentStatus.status}`);
updateSearchRequest(currentStatus);
}
} catch (e) {
logger.error(e);
updateSearchRequest({
status: SearchStatus.ERROR,
error: e.message || e.meta.error?.caused_by?.reason,
});
}
}
})
);
// And only then derive the session's status
const sessionStatus = getSessionStatus(session.attributes);
if (sessionStatus !== session.attributes.status) {
const now = new Date().toISOString();
session.attributes.status = sessionStatus;
session.attributes.touched = now;
if (sessionStatus === SearchSessionStatus.COMPLETE) {
session.attributes.completed = now;
} else if (session.attributes.completed) {
session.attributes.completed = null;
}
sessionUpdated = true;
}
return sessionUpdated;
}
function getSavedSearchSessionsPage$(
{ savedObjectsClient, logger }: CheckRunningSessionsDeps,
config: SearchSessionsConfig,
page: number
) {
logger.debug(`Fetching saved search sessions page ${page}`);
return from(
savedObjectsClient.find<SearchSessionSavedObjectAttributes>({
page,
perPage: config.pageSize,
type: SEARCH_SESSION_TYPE,
namespaces: ['*'],
// process older sessions first
sortField: 'touched',
sortOrder: 'asc',
filter: nodeBuilder.or([
nodeBuilder.and([
nodeBuilder.is(
`${SEARCH_SESSION_TYPE}.attributes.status`,
SearchSessionStatus.IN_PROGRESS.toString()
),
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'),
]),
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'false'),
]),
})
);
}
function checkRunningSessionsPage(
deps: CheckRunningSessionsDeps,
config: SearchSessionsConfig,
page: number
) {
const { logger, client, savedObjectsClient } = deps;
return getSavedSearchSessionsPage$(deps, config, page).pipe(
concatMap(async (runningSearchSessionsResponse) => {
if (!runningSearchSessionsResponse.total) return;
logger.debug(
`Found ${runningSearchSessionsResponse.total} running sessions, processing ${runningSearchSessionsResponse.saved_objects.length} sessions from page ${page}`
);
const updatedSessions = new Array<
SavedObjectsFindResult<SearchSessionSavedObjectAttributes>
>();
await Promise.all(
runningSearchSessionsResponse.saved_objects.map(async (session) => {
const updated = await updateSessionStatus(session, client, logger);
let deleted = false;
if (!session.attributes.persisted) {
if (isSessionStale(session, config, logger)) {
// delete saved object to free up memory
// TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session!
// Maybe we want to change state to deleted and cleanup later?
logger.debug(`Deleting stale session | ${session.id}`);
try {
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, {
namespace: session.namespaces?.[0],
});
deleted = true;
} catch (e) {
logger.error(
`Error while deleting stale search session ${session.id}: ${e.message}`
);
}
// Send a delete request for each async search to ES
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
const searchInfo = session.attributes.idMapping[searchKey];
if (searchInfo.strategy === ENHANCED_ES_SEARCH_STRATEGY) {
try {
await client.asyncSearch.delete({ id: searchInfo.id });
} catch (e) {
logger.error(
`Error while deleting async_search ${searchInfo.id}: ${e.message}`
);
}
}
});
}
}
if (updated && !deleted) {
updatedSessions.push(session);
}
})
);
// Do a bulk update
if (updatedSessions.length) {
// If there's an error, we'll try again in the next iteration, so there's no need to check the output.
const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
updatedSessions.map((session) => ({
...session,
namespace: session.namespaces?.[0],
}))
);
const success: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
const fail: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
updatedResponse.saved_objects.forEach((savedObjectResponse) => {
if ('error' in savedObjectResponse) {
fail.push(savedObjectResponse);
logger.error(
`Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}`
);
} else {
success.push(savedObjectResponse);
}
});
logger.debug(`Updating search sessions: success: ${success.length}, fail: ${fail.length}`);
}
return runningSearchSessionsResponse;
})
);
}
export function checkRunningSessions(deps: CheckRunningSessionsDeps, config: SearchSessionsConfig) {
const { logger } = deps;
const checkRunningSessionsByPage = (nextPage = 1): Observable<void> =>
checkRunningSessionsPage(deps, config, nextPage).pipe(
concatMap((result) => {
if (!result || !result.saved_objects || result.saved_objects.length < config.pageSize) {
return EMPTY;
} else {
// TODO: while processing previous page session list might have been changed and we might skip a session,
// because it would appear now on a different "page".
// This isn't critical, as we would pick it up on a next task iteration, but maybe we could improve this somehow
return checkRunningSessionsByPage(result.page + 1);
}
})
);
return checkRunningSessionsByPage().pipe(
catchError((e) => {
logger.error(`Error while processing search sessions: ${e?.message}`);
return EMPTY;
})
);
}

View file

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EMPTY, Observable } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';
import {
nodeBuilder,
SEARCH_SESSION_TYPE,
SearchSessionStatus,
KueryNode,
} from '../../../../../../src/plugins/data/common';
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
import { SearchSessionsConfig, CheckSearchSessionsDeps, SearchSessionsResponse } from './types';
import { bulkUpdateSessions, getAllSessionsStatusUpdates } from './update_session_status';
export const SEARCH_SESSIONS_EXPIRE_TASK_TYPE = 'search_sessions_expire';
export const SEARCH_SESSIONS_EXPIRE_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_EXPIRE_TASK_TYPE}`;
function checkSessionExpirationPage(
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig,
filter: KueryNode,
page: number
): Observable<SearchSessionsResponse> {
const { logger } = deps;
logger.debug(`${SEARCH_SESSIONS_EXPIRE_TASK_TYPE} Fetching sessions from page ${page}`);
return getSearchSessionsPage$(deps, filter, config.pageSize, page).pipe(
concatMap(async (searchSessions) => {
if (!searchSessions.total) return searchSessions;
logger.debug(
`${SEARCH_SESSIONS_EXPIRE_TASK_TYPE} Found ${searchSessions.total} sessions, processing ${searchSessions.saved_objects.length}`
);
const updatedSessions = await getAllSessionsStatusUpdates(deps, searchSessions);
await bulkUpdateSessions(deps, updatedSessions);
return searchSessions;
})
);
}
export function checkPersistedCompletedSessionExpiration(
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig
) {
const { logger } = deps;
const persistedSessionsFilter = nodeBuilder.and([
nodeBuilder.is(`${SEARCH_SESSION_TYPE}.attributes.persisted`, 'true'),
nodeBuilder.is(
`${SEARCH_SESSION_TYPE}.attributes.status`,
SearchSessionStatus.COMPLETE.toString()
),
]);
return checkSearchSessionsByPage(
checkSessionExpirationPage,
deps,
config,
persistedSessionsFilter
).pipe(
catchError((e) => {
logger.error(
`${SEARCH_SESSIONS_EXPIRE_TASK_TYPE} Error while processing sessions: ${e?.message}`
);
return EMPTY;
})
);
}

View file

@ -0,0 +1,282 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { checkSearchSessionsByPage, getSearchSessionsPage$ } from './get_search_session_page';
import {
SearchSessionStatus,
ENHANCED_ES_SEARCH_STRATEGY,
} from '../../../../../../src/plugins/data/common';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import { SearchSessionsConfig, SearchStatus } from './types';
import moment from 'moment';
import { SavedObjectsClientContract } from '../../../../../../src/core/server';
import { of, Subject, throwError } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
jest.useFakeTimers();
describe('checkSearchSessionsByPage', () => {
const mockClient = {} as any;
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
const config: SearchSessionsConfig = {
enabled: true,
pageSize: 5,
management: {} as any,
} as any;
const mockLogger: any = {
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
const emptySO = {
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
idMapping: {},
},
};
beforeEach(() => {
savedObjectsClient = savedObjectsClientMock.create();
});
describe('getSearchSessionsPage$', () => {
test('sorting is by "touched"', async () => {
savedObjectsClient.find.mockResolvedValueOnce({
saved_objects: [],
total: 0,
} as any);
await getSearchSessionsPage$(
{
savedObjectsClient,
} as any,
{
type: 'literal',
},
1,
1
);
expect(savedObjectsClient.find).toHaveBeenCalledWith(
expect.objectContaining({ sortField: 'touched', sortOrder: 'asc' })
);
});
});
describe('pagination', () => {
test('fetches one page if got empty response', async () => {
const checkFn = jest.fn().mockReturnValue(of(undefined));
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
).toPromise();
expect(checkFn).toHaveBeenCalledTimes(1);
});
test('fetches one page if got response with no saved objects', async () => {
const checkFn = jest.fn().mockReturnValue(
of({
total: 0,
})
);
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
).toPromise();
expect(checkFn).toHaveBeenCalledTimes(1);
});
test('fetches one page if less than page size object are returned', async () => {
const checkFn = jest.fn().mockReturnValue(
of({
saved_objects: [emptySO, emptySO],
total: 5,
})
);
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
).toPromise();
expect(checkFn).toHaveBeenCalledTimes(1);
});
test('fetches two pages if exactly page size objects are returned', async () => {
let i = 0;
const checkFn = jest.fn().mockImplementation(() =>
of({
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
total: 5,
page: i,
})
);
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
).toPromise();
expect(checkFn).toHaveBeenCalledTimes(2);
// validate that page number increases
const page1 = checkFn.mock.calls[0][3];
const page2 = checkFn.mock.calls[1][3];
expect(page1).toBe(1);
expect(page2).toBe(2);
});
test('fetches two pages if page size +1 objects are returned', async () => {
let i = 0;
const checkFn = jest.fn().mockImplementation(() =>
of({
saved_objects: i++ === 0 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [emptySO],
total: i === 0 ? 5 : 1,
page: i,
})
);
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
).toPromise();
expect(checkFn).toHaveBeenCalledTimes(2);
});
test('sessions fetched in the beginning are processed even if sessions in the end fail', async () => {
let i = 0;
const checkFn = jest.fn().mockImplementation(() => {
if (++i === 2) {
return throwError('Fake find error...');
}
return of({
saved_objects:
i <= 5
? [
i === 1
? {
id: '123',
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
idMapping: {
'map-key': {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
id: 'async-id',
status: SearchStatus.IN_PROGRESS,
},
},
},
}
: emptySO,
emptySO,
emptySO,
emptySO,
emptySO,
]
: [],
total: 25,
page: i,
});
});
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
)
.toPromise()
.catch(() => {});
expect(checkFn).toHaveBeenCalledTimes(2);
});
test('fetching is abortable', async () => {
let i = 0;
const abort$ = new Subject();
const checkFn = jest.fn().mockImplementation(() => {
if (++i === 2) {
abort$.next();
}
return of({
saved_objects: i <= 5 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
total: 25,
page: i,
});
});
await checkSearchSessionsByPage(
checkFn,
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config,
[]
)
.pipe(takeUntil(abort$))
.toPromise()
.catch(() => {});
jest.runAllTimers();
// if not for `abort$` then this would be called 6 times!
expect(checkFn).toHaveBeenCalledTimes(2);
});
});
});

View file

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsClientContract, Logger } from 'kibana/server';
import { from, Observable, EMPTY } from 'rxjs';
import { concatMap } from 'rxjs/operators';
import {
SearchSessionSavedObjectAttributes,
SEARCH_SESSION_TYPE,
KueryNode,
} from '../../../../../../src/plugins/data/common';
import { CheckSearchSessionsDeps, CheckSearchSessionsFn, SearchSessionsConfig } from './types';
export interface GetSessionsDeps {
savedObjectsClient: SavedObjectsClientContract;
logger: Logger;
}
export function getSearchSessionsPage$(
{ savedObjectsClient }: GetSessionsDeps,
filter: KueryNode,
pageSize: number,
page: number
) {
return from(
savedObjectsClient.find<SearchSessionSavedObjectAttributes>({
page,
perPage: pageSize,
type: SEARCH_SESSION_TYPE,
namespaces: ['*'],
// process older sessions first
sortField: 'touched',
sortOrder: 'asc',
filter,
})
);
}
export const checkSearchSessionsByPage = (
checkFn: CheckSearchSessionsFn,
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig,
filters: any,
nextPage = 1
): Observable<void> =>
checkFn(deps, config, filters, nextPage).pipe(
concatMap((result) => {
if (!result || !result.saved_objects || result.saved_objects.length < config.pageSize) {
return EMPTY;
} else {
// TODO: while processing previous page session list might have been changed and we might skip a session,
// because it would appear now on a different "page".
// This isn't critical, as we would pick it up on a next task iteration, but maybe we could improve this somehow
return checkSearchSessionsByPage(checkFn, deps, config, filters, result.page + 1);
}
})
);

View file

@ -6,4 +6,3 @@
*/
export * from './session_service';
export { registerSearchSessionsTask, scheduleSearchSessionsTasks } from './monitoring_task';

View file

@ -1,119 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Duration } from 'moment';
import { filter, takeUntil } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
RunContext,
TaskRunCreatorFunction,
} from '../../../../task_manager/server';
import { checkRunningSessions } from './check_running_sessions';
import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server';
import { ConfigSchema } from '../../../config';
import { SEARCH_SESSION_TYPE } from '../../../../../../src/plugins/data/common';
import { DataEnhancedStartDependencies } from '../../type';
export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
interface SearchSessionTaskDeps {
taskManager: TaskManagerSetupContract;
logger: Logger;
config: ConfigSchema;
}
function searchSessionRunner(
core: CoreSetup<DataEnhancedStartDependencies>,
{ logger, config }: SearchSessionTaskDeps
): TaskRunCreatorFunction {
return ({ taskInstance }: RunContext) => {
const aborted$ = new BehaviorSubject<boolean>(false);
return {
async run() {
const sessionConfig = config.search.sessions;
const [coreStart] = await core.getStartServices();
if (!sessionConfig.enabled) {
logger.debug('Search sessions are disabled. Skipping task.');
return;
}
if (aborted$.getValue()) return;
const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
await checkRunningSessions(
{
savedObjectsClient: internalSavedObjectsClient,
client: coreStart.elasticsearch.client.asInternalUser,
logger,
},
sessionConfig
)
.pipe(takeUntil(aborted$.pipe(filter((aborted) => aborted))))
.toPromise();
return {
state: {},
};
},
cancel: async () => {
aborted$.next(true);
},
};
};
}
export function registerSearchSessionsTask(
core: CoreSetup<DataEnhancedStartDependencies>,
deps: SearchSessionTaskDeps
) {
deps.taskManager.registerTaskDefinitions({
[SEARCH_SESSIONS_TASK_TYPE]: {
title: 'Search Sessions Monitor',
createTaskRunner: searchSessionRunner(core, deps),
timeout: `${deps.config.search.sessions.monitoringTaskTimeout.asSeconds()}s`,
},
});
}
export async function unscheduleSearchSessionsTask(
taskManager: TaskManagerStartContract,
logger: Logger
) {
try {
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
logger.debug(`Search sessions cleared`);
} catch (e) {
logger.error(`Error clearing task, received ${e.message}`);
}
}
export async function scheduleSearchSessionsTasks(
taskManager: TaskManagerStartContract,
logger: Logger,
trackingInterval: Duration
) {
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
try {
await taskManager.ensureScheduled({
id: SEARCH_SESSIONS_TASK_ID,
taskType: SEARCH_SESSIONS_TASK_TYPE,
schedule: {
interval: `${trackingInterval.asSeconds()}s`,
},
state: {},
params: {},
});
logger.debug(`Search sessions task, scheduled to run`);
} catch (e) {
logger.error(`Error scheduling task, received ${e.message}`);
}
}

View file

@ -79,7 +79,9 @@ describe('SearchSessionService', () => {
maxUpdateRetries: MAX_UPDATE_RETRIES,
defaultExpiration: moment.duration(7, 'd'),
monitoringTaskTimeout: moment.duration(5, 'm'),
cleanupInterval: moment.duration(10, 's'),
trackingInterval: moment.duration(10, 's'),
expireInterval: moment.duration(10, 'm'),
management: {} as any,
},
},
@ -157,7 +159,9 @@ describe('SearchSessionService', () => {
maxUpdateRetries: MAX_UPDATE_RETRIES,
defaultExpiration: moment.duration(7, 'd'),
trackingInterval: moment.duration(10, 's'),
expireInterval: moment.duration(10, 'm'),
monitoringTaskTimeout: moment.duration(5, 'm'),
cleanupInterval: moment.duration(10, 's'),
management: {} as any,
},
},

View file

@ -43,11 +43,26 @@ import { createRequestHash } from './utils';
import { ConfigSchema } from '../../../config';
import {
registerSearchSessionsTask,
scheduleSearchSessionsTasks,
scheduleSearchSessionsTask,
unscheduleSearchSessionsTask,
} from './monitoring_task';
} from './setup_task';
import { SearchSessionsConfig, SearchStatus } from './types';
import { DataEnhancedStartDependencies } from '../../type';
import {
checkPersistedSessionsProgress,
SEARCH_SESSIONS_TASK_ID,
SEARCH_SESSIONS_TASK_TYPE,
} from './check_persisted_sessions';
import {
SEARCH_SESSIONS_CLEANUP_TASK_TYPE,
checkNonPersistedSessions,
SEARCH_SESSIONS_CLEANUP_TASK_ID,
} from './check_non_persiseted_sessions';
import {
SEARCH_SESSIONS_EXPIRE_TASK_TYPE,
SEARCH_SESSIONS_EXPIRE_TASK_ID,
checkPersistedCompletedSessionExpiration,
} from './expire_persisted_sessions';
export interface SearchSessionDependencies {
savedObjectsClient: SavedObjectsClientContract;
@ -89,11 +104,35 @@ export class SearchSessionService
}
public setup(core: CoreSetup<DataEnhancedStartDependencies>, deps: SetupDependencies) {
registerSearchSessionsTask(core, {
const taskDeps = {
config: this.config,
taskManager: deps.taskManager,
logger: this.logger,
});
};
registerSearchSessionsTask(
core,
taskDeps,
SEARCH_SESSIONS_TASK_TYPE,
'persisted session progress',
checkPersistedSessionsProgress
);
registerSearchSessionsTask(
core,
taskDeps,
SEARCH_SESSIONS_CLEANUP_TASK_TYPE,
'non persisted session cleanup',
checkNonPersistedSessions
);
registerSearchSessionsTask(
core,
taskDeps,
SEARCH_SESSIONS_EXPIRE_TASK_TYPE,
'complete session expiration',
checkPersistedCompletedSessionExpiration
);
}
public async start(core: CoreStart, deps: StartDependencies) {
@ -103,14 +142,37 @@ export class SearchSessionService
public stop() {}
private setupMonitoring = async (core: CoreStart, deps: StartDependencies) => {
const taskDeps = {
config: this.config,
taskManager: deps.taskManager,
logger: this.logger,
};
if (this.sessionConfig.enabled) {
scheduleSearchSessionsTasks(
deps.taskManager,
this.logger,
scheduleSearchSessionsTask(
taskDeps,
SEARCH_SESSIONS_TASK_ID,
SEARCH_SESSIONS_TASK_TYPE,
this.sessionConfig.trackingInterval
);
scheduleSearchSessionsTask(
taskDeps,
SEARCH_SESSIONS_CLEANUP_TASK_ID,
SEARCH_SESSIONS_CLEANUP_TASK_TYPE,
this.sessionConfig.cleanupInterval
);
scheduleSearchSessionsTask(
taskDeps,
SEARCH_SESSIONS_EXPIRE_TASK_ID,
SEARCH_SESSIONS_EXPIRE_TASK_TYPE,
this.sessionConfig.expireInterval
);
} else {
unscheduleSearchSessionsTask(deps.taskManager, this.logger);
unscheduleSearchSessionsTask(taskDeps, SEARCH_SESSIONS_TASK_ID);
unscheduleSearchSessionsTask(taskDeps, SEARCH_SESSIONS_CLEANUP_TASK_ID);
unscheduleSearchSessionsTask(taskDeps, SEARCH_SESSIONS_EXPIRE_TASK_ID);
}
};

View file

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Duration } from 'moment';
import { filter, takeUntil } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';
import { RunContext, TaskRunCreatorFunction } from '../../../../task_manager/server';
import { CoreSetup, SavedObjectsClient } from '../../../../../../src/core/server';
import { SEARCH_SESSION_TYPE } from '../../../../../../src/plugins/data/common';
import { DataEnhancedStartDependencies } from '../../type';
import {
SearchSessionTaskSetupDeps,
SearchSessionTaskStartDeps,
SearchSessionTaskFn,
} from './types';
export function searchSessionTaskRunner(
core: CoreSetup<DataEnhancedStartDependencies>,
deps: SearchSessionTaskSetupDeps,
title: string,
checkFn: SearchSessionTaskFn
): TaskRunCreatorFunction {
const { logger, config } = deps;
return ({ taskInstance }: RunContext) => {
const aborted$ = new BehaviorSubject<boolean>(false);
return {
async run() {
try {
const sessionConfig = config.search.sessions;
const [coreStart] = await core.getStartServices();
if (!sessionConfig.enabled) {
logger.debug(`Search sessions are disabled. Skipping task ${title}.`);
return;
}
if (aborted$.getValue()) return;
const internalRepo = coreStart.savedObjects.createInternalRepository([
SEARCH_SESSION_TYPE,
]);
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
await checkFn(
{
logger,
client: coreStart.elasticsearch.client.asInternalUser,
savedObjectsClient: internalSavedObjectsClient,
},
sessionConfig
)
.pipe(takeUntil(aborted$.pipe(filter((aborted) => aborted))))
.toPromise();
return {
state: {},
};
} catch (e) {
logger.error(`An error occurred. Skipping task ${title}.`);
}
},
cancel: async () => {
aborted$.next(true);
},
};
};
}
export function registerSearchSessionsTask(
core: CoreSetup<DataEnhancedStartDependencies>,
deps: SearchSessionTaskSetupDeps,
taskType: string,
title: string,
checkFn: SearchSessionTaskFn
) {
deps.taskManager.registerTaskDefinitions({
[taskType]: {
title,
createTaskRunner: searchSessionTaskRunner(core, deps, title, checkFn),
timeout: `${deps.config.search.sessions.monitoringTaskTimeout.asSeconds()}s`,
},
});
}
export async function unscheduleSearchSessionsTask(
{ taskManager, logger }: SearchSessionTaskStartDeps,
taskId: string
) {
try {
await taskManager.removeIfExists(taskId);
logger.debug(`${taskId} cleared`);
} catch (e) {
logger.error(`${taskId} Error clearing task ${e.message}`);
}
}
export async function scheduleSearchSessionsTask(
{ taskManager, logger }: SearchSessionTaskStartDeps,
taskId: string,
taskType: string,
interval: Duration
) {
await taskManager.removeIfExists(taskId);
try {
await taskManager.ensureScheduled({
id: taskId,
taskType,
schedule: {
interval: `${interval.asSeconds()}s`,
},
state: {},
params: {},
});
logger.debug(`${taskId} scheduled to run`);
} catch (e) {
logger.error(`${taskId} Error scheduling task ${e.message}`);
}
}

View file

@ -5,6 +5,18 @@
* 2.0.
*/
import {
ElasticsearchClient,
Logger,
SavedObjectsClientContract,
SavedObjectsFindResponse,
} from 'kibana/server';
import { Observable } from 'rxjs';
import { KueryNode, SearchSessionSavedObjectAttributes } from 'src/plugins/data/common';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../../../x-pack/plugins/task_manager/server';
import { ConfigSchema } from '../../../config';
export enum SearchStatus {
@ -14,3 +26,38 @@ export enum SearchStatus {
}
export type SearchSessionsConfig = ConfigSchema['search']['sessions'];
export interface CheckSearchSessionsDeps {
savedObjectsClient: SavedObjectsClientContract;
client: ElasticsearchClient;
logger: Logger;
}
export interface SearchSessionTaskSetupDeps {
taskManager: TaskManagerSetupContract;
logger: Logger;
config: ConfigSchema;
}
export interface SearchSessionTaskStartDeps {
taskManager: TaskManagerStartContract;
logger: Logger;
config: ConfigSchema;
}
export type SearchSessionTaskFn = (
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig
) => Observable<void>;
export type SearchSessionsResponse = SavedObjectsFindResponse<
SearchSessionSavedObjectAttributes,
unknown
>;
export type CheckSearchSessionsFn = (
deps: CheckSearchSessionsDeps,
config: SearchSessionsConfig,
filter: KueryNode,
page: number
) => Observable<SearchSessionsResponse>;

View file

@ -0,0 +1,323 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { bulkUpdateSessions, updateSessionStatus } from './update_session_status';
import {
SearchSessionStatus,
SearchSessionSavedObjectAttributes,
} from '../../../../../../src/plugins/data/common';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import { SearchStatus } from './types';
import moment from 'moment';
import {
SavedObjectsBulkUpdateObject,
SavedObjectsClientContract,
SavedObjectsFindResult,
} from '../../../../../../src/core/server';
describe('bulkUpdateSessions', () => {
let mockClient: any;
let savedObjectsClient: jest.Mocked<SavedObjectsClientContract>;
const mockLogger: any = {
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
beforeEach(() => {
savedObjectsClient = savedObjectsClientMock.create();
mockClient = {
asyncSearch: {
status: jest.fn(),
delete: jest.fn(),
},
eql: {
status: jest.fn(),
delete: jest.fn(),
},
};
});
describe('updateSessionStatus', () => {
test('updates expired session', async () => {
const so: SavedObjectsFindResult<SearchSessionSavedObjectAttributes> = {
id: '123',
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
expires: moment().subtract(moment.duration(5, 'd')),
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
} as any;
const updated = await updateSessionStatus(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
so
);
expect(updated).toBeTruthy();
expect(so.attributes.status).toBe(SearchSessionStatus.EXPIRED);
});
test('does nothing if the search is still running', async () => {
const so = {
id: '123',
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
expires: moment().add(moment.duration(5, 'd')),
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
} as any;
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: true,
is_running: true,
},
});
const updated = await updateSessionStatus(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
so
);
expect(updated).toBeFalsy();
expect(so.attributes.status).toBe(SearchSessionStatus.IN_PROGRESS);
});
test("doesn't re-check completed or errored searches", async () => {
const so = {
id: '123',
attributes: {
expires: moment().add(moment.duration(5, 'd')),
status: SearchSessionStatus.ERROR,
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.COMPLETE,
},
'another-search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.ERROR,
},
},
},
} as any;
const updated = await updateSessionStatus(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
so
);
expect(updated).toBeFalsy();
expect(mockClient.asyncSearch.status).not.toBeCalled();
});
test('updates to complete if the search is done', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
status: SearchSessionStatus.IN_PROGRESS,
touched: '123',
expires: moment().add(moment.duration(5, 'd')),
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
} as any;
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 200,
},
});
const updated = await updateSessionStatus(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
so
);
expect(updated).toBeTruthy();
expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' });
expect(so.attributes.status).toBe(SearchSessionStatus.COMPLETE);
expect(so.attributes.status).toBe(SearchSessionStatus.COMPLETE);
expect(so.attributes.touched).not.toBe('123');
expect(so.attributes.completed).not.toBeUndefined();
expect(so.attributes.idMapping['search-hash'].status).toBe(SearchStatus.COMPLETE);
expect(so.attributes.idMapping['search-hash'].error).toBeUndefined();
});
test('updates to error if the search is errored', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
attributes: {
expires: moment().add(moment.duration(5, 'd')),
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
} as any;
mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 500,
},
});
const updated = await updateSessionStatus(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
so
);
expect(updated).toBeTruthy();
expect(so.attributes.status).toBe(SearchSessionStatus.ERROR);
expect(so.attributes.touched).not.toBe('123');
expect(so.attributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR);
expect(so.attributes.idMapping['search-hash'].error).toBe(
'Search completed with a 500 status'
);
});
});
describe('bulkUpdateSessions', () => {
test('does nothing if there are no open sessions', async () => {
await bulkUpdateSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
[]
);
expect(savedObjectsClient.bulkUpdate).not.toBeCalled();
expect(savedObjectsClient.delete).not.toBeCalled();
});
test('updates in space', async () => {
const so = {
namespaces: ['awesome'],
attributes: {
expires: moment().add(moment.duration(5, 'd')),
status: SearchSessionStatus.IN_PROGRESS,
touched: '123',
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
} as any;
savedObjectsClient.bulkUpdate = jest.fn().mockResolvedValue({
saved_objects: [so],
});
await bulkUpdateSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
[so]
);
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];
const updatedAttributes = updateInput[0] as SavedObjectsBulkUpdateObject;
expect(updatedAttributes.namespace).toBe('awesome');
});
test('logs failures', async () => {
const so = {
namespaces: ['awesome'],
attributes: {
expires: moment().add(moment.duration(5, 'd')),
status: SearchSessionStatus.IN_PROGRESS,
touched: '123',
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
} as any;
savedObjectsClient.bulkUpdate = jest.fn().mockResolvedValue({
saved_objects: [
{
error: 'nope',
},
],
});
await bulkUpdateSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
[so]
);
expect(savedObjectsClient.bulkUpdate).toBeCalledTimes(1);
expect(mockLogger.error).toBeCalledTimes(1);
});
});
});

View file

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsFindResult, SavedObjectsUpdateResponse } from 'kibana/server';
import {
SearchSessionRequestInfo,
SearchSessionSavedObjectAttributes,
SearchSessionStatus,
} from '../../../../../../src/plugins/data/common';
import { getSearchStatus } from './get_search_status';
import { getSessionStatus } from './get_session_status';
import { CheckSearchSessionsDeps, SearchSessionsResponse, SearchStatus } from './types';
import { isSearchSessionExpired } from './utils';
export async function updateSessionStatus(
{ logger, client }: CheckSearchSessionsDeps,
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>
) {
let sessionUpdated = false;
const isExpired = isSearchSessionExpired(session);
if (!isExpired) {
// Check statuses of all running searches
await Promise.all(
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
const updateSearchRequest = (
currentStatus: Pick<SearchSessionRequestInfo, 'status' | 'error'>
) => {
sessionUpdated = true;
session.attributes.idMapping[searchKey] = {
...session.attributes.idMapping[searchKey],
...currentStatus,
};
};
const searchInfo = session.attributes.idMapping[searchKey];
if (searchInfo.status === SearchStatus.IN_PROGRESS) {
try {
const currentStatus = await getSearchStatus(client, searchInfo.id);
if (currentStatus.status !== searchInfo.status) {
logger.debug(`search ${searchInfo.id} | status changed to ${currentStatus.status}`);
updateSearchRequest(currentStatus);
}
} catch (e) {
logger.error(e);
updateSearchRequest({
status: SearchStatus.ERROR,
error: e.message || e.meta.error?.caused_by?.reason,
});
}
}
})
);
}
// And only then derive the session's status
const sessionStatus = isExpired
? SearchSessionStatus.EXPIRED
: getSessionStatus(session.attributes);
if (sessionStatus !== session.attributes.status) {
const now = new Date().toISOString();
session.attributes.status = sessionStatus;
session.attributes.touched = now;
if (sessionStatus === SearchSessionStatus.COMPLETE) {
session.attributes.completed = now;
} else if (session.attributes.completed) {
session.attributes.completed = null;
}
sessionUpdated = true;
}
return sessionUpdated;
}
export async function getAllSessionsStatusUpdates(
deps: CheckSearchSessionsDeps,
searchSessions: SearchSessionsResponse
) {
const updatedSessions = new Array<SavedObjectsFindResult<SearchSessionSavedObjectAttributes>>();
await Promise.all(
searchSessions.saved_objects.map(async (session) => {
const updated = await updateSessionStatus(deps, session);
if (updated) {
updatedSessions.push(session);
}
})
);
return updatedSessions;
}
export async function bulkUpdateSessions(
{ logger, savedObjectsClient }: CheckSearchSessionsDeps,
updatedSessions: Array<SavedObjectsFindResult<SearchSessionSavedObjectAttributes>>
) {
if (updatedSessions.length) {
// If there's an error, we'll try again in the next iteration, so there's no need to check the output.
const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
updatedSessions.map((session) => ({
...session,
namespace: session.namespaces?.[0],
}))
);
const success: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
const fail: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];
updatedResponse.saved_objects.forEach((savedObjectResponse) => {
if ('error' in savedObjectResponse) {
fail.push(savedObjectResponse);
logger.error(
`Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}`
);
} else {
success.push(savedObjectResponse);
}
});
logger.debug(`Updating search sessions: success: ${success.length}, fail: ${fail.length}`);
}
}

View file

@ -7,6 +7,9 @@
import { createHash } from 'crypto';
import stringify from 'json-stable-stringify';
import { SavedObjectsFindResult } from 'kibana/server';
import moment from 'moment';
import { SearchSessionSavedObjectAttributes } from 'src/plugins/data/common';
/**
* Generate the hash for this request so that, in the future, this hash can be used to look up
@ -17,3 +20,9 @@ export function createRequestHash(keys: Record<any, any>) {
const { preference, ...params } = keys;
return createHash(`sha256`).update(stringify(params)).digest('hex');
}
export function isSearchSessionExpired(
session: SavedObjectsFindResult<SearchSessionSavedObjectAttributes>
) {
return moment(session.attributes.expires).isBefore(moment());
}

View file

@ -33,6 +33,7 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi
'--xpack.data_enhanced.search.sessions.enabled=true', // enable WIP send to background UI
'--xpack.data_enhanced.search.sessions.notTouchedTimeout=15s', // shorten notTouchedTimeout for quicker testing
'--xpack.data_enhanced.search.sessions.trackingInterval=5s', // shorten trackingInterval for quicker testing
'--xpack.data_enhanced.search.sessions.cleanupInterval=5s', // shorten cleanupInterval for quicker testing
],
},
esTestCluster: {