[ML] Remove unused code. (#113263) (#113691)

- Removes unused code in server/lib/correlations.
- The only remaining file, get_filters.ts, was moved to lib/search_strategies/queries. (Note: at the moment, Kibana search strategies are only used for correlations, but at some point we might want to restructure this — possibly to lib/search_strategies/correlations/queries — to make the layout clearer.)

Co-authored-by: Walter Rafelsberger <walter@elastic.co>
This commit is contained in:
Kibana Machine 2021-10-05 07:33:16 -04:00 committed by GitHub
parent 8b32279f97
commit 4b4ea845fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 6 additions and 731 deletions

View file

@ -1,157 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isEmpty, omit } from 'lodash';
import { EventOutcome } from '../../../../common/event_outcome';
import {
processSignificantTermAggs,
TopSigTerm,
} from '../process_significant_term_aggs';
import { AggregationOptionsByType } from '../../../../../../../src/core/types/elasticsearch';
import { ESFilter } from '../../../../../../../src/core/types/elasticsearch';
import { EVENT_OUTCOME } from '../../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../../common/processor_event';
import { Setup } from '../../helpers/setup_request';
import { getBucketSize } from '../../helpers/get_bucket_size';
import {
getTimeseriesAggregation,
getFailedTransactionRateTimeSeries,
} from '../../helpers/transaction_error_rate';
import { CorrelationsOptions, getCorrelationsFilters } from '../get_filters';
// Options for getCorrelationsForFailedTransactions: the shared correlation
// filters plus the candidate field names to test and the request context.
interface Options extends CorrelationsOptions {
fieldNames: string[];
setup: Setup;
}
/**
 * Finds field/value pairs (significant terms) that correlate with failed
 * transactions, then resolves a failed-transaction-rate time series for the
 * top-ranked terms.
 *
 * Returns `{ significantTerms: [] }` when the search yields no aggregations.
 */
export async function getCorrelationsForFailedTransactions(options: Options) {
  const { fieldNames, setup, start, end } = options;
  const { apmEventClient } = setup;
  const filters = getCorrelationsFilters(options);

  // One significant_terms aggregation per candidate field. The foreground set
  // is failed transactions; the background set is every matching transaction
  // that did NOT fail.
  const significantTermsPerField = Object.fromEntries(
    fieldNames.map((fieldName) => [
      fieldName,
      {
        significant_terms: {
          size: 10,
          field: fieldName,
          background_filter: {
            bool: {
              filter: filters,
              must_not: {
                term: { [EVENT_OUTCOME]: EventOutcome.failure },
              },
            },
          },
        },
      },
    ])
  ) as Record<
    string,
    { significant_terms: AggregationOptionsByType['significant_terms'] }
  >;

  const params = {
    apm: { events: [ProcessorEvent.transaction] },
    track_total_hits: true,
    body: {
      size: 0,
      query: {
        bool: { filter: filters },
      },
      aggs: {
        failed_transactions: {
          filter: { term: { [EVENT_OUTCOME]: EventOutcome.failure } },
          aggs: significantTermsPerField,
        },
      },
    },
  };

  const response = await apmEventClient.search(
    'get_correlations_for_failed_transactions',
    params
  );
  if (!response.aggregations) {
    return { significantTerms: [] };
  }

  // doc_count belongs to the outer filter agg, not to any candidate field.
  const fieldAggs = omit(response.aggregations.failed_transactions, 'doc_count');
  const topSigTerms = processSignificantTermAggs({ sigTermAggs: fieldAggs });

  return getErrorRateTimeSeries({ setup, filters, topSigTerms, start, end });
}
/**
 * Resolves a failed-transaction-rate time series for every provided
 * significant term, each restricted to documents matching that term.
 *
 * Returns `{ significantTerms: [] }` when there are no terms or the search
 * produced no aggregations.
 */
