From f384c484b7039025299df0dfe0ce8a32361e51c6 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Mon, 11 Jan 2021 18:32:24 +0000 Subject: [PATCH] [Task Manager] adds additional polling stats to Task Manager monitoring (#87766) Adds additional polling stats to Task Manager monitoring: - **duration**: Running average of polling duration measuring the time from the scheduled polling cycle start until all claimed tasks are marked as running - **claim_conflicts**: Running average of number of version clashes caused by the markAvailableTasksAsClaimed stage of the polling cycle - **claim_mismatches**: Running average of mismatch between the number of tasks updated by the markAvailableTasksAsClaimed stage of the polling cycle and the number of docs found by the sweepForClaimedTasks stage - **load** - Running average of the percentage of workers in use at the end of each polling cycle. --- .../plugins/task_manager/server/MONITORING.md | 59 ++++++++-- .../task_manager/server/lib/fill_pool.test.ts | 75 +++++++++---- .../task_manager/server/lib/fill_pool.ts | 48 +++++--- .../monitoring/task_run_statistics.test.ts | 38 +++++-- .../server/monitoring/task_run_statistics.ts | 104 ++++++++++++++---- .../server/polling_lifecycle.test.ts | 14 ++- .../task_manager/server/polling_lifecycle.ts | 50 ++++++--- .../task_manager/server/routes/health.test.ts | 4 + .../task_manager/server/task_events.ts | 40 ++++++- .../plugins/task_manager/server/task_pool.ts | 23 +++- .../server/task_running/task_runner.ts | 4 - .../task_manager/server/task_scheduling.ts | 12 +- .../plugins/task_manager/server/task_store.ts | 31 +++--- .../test_suites/task_manager/health_route.ts | 14 ++- 14 files changed, 384 insertions(+), 132 deletions(-) diff --git a/x-pack/plugins/task_manager/server/MONITORING.md b/x-pack/plugins/task_manager/server/MONITORING.md index 3595b8631748..64481e81c60a 100644 --- a/x-pack/plugins/task_manager/server/MONITORING.md +++ b/x-pack/plugins/task_manager/server/MONITORING.md @@ -78,9 +78,10 @@ These are "Cold" stat which are updated at a regular cadence, configured by the #### The Runtime Section The `runtime` tracks Task Manager's performance as it runs, making note of task execution time, _drift_ etc. These include: - - The time it takes a task to run (mean and median, using a configurable running average window, `50` by default) - - The average _drift_ that tasks experience (mean and median, using the same configurable running average window as above). Drift tells us how long after a task's scheduled a task typically executes. - - The polling rate (the timestamp of the last time a polling cycle completed) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages) + - The time it takes a task to run (p50, p90, p95 & p99, using a configurable running average window, `50` by default) + - The average _drift_ that tasks experience (p50, p90, p95 & p99, using the same configurable running average window as above). Drift tells us how long after a task's scheduled a task typically executes. + - The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of workers is in use at the end of each polling cycle. + - The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages) - The `Success | Retry | Failure ratio` by task type. This is different than the workload stats which tell you what's in the queue, but ca't keep track of retries and of non recurring tasks as they're wiped off the index when completed. These are "Hot" stats which are updated reactively as Tasks are executed and interacted with. @@ -174,10 +175,34 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g "status": "OK", "value": { "polling": { - /* When was the last polling cycle? */ + /* When was the last polling cycle? */ "last_successful_poll": "2020-10-05T17:57:55.411Z", - /* What is the frequency of polling cycle result? - Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */ + /* Running average of polling duration measuring the time from the scheduled polling cycle + start until all claimed tasks are marked as running */ + "duration": { + "p50": 4, + "p90": 12, + "p95": 12, + "p99": 12 + }, + /* Running average of number of version clashes caused by the markAvailableTasksAsClaimed stage + of the polling cycle */ + "claim_conflicts": { + "p50": 0, + "p90": 0, + "p95": 0, + "p99": 0 + }, + /* Running average of mismatch between the number of tasks updated by the markAvailableTasksAsClaimed stage + of the polling cycle and the number of docs found by the sweepForClaimedTasks stage */ + "claim_mismatches": { + "p50": 0, + "p90": 0, + "p95": 0, + "p99": 0 + }, + /* What is the frequency of polling cycle result? + Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */ "result_frequency_percent_as_number": { /* This tells us that the polling cycle didnt claim any new tasks */ "NoTasksClaimed": 94, @@ -196,14 +221,25 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g "Failed": 0 } }, - /* on average, the tasks in this deployment run 1.7s after their scheduled time */ + /* on average, 50% of the tasks in this deployment run at most 1.7s after their scheduled time */ "drift": { - "mean": 1720, - "median": 2276 + "p50": 1720, + "p90": 2274, + "p95": 2574, + "p99": 3221 + }, + /* on average, 50% of the tasks polling cycles in this deployment result at most in 25% of workers being in use. + We track this in percentages rather than absolute count as max_workers can change over time in response + to changing circumstance. */ + "load": { + "p50": 25, + "p90": 80, + "p95": 100, + "p99": 100 }, "execution": { "duration": { - /* on average, the `endpoint:user-artifact-packager` tasks take 15ms to run */ + /* on average, the `endpoint:user-artifact-packager` tasks take 15ms to run */ "endpoint:user-artifact-packager": { "mean": 15, "median": 14.5 @@ -230,7 +266,8 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g } }, "result_frequency_percent_as_number": { - /* and 100% of `endpoint:user-artifact-packager` have completed in success (within the running average window, so the past 50 runs (by default, configrable by `monitored_stats_running_average_window`) */ + /* and 100% of `endpoint:user-artifact-packager` have completed in success (within the running average window, + so the past 50 runs (by default, configrable by `monitored_stats_running_average_window`) */ "endpoint:user-artifact-packager": { "status": "OK", "Success": 100, diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts index a2c1eb514aeb..effdee78b650 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts @@ -6,24 +6,62 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { fillPool } from './fill_pool'; +import { fillPool, FillPoolResult } from './fill_pool'; import { TaskPoolRunResult } from '../task_pool'; -import { asOk } from './result_type'; +import { asOk, Result } from './result_type'; +import { ClaimOwnershipResult } from '../task_store'; +import { ConcreteTaskInstance, TaskStatus } from '../task'; +import { TaskManagerRunner } from '../task_running/task_runner'; + +jest.mock('../task_running/task_runner'); describe('fillPool', () => { + function mockFetchAvailableTasks( + tasksToMock: number[][] + ): () => Promise> { + const tasks: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids)); + let index = 0; + return async () => + asOk({ + stats: { + tasksUpdated: tasks[index + 1]?.length ?? 0, + tasksConflicted: 0, + tasksClaimed: 0, + }, + docs: tasks[index++] || [], + }); + } + + const mockTaskInstances = (ids: number[]): ConcreteTaskInstance[] => + ids.map((id) => ({ + id: `${id}`, + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(0), + scheduledAt: new Date(0), + startedAt: new Date(0), + retryAt: new Date(0), + state: { + startedAt: new Date(0), + }, + taskType: '', + params: {}, + ownerId: null, + })); + test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => { const tasks = [ [1, 2, 3], [4, 5], ]; - let index = 0; - const fetchAvailableTasks = async () => asOk(tasks[index++] || []); + const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks); const converter = _.identity; await fillPool(fetchAvailableTasks, converter, run); - expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]); + expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3])); }); test('stops filling when the pool has no more capacity', async () => { @@ -31,14 +69,13 @@ describe('fillPool', () => { [1, 2, 3], [4, 5], ]; - let index = 0; - const fetchAvailableTasks = async () => asOk(tasks[index++] || []); + const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = _.identity; await fillPool(fetchAvailableTasks, converter, run); - expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]); + expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3])); }); test('calls the converter on the records prior to running', async () => { @@ -46,10 +83,10 @@ describe('fillPool', () => { [1, 2, 3], [4, 5], ]; - let index = 0; - const fetchAvailableTasks = async () => asOk(tasks[index++] || []); + const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); - const converter = (x: number) => x.toString(); + const converter = (instance: ConcreteTaskInstance) => + (instance.id as unknown) as TaskManagerRunner; await fillPool(fetchAvailableTasks, converter, run); @@ -59,7 +96,8 @@ describe('fillPool', () => { describe('error handling', () => { test('throws exception from fetchAvailableTasks', async () => { const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); - const converter = (x: number) => x.toString(); + const converter = (instance: ConcreteTaskInstance) => + (instance.id as unknown) as TaskManagerRunner; try { const fetchAvailableTasks = async () => Promise.reject('fetch is not working'); @@ -73,15 +111,15 @@ describe('fillPool', () => { test('throws exception from run', async () => { const run = sinon.spy(() => Promise.reject('run is not working')); - const converter = (x: number) => x.toString(); + const converter = (instance: ConcreteTaskInstance) => + (instance.id as unknown) as TaskManagerRunner; try { const tasks = [ [1, 2, 3], [4, 5], ]; - let index = 0; - const fetchAvailableTasks = async () => asOk(tasks[index++] || []); + const fetchAvailableTasks = mockFetchAvailableTasks(tasks); await fillPool(fetchAvailableTasks, converter, run); } catch (err) { @@ -95,11 +133,10 @@ describe('fillPool', () => { [1, 2, 3], [4, 5], ]; - let index = 0; - const fetchAvailableTasks = async () => asOk(tasks[index++] || []); + const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); - const converter = (x: number) => { - throw new Error(`can not convert ${x}`); + const converter = (instance: ConcreteTaskInstance) => { + throw new Error(`can not convert ${instance.id}`); }; await fillPool(fetchAvailableTasks, converter, run); diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.ts index 5ab173755662..c58c074e2255 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.ts @@ -5,7 +5,11 @@ */ import { performance } from 'perf_hooks'; +import { ConcreteTaskInstance } from '../task'; +import { WithTaskTiming, startTaskTimer } from '../task_events'; import { TaskPoolRunResult } from '../task_pool'; +import { TaskManagerRunner } from '../task_running'; +import { ClaimOwnershipResult } from '../task_store'; import { Result, map } from './result_type'; export enum FillPoolResult { @@ -17,9 +21,10 @@ export enum FillPoolResult { PoolFilled = 'PoolFilled', } -type BatchRun = (tasks: T[]) => Promise; -type Fetcher = () => Promise>; -type Converter = (t: T1) => T2; +export type ClaimAndFillPoolResult = Partial> & { + result: FillPoolResult; +}; +export type TimedFillPoolResult = WithTaskTiming; /** * Given a function that runs a batch of tasks (e.g. taskPool.run), a function @@ -33,26 +38,35 @@ type Converter = (t: T1) => T2; * @param fetchAvailableTasks - a function that fetches task records (e.g. store.fetchAvailableTasks) * @param converter - a function that converts task records to the appropriate task runner */ -export async function fillPool( - fetchAvailableTasks: Fetcher, - converter: Converter, - run: BatchRun -): Promise { +export async function fillPool( + fetchAvailableTasks: () => Promise>, + converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner, + run: (tasks: TaskManagerRunner[]) => Promise +): Promise { performance.mark('fillPool.start'); - return map>( + const stopTaskTimer = startTaskTimer(); + const augmentTimingTo = ( + result: FillPoolResult, + stats?: ClaimOwnershipResult['stats'] + ): TimedFillPoolResult => ({ + result, + stats, + timing: stopTaskTimer(), + }); + return map>( await fetchAvailableTasks(), - async (instances) => { - if (!instances.length) { + async ({ docs, stats }) => { + if (!docs.length) { performance.mark('fillPool.bailNoTasks'); performance.measure( 'fillPool.activityDurationUntilNoTasks', 'fillPool.start', 'fillPool.bailNoTasks' ); - return FillPoolResult.NoTasksClaimed; + return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats); } - const tasks = instances.map(converter); + const tasks = docs.map(converter); switch (await run(tasks)) { case TaskPoolRunResult.RanOutOfCapacity: @@ -62,15 +76,15 @@ export async function fillPool( 'fillPool.start', 'fillPool.bailExhaustedCapacity' ); - return FillPoolResult.RanOutOfCapacity; + return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats); case TaskPoolRunResult.RunningAtCapacity: performance.mark('fillPool.cycle'); - return FillPoolResult.RunningAtCapacity; + return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats); default: performance.mark('fillPool.cycle'); - return FillPoolResult.PoolFilled; + return augmentTimingTo(FillPoolResult.PoolFilled, stats); } }, - async (result) => result + async (result) => augmentTimingTo(result) ); } diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index 7d5a8811dbe2..21ea72cbbb00 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -523,16 +523,34 @@ describe('Task Run Statistics', () => { } }); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.PoolFilled))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.RanOutOfCapacity))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.RanOutOfCapacity))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed))); - events$.next(asTaskPollingCycleEvent(asOk(FillPoolResult.NoTasksClaimed))); + const timing = { + start: 0, + stop: 0, + }; + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) + ); + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) + ); + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) + ); + events$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.PoolFilled, timing }))); + events$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.PoolFilled, timing }))); + events$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.PoolFilled, timing }))); + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.RanOutOfCapacity, timing })) + ); + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.RanOutOfCapacity, timing })) + ); + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) + ); + events$.next( + asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) + ); }); }); }); diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index c1851789a769..3933443296c4 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -7,7 +7,7 @@ import { combineLatest, Observable } from 'rxjs'; import { filter, startWith, map } from 'rxjs/operators'; import { JsonObject } from 'src/plugins/kibana_utils/common'; -import { mapValues } from 'lodash'; +import { isNumber, mapValues } from 'lodash'; import { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { @@ -17,11 +17,12 @@ import { ErroredTask, RanTask, TaskTiming, + isTaskManagerStatEvent, } from '../task_events'; import { isOk, Ok, unwrap } from '../lib/result_type'; import { ConcreteTaskInstance } from '../task'; import { TaskRunResult } from '../task_running'; -import { FillPoolResult } from '../lib/fill_pool'; +import { FillPoolResult, ClaimAndFillPoolResult } from '../lib/fill_pool'; import { AveragedStat, calculateRunningAverage, @@ -35,6 +36,9 @@ import { TaskExecutionFailureThreshold, TaskManagerConfig } from '../config'; interface FillPoolStat extends JsonObject { last_successful_poll: string; + duration: number[]; + claim_conflicts: number[]; + claim_mismatches: number[]; result_frequency_percent_as_number: FillPoolResult[]; } @@ -45,6 +49,7 @@ interface ExecutionStat extends JsonObject { export interface TaskRunStat extends JsonObject { drift: number[]; + load: number[]; execution: ExecutionStat; polling: FillPoolStat | Omit; } @@ -74,6 +79,7 @@ type ResultFrequencySummary = ResultFrequency & { export interface SummarizedTaskRunStat extends JsonObject { drift: AveragedStat; + load: AveragedStat; execution: { duration: Record; result_frequency_percent_as_number: Record; @@ -86,7 +92,9 @@ export function createTaskRunAggregator( runningAverageWindowSize: number ): AggregatedStatProvider { const taskRunEventToStat = createTaskRunEventToStat(runningAverageWindowSize); - const taskRunEvents$: Observable> = taskPollingLifecycle.events.pipe( + const taskRunEvents$: Observable< + Pick + > = taskPollingLifecycle.events.pipe( filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent) && hasTiming(taskEvent)), map((taskEvent: TaskLifecycleEvent) => { const { task, result }: RanTask | ErroredTask = unwrap((taskEvent as TaskRun).event); @@ -94,21 +102,55 @@ export function createTaskRunAggregator( }) ); + const loadQueue = createRunningAveragedStat(runningAverageWindowSize); + const taskManagerLoadStatEvents$: Observable< + Pick + > = taskPollingLifecycle.events.pipe( + filter( + (taskEvent: TaskLifecycleEvent) => + isTaskManagerStatEvent(taskEvent) && + taskEvent.id === 'load' && + isOk(taskEvent.event) + ), + map((taskEvent: TaskLifecycleEvent) => { + return { + load: loadQueue(((taskEvent.event as unknown) as Ok).value), + }; + }) + ); + const resultFrequencyQueue = createRunningAveragedStat(runningAverageWindowSize); + const pollingDurationQueue = createRunningAveragedStat(runningAverageWindowSize); + const claimConflictsQueue = createRunningAveragedStat(runningAverageWindowSize); + const claimMismatchesQueue = createRunningAveragedStat(runningAverageWindowSize); const taskPollingEvents$: Observable< Pick > = taskPollingLifecycle.events.pipe( filter( (taskEvent: TaskLifecycleEvent) => - isTaskPollingCycleEvent(taskEvent) && isOk(taskEvent.event) + isTaskPollingCycleEvent(taskEvent) && isOk(taskEvent.event) ), map((taskEvent: TaskLifecycleEvent) => { + const { + result, + stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {}, + } = ((taskEvent.event as unknown) as Ok).value; + const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0); return { polling: { last_successful_poll: new Date().toISOString(), - result_frequency_percent_as_number: resultFrequencyQueue( - ((taskEvent.event as unknown) as Ok).value - ), + // Track how long the polling cycle took from begining until all claimed tasks were marked as running + duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(), + // Track how many version conflicts occured during polling + claim_conflicts: isNumber(tasksConflicted) + ? claimConflictsQueue(tasksConflicted) + : claimConflictsQueue(), + // Track how much of a mismatch there is between claimed and updated + claim_mismatches: + isNumber(tasksClaimed) && isNumber(tasksUpdated) + ? claimMismatchesQueue(tasksUpdated - tasksClaimed) + : claimMismatchesQueue(), + result_frequency_percent_as_number: resultFrequencyQueue(result), }, }; }) @@ -118,21 +160,34 @@ export function createTaskRunAggregator( taskRunEvents$.pipe( startWith({ drift: [], execution: { duration: {}, result_frequency_percent_as_number: {} } }) ), + taskManagerLoadStatEvents$.pipe(startWith({ load: [] })), taskPollingEvents$.pipe( startWith({ - polling: { result_frequency_percent_as_number: [] }, + polling: { + duration: [], + claim_conflicts: [], + claim_mismatches: [], + result_frequency_percent_as_number: [], + }, }) ), ]).pipe( - map(([taskRun, polling]: [Omit, Pick]) => { - return { - key: 'runtime', - value: { - ...taskRun, - ...polling, - }, - } as AggregatedStat; - }) + map( + ([taskRun, load, polling]: [ + Pick, + Pick, + Pick + ]) => { + return { + key: 'runtime', + value: { + ...taskRun, + ...load, + ...polling, + }, + } as AggregatedStat; + } + ) ); } @@ -176,9 +231,16 @@ const DEFAULT_POLLING_FREQUENCIES = { export function summarizeTaskRunStat( { - // eslint-disable-next-line @typescript-eslint/naming-convention - polling: { last_successful_poll, result_frequency_percent_as_number: pollingResultFrequency }, + polling: { + // eslint-disable-next-line @typescript-eslint/naming-convention + last_successful_poll, + duration: pollingDuration, + result_frequency_percent_as_number: pollingResultFrequency, + claim_conflicts: claimConflicts, + claim_mismatches: claimMismatches, + }, drift, + load, execution: { duration, result_frequency_percent_as_number: executionResultFrequency }, }: TaskRunStat, config: TaskManagerConfig @@ -187,12 +249,16 @@ export function summarizeTaskRunStat( value: { polling: { ...(last_successful_poll ? { last_successful_poll } : {}), + duration: calculateRunningAverage(pollingDuration as number[]), + claim_conflicts: calculateRunningAverage(claimConflicts as number[]), + claim_mismatches: calculateRunningAverage(claimMismatches as number[]), result_frequency_percent_as_number: { ...DEFAULT_POLLING_FREQUENCIES, ...calculateFrequency(pollingResultFrequency as FillPoolResult[]), }, }, drift: calculateRunningAverage(drift), + load: calculateRunningAverage(load), execution: { duration: mapValues(duration, (typedDurations) => calculateRunningAverage(typedDurations)), result_frequency_percent_as_number: mapValues( diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 0f807976970c..bf3ff6da9fbd 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -120,7 +120,12 @@ describe('TaskPollingLifecycle', () => { describe('claimAvailableTasks', () => { test('should claim Available Tasks when there are available workers', () => { const logger = mockLogger(); - const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 })); + const claim = jest.fn(() => + Promise.resolve({ + docs: [], + stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 }, + }) + ); const availableWorkers = 1; @@ -131,7 +136,12 @@ describe('TaskPollingLifecycle', () => { test('should not claim Available Tasks when there are no available workers', () => { const logger = mockLogger(); - const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 })); + const claim = jest.fn(() => + Promise.resolve({ + docs: [], + stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 }, + }) + ); const availableWorkers = 0; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 1876c52b0029..a4522f350f74 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option'; import { tap } from 'rxjs/operators'; import { Logger } from '../../../../src/core/server'; -import { Result, asErr, mapErr, asOk } from './lib/result_type'; +import { Result, asErr, mapErr, asOk, map } from './lib/result_type'; import { ManagedConfiguration } from './lib/create_managed_configuration'; import { TaskManagerConfig } from './config'; @@ -24,8 +24,9 @@ import { asTaskRunRequestEvent, TaskPollingCycle, asTaskPollingCycleEvent, + TaskManagerStat, } from './task_events'; -import { fillPool, FillPoolResult } from './lib/fill_pool'; +import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool'; import { Middleware } from './lib/middleware'; import { intervalFromNow } from './lib/intervals'; import { ConcreteTaskInstance } from './task'; @@ -56,7 +57,8 @@ export type TaskLifecycleEvent = | TaskRun | TaskClaim | TaskRunRequest - | TaskPollingCycle; + | TaskPollingCycle + | TaskManagerStat; /** * The public interface into the task manager system. @@ -99,8 +101,9 @@ export class TaskPollingLifecycle { this.definitions = definitions; this.store = taskStore; + const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event); // pipe store events into the lifecycle event stream - this.store.events.subscribe((event) => this.events$.next(event)); + this.store.events.subscribe(emitEvent); this.bufferedStore = new BufferedTaskStore(this.store, { bufferMaxOperations: config.max_workers, @@ -111,6 +114,7 @@ export class TaskPollingLifecycle { logger, maxWorkers$: maxWorkersConfiguration$, }); + this.pool.load.subscribe(emitEvent); const { max_poll_inactivity_cycles: maxPollInactivityCycles, @@ -119,10 +123,10 @@ export class TaskPollingLifecycle { // the task poller that polls for work on fixed intervals and on demand const poller$: Observable< - Result> - > = createObservableMonitor>, Error>( + Result> + > = createObservableMonitor>, Error>( () => - createTaskPoller({ + createTaskPoller({ logger, pollInterval$: pollIntervalConfiguration$, bufferCapacity: config.request_capacity, @@ -189,7 +193,7 @@ export class TaskPollingLifecycle { return !this.pollingSubscription.closed; } - private pollForWork = async (...tasksToClaim: string[]): Promise => { + private pollForWork = async (...tasksToClaim: string[]): Promise => { return fillPool( // claim available tasks () => @@ -206,7 +210,9 @@ export class TaskPollingLifecycle { ); }; - private subscribeToPoller(poller$: Observable>>) { + private subscribeToPoller( + poller$: Observable>> + ) { return poller$ .pipe( tap( @@ -221,8 +227,14 @@ export class TaskPollingLifecycle { }) ) ) - .subscribe((event: Result>) => { - this.emitEvent(asTaskPollingCycleEvent(event)); + .subscribe((result: Result>) => { + this.emitEvent( + map( + result, + ({ timing, ...event }) => asTaskPollingCycleEvent(asOk(event), timing), + (event) => asTaskPollingCycleEvent(asErr(event)) + ) + ); }); } } @@ -232,18 +244,22 @@ export async function claimAvailableTasks( claim: (opts: OwnershipClaimingOpts) => Promise, availableWorkers: number, logger: Logger -): Promise> { +): Promise> { if (availableWorkers > 0) { performance.mark('claimAvailableTasks_start'); try { - const { docs, claimedTasks } = await claim({ + const claimResult = await claim({ size: availableWorkers, claimOwnershipUntil: intervalFromNow('30s')!, claimTasksById, }); + const { + docs, + stats: { tasksClaimed }, + } = claimResult; - if (claimedTasks === 0) { + if (tasksClaimed === 0) { performance.mark('claimAvailableTasks.noTasks'); } performance.mark('claimAvailableTasks_stop'); @@ -253,14 +269,14 @@ export async function claimAvailableTasks( 'claimAvailableTasks_stop' ); - if (docs.length !== claimedTasks) { + if (docs.length !== tasksClaimed) { logger.warn( - `[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${ + `[Task Ownership error]: ${tasksClaimed} tasks were claimed by Kibana, but ${ docs.length } task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})` ); } - return asOk(docs); + return asOk(claimResult); } catch (ex) { if (identifyEsError(ex).includes('cannot execute [inline] scripts')) { logger.warn( diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts index 5a0cef8eda94..e0b34a3d1df1 100644 --- a/x-pack/plugins/task_manager/server/routes/health.test.ts +++ b/x-pack/plugins/task_manager/server/routes/health.test.ts @@ -356,12 +356,16 @@ function mockHealthStats(overrides = {}) { timestamp: new Date().toISOString(), value: { drift: [1000, 60000], + load: [0, 100, 75], execution: { duration: [], result_frequency_percent_as_number: [], }, polling: { last_successful_poll: new Date().toISOString(), + duration: [500, 400, 3000], + claim_conflicts: [0, 100, 75], + claim_mismatches: [0, 100, 75], result_frequency_percent_as_number: [ 'NoTasksClaimed', 'NoTasksClaimed', diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index 0b2ae3023deb..fc09738a149a 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -9,7 +9,7 @@ import { Option } from 'fp-ts/lib/Option'; import { ConcreteTaskInstance } from './task'; import { Result, Err } from './lib/result_type'; -import { FillPoolResult } from './lib/fill_pool'; +import { ClaimAndFillPoolResult } from './lib/fill_pool'; import { PollingError } from './polling'; import { TaskRunResult } from './task_running'; @@ -19,23 +19,25 @@ export enum TaskEventType { TASK_RUN = 'TASK_RUN', TASK_RUN_REQUEST = 'TASK_RUN_REQUEST', TASK_POLLING_CYCLE = 'TASK_POLLING_CYCLE', + TASK_MANAGER_STAT = 'TASK_MANAGER_STAT', } export interface TaskTiming { start: number; stop: number; } +export type WithTaskTiming = T & { timing: TaskTiming }; export function startTaskTimer(): () => TaskTiming { const start = Date.now(); return () => ({ start, stop: Date.now() }); } -export interface TaskEvent { - id?: string; +export interface TaskEvent { + id?: ID; timing?: TaskTiming; type: TaskEventType; - event: Result; + event: Result; } export interface RanTask { task: ConcreteTaskInstance; @@ -49,7 +51,17 @@ export type TaskMarkRunning = TaskEvent; export type TaskRun = TaskEvent; export type TaskClaim = TaskEvent>; export type TaskRunRequest = TaskEvent; -export type TaskPollingCycle = TaskEvent>; +export type TaskPollingCycle = TaskEvent>; + +export type TaskManagerStats = 'load'; +export type TaskManagerStat = TaskEvent; + +export type OkResultOf = EventType extends TaskEvent + ? OkResult + : never; +export type ErrResultOf = EventType extends TaskEvent + ? ErrorResult + : never; export function asTaskMarkRunningEvent( id: string, @@ -105,7 +117,7 @@ export function asTaskRunRequestEvent( } export function asTaskPollingCycleEvent( - event: Result>, + event: Result>, timing?: TaskTiming ): TaskPollingCycle { return { @@ -115,6 +127,17 @@ export function asTaskPollingCycleEvent( }; } +export function asTaskManagerStatEvent( + id: TaskManagerStats, + event: Result +): TaskManagerStat { + return { + id, + type: TaskEventType.TASK_MANAGER_STAT, + event, + }; +} + export function isTaskMarkRunningEvent( taskEvent: TaskEvent ): taskEvent is TaskMarkRunning { @@ -136,3 +159,8 @@ export function isTaskPollingCycleEvent( ): taskEvent is TaskPollingCycle { return taskEvent.type === TaskEventType.TASK_POLLING_CYCLE; } +export function isTaskManagerStatEvent( + taskEvent: TaskEvent +): taskEvent is TaskManagerStat { + return taskEvent.type === TaskEventType.TASK_MANAGER_STAT; +} diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index 6946cd613e0a..561a222310f3 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -8,13 +8,15 @@ * This module contains the logic that ensures we don't run too many * tasks at once in a given Kibana instance. */ -import { Observable } from 'rxjs'; +import { Observable, Subject } from 'rxjs'; import moment, { Duration } from 'moment'; import { performance } from 'perf_hooks'; import { padStart } from 'lodash'; import { Logger } from '../../../../src/core/server'; import { TaskRunner } from './task_running'; import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error'; +import { TaskManagerStat, asTaskManagerStatEvent } from './task_events'; +import { asOk } from './lib/result_type'; interface Opts { maxWorkers$: Observable; @@ -39,6 +41,7 @@ export class TaskPool { private maxWorkers: number = 0; private running = new Set(); private logger: Logger; + private load$ = new Subject(); /** * Creates an instance of TaskPool. @@ -56,6 +59,10 @@ export class TaskPool { }); } + public get load(): Observable { + return this.load$; + } + /** * Gets how many workers are currently in use. */ @@ -64,17 +71,21 @@ export class TaskPool { } /** - * Gets how many workers are currently available. + * Gets % of workers in use */ - public get availableWorkers() { - return this.maxWorkers - this.occupiedWorkers; + public get workerLoad() { + return this.maxWorkers ? Math.round((this.occupiedWorkers * 100) / this.maxWorkers) : 100; } /** * Gets how many workers are currently available. */ - public get hasAvailableWorkers() { - return this.availableWorkers > 0; + public get availableWorkers() { + // emit load whenever we check how many available workers there are + // this should happen less often than the actual changes to the worker queue + // so is lighter than emitting the load every time we add/remove a task from the queue + this.load$.next(asTaskManagerStatEvent('load', asOk(this.workerLoad))); + return this.maxWorkers - this.occupiedWorkers; } /** diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 23d21d205ec2..d281a65da332 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -76,10 +76,6 @@ export enum TaskRunResult { Success = 'Success', // Recurring Task completed successfully SuccessRescheduled = 'Success', - // // Task completed successfully after a retry - // SuccessfulRetry = 'SuccessfulRetry', - // // Recurring Task completed successfully after a retry - // SuccessfulRetryRescheduled = 'SuccessfulRetry', // Task has failed and a retry has been scheduled RetryScheduled = 'RetryScheduled', // Task has failed diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 9806ada386e4..bb4812ae6fce 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -16,6 +16,8 @@ import { isTaskRunRequestEvent, RanTask, ErroredTask, + OkResultOf, + ErrResultOf, } from './task_events'; import { Middleware } from './lib/middleware'; import { @@ -29,7 +31,6 @@ import { import { TaskStore } from './task_store'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle'; -import { FillPoolResult } from './lib/fill_pool'; const VERSION_CONFLICT_STATUS = 409; @@ -125,19 +126,16 @@ export class TaskScheduling { return reject(await this.identifyTaskFailureReason(taskId, error)); }, taskEvent.event); } else { - either< - RanTask | ConcreteTaskInstance | FillPoolResult, - Error | ErroredTask | Option - >( + either, ErrResultOf>( taskEvent.event, - (taskInstance: RanTask | ConcreteTaskInstance | FillPoolResult) => { + (taskInstance: OkResultOf) => { // resolve if the task has run sucessfully if (isTaskRunEvent(taskEvent)) { subscription.unsubscribe(); resolve({ id: (taskInstance as RanTask).task.id }); } }, - async (errorResult: Error | ErroredTask | Option) => { + async (errorResult: ErrResultOf) => { // reject if any error event takes place for the requested task subscription.unsubscribe(); return reject( diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 0d5d2431e227..5d17c6246088 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -98,7 +98,11 @@ export interface FetchResult { } export interface ClaimOwnershipResult { - claimedTasks: number; + stats: { + tasksUpdated: number; + tasksConflicted: number; + tasksClaimed: number; + }; docs: ConcreteTaskInstance[]; } @@ -214,16 +218,13 @@ export class TaskStore { this.serializer.generateRawId(undefined, 'task', id) ); - const numberOfTasksClaimed = await this.markAvailableTasksAsClaimed( - claimOwnershipUntil, - claimTasksByIdWithRawIds, - size - ); + const { + updated: tasksUpdated, + version_conflicts: tasksConflicted, + } = await this.markAvailableTasksAsClaimed(claimOwnershipUntil, claimTasksByIdWithRawIds, size); const docs = - numberOfTasksClaimed > 0 - ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) - : []; + tasksUpdated > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : []; const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => claimTasksById.includes(doc.id) @@ -250,7 +251,11 @@ export class TaskStore { ]); return { - claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length, + stats: { + tasksUpdated, + tasksConflicted, + tasksClaimed: documentsClaimedById.length + documentsClaimedBySchedule.length, + }, docs: docs.filter((doc) => doc.status === TaskStatus.Claiming), }; }; @@ -259,7 +264,7 @@ export class TaskStore { claimOwnershipUntil: OwnershipClaimingOpts['claimOwnershipUntil'], claimTasksById: OwnershipClaimingOpts['claimTasksById'], size: OwnershipClaimingOpts['size'] - ): Promise { + ): Promise { const registeredTaskTypes = this.definitions.getAllTypes(); const taskMaxAttempts = [...this.definitions].reduce((accumulator, [type, { maxAttempts }]) => { return { ...accumulator, [type]: maxAttempts || this.maxAttempts }; @@ -282,7 +287,7 @@ export class TaskStore { } const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager'); - const { updated } = await this.updateByQuery( + const result = await this.updateByQuery( asUpdateByQuery({ query: matchesClauses( mustBeAllOf( @@ -309,7 +314,7 @@ export class TaskStore { ); if (apmTrans) apmTrans.end(); - return updated; + return result; } /** diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts index eb8e35fd871f..4c84ca1298e1 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts @@ -33,12 +33,14 @@ interface MonitoringStats { timestamp: string; value: { drift: Record; + load: Record; execution: { duration: Record>; result_frequency_percent_as_number: Record>; }; polling: { last_successful_poll: string; + duration: Record; result_frequency_percent_as_number: Record; }; }; @@ -170,7 +172,7 @@ export default function ({ getService }: FtrProviderContext) { const { runtime: { - value: { drift, polling, execution }, + value: { drift, load, polling, execution }, }, } = (await getHealth()).stats; @@ -182,11 +184,21 @@ export default function ({ getService }: FtrProviderContext) { expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number'); + expect(typeof polling.duration.p50).to.eql('number'); + expect(typeof polling.duration.p90).to.eql('number'); + expect(typeof polling.duration.p95).to.eql('number'); + expect(typeof polling.duration.p99).to.eql('number'); + expect(typeof drift.p50).to.eql('number'); expect(typeof drift.p90).to.eql('number'); expect(typeof drift.p95).to.eql('number'); expect(typeof drift.p99).to.eql('number'); + expect(typeof load.p50).to.eql('number'); + expect(typeof load.p90).to.eql('number'); + expect(typeof load.p95).to.eql('number'); + expect(typeof load.p99).to.eql('number'); + expect(typeof execution.duration.sampleTask.p50).to.eql('number'); expect(typeof execution.duration.sampleTask.p90).to.eql('number'); expect(typeof execution.duration.sampleTask.p95).to.eql('number');