[ML] Fix time range query in the Anomaly detection alert execution (#93939)

This commit is contained in:
Dima Arnautov 2021-03-08 21:00:23 +01:00 committed by GitHub
parent d5e4a2ae7d
commit 6b26d19c4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 84 deletions

View file

@ -14,7 +14,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in
export interface AlertExecutionResult {
count: number;
key: number;
key?: number;
alertInstanceKey: string;
isInterim: boolean;
jobIds: string[];

View file

@ -107,7 +107,7 @@ const MlAnomalyAlertTrigger: FC<MlAnomalyAlertTriggerProps> = ({
// Set defaults
severity: ANOMALY_THRESHOLD.CRITICAL,
resultType: ANOMALY_RESULT_TYPE.BUCKET,
includeInterim: true,
includeInterim: false,
// Preserve job selection
jobSelection,
});

View file

@ -28,6 +28,17 @@ import {
import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import { MlJobsResponse } from '../../../common/types/job_service';
import { resolveBucketSpanInSeconds } from '../../../common/util/job_utils';
import { isDefined } from '../../../common/types/guards';
/**
 * Shape of the aggregation portion of an anomaly-search response:
 * an optional bucket key plus, per preview results group, a doc count
 * and the top-hits sub-aggregations (raw ES hits, hence `any[]`).
 */
type AggResultsResponse = { key?: number } & Record<
  PreviewResultsKeys,
  { doc_count: number } & Record<TopHitsResultsKeys, { hits: { hits: any[] } }>
>;
/**
* Alerting related server-side methods
@ -253,6 +264,51 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
return source.job_id;
};
const getResultsFormatter = (resultType: AnomalyResultType) => {
const resultsLabel = getAggResultsLabel(resultType);
return (v: AggResultsResponse): AlertExecutionResult | undefined => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
if (aggTypeResults.doc_count === 0) {
return;
}
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;
const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);
return {
count: aggTypeResults.doc_count,
key: v.key,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
};
}) as RecordAnomalyAlertDoc[],
topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
};
}) as InfluencerAnomalyAlertDoc[],
};
};
};
/**
* Builds a request body
* @param params - Alert params
@ -325,17 +381,19 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
],
},
},
aggs: {
alerts_over_time: {
date_histogram: {
field: 'timestamp',
fixed_interval: lookBackTimeInterval,
// Ignore empty buckets
min_doc_count: 1,
},
aggs: getResultTypeAggRequest(params.resultType as AnomalyResultType, params.severity),
},
},
aggs: previewTimeInterval
? {
alerts_over_time: {
date_histogram: {
field: 'timestamp',
fixed_interval: lookBackTimeInterval,
// Ignore empty buckets
min_doc_count: 1,
},
aggs: getResultTypeAggRequest(params.resultType, params.severity),
},
}
: getResultTypeAggRequest(params.resultType, params.severity),
};
const response = await mlClient.anomalySearch(
@ -345,67 +403,30 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
jobIds
);
const result = response.body.aggregations as {
alerts_over_time: {
buckets: Array<
{
doc_count: number;
key: number;
key_as_string: string;
} & {
[key in PreviewResultsKeys]: {
doc_count: number;
} & {
[hitsKey in TopHitsResultsKeys]: {
hits: { hits: any[] };
};
};
}
>;
};
};
const result = response.body.aggregations;
const resultsLabel = getAggResultsLabel(params.resultType as AnomalyResultType);
const resultsLabel = getAggResultsLabel(params.resultType);
return (
result.alerts_over_time.buckets
// Filter out empty buckets
.filter((v) => v.doc_count > 0 && v[resultsLabel.aggGroupLabel].doc_count > 0)
// Map response
.map((v) => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;
const formatter = getResultsFormatter(params.resultType);
const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);
return {
count: aggTypeResults.doc_count,
key: v.key,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
})) as RecordAnomalyAlertDoc[],
topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => ({
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
})) as InfluencerAnomalyAlertDoc[],
return (previewTimeInterval
? (result as {
alerts_over_time: {
buckets: Array<
{
doc_count: number;
key: number;
key_as_string: string;
} & AggResultsResponse
>;
};
})
);
}).alerts_over_time.buckets
// Filter out empty buckets
.filter((v) => v.doc_count > 0 && v[resultsLabel.aggGroupLabel].doc_count > 0)
// Map response
.map(formatter)
: [formatter(result as AggResultsResponse)]
).filter(isDefined);
};
/**
@ -510,7 +531,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea
const result = res[0];
if (!result) return;
const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType);
const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType);
const executionResult = {
...result,

View file

@ -63,19 +63,19 @@ export function registerAnomalyDetectionAlertType({
{
name: 'timestamp',
description: i18n.translate('xpack.ml.alertContext.timestampDescription', {
defaultMessage: 'Timestamp of the anomaly',
defaultMessage: 'The bucket timestamp of the anomaly',
}),
},
{
name: 'timestampIso8601',
description: i18n.translate('xpack.ml.alertContext.timestampIso8601Description', {
defaultMessage: 'Time in ISO8601 format',
defaultMessage: 'The bucket time of the anomaly in ISO8601 format',
}),
},
{
name: 'jobIds',
description: i18n.translate('xpack.ml.alertContext.jobIdsDescription', {
defaultMessage: 'List of job IDs triggered the alert instance',
defaultMessage: 'List of job IDs that triggered the alert instance',
}),
},
{
@ -87,7 +87,7 @@ export function registerAnomalyDetectionAlertType({
{
name: 'score',
description: i18n.translate('xpack.ml.alertContext.scoreDescription', {
defaultMessage: 'Anomaly score',
defaultMessage: 'Anomaly score at the time of the notification action',
}),
},
{
@ -109,14 +109,6 @@ export function registerAnomalyDetectionAlertType({
}),
useWithTripleBracesInTemplates: true,
},
// TODO remove when https://github.com/elastic/kibana/pull/90525 is merged
{
name: 'kibanaBaseUrl',
description: i18n.translate('xpack.ml.alertContext.kibanaBasePathUrlDescription', {
defaultMessage: 'Kibana base path',
}),
useWithTripleBracesInTemplates: true,
},
],
},
producer: PLUGIN_ID,

View file

@ -8,6 +8,7 @@
import { schema, TypeOf } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { ALERT_PREVIEW_SAMPLE_SIZE } from '../../../common/constants/alerts';
import { ANOMALY_RESULT_TYPE } from '../../../common/constants/anomalies';
export const mlAnomalyDetectionAlertParams = schema.object({
jobSelection: schema.object(
@ -26,7 +27,11 @@ export const mlAnomalyDetectionAlertParams = schema.object({
}
),
severity: schema.number(),
resultType: schema.string(),
resultType: schema.oneOf([
schema.literal(ANOMALY_RESULT_TYPE.RECORD),
schema.literal(ANOMALY_RESULT_TYPE.BUCKET),
schema.literal(ANOMALY_RESULT_TYPE.INFLUENCER),
]),
includeInterim: schema.boolean({ defaultValue: true }),
});