export async function getErrorRateTimeSeries({
  setup,
  filters,
  topSigTerms,
  start,
  end,
}: {
  setup: Setup;
  filters: ESFilter[];
  topSigTerms: TopSigTerm[];
  start: number;
  end: number;
}) {
  const { apmEventClient } = setup;
  const { intervalString } = getBucketSize({ start, end, numBuckets: 15 });

  if (isEmpty(topSigTerms)) {
    return { significantTerms: [] };
  }

  const timeseriesAgg = getTimeseriesAggregation(start, end, intervalString);

  // One filter aggregation per term (keyed term_0, term_1, …), each holding
  // its own timeseries sub-aggregation.
  const perTermAggs: {
    [key: string]: {
      filter: AggregationOptionsByType['filter'];
      aggs: { timeseries: typeof timeseriesAgg };
    };
  } = {};
  topSigTerms.forEach(({ fieldName, fieldValue }, index) => {
    perTermAggs[`term_${index}`] = {
      filter: { term: { [fieldName]: fieldValue } },
      aggs: { timeseries: timeseriesAgg },
    };
  });

  const params = {
    // TODO: add support for metrics
    apm: { events: [ProcessorEvent.transaction] },
    body: {
      size: 0,
      query: { bool: { filter: filters } },
      aggs: perTermAggs,
    },
  };

  const response = await apmEventClient.search(
    'get_error_rate_timeseries',
    params
  );
  const { aggregations } = response;
  if (!aggregations) {
    return { significantTerms: [] };
  }

  const significantTerms = topSigTerms.map((topSig, index) => {
    const termAgg = aggregations[`term_${index}`]!;
    return {
      ...topSig,
      timeseries: getFailedTransactionRateTimeSeries(termAgg.timeseries.buckets),
    };
  });

  return { significantTerms };
}

View file

@ -1,56 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ProcessorEvent } from '../../../../common/processor_event';
import { getBucketSize } from '../../helpers/get_bucket_size';
import {
getTimeseriesAggregation,
getFailedTransactionRateTimeSeries,
} from '../../helpers/transaction_error_rate';
import { Setup } from '../../helpers/setup_request';
import { CorrelationsOptions, getCorrelationsFilters } from '../get_filters';
// Options for getOverallErrorTimeseries: only the shared correlation filters
// and the request context are needed.
interface Options extends CorrelationsOptions {
setup: Setup;
}
/**
 * Fetches the overall (unsegmented) failed-transaction-rate time series for
 * the given correlation filters.
 *
 * Returns `{ overall: null }` when the search produced no aggregations.
 */
export async function getOverallErrorTimeseries(options: Options) {
  const { setup, start, end } = options;
  const { apmEventClient } = setup;
  const filters = getCorrelationsFilters(options);
  const { intervalString } = getBucketSize({ start, end, numBuckets: 15 });

  const response = await apmEventClient.search('get_error_rate_timeseries', {
    // TODO: add support for metrics
    apm: { events: [ProcessorEvent.transaction] },
    body: {
      size: 0,
      query: { bool: { filter: filters } },
      aggs: {
        timeseries: getTimeseriesAggregation(start, end, intervalString),
      },
    },
  });

  const { aggregations } = response;
  if (!aggregations) {
    return { overall: null };
  }

  return {
    overall: {
      timeseries: getFailedTransactionRateTimeSeries(
        aggregations.timeseries.buckets
      ),
    },
  };
}

View file

@ -1,121 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AggregationOptionsByType } from '../../../../../../../src/core/types/elasticsearch';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../../common/processor_event';
import { getDurationForPercentile } from './get_duration_for_percentile';
import { processSignificantTermAggs } from '../process_significant_term_aggs';
import { getLatencyDistribution } from './get_latency_distribution';
import { withApmSpan } from '../../../utils/with_apm_span';
import { CorrelationsOptions, getCorrelationsFilters } from '../get_filters';
import { Setup } from '../../helpers/setup_request';
// Options for getCorrelationsForSlowTransactions.
interface Options extends CorrelationsOptions {
// percentile whose duration acts as the "slow" threshold
durationPercentile: number;
// candidate fields to test for correlation
fieldNames: string[];
maxLatency: number;
distributionInterval: number;
setup: Setup;
}
/**
 * Finds field/value pairs (significant terms) that correlate with slow
 * transactions, then resolves their latency distributions.
 *
 * "Slow" means transactions above the duration at `durationPercentile`;
 * significant terms are computed against a background of transactions below
 * that threshold.
 */
