[Task Manager] adds additional polling stats to Task Manager monitoring (#87766)

Adds additional polling stats to Task Manager monitoring:

- **duration**: Running average of polling duration measuring the time from the scheduled polling cycle start until all claimed tasks are marked as running
- **claim_conflicts**: Running average of number of version clashes caused by the markAvailableTasksAsClaimed stage of the polling cycle
- **claim_mismatches**: Running average of mismatch between the number of tasks updated by the markAvailableTasksAsClaimed stage of the polling cycle and the number of docs found by the sweepForClaimedTasks stage
- **load** - Running average of the percentage of workers in use at the end of each polling cycle.
This commit is contained in:
Gidi Meir Morris 2021-01-11 18:32:24 +00:00 committed by GitHub
parent d4b3ea9c3d
commit f384c484b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 384 additions and 132 deletions

View file

@ -78,9 +78,10 @@ These are "Cold" stat which are updated at a regular cadence, configured by the
#### The Runtime Section
The `runtime` tracks Task Manager's performance as it runs, making note of task execution time, _drift_ etc.
These include:
- The time it takes a task to run (mean and median, using a configurable running average window, `50` by default)
- The average _drift_ that tasks experience (mean and median, using the same configurable running average window as above). Drift tells us how long after a task's scheduled a task typically executes.
- The polling rate (the timestamp of the last time a polling cycle completed) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The time it takes a task to run (p50, p90, p95 & p99, using a configurable running average window, `50` by default)
- The average _drift_ that tasks experience (p50, p90, p95 & p99, using the same configurable running average window as above). Drift tells us how long after a task's scheduled a task typically executes.
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of workers is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The `Success | Retry | Failure ratio` by task type. This is different than the workload stats which tell you what's in the queue, but ca't keep track of retries and of non recurring tasks as they're wiped off the index when completed.
These are "Hot" stats which are updated reactively as Tasks are executed and interacted with.
@ -174,10 +175,34 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
"status": "OK",
"value": {
"polling": {
/* When was the last polling cycle? */
/* When was the last polling cycle? */
"last_successful_poll": "2020-10-05T17:57:55.411Z",
/* What is the frequency of polling cycle result?
Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */
/* Running average of polling duration measuring the time from the scheduled polling cycle
start until all claimed tasks are marked as running */
"duration": {
"p50": 4,
"p90": 12,
"p95": 12,
"p99": 12
},
/* Running average of number of version clashes caused by the markAvailableTasksAsClaimed stage
of the polling cycle */
"claim_conflicts": {
"p50": 0,
"p90": 0,
"p95": 0,
"p99": 0
},
/* Running average of mismatch between the number of tasks updated by the markAvailableTasksAsClaimed stage
of the polling cycle and the number of docs found by the sweepForClaimedTasks stage */
"claim_mismatches": {
"p50": 0,
"p90": 0,
"p95": 0,
"p99": 0
},
/* What is the frequency of polling cycle result?
Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */
"result_frequency_percent_as_number": {
/* This tells us that the polling cycle didnt claim any new tasks */
"NoTasksClaimed": 94,
@ -196,14 +221,25 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
"Failed": 0
}
},
/* on average, the tasks in this deployment run 1.7s after their scheduled time */
/* on average, 50% of the tasks in this deployment run at most 1.7s after their scheduled time */
"drift": {
"mean": 1720,
"median": 2276
"p50": 1720,
"p90": 2274,
"p95": 2574,
"p99": 3221
},
/* on average, 50% of the tasks polling cycles in this deployment result at most in 25% of workers being in use.
We track this in percentages rather than absolute count as max_workers can change over time in response
to changing circumstance. */
"load": {
"p50": 25,
"p90": 80,
"p95": 100,
"p99": 100
},
"execution": {
"duration": {
/* on average, the `endpoint:user-artifact-packager` tasks take 15ms to run */
/* on average, the `endpoint:user-artifact-packager` tasks take 15ms to run */
"endpoint:user-artifact-packager": {
"mean": 15,
"median": 14.5
@ -230,7 +266,8 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
}
},
"result_frequency_percent_as_number": {
/* and 100% of `endpoint:user-artifact-packager` have completed in success (within the running average window, so the past 50 runs (by default, configrable by `monitored_stats_running_average_window`) */
/* and 100% of `endpoint:user-artifact-packager` have completed in success (within the running average window,
so the past 50 runs (by default, configrable by `monitored_stats_running_average_window`) */
"endpoint:user-artifact-packager": {
"status": "OK",
"Success": 100,

View file

@ -6,24 +6,62 @@
import _ from 'lodash';
import sinon from 'sinon';
import { fillPool } from './fill_pool';
import { fillPool, FillPoolResult } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';
import { asOk } from './result_type';
import { asOk, Result } from './result_type';
import { ClaimOwnershipResult } from '../task_store';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { TaskManagerRunner } from '../task_running/task_runner';
jest.mock('../task_running/task_runner');
describe('fillPool', () => {
function mockFetchAvailableTasks(
tasksToMock: number[][]
): () => Promise<Result<ClaimOwnershipResult, FillPoolResult>> {
const tasks: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids));
let index = 0;
return async () =>
asOk({
stats: {
tasksUpdated: tasks[index + 1]?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
docs: tasks[index++] || [],
});
}
const mockTaskInstances = (ids: number[]): ConcreteTaskInstance[] =>
ids.map((id) => ({
id: `${id}`,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(0),
scheduledAt: new Date(0),
startedAt: new Date(0),
retryAt: new Date(0),
state: {
startedAt: new Date(0),
},
taskType: '',
params: {},
ownerId: null,
}));
test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;
await fillPool(fetchAvailableTasks, converter, run);
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3]));
});
test('stops filling when the pool has no more capacity', async () => {
@ -31,14 +69,13 @@ describe('fillPool', () => {
[1, 2, 3],
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;
await fillPool(fetchAvailableTasks, converter, run);
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3]));
});
test('calls the converter on the records prior to running', async () => {
@ -46,10 +83,10 @@ describe('fillPool', () => {
[1, 2, 3],
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();
const converter = (instance: ConcreteTaskInstance) =>
(instance.id as unknown) as TaskManagerRunner;
await fillPool(fetchAvailableTasks, converter, run);
@ -59,7 +96,8 @@ describe('fillPool', () => {
describe('error handling', () => {
test('throws exception from fetchAvailableTasks', async () => {
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();
const converter = (instance: ConcreteTaskInstance) =>
(instance.id as unknown) as TaskManagerRunner;
try {
const fetchAvailableTasks = async () => Promise.reject('fetch is not working');
@ -73,15 +111,15 @@ describe('fillPool', () => {
test('throws exception from run', async () => {
const run = sinon.spy(() => Promise.reject('run is not working'));
const converter = (x: number) => x.toString();
const converter = (instance: ConcreteTaskInstance) =>
(instance.id as unknown) as TaskManagerRunner;
try {
const tasks = [
[1, 2, 3],
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
@ -95,11 +133,10 @@ describe('fillPool', () => {
[1, 2, 3],
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => {
throw new Error(`can not convert ${x}`);
const converter = (instance: ConcreteTaskInstance) => {
throw new Error(`can not convert ${instance.id}`);
};
await fillPool(fetchAvailableTasks, converter, run);

View file

@ -5,7 +5,11 @@
*/
import { performance } from 'perf_hooks';
import { ConcreteTaskInstance } from '../task';
import { WithTaskTiming, startTaskTimer } from '../task_events';
import { TaskPoolRunResult } from '../task_pool';
import { TaskManagerRunner } from '../task_running';
import { ClaimOwnershipResult } from '../task_store';
import { Result, map } from './result_type';
export enum FillPoolResult {
@ -17,9 +21,10 @@ export enum FillPoolResult {
PoolFilled = 'PoolFilled',
}
type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
type Fetcher<T, E> = () => Promise<Result<T[], E>>;
type Converter<T1, T2> = (t: T1) => T2;
export type ClaimAndFillPoolResult = Partial<Pick<ClaimOwnershipResult, 'stats'>> & {
result: FillPoolResult;
};
export type TimedFillPoolResult = WithTaskTiming<ClaimAndFillPoolResult>;
/**
* Given a function that runs a batch of tasks (e.g. taskPool.run), a function
@ -33,26 +38,35 @@ type Converter<T1, T2> = (t: T1) => T2;
* @param fetchAvailableTasks - a function that fetches task records (e.g. store.fetchAvailableTasks)
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool<TRecord, TRunner>(
fetchAvailableTasks: Fetcher<TRecord, FillPoolResult>,
converter: Converter<TRecord, TRunner>,
run: BatchRun<TRunner>
): Promise<FillPoolResult> {
export async function fillPool(
fetchAvailableTasks: () => Promise<Result<ClaimOwnershipResult, FillPoolResult>>,
converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner,
run: (tasks: TaskManagerRunner[]) => Promise<TaskPoolRunResult>
): Promise<TimedFillPoolResult> {
performance.mark('fillPool.start');
return map<TRecord[], FillPoolResult, Promise<FillPoolResult>>(
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});
return map<ClaimOwnershipResult, FillPoolResult, Promise<TimedFillPoolResult>>(
await fetchAvailableTasks(),
async (instances) => {
if (!instances.length) {
async ({ docs, stats }) => {
if (!docs.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
}
const tasks = instances.map(converter);
const tasks = docs.map(converter);
switch (await run(tasks)) {
case TaskPoolRunResult.RanOutOfCapacity:
@ -62,15 +76,15 @@ export async function fillPool<TRecord, TRunner>(
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return FillPoolResult.RunningAtCapacity;
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
default:
performance.mark('fillPool.cycle');
return FillPoolResult.PoolFilled;
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
async (result) => result
async (result) => augmentTimingTo(result)
);
}

View file

@ -523,16 +523,34 @@ describe('Task Run Statistics', () => {
}
});
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.RanOutOfCapacity)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.RanOutOfCapacity)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)));
events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed)));
const timing = {
start: 0,
stop: 0,
};
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
events$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.PoolFilled, timing })));
events$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.PoolFilled, timing })));
events$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.PoolFilled, timing })));
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.RanOutOfCapacity, timing }))
);
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.RanOutOfCapacity, timing }))
);
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
});
});
});

View file

@ -7,7 +7,7 @@
import { combineLatest, Observable } from 'rxjs';
import { filter, startWith, map } from 'rxjs/operators';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { mapValues } from 'lodash';
import { isNumber, mapValues } from 'lodash';
import { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import {
@ -17,11 +17,12 @@ import {
ErroredTask,
RanTask,
TaskTiming,
isTaskManagerStatEvent,
} from '../task_events';
import { isOk, Ok, unwrap } from '../lib/result_type';
import { ConcreteTaskInstance } from '../task';
import { TaskRunResult } from '../task_running';
import { FillPoolResult } from '../lib/fill_pool';
import { FillPoolResult, ClaimAndFillPoolResult } from '../lib/fill_pool';
import {
AveragedStat,
calculateRunningAverage,
@ -35,6 +36,9 @@ import { TaskExecutionFailureThreshold, TaskManagerConfig } from '../config';
interface FillPoolStat extends JsonObject {
last_successful_poll: string;
duration: number[];
claim_conflicts: number[];
claim_mismatches: number[];
result_frequency_percent_as_number: FillPoolResult[];
}
@ -45,6 +49,7 @@ interface ExecutionStat extends JsonObject {
export interface TaskRunStat extends JsonObject {
drift: number[];
load: number[];
execution: ExecutionStat;
polling: FillPoolStat | Omit<FillPoolStat, 'last_successful_poll'>;
}
@ -74,6 +79,7 @@ type ResultFrequencySummary = ResultFrequency & {
export interface SummarizedTaskRunStat extends JsonObject {
drift: AveragedStat;
load: AveragedStat;
execution: {
duration: Record<string, AveragedStat>;
result_frequency_percent_as_number: Record<string, ResultFrequencySummary>;
@ -86,7 +92,9 @@ export function createTaskRunAggregator(
runningAverageWindowSize: number
): AggregatedStatProvider<TaskRunStat> {
const taskRunEventToStat = createTaskRunEventToStat(runningAverageWindowSize);
const taskRunEvents$: Observable<Omit<TaskRunStat, 'polling'>> = taskPollingLifecycle.events.pipe(
const taskRunEvents$: Observable<
Pick<TaskRunStat, 'drift' | 'execution'>
> = taskPollingLifecycle.events.pipe(
filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent) && hasTiming(taskEvent)),
map((taskEvent: TaskLifecycleEvent) => {
const { task, result }: RanTask | ErroredTask = unwrap((taskEvent as TaskRun).event);
@ -94,21 +102,55 @@ export function createTaskRunAggregator(
})
);
const loadQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const taskManagerLoadStatEvents$: Observable<
Pick<TaskRunStat, 'load'>
> = taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) &&
taskEvent.id === 'load' &&
isOk<number, never>(taskEvent.event)
),
map((taskEvent: TaskLifecycleEvent) => {
return {
load: loadQueue(((taskEvent.event as unknown) as Ok<number>).value),
};
})
);
const resultFrequencyQueue = createRunningAveragedStat<FillPoolResult>(runningAverageWindowSize);
const pollingDurationQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimConflictsQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimMismatchesQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const taskPollingEvents$: Observable<
Pick<TaskRunStat, 'polling'>
> = taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskPollingCycleEvent(taskEvent) && isOk<FillPoolResult, unknown>(taskEvent.event)
isTaskPollingCycleEvent(taskEvent) && isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
),
map((taskEvent: TaskLifecycleEvent) => {
const {
result,
stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {},
} = ((taskEvent.event as unknown) as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
last_successful_poll: new Date().toISOString(),
result_frequency_percent_as_number: resultFrequencyQueue(
((taskEvent.event as unknown) as Ok<FillPoolResult>).value
),
// Track how long the polling cycle took from begining until all claimed tasks were marked as running
duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(),
// Track how many version conflicts occured during polling
claim_conflicts: isNumber(tasksConflicted)
? claimConflictsQueue(tasksConflicted)
: claimConflictsQueue(),
// Track how much of a mismatch there is between claimed and updated
claim_mismatches:
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
})
@ -118,21 +160,34 @@ export function createTaskRunAggregator(
taskRunEvents$.pipe(
startWith({ drift: [], execution: { duration: {}, result_frequency_percent_as_number: {} } })
),
taskManagerLoadStatEvents$.pipe(startWith({ load: [] })),
taskPollingEvents$.pipe(
startWith({
polling: { result_frequency_percent_as_number: [] },
polling: {
duration: [],
claim_conflicts: [],
claim_mismatches: [],
result_frequency_percent_as_number: [],
},
})
),
]).pipe(
map(([taskRun, polling]: [Omit<TaskRunStat, 'polling'>, Pick<TaskRunStat, 'polling'>]) => {
return {
key: 'runtime',
value: {
...taskRun,
...polling,
},
} as AggregatedStat<TaskRunStat>;
})
map(
([taskRun, load, polling]: [
Pick<TaskRunStat, 'drift' | 'execution'>,
Pick<TaskRunStat, 'load'>,
Pick<TaskRunStat, 'polling'>
]) => {
return {
key: 'runtime',
value: {
...taskRun,
...load,
...polling,
},
} as AggregatedStat<TaskRunStat>;
}
)
);
}
@ -176,9 +231,16 @@ const DEFAULT_POLLING_FREQUENCIES = {
export function summarizeTaskRunStat(
{
// eslint-disable-next-line @typescript-eslint/naming-convention
polling: { last_successful_poll, result_frequency_percent_as_number: pollingResultFrequency },
polling: {
// eslint-disable-next-line @typescript-eslint/naming-convention
last_successful_poll,
duration: pollingDuration,
result_frequency_percent_as_number: pollingResultFrequency,
claim_conflicts: claimConflicts,
claim_mismatches: claimMismatches,
},
drift,
load,
execution: { duration, result_frequency_percent_as_number: executionResultFrequency },
}: TaskRunStat,
config: TaskManagerConfig
@ -187,12 +249,16 @@ export function summarizeTaskRunStat(
value: {
polling: {
...(last_successful_poll ? { last_successful_poll } : {}),
duration: calculateRunningAverage(pollingDuration as number[]),
claim_conflicts: calculateRunningAverage(claimConflicts as number[]),
claim_mismatches: calculateRunningAverage(claimMismatches as number[]),
result_frequency_percent_as_number: {
...DEFAULT_POLLING_FREQUENCIES,
...calculateFrequency<FillPoolResult>(pollingResultFrequency as FillPoolResult[]),
},
},
drift: calculateRunningAverage(drift),
load: calculateRunningAverage(load),
execution: {
duration: mapValues(duration, (typedDurations) => calculateRunningAverage(typedDurations)),
result_frequency_percent_as_number: mapValues(

View file

@ -120,7 +120,12 @@ describe('TaskPollingLifecycle', () => {
describe('claimAvailableTasks', () => {
test('should claim Available Tasks when there are available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const claim = jest.fn(() =>
Promise.resolve({
docs: [],
stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 },
})
);
const availableWorkers = 1;
@ -131,7 +136,12 @@ describe('TaskPollingLifecycle', () => {
test('should not claim Available Tasks when there are no available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));
const claim = jest.fn(() =>
Promise.resolve({
docs: [],
stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 },
})
);
const availableWorkers = 0;

View file

@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { tap } from 'rxjs/operators';
import { Logger } from '../../../../src/core/server';
import { Result, asErr, mapErr, asOk } from './lib/result_type';
import { Result, asErr, mapErr, asOk, map } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';
@ -24,8 +24,9 @@ import {
asTaskRunRequestEvent,
TaskPollingCycle,
asTaskPollingCycleEvent,
TaskManagerStat,
} from './task_events';
import { fillPool, FillPoolResult } from './lib/fill_pool';
import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool';
import { Middleware } from './lib/middleware';
import { intervalFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
@ -56,7 +57,8 @@ export type TaskLifecycleEvent =
| TaskRun
| TaskClaim
| TaskRunRequest
| TaskPollingCycle;
| TaskPollingCycle
| TaskManagerStat;
/**
* The public interface into the task manager system.
@ -99,8 +101,9 @@ export class TaskPollingLifecycle {
this.definitions = definitions;
this.store = taskStore;
const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);
// pipe store events into the lifecycle event stream
this.store.events.subscribe((event) => this.events$.next(event));
this.store.events.subscribe(emitEvent);
this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: config.max_workers,
@ -111,6 +114,7 @@ export class TaskPollingLifecycle {
logger,
maxWorkers$: maxWorkersConfiguration$,
});
this.pool.load.subscribe(emitEvent);
const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
@ -119,10 +123,10 @@ export class TaskPollingLifecycle {
// the task poller that polls for work on fixed intervals and on demand
const poller$: Observable<
Result<FillPoolResult, PollingError<string>>
> = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
Result<TimedFillPoolResult, PollingError<string>>
> = createObservableMonitor<Result<TimedFillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
createTaskPoller<string, TimedFillPoolResult>({
logger,
pollInterval$: pollIntervalConfiguration$,
bufferCapacity: config.request_capacity,
@ -189,7 +193,7 @@ export class TaskPollingLifecycle {
return !this.pollingSubscription.closed;
}
private pollForWork = async (...tasksToClaim: string[]): Promise<FillPoolResult> => {
private pollForWork = async (...tasksToClaim: string[]): Promise<TimedFillPoolResult> => {
return fillPool(
// claim available tasks
() =>
@ -206,7 +210,9 @@ export class TaskPollingLifecycle {
);
};
private subscribeToPoller(poller$: Observable<Result<FillPoolResult, PollingError<string>>>) {
private subscribeToPoller(
poller$: Observable<Result<TimedFillPoolResult, PollingError<string>>>
) {
return poller$
.pipe(
tap(
@ -221,8 +227,14 @@ export class TaskPollingLifecycle {
})
)
)
.subscribe((event: Result<FillPoolResult, PollingError<string>>) => {
this.emitEvent(asTaskPollingCycleEvent<string>(event));
.subscribe((result: Result<TimedFillPoolResult, PollingError<string>>) => {
this.emitEvent(
map(
result,
({ timing, ...event }) => asTaskPollingCycleEvent<string>(asOk(event), timing),
(event) => asTaskPollingCycleEvent<string>(asErr(event))
)
);
});
}
}
@ -232,18 +244,22 @@ export async function claimAvailableTasks(
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
): Promise<Result<ClaimOwnershipResult['docs'], FillPoolResult>> {
): Promise<Result<ClaimOwnershipResult, FillPoolResult>> {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');
try {
const { docs, claimedTasks } = await claim({
const claimResult = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
});
const {
docs,
stats: { tasksClaimed },
} = claimResult;
if (claimedTasks === 0) {
if (tasksClaimed === 0) {
performance.mark('claimAvailableTasks.noTasks');
}
performance.mark('claimAvailableTasks_stop');
@ -253,14 +269,14 @@ export async function claimAvailableTasks(
'claimAvailableTasks_stop'
);
if (docs.length !== claimedTasks) {
if (docs.length !== tasksClaimed) {
logger.warn(
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
`[Task Ownership error]: ${tasksClaimed} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return asOk(docs);
return asOk(claimResult);
} catch (ex) {
if (identifyEsError(ex).includes('cannot execute [inline] scripts')) {
logger.warn(

View file

@ -356,12 +356,16 @@ function mockHealthStats(overrides = {}) {
timestamp: new Date().toISOString(),
value: {
drift: [1000, 60000],
load: [0, 100, 75],
execution: {
duration: [],
result_frequency_percent_as_number: [],
},
polling: {
last_successful_poll: new Date().toISOString(),
duration: [500, 400, 3000],
claim_conflicts: [0, 100, 75],
claim_mismatches: [0, 100, 75],
result_frequency_percent_as_number: [
'NoTasksClaimed',
'NoTasksClaimed',

View file

@ -9,7 +9,7 @@ import { Option } from 'fp-ts/lib/Option';
import { ConcreteTaskInstance } from './task';
import { Result, Err } from './lib/result_type';
import { FillPoolResult } from './lib/fill_pool';
import { ClaimAndFillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { TaskRunResult } from './task_running';
@ -19,23 +19,25 @@ export enum TaskEventType {
TASK_RUN = 'TASK_RUN',
TASK_RUN_REQUEST = 'TASK_RUN_REQUEST',
TASK_POLLING_CYCLE = 'TASK_POLLING_CYCLE',
TASK_MANAGER_STAT = 'TASK_MANAGER_STAT',
}
export interface TaskTiming {
start: number;
stop: number;
}
export type WithTaskTiming<T> = T & { timing: TaskTiming };
export function startTaskTimer(): () => TaskTiming {
const start = Date.now();
return () => ({ start, stop: Date.now() });
}
export interface TaskEvent<T, E> {
id?: string;
export interface TaskEvent<OkResult, ErrorResult, ID = string> {
id?: ID;
timing?: TaskTiming;
type: TaskEventType;
event: Result<T, E>;
event: Result<OkResult, ErrorResult>;
}
export interface RanTask {
task: ConcreteTaskInstance;
@ -49,7 +51,17 @@ export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<RanTask, ErroredTask>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Option<ConcreteTaskInstance>>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskPollingCycle<T = string> = TaskEvent<FillPoolResult, PollingError<T>>;
export type TaskPollingCycle<T = string> = TaskEvent<ClaimAndFillPoolResult, PollingError<T>>;
export type TaskManagerStats = 'load';
export type TaskManagerStat = TaskEvent<number, never, TaskManagerStats>;
export type OkResultOf<EventType> = EventType extends TaskEvent<infer OkResult, infer ErrorResult>
? OkResult
: never;
export type ErrResultOf<EventType> = EventType extends TaskEvent<infer OkResult, infer ErrorResult>
? ErrorResult
: never;
export function asTaskMarkRunningEvent(
id: string,
@ -105,7 +117,7 @@ export function asTaskRunRequestEvent(
}
export function asTaskPollingCycleEvent<T = string>(
event: Result<FillPoolResult, PollingError<T>>,
event: Result<ClaimAndFillPoolResult, PollingError<T>>,
timing?: TaskTiming
): TaskPollingCycle<T> {
return {
@ -115,6 +127,17 @@ export function asTaskPollingCycleEvent<T = string>(
};
}
export function asTaskManagerStatEvent(
id: TaskManagerStats,
event: Result<number, never>
): TaskManagerStat {
return {
id,
type: TaskEventType.TASK_MANAGER_STAT,
event,
};
}
export function isTaskMarkRunningEvent(
taskEvent: TaskEvent<unknown, unknown>
): taskEvent is TaskMarkRunning {
@ -136,3 +159,8 @@ export function isTaskPollingCycleEvent<T = string>(
): taskEvent is TaskPollingCycle<T> {
return taskEvent.type === TaskEventType.TASK_POLLING_CYCLE;
}
export function isTaskManagerStatEvent(
taskEvent: TaskEvent<unknown, unknown>
): taskEvent is TaskManagerStat {
return taskEvent.type === TaskEventType.TASK_MANAGER_STAT;
}

View file

@ -8,13 +8,15 @@
* This module contains the logic that ensures we don't run too many
* tasks at once in a given Kibana instance.
*/
import { Observable } from 'rxjs';
import { Observable, Subject } from 'rxjs';
import moment, { Duration } from 'moment';
import { performance } from 'perf_hooks';
import { padStart } from 'lodash';
import { Logger } from '../../../../src/core/server';
import { TaskRunner } from './task_running';
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';
import { TaskManagerStat, asTaskManagerStatEvent } from './task_events';
import { asOk } from './lib/result_type';
interface Opts {
maxWorkers$: Observable<number>;
@ -39,6 +41,7 @@ export class TaskPool {
private maxWorkers: number = 0;
private running = new Set<TaskRunner>();
private logger: Logger;
private load$ = new Subject<TaskManagerStat>();
/**
* Creates an instance of TaskPool.
@ -56,6 +59,10 @@ export class TaskPool {
});
}
public get load(): Observable<TaskManagerStat> {
return this.load$;
}
/**
* Gets how many workers are currently in use.
*/
@ -64,17 +71,21 @@ export class TaskPool {
}
/**
* Gets how many workers are currently available.
* Gets % of workers in use
*/
public get availableWorkers() {
return this.maxWorkers - this.occupiedWorkers;
public get workerLoad() {
return this.maxWorkers ? Math.round((this.occupiedWorkers * 100) / this.maxWorkers) : 100;
}
/**
* Gets how many workers are currently available.
*/
public get hasAvailableWorkers() {
return this.availableWorkers > 0;
public get availableWorkers() {
// emit load whenever we check how many available workers there are
// this should happen less often than the actual changes to the worker queue
// so is lighter than emitting the load every time we add/remove a task from the queue
this.load$.next(asTaskManagerStatEvent('load', asOk(this.workerLoad)));
return this.maxWorkers - this.occupiedWorkers;
}
/**

View file

@ -76,10 +76,6 @@ export enum TaskRunResult {
Success = 'Success',
// Recurring Task completed successfully
SuccessRescheduled = 'Success',
// // Task completed successfully after a retry
// SuccessfulRetry = 'SuccessfulRetry',
// // Recurring Task completed successfully after a retry
// SuccessfulRetryRescheduled = 'SuccessfulRetry',
// Task has failed and a retry has been scheduled
RetryScheduled = 'RetryScheduled',
// Task has failed

View file

@ -16,6 +16,8 @@ import {
isTaskRunRequestEvent,
RanTask,
ErroredTask,
OkResultOf,
ErrResultOf,
} from './task_events';
import { Middleware } from './lib/middleware';
import {
@ -29,7 +31,6 @@ import {
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle';
import { FillPoolResult } from './lib/fill_pool';
const VERSION_CONFLICT_STATUS = 409;
@ -125,19 +126,16 @@ export class TaskScheduling {
return reject(await this.identifyTaskFailureReason(taskId, error));
}, taskEvent.event);
} else {
either<
RanTask | ConcreteTaskInstance | FillPoolResult,
Error | ErroredTask | Option<ConcreteTaskInstance>
>(
either<OkResultOf<TaskLifecycleEvent>, ErrResultOf<TaskLifecycleEvent>>(
taskEvent.event,
(taskInstance: RanTask | ConcreteTaskInstance | FillPoolResult) => {
(taskInstance: OkResultOf<TaskLifecycleEvent>) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: (taskInstance as RanTask).task.id });
}
},
async (errorResult: Error | ErroredTask | Option<ConcreteTaskInstance>) => {
async (errorResult: ErrResultOf<TaskLifecycleEvent>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
return reject(

View file

@ -98,7 +98,11 @@ export interface FetchResult {
}
export interface ClaimOwnershipResult {
claimedTasks: number;
stats: {
tasksUpdated: number;
tasksConflicted: number;
tasksClaimed: number;
};
docs: ConcreteTaskInstance[];
}
@ -214,16 +218,13 @@ export class TaskStore {
this.serializer.generateRawId(undefined, 'task', id)
);
const numberOfTasksClaimed = await this.markAvailableTasksAsClaimed(
claimOwnershipUntil,
claimTasksByIdWithRawIds,
size
);
const {
updated: tasksUpdated,
version_conflicts: tasksConflicted,
} = await this.markAvailableTasksAsClaimed(claimOwnershipUntil, claimTasksByIdWithRawIds, size);
const docs =
numberOfTasksClaimed > 0
? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size)
: [];
tasksUpdated > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : [];
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
claimTasksById.includes(doc.id)
@ -250,7 +251,11 @@ export class TaskStore {
]);
return {
claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length,
stats: {
tasksUpdated,
tasksConflicted,
tasksClaimed: documentsClaimedById.length + documentsClaimedBySchedule.length,
},
docs: docs.filter((doc) => doc.status === TaskStatus.Claiming),
};
};
@ -259,7 +264,7 @@ export class TaskStore {
claimOwnershipUntil: OwnershipClaimingOpts['claimOwnershipUntil'],
claimTasksById: OwnershipClaimingOpts['claimTasksById'],
size: OwnershipClaimingOpts['size']
): Promise<number> {
): Promise<UpdateByQueryResult> {
const registeredTaskTypes = this.definitions.getAllTypes();
const taskMaxAttempts = [...this.definitions].reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || this.maxAttempts };
@ -282,7 +287,7 @@ export class TaskStore {
}
const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager');
const { updated } = await this.updateByQuery(
const result = await this.updateByQuery(
asUpdateByQuery({
query: matchesClauses(
mustBeAllOf(
@ -309,7 +314,7 @@ export class TaskStore {
);
if (apmTrans) apmTrans.end();
return updated;
return result;
}
/**

View file

@ -33,12 +33,14 @@ interface MonitoringStats {
timestamp: string;
value: {
drift: Record<string, object>;
load: Record<string, object>;
execution: {
duration: Record<string, Record<string, object>>;
result_frequency_percent_as_number: Record<string, Record<string, object>>;
};
polling: {
last_successful_poll: string;
duration: Record<string, object>;
result_frequency_percent_as_number: Record<string, number>;
};
};
@ -170,7 +172,7 @@ export default function ({ getService }: FtrProviderContext) {
const {
runtime: {
value: { drift, polling, execution },
value: { drift, load, polling, execution },
},
} = (await getHealth()).stats;
@ -182,11 +184,21 @@ export default function ({ getService }: FtrProviderContext) {
expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number');
expect(typeof polling.duration.p50).to.eql('number');
expect(typeof polling.duration.p90).to.eql('number');
expect(typeof polling.duration.p95).to.eql('number');
expect(typeof polling.duration.p99).to.eql('number');
expect(typeof drift.p50).to.eql('number');
expect(typeof drift.p90).to.eql('number');
expect(typeof drift.p95).to.eql('number');
expect(typeof drift.p99).to.eql('number');
expect(typeof load.p50).to.eql('number');
expect(typeof load.p90).to.eql('number');
expect(typeof load.p95).to.eql('number');
expect(typeof load.p99).to.eql('number');
expect(typeof execution.duration.sampleTask.p50).to.eql('number');
expect(typeof execution.duration.sampleTask.p90).to.eql('number');
expect(typeof execution.duration.sampleTask.p95).to.eql('number');