[Metrics UI] change composite.size of snapshot ES query to improve speed (#95994)

* change composite.size of snapshot query to improve speed

* use value from kibana config to set compositeSize, clean up unused config properties

* fix test

* change config name

* fix reference
This commit is contained in:
Sandra Gonzales 2021-04-08 13:36:15 -04:00 committed by GitHub
parent d904f8d1bb
commit f56a646b8e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 237 additions and 114 deletions

View file

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Server } from '@hapi/hapi';
import JoiNamespace from 'joi';
export interface KbnServer extends Server {
usage: any;
}
// NP_TODO: this is only used in the root index file AFAICT, can remove after migrating to NP
export const getConfigSchema = (Joi: typeof JoiNamespace) => {
const InfraDefaultSourceConfigSchema = Joi.object({
metricAlias: Joi.string(),
logAlias: Joi.string(),
fields: Joi.object({
container: Joi.string(),
host: Joi.string(),
message: Joi.array().items(Joi.string()).single(),
pod: Joi.string(),
tiebreaker: Joi.string(),
timestamp: Joi.string(),
}),
});
// NP_TODO: make sure this is all represented in the NP config schema
const InfraRootConfigSchema = Joi.object({
enabled: Joi.boolean().default(true),
query: Joi.object({
partitionSize: Joi.number(),
partitionFactor: Joi.number(),
}).default(),
sources: Joi.object()
.keys({
default: InfraDefaultSourceConfigSchema,
})
.default(),
}).default();
return InfraRootConfigSchema;
};

View file

