removes usage of the _id field in Task manager (#54765)

As of Elasticsearch 8.0.0 it will no longer be possible to use the _id field on documents.
This PR removes the usage that Task Manager makes of this field and switches to pinned queries to achieve a similar effect.
This commit is contained in:
Gidi Meir Morris 2020-01-16 09:55:51 +00:00 committed by GitHub
parent 81a7f89448
commit 8458e47614
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 629 additions and 451 deletions

View file

@ -17,7 +17,7 @@ import {
TaskInstanceWithId,
TaskDefinition,
} from '../../../../plugins/task_manager/server/task.js';
import { FetchOpts } from '../../../../plugins/task_manager/server/task_store.js';
import { SearchOpts } from '../../../../plugins/task_manager/server/task_store.js';
// Once all plugins are migrated to NP and we can remove Legacy TaskManager in version 8.0.0,
// this can be removed
@ -46,7 +46,7 @@ export function createLegacyApi(legacyTaskManager: Promise<TaskManager>): Legacy
registerTaskDefinitions: (taskDefinitions: TaskDictionary<TaskDefinition>) => {
legacyTaskManager.then((tm: TaskManager) => tm.registerTaskDefinitions(taskDefinitions));
},
fetch: (opts: FetchOpts) => legacyTaskManager.then((tm: TaskManager) => tm.fetch(opts)),
fetch: (opts: SearchOpts) => legacyTaskManager.then((tm: TaskManager) => tm.fetch(opts)),
remove: (id: string) => legacyTaskManager.then((tm: TaskManager) => tm.remove(id)),
schedule: (taskInstance: TaskInstanceWithDeprecatedFields, options?: any) =>
legacyTaskManager.then((tm: TaskManager) => tm.schedule(taskInstance, options)),

View file

@ -6,6 +6,9 @@
import { schema, TypeOf } from '@kbn/config-schema';
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
/* The maximum number of times a task will be attempted before being abandoned as failed */
@ -15,7 +18,7 @@ export const configSchema = schema.object({
}),
/* How often, in milliseconds, the task manager will look for more work. */
poll_interval: schema.number({
defaultValue: 3000,
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
@ -35,7 +38,7 @@ export const configSchema = schema.object({
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.number({
defaultValue: 10,
defaultValue: DEFAULT_MAX_WORKERS,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),

View file

@ -9,9 +9,9 @@ import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
ExistsFilter,
TermFilter,
RangeFilter,
} from './query_clauses';
import {
@ -51,7 +51,7 @@ describe('mark_available_tasks_as_claimed', () => {
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsBoolClause | TermBoolClause | RangeBoolClause>(
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Object.entries(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)

View file

@ -3,24 +3,24 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { defaultsDeep } from 'lodash';
import {
BoolClause,
IDsClause,
SortClause,
ScriptClause,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
ExistsFilter,
TermFilter,
RangeFilter,
mustBeAllOf,
MustCondition,
MustNotCondition,
} from './query_clauses';
export const TaskWithSchedule: ExistsBoolClause = {
export const TaskWithSchedule: ExistsFilter = {
exists: { field: 'task.schedule' },
};
export function taskWithLessThanMaxAttempts(
type: string,
maxAttempts: number
): BoolClause<TermBoolClause | RangeBoolClause> {
): MustCondition<TermFilter | RangeFilter> {
return {
bool: {
must: [
@ -37,34 +37,37 @@ export function taskWithLessThanMaxAttempts(
};
}
export const IdleTaskWithExpiredRunAt: BoolClause<TermBoolClause | RangeBoolClause> = {
export function tasksClaimedByOwner(taskManagerId: string) {
return mustBeAllOf(
{
term: {
'task.ownerId': taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } }
);
}
export const IdleTaskWithExpiredRunAt: MustCondition<TermFilter | RangeFilter> = {
bool: {
must: [{ term: { 'task.status': 'idle' } }, { range: { 'task.runAt': { lte: 'now' } } }],
},
};
export const taskWithIDsAndRunnableStatus = (
claimTasksById: string[]
): BoolClause<TermBoolClause | IDsClause> => ({
export const InactiveTasks: MustNotCondition<TermFilter | RangeFilter> = {
bool: {
must: [
must_not: [
{
bool: {
should: [{ term: { 'task.status': 'idle' } }, { term: { 'task.status': 'failed' } }],
},
},
{
ids: {
values: claimTasksById,
should: [{ term: { 'task.status': 'running' } }, { term: { 'task.status': 'claiming' } }],
},
},
{ range: { 'task.retryAt': { gt: 'now' } } },
],
},
});
};
export const RunningOrClaimingTaskWithExpiredRetryAt: BoolClause<
TermBoolClause | RangeBoolClause
> = {
export const RunningOrClaimingTaskWithExpiredRetryAt: MustCondition<TermFilter | RangeFilter> = {
bool: {
must: [
{
@ -95,31 +98,6 @@ if (doc['task.runAt'].size()!=0) {
},
};
const SORT_VALUE_TO_BE_FIRST = 0;
export const sortByIdsThenByScheduling = (claimTasksById: string[]): SortClause => {
const {
_script: {
script: { source },
},
} = SortByRunAtAndRetryAt;
return defaultsDeep(
{
_script: {
script: {
source: `
if(params.ids.contains(doc['_id'].value)){
return ${SORT_VALUE_TO_BE_FIRST};
}
${source}
`,
params: { ids: claimTasksById },
},
},
},
SortByRunAtAndRetryAt
);
};
export const updateFields = (fieldUpdates: {
[field: string]: string | number | Date;
}): ScriptClause => ({

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import _ from 'lodash';
import {
MustCondition,
shouldBeOneOf,
mustBeAllOf,
ExistsFilter,
TermFilter,
RangeFilter,
matchesClauses,
} from './query_clauses';
describe('matchesClauses', () => {
test('merges multiple types of Bool Clauses into one', () => {
const TaskWithSchedule: ExistsFilter = {
exists: { field: 'task.schedule' },
};
const IdleTaskWithExpiredRunAt: MustCondition<TermFilter | RangeFilter> = {
bool: {
must: [{ term: { 'task.status': 'idle' } }, { range: { 'task.runAt': { lte: 'now' } } }],
},
};
const RunningTask: MustCondition<TermFilter> = {
bool: {
must: [{ term: { 'task.status': 'running' } }],
},
};
expect(
matchesClauses(
mustBeAllOf(TaskWithSchedule),
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
RunningTask,
IdleTaskWithExpiredRunAt
)
)
).toMatchObject({
bool: {
must: [TaskWithSchedule],
should: [RunningTask, IdleTaskWithExpiredRunAt],
},
});
});
});

View file

@ -4,30 +4,150 @@
* you may not use this file except in compliance with the Elastic License.
*/
export interface TermBoolClause {
/**
* Terminology
* ===========
* The terms for the differenct clauses in an Elasticsearch query can be confusing, here are some
* clarifications that might help you understand the Typescript types we use here.
*
* Given the following Query:
* {
* "query": { (1)
* "bool": { (2)
* "must":
* [
* (3) { "term" : { "tag" : "wow" } },
* { "term" : { "tag" : "elasticsearch" } },
* {
* "must" : { "term" : { "user" : "kimchy" } }
* }
* ]
* }
* }
* }
*
* These are referred to as:
* (1). BoolClause / BoolClauseWithAnyCondition
* (2). BoolCondition / AnyBoolCondition
* (3). BoolClauseFilter
*
*/
export interface TermFilter {
term: { [field: string]: string | string[] };
}
export interface RangeBoolClause {
range: { [field: string]: { lte: string | number } | { lt: string | number } };
export interface RangeFilter {
range: {
[field: string]: { lte: string | number } | { lt: string | number } | { gt: string | number };
};
}
export interface ExistsBoolClause {
export interface ExistsFilter {
exists: { field: string };
}
export interface IDsClause {
ids: {
values: string[];
};
type BoolClauseFilter = TermFilter | RangeFilter | ExistsFilter;
type BoolClauseFiltering<T extends BoolClauseFilter> =
| BoolClauseWithAnyCondition<T>
| PinnedQuery<T>
| T;
enum Conditions {
Should = 'should',
Must = 'must',
MustNot = 'must_not',
Filter = 'filter',
}
export interface ShouldClause<T> {
should: Array<BoolClause<T> | IDsClause | T>;
/**
* Describe a specific BoolClause Condition with a BoolClauseFilter on it, such as:
* ```
* {
* must : [
* T, ...
* ]
* }
* ```
*/
type BoolCondition<C extends Conditions, T extends BoolClauseFilter> = {
[c in C]: Array<BoolClauseFiltering<T>>;
};
/**
* Describe a Bool clause with a specific Condition, such as:
* ```
* {
* // described by BoolClause
* bool: {
* // described by BoolCondition
* must: [
* T, ...
* ]
* }
* }
* ```
*/
interface BoolClause<C extends Conditions, T extends BoolClauseFilter> {
bool: BoolCondition<C, T>;
}
export interface MustClause<T> {
must: Array<BoolClause<T> | IDsClause | T>;
}
export interface BoolClause<T> {
bool: MustClause<T> | ShouldClause<T>;
/**
* Describe a Bool clause with mixed Conditions, such as:
* ```
* {
* // described by BoolClause<...>
* bool: {
* // described by BoolCondition<Conditions.Must, ...>
* must : {
* ...
* },
* // described by BoolCondition<Conditions.Should, ...>
* should : {
* ...
* }
* }
* }
* ```
*/
type AnyBoolCondition<T extends BoolClauseFilter> = {
[Condition in Conditions]?: Array<BoolClauseFiltering<T>>;
};
/**
* Describe a Bool Condition with any Condition on it, so it can handle both:
* ```
* {
* bool: {
* must : {
* ...
* }
* }
* }
* ```
*
* and:
*
* ```
* {
* bool: {
* must_not : {
* ...
* }
* }
* }
* ```
*/
export interface BoolClauseWithAnyCondition<T extends BoolClauseFilter> {
bool: AnyBoolCondition<T>;
}
/**
* Describe the various Bool Clause Conditions we support, as specified in the Conditions enum
*/
export type ShouldCondition<T extends BoolClauseFilter> = BoolClause<Conditions.Should, T>;
export type MustCondition<T extends BoolClauseFilter> = BoolClause<Conditions.Must, T>;
export type MustNotCondition<T extends BoolClauseFilter> = BoolClause<Conditions.MustNot, T>;
export type FilterCondition<T extends BoolClauseFilter> = BoolClause<Conditions.Filter, T>;
export interface SortClause {
_script: {
type: string;
@ -39,6 +159,8 @@ export interface SortClause {
};
};
}
export type SortOptions = string | SortClause | Array<string | SortClause>;
export interface ScriptClause {
source: string;
lang: string;
@ -46,18 +168,34 @@ export interface ScriptClause {
[field: string]: string | number | Date;
};
}
export interface UpdateByQuery<T> {
query: BoolClause<T>;
sort: SortClause;
export interface UpdateByQuery<T extends BoolClauseFilter> {
query: PinnedQuery<T> | BoolClauseWithAnyCondition<T>;
sort: SortOptions;
seq_no_primary_term: true;
script: ScriptClause;
}
export function shouldBeOneOf<T>(
...should: Array<BoolClause<T> | IDsClause | T>
): {
bool: ShouldClause<T>;
} {
export interface PinnedQuery<T extends BoolClauseFilter> {
pinned: PinnedClause<T>;
}
export interface PinnedClause<T extends BoolClauseFilter> {
ids: string[];
organic: BoolClauseWithAnyCondition<T>;
}
export function matchesClauses<T extends BoolClauseFilter>(
...clauses: Array<BoolClauseWithAnyCondition<T>>
): BoolClauseWithAnyCondition<T> {
return {
bool: Object.assign({}, ...clauses.map(clause => clause.bool)),
};
}
export function shouldBeOneOf<T extends BoolClauseFilter>(
...should: Array<BoolClauseFiltering<T>>
): ShouldCondition<T> {
return {
bool: {
should,
@ -65,11 +203,9 @@ export function shouldBeOneOf<T>(
};
}
export function mustBeAllOf<T>(
...must: Array<BoolClause<T> | IDsClause | T>
): {
bool: MustClause<T>;
} {
export function mustBeAllOf<T extends BoolClauseFilter>(
...must: Array<BoolClauseFiltering<T>>
): MustCondition<T> {
return {
bool: {
must,
@ -77,14 +213,36 @@ export function mustBeAllOf<T>(
};
}
export function asUpdateByQuery<T>({
export function filterDownBy<T extends BoolClauseFilter>(
...filter: Array<BoolClauseFiltering<T>>
): FilterCondition<T> {
return {
bool: {
filter,
},
};
}
export function asPinnedQuery<T extends BoolClauseFilter>(
ids: PinnedClause<T>['ids'],
organic: PinnedClause<T>['organic']
): PinnedQuery<T> {
return {
pinned: {
ids,
organic,
},
};
}
export function asUpdateByQuery<T extends BoolClauseFilter>({
query,
update,
sort,
}: {
query: BoolClause<T>;
update: ScriptClause;
sort: SortClause;
query: UpdateByQuery<T>['query'];
update: UpdateByQuery<T>['script'];
sort: UpdateByQuery<T>['sort'];
}): UpdateByQuery<T> {
return {
query,

View file

@ -48,11 +48,11 @@ import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import {
FetchOpts,
FetchResult,
TaskStore,
OwnershipClaimingOpts,
ClaimOwnershipResult,
SearchOpts,
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
@ -323,12 +323,12 @@ export class TaskManager {
}
/**
* Fetches a paginatable list of scheduled tasks.
* Fetches a list of scheduled tasks.
*
* @param opts - The query options used to filter tasks
* @returns {Promise<FetchResult>}
*/
public async fetch(opts: FetchOpts): Promise<FetchResult> {
public async fetch(opts: SearchOpts): Promise<FetchResult> {
await this.waitUntilStarted();
return this.store.fetch(opts);
}

View file

@ -16,7 +16,7 @@ import {
TaskStatus,
TaskLifecycleResult,
} from './task';
import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_store';
import { StoreOpts, OwnershipClaimingOpts, TaskStore, SearchOpts } from './task_store';
import { savedObjectsRepositoryMock } from '../../../../src/core/server/mocks';
import {
SavedObjectsSerializer,
@ -175,7 +175,7 @@ describe('TaskStore', () => {
});
describe('fetch', () => {
async function testFetch(opts?: FetchOpts, hits: any[] = []) {
async function testFetch(opts?: SearchOpts, hits: any[] = []) {
const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits } }));
const store = new TaskStore({
index: 'tasky',
@ -203,7 +203,7 @@ describe('TaskStore', () => {
expect(args).toMatchObject({
index: 'tasky',
body: {
sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }],
sort: [{ 'task.runAt': 'asc' }],
query: { term: { type: 'task' } },
},
});
@ -226,122 +226,6 @@ describe('TaskStore', () => {
},
});
});
test('sorts by id if custom sort does not have an id sort in it', async () => {
const { args } = await testFetch({
sort: [{ 'task.taskType': 'desc' }],
});
expect(args).toMatchObject({
body: {
sort: [{ 'task.taskType': 'desc' }, { _id: 'desc' }],
},
});
});
test('allows custom sort by id', async () => {
const { args } = await testFetch({
sort: [{ _id: 'asc' }],
});
expect(args).toMatchObject({
body: {
sort: [{ _id: 'asc' }],
},
});
});
test('allows specifying pagination', async () => {
const now = new Date();
const searchAfter = [now, '143243sdafa32'];
const { args } = await testFetch({
searchAfter,
});
expect(args).toMatchObject({
body: {
search_after: searchAfter,
},
});
});
test('returns paginated tasks', async () => {
const runAt = new Date();
const { result } = await testFetch(undefined, [
{
_id: 'aaa',
_source: {
type: 'task',
task: {
runAt,
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'idle',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
},
},
sort: ['a', 1],
},
{
_id: 'bbb',
_source: {
type: 'task',
task: {
runAt,
taskType: 'bar',
schedule: { interval: '5m' },
attempts: 2,
status: 'running',
params: '{ "shazm": 1 }',
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
},
},
sort: ['b', 2],
},
]);
expect(result).toEqual({
docs: [
{
attempts: 0,
id: 'aaa',
schedule: undefined,
params: { hello: 'world' },
runAt,
scheduledAt: mockedDate,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'idle',
taskType: 'foo',
user: 'jimbo',
retryAt: undefined,
startedAt: undefined,
},
{
attempts: 2,
id: 'bbb',
schedule: { interval: '5m' },
params: { shazm: 1 },
runAt,
scheduledAt: mockedDate,
scope: ['reporting', 'ceo'],
state: { henry: 'The 8th' },
status: 'running',
taskType: 'bar',
user: 'dabo',
retryAt: undefined,
startedAt: undefined,
},
],
searchAfter: ['b', 2],
});
});
});
describe('claimAvailableTasks', () => {
@ -448,121 +332,6 @@ describe('TaskStore', () => {
{
bool: {
must: [
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
],
},
});
});
test('it supports claiming specific tasks by id', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const {
args: {
updateByQuery: {
body: { query, sort },
},
},
} = await testClaimAvailableTasks({
opts: {
maxAttempts,
definitions: {
foo: {
type: 'foo',
title: '',
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
},
claimingOpts: {
claimOwnershipUntil: new Date(),
size: 10,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
});
expect(query).toMatchObject({
bool: {
must: [
{ term: { type: 'task' } },
{
bool: {
should: [
{
bool: {
must: [
@ -633,25 +402,166 @@ describe('TaskStore', () => {
],
},
},
],
filter: [
{
bool: {
must: [
must_not: [
{
bool: {
should: [
{ term: { 'task.status': 'idle' } },
{ term: { 'task.status': 'failed' } },
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { gt: 'now' } } },
],
},
},
],
},
},
],
},
});
});
test('it supports claiming specific tasks by id', async () => {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const {
args: {
updateByQuery: {
body: { query, sort },
},
},
} = await testClaimAvailableTasks({
opts: {
maxAttempts,
definitions: {
foo: {
type: 'foo',
title: '',
createTaskRunner: jest.fn(),
},
bar: {
type: 'bar',
title: '',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
},
},
claimingOpts: {
claimOwnershipUntil: new Date(),
size: 10,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
},
});
expect(query).toMatchObject({
bool: {
must: [
{ term: { type: 'task' } },
{
bool: {
must: [
{
pinned: {
ids: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
organic: {
bool: {
must: [
{
bool: {
should: [
{
bool: {
must: [
{ term: { 'task.status': 'idle' } },
{ range: { 'task.runAt': { lte: 'now' } } },
],
},
},
{
bool: {
must: [
{
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { lte: 'now' } } },
],
},
},
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
},
},
],
filter: [
{
bool: {
must_not: [
{
ids: {
values: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
bool: {
should: [
{ term: { 'task.status': 'running' } },
{ term: { 'task.status': 'claiming' } },
],
},
},
{ range: { 'task.retryAt': { gt: 'now' } } },
],
},
},
@ -662,34 +572,26 @@ describe('TaskStore', () => {
},
});
expect(sort).toMatchObject({
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if(params.ids.contains(doc['_id'].value)){
return 0;
}
expect(sort).toMatchObject([
'_score',
{
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
params: {
ids: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
`,
},
},
},
});
]);
});
test('it claims tasks by setting their ownerId, status and retryAt', async () => {

View file

@ -36,22 +36,23 @@ import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
BoolClause,
IDsClause,
filterDownBy,
ExistsFilter,
TermFilter,
RangeFilter,
asPinnedQuery,
matchesClauses,
} from './queries/query_clauses';
import {
updateFields,
IdleTaskWithExpiredRunAt,
InactiveTasks,
RunningOrClaimingTaskWithExpiredRetryAt,
TaskWithSchedule,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
taskWithIDsAndRunnableStatus,
sortByIdsThenByScheduling,
tasksClaimedByOwner,
} from './queries/mark_available_tasks_as_claimed';
export interface StoreOpts {
@ -65,18 +66,13 @@ export interface StoreOpts {
}
export interface SearchOpts {
searchAfter?: any[];
sort?: object | object[];
sort?: string | object | object[];
query?: object;
size?: number;
seq_no_primary_term?: boolean;
search_after?: any[];
}
export interface FetchOpts extends SearchOpts {
sort?: object[];
}
export interface UpdateByQuerySearchOpts extends SearchOpts {
script?: object;
}
@ -92,7 +88,6 @@ export interface OwnershipClaimingOpts {
}
export interface FetchResult {
searchAfter: any[];
docs: ConcreteTaskInstance[];
}
@ -152,8 +147,8 @@ export class TaskStore {
return this.events$;
}
private emitEvent = (event: TaskClaim) => {
this.events$.next(event);
private emitEvents = (events: TaskClaim[]) => {
events.forEach(event => this.events$.next(event));
};
/**
@ -180,16 +175,16 @@ export class TaskStore {
}
/**
* Fetches a paginatable list of scheduled tasks.
* Fetches a list of scheduled tasks with default sorting.
*
* @param opts - The query options used to filter tasks
*/
public async fetch(opts: FetchOpts = {}): Promise<FetchResult> {
const sort = paginatableSort(opts.sort);
public async fetch({ sort = [{ 'task.runAt': 'asc' }], ...opts }: SearchOpts = {}): Promise<
FetchResult
> {
return this.search({
...opts,
sort,
search_after: opts.searchAfter,
query: opts.query,
});
}
@ -211,28 +206,30 @@ export class TaskStore {
this.serializer.generateRawId(undefined, 'task', id)
);
const claimedTasks = await this.markAvailableTasksAsClaimed(
const numberOfTasksClaimed = await this.markAvailableTasksAsClaimed(
claimOwnershipUntil,
claimTasksByIdWithRawIds,
size
);
const docs =
claimedTasks > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : [];
numberOfTasksClaimed > 0
? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size)
: [];
// emit success/fail events for claimed tasks by id
if (claimTasksById && claimTasksById.length) {
docs.map(doc => asTaskClaimEvent(doc.id, asOk(doc))).forEach(this.emitEvent);
this.emitEvents(docs.map(doc => asTaskClaimEvent(doc.id, asOk(doc))));
difference(
claimTasksById,
docs.map(doc => doc.id)
)
.map(id => asTaskClaimEvent(id, asErr(new Error(`failed to claim task '${id}'`))))
.forEach(this.emitEvent);
this.emitEvents(
difference(
claimTasksById,
docs.map(doc => doc.id)
).map(id => asTaskClaimEvent(id, asErr(new Error(`failed to claim task '${id}'`))))
);
}
return {
claimedTasks,
claimedTasks: numberOfTasksClaimed,
docs,
};
};
@ -247,7 +244,7 @@ export class TaskStore {
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has a schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsBoolClause | TermBoolClause | RangeBoolClause>(
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Object.entries(this.definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || this.maxAttempts)
@ -255,36 +252,33 @@ export class TaskStore {
)
);
const { query, sort } =
claimTasksById && claimTasksById.length
? {
query: shouldBeOneOf<
| ExistsBoolClause
| TermBoolClause
| RangeBoolClause
| BoolClause<TermBoolClause | IDsClause>
>(queryForScheduledTasks, taskWithIDsAndRunnableStatus(claimTasksById)),
sort: sortByIdsThenByScheduling(claimTasksById),
}
: {
query: queryForScheduledTasks,
sort: SortByRunAtAndRetryAt,
};
const { updated } = await this.updateByQuery(
asUpdateByQuery({
query,
query: matchesClauses(
mustBeAllOf(
claimTasksById && claimTasksById.length
? asPinnedQuery(claimTasksById, queryForScheduledTasks)
: queryForScheduledTasks
),
filterDownBy(InactiveTasks)
),
update: updateFields({
ownerId: this.taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort,
sort: [
// sort by score first, so the "pinned" Tasks are first
'_score',
// the nsort by other fields
SortByRunAtAndRetryAt,
],
}),
{
max_docs: size,
}
);
return updated;
}
@ -295,24 +289,14 @@ export class TaskStore {
claimTasksById: OwnershipClaimingOpts['claimTasksById'],
size: OwnershipClaimingOpts['size']
): Promise<ConcreteTaskInstance[]> {
const claimedTasksQuery = tasksClaimedByOwner(this.taskManagerId);
const { docs } = await this.search({
query: {
bool: {
must: [
{
term: {
'task.ownerId': this.taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } },
],
},
},
size,
sort:
query:
claimTasksById && claimTasksById.length
? sortByIdsThenByScheduling(claimTasksById)
: SortByRunAtAndRetryAt,
? asPinnedQuery(claimTasksById, claimedTasksQuery)
: claimedTasksQuery,
size,
sort: SortByRunAtAndRetryAt,
seq_no_primary_term: true,
});
@ -397,7 +381,6 @@ export class TaskStore {
.map(doc => this.serializer.rawToSavedObject(doc))
.map(doc => omit(doc, 'namespace') as SavedObject)
.map(savedObjectToConcreteTaskInstance),
searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [],
};
}
@ -427,20 +410,6 @@ export class TaskStore {
}
}
function paginatableSort(sort: any[] = []) {
const sortById = { _id: 'desc' };
if (!sort.length) {
return [{ 'task.runAt': 'asc' }, sortById];
}
if (sort.find(({ _id }) => !!_id)) {
return sort;
}
return [...sort, sortById];
}
function taskInstanceToAttributes(doc: TaskInstance): SavedObjectAttributes {
return {
...omit(doc, 'id', 'version'),

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
const { DEFAULT_MAX_WORKERS } = require('../../../../plugins/task_manager/server/config.ts');
const { EventEmitter } = require('events');
import { initRoutes } from './init_routes';
@ -16,6 +17,7 @@ const once = function(emitter, event) {
export default function TaskTestingAPI(kibana) {
const taskTestingEvents = new EventEmitter();
taskTestingEvents.setMaxListeners(DEFAULT_MAX_WORKERS * 2);
return new kibana.Plugin({
name: 'sampleTask',

View file

@ -193,15 +193,44 @@ export function initRoutes(server, taskManager, legacyTaskManager, taskTestingEv
},
});
server.route({
path: '/api/sample_tasks/task/{taskId}',
method: 'GET',
async handler(request) {
try {
return taskManager.fetch({
query: {
bool: {
must: [
{
ids: {
values: [`task:${request.params.taskId}`],
},
},
],
},
},
});
} catch (err) {
return err;
}
},
});
server.route({
path: '/api/sample_tasks',
method: 'DELETE',
async handler() {
try {
const { docs: tasks } = await taskManager.fetch({
query: taskManagerQuery,
});
return Promise.all(tasks.map(task => taskManager.remove(task.id)));
let tasksFound = 0;
do {
const { docs: tasks } = await taskManager.fetch({
query: taskManagerQuery,
});
tasksFound = tasks.length;
await Promise.all(tasks.map(task => taskManager.remove(task.id)));
} while (tasksFound > 0);
return 'OK';
} catch (err) {
return err;
}

View file

@ -13,6 +13,13 @@ const {
task: { properties: taskManagerIndexMapping },
} = require('../../../../legacy/plugins/task_manager/server/mappings.json');
const {
DEFAULT_MAX_WORKERS,
DEFAULT_POLL_INTERVAL,
} = require('../../../../plugins/task_manager/server/config.ts');
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
export default function({ getService }) {
const es = getService('legacyEs');
const log = getService('log');
@ -22,11 +29,12 @@ export default function({ getService }) {
const supertest = supertestAsPromised(url.format(config.get('servers.kibana')));
describe('scheduling and running tasks', () => {
beforeEach(() =>
supertest
.delete('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.expect(200)
beforeEach(
async () =>
await supertest
.delete('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.expect(200)
);
beforeEach(async () => {
@ -56,11 +64,19 @@ export default function({ getService }) {
.then(response => response.body);
}
function historyDocs() {
function currentTask(task) {
return supertest
.get(`/api/sample_tasks/task/${task}`)
.send({ task })
.expect(200)
.then(response => response.body.docs[0]);
}
function historyDocs(taskId) {
return es
.search({
index: testHistoryIndex,
q: 'type:task',
q: taskId ? `taskId:${taskId}` : 'type:task',
})
.then(result => result.hits.hits);
}
@ -223,7 +239,7 @@ export default function({ getService }) {
});
await retry.try(async () => {
expect((await historyDocs()).length).to.eql(1);
expect((await historyDocs(originalTask.id)).length).to.eql(1);
const [task] = (await currentTasks()).docs;
expect(task.attempts).to.eql(0);
@ -318,6 +334,81 @@ export default function({ getService }) {
});
});
it('should prioritize tasks which are called using runNow', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
schedule: { interval: `30m` },
params: {},
});
await retry.try(async () => {
const docs = await historyDocs(originalTask.id);
expect(docs.length).to.eql(1);
const task = await currentTask(originalTask.id);
expect(task.state.count).to.eql(1);
// ensure this task shouldnt run for another half hour
expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000);
});
const taskToBeReleased = await scheduleTask({
taskType: 'sampleTask',
params: { waitForEvent: 'releaseSingleTask' },
});
await retry.try(async () => {
// wait for taskToBeReleased to stall
expect((await historyDocs(taskToBeReleased.id)).length).to.eql(1);
});
// schedule multiple tasks that should force
// Task Manager to use up its worker capacity
// causing tasks to pile up
await Promise.all(
_.times(DEFAULT_MAX_WORKERS + _.random(1, DEFAULT_MAX_WORKERS), () =>
scheduleTask({
taskType: 'sampleTask',
params: {
waitForEvent: 'releaseTheOthers',
},
})
)
);
// we need to ensure that TM has a chance to fill its queue with the stalling tasks
await delay(DEFAULT_POLL_INTERVAL);
// call runNow for our task
const runNowResult = runTaskNow({
id: originalTask.id,
});
// we need to ensure that TM has a chance to push the runNow task into the queue
// before we release the stalled task, so lets give it a chance
await delay(DEFAULT_POLL_INTERVAL);
// and release only one slot in our worker queue
await releaseTasksWaitingForEventToComplete('releaseSingleTask');
expect(await runNowResult).to.eql({ id: originalTask.id });
await retry.try(async () => {
const task = await currentTask(originalTask.id);
expect(task.state.count).to.eql(2);
});
// drain tasks, othrwise they'll keep Task Manager stalled
await retry.try(async () => {
await releaseTasksWaitingForEventToComplete('releaseTheOthers');
const tasks = (await currentTasks()).docs.filter(
task => task.params.originalParams.waitForEvent === 'releaseTheOthers'
);
expect(tasks.length).to.eql(0);
});
});
it('should return a task run error result when running a task now fails', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
@ -329,10 +420,7 @@ export default function({ getService }) {
const docs = await historyDocs();
expect(docs.filter(taskDoc => taskDoc._source.taskId === originalTask.id).length).to.eql(1);
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
const task = await currentTask(originalTask.id);
expect(task.state.count).to.eql(1);
// ensure this task shouldnt run for another half hour
@ -364,9 +452,7 @@ export default function({ getService }) {
(await historyDocs()).filter(taskDoc => taskDoc._source.taskId === originalTask.id).length
).to.eql(2);
const [task] = (await currentTasks()).docs.filter(
taskDoc => taskDoc.id === originalTask.id
);
const task = await currentTask(originalTask.id);
expect(task.attempts).to.eql(1);
});
});