export async function getCorrelationsForSlowTransactions(options: Options) {
return withApmSpan('get_correlations_for_slow_transactions', async () => {
const {
durationPercentile,
fieldNames,
setup,
maxLatency,
distributionInterval,
} = options;
const { apmEventClient } = setup;
const filters = getCorrelationsFilters(options);
// duration threshold separating "slow" transactions from the baseline
const durationForPercentile = await getDurationForPercentile({
durationPercentile,
filters,
setup,
});
// a falsy threshold means no matching docs; bail out early
if (!durationForPercentile) {
return { significantTerms: [] };
}
const params = {
apm: { events: [ProcessorEvent.transaction] },
body: {
size: 0,
query: {
bool: {
// foreground filters
filter: filters,
must: {
function_score: {
query: {
range: {
[TRANSACTION_DURATION]: { gte: durationForPercentile },
},
},
// weight docs by how slow they are (log scale)
script_score: {
script: {
source: `Math.log(2 + doc['${TRANSACTION_DURATION}'].value)`,
},
},
},
},
},
},
// one significant_terms agg per candidate field; the background set is
// everything below the duration threshold
aggs: fieldNames.reduce((acc, fieldName) => {
return {
...acc,
[fieldName]: {
significant_terms: {
size: 10,
field: fieldName,
background_filter: {
bool: {
filter: [
...filters,
{
range: {
[TRANSACTION_DURATION]: {
lt: durationForPercentile,
},
},
},
],
},
},
},
},
};
}, {} as Record<string, { significant_terms: AggregationOptionsByType['significant_terms'] }>),
},
};
const response = await apmEventClient.search(
'get_significant_terms',
params
);
if (!response.aggregations) {
return { significantTerms: [] };
}
const topSigTerms = processSignificantTermAggs({
sigTermAggs: response.aggregations,
});
// attach latency distributions for the top-ranked terms
const significantTerms = await getLatencyDistribution({
setup,
filters,
topSigTerms,
maxLatency,
distributionInterval,
});
return { significantTerms };
});
}

View file

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ESFilter } from '../../../../../../../src/core/types/elasticsearch';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../../common/processor_event';
import { Setup } from '../../helpers/setup_request';
/**
 * Computes the transaction duration at the requested percentile for documents
 * matching `filters`. Falls back to 0 when no value is available.
 */
export async function getDurationForPercentile({
  durationPercentile,
  filters,
  setup,
}: {
  durationPercentile: number;
  filters: ESFilter[];
  setup: Setup;
}) {
  const { apmEventClient } = setup;

  const response = await apmEventClient.search('get_duration_for_percentiles', {
    apm: {
      events: [ProcessorEvent.transaction],
    },
    body: {
      size: 0,
      query: {
        bool: { filter: filters },
      },
      aggs: {
        percentile: {
          percentiles: {
            field: TRANSACTION_DURATION,
            percents: [durationPercentile],
          },
        },
      },
    },
  });

  // The keyed response holds exactly one entry (the requested percentile);
  // its value may be null when there are no matching docs.
  const percentileValues = response.aggregations?.percentile.values ?? {};
  const [duration] = Object.values(percentileValues);
  return duration || 0;
}

View file

