Instrument task manager

This commit is contained in:
Dario Gieselaar 2021-05-03 13:52:54 +02:00
parent cbcc800a6b
commit 9b6bb25b1f
7 changed files with 203 additions and 149 deletions

View file

@ -16,6 +16,8 @@ export interface SpanOptions {
labels?: Record<string, string>;
}
type Span = Exclude<typeof agent.currentSpan, undefined | null>;
export function parseSpanOptions(optionsOrName: SpanOptions | string) {
const options = typeof optionsOrName === 'string' ? { name: optionsOrName } : optionsOrName;
@ -30,7 +32,7 @@ const runInNewContext = <T extends (...args: any[]) => any>(cb: T): ReturnType<T
export async function withSpan<T>(
optionsOrName: SpanOptions | string,
cb: () => Promise<T>
cb: (span?: Span) => Promise<T>
): Promise<T> {
const options = parseSpanOptions(optionsOrName);
@ -71,13 +73,17 @@ export async function withSpan<T>(
span.addLabels(labels);
}
return cb()
return cb(span)
.then((res) => {
span.outcome = 'success';
if (!span.outcome || span.outcome === 'unknown') {
span.outcome = 'success';
}
return res;
})
.catch((err) => {
span.outcome = 'failure';
if (!span.outcome || span.outcome === 'unknown') {
span.outcome = 'failure';
}
throw err;
})
.finally(() => {

View file

@ -7,6 +7,7 @@
import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger, KibanaRequest } from 'src/core/server';
import { withSpan } from '@kbn/apm-utils';
import { validateParams, validateConfig, validateSecrets } from './validate_with_schema';
import {
ActionTypeExecutorResult,
@ -78,113 +79,135 @@ export class ActionExecutor {
);
}
const {
logger,
spaces,
getServices,
encryptedSavedObjectsClient,
actionTypeRegistry,
eventLogger,
preconfiguredActions,
getActionsClientWithRequest,
} = this.actionExecutorContext!;
const services = getServices(request);
const spaceId = spaces && spaces.getSpaceId(request);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
const { actionTypeId, name, config, secrets } = await getActionInfo(
await getActionsClientWithRequest(request, source),
encryptedSavedObjectsClient,
preconfiguredActions,
actionId,
namespace.namespace
);
if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}
const actionType = actionTypeRegistry.get(actionTypeId);
let validatedParams: Record<string, unknown>;
let validatedConfig: Record<string, unknown>;
let validatedSecrets: Record<string, unknown>;
try {
validatedParams = validateParams(actionType, params);
validatedConfig = validateConfig(actionType, config);
validatedSecrets = validateSecrets(actionType, secrets);
} catch (err) {
return { status: 'error', actionId, message: err.message, retry: false };
}
const actionLabel = `${actionTypeId}:${actionId}: ${name}`;
logger.debug(`executing action ${actionLabel}`);
const event: IEvent = {
event: { action: EVENT_LOG_ACTIONS.execute },
kibana: {
saved_objects: [
{
rel: SAVED_OBJECT_REL_PRIMARY,
type: 'action',
id: actionId,
...namespace,
},
],
return withSpan(
{
name: `execute_action`,
type: 'actions',
labels: {
actionId,
},
},
};
async (span) => {
const {
logger,
spaces,
getServices,
encryptedSavedObjectsClient,
actionTypeRegistry,
eventLogger,
preconfiguredActions,
getActionsClientWithRequest,
} = this.actionExecutorContext!;
eventLogger.startTiming(event);
let rawResult: ActionTypeExecutorResult<unknown>;
try {
rawResult = await actionType.executor({
actionId,
services,
params: validatedParams,
config: validatedConfig,
secrets: validatedSecrets,
});
} catch (err) {
rawResult = {
actionId,
status: 'error',
message: 'an error occurred while running the action executor',
serviceMessage: err.message,
retry: false,
};
}
eventLogger.stopTiming(event);
const services = getServices(request);
const spaceId = spaces && spaces.getSpaceId(request);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
// allow null-ish return to indicate success
const result = rawResult || {
actionId,
status: 'ok',
};
const { actionTypeId, name, config, secrets } = await getActionInfo(
await getActionsClientWithRequest(request, source),
encryptedSavedObjectsClient,
preconfiguredActions,
actionId,
namespace.namespace
);
event.event = event.event || {};
if (span) {
span.name = `execute_action ${actionTypeId}`;
span.addLabels({
actionTypeId,
});
}
if (result.status === 'ok') {
event.event.outcome = 'success';
event.message = `action executed: ${actionLabel}`;
} else if (result.status === 'error') {
event.event.outcome = 'failure';
event.message = `action execution failure: ${actionLabel}`;
event.error = event.error || {};
event.error.message = actionErrorToMessage(result);
logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`);
} else {
event.event.outcome = 'failure';
event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`;
event.error = event.error || {};
event.error.message = 'action execution returned unexpected result';
logger.warn(
`action execution failure: ${actionLabel}: returned unexpected result "${result.status}"`
);
}
if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}
const actionType = actionTypeRegistry.get(actionTypeId);
eventLogger.logEvent(event);
return result;
let validatedParams: Record<string, unknown>;
let validatedConfig: Record<string, unknown>;
let validatedSecrets: Record<string, unknown>;
try {
validatedParams = validateParams(actionType, params);
validatedConfig = validateConfig(actionType, config);
validatedSecrets = validateSecrets(actionType, secrets);
} catch (err) {
span?.setOutcome('failure');
return { status: 'error', actionId, message: err.message, retry: false };
}
const actionLabel = `${actionTypeId}:${actionId}: ${name}`;
logger.debug(`executing action ${actionLabel}`);
const event: IEvent = {
event: { action: EVENT_LOG_ACTIONS.execute },
kibana: {
saved_objects: [
{
rel: SAVED_OBJECT_REL_PRIMARY,
type: 'action',
id: actionId,
...namespace,
},
],
},
};
eventLogger.startTiming(event);
let rawResult: ActionTypeExecutorResult<unknown>;
try {
rawResult = await actionType.executor({
actionId,
services,
params: validatedParams,
config: validatedConfig,
secrets: validatedSecrets,
});
} catch (err) {
rawResult = {
actionId,
status: 'error',
message: 'an error occurred while running the action executor',
serviceMessage: err.message,
retry: false,
};
}
eventLogger.stopTiming(event);
// allow null-ish return to indicate success
const result = rawResult || {
actionId,
status: 'ok',
};
event.event = event.event || {};
if (result.status === 'ok') {
span?.setOutcome('success');
event.event.outcome = 'success';
event.message = `action executed: ${actionLabel}`;
} else if (result.status === 'error') {
span?.setOutcome('failure');
event.event.outcome = 'failure';
event.message = `action execution failure: ${actionLabel}`;
event.error = event.error || {};
event.error.message = actionErrorToMessage(result);
logger.warn(`action execution failure: ${actionLabel}: ${event.error.message}`);
} else {
span?.setOutcome('failure');
event.event.outcome = 'failure';
event.message = `action execution returned unexpected result: ${actionLabel}: "${result.status}"`;
event.error = event.error || {};
event.error.message = 'action execution returned unexpected result';
logger.warn(
`action execution failure: ${actionLabel}: returned unexpected result "${result.status}"`
);
}
eventLogger.logEvent(event);
return result;
}
);
}
}