@ -33,15 +33,25 @@ type ConditionResult = InventoryMetricConditions & {
isError: boolean;
};
export const evaluateCondition = async (
condition: InventoryMetricConditions,
nodeType: InventoryItemType,
source: InfraSource,
logQueryFields: LogQueryFields,
esClient: ElasticsearchClient,
filterQuery?: string,
lookbackSize?: number
): Promise<Record<string, ConditionResult>> => {
export const evaluateCondition = async ({
condition,
nodeType,
source,
logQueryFields,
esClient,
compositeSize,
filterQuery,
lookbackSize,
}: {
condition: InventoryMetricConditions;
nodeType: InventoryItemType;
source: InfraSource;
logQueryFields: LogQueryFields;
esClient: ElasticsearchClient;
compositeSize: number;
filterQuery?: string;
lookbackSize?: number;
}): Promise<Record<string, ConditionResult>> => {
const { comparator, warningComparator, metric, customMetric } = condition;
let { threshold, warningThreshold } = condition;
@ -61,6 +71,7 @@ export const evaluateCondition = async (
timerange,
source,
logQueryFields,
compositeSize,
filterQuery,
customMetric
);
@ -105,6 +116,7 @@ const getData = async (
timerange: InfraTimerangeInput,
source: InfraSource,
logQueryFields: LogQueryFields,
compositeSize: number,
filterQuery?: string,
customMetric?: SnapshotCustomMetricInput
) => {
@ -128,7 +140,13 @@ const getData = async (
includeTimeseries: Boolean(timerange.lookbackSize),
};
try {
const { nodes } = await getNodes(client, snapshotRequest, source, logQueryFields);
const { nodes } = await getNodes(
client,
snapshotRequest,
source,
logQueryFields,
compositeSize
);
if (!nodes.length) return { [UNGROUPED_FACTORY_KEY]: null }; // No Data state

View file

@ -73,16 +73,19 @@ export const createInventoryMetricThresholdExecutor = (libs: InfraBackendLibs) =
services.savedObjectsClient
);
const compositeSize = libs.configuration.inventory.compositeSize;
const results = await Promise.all(
criteria.map((c) =>
evaluateCondition(
c,
criteria.map((condition) =>
evaluateCondition({
condition,
nodeType,
source,
logQueryFields,
services.scopedClusterClient.asCurrentUser,
filterQuery
)
esClient: services.scopedClusterClient.asCurrentUser,
compositeSize,
filterQuery,
})
)
);

View file

@ -32,6 +32,7 @@ interface PreviewInventoryMetricThresholdAlertParams {
params: InventoryMetricThresholdParams;
source: InfraSource;
logQueryFields: LogQueryFields;
compositeSize: number;
lookback: Unit;
alertInterval: string;
alertThrottle: string;
@ -46,6 +47,7 @@ export const previewInventoryMetricThresholdAlert: (
params,
source,
logQueryFields,
compositeSize,
lookback,
alertInterval,
alertThrottle,
@ -70,8 +72,17 @@ export const previewInventoryMetricThresholdAlert: (
try {
const results = await Promise.all(
criteria.map((c) =>
evaluateCondition(c, nodeType, source, logQueryFields, esClient, filterQuery, lookbackSize)
criteria.map((condition) =>
evaluateCondition({
condition,
nodeType,
source,
logQueryFields,
esClient,
compositeSize,
filterQuery,
lookbackSize,
})
)
);

View file

@ -424,9 +424,8 @@ describe('The metric threshold alert type', () => {
const createMockStaticConfiguration = (sources: any) => ({
enabled: true,
query: {
partitionSize: 1,
partitionFactor: 1,
inventory: {
compositeSize: 2000,
},
sources,
});

View file

@ -5,7 +5,6 @@
* 2.0.
*/
import { InfraSourceConfiguration } from '../../common/source_configuration/source_configuration';
import { InfraFieldsDomain } from './domains/fields_domain';
import { InfraLogEntriesDomain } from './domains/log_entries_domain';
import { InfraMetricsDomain } from './domains/metrics_domain';
@ -28,14 +27,3 @@ export interface InfraBackendLibs extends InfraDomainLibs {
sourceStatus: InfraSourceStatus;
getLogQueryFields: GetLogQueryFields;
}
export interface InfraConfiguration {
enabled: boolean;
query: {
partitionSize: number;
partitionFactor: number;
};
sources: {
default: InfraSourceConfiguration;
};
}

View file

@ -134,9 +134,8 @@ describe('the InfraSources lib', () => {
const createMockStaticConfiguration = (sources: any) => ({
enabled: true,
query: {
partitionSize: 1,
partitionFactor: 1,
inventory: {
compositeSize: 2000,
},
sources,
});

View file

@ -35,9 +35,8 @@ import { createGetLogQueryFields } from './services/log_queries/get_log_query_fi
export const config = {
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
query: schema.object({
partitionSize: schema.number({ defaultValue: 75 }),
partitionFactor: schema.number({ defaultValue: 1.2 }),
inventory: schema.object({
compositeSize: schema.number({ defaultValue: 2000 }),
}),
sources: schema.maybe(
schema.object({

View file

@ -29,6 +29,7 @@ export const initAlertPreviewRoute = ({
framework,
sources,
getLogQueryFields,
configuration,
}: InfraBackendLibs) => {
framework.registerRoute(
{
@ -56,6 +57,8 @@ export const initAlertPreviewRoute = ({
sourceId || 'default'
);
const compositeSize = configuration.inventory.compositeSize;
try {
switch (alertType) {
case METRIC_THRESHOLD_ALERT_TYPE_ID: {
@ -96,6 +99,7 @@ export const initAlertPreviewRoute = ({
lookback,
source,
logQueryFields,
compositeSize,
alertInterval,
alertThrottle,
alertNotifyWhen,

View file

@ -40,7 +40,7 @@ export const initSnapshotRoute = (libs: InfraBackendLibs) => {
requestContext.core.savedObjects.client,
snapshotRequest.sourceId
);
const compositeSize = libs.configuration.inventory.compositeSize;
const logQueryFields = await libs.getLogQueryFields(
snapshotRequest.sourceId,
requestContext.core.savedObjects.client
@ -49,7 +49,13 @@ export const initSnapshotRoute = (libs: InfraBackendLibs) => {
UsageCollector.countNode(snapshotRequest.nodeType);
const client = createSearchClient(requestContext, framework);
const snapshotResponse = await getNodes(client, snapshotRequest, source, logQueryFields);
const snapshotResponse = await getNodes(
client,
snapshotRequest,
source,
logQueryFields,
compositeSize
);
return response.ok({
body: SnapshotNodeResponseRT.encode(snapshotResponse),

View file

@ -19,18 +19,26 @@ export interface SourceOverrides {
timestamp: string;
}
const transformAndQueryData = async (
client: ESSearchClient,
snapshotRequest: SnapshotRequest,
source: InfraSource,
sourceOverrides?: SourceOverrides
) => {
const metricsApiRequest = await transformRequestToMetricsAPIRequest(
const transformAndQueryData = async ({
client,
snapshotRequest,
source,
compositeSize,
sourceOverrides,
}: {
client: ESSearchClient;
snapshotRequest: SnapshotRequest;
source: InfraSource;
compositeSize: number;
sourceOverrides?: SourceOverrides;
}) => {
const metricsApiRequest = await transformRequestToMetricsAPIRequest({
client,
source,
snapshotRequest,
sourceOverrides
);
compositeSize,
sourceOverrides,
});
const metricsApiResponse = await queryAllData(client, metricsApiRequest);
const snapshotResponse = transformMetricsApiResponseToSnapshotResponse(
metricsApiRequest,
@ -45,30 +53,39 @@ export const getNodes = async (
client: ESSearchClient,
snapshotRequest: SnapshotRequest,
source: InfraSource,
logQueryFields: LogQueryFields
logQueryFields: LogQueryFields,
compositeSize: number
) => {
let nodes;
if (snapshotRequest.metrics.find((metric) => metric.type === 'logRate')) {
// *Only* the log rate metric has been requested
if (snapshotRequest.metrics.length === 1) {
nodes = await transformAndQueryData(client, snapshotRequest, source, logQueryFields);
nodes = await transformAndQueryData({
client,
snapshotRequest,
source,
compositeSize,
sourceOverrides: logQueryFields,
});
} else {
// A scenario whereby a single host might be shipping metrics and logs.
const metricsWithoutLogsMetrics = snapshotRequest.metrics.filter(
(metric) => metric.type !== 'logRate'
);
const nodesWithoutLogsMetrics = await transformAndQueryData(
const nodesWithoutLogsMetrics = await transformAndQueryData({
client,
{ ...snapshotRequest, metrics: metricsWithoutLogsMetrics },
source
);
const logRateNodes = await transformAndQueryData(
client,
{ ...snapshotRequest, metrics: [{ type: 'logRate' }] },
snapshotRequest: { ...snapshotRequest, metrics: metricsWithoutLogsMetrics },
source,
logQueryFields
);
compositeSize,
});
const logRateNodes = await transformAndQueryData({
client,
snapshotRequest: { ...snapshotRequest, metrics: [{ type: 'logRate' }] },
source,
compositeSize,
sourceOverrides: logQueryFields,
});
// Merge nodes where possible - e.g. a single host is shipping metrics and logs
const mergedNodes = nodesWithoutLogsMetrics.nodes.map((node) => {
const logRateNode = logRateNodes.nodes.find(
@ -91,7 +108,7 @@ export const getNodes = async (
};
}
} else {
nodes = await transformAndQueryData(client, snapshotRequest, source);
nodes = await transformAndQueryData({ client, snapshotRequest, source, compositeSize });
}
return nodes;

View file

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { transformRequestToMetricsAPIRequest } from './transform_request_to_metrics_api_request';
import { ESSearchClient } from '../../../lib/metrics/types';
import { InfraSource } from '../../../lib/sources';
import { SnapshotRequest } from '../../../../common/http_api';
jest.mock('./create_timerange_with_interval', () => {
return {
createTimeRangeWithInterval: () => ({
interval: '60s',
from: 1605705900000,
to: 1605706200000,
}),
};
});
describe('transformRequestToMetricsAPIRequest', () => {
test('returns a MetricsApiRequest given parameters', async () => {
const compositeSize = 3000;
const result = await transformRequestToMetricsAPIRequest({
client: {} as ESSearchClient,
source,
snapshotRequest,
compositeSize,
});
expect(result).toEqual(metricsApiRequest);
});
});
const source: InfraSource = {
id: 'default',
version: 'WzkzNjk5LDVd',
updatedAt: 1617384456384,
origin: 'stored',
configuration: {
name: 'Default',
description: '',
metricAlias: 'metrics-*,metricbeat-*',
logAlias: 'logs-*,filebeat-*,kibana_sample_data_logs*',
fields: {
container: 'container.id',
host: 'host.name',
message: ['message', '@message'],
pod: 'kubernetes.pod.uid',
tiebreaker: '_doc',
timestamp: '@timestamp',
},
inventoryDefaultView: '0',
metricsExplorerDefaultView: '0',
logColumns: [
{ timestampColumn: { id: '5e7f964a-be8a-40d8-88d2-fbcfbdca0e2f' } },
{ fieldColumn: { id: ' eb9777a8-fcd3-420e-ba7d-172fff6da7a2', field: 'event.dataset' } },
{ messageColumn: { id: 'b645d6da-824b-4723-9a2a-e8cece1645c0' } },
{ fieldColumn: { id: '906175e0-a293-42b2-929f-87a203e6fbec', field: 'agent.name' } },
],
anomalyThreshold: 50,
},
};
const snapshotRequest: SnapshotRequest = {
metrics: [{ type: 'cpu' }],
groupBy: [],
nodeType: 'pod',
timerange: { interval: '1m', to: 1605706200000, from: 1605705000000, lookbackSize: 5 },
filterQuery: '',
sourceId: 'default',
accountId: '',
region: '',
includeTimeseries: true,
};
const metricsApiRequest = {
indexPattern: 'metrics-*,metricbeat-*',
timerange: { field: '@timestamp', from: 1605705900000, to: 1605706200000, interval: '60s' },
metrics: [
{
id: 'cpu',
aggregations: {
cpu_with_limit: { avg: { field: 'kubernetes.pod.cpu.usage.limit.pct' } },
cpu_without_limit: { avg: { field: 'kubernetes.pod.cpu.usage.node.pct' } },
cpu: {
bucket_script: {
buckets_path: { with_limit: 'cpu_with_limit', without_limit: 'cpu_without_limit' },
script: {
source: 'params.with_limit > 0.0 ? params.with_limit : params.without_limit',
lang: 'painless',
},
gap_policy: 'skip',
},
},
},
},
{
id: '__metadata__',
aggregations: {
__metadata__: {
top_metrics: {
metrics: [{ field: 'kubernetes.pod.name' }, { field: 'kubernetes.pod.ip' }],
size: 1,
sort: { '@timestamp': 'desc' },
},
},
},
},
],
limit: 3000,
alignDataToEnd: true,
groupBy: ['kubernetes.pod.uid'],
};

View file

@ -15,12 +15,19 @@ import { transformSnapshotMetricsToMetricsAPIMetrics } from './transform_snapsho
import { META_KEY } from './constants';
import { SourceOverrides } from './get_nodes';
export const transformRequestToMetricsAPIRequest = async (
client: ESSearchClient,
source: InfraSource,
snapshotRequest: SnapshotRequest,
sourceOverrides?: SourceOverrides
): Promise<MetricsAPIRequest> => {
export const transformRequestToMetricsAPIRequest = async ({
client,
source,
snapshotRequest,
compositeSize,
sourceOverrides,
}: {
client: ESSearchClient;
source: InfraSource;
snapshotRequest: SnapshotRequest;
compositeSize: number;
sourceOverrides?: SourceOverrides;
}): Promise<MetricsAPIRequest> => {
const timeRangeWithIntervalApplied = await createTimeRangeWithInterval(client, {
...snapshotRequest,
filterQuery: parseFilterQuery(snapshotRequest.filterQuery),
@ -36,7 +43,9 @@ export const transformRequestToMetricsAPIRequest = async (
interval: timeRangeWithIntervalApplied.interval,
},
metrics: transformSnapshotMetricsToMetricsAPIMetrics(snapshotRequest),
limit: snapshotRequest.overrideCompositeSize ? snapshotRequest.overrideCompositeSize : 5,
limit: snapshotRequest.overrideCompositeSize
? snapshotRequest.overrideCompositeSize
: compositeSize,
alignDataToEnd: true,
};