@ -1,98 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AggregationOptionsByType } from '../../../../../../../src/core/types/elasticsearch';
import { ESFilter } from '../../../../../../../src/core/types/elasticsearch';
import { ProcessorEvent } from '../../../../common/processor_event';
import { Setup } from '../../helpers/setup_request';
import { TopSigTerm } from '../process_significant_term_aggs';
import {
getDistributionAggregation,
trimBuckets,
} from './get_overall_latency_distribution';
/**
 * For each top significant term, fetches the latency distribution of docs
 * matching that term and converts histogram buckets to percentages of the
 * term's total doc count.
 *
 * Returns an empty array when the search produced no aggregations.
 */
export async function getLatencyDistribution({
setup,
filters,
topSigTerms,
maxLatency,
distributionInterval,
}: {
setup: Setup;
filters: ESFilter[];
topSigTerms: TopSigTerm[];
maxLatency: number;
distributionInterval: number;
}) {
const { apmEventClient } = setup;
const distributionAgg = getDistributionAggregation(
maxLatency,
distributionInterval
);
// one filter agg per term (keyed term_0, term_1, ...), each with its own
// distribution sub-agg
const perTermAggs = topSigTerms.reduce(
(acc, term, index) => {
acc[`term_${index}`] = {
filter: { term: { [term.fieldName]: term.fieldValue } },
aggs: {
distribution: distributionAgg,
},
};
return acc;
},
{} as Record<
string,
{
filter: AggregationOptionsByType['filter'];
aggs: {
distribution: typeof distributionAgg;
};
}
>
);
const params = {
// TODO: add support for metrics
apm: { events: [ProcessorEvent.transaction] },
body: {
size: 0,
query: { bool: { filter: filters } },
aggs: perTermAggs,
},
};
const response = await apmEventClient.search(
'get_latency_distribution',
params
);
type Agg = NonNullable<typeof response.aggregations>;
if (!response.aggregations) {
return [];
}
return topSigTerms.map((topSig, index) => {
// ignore the typescript error since existence of response.aggregations is already checked:
// @ts-expect-error
const agg = response.aggregations[`term_${index}`] as Agg[string];
const total = agg.distribution.doc_count;
// drop trailing empty buckets beyond the intended bucket count
const buckets = trimBuckets(
agg.distribution.dist_filtered_by_latency.buckets
);
return {
...topSig,
// express each bucket as a percentage of the term's docs
distribution: buckets.map((bucket) => ({
x: bucket.key,
y: (bucket.doc_count / total) * 100,
})),
};
});
}

View file

@ -1,58 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ESFilter } from '../../../../../../../src/core/types/elasticsearch';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../../common/processor_event';
import { Setup } from '../../helpers/setup_request';
import { TopSigTerm } from '../process_significant_term_aggs';
/**
 * Returns the 99th-percentile transaction duration for docs matching
 * `filters`, optionally restricted to docs containing at least one of the
 * provided significant terms. May resolve to undefined when there are no
 * matching docs.
 */
export async function getMaxLatency({
  setup,
  filters,
  topSigTerms = [],
}: {
  setup: Setup;
  filters: ESFilter[];
  topSigTerms?: TopSigTerm[];
}) {
  const { apmEventClient } = setup;

  // When significant terms are given, only include docs that match at least
  // one of them.
  const shouldMatchTerms = topSigTerms.length
    ? {
        should: topSigTerms.map(({ fieldName, fieldValue }) => ({
          term: { [fieldName]: fieldValue },
        })),
        minimum_should_match: 1,
      }
    : null;

  const params = {
    // TODO: add support for metrics
    apm: { events: [ProcessorEvent.transaction] },
    body: {
      size: 0,
      query: {
        bool: {
          filter: filters,
          ...shouldMatchTerms,
        },
      },
      aggs: {
        // TODO: add support for metrics
        // max_latency: { max: { field: TRANSACTION_DURATION } },
        max_latency: {
          percentiles: { field: TRANSACTION_DURATION, percents: [99] },
        },
      },
    },
  };

  const response = await apmEventClient.search('get_max_latency', params);

  // keyed percentiles response holds a single entry for the 99th percentile
  const [maxLatency] = Object.values(
    response.aggregations?.max_latency.values ?? {}
  );
  return maxLatency;
}