View file

@ -379,34 +379,41 @@ export class TaskClaiming {
sort.unshift('_score');
}
const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager');
const result = await this.taskStore.updateByQuery(
{
query: matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
),
script: updateFieldsAndMarkAsFailed(
{
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById || [],
taskTypesToClaim,
taskTypesToSkip,
pick(this.taskMaxAttempts, taskTypesToClaim)
),
sort,
},
{
max_docs: size,
}
const apmTrans = apm.startTransaction(
'markAvailableTasksAsClaimed',
`taskManager markAvailableTasksAsClaimed`
);
if (apmTrans) apmTrans.end();
return result;
try {
const result = await this.taskStore.updateByQuery(
{
query: matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
),
script: updateFieldsAndMarkAsFailed(
{
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById || [],
taskTypesToClaim,
taskTypesToSkip,
pick(this.taskMaxAttempts, taskTypesToClaim)
),
sort,
},
{
max_docs: size,
}
);
apmTrans?.end('success');
return result;
} catch (err) {
apmTrans?.end('failure');
throw err;
}
}
/**

View file

@ -29,6 +29,9 @@
"status": {
"type": "keyword"
},
"traceparent": {
"type": "text"
},
"params": {
"type": "text"
},

View file

@ -257,6 +257,8 @@ export interface TaskInstance {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
state: Record<string, any>;
traceparent?: string;
/**
* The id of the user who scheduled this task.
*/
@ -364,6 +366,7 @@ export type SerializedConcreteTaskInstance = Omit<
> & {
state: string;
params: string;
traceparent: string;
scheduledAt: string;
startedAt: string | null;
retryAt: string | null;

View file

@ -12,6 +12,7 @@
*/
import apm from 'elastic-apm-node';
import { withSpan } from '@kbn/apm-utils';
import { performance } from 'perf_hooks';
import { identity, defaults, flow } from 'lodash';
import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server';
@ -242,30 +243,40 @@ export class TaskManagerRunner implements TaskRunner {
);
}
this.logger.debug(`Running task ${this}`);
const modifiedContext = await this.beforeRun({
taskInstance: this.instance.task,
const apmTrans = apm.startTransaction(this.taskType, 'taskManager run', {
childOf: this.instance.task.traceparent,
});
const modifiedContext = await withSpan({ name: 'before run', type: 'task manager' }, () =>
this.beforeRun({
taskInstance: this.instance.task,
})
);
const stopTaskTimer = startTaskTimer();
const apmTrans = apm.startTransaction(`taskManager run`, 'taskManager');
apmTrans?.addLabels({
taskType: this.taskType,
});
try {
this.task = this.definition.createTaskRunner(modifiedContext);
const result = await this.task.run();
const result = await withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run());
const validatedResult = this.validateResult(result);
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(validatedResult, stopTaskTimer())
);
if (apmTrans) apmTrans.end('success');
return this.processResult(validatedResult, stopTaskTimer());
return processedResult;
} catch (err) {
this.logger.error(`Task ${this} failed: ${err}`);
// in error scenario, we can not get the RunResult
// re-use modifiedContext's state, which is correct as of beforeRun
if (apmTrans) apmTrans.end('error');
return this.processResult(
asErr({ error: err, state: modifiedContext.taskInstance.state }),
stopTaskTimer()
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(
asErr({ error: err, state: modifiedContext.taskInstance.state }),
stopTaskTimer()
)
);
if (apmTrans) apmTrans.end('failure');
return processedResult;
}
}
@ -285,10 +296,7 @@ export class TaskManagerRunner implements TaskRunner {
}
performance.mark('markTaskAsRunning_start');
const apmTrans = apm.startTransaction(`taskManager markTaskAsRunning`, 'taskManager');
apmTrans?.addLabels({
taskType: this.taskType,
});
const apmTrans = apm.startTransaction('taskManager', 'taskManager markTaskAsRunning');
const now = new Date();
try {

View file

@ -10,6 +10,7 @@ import { filter } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option';
import agent from 'elastic-apm-node';
import { Logger } from '../../../../src/core/server';
import { asOk, either, map, mapErr, promiseResult } from './lib/result_type';
import {
@ -85,7 +86,10 @@ export class TaskScheduling {
...options,
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
return await this.store.schedule(modifiedTask);
return await this.store.schedule({
...modifiedTask,
traceparent: agent.currentTraceparent ?? '',
});
}
/**