[Task Manager] adds more granular polling results to monitoring stats (#87494)
Added the following values to the Polling stats: - **NoAvailableWorkers**: This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers - **RunningAtCapacity**: This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers - **Failed**: This tells us when the poller failed to claim
This commit is contained in:
parent
93262ba736
commit
e0db4a3f0b
|
@ -179,9 +179,21 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
|
|||
/* What is the frequency of polling cycle result?
|
||||
Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */
|
||||
"result_frequency_percent_as_number": {
|
||||
/* This tells us that the polling cycle didn't claim any new tasks */
|
||||
"NoTasksClaimed": 94,
|
||||
"RanOutOfCapacity": 0, /* This is a legacy result, we might want to rename - it tells us when a polling cycle resulted in claiming more tasks than we had workers for, butt he name doesn't make much sense outside of the context of the code */
|
||||
"PoolFilled": 6
|
||||
/* This is a legacy result we are renaming in 8.0.0 -
|
||||
it tells us when a polling cycle resulted in claiming more tasks
|
||||
than we had workers for, but the name doesn't make much sense outside of the context of the code */
|
||||
"RanOutOfCapacity": 0,
|
||||
/* This is a legacy result we are renaming in 8.0.0 -
|
||||
it tells us when a polling cycle resulted in tasks being claimed, but fewer than the available workers */
|
||||
"PoolFilled": 6,
|
||||
/* This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers */
|
||||
"NoAvailableWorkers": 0,
|
||||
/* This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers */
|
||||
"RunningAtCapacity": 0,
|
||||
/* This tells us when the poller failed to claim */
|
||||
"Failed": 0
|
||||
}
|
||||
},
|
||||
/* on average, the tasks in this deployment run 1.7s after their scheduled time */
|
||||
|
|
|
@ -8,6 +8,7 @@ import _ from 'lodash';
|
|||
import sinon from 'sinon';
|
||||
import { fillPool } from './fill_pool';
|
||||
import { TaskPoolRunResult } from '../task_pool';
|
||||
import { asOk } from './result_type';
|
||||
|
||||
describe('fillPool', () => {
|
||||
test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
|
||||
|
@ -16,7 +17,7 @@ describe('fillPool', () => {
|
|||
[4, 5],
|
||||
];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
|
||||
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
|
||||
const converter = _.identity;
|
||||
|
||||
|
@ -31,7 +32,7 @@ describe('fillPool', () => {
|
|||
[4, 5],
|
||||
];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
|
||||
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
|
||||
const converter = _.identity;
|
||||
|
||||
|
@ -46,7 +47,7 @@ describe('fillPool', () => {
|
|||
[4, 5],
|
||||
];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
|
||||
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
|
||||
const converter = (x: number) => x.toString();
|
||||
|
||||
|
@ -80,7 +81,7 @@ describe('fillPool', () => {
|
|||
[4, 5],
|
||||
];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
|
||||
|
||||
await fillPool(fetchAvailableTasks, converter, run);
|
||||
} catch (err) {
|
||||
|
@ -95,7 +96,7 @@ describe('fillPool', () => {
|
|||
[4, 5],
|
||||
];
|
||||
let index = 0;
|
||||
const fetchAvailableTasks = async () => tasks[index++] || [];
|
||||
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
|
||||
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
|
||||
const converter = (x: number) => {
|
||||
throw new Error(`can not convert ${x}`);
|
||||
|
|
|
@ -6,15 +6,19 @@
|
|||
|
||||
import { performance } from 'perf_hooks';
|
||||
import { TaskPoolRunResult } from '../task_pool';
|
||||
import { Result, map } from './result_type';
|
||||
|
||||
export enum FillPoolResult {
|
||||
Failed = 'Failed',
|
||||
NoAvailableWorkers = 'NoAvailableWorkers',
|
||||
NoTasksClaimed = 'NoTasksClaimed',
|
||||
RunningAtCapacity = 'RunningAtCapacity',
|
||||
RanOutOfCapacity = 'RanOutOfCapacity',
|
||||
PoolFilled = 'PoolFilled',
|
||||
}
|
||||
|
||||
type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
|
||||
type Fetcher<T> = () => Promise<T[]>;
|
||||
type Fetcher<T, E> = () => Promise<Result<T[], E>>;
|
||||
type Converter<T1, T2> = (t: T1) => T2;
|
||||
|
||||
/**
|
||||
|
@ -30,33 +34,43 @@ type Converter<T1, T2> = (t: T1) => T2;
|
|||
* @param converter - a function that converts task records to the appropriate task runner
|
||||
*/
|
||||
export async function fillPool<TRecord, TRunner>(
|
||||
fetchAvailableTasks: Fetcher<TRecord>,
|
||||
fetchAvailableTasks: Fetcher<TRecord, FillPoolResult>,
|
||||
converter: Converter<TRecord, TRunner>,
|
||||
run: BatchRun<TRunner>
|
||||
): Promise<FillPoolResult> {
|
||||
performance.mark('fillPool.start');
|
||||
const instances = await fetchAvailableTasks();
|
||||
return map<TRecord[], FillPoolResult, Promise<FillPoolResult>>(
|
||||
await fetchAvailableTasks(),
|
||||
async (instances) => {
|
||||
if (!instances.length) {
|
||||
performance.mark('fillPool.bailNoTasks');
|
||||
performance.measure(
|
||||
'fillPool.activityDurationUntilNoTasks',
|
||||
'fillPool.start',
|
||||
'fillPool.bailNoTasks'
|
||||
);
|
||||
return FillPoolResult.NoTasksClaimed;
|
||||
}
|
||||
|
||||
if (!instances.length) {
|
||||
performance.mark('fillPool.bailNoTasks');
|
||||
performance.measure(
|
||||
'fillPool.activityDurationUntilNoTasks',
|
||||
'fillPool.start',
|
||||
'fillPool.bailNoTasks'
|
||||
);
|
||||
return FillPoolResult.NoTasksClaimed;
|
||||
}
|
||||
const tasks = instances.map(converter);
|
||||
const tasks = instances.map(converter);
|
||||
|
||||
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
|
||||
performance.mark('fillPool.bailExhaustedCapacity');
|
||||
performance.measure(
|
||||
'fillPool.activityDurationUntilExhaustedCapacity',
|
||||
'fillPool.start',
|
||||
'fillPool.bailExhaustedCapacity'
|
||||
);
|
||||
return FillPoolResult.RanOutOfCapacity;
|
||||
}
|
||||
performance.mark('fillPool.cycle');
|
||||
return FillPoolResult.PoolFilled;
|
||||
switch (await run(tasks)) {
|
||||
case TaskPoolRunResult.RanOutOfCapacity:
|
||||
performance.mark('fillPool.bailExhaustedCapacity');
|
||||
performance.measure(
|
||||
'fillPool.activityDurationUntilExhaustedCapacity',
|
||||
'fillPool.start',
|
||||
'fillPool.bailExhaustedCapacity'
|
||||
);
|
||||
return FillPoolResult.RanOutOfCapacity;
|
||||
case TaskPoolRunResult.RunningAtCapacity:
|
||||
performance.mark('fillPool.cycle');
|
||||
return FillPoolResult.RunningAtCapacity;
|
||||
default:
|
||||
performance.mark('fillPool.cycle');
|
||||
return FillPoolResult.PoolFilled;
|
||||
}
|
||||
},
|
||||
async (result) => result
|
||||
);
|
||||
}
|
||||
|
|
|
@ -427,25 +427,95 @@ describe('Task Run Statistics', () => {
|
|||
taskStats.map((taskStat) => taskStat.value.polling.result_frequency_percent_as_number)
|
||||
).toEqual([
|
||||
// NoTasksClaimed
|
||||
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
|
||||
{
|
||||
NoTasksClaimed: 100,
|
||||
RanOutOfCapacity: 0,
|
||||
PoolFilled: 0,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// NoTasksClaimed, NoTasksClaimed,
|
||||
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
|
||||
{
|
||||
NoTasksClaimed: 100,
|
||||
RanOutOfCapacity: 0,
|
||||
PoolFilled: 0,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed
|
||||
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
|
||||
{
|
||||
NoTasksClaimed: 100,
|
||||
RanOutOfCapacity: 0,
|
||||
PoolFilled: 0,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled
|
||||
{ NoTasksClaimed: 75, RanOutOfCapacity: 0, PoolFilled: 25 },
|
||||
{
|
||||
NoTasksClaimed: 75,
|
||||
RanOutOfCapacity: 0,
|
||||
PoolFilled: 25,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled
|
||||
{ NoTasksClaimed: 60, RanOutOfCapacity: 0, PoolFilled: 40 },
|
||||
{
|
||||
NoTasksClaimed: 60,
|
||||
RanOutOfCapacity: 0,
|
||||
PoolFilled: 40,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled
|
||||
{ NoTasksClaimed: 40, RanOutOfCapacity: 0, PoolFilled: 60 },
|
||||
{
|
||||
NoTasksClaimed: 40,
|
||||
RanOutOfCapacity: 0,
|
||||
PoolFilled: 60,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity
|
||||
{ NoTasksClaimed: 20, RanOutOfCapacity: 20, PoolFilled: 60 },
|
||||
{
|
||||
NoTasksClaimed: 20,
|
||||
RanOutOfCapacity: 20,
|
||||
PoolFilled: 60,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity
|
||||
{ NoTasksClaimed: 0, RanOutOfCapacity: 40, PoolFilled: 60 },
|
||||
{
|
||||
NoTasksClaimed: 0,
|
||||
RanOutOfCapacity: 40,
|
||||
PoolFilled: 60,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed
|
||||
{ NoTasksClaimed: 20, RanOutOfCapacity: 40, PoolFilled: 40 },
|
||||
{
|
||||
NoTasksClaimed: 20,
|
||||
RanOutOfCapacity: 40,
|
||||
PoolFilled: 40,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
// PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed, NoTasksClaimed
|
||||
{ NoTasksClaimed: 40, RanOutOfCapacity: 40, PoolFilled: 20 },
|
||||
{
|
||||
NoTasksClaimed: 40,
|
||||
RanOutOfCapacity: 40,
|
||||
PoolFilled: 20,
|
||||
Failed: 0,
|
||||
NoAvailableWorkers: 0,
|
||||
RunningAtCapacity: 0,
|
||||
},
|
||||
]);
|
||||
resolve();
|
||||
} catch (e) {
|
||||
|
|
|
@ -52,8 +52,11 @@ export interface TaskRunStat extends JsonObject {
|
|||
interface FillPoolRawStat extends JsonObject {
|
||||
last_successful_poll: string;
|
||||
result_frequency_percent_as_number: {
|
||||
[FillPoolResult.Failed]: number;
|
||||
[FillPoolResult.NoAvailableWorkers]: number;
|
||||
[FillPoolResult.NoTasksClaimed]: number;
|
||||
[FillPoolResult.RanOutOfCapacity]: number;
|
||||
[FillPoolResult.RunningAtCapacity]: number;
|
||||
[FillPoolResult.PoolFilled]: number;
|
||||
};
|
||||
}
|
||||
|
@ -163,8 +166,11 @@ const DEFAULT_TASK_RUN_FREQUENCIES = {
|
|||
[TaskRunResult.Failed]: 0,
|
||||
};
|
||||
const DEFAULT_POLLING_FREQUENCIES = {
|
||||
[FillPoolResult.Failed]: 0,
|
||||
[FillPoolResult.NoAvailableWorkers]: 0,
|
||||
[FillPoolResult.NoTasksClaimed]: 0,
|
||||
[FillPoolResult.RanOutOfCapacity]: 0,
|
||||
[FillPoolResult.RunningAtCapacity]: 0,
|
||||
[FillPoolResult.PoolFilled]: 0,
|
||||
};
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
|
|||
import { tap } from 'rxjs/operators';
|
||||
import { Logger } from '../../../../src/core/server';
|
||||
|
||||
import { Result, asErr, mapErr } from './lib/result_type';
|
||||
import { Result, asErr, mapErr, asOk } from './lib/result_type';
|
||||
import { ManagedConfiguration } from './lib/create_managed_configuration';
|
||||
import { TaskManagerConfig } from './config';
|
||||
|
||||
|
@ -232,7 +232,7 @@ export async function claimAvailableTasks(
|
|||
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
|
||||
availableWorkers: number,
|
||||
logger: Logger
|
||||
) {
|
||||
): Promise<Result<ClaimOwnershipResult['docs'], FillPoolResult>> {
|
||||
if (availableWorkers > 0) {
|
||||
performance.mark('claimAvailableTasks_start');
|
||||
|
||||
|
@ -260,12 +260,13 @@ export async function claimAvailableTasks(
|
|||
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
|
||||
);
|
||||
}
|
||||
return docs;
|
||||
return asOk(docs);
|
||||
} catch (ex) {
|
||||
if (identifyEsError(ex).includes('cannot execute [inline] scripts')) {
|
||||
logger.warn(
|
||||
`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`
|
||||
);
|
||||
return asErr(FillPoolResult.Failed);
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
|
@ -275,6 +276,6 @@ export async function claimAvailableTasks(
|
|||
logger.debug(
|
||||
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.`
|
||||
);
|
||||
return asErr(FillPoolResult.NoAvailableWorkers);
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ describe('TaskPool', () => {
|
|||
]
|
||||
`);
|
||||
|
||||
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
|
||||
expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity);
|
||||
});
|
||||
|
||||
test('should log when running a Task fails', async () => {
|
||||
|
@ -242,7 +242,7 @@ describe('TaskPool', () => {
|
|||
},
|
||||
]);
|
||||
|
||||
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
|
||||
expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity);
|
||||
expect(pool.occupiedWorkers).toEqual(2);
|
||||
expect(pool.availableWorkers).toEqual(0);
|
||||
|
||||
|
|
|
@ -22,7 +22,11 @@ interface Opts {
|
|||
}
|
||||
|
||||
export enum TaskPoolRunResult {
|
||||
// This means we're running all the tasks we claimed
|
||||
RunningAllClaimedTasks = 'RunningAllClaimedTasks',
|
||||
// This means we're running all the tasks we claimed and we're at capacity
|
||||
RunningAtCapacity = 'RunningAtCapacity',
|
||||
// This means we're prematurely out of capacity and have accidentally claimed more tasks than we had capacity for
|
||||
RanOutOfCapacity = 'RanOutOfCapacity',
|
||||
}
|
||||
|
||||
|
@ -123,6 +127,8 @@ export class TaskPool {
|
|||
return this.attemptToRun(leftOverTasks);
|
||||
}
|
||||
return TaskPoolRunResult.RanOutOfCapacity;
|
||||
} else if (!this.availableWorkers) {
|
||||
return TaskPoolRunResult.RunningAtCapacity;
|
||||
}
|
||||
return TaskPoolRunResult.RunningAllClaimedTasks;
|
||||
}
|
||||
|
|
|
@ -178,6 +178,9 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
expect(typeof polling.result_frequency_percent_as_number.NoTasksClaimed).to.eql('number');
|
||||
expect(typeof polling.result_frequency_percent_as_number.RanOutOfCapacity).to.eql('number');
|
||||
expect(typeof polling.result_frequency_percent_as_number.PoolFilled).to.eql('number');
|
||||
expect(typeof polling.result_frequency_percent_as_number.NoAvailableWorkers).to.eql('number');
|
||||
expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number');
|
||||
expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number');
|
||||
|
||||
expect(typeof drift.p50).to.eql('number');
|
||||
expect(typeof drift.p90).to.eql('number');
|
||||
|
|
Loading…
Reference in a new issue