[ML] Anomaly detection rule lookback interval improvements (#97370)

* [ML] add advanced settings

* [ML] default advanced settings

* [ML] advanced settings validators

* [ML] range control for top n buckets

* [ML] execute rule with a new query for most recent anomalies

* [ML] find most anomalous bucket from the top N

* Revert "[ML] range control for top n buckets"

This reverts commit e039f250

* [ML] validate check interval against the lookback interval

* [ML] update descriptions

* [ML] fix test subjects

* [ML] update warning message

* [ML] add functional tests

* [ML] adjust unit tests, mark getLookbackInterval

* [ML] update lookback interval description and warning message

* [ML] update fetchResult tsDoc

* [ML] cleanup

* [ML] fix imports to reduce bundle size

* [ML] round up lookback interval

* [ML] update functional test assertion

* [ML] async import for validator
This commit is contained in:
Dima Arnautov 2021-04-20 10:52:54 +02:00 committed by GitHub
parent 13411882eb
commit 20b585d122
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 718 additions and 68 deletions

View file

@ -47,3 +47,5 @@ export const ML_ALERT_TYPES_CONFIG: Record<
};
export const ALERT_PREVIEW_SAMPLE_SIZE = 5;
export const TOP_N_BUCKETS_COUNT = 1;

View file

@ -93,4 +93,11 @@ export type MlAnomalyDetectionAlertParams = {
severity: number;
resultType: AnomalyResultType;
includeInterim: boolean;
lookbackInterval: string | null | undefined;
topNBuckets: number | null | undefined;
} & AlertTypeParams;
export type MlAnomalyDetectionAlertAdvancedSettings = Pick<
MlAnomalyDetectionAlertParams,
'lookbackInterval' | 'topNBuckets'
>;

View file

@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getLookbackInterval, resolveLookbackInterval } from './alerts';
import type { CombinedJobWithStats, Datafeed, Job } from '../types/anomaly_detection_jobs';
describe('resolveLookbackInterval', () => {
test('resolves interval for bucket spans bigger than 1m', () => {
const testJobs = [
{
analysis_config: {
bucket_span: '15m',
},
},
] as Job[];
const testDatafeeds = [
{
query_delay: '65630ms',
},
] as Datafeed[];
expect(resolveLookbackInterval(testJobs, testDatafeeds)).toBe('32m');
});
test('resolves interval for bucket spans smaller than 1m', () => {
const testJobs = [
{
analysis_config: {
bucket_span: '50s',
},
},
] as Job[];
const testDatafeeds = [
{
query_delay: '20s',
},
] as Datafeed[];
expect(resolveLookbackInterval(testJobs, testDatafeeds)).toBe('3m');
});
test('resolves interval for bucket spans smaller than 1m without query dealay', () => {
const testJobs = [
{
analysis_config: {
bucket_span: '59s',
},
},
] as Job[];
const testDatafeeds = [{}] as Datafeed[];
expect(resolveLookbackInterval(testJobs, testDatafeeds)).toBe('3m');
});
});
describe('getLookbackInterval', () => {
test('resolves interval for bucket spans bigger than 1m', () => {
const testJobs = [
{
analysis_config: {
bucket_span: '15m',
},
datafeed_config: {
query_delay: '65630ms',
},
},
] as CombinedJobWithStats[];
expect(getLookbackInterval(testJobs)).toBe('32m');
});
});

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CombinedJobWithStats, Datafeed, Job } from '../types/anomaly_detection_jobs';
import { resolveMaxTimeInterval } from './job_utils';
import { isDefined } from '../types/guards';
import { parseInterval } from './parse_interval';
const narrowBucketLength = 60;
/**
* Resolves the lookback interval for the rule
* using the formula max(2m, 2 * bucket_span) + query_delay + 1s.
* and rounds up to a whole number of minutes.
*/
export function resolveLookbackInterval(jobs: Job[], datafeeds: Datafeed[]): string {
const bucketSpanInSeconds = Math.ceil(
resolveMaxTimeInterval(jobs.map((v) => v.analysis_config.bucket_span)) ?? 0
);
const queryDelayInSeconds = Math.ceil(
resolveMaxTimeInterval(datafeeds.map((v) => v.query_delay).filter(isDefined)) ?? 0
);
const result =
Math.max(2 * narrowBucketLength, 2 * bucketSpanInSeconds) + queryDelayInSeconds + 1;
return `${Math.ceil(result / 60)}m`;
}
/**
* @deprecated We should avoid using {@link CombinedJobWithStats}. Replace usages with {@link resolveLookbackInterval} when
* Kibana API returns mapped job and the datafeed configs.
*/
export function getLookbackInterval(jobs: CombinedJobWithStats[]): string {
return resolveLookbackInterval(
jobs,
jobs.map((v) => v.datafeed_config)
);
}
export function getTopNBuckets(job: Job): number {
const bucketSpan = parseInterval(job.analysis_config.bucket_span);
if (bucketSpan === null) {
throw new Error('Unable to resolve a bucket span length');
}
return Math.ceil(narrowBucketLength / bucketSpan.asSeconds());
}