View file

@ -1,110 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { dropRightWhile } from 'lodash';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../../common/processor_event';
import { getMaxLatency } from './get_max_latency';
import { withApmSpan } from '../../../utils/with_apm_span';
import { CorrelationsOptions, getCorrelationsFilters } from '../get_filters';
import { Setup } from '../../helpers/setup_request';
// Desired number of buckets in latency-distribution histograms.
export const INTERVAL_BUCKETS = 15;
// Options for getOverallLatencyDistribution.
interface Options extends CorrelationsOptions {
setup: Setup;
}
/**
 * Builds the aggregation used for latency distributions: a range filter
 * capping durations at `maxLatency`, with a fixed-interval histogram bounded
 * to [0, maxLatency] underneath.
 */
export function getDistributionAggregation(
  maxLatency: number,
  distributionInterval: number
) {
  const latencyHistogram = {
    histogram: {
      // TODO: add support for metrics
      field: TRANSACTION_DURATION,
      interval: distributionInterval,
      min_doc_count: 0,
      extended_bounds: {
        min: 0,
        max: maxLatency,
      },
    },
  };

  return {
    filter: { range: { [TRANSACTION_DURATION]: { lte: maxLatency } } },
    aggs: {
      dist_filtered_by_latency: latencyHistogram,
    },
  };
}
/**
 * Computes the overall latency distribution: finds the p99 max latency,
 * derives a histogram interval from it, and returns the distribution as
 * percentages of the total doc count.
 *
 * Returns nulls when no max latency (or no aggregation result) is available.
 */
export async function getOverallLatencyDistribution(options: Options) {
  const { setup } = options;
  const filters = getCorrelationsFilters(options);

  return withApmSpan('get_overall_latency_distribution', async () => {
    const { apmEventClient } = setup;

    const maxLatency = await getMaxLatency({ setup, filters });
    if (!maxLatency) {
      return {
        maxLatency: null,
        distributionInterval: null,
        overallDistribution: null,
      };
    }
    const distributionInterval = Math.floor(maxLatency / INTERVAL_BUCKETS);

    const params = {
      // TODO: add support for metrics
      apm: { events: [ProcessorEvent.transaction] },
      body: {
        size: 0,
        query: { bool: { filter: filters } },
        aggs: {
          // overall distribution agg
          distribution: getDistributionAggregation(
            maxLatency,
            distributionInterval
          ),
        },
      },
    };

    const response = await apmEventClient.search(
      'get_terms_distribution',
      params
    );
    if (!response.aggregations) {
      return {
        maxLatency,
        distributionInterval,
        overallDistribution: null,
      };
    }

    const { distribution } = response.aggregations;
    const totalDocs = distribution.doc_count;
    const buckets = trimBuckets(distribution.dist_filtered_by_latency.buckets);

    return {
      maxLatency,
      distributionInterval,
      // express each bucket as a percentage of all matching docs
      overallDistribution: buckets.map(({ key, doc_count: docCount }) => ({
        x: key,
        y: (docCount / totalDocs) * 100,
      })),
    };
  });
}
// Remove trailing buckets that are empty AND beyond the desired number of
// buckets (INTERVAL_BUCKETS); empty buckets within the first
// INTERVAL_BUCKETS positions are kept. Returns a new array.
export function trimBuckets<T extends { doc_count: number }>(buckets: T[]) {
  let keep = buckets.length;
  while (keep > INTERVAL_BUCKETS && buckets[keep - 1].doc_count === 0) {
    keep -= 1;
  }
  return buckets.slice(0, keep);
}

View file

@ -1,80 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { orderBy } from 'lodash';
import {
AggregationOptionsByType,
AggregationResultOf,
} from '../../../../../../src/core/types/elasticsearch';
// A significant term flattened out of an ES significant_terms bucket and
// annotated with a normalized impact score (score / max impact score).
export interface TopSigTerm {
fieldName: string;
fieldValue: string | number;
// bucket score from the significant_terms aggregation
score: number;
// score normalized by getMaxImpactScore, in (0, 1]
impact: number;
// doc_count of the field's aggregation
fieldCount: number;
// doc_count of the specific value's bucket
valueCount: number;
}
// Result shape of a single significant_terms aggregation.
type SigTermAgg = AggregationResultOf<
{ significant_terms: AggregationOptionsByType['significant_terms'] },
{}
>;
/**
 * Derives the normalization denominator for term impact: the larger of the
 * maximum score and twice the median score. Returns 0 for an empty input.
 *
 * @param scores significant-terms scores (not mutated)
 */
function getMaxImpactScore(scores: number[]) {
  if (scores.length === 0) {
    return 0;
  }

  // Sort a copy in descending order. Array#sort mutates in place, and the
  // previous implementation clobbered the caller's array.
  const sortedScores = [...scores].sort((a, b) => b - a);
  const maxScore = sortedScores[0];

  // median: middle element for odd lengths, mean of the two middle elements
  // for even lengths (order of sortedScores doesn't affect the median)
  const halfSize = scores.length / 2;
  const medianIndex = Math.floor(halfSize);
  const medianScore =
    medianIndex < halfSize
      ? sortedScores[medianIndex]
      : (sortedScores[medianIndex - 1] + sortedScores[medianIndex]) / 2;

  return Math.max(maxScore, medianScore * 2);
}
/**
 * Flattens significant_terms aggregation results into a ranked list of the
 * 10 highest-scoring field/value pairs, each annotated with a normalized
 * impact score.
 */
export function processSignificantTermAggs({
  sigTermAggs,
}: {
  sigTermAggs: Record<string, SigTermAgg | object>;
}) {
  // only significant_terms aggs carry buckets; drop anything else
  const bucketEntries = Object.entries(sigTermAggs).filter(
    (entry): entry is [string, SigTermAgg] => 'buckets' in entry[1]
  );

  // one flat record per (field, value) bucket
  const significantTerms = bucketEntries.flatMap(([fieldName, agg]) =>
    agg.buckets.map((bucket) => ({
      fieldName,
      fieldValue: bucket.key,
      fieldCount: agg.doc_count,
      valueCount: bucket.doc_count,
      score: bucket.score,
    }))
  );

  const maxImpactScore = getMaxImpactScore(
    significantTerms.map((term) => term.score)
  );

  // keep the 10 strongest terms by score, normalized against the max impact
  return orderBy(significantTerms, 'score', 'desc')
    .slice(0, 10)
    .map((term) => ({
      ...term,
      impact: term.score / maxImpactScore,
    }));
}

View file

@ -5,16 +5,16 @@
* 2.0.
*/
import { ESFilter } from '../../../../../../src/core/types/elasticsearch';
import { rangeQuery, kqlQuery } from '../../../../observability/server';
import { environmentQuery } from '../../../common/utils/environment_query';
import { ESFilter } from '../../../../../../../src/core/types/elasticsearch';
import { rangeQuery, kqlQuery } from '../../../../../observability/server';
import { environmentQuery } from '../../../../common/utils/environment_query';
import {
SERVICE_NAME,
TRANSACTION_NAME,
TRANSACTION_TYPE,
PROCESSOR_EVENT,
} from '../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../common/processor_event';
} from '../../../../common/elasticsearch_fieldnames';
import { ProcessorEvent } from '../../../../common/processor_event';
export interface CorrelationsOptions {
environment: string;

View file

@ -15,7 +15,7 @@ import type {
SearchStrategyParams,
} from '../../../../common/search_strategies/types';
import { rangeRt } from '../../../routes/default_api_types';
import { getCorrelationsFilters } from '../../correlations/get_filters';
import { getCorrelationsFilters } from './get_filters';
export const getTermsQuery = ({ fieldName, fieldValue }: FieldValuePair) => {
return { term: { [fieldName]: fieldValue } };