View file

@ -20,7 +20,7 @@ import {
getSafeAggregationName,
getLatestDataOrBucketTimestamp,
getEarliestDatafeedStartTime,
resolveBucketSpanInSeconds,
resolveMaxTimeInterval,
} from './job_utils';
import { CombinedJob, Job } from '../types/anomaly_detection_jobs';
import moment from 'moment';
@ -606,7 +606,10 @@ describe('ML - job utils', () => {
describe('resolveBucketSpanInSeconds', () => {
test('should resolve maximum bucket interval', () => {
expect(resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s'])).toBe(21600);
expect(resolveMaxTimeInterval(['15m', '1h', '6h', '90s'])).toBe(21600);
});
test('returns undefined for an empty array', () => {
expect(resolveMaxTimeInterval([])).toBe(undefined);
});
});
});

View file

@ -831,14 +831,16 @@ export function splitIndexPatternNames(indexPatternName: string): string[] {
}
/**
* Resolves the longest bucket span from the list.
* @param bucketSpans Collection of bucket spans
* Resolves the longest time interval from the list.
* @param timeIntervals Collection of the strings representing time intervals, e.g. ['15m', '1h', '2d']
*/
export function resolveBucketSpanInSeconds(bucketSpans: string[]): number {
return Math.max(
...bucketSpans
export function resolveMaxTimeInterval(timeIntervals: string[]): number | undefined {
const result = Math.max(
...timeIntervals
.map((b) => parseInterval(b))
.filter(isDefined)
.map((v) => v.asSeconds())
);
return Number.isFinite(result) ? result : undefined;
}

View file

@ -7,6 +7,7 @@
import { ALLOWED_DATA_UNITS } from '../constants/validation';
import { parseInterval } from './parse_interval';
import { isPopulatedObject } from './object_utils';
/**
* Provides a validator function for maximum allowed input length.
@ -85,6 +86,10 @@ export function memoryInputValidator(allowedUnits = ALLOWED_DATA_UNITS) {
export function timeIntervalInputValidator() {
return (value: string) => {
if (value === '') {
return null;
}
const r = parseInterval(value);
if (r === null) {
return {
@ -95,3 +100,32 @@ export function timeIntervalInputValidator() {
return null;
};
}
export interface NumberValidationResult {
min: boolean;
max: boolean;
}
export function numberValidator(conditions?: { min?: number; max?: number }) {
if (
conditions?.min !== undefined &&
conditions.max !== undefined &&
conditions.min > conditions.max
) {
throw new Error('Invalid validator conditions');
}
return (value: number): NumberValidationResult | null => {
const result = {} as NumberValidationResult;
if (conditions?.min !== undefined && value < conditions.min) {
result.min = true;
}
if (conditions?.max !== undefined && value > conditions.max) {
result.max = true;
}
if (isPopulatedObject(result)) {
return result;
}
return null;
};
}

View file

@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { FormattedMessage } from '@kbn/i18n/react';
import {
EuiAccordion,
EuiDescribedFormGroup,
EuiFieldNumber,
EuiFormRow,
EuiHorizontalRule,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import { MlAnomalyDetectionAlertAdvancedSettings } from '../../common/types/alerts';
import { TimeIntervalControl } from './time_interval_control';
import { TOP_N_BUCKETS_COUNT } from '../../common/constants/alerts';
interface AdvancedSettingsProps {
value: MlAnomalyDetectionAlertAdvancedSettings;
onChange: (update: Partial<MlAnomalyDetectionAlertAdvancedSettings>) => void;
}
export const AdvancedSettings: FC<AdvancedSettingsProps> = React.memo(({ value, onChange }) => {
return (
<EuiAccordion
id="mlAnomalyAlertAdvancedSettings"
buttonContent={
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.advancedSettingsLabel"
defaultMessage="Advanced settings"
/>
}
data-test-subj={'mlAnomalyAlertAdvancedSettingsTrigger'}
>
<EuiSpacer size="m" />
<EuiDescribedFormGroup
gutterSize={'s'}
titleSize={'xxs'}
title={
<h4>
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.lookbackIntervalLabel"
defaultMessage="Lookback interval"
/>
</h4>
}
description={
<EuiText size={'xs'}>
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.lookbackIntervalDescription"
defaultMessage="Time interval to query the anomalies data during each rule condition check. By default, is derived from the bucket span of the job and the query delay of the datafeed."
/>
</EuiText>
}
>
<TimeIntervalControl
value={value.lookbackInterval}
label={
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.lookbackIntervalLabel"
defaultMessage="Lookback interval"
/>
}
onChange={(update) => {
onChange({ lookbackInterval: update });
}}
data-test-subj={'mlAnomalyAlertLookbackInterval'}
/>
</EuiDescribedFormGroup>
<EuiDescribedFormGroup
gutterSize={'s'}
titleSize={'xxs'}
title={
<h4>
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.topNBucketsLabel"
defaultMessage="Number of latest buckets"
/>
</h4>
}
description={
<EuiText size={'xs'}>
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.topNBucketsDescription"
defaultMessage="The number of latest buckets to check to obtain the highest anomaly."
/>
</EuiText>
}
>
<EuiFormRow
label={
<FormattedMessage
id="xpack.ml.anomalyDetectionAlert.topNBucketsLabel"
defaultMessage="Number of latest buckets"
/>
}
>
<EuiFieldNumber
value={value.topNBuckets ?? TOP_N_BUCKETS_COUNT}
min={1}
onChange={(e) => {
onChange({ topNBuckets: Number(e.target.value) });
}}
data-test-subj={'mlAnomalyAlertTopNBuckets'}
/>
</EuiFormRow>
</EuiDescribedFormGroup>
<EuiHorizontalRule margin={'m'} />
</EuiAccordion>
);
});

View file

@ -5,40 +5,35 @@
* 2.0.
*/
import React, { FC, useMemo } from 'react';
import React, { FC } from 'react';
import { FormattedMessage } from '@kbn/i18n/react';
import { EuiCallOut, EuiSpacer } from '@elastic/eui';
import { parseInterval } from '../../common/util/parse_interval';
import { CombinedJobWithStats } from '../../common/types/anomaly_detection_jobs';
import { DATAFEED_STATE } from '../../common/constants/states';
import { resolveBucketSpanInSeconds } from '../../common/util/job_utils';
import { MlAnomalyDetectionAlertParams } from '../../common/types/alerts';
interface ConfigValidatorProps {
alertInterval: string;
jobConfigs: CombinedJobWithStats[];
alertParams: MlAnomalyDetectionAlertParams;
}
/**
* Validated alert configuration
*/
export const ConfigValidator: FC<ConfigValidatorProps> = React.memo(
({ jobConfigs = [], alertInterval }) => {
const resultBucketSpanInSeconds = useMemo(
() => resolveBucketSpanInSeconds(jobConfigs.map((v) => v.analysis_config.bucket_span)),
[jobConfigs]
);
const resultBucketSpanString =
resultBucketSpanInSeconds % 60 === 0
? `${resultBucketSpanInSeconds / 60}m`
: `${resultBucketSpanInSeconds}s`;
({ jobConfigs = [], alertInterval, alertParams }) => {
if (jobConfigs.length === 0) return null;
const alertIntervalInSeconds = parseInterval(alertInterval)!.asSeconds();
const isAlertIntervalTooHigh = resultBucketSpanInSeconds < alertIntervalInSeconds;
const lookbackIntervalInSeconds =
!!alertParams.lookbackInterval && parseInterval(alertParams.lookbackInterval)?.asSeconds();
const isAlertIntervalTooHigh =
lookbackIntervalInSeconds && lookbackIntervalInSeconds < alertIntervalInSeconds;
const jobWithoutStartedDatafeed = jobConfigs
.filter((job) => job.datafeed_config.state !== DATAFEED_STATE.STARTED)
@ -66,9 +61,9 @@ export const ConfigValidator: FC<ConfigValidatorProps> = React.memo(
<li>
<FormattedMessage
id="xpack.ml.alertConditionValidation.alertIntervalTooHighMessage"
defaultMessage="The check interval is greater than the maximum bucket span of the selected jobs. Reduce it to {resultBucketSpan} to avoid excessive delay in receiving notifications."
defaultMessage="The check interval is greater than the lookback interval. Reduce it to {lookbackInterval} to avoid potentially missing notifications."
values={{
resultBucketSpan: resultBucketSpanString,
lookbackInterval: alertParams.lookbackInterval,
}}
/>
</li>

View file

@ -18,11 +18,17 @@ import { ResultTypeSelector } from './result_type_selector';
import { alertingApiProvider } from '../application/services/ml_api_service/alerting';
import { PreviewAlertCondition } from './preview_alert_condition';
import { ANOMALY_THRESHOLD } from '../../common';
import { MlAnomalyDetectionAlertParams } from '../../common/types/alerts';
import {
MlAnomalyDetectionAlertAdvancedSettings,
MlAnomalyDetectionAlertParams,
} from '../../common/types/alerts';
import { ANOMALY_RESULT_TYPE } from '../../common/constants/anomalies';
import { InterimResultsControl } from './interim_results_control';
import { ConfigValidator } from './config_validator';
import { CombinedJobWithStats } from '../../common/types/anomaly_detection_jobs';
import { AdvancedSettings } from './advanced_settings';
import { getLookbackInterval, getTopNBuckets } from '../../common/util/alerts';
import { isDefined } from '../../common/types/guards';
interface MlAnomalyAlertTriggerProps {
alertParams: MlAnomalyDetectionAlertParams;
@ -114,6 +120,28 @@ const MlAnomalyAlertTrigger: FC<MlAnomalyAlertTriggerProps> = ({
}
});
const advancedSettings = useMemo(() => {
let { lookbackInterval, topNBuckets } = alertParams;
if (!isDefined(lookbackInterval) && jobConfigs.length > 0) {
lookbackInterval = getLookbackInterval(jobConfigs);
}
if (!isDefined(topNBuckets) && jobConfigs.length > 0) {
topNBuckets = getTopNBuckets(jobConfigs[0]);
}
return {
lookbackInterval,
topNBuckets,
};
}, [alertParams.lookbackInterval, alertParams.topNBuckets, jobConfigs]);
const resultParams = useMemo(() => {
return {
...alertParams,
...advancedSettings,
};
}, [alertParams, advancedSettings]);
return (
<EuiForm data-test-subj={'mlAnomalyAlertForm'}>
<EuiFlexGroup gutterSize={'none'} justifyContent={'flexEnd'}>
@ -139,7 +167,11 @@ const MlAnomalyAlertTrigger: FC<MlAnomalyAlertTriggerProps> = ({
errors={errors.jobSelection}
/>
<ConfigValidator jobConfigs={jobConfigs} alertInterval={alertInterval} />
<ConfigValidator
jobConfigs={jobConfigs}
alertInterval={alertInterval}
alertParams={resultParams}
/>
<ResultTypeSelector
value={alertParams.resultType}
@ -157,6 +189,17 @@ const MlAnomalyAlertTrigger: FC<MlAnomalyAlertTriggerProps> = ({
/>
<EuiSpacer size="m" />
<AdvancedSettings
value={advancedSettings}
onChange={useCallback((update) => {
Object.keys(update).forEach((k) => {
setAlertParams(k, update[k as keyof MlAnomalyDetectionAlertAdvancedSettings]);
});
}, [])}
/>
<EuiSpacer size="m" />
<PreviewAlertCondition alertingApiService={alertingApiService} alertParams={alertParams} />
<EuiSpacer size="m" />

View file

@ -11,7 +11,10 @@ import { ML_ALERT_TYPES } from '../../common/constants/alerts';
import { MlAnomalyDetectionAlertParams } from '../../common/types/alerts';
import { TriggersAndActionsUIPublicPluginSetup } from '../../../triggers_actions_ui/public';
export function registerMlAlerts(triggersActionsUi: TriggersAndActionsUIPublicPluginSetup) {
export async function registerMlAlerts(triggersActionsUi: TriggersAndActionsUIPublicPluginSetup) {
// async import validators to reduce initial bundle size
const { validateLookbackInterval, validateTopNBucket } = await import('./validators');
triggersActionsUi.alertTypeRegistry.register({
id: ML_ALERT_TYPES.ANOMALY_DETECTION,
description: i18n.translate('xpack.ml.alertTypes.anomalyDetection.description', {
@ -28,7 +31,9 @@ export function registerMlAlerts(triggersActionsUi: TriggersAndActionsUIPublicPl
jobSelection: new Array<string>(),
severity: new Array<string>(),
resultType: new Array<string>(),
},
topNBuckets: new Array<string>(),
lookbackInterval: new Array<string>(),
} as Record<keyof MlAnomalyDetectionAlertParams, string[]>,
};
if (
@ -58,6 +63,28 @@ export function registerMlAlerts(triggersActionsUi: TriggersAndActionsUIPublicPl
);
}
if (
!!alertParams.lookbackInterval &&
validateLookbackInterval(alertParams.lookbackInterval)
) {
validationResult.errors.lookbackInterval.push(
i18n.translate('xpack.ml.alertTypes.anomalyDetection.lookbackInterval.errorMessage', {
defaultMessage: 'Lookback interval is invalid',
})
);
}
if (
typeof alertParams.topNBuckets === 'number' &&
validateTopNBucket(alertParams.topNBuckets)
) {
validationResult.errors.topNBuckets.push(
i18n.translate('xpack.ml.alertTypes.anomalyDetection.topNBuckets.errorMessage', {
defaultMessage: 'Number of buckets is invalid',
})
);
}
return validationResult;
},
requiresAppContext: false,

View file

@ -67,7 +67,7 @@ export const SeverityControl: FC<SeveritySelectorProps> = React.memo(({ value, o
value={value ?? ANOMALY_THRESHOLD.LOW}
onChange={(e) => {
// @ts-ignore Property 'value' does not exist on type 'EventTarget' | (EventTarget & HTMLInputElement)
onChange(e.target.value);
onChange(Number(e.target.value));
}}
showLabels
showValue

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EuiFieldText, EuiFormRow, EuiFieldTextProps } from '@elastic/eui';
import React, { FC, ReactNode, useMemo } from 'react';
import { invalidTimeIntervalMessage } from '../application/jobs/new_job/common/job_validator/util';
import { composeValidators } from '../../common';
import { timeIntervalInputValidator } from '../../common/util/validators';
type TimeIntervalControlProps = Omit<EuiFieldTextProps, 'value' | 'onChange'> & {
label: string | ReactNode;
value: string | null | undefined;
onChange: (update: string) => void;
};
export const TimeIntervalControl: FC<TimeIntervalControlProps> = ({
value,
onChange,
label,
...fieldTextProps
}) => {
const validators = useMemo(() => composeValidators(timeIntervalInputValidator()), []);
const validationErrors = useMemo(() => validators(value), [value]);
const isInvalid = value !== undefined && !!validationErrors;
return (
<EuiFormRow
label={label}
isInvalid={isInvalid}
error={invalidTimeIntervalMessage(value ?? undefined)}
>
<EuiFieldText
{...fieldTextProps}
placeholder="15d, 6m"
value={value ?? ''}
onChange={(e) => {
onChange(e.target.value);
}}
isInvalid={isInvalid}
/>
</EuiFormRow>
);
};

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { numberValidator, timeIntervalInputValidator } from '../../common/util/validators';
export const validateLookbackInterval = timeIntervalInputValidator();
export const validateTopNBucket = numberValidator({ min: 1 });

View file

@ -7,8 +7,6 @@
import Boom from '@hapi/boom';
import rison from 'rison-node';
import { ElasticsearchClient } from 'kibana/server';
import moment from 'moment';
import { Duration } from 'moment/moment';
import { MlClient } from '../ml_client';
import {
@ -27,8 +25,10 @@ import {
} from '../../../common/types/alerts';
import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import { MlJobsResponse } from '../../../common/types/job_service';
import { resolveBucketSpanInSeconds } from '../../../common/util/job_utils';
import { resolveMaxTimeInterval } from '../../../common/util/job_utils';
import { isDefined } from '../../../common/types/guards';
import { getTopNBuckets, resolveLookbackInterval } from '../../../common/util/alerts';
import type { DatafeedsService } from '../../models/job_service/datafeeds';
type AggResultsResponse = { key?: number } & {
[key in PreviewResultsKeys]: {
@ -40,12 +40,21 @@ type AggResultsResponse = { key?: number } & {
};
};
/**
* Mapping for result types and corresponding score fields.
*/
const resultTypeScoreMapping = {
[ANOMALY_RESULT_TYPE.BUCKET]: 'anomaly_score',
[ANOMALY_RESULT_TYPE.RECORD]: 'record_score',
[ANOMALY_RESULT_TYPE.INFLUENCER]: 'influencer_score',
};
/**
* Alerting related server-side methods
* @param mlClient
* @param esClient
* @param datafeedsService
*/
export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) {
export function alertingServiceProvider(mlClient: MlClient, datafeedsService: DatafeedsService) {
const getAggResultsLabel = (resultType: AnomalyResultType) => {
return {
aggGroupLabel: `${resultType}_results` as PreviewResultsKeys,
@ -332,7 +341,16 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
if (jobsResponse.length === 0) {
// Probably assigned groups don't contain any jobs anymore.
return;
throw new Error("Couldn't find the job with provided id");
}
const maxBucket = resolveMaxTimeInterval(
jobsResponse.map((v) => v.analysis_config.bucket_span)
);
if (maxBucket === undefined) {
// Technically it's not possible, just in case.
throw new Error('Unable to resolve a valid bucket length');
}
/**
@ -341,9 +359,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
*/
const lookBackTimeInterval = `${Math.max(
// Double the max bucket span
Math.round(
resolveBucketSpanInSeconds(jobsResponse.map((v) => v.analysis_config.bucket_span)) * 2
),
Math.round(maxBucket * 2),
checkIntervalGap ? Math.round(checkIntervalGap.asSeconds()) : 0
)}s`;
@ -368,7 +384,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
},
{
terms: {
result_type: Object.values(ANOMALY_RESULT_TYPE),
result_type: Object.values(ANOMALY_RESULT_TYPE) as string[],
},
},
...(params.includeInterim
@ -431,6 +447,139 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
).filter(isDefined);
};
/**
* Fetches the most recent anomaly according the top N buckets within the lookback interval
* that satisfies a rule criteria.
*
* @param params - Alert params
*/
const fetchResult = async (
params: MlAnomalyDetectionAlertParams
): Promise<AlertExecutionResult | undefined> => {
const jobAndGroupIds = [
...(params.jobSelection.jobIds ?? []),
...(params.jobSelection.groupIds ?? []),
];
// Extract jobs from group ids and make sure provided jobs assigned to a current space
const jobsResponse = (
await mlClient.getJobs<MlJobsResponse>({ job_id: jobAndGroupIds.join(',') })
).body.jobs;
if (jobsResponse.length === 0) {
// Probably assigned groups don't contain any jobs anymore.
return;
}
const jobIds = jobsResponse.map((v) => v.job_id);
const dataFeeds = await datafeedsService.getDatafeedByJobId(jobIds);
const maxBucketInSeconds = resolveMaxTimeInterval(
jobsResponse.map((v) => v.analysis_config.bucket_span)
);
if (maxBucketInSeconds === undefined) {
// Technically it's not possible, just in case.
throw new Error('Unable to resolve a valid bucket length');
}
const lookBackTimeInterval: string =
params.lookbackInterval ?? resolveLookbackInterval(jobsResponse, dataFeeds ?? []);
const topNBuckets: number = params.topNBuckets ?? getTopNBuckets(jobsResponse[0]);
const requestBody = {
size: 0,
query: {
bool: {
filter: [
{
terms: { job_id: jobIds },
},
{
terms: {
result_type: Object.values(ANOMALY_RESULT_TYPE) as string[],
},
},
{
range: {
timestamp: {
gte: `now-${lookBackTimeInterval}`,
},
},
},
...(params.includeInterim
? []
: [
{
term: { is_interim: false },
},
]),
],
},
},
aggs: {
alerts_over_time: {
date_histogram: {
field: 'timestamp',
fixed_interval: `${maxBucketInSeconds}s`,
order: {
_key: 'desc' as const,
},
},
aggs: {
max_score: {
max: {
field: resultTypeScoreMapping[params.resultType],
},
},
...getResultTypeAggRequest(params.resultType, params.severity),
truncate: {
bucket_sort: {
size: topNBuckets,
},
},
},
},
},
};
const response = await mlClient.anomalySearch(
{
// @ts-expect-error
body: requestBody,
},
jobIds
);
const result = response.body.aggregations as {
alerts_over_time: {
buckets: Array<
{
doc_count: number;
key: number;
key_as_string: string;
max_score: {
value: number;
};
} & AggResultsResponse
>;
};
};
if (result.alerts_over_time.buckets.length === 0) {
return;
}
// Find the most anomalous result from the top N buckets
const topResult = result.alerts_over_time.buckets.reduce((prev, current) =>
prev.max_score.value > current.max_score.value ? prev : current
);
return getResultsFormatter(params.resultType)(topResult);
};
/**
* TODO Replace with URL generator when https://github.com/elastic/kibana/issues/59453 is resolved
* @param r
@ -520,17 +669,8 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
startedAt: Date,
previousStartedAt: Date | null
): Promise<AnomalyDetectionAlertContext | undefined> => {
const checkIntervalGap = previousStartedAt
? moment.duration(moment(startedAt).diff(previousStartedAt))
: undefined;
const result = await fetchResult(params);
const res = await fetchAnomalies(params, undefined, checkIntervalGap);
if (!res) {
throw new Error('No results found');
}
const result = res[0];
if (!result) return;
const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType);

View file

@ -34,6 +34,8 @@ interface Results {
};
}
export type DatafeedsService = ReturnType<typeof datafeedsProvider>;
export function datafeedsProvider(client: IScopedClusterClient, mlClient: MlClient) {
async function forceStartDatafeeds(datafeedIds: string[], start?: number, end?: number) {
const jobIds = await getJobIdsByDatafeedId();
@ -168,25 +170,39 @@ export function datafeedsProvider(client: IScopedClusterClient, mlClient: MlClie
}, {} as { [id: string]: string });
}
async function getDatafeedByJobId(
jobId: string[],
excludeGenerated?: boolean
): Promise<Datafeed[] | undefined>;
async function getDatafeedByJobId(
jobId: string,
excludeGenerated?: boolean
): Promise<Datafeed | undefined> {
): Promise<Datafeed | undefined>;
async function getDatafeedByJobId(
jobId: string | string[],
excludeGenerated?: boolean
): Promise<Datafeed | Datafeed[] | undefined> {
const jobIds = Array.isArray(jobId) ? jobId : [jobId];
async function findDatafeed() {
// if the job was doesn't use the standard datafeedId format
// get all the datafeeds and match it with the jobId
const {
body: { datafeeds },
} = await mlClient.getDatafeeds(excludeGenerated ? { exclude_generated: true } : {}); //
for (const result of datafeeds) {
if (result.job_id === jobId) {
return result;
}
} = await mlClient.getDatafeeds(excludeGenerated ? { exclude_generated: true } : {});
if (typeof jobId === 'string') {
return datafeeds.find((v) => v.job_id === jobId);
}
if (Array.isArray(jobId)) {
return datafeeds.filter((v) => jobIds.includes(v.job_id));
}
}
// if the job was created by the wizard,
// then we can assume it uses the standard format of the datafeedId
const assumedDefaultDatafeedId = `datafeed-${jobId}`;
const assumedDefaultDatafeedId = jobIds.map((v) => `datafeed-${v}`).join(',');
try {
const {
body: { datafeeds: datafeedsResults },
@ -194,12 +210,22 @@ export function datafeedsProvider(client: IScopedClusterClient, mlClient: MlClie
datafeed_id: assumedDefaultDatafeedId,
...(excludeGenerated ? { exclude_generated: true } : {}),
});
if (
Array.isArray(datafeedsResults) &&
datafeedsResults.length === 1 &&
datafeedsResults[0].job_id === jobId
) {
return datafeedsResults[0];
if (Array.isArray(datafeedsResults)) {
const result = datafeedsResults.filter((d) => jobIds.includes(d.job_id));
if (typeof jobId === 'string') {
if (datafeedsResults.length === 1 && datafeedsResults[0].job_id === jobId) {
return datafeedsResults[0];
} else {
return await findDatafeed();
}
}
if (result.length === jobIds.length) {
return datafeedsResults;
} else {
return await findDatafeed();
}
} else {
return await findDatafeed();
}

View file

@ -9,6 +9,7 @@ import { RouteInitialization } from '../types';
import { wrapError } from '../client/error_wrapper';
import { alertingServiceProvider } from '../lib/alerts/alerting_service';
import { mlAnomalyDetectionAlertPreviewRequest } from './schemas/alerting_schema';
import { datafeedsProvider } from '../models/job_service/datafeeds';
export function alertingRoutes({ router, routeGuard }: RouteInitialization) {
/**
@ -32,7 +33,10 @@ export function alertingRoutes({ router, routeGuard }: RouteInitialization) {
},
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => {
try {
const alertingService = alertingServiceProvider(mlClient, client.asInternalUser);
const alertingService = alertingServiceProvider(
mlClient,
datafeedsProvider(client, mlClient)
);
const result = await alertingService.preview(request.body);

View file

@ -26,13 +26,19 @@ export const mlAnomalyDetectionAlertParams = schema.object({
},
}
),
severity: schema.number(),
/** Anomaly score threshold */
severity: schema.number({ min: 0, max: 100 }),
/** Result type to alert upon */
resultType: schema.oneOf([
schema.literal(ANOMALY_RESULT_TYPE.RECORD),
schema.literal(ANOMALY_RESULT_TYPE.BUCKET),
schema.literal(ANOMALY_RESULT_TYPE.INFLUENCER),
]),
includeInterim: schema.boolean({ defaultValue: true }),
/** User's override for the lookback interval */
lookbackInterval: schema.nullable(schema.string()),
/** User's override for the top N buckets */
topNBuckets: schema.nullable(schema.number({ min: 1 })),
});
export const mlAnomalyDetectionAlertPreviewRequest = schema.object({

View file

@ -8,6 +8,7 @@
import { KibanaRequest, SavedObjectsClientContract } from 'kibana/server';
import { GetGuards } from '../shared_services';
import { alertingServiceProvider, MlAlertingService } from '../../lib/alerts/alerting_service';
import { datafeedsProvider } from '../../models/job_service/datafeeds';
export function getAlertingServiceProvider(getGuards: GetGuards) {
return {
@ -21,7 +22,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args)
alertingServiceProvider(mlClient, datafeedsProvider(scopedClient, mlClient)).preview(
...args
)
);
},
execute: async (
@ -31,7 +34,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args)
alertingServiceProvider(mlClient, datafeedsProvider(scopedClient, mlClient)).execute(
...args
)
);
},
};

View file

@ -16,6 +16,7 @@ export function MachineLearningAlertingProvider(
const retry = getService('retry');
const comboBox = getService('comboBox');
const testSubjects = getService('testSubjects');
const find = getService('find');
return {
async selectAnomalyDetectionAlertType() {
@ -100,5 +101,47 @@ export function MachineLearningAlertingProvider(
await testSubjects.existOrFail(`mlAnomalyAlertPreviewCallout`);
});
},
async assertLookbackInterval(expectedValue: string) {
const actualValue = await testSubjects.getAttribute(
'mlAnomalyAlertLookbackInterval',
'value'
);
expect(actualValue).to.eql(
expectedValue,
`Expected lookback interval to equal ${expectedValue}, got ${actualValue}`
);
},
async assertTopNBuckets(expectedNumberOfBuckets: number) {
const actualValue = await testSubjects.getAttribute('mlAnomalyAlertTopNBuckets', 'value');
expect(actualValue).to.eql(
expectedNumberOfBuckets,
`Expected number of buckets to equal ${expectedNumberOfBuckets}, got ${actualValue}`
);
},
async setLookbackInterval(interval: string) {
await this.ensureAdvancedSectionOpen();
await testSubjects.setValue('mlAnomalyAlertLookbackInterval', interval);
await this.assertLookbackInterval(interval);
},
async setTopNBuckets(numberOfBuckets: number) {
await this.ensureAdvancedSectionOpen();
await testSubjects.setValue('mlAnomalyAlertTopNBuckets', numberOfBuckets.toString());
await this.assertTopNBuckets(numberOfBuckets);
},
async ensureAdvancedSectionOpen() {
await retry.tryForTime(5000, async () => {
const isVisible = await find.existsByDisplayedByCssSelector(
'#mlAnomalyAlertAdvancedSettings'
);
if (!isVisible) {
await testSubjects.click('mlAnomalyAlertAdvancedSettingsTrigger');
}
});
},
};
}

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import { Datafeed } from '@elastic/elasticsearch/api/types';
import { FtrProviderContext } from '../../ftr_provider_context';
import { DATAFEED_STATE } from '../../../../plugins/ml/common/constants/states';
@ -39,7 +40,7 @@ function createTestJobAndDatafeed() {
categorization_examples_limit: 4,
},
},
datafeed: {
datafeed: ({
datafeed_id: `datafeed-${jobId}`,
job_id: jobId,
query: {
@ -53,8 +54,9 @@ function createTestJobAndDatafeed() {
must_not: [],
},
},
query_delay: '120s',
indices: ['ft_ecommerce'],
},
} as unknown) as Datafeed,
};
}
@ -83,7 +85,6 @@ export default ({ getPageObjects, getService }: FtrProviderContext) => {
// @ts-expect-error not full interface
await ml.api.createAnomalyDetectionJob(job);
await ml.api.openAnomalyDetectionJob(job.job_id);
// @ts-expect-error not full interface
await ml.api.createDatafeed(datafeed);
await ml.api.startDatafeed(datafeed.datafeed_id);
await ml.api.waitForDatafeedState(datafeed.datafeed_id, DATAFEED_STATE.STARTED);
@ -109,6 +110,10 @@ export default ({ getPageObjects, getService }: FtrProviderContext) => {
await ml.alerting.selectResultType('record');
await ml.alerting.setSeverity(10);
await ml.testExecution.logTestStep('should populate advanced settings with default values');
await ml.alerting.assertTopNBuckets(1);
await ml.alerting.assertLookbackInterval('123m');
await ml.testExecution.logTestStep('should preview the alert condition');
await ml.alerting.assertPreviewButtonState(false);
await ml.alerting.setTestInterval('2y');